Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

SRPC.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Fri Apr 22 14:29:59 EDT 2005 by irina.furman@intel.com 
00020 //      modified on Tue Mar 22 14:48:54 EST 2005 by ken@xorian.net         
00021 //      modified on Fri Sep  8 18:32:00 PDT 2000 by mann   
00022 //      modified on Thu Oct  8 09:17:37 PDT 1998 by heydon 
00023 //      modified on Thu Jul 11 09:02:33 PDT 1996 by levin 
00024 
00025 //  *************************
00026 //  *  SRPC implementation  *
00027 //  *************************
00028 
00029 #include <Basics.H>
00030 #include <VestaConfig.H>
00031 
00032 #include "SRPC.H"
00033 #include "SRPC_impl.H"
00034 #include "chars_seq_private.H"
00035 #include "int_seq_private.H"
00036 
00037 //  *************************************
00038 //  *  Connection setup (constructors)  *
00039 //  *************************************
00040 
00041 SRPC::SRPC(SRPC::which_end me, const Text &interface, const Text &hostname)
00042   throw (SRPC::failure)
00043 {
00044   try {
00045     p = NEW_CONSTR(SRPC_impl, (me == caller));
00046   } catch (TCP_sock::failure f) { throw(convert_TCP_failure(f)); }
00047 
00048   try {
00049     try {
00050       if (p->is_caller) {
00051         if (hostname.Empty())
00052           p->throw_failure(not_implemented,
00053                            "hostname may not be defaulted", true);
00054         p->s = NEW(TCP_sock);
00055         p->s->connect_to(hostname, interface);
00056         // Enable the TCP keep-alive timer
00057         p->s->enable_keepalive();
00058       } else {
00059         // For callee, create a listener and defer socket acquisition
00060         // and connection until first recv operation.
00061         u_short port = TCP_sock::parse_port(interface);
00062         p->listener = SRPC_listener::create(port);
00063       }
00064     } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00065   } catch (SRPC::failure f) { delete p; throw(f); }
00066 }
00067 
00068 SRPC::SRPC(SRPC::which_end me, TCP_sock *sock) throw (SRPC::failure)
00069 {
00070   try {
00071     p = NEW_CONSTR(SRPC_impl, (me == caller));
00072   } catch (TCP_sock::failure f) { throw(convert_TCP_failure(f)); }
00073 
00074   try {
00075     try {
00076       if (p->is_caller)
00077         p->s = sock;
00078       else {
00079         // For callee, create a listener and defer socket acquisition
00080         // and connection until first recv operation.
00081         sockaddr_in addr;
00082         sock->get_local_addr(addr);
00083         p->listener = SRPC_listener::create(addr.sin_port, sock);
00084       }
00085     } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00086   } catch (SRPC::failure f) { delete p; throw(f); }
00087 }
00088 
00089 SRPC::~SRPC() throw(SRPC::failure)
00090 {
00091   if (!(p->state == SRPC_impl::initial || p->state == SRPC_impl::failed)) {
00092     try {
00093       send_failure(partner_went_away, "Partner destroyed its SRPC object");
00094     } catch (...) { /*SKIP*/ }
00095   }
00096   delete p;
00097 }
00098 
00099 Text SRPC::this_host() throw(SRPC::failure)
00100 {
00101   try {
00102     return TCP_sock::this_host();
00103   } catch (TCP_sock::failure f) { throw(convert_TCP_failure(f)); }
00104 
00105 }
00106 
00107 void SRPC::split_name(const Text &host_and_port, Text &host, Text &port)
00108 {
00109   int i = host_and_port.FindCharR(':');
00110   if (i < 0) { host = host_and_port; port = ""; }
00111   else { host = host_and_port.Sub(0, i); port = host_and_port.Sub(i+1); }
00112 }
00113 
00114 
00115 //  ****************************
00116 //  *  Connection information  *
00117 //  ****************************
00118 
00119 bool SRPC::previous_failure(SRPC::failure *f) throw()
00120 {
00121   if (p->state != SRPC_impl::failed) return false;
00122   if (f != NULL) *f = p->f;
00123   return true;
00124 }
00125 
00126 bool SRPC::alive(SRPC::failure *f) throw()
00127 {
00128   if (previous_failure(f)) return false;
00129   try {
00130     return socket()->alive();
00131   } catch (TCP_sock::failure why) {
00132     if (f != NULL) *f = SRPC::convert_TCP_failure(why);
00133     return false;
00134   };
00135 }
00136 
00137 TCP_sock *SRPC::socket() throw(SRPC::failure) {
00138   return p->s;
00139 }
00140 
00141 Text SRPC::local_socket() throw(SRPC::failure)
00142 {
00143   TCP_sock *sock = socket();
00144   sockaddr_in me;
00145   Text t;
00146   char a[15+1+5+1];
00147   if (sock == NULL) return "";
00148   try {
00149     sock->get_local_addr(me);
00150   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00151   t = inet_ntoa_r(me.sin_addr);
00152   sprintf(a, "%s:%d", t.cchars(), ntohs(me.sin_port));
00153   return Text(a);
00154 }
00155 
00156 Text SRPC::remote_socket() throw(SRPC::failure)
00157 {
00158   TCP_sock *sock = socket();
00159   sockaddr_in him;
00160   Text t;
00161   char a[15+1+5+1];
00162   if (sock == NULL) return "";
00163   try {
00164     sock->get_remote_addr(him);
00165   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00166   t = inet_ntoa_r(him.sin_addr);
00167   sprintf(a, "%s:%d", t.cchars(), ntohs(him.sin_port));
00168   return Text(a);
00169 }
00170 
00171 //  *******************
00172 //  *  Configuration  *
00173 //  *******************
00174 
00175 void SRPC::enable_read_timeout(unsigned int seconds,
00176                                bool use_between_calls)
00177   throw ()
00178 {
00179   p->use_read_timeout = true;
00180   p->use_read_timeout_between_calls = use_between_calls;
00181   p->read_timeout_secs = seconds;
00182 }
00183 
00184 void SRPC::disable_read_timeout() throw ()
00185 {
00186   p->use_read_timeout = false;
00187 }
00188 
00189 bool SRPC::get_read_timeout(/*OUT*/ unsigned int &seconds,
00190                             /*OUT*/ bool &use_between_calls) throw()
00191 {
00192   if(p->use_read_timeout)
00193     {
00194       seconds = p->read_timeout_secs;
00195       use_between_calls = p->use_read_timeout_between_calls;
00196     }
00197   return p->use_read_timeout;
00198 }
00199 
00200 //  *********************
00201 //  *  Call initiation  *
00202 //  *********************
00203 
00204 void SRPC::start_call(int proc_id, int intf_version) throw(SRPC::failure)
00205 {
00206   p->check_state(SRPC_impl::op_start_call);
00207 
00208   try {
00209     p->send_item_code(SRPC_impl::ic_start_call);
00210     p->send_int32(proc_id);
00211     p->send_int32(intf_version);
00212   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00213 
00214   p->state = SRPC_impl::ready;
00215 }
00216 
00217 void SRPC::await_call(int &proc_id, int &intf_version) throw(SRPC::failure)
00218 {
00219   int remote_proc_id;
00220   int remote_intf_version;
00221 
00222   p->check_state(SRPC_impl::op_await_call);
00223 
00224   try {
00225     item_code ic;
00226     ic = p->read_item_code();
00227     if (ic != SRPC_impl::ic_start_call) {
00228       // Assume start_call was defaulted.
00229       p->buffered_ic = ic;                   // put back for next time
00230       remote_proc_id = default_proc_id;
00231       remote_intf_version = default_intf_version;
00232     } else {
00233       remote_proc_id = p->recv_int32();
00234       remote_intf_version = p->recv_int32();
00235     }
00236   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00237 
00238   // Check for acceptable intf_version
00239   if (intf_version == default_intf_version)
00240     intf_version = remote_intf_version;
00241   else if (intf_version != remote_intf_version)
00242     p->throw_failure(version_skew,
00243                      "start_call/await_call disagree on intf_version");
00244 
00245   // Check for acceptable proc_id
00246   if (proc_id == default_proc_id)
00247     proc_id = remote_proc_id;
00248   else if (proc_id != remote_proc_id)
00249     p->throw_failure(version_skew,
00250                      "start_call/await_call disagree on proc_id");
00251 
00252   p->state = SRPC_impl::ready;
00253 }
00254 
00255 
00256 //  **************************
00257 //  *  Integer transmission  *
00258 //  **************************
00259 
00260 void SRPC::send_int32(int i) throw(SRPC::failure)
00261 {
00262   p->check_state(SRPC_impl::op_send_int32);
00263 
00264   try {
00265     p->send_item_code(SRPC_impl::ic_int32);
00266     p->send_int32(i);
00267   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00268 
00269   p->ensure_sending_state();
00270 }
00271 
00272 int SRPC::recv_int32(bool *got_end) throw(SRPC::failure)
00273 {
00274   int result;
00275   p->check_state(SRPC_impl::op_recv_int32);
00276 
00277   try {
00278     p->recv_item_code(SRPC_impl::ic_int32, got_end);
00279     result = (got_end != NULL && *got_end) ? 0 : p->recv_int32();
00280   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00281 
00282   p->ensure_recving_state();
00283 
00284   return result;
00285 }
00286 
00287 void SRPC::send_int16(Basics::int16 i) throw(SRPC::failure)
00288 {
00289   p->check_state(SRPC_impl::op_send_int16);
00290 
00291   try {
00292     p->send_item_code(SRPC_impl::ic_int16);
00293     p->send_int16(i);
00294   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00295 
00296   p->ensure_sending_state();
00297 }
00298 
00299 Basics::int16 SRPC::recv_int16(bool *got_end) throw(SRPC::failure)
00300 {
00301   Basics::int16 result;
00302   p->check_state(SRPC_impl::op_recv_int16);
00303 
00304   try {
00305     p->recv_item_code(SRPC_impl::ic_int16, got_end);
00306     result = (got_end != NULL && *got_end) ? 0 : p->recv_int16();
00307   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00308 
00309   p->ensure_recving_state();
00310 
00311   return result;
00312 }
00313 
00314 void SRPC::send_int64(Basics::int64 i) throw(SRPC::failure)
00315 {
00316   p->check_state(SRPC_impl::op_send_int64);
00317 
00318   try {
00319     p->send_item_code(SRPC_impl::ic_int64);
00320     p->send_int64(i);
00321   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00322 
00323   p->ensure_sending_state();
00324 }
00325 
00326 Basics::int64 SRPC::recv_int64(bool *got_end) throw(SRPC::failure)
00327 {
00328   Basics::int64 result;
00329   p->check_state(SRPC_impl::op_recv_int64);
00330 
00331   try {
00332     p->recv_item_code(SRPC_impl::ic_int64, got_end);
00333     result = (got_end != NULL && *got_end) ? 0 : p->recv_int64();
00334   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00335 
00336   p->ensure_recving_state();
00337 
00338   return result;
00339 }
00340 
00341 void SRPC::send_bool(bool b) throw (SRPC::failure)
00342 {
00343   p->check_state(SRPC_impl::op_send_bool);
00344 
00345   try {
00346     p->send_item_code(b ? SRPC_impl::ic_bool_true : SRPC_impl::ic_bool_false);
00347   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00348 
00349   p->ensure_sending_state();
00350 }
00351 
00352 bool SRPC::recv_bool(bool *got_end) throw(SRPC::failure)
00353 {
00354   bool result = false;
00355   p->check_state(SRPC_impl::op_recv_bool);
00356 
00357   try {
00358     SRPC_impl::item_code ic = p->recv_item_code(SRPC_impl::ic_bool_true,
00359                                                 SRPC_impl::ic_bool_false,
00360                                                 got_end);
00361     if(got_end != NULL && *got_end)
00362       {
00363         result = false;
00364       }
00365     else if(ic == SRPC_impl::ic_bool_true)
00366       {
00367         result = true;
00368       }
00369     else if(ic == SRPC_impl::ic_bool_false)
00370       {
00371         result = false;
00372       }
00373   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00374 
00375   p->ensure_recving_state();
00376 
00377   return result;
00378 }
00379 
00380 //  ***********************************
00381 //  *  Character string transmission  *
00382 //  ***********************************
00383 
00384 void SRPC::send_chars(const char *s) throw(SRPC::failure)
00385 {
00386   p->check_state(SRPC_impl::op_send_chars);
00387 
00388   try {
00389     p->send_item_code(SRPC_impl::ic_chars);
00390     p->send_bytes(s, s == NULL ? 0 : strlen(s));
00391   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00392 
00393   p->ensure_sending_state();
00394 }
00395 
00396 char *SRPC::recv_chars(bool *got_end) throw(SRPC::failure)
00397 {
00398   int len;
00399   char *str;
00400 
00401   p->check_state(SRPC_impl::op_recv_chars);
00402 
00403   try {
00404     p->recv_item_code(SRPC_impl::ic_chars, got_end);
00405     if (got_end != NULL && *got_end) str = NULL;
00406     else {
00407       len = p->recv_int32();
00408 
00409       if (len < 0)
00410         p->throw_failure(internal_trouble, "Malformed string length");
00411 
00412       str = NEW_PTRFREE_ARRAY(char, len+1);
00413       p->recv_bytes(str, len);
00414       str[len] = '\0';
00415     }
00416 
00417   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00418 
00419   p->ensure_recving_state();
00420   return str;
00421 }
00422 
00423 char *SRPC::recv_chars(char *buff, int buff_len, bool *got_end)
00424   throw(SRPC::failure)
00425 {
00426   int len;
00427   char *str;
00428 
00429   p->check_state(SRPC_impl::op_recv_chars);
00430 
00431   try {
00432     p->recv_item_code(SRPC_impl::ic_chars, got_end);
00433     if (got_end != NULL && *got_end) str = NULL;
00434     else {
00435       len = p->recv_int32();
00436 
00437       if (len < 0)
00438         p->throw_failure(internal_trouble, "Malformed string length");
00439 
00440       if (len < buff_len)
00441         str = buff;            // fast path, use client-supplied buffer
00442       else
00443         // slow path, allocate buffer on heap
00444         str = NEW_PTRFREE_ARRAY(char, len+1);
00445       p->recv_bytes(str, len);
00446       str[len] = '\0';
00447     }
00448 
00449   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00450 
00451   p->ensure_recving_state();
00452   return str;
00453 }
00454 
00455 void SRPC::recv_chars_here(char *buffer, int &len, bool *got_end)
00456     throw(SRPC::failure)
00457 {
00458   p->check_state(SRPC_impl::op_recv_chars_here);
00459 
00460   try {
00461     int incoming_len;
00462 
00463     p->recv_item_code(SRPC_impl::ic_chars, got_end);
00464     if (got_end != NULL && *got_end) len = 0;
00465     else {
00466       incoming_len = p->recv_int32();
00467       if (incoming_len + 1 > len)
00468         p->throw_failure(buffer_too_small, "Buffer too small");
00469 
00470       p->recv_bytes(buffer, incoming_len);
00471       buffer[incoming_len] = '\0';
00472       len = incoming_len;
00473     }
00474   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00475 
00476   p->ensure_recving_state();
00477 }
00478 
00479 //  ***********************
00480 //  *  Text transmission  *
00481 //  ***********************
00482 
00483 void SRPC::send_Text(const Text &t) throw(SRPC::failure)
00484 {
00485   p->check_state(SRPC_impl::op_send_text);
00486 
00487   try {
00488     p->send_item_code(SRPC_impl::ic_text);
00489     char *s = t.chars();              // gets underlying pointer
00490     p->send_bytes(s, strlen(s));
00491   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00492 
00493   p->ensure_sending_state();
00494 }
00495 
00496 void SRPC::recv_Text(/*OUT*/ Text &t, bool *got_end) throw(SRPC::failure)
00497 {
00498   p->check_state(SRPC_impl::op_recv_text);
00499 
00500   try {
00501     p->recv_item_code(SRPC_impl::ic_text, got_end);
00502     if (got_end != NULL && *got_end) {
00503       t.Init("");
00504     } else {
00505       int len = p->recv_int32();
00506       if (len < 0) p->throw_failure(internal_trouble, "Malformed Text length");
00507       char *str = NEW_PTRFREE_ARRAY(char, len+1);
00508       p->recv_bytes(str, len);
00509       str[len] = '\0';
00510       t.Init(str);
00511     }
00512 
00513   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00514 
00515   p->ensure_recving_state();
00516 }
00517 
00518 //  ********************************
00519 //  *  Byte-sequence transmission  *
00520 //  ********************************
00521 
00522 void SRPC::send_bytes(const char *buffer, int len) throw(SRPC::failure)
00523 {
00524   p->check_state(SRPC_impl::op_send_bytes);
00525 
00526   try {
00527     p->send_item_code(SRPC_impl::ic_bytes);
00528     p->send_bytes(buffer, len);
00529   } catch(TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00530 
00531   p->ensure_sending_state();
00532 }
00533 
00534 char *SRPC::recv_bytes(int &len, bool *got_end) throw(SRPC::failure)
00535 {
00536   char *buffer;
00537 
00538   p->check_state(SRPC_impl::op_recv_chars);
00539 
00540   try {
00541     p->recv_item_code(SRPC_impl::ic_bytes, got_end);
00542     if (got_end != NULL && *got_end) buffer = NULL;
00543     else {
00544       len = p->recv_int32();
00545 
00546       if (len < 0)
00547         p->throw_failure(internal_trouble, "Malformed string length");
00548 
00549       buffer = NEW_PTRFREE_ARRAY(char, len);
00550       p->recv_bytes(buffer, len);
00551     }
00552   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00553 
00554   p->ensure_recving_state();
00555   return buffer;
00556 }
00557 
00558 void SRPC::recv_bytes_here(char *buffer, int &len, bool *got_end)
00559     throw(SRPC::failure)
00560 {
00561   p->check_state(SRPC_impl::op_recv_bytes_here);
00562 
00563   try {
00564     int incoming_len;
00565 
00566     p->recv_item_code(SRPC_impl::ic_bytes, got_end);
00567     if (got_end != NULL && *got_end) len = 0;
00568     else {
00569       incoming_len = p->recv_int32();
00570       if (incoming_len > len)
00571         p->throw_failure(buffer_too_small, "Buffer too small");
00572 
00573       p->recv_bytes(buffer, incoming_len);
00574       len = incoming_len;
00575     }
00576   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00577 
00578   p->ensure_recving_state();
00579 }
00580 
00581 //  *********************************
00582 //  *  Socket address transmission  *
00583 //  *********************************
00584 
00585 // Care is required here to make sure that a machine-independent
00586 // representation of socket addresses is used.  The relevant fields of
00587 // a socket address are already in network byte order, and their lengths
00588 // are machine-independent (4 bytes for IP address, 2 for port).  Therefore,
00589 // it suffices to assemble them in a single byte array without any
00590 // other change of representation.
00591 
// Never read or written; exists only so sizeof can name the fields below.
static sockaddr_in dummy_sock;

// Size of a socket address on the wire: the IP-address field followed by
// the port field.
const int bytes_for_socket =
  sizeof dummy_sock.sin_addr + sizeof dummy_sock.sin_port;
00595 
00596 void SRPC::send_socket(sockaddr_in &sock) throw(SRPC::failure)
00597 {
00598   char buff[bytes_for_socket];
00599   p->check_state(SRPC_impl::op_send_socket);
00600 
00601   memcpy(buff, (char *) &sock.sin_addr, sizeof(sock.sin_addr));
00602   memcpy(buff + sizeof(sock.sin_addr), (char *) &sock.sin_port,
00603          sizeof(sock.sin_port));
00604   try {
00605     p->send_item_code(SRPC_impl::ic_socket);
00606     p->send_bytes(buff, sizeof(buff));
00607   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00608 
00609   p->ensure_sending_state();
00610 }
00611 
00612 sockaddr_in SRPC::recv_socket(bool *got_end) throw(SRPC::failure)
00613 {
00614   char buff[bytes_for_socket];
00615   sockaddr_in sock;
00616 
00617   p->check_state(SRPC_impl::op_recv_socket);
00618 
00619   memset((char *)&sock, 0, sizeof(sock));
00620   try {
00621     p->recv_item_code(SRPC_impl::ic_socket, got_end);
00622     if (got_end == NULL || !*got_end) {
00623       int incoming_len = p->recv_int32();
00624 
00625       if (incoming_len != bytes_for_socket)
00626         p->throw_failure(internal_trouble, "Malformed socket");
00627 
00628       p->recv_bytes(buff, incoming_len);
00629 
00630       sock.sin_family = AF_INET;
00631       memcpy((char *) &sock.sin_addr, buff, sizeof(sock.sin_addr));
00632       memcpy((char *) &sock.sin_port, buff + sizeof(sock.sin_addr),
00633              sizeof(sock.sin_port));
00634     }
00635   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00636 
00637   p->ensure_recving_state();
00638   return sock;
00639 }
00640 
00641 
00642 //  ***********************************
00643 //  *  Integer sequence transmission  *
00644 //  ***********************************
00645 
00646 void SRPC::send_int_seq(int_seq &is) throw(SRPC::failure)
00647 {
00648   p->check_state(SRPC_impl::op_send_int32_seq);
00649 
00650   try {
00651     // If buffer allocation for 'is' was deferred, do it now.
00652     if (is.p == NULL) is.p = int_seq_impl::allocate_buffer();
00653 
00654     int n = is.length();
00655     p->send_item_code(SRPC_impl::ic_int32_seq);
00656     p->send_int32(n);
00657     for (int i = 0; i < n; i++) p->send_int32(is[i]);
00658   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00659 
00660   p->ensure_sending_state();
00661 }
00662 
00663 void SRPC::recv_int_seq(int_seq &is) throw (SRPC::failure)
00664 {
00665   p->check_state(SRPC_impl::op_recv_int32_seq);
00666 
00667   try {
00668     int n;
00669     int *seq;
00670 
00671     p->recv_item_code(SRPC_impl::ic_int32_seq);
00672     n = p->recv_int32();
00673     // If buffer allocation for 'is' was deferred, do it now.
00674     // Otherwise, ensure buffer is large enough for incoming sequence.
00675     if (is.p == NULL) is.p = int_seq_impl::allocate_buffer(n);
00676     else is.lengthen(n);
00677     seq = &is.p->base;
00678     is.p->h.len = n;
00679     for (int i = 0; i < n; i++) seq[i] = p->recv_int32();
00680 
00681   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00682 
00683   p->ensure_recving_state();
00684 }
00685 
00686 //  ******************************
00687 //  *  Short array transmission  *
00688 //  ******************************
00689 
00690 void SRPC::send_int16_array(const Basics::int16* seq, int len)
00691   throw (SRPC::failure)
00692 {
00693   p->check_state(SRPC_impl::op_send_int16_array);
00694 
00695   try {
00696     p->send_item_code(SRPC_impl::ic_int16_array);
00697     p->send_int32(len);
00698     for (int i = 0; i < len; i++) p->send_int16(seq[i]);
00699   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00700 
00701   p->ensure_sending_state();
00702 }
00703 
00704 Basics::int16* SRPC::recv_int16_array(/*OUT*/ int &len)
00705   throw (SRPC::failure)
00706 {
00707   p->check_state(SRPC_impl::op_recv_int16_array);
00708 
00709   Basics::int16 *seq;
00710   try {
00711     p->recv_item_code(SRPC_impl::ic_int16_array);
00712     len = p->recv_int32();
00713     seq = NEW_PTRFREE_ARRAY(Basics::int16, len);
00714     for (int i = 0; i < len; i++) seq[i] = p->recv_int16();
00715   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00716 
00717   p->ensure_recving_state();
00718   return seq;
00719 }
00720 
00721 //  ***************************************
00722 //  *  32-bit integer array transmission  *
00723 //  ***************************************
00724 
00725 void SRPC::send_int32_array(const Basics::int32* seq, int len)
00726   throw (SRPC::failure)
00727 {
00728   p->check_state(SRPC_impl::op_send_int32_array);
00729 
00730   try {
00731     p->send_item_code(SRPC_impl::ic_int32_array);
00732     p->send_int32(len);
00733     for (int i = 0; i < len; i++) p->send_int32(seq[i]);
00734   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00735 
00736   p->ensure_sending_state();
00737 }
00738 
00739 Basics::int32* SRPC::recv_int32_array(/*OUT*/ int &len)
00740   throw (SRPC::failure)
00741 {
00742   p->check_state(SRPC_impl::op_recv_int32_array);
00743 
00744   Basics::int32 *seq;
00745   try {
00746     p->recv_item_code(SRPC_impl::ic_int32_array);
00747     len = p->recv_int32();
00748     seq = NEW_PTRFREE_ARRAY(Basics::int32, len);
00749     for (int i = 0; i < len; i++) seq[i] = p->recv_int32();
00750   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00751 
00752   p->ensure_recving_state();
00753   return seq;
00754 }
00755 
00756 //  ***************************************
00757 //  *  64-bit integer array transmission  *
00758 //  ***************************************
00759 
00760 void SRPC::send_int64_array(const Basics::int64* seq, int len)
00761   throw (SRPC::failure)
00762 {
00763   p->check_state(SRPC_impl::op_send_int64_array);
00764 
00765   try {
00766     p->send_item_code(SRPC_impl::ic_int64_array);
00767     p->send_int32(len);
00768     for (int i = 0; i < len; i++) p->send_int64(seq[i]);
00769   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00770 
00771   p->ensure_sending_state();
00772 }
00773 
00774 Basics::int64* SRPC::recv_int64_array(/*OUT*/ int &len)
00775   throw (SRPC::failure)
00776 {
00777   p->check_state(SRPC_impl::op_recv_int64_array);
00778 
00779   Basics::int64 *seq;
00780   try {
00781     p->recv_item_code(SRPC_impl::ic_int64_array);
00782     len = p->recv_int32();
00783     seq = NEW_PTRFREE_ARRAY(Basics::int64, len);
00784     for (int i = 0; i < len; i++) seq[i] = p->recv_int64();
00785   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00786 
00787   p->ensure_recving_state();
00788   return seq;
00789 }
00790 
00791 //  ********************************************
00792 //  *  Character string sequence transmission  *
00793 //  ********************************************
00794 
00795 void SRPC::send_chars_seq(chars_seq &cs) throw(SRPC::failure)
00796 {
00797   p->check_state(SRPC_impl::op_send_chars_seq);
00798 
00799   try {
00800     // If buffer allocation for 'cs' was deferred, do it now.
00801     if (cs.p == NULL) cs.p = chars_seq_impl::allocate_buffer();
00802 
00803     // We send the sequence as a single item, which requires that we
00804     // know the aggregate length of the string bodies.  We reach inside
00805     // the representation to make this efficient.
00806     int i;
00807     int n = cs.p->h.len;
00808     char **seq = &cs.p->base;
00809     p->send_item_code(SRPC_impl::ic_chars_seq);
00810     p->send_int32(n);
00811 
00812     int tot_len = cs.p->h.limit - cs.p->h.bodies;
00813     p->send_bytes(cs.p->h.bodies, tot_len);
00814     for (i = 0; i < n; i++)
00815       {
00816         assert(seq[i] >= cs.p->h.bodies && seq[i] < cs.p->h.limit);
00817         p->send_int32(seq[i] - cs.p->h.bodies); 
00818       }
00819 
00820   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00821 
00822   p->ensure_sending_state();
00823 }
00824 
00825 void SRPC::recv_chars_seq(chars_seq &cs) throw(SRPC::failure)
00826 {
00827   p->check_state(SRPC_impl::op_recv_chars_seq);
00828   if (cs.p != NULL) {
00829     if (cs.p->h.storage == chars_seq_impl::full)
00830       p->throw_failure(invalid_parameter, "Wrong kind of chars_seq");
00831     if (cs.p->h.len != 0)
00832       p->throw_failure(invalid_parameter, "Chars_seq must be empty");
00833   }
00834 
00835   try {
00836     int n;
00837     int body_bytes;
00838     char **seq;
00839     p->recv_item_code(SRPC_impl::ic_chars_seq);
00840     n = p->recv_int32();
00841     body_bytes = p->recv_int32();
00842     // If buffer allocation for 'cs' was deferred, do it now.
00843     if (cs.p == NULL) cs.p = chars_seq_impl::allocate_buffer(n, body_bytes);
00844     if (cs.p->h.limit - (char *)cs.p < cs.min_size(n) + body_bytes) {
00845       // 'cs' isn't big enough to hold incoming sequence.
00846       if (cs.p->h.storage == chars_seq_impl::manual)
00847         p->throw_failure(buffer_too_small, "buffer too small");
00848       else chars_seq_impl::expand(cs.p, n, body_bytes);
00849     }
00850     seq = &cs.p->base;
00851     // Receive all string bodies in a lump
00852     cs.p->h.bodies = cs.p->h.limit - body_bytes;
00853     p->recv_bytes(cs.p->h.bodies, body_bytes);
00854     // Receive offsets into bodies; set sequence pointers
00855     for (int i = 0; i < n; i++)
00856       seq[i] = cs.p->h.bodies + p->recv_int32();
00857     cs.p->h.len = n;
00858   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00859 
00860   p->ensure_recving_state();
00861 }
00862 
00863 //  ***************************************
00864 //  *  Byte-string sequence transmission  *
00865 //  ***************************************
00866 
00867 void SRPC::send_bytes_seq(bytes_seq &bs) throw(SRPC::failure)
00868 {
00869   p->check_state(SRPC_impl::op_send_bytes_seq);
00870 
00871   try {
00872     // If buffer allocation for 'bs' was deferred, do it now.
00873     if (bs.p == NULL) bs.p = bytes_seq_impl::allocate_buffer();
00874 
00875     // We send the sequence as a single item, which requires that we
00876     // know the aggregate length of the string bodies.  We reach inside
00877     // the representation to make this efficient.
00878     int i;
00879     int n = bs.p->h.len;
00880     byte_str *seq = &bs.p->base;
00881     p->send_item_code(SRPC_impl::ic_bytes_seq);
00882     p->send_int32(n);
00883 
00884     int tot_len = bs.p->h.limit - bs.p->h.bodies;
00885     p->send_bytes(bs.p->h.bodies, tot_len);
00886     for (i = 0; i < n; i++)
00887       {
00888         assert(seq[i].p >= bs.p->h.bodies && seq[i].p < bs.p->h.limit);
00889         p->send_int32(seq[i].l); 
00890       }
00891 
00892   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00893 
00894   p->ensure_sending_state();
00895 }
00896 
00897 void SRPC::recv_bytes_seq(bytes_seq &bs) throw(SRPC::failure)
00898 {
00899   p->check_state(SRPC_impl::op_recv_bytes_seq);
00900   if (bs.p != NULL) {
00901     if (bs.p->h.storage == bytes_seq_impl::full)
00902       p->throw_failure(invalid_parameter, "Wrong kind of bytes_seq");
00903     if (bs.p->h.len != 0)
00904       p->throw_failure(invalid_parameter, "Bytes_seq must be empty");
00905   }
00906 
00907   try {
00908     int n;
00909     int body_bytes;
00910     byte_str *seq;
00911     p->recv_item_code(SRPC_impl::ic_bytes_seq);
00912     n = p->recv_int32();
00913     body_bytes = p->recv_int32();
00914     // If buffer allocation for 'bs' was deferred, do it now.
00915     if (bs.p == NULL) bs.p = bytes_seq_impl::allocate_buffer(n, body_bytes);
00916     if (bs.p->h.limit - (char *)bs.p < bs.min_size(n) + body_bytes) {
00917       // 'bs' isn't big enough to hold incoming sequence.
00918       if (bs.p->h.storage == bytes_seq_impl::manual)
00919         p->throw_failure(buffer_too_small, "buffer too small");
00920       else bytes_seq_impl::expand(bs.p, n, body_bytes);
00921     }
00922     seq = &bs.p->base;
00923     // Receive all string bodies in a lump
00924     bs.p->h.bodies = bs.p->h.limit - body_bytes;
00925     p->recv_bytes(bs.p->h.bodies, body_bytes);
00926     // Receive byte-string lengths; compute pointers
00927     int offset = 0;
00928     for (int i = 0; i < n; i++) {
00929       int len = p->recv_int32();
00930       seq[i].l = len;
00931       seq[i].p = bs.p->h.bodies + offset;
00932       offset += len;
00933     }
00934     bs.p->h.len = n;
00935   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00936 
00937   p->ensure_recving_state();
00938 }
00939 
00940 
00941 //  ******************************************
00942 //  *  Sequence transmission (general case)  *
00943 //  ******************************************
00944 
00945 void SRPC::send_seq_start(int len, int bytes) throw(SRPC::failure)
00946 {
00947   p->check_state(SRPC_impl::op_send_seq_start);
00948 
00949   try {
00950     p->send_item_code(SRPC_impl::ic_seq_start);
00951     p->send_int32(len);
00952     p->send_int32(bytes);
00953   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00954 
00955   p->state = SRPC_impl::seq_out;
00956 }
00957 
00958 void SRPC::recv_seq_start(int *len, int *bytes) throw(SRPC::failure)
00959 {
00960   p->check_state(SRPC_impl::op_recv_seq_start);
00961 
00962   try {
00963     p->recv_item_code(SRPC_impl::ic_seq_start);
00964     int remote_len = p->recv_int32();
00965     if (len != NULL) *len = remote_len;
00966     int remote_bytes = p->recv_int32();
00967     if (bytes != NULL) *bytes = remote_bytes;
00968   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00969 
00970   p->state = SRPC_impl::seq_in;
00971 }
00972 
00973 void SRPC::send_seq_end() throw(SRPC::failure)
00974 {
00975   p->check_state(SRPC_impl::op_send_seq_end);
00976 
00977   try {
00978     p->send_item_code(SRPC_impl::ic_seq_end);
00979   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00980 
00981   p->state = SRPC_impl::data_out;
00982 }
00983 
00984 void SRPC::recv_seq_end() throw(SRPC::failure)
00985 {
00986   p->check_state(SRPC_impl::op_recv_seq_end);
00987 
00988   try {
00989     p->recv_item_code(SRPC_impl::ic_seq_end);
00990   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
00991 
00992   p->state = SRPC_impl::data_in;
00993 }
00994 
00995 
00996 //  **************************************
00997 //  *  End argument/result transmission  *
00998 //  **************************************
00999 
01000 void SRPC::send_end() throw(SRPC::failure)
01001 {
01002   p->check_state(SRPC_impl::op_send_end);
01003 
01004   try {
01005     p->send_item_code(SRPC_impl::ic_end_call, /*flush=*/ true);
01006 
01007     // If this is the callee, verify that other end is happy.
01008     // This ensures that any problem at the caller side while receiving
01009     // the results is reported to the callee before the callee finishes.
01010     // Otherwise, the callee will assume all is well, and an error reported
01011     // from the other end will be delivered on the *next* call, which
01012     // will confuse everyone.
01013     // Note:  if we are the caller, this handshake can be omitted, since
01014     // we're about to turn around and wait for the callee to send the
01015     // results anyway, at which time any error will be reported.
01016 
01017     if (!p->is_caller) {
01018       p->recv_item_code(SRPC_impl::ic_end_ack);
01019       p->state = SRPC_impl::initial;
01020     }
01021     else
01022       p->state =  SRPC_impl::ready;
01023 
01024     p->is_sender = false;
01025 
01026   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
01027 }
01028 
01029 void SRPC::recv_end() throw(SRPC::failure)
01030 {
01031   p->check_state(SRPC_impl::op_recv_end);
01032 
01033   try {
01034     p->recv_item_code(SRPC_impl::ic_end_call);
01035 
01036     if (p->is_caller) {
01037       // Tell the other end we're happy.  See comment in send_end.
01038       p->send_item_code(SRPC_impl::ic_end_ack, /*flush=*/ true);
01039       p->state = SRPC_impl::initial;
01040     }
01041     else
01042       p->state = SRPC_impl::ready;
01043 
01044     p->is_sender = true;
01045 
01046   } catch (TCP_sock::failure f) { p->SRPC_impl::report_TCP_failure(f); }
01047 }
01048 
01049 
01050 //  ***********************
01051 //  *  Failure reporting  *
01052 //  ***********************
01053 
01054 void SRPC::send_failure(int r,
01055                         const Text &msg,
01056                         bool remote_only) throw(SRPC::failure)
01057 {
01058   p->check_state(SRPC_impl::op_send_failure);
01059   p->state = SRPC_impl::failed;
01060   p->f = failure(r, msg);
01061 
01062   try {
01063     p->send_item_code(SRPC_impl::ic_failure);
01064     p->send_int32(r);
01065     p->send_bytes(msg.cchars(), msg.Length(), true);
01066   } catch(TCP_sock::failure tf) {
01067     throw(failure(transport_failure, tf.msg));
01068   }
01069   if (!remote_only) throw(p->f);
01070 }
01071 
01072 SRPC::failure SRPC::convert_TCP_failure(TCP_sock::failure tf)
01073 {
01074   int r;
01075   Text msg = tf.msg;
01076   switch (tf.reason) {
01077   case TCP_sock::unknown_host: r = SRPC::unknown_host; break;
01078   case TCP_sock::unknown_port: r = SRPC::unknown_interface; break;
01079   case TCP_sock::partner_went_away: r = SRPC::partner_went_away; break;
01080   case TCP_sock::read_timeout: r = SRPC::read_timeout; break;
01081   case TCP_sock::environment_problem:
01082     r = SRPC::transport_failure;
01083     msg = "TCP trouble: " + tf.msg;
01084     break;
01085   default:
01086     r = SRPC::internal_trouble;
01087     msg = "Internal error (TCP): " + tf.msg;
01088     break;
01089   }
01090   return failure(r, msg);
01091 }
01092 

Generated on Mon May 8 00:48:57 2006 for Vesta by  doxygen 1.4.2