Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

SRPC_impl.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Sun May 22 22:50:30 EDT 2005 by ken@xorian.net         
00020 //      modified on Fri Apr 22 15:02:11 EDT 2005 by irina.furman@intel.com 
00021 //      modified on Fri Sep  8 17:49:15 PDT 2000 by mann   
00022 //      modified on Thu Oct  8 09:17:17 PDT 1998 by heydon 
00023 //      modified on Fri Aug 29 15:19:08 PDT 1997 by yuanyu 
00024 //      modified on Thu Aug  1 14:47:19 PDT 1996 by levin 
00025 
00026 //  ***************************************
00027 //  *  Implementation of SRPC_impl class  *
00028 //  ***************************************
00029 
00030 #include <Basics.H>
00031 #include "SRPC.H"
00032 #include "SRPC_impl.H"
00033 
#ifdef __linux__
// Supplied locally for Linux builds, where these socket-address
// typedefs are assumed to be missing.
typedef unsigned int   in_addr_t;
typedef unsigned short in_port_t;
#endif
00039 
// ------------------------------------------------------------
// History of SRPC wire-protocol revisions
//
// V1.4:
//   The pinger thread mechanism was dropped in favour of the TCP
//   keep-alive timer supplied by the OS (the SO_KEEPALIVE socket
//   option).  As a consequence the call-back port is no longer part
//   of the initial "hello" message, since nothing needs it any more.
// ------------------------------------------------------------

// Version identification exchanged in the "hello" message; both ends
// must present exactly this string.
const char *SRPC_VERSION = "SRPC V1.4";

// Value handed to TCP_sock::set_waiters for listener sockets that
// SRPC_listener::create constructs itself.
const int INITIAL_WAITERS_KLUDGE = 5;
00054 
00055 //  **********************************
00056 //  *  Constructors and Destructors  *
00057 //  **********************************
00058 
00059 SRPC_impl::SRPC_impl(bool i_am_caller)
00060 : is_caller(i_am_caller), is_sender(i_am_caller),
00061   version_sent(false), version_checked(false),
00062   state(SRPC_impl::initial),
00063   buffered_ic(SRPC_impl::ic_null), s((TCP_sock *)NULL),
00064   listener((SRPC_listener *)NULL),
00065   use_read_timeout(false), use_read_timeout_between_calls(false),
00066   read_timeout_secs(0)
00067 {
00068 }
00069 
00070 SRPC_impl::~SRPC_impl() throw (SRPC::failure)
00071 {
00072   if (this->listener != (SRPC_listener *)NULL) {
00073     if (!SRPC_listener::destroy(this->listener))
00074       throw_failure(SRPC::internal_trouble, "listeners list scrambled.");
00075   }
00076   if (this->s != (TCP_sock *)NULL) delete this->s;
00077 }
00078 
00079 //  ***********************
00080 //  *  Failure reporting  *
00081 //  ***********************
00082 
00083 void SRPC_impl::throw_failure(int reason, const Text &msg, bool local_only)
00084   throw(SRPC::failure)
00085 {
00086   if (!(local_only || this->s == (TCP_sock *)NULL)) {
00087     try {
00088       Text rmsg = "[from other end] " + msg;
00089       send_item_code(SRPC_impl::ic_failure);
00090       send_int32(reason);
00091       send_bytes(rmsg.chars(), rmsg.Length(), /*flush=*/ true);
00092     } catch (TCP_sock::failure) {
00093       // Well, we tried...
00094     }
00095   }
00096   this->state = SRPC_impl::failed;
00097   this->f = SRPC::failure(reason, msg);
00098   throw(this->f);
00099 }
00100 
00101 void SRPC_impl::report_TCP_failure(TCP_sock::failure &tf)
00102   throw (SRPC::failure)
00103 {
00104   SRPC::failure f = SRPC::convert_TCP_failure(tf);
00105   throw_failure(f.r, f.msg, /*local_only=*/ true);
00106 }
00107 
00108 //  *******************
00109 //  *  State machine  *
00110 //  *******************
00111 
00112 chars op_names[] = {
00113   "start_call",       "await_call",
00114   "send_int32",       "recv_int32",
00115   "send_chars",       "recv_chars",      "recv_chars_here", 
00116   "send_bytes",       "recv_bytes",      "recv_bytes_here",
00117   "send_text",        "recv_text",
00118   "send_socket",      "recv_socket",
00119   "send_int32_seq",   "recv_int32_seq",
00120   "send_chars_seq",   "recv_chars_seq",
00121   "send_bytes_seq",   "recv_bytes_seq",
00122   "send_seq_start",   "recv_seq_start", "send_seq_end",    "recv_seq_end",
00123   "send_end",         "recv_end",
00124   "send_failure",
00125 
00126   // Added later:
00127   "send_int16",       "recv_int16",
00128   "send_int16_array", "recv_int16_array",
00129   "send_int64",       "recv_int64",
00130 
00131   "send_int32_array", "recv_int32_array",
00132   "send_int64_array", "recv_int64_array",
00133 
00134   "send_bool",        "recv_bool"
00135 };
00136 
00137 chars state_names[] = {
00138   "initial", "ready", "data_out", "seq_out", "data_in", "seq_in", "failed"
00139 };
00140 
// Bit positions used to build the masks stored in 'legal_states' and
// the mask computed by SRPC_impl::check_state.
enum {
  // Receiver/sender pair: receiver at the base position, sender next.
  rs_pos     = 0,
  recver_pos = 0,
  sender_pos = 1,

  // Callee/caller pair: callee at the base position, caller next.
  cc_pos     = 2,
  callee_pos = 2,
  caller_pos = 3,

  // First of the per-state bits; state value k uses bit state0_pos+k.
  state0_pos = 4
};
00148 
00149 #define M(x) (1<<(state0_pos+SRPC_impl::x))
00150 #define ANY_STATE (-1)
00151 #define IS_SENDER (1<<sender_pos)
00152 #define IS_RECVER (1<<recver_pos)
00153 #define IS_CALLER (1<<caller_pos)
00154 #define IS_CALLEE (1<<callee_pos)
00155 #define EITHER_END (IS_CALLER | IS_CALLEE)
00156 #define FRESH_SENDER    EITHER_END | IS_SENDER | M(initial) | M(ready)
00157 #define FRESH_RECVER    EITHER_END | IS_RECVER | M(initial) | M(ready)
00158 
00159 int legal_states[] = {
00160   /* start_call       */ IS_CALLER | IS_SENDER | M(initial),
00161   /* await_call       */ IS_CALLEE | IS_RECVER | M(initial),
00162   /* send_int32       */ FRESH_SENDER | M(data_out) | M(seq_out),
00163   /* recv_int32       */ FRESH_RECVER | M(data_in)  | M(seq_in),
00164   /* send_chars       */ FRESH_SENDER | M(data_out) | M(seq_out),
00165   /* recv_chars       */ FRESH_RECVER | M(data_in)  | M(seq_in),
00166   /* recv_chars_here  */ FRESH_RECVER | M(data_in)  | M(seq_in),
00167   /* send_bytes       */ FRESH_SENDER | M(data_out) | M(seq_out),
00168   /* recv_bytes       */ FRESH_RECVER | M(data_in)  | M(seq_in),
00169   /* recv_bytes_here  */ FRESH_RECVER | M(data_in)  | M(seq_in),
00170   /* send_text        */ FRESH_SENDER | M(data_out) | M(seq_out),
00171   /* recv_text        */ FRESH_RECVER | M(data_in)  | M(seq_in),
00172   /* send_socket      */ FRESH_SENDER | M(data_out) | M(seq_out),
00173   /* recv_socket      */ FRESH_RECVER | M(data_in)  | M(seq_in),
00174   /* send_int32_seq   */ FRESH_SENDER | M(data_out) | M(seq_out),
00175   /* recv_int32_seq   */ FRESH_RECVER | M(data_in)  | M(seq_in),
00176   /* send_chars_seq   */ FRESH_SENDER | M(data_out) | M(seq_out),
00177   /* recv_chars_seq   */ FRESH_RECVER | M(data_in)  | M(seq_in),
00178   /* send_bytes_seq   */ FRESH_SENDER | M(data_out) | M(seq_out),
00179   /* recv_bytes_seq   */ FRESH_RECVER | M(data_in)  | M(seq_in),
00180   /* send_seq_start   */ FRESH_SENDER | M(data_out),
00181   /* recv_seq_start   */ FRESH_RECVER | M(data_in),
00182   /* send_seq_end     */ EITHER_END | IS_SENDER | M(seq_out),
00183   /* recv_seq_end     */ EITHER_END | IS_RECVER | M(seq_in),
00184   /* send_end         */ FRESH_SENDER | M(data_out),
00185   /* recv_end         */ FRESH_RECVER | M(data_in),
00186   /* send_failure     */ ANY_STATE,
00187 
00188   // Added later:
00189   /* send_int16       */ FRESH_SENDER | M(data_out) | M(seq_out),
00190   /* recv_int16       */ FRESH_RECVER | M(data_in)  | M(seq_in),
00191   /* send_int16_array */ FRESH_SENDER | M(data_out) | M(seq_out),
00192   /* recv_int16_array */ FRESH_RECVER | M(data_in)  | M(seq_in),
00193   /* send_int64       */ FRESH_SENDER | M(data_out) | M(seq_out),
00194   /* recv_int64       */ FRESH_RECVER | M(data_in)  | M(seq_in),
00195   /* send_int32_array */ FRESH_SENDER | M(data_out) | M(seq_out),
00196   /* recv_int32_array */ FRESH_RECVER | M(data_in)  | M(seq_in),
00197   /* send_int64_array */ FRESH_SENDER | M(data_out) | M(seq_out),
00198   /* recv_int64_array */ FRESH_RECVER | M(data_in)  | M(seq_in),
00199 
00200   /* send_bool */        FRESH_SENDER | M(data_out) | M(seq_out),
00201   /* recv_bool */        FRESH_RECVER | M(data_in)  | M(seq_in)
00202 };
00203 
00204 #undef M
00205 #undef ANY_STATE
00206 #undef IS_SENDER
00207 #undef IS_RECVER
00208 #undef IS_CALLER
00209 #undef IS_CALLEE
00210 #undef EITHER_END
00211 #undef FRESH_SENDER
00212 #undef FRESH_RECVER
00213 
00214 void SRPC_impl::check_state(int op) throw (SRPC::failure)
00215 {
00216   int m =
00217     (1<<(this->is_sender+rs_pos)) | (1<<(this->is_caller+cc_pos)) |
00218     (1<<(this->state+state0_pos));
00219   if ((m & legal_states[op]) != m) {
00220     throw_failure(SRPC::protocol_violation,
00221       Text(op_names[op]) + " isn't legal now (in state '" +
00222       state_names[this->state] + "')");
00223   }
00224 }
00225 
00226 void SRPC_impl::ensure_sending_state() throw ()
00227 {
00228   switch (this->state) {
00229   case SRPC_impl::data_out:
00230   case SRPC_impl::seq_out:
00231     break;
00232   default:
00233     this->state = SRPC_impl::data_out;
00234   }
00235 }
00236 
00237 void SRPC_impl::ensure_recving_state() throw ()
00238 {
00239   switch (this->state) {
00240   case SRPC_impl::data_in:
00241   case SRPC_impl::seq_in:
00242     break;
00243   default:
00244     this->state = SRPC_impl::data_in;
00245   }
00246 }
00247 
00248 //  ********************************
00249 //  *  Byte-sequence transmission  *
00250 //  ********************************
00251 
00252 void SRPC_impl::send_bytes(const char *bs, int len, bool flush)
00253   throw(TCP_sock::failure)
00254 {
00255   send_int32(len);
00256   s->send_data(bs, len, flush);
00257 }
00258 
00259 void SRPC_impl::recv_bytes(char *bs, int len)
00260   throw(TCP_sock::failure, SRPC::failure)
00261 {
00262   // Set or clear read timeout before trying to read data from the
00263   // peer.  Enable the read timeout if it's turned on and we're either
00264   // not in the initial state (i.e. between calls) or we're supposed
00265   // to use it between calls.
00266   if(this->use_read_timeout &&
00267      ((!this->state == SRPC_impl::initial) ||
00268       this->use_read_timeout_between_calls))
00269     {
00270       this->s->enable_read_timeout(this->read_timeout_secs);
00271     }
00272   else
00273     {
00274       this->s->disable_read_timeout();
00275     }
00276 
00277   int curr = 0;
00278   while (len > 0) {
00279     int bytes_read;
00280     bytes_read = this->s->recv_data(&bs[curr], len);
00281     curr += bytes_read;
00282     len -= bytes_read;
00283   }
00284 }
00285 
00286 //  **************************
00287 //  *  Integer transmission  *
00288 //  **************************
00289 
00290 void SRPC_impl::send_int16(Basics::int16 i, bool flush)
00291   throw (TCP_sock::failure)
00292 {
00293   i = Basics::hton16(i);
00294   this->s->send_data((const char *) &i, sizeof(i), flush);
00295 }
00296   
00297 Basics::int16 SRPC_impl::recv_int16()
00298   throw (TCP_sock::failure)
00299 {
00300   Basics::int16 x;
00301   recv_bytes((char *) &x, sizeof(x));
00302   return Basics::ntoh16(x);
00303 }
00304 
00305 void SRPC_impl::send_int32(Basics::int32 i, bool flush)
00306   throw (TCP_sock::failure)
00307 {
00308   i = Basics::hton32(i);
00309   this->s->send_data((const char *) &i, sizeof(i), flush);
00310 }
00311   
00312 Basics::int32 SRPC_impl::recv_int32()
00313   throw (TCP_sock::failure)
00314 {
00315   Basics::int32 x;
00316   recv_bytes((char *) &x, sizeof(x));
00317   return Basics::ntoh32(x);
00318 }
00319 
00320 void SRPC_impl::send_int64(Basics::int64 i, bool flush)
00321   throw (TCP_sock::failure)
00322 {
00323   i = Basics::hton64(i);
00324   this->s->send_data((const char *) &i, sizeof(i), flush);
00325 }
00326   
00327 Basics::int64 SRPC_impl::recv_int64()
00328   throw (TCP_sock::failure)
00329 {
00330   Basics::int64 x;
00331   recv_bytes((char *) &x, sizeof(x));
00332   return Basics::ntoh64(x);
00333 }
00334 
00335 //  ****************************
00336 //  *  Item code transmission  *
00337 //  ****************************
00338 
00339 void SRPC_impl::send_item_code(item_code ic, bool flush)
00340   throw (TCP_sock::failure)
00341 {
00342   char c;
00343   if (!(this->version_sent)) {
00344     c = SRPC_impl::ic_hello;
00345     this->s->send_data(&c, 1);
00346     send_bytes(SRPC_VERSION, strlen(SRPC_VERSION)+1);
00347     this->version_sent = true;
00348   }
00349   c = ic;
00350   this->s->send_data(&c, 1, flush);
00351 }
00352 
00353 item_code SRPC_impl::read_item_code()
00354   throw (TCP_sock::failure, SRPC::failure)
00355 {
00356   char c;
00357   item_code ic;
00358 
00359   // Make sure there's a connection to the other end.  This is the point at
00360   // which a callee thread will block waiting for a caller.
00361   if (this->s == (TCP_sock *)NULL && !(this->is_caller)) {
00362     this->s = this->listener->sock->wait_for_connect();
00363   }
00364 
00365   // Read an item code.
00366 
00367   if (this->buffered_ic == SRPC_impl::ic_null) {
00368     this->recv_bytes(&c, 1);
00369     ic = (item_code) (unsigned char) c;
00370   } else {
00371     ic = this->buffered_ic;
00372     this->buffered_ic = SRPC_impl::ic_null;
00373   }
00374 
00375   if (!(this->version_checked)) {
00376     if (ic != SRPC_impl::ic_hello) {
00377       this->throw_failure(SRPC::protocol_violation,
00378         "Other end didn't say hello.", true);
00379     }
00380     // Read other end's version identification
00381     int len = this->recv_int32();
00382     char *ver = NEW_PTRFREE_ARRAY(char, len+1);
00383     this->recv_bytes(ver, len);
00384     ver[len] = '\0';
00385 
00386     if (strcmp(ver, SRPC_VERSION) != 0) {
00387       delete[] ver;
00388       this->throw_failure(SRPC::protocol_violation,
00389         "Client/server SRPC version mismatch", true);
00390     }
00391     delete[] ver;
00392 
00393     // Protocol versions match.  We can communicate.
00394     // We can now accept incoming items.
00395     this->version_checked = true;
00396 
00397     // Read an item code.
00398     this->recv_bytes(&c, 1);
00399     ic = (item_code) (unsigned char) c;
00400   }
00401 
00402   if (ic == SRPC_impl::ic_failure) {
00403     // Other side reports a failure.  Throw it.
00404     int code = this->recv_int32();
00405     int len = this->recv_int32();
00406     char *str = NEW_PTRFREE_ARRAY(char, len);
00407     this->recv_bytes(str, len);
00408     Text msg(str, len);
00409     delete[] str;
00410     this->throw_failure(code, msg, /*local_only=*/ true);
00411   }
00412 
00413   return ic;
00414 }
00415 
00416 void SRPC_impl::recv_item_code(item_code expected, bool *got_end)
00417   throw (TCP_sock::failure, SRPC::failure)
00418 {
00419   item_code ic = this->read_item_code();
00420 
00421   if (got_end != (bool *)NULL) *got_end = false;
00422   if (expected == ic) return;
00423   if (this->state == SRPC_impl::seq_in && ic == SRPC_impl::ic_seq_end
00424       && got_end != (bool *)NULL) {
00425     // Put item code back and report to caller
00426     this->buffered_ic = ic;
00427     *got_end = true;
00428     return;
00429   }
00430 
00431   char msg[50];
00432   sprintf(msg, "Mix-up: expected item code %d, got %d", expected, ic);
00433   this->throw_failure(SRPC::protocol_violation, msg);
00434 }
00435 
00436 item_code SRPC_impl::recv_item_code(item_code expected1, item_code expected2,
00437                                     bool *got_end)
00438   throw (TCP_sock::failure, SRPC::failure)
00439 {
00440   item_code ic = this->read_item_code();
00441 
00442   if (got_end != (bool *)NULL) *got_end = false;
00443   if ((expected1 == ic) || (expected2 == ic)) return ic;
00444   if (this->state == SRPC_impl::seq_in && ic == SRPC_impl::ic_seq_end
00445       && got_end != (bool *)NULL) {
00446     // Put item code back and report to caller
00447     this->buffered_ic = ic;
00448     *got_end = true;
00449     return SRPC_impl::ic_seq_end;
00450   }
00451 
00452   char msg[60];
00453   sprintf(msg, "Mix-up: expected item code %d or %d, got %d",
00454           expected1, expected2, ic);
00455   this->throw_failure(SRPC::protocol_violation, msg);
00456 
00457   // Unreachable, but the compiler doesn't know that.
00458   return SRPC_impl::ic_null;
00459 }
00460 
00461 // ====================================================================
00462 
00463 //  ****************************************
00464 //  *  SRPC_listener class implementation  *
00465 //  ****************************************
00466 
00467 //  **********************
00468 //  *  Static variables  *
00469 //  **********************
00470 
00471 Basics::mutex SRPC_listener::m;         // initialized by constructor
00472 SRPC_listener *SRPC_listener::listeners = NULL;
00473 
00474 //  **********************
00475 //  *  Member functions  *
00476 //  **********************
00477 
00478 /* static */
00479 SRPC_listener *SRPC_listener::create(u_short port, TCP_sock *ls)
00480   throw (TCP_sock::failure)
00481 {
00482   SRPC_listener *s;
00483   SRPC_listener::m.lock();
00484   try {
00485     s = SRPC_listener::listeners;
00486     while (s != (SRPC_listener *)NULL) {
00487       if (s->port == port) break;
00488       s = s->next;
00489     }
00490     if (s == (SRPC_listener *)NULL) {
00491       s = NEW(SRPC_listener);
00492       s->next = SRPC_listener::listeners;
00493       if (s->my_sock = (ls == (TCP_sock *)NULL)) {
00494         s->sock = NEW_CONSTR(TCP_sock, (port));
00495         s->sock->set_waiters(INITIAL_WAITERS_KLUDGE);
00496         // Enable the TCP keep-alive timer now, so that it gets
00497         // inherited by accepted connections.
00498         s->sock->enable_keepalive();
00499       }
00500       else s->sock = ls;
00501       s->port = port;
00502       s->n = 0;
00503       SRPC_listener::listeners = s;
00504     }
00505     s->n++;
00506   } catch (...) { SRPC_listener::m.unlock(); throw; }
00507   SRPC_listener::m.unlock();
00508   return s;
00509 }
00510 
00511 /* static */
00512 bool SRPC_listener::destroy(SRPC_listener *listener)
00513   throw (TCP_sock::failure)
00514 {
00515   bool result = true;
00516   SRPC_listener::m.lock();
00517   try {
00518     if (--(listener->n) == 0) {
00519       // Remove listener from list
00520       SRPC_listener *l = SRPC_listener::listeners;
00521       SRPC_listener *prev = (SRPC_listener *)NULL;
00522       while (result = (l != (SRPC_listener *)NULL)) {
00523         if (l->sock == listener->sock) {
00524           if (prev == (SRPC_listener *)NULL) {
00525             SRPC_listener::listeners = l->next;
00526           } else {
00527             prev->next = l->next;
00528           }
00529           if (listener->my_sock) {
00530             // destroy TCP socket
00531             delete listener->sock;
00532           }
00533           delete listener;
00534           break;
00535         }
00536         prev = l;
00537         l = l->next;
00538       }
00539     }
00540   } catch (...) { SRPC_listener::m.unlock(); throw; }
00541   SRPC_listener::m.unlock();
00542   return result;
00543 }
00544 
00545 // ======================================================================
00546 
00547 /* Socket usage in SRPC and SRPC_impl:
00548    ------------------------------------
00549 
00550    A connected pair of TCP_sock objects are used for the transmission
00551    of arguments and results between the caller and callee.
00552 
00553    The connection is established by having the callee (the server)
00554    create a listener socket and wait for a connection to be made by a
00555    caller (the client).  The listener socket corresponds to an
00556    instance of RPC interface being "exported" by the server, and many
00557    threads may be listening on the socket.  Accordingly, the
00558    implementation maintains a list of listener sockets; when a callee
00559    is constructed (i.e., SRPC(callee) is invoked), the appropriate
00560    socket is found on this list (or inserted, if one didn't previously
00561    exist).  However, the callee doesn't actually wait for a connection
00562    until await_call or recv_X is invoked, at which time
00563    TCP_sock::wait_for_connect is used to obtain a unique socket for
00564    the connection.  This wait is an indefinite one, unlike most of the
00565    other waits in SRPC.
00566 
00567    The caller side of the main connection is more straightforward.  As
00568    part of the SRPC(caller) constructor invocation, a connection is
00569    attempted to a specified server.  The SRPC machinery does not
00570    explicitly time out this connection attempt, but the underlying OS
00571    socket machinery does.
00572 
00573    When the caller begins sending arguments on the main connection,
00574    the SRPC implementation prefixes a synchronizing message containing
00575    a version identifier for the SRPC wire protocol.  The callee is
00576    expected to read the version message and, if the version is
00577    acceptable, proceed with reading the first call.  (If the version
00578    is unacceptable, an exception is thrown and sent to the peer, which
00579    normally results in termination of the connection.)
00580 
00581    Many calls using the underlying TCP_sock can block for an unbounded
00582    amount of time.  To protect against indefinitely tying up system
00583    resources in the event of a peer system crashing, rebooting, or
00584    becoming unreachable, the TCP keep-alive mechanism is turned on for
00585    both the callee and caller ends.
00586 */

Generated on Mon May 8 00:48:57 2006 for Vesta by  doxygen 1.4.2