00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include <Basics.H>
00030 #include <VestaConfig.H>
00031
00032 #include "SRPC.H"
00033 #include "SRPC_impl.H"
00034 #include "chars_seq_private.H"
00035 #include "int_seq_private.H"
00036
00037
00038
00039
00040
00041 SRPC::SRPC(SRPC::which_end me, const Text &interface, const Text &hostname)
00042 throw (SRPC::failure)
00043 {
00044 try {
00045 p = NEW_CONSTR(SRPC_impl, (me == caller));
00046 } catch (TCP_sock::failure f) { throw(convert_TCP_failure(f)); }
00047
00048 try {
00049 try {
00050 if (p->is_caller) {
00051 if (hostname.Empty())
00052 p->throw_failure(not_implemented,
00053 "hostname may not be defaulted", true);
00054 p->s = NEW(TCP_sock);
00055 p->s->connect_to(hostname, interface);
00056
00057 p->s->enable_keepalive();
00058 } else {
00059
00060
00061 u_short port = TCP_sock::parse_port(interface);
00062 p->listener = SRPC_listener::create(port);
00063 }
00064 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00065 } catch (SRPC::failure f) { delete p; throw(f); }
00066 }
00067
00068 SRPC::SRPC(SRPC::which_end me, TCP_sock *sock) throw (SRPC::failure)
00069 {
00070 try {
00071 p = NEW_CONSTR(SRPC_impl, (me == caller));
00072 } catch (TCP_sock::failure f) { throw(convert_TCP_failure(f)); }
00073
00074 try {
00075 try {
00076 if (p->is_caller)
00077 p->s = sock;
00078 else {
00079
00080
00081 sockaddr_in addr;
00082 sock->get_local_addr(addr);
00083 p->listener = SRPC_listener::create(addr.sin_port, sock);
00084 }
00085 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00086 } catch (SRPC::failure f) { delete p; throw(f); }
00087 }
00088
00089 SRPC::~SRPC() throw(SRPC::failure)
00090 {
00091 if (!(p->state == SRPC_impl::initial || p->state == SRPC_impl::failed)) {
00092 try {
00093 send_failure(partner_went_away, "Partner destroyed its SRPC object");
00094 } catch (...) { }
00095 }
00096 delete p;
00097 }
00098
00099 Text SRPC::this_host() throw(SRPC::failure)
00100 {
00101 try {
00102 return TCP_sock::this_host();
00103 } catch (TCP_sock::failure f) { throw(convert_TCP_failure(f)); }
00104
00105 }
00106
00107 void SRPC::split_name(const Text &host_and_port, Text &host, Text &port)
00108 {
00109 int i = host_and_port.FindCharR(':');
00110 if (i < 0) { host = host_and_port; port = ""; }
00111 else { host = host_and_port.Sub(0, i); port = host_and_port.Sub(i+1); }
00112 }
00113
00114
00115
00116
00117
00118
00119 bool SRPC::previous_failure(SRPC::failure *f) throw()
00120 {
00121 if (p->state != SRPC_impl::failed) return false;
00122 if (f != NULL) *f = p->f;
00123 return true;
00124 }
00125
00126 bool SRPC::alive(SRPC::failure *f) throw()
00127 {
00128 if (previous_failure(f)) return false;
00129 try {
00130 return socket()->alive();
00131 } catch (TCP_sock::failure why) {
00132 if (f != NULL) *f = SRPC::convert_TCP_failure(why);
00133 return false;
00134 };
00135 }
00136
00137 TCP_sock *SRPC::socket() throw(SRPC::failure) {
00138 return p->s;
00139 }
00140
00141 Text SRPC::local_socket() throw(SRPC::failure)
00142 {
00143 TCP_sock *sock = socket();
00144 sockaddr_in me;
00145 Text t;
00146 char a[15+1+5+1];
00147 if (sock == NULL) return "";
00148 try {
00149 sock->get_local_addr(me);
00150 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00151 t = inet_ntoa_r(me.sin_addr);
00152 sprintf(a, "%s:%d", t.cchars(), ntohs(me.sin_port));
00153 return Text(a);
00154 }
00155
00156 Text SRPC::remote_socket() throw(SRPC::failure)
00157 {
00158 TCP_sock *sock = socket();
00159 sockaddr_in him;
00160 Text t;
00161 char a[15+1+5+1];
00162 if (sock == NULL) return "";
00163 try {
00164 sock->get_remote_addr(him);
00165 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00166 t = inet_ntoa_r(him.sin_addr);
00167 sprintf(a, "%s:%d", t.cchars(), ntohs(him.sin_port));
00168 return Text(a);
00169 }
00170
00171
00172
00173
00174
00175 void SRPC::enable_read_timeout(unsigned int seconds,
00176 bool use_between_calls)
00177 throw ()
00178 {
00179 p->use_read_timeout = true;
00180 p->use_read_timeout_between_calls = use_between_calls;
00181 p->read_timeout_secs = seconds;
00182 }
00183
00184 void SRPC::disable_read_timeout() throw ()
00185 {
00186 p->use_read_timeout = false;
00187 }
00188
00189 bool SRPC::get_read_timeout( unsigned int &seconds,
00190 bool &use_between_calls) throw()
00191 {
00192 if(p->use_read_timeout)
00193 {
00194 seconds = p->read_timeout_secs;
00195 use_between_calls = p->use_read_timeout_between_calls;
00196 }
00197 return p->use_read_timeout;
00198 }
00199
00200
00201
00202
00203
00204 void SRPC::start_call(int proc_id, int intf_version) throw(SRPC::failure)
00205 {
00206 p->check_state(SRPC_impl::op_start_call);
00207
00208 try {
00209 p->send_item_code(SRPC_impl::ic_start_call);
00210 p->send_int32(proc_id);
00211 p->send_int32(intf_version);
00212 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00213
00214 p->state = SRPC_impl::ready;
00215 }
00216
00217 void SRPC::await_call(int &proc_id, int &intf_version) throw(SRPC::failure)
00218 {
00219 int remote_proc_id;
00220 int remote_intf_version;
00221
00222 p->check_state(SRPC_impl::op_await_call);
00223
00224 try {
00225 item_code ic;
00226 ic = p->read_item_code();
00227 if (ic != SRPC_impl::ic_start_call) {
00228
00229 p->buffered_ic = ic;
00230 remote_proc_id = default_proc_id;
00231 remote_intf_version = default_intf_version;
00232 } else {
00233 remote_proc_id = p->recv_int32();
00234 remote_intf_version = p->recv_int32();
00235 }
00236 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00237
00238
00239 if (intf_version == default_intf_version)
00240 intf_version = remote_intf_version;
00241 else if (intf_version != remote_intf_version)
00242 p->throw_failure(version_skew,
00243 "start_call/await_call disagree on intf_version");
00244
00245
00246 if (proc_id == default_proc_id)
00247 proc_id = remote_proc_id;
00248 else if (proc_id != remote_proc_id)
00249 p->throw_failure(version_skew,
00250 "start_call/await_call disagree on proc_id");
00251
00252 p->state = SRPC_impl::ready;
00253 }
00254
00255
00256
00257
00258
00259
00260 void SRPC::send_int32(int i) throw(SRPC::failure)
00261 {
00262 p->check_state(SRPC_impl::op_send_int32);
00263
00264 try {
00265 p->send_item_code(SRPC_impl::ic_int32);
00266 p->send_int32(i);
00267 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00268
00269 p->ensure_sending_state();
00270 }
00271
00272 int SRPC::recv_int32(bool *got_end) throw(SRPC::failure)
00273 {
00274 int result;
00275 p->check_state(SRPC_impl::op_recv_int32);
00276
00277 try {
00278 p->recv_item_code(SRPC_impl::ic_int32, got_end);
00279 result = (got_end != NULL && *got_end) ? 0 : p->recv_int32();
00280 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00281
00282 p->ensure_recving_state();
00283
00284 return result;
00285 }
00286
00287 void SRPC::send_int16(Basics::int16 i) throw(SRPC::failure)
00288 {
00289 p->check_state(SRPC_impl::op_send_int16);
00290
00291 try {
00292 p->send_item_code(SRPC_impl::ic_int16);
00293 p->send_int16(i);
00294 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00295
00296 p->ensure_sending_state();
00297 }
00298
00299 Basics::int16 SRPC::recv_int16(bool *got_end) throw(SRPC::failure)
00300 {
00301 Basics::int16 result;
00302 p->check_state(SRPC_impl::op_recv_int16);
00303
00304 try {
00305 p->recv_item_code(SRPC_impl::ic_int16, got_end);
00306 result = (got_end != NULL && *got_end) ? 0 : p->recv_int16();
00307 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00308
00309 p->ensure_recving_state();
00310
00311 return result;
00312 }
00313
00314 void SRPC::send_int64(Basics::int64 i) throw(SRPC::failure)
00315 {
00316 p->check_state(SRPC_impl::op_send_int64);
00317
00318 try {
00319 p->send_item_code(SRPC_impl::ic_int64);
00320 p->send_int64(i);
00321 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00322
00323 p->ensure_sending_state();
00324 }
00325
00326 Basics::int64 SRPC::recv_int64(bool *got_end) throw(SRPC::failure)
00327 {
00328 Basics::int64 result;
00329 p->check_state(SRPC_impl::op_recv_int64);
00330
00331 try {
00332 p->recv_item_code(SRPC_impl::ic_int64, got_end);
00333 result = (got_end != NULL && *got_end) ? 0 : p->recv_int64();
00334 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00335
00336 p->ensure_recving_state();
00337
00338 return result;
00339 }
00340
00341 void SRPC::send_bool(bool b) throw (SRPC::failure)
00342 {
00343 p->check_state(SRPC_impl::op_send_bool);
00344
00345 try {
00346 p->send_item_code(b ? SRPC_impl::ic_bool_true : SRPC_impl::ic_bool_false);
00347 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00348
00349 p->ensure_sending_state();
00350 }
00351
00352 bool SRPC::recv_bool(bool *got_end) throw(SRPC::failure)
00353 {
00354 bool result = false;
00355 p->check_state(SRPC_impl::op_recv_bool);
00356
00357 try {
00358 SRPC_impl::item_code ic = p->recv_item_code(SRPC_impl::ic_bool_true,
00359 SRPC_impl::ic_bool_false,
00360 got_end);
00361 if(got_end != NULL && *got_end)
00362 {
00363 result = false;
00364 }
00365 else if(ic == SRPC_impl::ic_bool_true)
00366 {
00367 result = true;
00368 }
00369 else if(ic == SRPC_impl::ic_bool_false)
00370 {
00371 result = false;
00372 }
00373 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00374
00375 p->ensure_recving_state();
00376
00377 return result;
00378 }
00379
00380
00381
00382
00383
00384 void SRPC::send_chars(const char *s) throw(SRPC::failure)
00385 {
00386 p->check_state(SRPC_impl::op_send_chars);
00387
00388 try {
00389 p->send_item_code(SRPC_impl::ic_chars);
00390 p->send_bytes(s, s == NULL ? 0 : strlen(s));
00391 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00392
00393 p->ensure_sending_state();
00394 }
00395
00396 char *SRPC::recv_chars(bool *got_end) throw(SRPC::failure)
00397 {
00398 int len;
00399 char *str;
00400
00401 p->check_state(SRPC_impl::op_recv_chars);
00402
00403 try {
00404 p->recv_item_code(SRPC_impl::ic_chars, got_end);
00405 if (got_end != NULL && *got_end) str = NULL;
00406 else {
00407 len = p->recv_int32();
00408
00409 if (len < 0)
00410 p->throw_failure(internal_trouble, "Malformed string length");
00411
00412 str = NEW_PTRFREE_ARRAY(char, len+1);
00413 p->recv_bytes(str, len);
00414 str[len] = '\0';
00415 }
00416
00417 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00418
00419 p->ensure_recving_state();
00420 return str;
00421 }
00422
00423 char *SRPC::recv_chars(char *buff, int buff_len, bool *got_end)
00424 throw(SRPC::failure)
00425 {
00426 int len;
00427 char *str;
00428
00429 p->check_state(SRPC_impl::op_recv_chars);
00430
00431 try {
00432 p->recv_item_code(SRPC_impl::ic_chars, got_end);
00433 if (got_end != NULL && *got_end) str = NULL;
00434 else {
00435 len = p->recv_int32();
00436
00437 if (len < 0)
00438 p->throw_failure(internal_trouble, "Malformed string length");
00439
00440 if (len < buff_len)
00441 str = buff;
00442 else
00443
00444 str = NEW_PTRFREE_ARRAY(char, len+1);
00445 p->recv_bytes(str, len);
00446 str[len] = '\0';
00447 }
00448
00449 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00450
00451 p->ensure_recving_state();
00452 return str;
00453 }
00454
00455 void SRPC::recv_chars_here(char *buffer, int &len, bool *got_end)
00456 throw(SRPC::failure)
00457 {
00458 p->check_state(SRPC_impl::op_recv_chars_here);
00459
00460 try {
00461 int incoming_len;
00462
00463 p->recv_item_code(SRPC_impl::ic_chars, got_end);
00464 if (got_end != NULL && *got_end) len = 0;
00465 else {
00466 incoming_len = p->recv_int32();
00467 if (incoming_len + 1 > len)
00468 p->throw_failure(buffer_too_small, "Buffer too small");
00469
00470 p->recv_bytes(buffer, incoming_len);
00471 buffer[incoming_len] = '\0';
00472 len = incoming_len;
00473 }
00474 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00475
00476 p->ensure_recving_state();
00477 }
00478
00479
00480
00481
00482
00483 void SRPC::send_Text(const Text &t) throw(SRPC::failure)
00484 {
00485 p->check_state(SRPC_impl::op_send_text);
00486
00487 try {
00488 p->send_item_code(SRPC_impl::ic_text);
00489 char *s = t.chars();
00490 p->send_bytes(s, strlen(s));
00491 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00492
00493 p->ensure_sending_state();
00494 }
00495
00496 void SRPC::recv_Text( Text &t, bool *got_end) throw(SRPC::failure)
00497 {
00498 p->check_state(SRPC_impl::op_recv_text);
00499
00500 try {
00501 p->recv_item_code(SRPC_impl::ic_text, got_end);
00502 if (got_end != NULL && *got_end) {
00503 t.Init("");
00504 } else {
00505 int len = p->recv_int32();
00506 if (len < 0) p->throw_failure(internal_trouble, "Malformed Text length");
00507 char *str = NEW_PTRFREE_ARRAY(char, len+1);
00508 p->recv_bytes(str, len);
00509 str[len] = '\0';
00510 t.Init(str);
00511 }
00512
00513 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00514
00515 p->ensure_recving_state();
00516 }
00517
00518
00519
00520
00521
00522 void SRPC::send_bytes(const char *buffer, int len) throw(SRPC::failure)
00523 {
00524 p->check_state(SRPC_impl::op_send_bytes);
00525
00526 try {
00527 p->send_item_code(SRPC_impl::ic_bytes);
00528 p->send_bytes(buffer, len);
00529 } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00530
00531 p->ensure_sending_state();
00532 }
00533
00534 char *SRPC::recv_bytes(int &len, bool *got_end) throw(SRPC::failure)
00535 {
00536 char *buffer;
00537
00538 p->check_state(SRPC_impl::op_recv_chars);
00539
00540 try {
00541 p->recv_item_code(SRPC_impl::ic_bytes, got_end);
00542 if (got_end != NULL && *got_end) buffer = NULL;
00543 else {
00544 len = p->recv_int32();
00545
00546 if (len < 0)
00547 p->throw_failure(internal_trouble, "Malformed string length");
00548
00549 buffer = NEW_PTRFREE_ARRAY(char, len);
00550 p->recv_bytes(buffer, len);
00551 }
00552 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00553
00554 p->ensure_recving_state();
00555 return buffer;
00556 }
00557
00558 void SRPC::recv_bytes_here(char *buffer, int &len, bool *got_end)
00559 throw(SRPC::failure)
00560 {
00561 p->check_state(SRPC_impl::op_recv_bytes_here);
00562
00563 try {
00564 int incoming_len;
00565
00566 p->recv_item_code(SRPC_impl::ic_bytes, got_end);
00567 if (got_end != NULL && *got_end) len = 0;
00568 else {
00569 incoming_len = p->recv_int32();
00570 if (incoming_len > len)
00571 p->throw_failure(buffer_too_small, "Buffer too small");
00572
00573 p->recv_bytes(buffer, incoming_len);
00574 len = incoming_len;
00575 }
00576 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00577
00578 p->ensure_recving_state();
00579 }
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
// Number of bytes used to carry a socket address on the wire: the
// address field followed by the port field of a sockaddr_in.
// 'dummy_sock' exists only so that sizeof can name those members.
static sockaddr_in dummy_sock;
const int bytes_for_socket =
  sizeof(dummy_sock.sin_port) + sizeof(dummy_sock.sin_addr);
00595
00596 void SRPC::send_socket(sockaddr_in &sock) throw(SRPC::failure)
00597 {
00598 char buff[bytes_for_socket];
00599 p->check_state(SRPC_impl::op_send_socket);
00600
00601 memcpy(buff, (char *) &sock.sin_addr, sizeof(sock.sin_addr));
00602 memcpy(buff + sizeof(sock.sin_addr), (char *) &sock.sin_port,
00603 sizeof(sock.sin_port));
00604 try {
00605 p->send_item_code(SRPC_impl::ic_socket);
00606 p->send_bytes(buff, sizeof(buff));
00607 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00608
00609 p->ensure_sending_state();
00610 }
00611
00612 sockaddr_in SRPC::recv_socket(bool *got_end) throw(SRPC::failure)
00613 {
00614 char buff[bytes_for_socket];
00615 sockaddr_in sock;
00616
00617 p->check_state(SRPC_impl::op_recv_socket);
00618
00619 memset((char *)&sock, 0, sizeof(sock));
00620 try {
00621 p->recv_item_code(SRPC_impl::ic_socket, got_end);
00622 if (got_end == NULL || !*got_end) {
00623 int incoming_len = p->recv_int32();
00624
00625 if (incoming_len != bytes_for_socket)
00626 p->throw_failure(internal_trouble, "Malformed socket");
00627
00628 p->recv_bytes(buff, incoming_len);
00629
00630 sock.sin_family = AF_INET;
00631 memcpy((char *) &sock.sin_addr, buff, sizeof(sock.sin_addr));
00632 memcpy((char *) &sock.sin_port, buff + sizeof(sock.sin_addr),
00633 sizeof(sock.sin_port));
00634 }
00635 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00636
00637 p->ensure_recving_state();
00638 return sock;
00639 }
00640
00641
00642
00643
00644
00645
00646 void SRPC::send_int_seq(int_seq &is) throw(SRPC::failure)
00647 {
00648 p->check_state(SRPC_impl::op_send_int32_seq);
00649
00650 try {
00651
00652 if (is.p == NULL) is.p = int_seq_impl::allocate_buffer();
00653
00654 int n = is.length();
00655 p->send_item_code(SRPC_impl::ic_int32_seq);
00656 p->send_int32(n);
00657 for (int i = 0; i < n; i++) p->send_int32(is[i]);
00658 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00659
00660 p->ensure_sending_state();
00661 }
00662
00663 void SRPC::recv_int_seq(int_seq &is) throw (SRPC::failure)
00664 {
00665 p->check_state(SRPC_impl::op_recv_int32_seq);
00666
00667 try {
00668 int n;
00669 int *seq;
00670
00671 p->recv_item_code(SRPC_impl::ic_int32_seq);
00672 n = p->recv_int32();
00673
00674
00675 if (is.p == NULL) is.p = int_seq_impl::allocate_buffer(n);
00676 else is.lengthen(n);
00677 seq = &is.p->base;
00678 is.p->h.len = n;
00679 for (int i = 0; i < n; i++) seq[i] = p->recv_int32();
00680
00681 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00682
00683 p->ensure_recving_state();
00684 }
00685
00686
00687
00688
00689
00690 void SRPC::send_int16_array(const Basics::int16* seq, int len)
00691 throw (SRPC::failure)
00692 {
00693 p->check_state(SRPC_impl::op_send_int16_array);
00694
00695 try {
00696 p->send_item_code(SRPC_impl::ic_int16_array);
00697 p->send_int32(len);
00698 for (int i = 0; i < len; i++) p->send_int16(seq[i]);
00699 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00700
00701 p->ensure_sending_state();
00702 }
00703
00704 Basics::int16* SRPC::recv_int16_array( int &len)
00705 throw (SRPC::failure)
00706 {
00707 p->check_state(SRPC_impl::op_recv_int16_array);
00708
00709 Basics::int16 *seq;
00710 try {
00711 p->recv_item_code(SRPC_impl::ic_int16_array);
00712 len = p->recv_int32();
00713 seq = NEW_PTRFREE_ARRAY(Basics::int16, len);
00714 for (int i = 0; i < len; i++) seq[i] = p->recv_int16();
00715 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00716
00717 p->ensure_recving_state();
00718 return seq;
00719 }
00720
00721
00722
00723
00724
00725 void SRPC::send_int32_array(const Basics::int32* seq, int len)
00726 throw (SRPC::failure)
00727 {
00728 p->check_state(SRPC_impl::op_send_int32_array);
00729
00730 try {
00731 p->send_item_code(SRPC_impl::ic_int32_array);
00732 p->send_int32(len);
00733 for (int i = 0; i < len; i++) p->send_int32(seq[i]);
00734 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00735
00736 p->ensure_sending_state();
00737 }
00738
00739 Basics::int32* SRPC::recv_int32_array( int &len)
00740 throw (SRPC::failure)
00741 {
00742 p->check_state(SRPC_impl::op_recv_int32_array);
00743
00744 Basics::int32 *seq;
00745 try {
00746 p->recv_item_code(SRPC_impl::ic_int32_array);
00747 len = p->recv_int32();
00748 seq = NEW_PTRFREE_ARRAY(Basics::int32, len);
00749 for (int i = 0; i < len; i++) seq[i] = p->recv_int32();
00750 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00751
00752 p->ensure_recving_state();
00753 return seq;
00754 }
00755
00756
00757
00758
00759
00760 void SRPC::send_int64_array(const Basics::int64* seq, int len)
00761 throw (SRPC::failure)
00762 {
00763 p->check_state(SRPC_impl::op_send_int64_array);
00764
00765 try {
00766 p->send_item_code(SRPC_impl::ic_int64_array);
00767 p->send_int32(len);
00768 for (int i = 0; i < len; i++) p->send_int64(seq[i]);
00769 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00770
00771 p->ensure_sending_state();
00772 }
00773
00774 Basics::int64* SRPC::recv_int64_array( int &len)
00775 throw (SRPC::failure)
00776 {
00777 p->check_state(SRPC_impl::op_recv_int64_array);
00778
00779 Basics::int64 *seq;
00780 try {
00781 p->recv_item_code(SRPC_impl::ic_int64_array);
00782 len = p->recv_int32();
00783 seq = NEW_PTRFREE_ARRAY(Basics::int64, len);
00784 for (int i = 0; i < len; i++) seq[i] = p->recv_int64();
00785 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00786
00787 p->ensure_recving_state();
00788 return seq;
00789 }
00790
00791
00792
00793
00794
00795 void SRPC::send_chars_seq(chars_seq &cs) throw(SRPC::failure)
00796 {
00797 p->check_state(SRPC_impl::op_send_chars_seq);
00798
00799 try {
00800
00801 if (cs.p == NULL) cs.p = chars_seq_impl::allocate_buffer();
00802
00803
00804
00805
00806 int i;
00807 int n = cs.p->h.len;
00808 char **seq = &cs.p->base;
00809 p->send_item_code(SRPC_impl::ic_chars_seq);
00810 p->send_int32(n);
00811
00812 int tot_len = cs.p->h.limit - cs.p->h.bodies;
00813 p->send_bytes(cs.p->h.bodies, tot_len);
00814 for (i = 0; i < n; i++)
00815 {
00816 assert(seq[i] >= cs.p->h.bodies && seq[i] < cs.p->h.limit);
00817 p->send_int32(seq[i] - cs.p->h.bodies);
00818 }
00819
00820 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00821
00822 p->ensure_sending_state();
00823 }
00824
00825 void SRPC::recv_chars_seq(chars_seq &cs) throw(SRPC::failure)
00826 {
00827 p->check_state(SRPC_impl::op_recv_chars_seq);
00828 if (cs.p != NULL) {
00829 if (cs.p->h.storage == chars_seq_impl::full)
00830 p->throw_failure(invalid_parameter, "Wrong kind of chars_seq");
00831 if (cs.p->h.len != 0)
00832 p->throw_failure(invalid_parameter, "Chars_seq must be empty");
00833 }
00834
00835 try {
00836 int n;
00837 int body_bytes;
00838 char **seq;
00839 p->recv_item_code(SRPC_impl::ic_chars_seq);
00840 n = p->recv_int32();
00841 body_bytes = p->recv_int32();
00842
00843 if (cs.p == NULL) cs.p = chars_seq_impl::allocate_buffer(n, body_bytes);
00844 if (cs.p->h.limit - (char *)cs.p < cs.min_size(n) + body_bytes) {
00845
00846 if (cs.p->h.storage == chars_seq_impl::manual)
00847 p->throw_failure(buffer_too_small, "buffer too small");
00848 else chars_seq_impl::expand(cs.p, n, body_bytes);
00849 }
00850 seq = &cs.p->base;
00851
00852 cs.p->h.bodies = cs.p->h.limit - body_bytes;
00853 p->recv_bytes(cs.p->h.bodies, body_bytes);
00854
00855 for (int i = 0; i < n; i++)
00856 seq[i] = cs.p->h.bodies + p->recv_int32();
00857 cs.p->h.len = n;
00858 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00859
00860 p->ensure_recving_state();
00861 }
00862
00863
00864
00865
00866
00867 void SRPC::send_bytes_seq(bytes_seq &bs) throw(SRPC::failure)
00868 {
00869 p->check_state(SRPC_impl::op_send_bytes_seq);
00870
00871 try {
00872
00873 if (bs.p == NULL) bs.p = bytes_seq_impl::allocate_buffer();
00874
00875
00876
00877
00878 int i;
00879 int n = bs.p->h.len;
00880 byte_str *seq = &bs.p->base;
00881 p->send_item_code(SRPC_impl::ic_bytes_seq);
00882 p->send_int32(n);
00883
00884 int tot_len = bs.p->h.limit - bs.p->h.bodies;
00885 p->send_bytes(bs.p->h.bodies, tot_len);
00886 for (i = 0; i < n; i++)
00887 {
00888 assert(seq[i].p >= bs.p->h.bodies && seq[i].p < bs.p->h.limit);
00889 p->send_int32(seq[i].l);
00890 }
00891
00892 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00893
00894 p->ensure_sending_state();
00895 }
00896
00897 void SRPC::recv_bytes_seq(bytes_seq &bs) throw(SRPC::failure)
00898 {
00899 p->check_state(SRPC_impl::op_recv_bytes_seq);
00900 if (bs.p != NULL) {
00901 if (bs.p->h.storage == bytes_seq_impl::full)
00902 p->throw_failure(invalid_parameter, "Wrong kind of bytes_seq");
00903 if (bs.p->h.len != 0)
00904 p->throw_failure(invalid_parameter, "Bytes_seq must be empty");
00905 }
00906
00907 try {
00908 int n;
00909 int body_bytes;
00910 byte_str *seq;
00911 p->recv_item_code(SRPC_impl::ic_bytes_seq);
00912 n = p->recv_int32();
00913 body_bytes = p->recv_int32();
00914
00915 if (bs.p == NULL) bs.p = bytes_seq_impl::allocate_buffer(n, body_bytes);
00916 if (bs.p->h.limit - (char *)bs.p < bs.min_size(n) + body_bytes) {
00917
00918 if (bs.p->h.storage == bytes_seq_impl::manual)
00919 p->throw_failure(buffer_too_small, "buffer too small");
00920 else bytes_seq_impl::expand(bs.p, n, body_bytes);
00921 }
00922 seq = &bs.p->base;
00923
00924 bs.p->h.bodies = bs.p->h.limit - body_bytes;
00925 p->recv_bytes(bs.p->h.bodies, body_bytes);
00926
00927 int offset = 0;
00928 for (int i = 0; i < n; i++) {
00929 int len = p->recv_int32();
00930 seq[i].l = len;
00931 seq[i].p = bs.p->h.bodies + offset;
00932 offset += len;
00933 }
00934 bs.p->h.len = n;
00935 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00936
00937 p->ensure_recving_state();
00938 }
00939
00940
00941
00942
00943
00944
00945 void SRPC::send_seq_start(int len, int bytes) throw(SRPC::failure)
00946 {
00947 p->check_state(SRPC_impl::op_send_seq_start);
00948
00949 try {
00950 p->send_item_code(SRPC_impl::ic_seq_start);
00951 p->send_int32(len);
00952 p->send_int32(bytes);
00953 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00954
00955 p->state = SRPC_impl::seq_out;
00956 }
00957
00958 void SRPC::recv_seq_start(int *len, int *bytes) throw(SRPC::failure)
00959 {
00960 p->check_state(SRPC_impl::op_recv_seq_start);
00961
00962 try {
00963 p->recv_item_code(SRPC_impl::ic_seq_start);
00964 int remote_len = p->recv_int32();
00965 if (len != NULL) *len = remote_len;
00966 int remote_bytes = p->recv_int32();
00967 if (bytes != NULL) *bytes = remote_bytes;
00968 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00969
00970 p->state = SRPC_impl::seq_in;
00971 }
00972
00973 void SRPC::send_seq_end() throw(SRPC::failure)
00974 {
00975 p->check_state(SRPC_impl::op_send_seq_end);
00976
00977 try {
00978 p->send_item_code(SRPC_impl::ic_seq_end);
00979 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00980
00981 p->state = SRPC_impl::data_out;
00982 }
00983
00984 void SRPC::recv_seq_end() throw(SRPC::failure)
00985 {
00986 p->check_state(SRPC_impl::op_recv_seq_end);
00987
00988 try {
00989 p->recv_item_code(SRPC_impl::ic_seq_end);
00990 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00991
00992 p->state = SRPC_impl::data_in;
00993 }
00994
00995
00996
00997
00998
00999
01000 void SRPC::send_end() throw(SRPC::failure)
01001 {
01002 p->check_state(SRPC_impl::op_send_end);
01003
01004 try {
01005 p->send_item_code(SRPC_impl::ic_end_call, true);
01006
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017 if (!p->is_caller) {
01018 p->recv_item_code(SRPC_impl::ic_end_ack);
01019 p->state = SRPC_impl::initial;
01020 }
01021 else
01022 p->state = SRPC_impl::ready;
01023
01024 p->is_sender = false;
01025
01026 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
01027 }
01028
01029 void SRPC::recv_end() throw(SRPC::failure)
01030 {
01031 p->check_state(SRPC_impl::op_recv_end);
01032
01033 try {
01034 p->recv_item_code(SRPC_impl::ic_end_call);
01035
01036 if (p->is_caller) {
01037
01038 p->send_item_code(SRPC_impl::ic_end_ack, true);
01039 p->state = SRPC_impl::initial;
01040 }
01041 else
01042 p->state = SRPC_impl::ready;
01043
01044 p->is_sender = true;
01045
01046 } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
01047 }
01048
01049
01050
01051
01052
01053
01054 void SRPC::send_failure(int r,
01055 const Text &msg,
01056 bool remote_only) throw(SRPC::failure)
01057 {
01058 p->check_state(SRPC_impl::op_send_failure);
01059 p->state = SRPC_impl::failed;
01060 p->f = failure(r, msg);
01061
01062 try {
01063 p->send_item_code(SRPC_impl::ic_failure);
01064 p->send_int32(r);
01065 p->send_bytes(msg.cchars(), msg.Length(), true);
01066 } catch(TCP_sock::failure tf) {
01067 throw(failure(transport_failure, tf.msg));
01068 }
01069 if (!remote_only) throw(p->f);
01070 }
01071
01072 SRPC::failure SRPC::convert_TCP_failure(TCP_sock::failure tf)
01073 {
01074 int r;
01075 Text msg = tf.msg;
01076 switch (tf.reason) {
01077 case TCP_sock::unknown_host: r = SRPC::unknown_host; break;
01078 case TCP_sock::unknown_port: r = SRPC::unknown_interface; break;
01079 case TCP_sock::partner_went_away: r = SRPC::partner_went_away; break;
01080 case TCP_sock::read_timeout: r = SRPC::read_timeout; break;
01081 case TCP_sock::environment_problem:
01082 r = SRPC::transport_failure;
01083 msg = "TCP trouble: " + tf.msg;
01084 break;
01085 default:
01086 r = SRPC::internal_trouble;
01087 msg = "Internal error (TCP): " + tf.msg;
01088 break;
01089 }
01090 return failure(r, msg);
01091 }
01092