Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

TCP_sock.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Mon Feb 13 01:21:00 EST 2006 by ken@xorian.net         
00020 //      modified on Fri Apr 22 14:51:27 EDT 2005 by irina.furman@intel.com 
00021 //      modified on Wed Jul 31 13:24:03 EDT 2002 by kcschalk@shr.intel.com 
00022 //      modified on Fri Sep  8 13:02:25 PDT 2000 by mann   
00023 //      modified on Thu Oct  8 09:29:35 PDT 1998 by heydon 
00024 //      modified on Tue Aug 12 18:43:19 PDT 1997 by yuanyu 
00025 //      modified on Fri Sep 27 13:54:47 PDT 1996 by sirer  
00026 //      modified on Thu Jul 11 08:00:40 PDT 1996 by levin 
00027 
00028 //  *******************************
00029 //  *  TCP sockets for pseudo RPC *
00030 //  *******************************
00031 
00032 //  Standard definitions
00033 
00034 #include <sys/types.h>
00035 #include <netinet/tcp.h>
00036 #include <poll.h>
00037 #include <Basics.H>
00038 #include <OS.H>
00039 #include <VestaConfig.H>
00040 
00041 #if defined(__sun__)
00042 #include <sys/filio.h>
00043 #endif
00044 
00045 #include "TCP_sock.H"
00046 
00047 #if defined(__digital__)
00048 // The sys/socket.h checked into Vesta for Tru64 is missing this POSIX
00049 // typedef
00050 typedef int socklen_t;
00051 
00052 #ifndef NETDB_INTERNAL
00053 // This should probably be removed after we update the Tru64 libc
00054 // checked into Vesta.
00055 #define NETDB_INTERNAL  -1      /* see errno */
00056 #endif
00057 #endif
00058 
00059 //  *******************************
00060 //  *  Miscellaneous definitions  *
00061 //  *******************************
00062 
00063 const char *TCP_NAME = "tcp";  // protocol name handed to getservbyname_r by parse_port
00064 
00065 //  ************************************************
00066 //  *  Static variables defined in TCP_sock class  *
00067 //  ************************************************
00068 
00069 pthread_once_t TCP_sock::init_block = PTHREAD_ONCE_INIT;  // pthread_once control used by ensure_init to run TCP_sock::init
00070 
00071 // static class data
00072 Text TCP_sock::my_hostname;  // local host name; filled in by init() from gethostname
00073 sockaddr_in TCP_sock::defaultsock;  // template address set up by init(): AF_INET with INADDR_ANY
00074 in_addr TCP_sock::self_addr;  // address my_hostname resolves to; looked up by init()
00075 int TCP_sock::tcp_protolevel;  // set to IPPROTO_TCP by init(); used for socket() and TCP-level setsockopt
00076 static unsigned int g_resolv_try_again_limit = 100;  // file-local (not a class member): TRY_AGAIN retry limit for host lookups; init() may override it from [TCP_sock]resolv_try_again_limit
00077 
00078 #if defined(__linux__)
00079 Basics::mutex TCP_sock::ndb_mutex;  // held around the getservbyname_r / gethostbyname_r calls on Linux
00080 #endif
00081 
00082 //  ***************
00083 //  *  Utilities  *
00084 //  ***************
00085 
00086 static void die(const Text &m) throw (TCP_sock::failure)
00087 {
00088   throw TCP_sock::failure(TCP_sock::internal_trouble, m);
00089 }
00090 
00091 static void die_with_errno(int reason, const Text &m, int e = 0)
00092   throw (TCP_sock::failure)
00093 {
00094   if (e == 0) e = errno;
00095   Text emsg = Basics::errno_Text(e);
00096   throw TCP_sock::failure(reason, m + " (" + emsg + ")", e);
00097 }
00098 
00099 extern "C"
00100 {
00101   void TCP_sock_init()
00102   {
00103     TCP_sock::init();
00104   }
00105 }
00106 
00107 /* static */
00108 void TCP_sock::ensure_init() throw (TCP_sock::failure)
00109 {
00110   if (pthread_once(&init_block, TCP_sock_init) != 0)
00111     throw TCP_sock::failure(internal_trouble, "pthread_once failed!");
00112 }
00113 
00114 static bool close_fd(fd_t &fd) throw ()
00115 {
00116   if (fd == NO_FD) return true;
00117   if (close(fd) == SYSERROR) return false;
00118   fd = NO_FD;
00119   return true;
00120 }
00121 
00122 static Text TCP_state_str[4] = { "fresh", "listening", "connected", "dead" };
00123 
00124 void TCP_sock::ensure_state(sock_state should_be)
00125   throw (TCP_sock::failure)
00126 {
00127   if (this->state != should_be) {
00128     sock_state is = this->state;
00129     this->state = TCP_sock::dead;
00130     throw failure(TCP_sock::wrong_state,
00131         Text("Socket in wrong state; is: ") + TCP_state_str[is] +
00132         ", should be: " + TCP_state_str[should_be]);
00133   }
00134 }
00135 
00136 void TCP_sock::ensure_sndbuffer() throw ()
00137 {
00138   if (this->sndbuff != NULL) return;
00139   this->sndbuff = NEW_PTRFREE_ARRAY(char, this->sndbuff_size);
00140   this->sndbuff_len = 0;
00141 }
00142 
00143 void TCP_sock::ensure_rcvbuffer() throw ()
00144 {
00145   if (this->rcvbuff != NULL) return;
00146   this->rcvbuff = NEW_PTRFREE_ARRAY(char, this->rcvbuff_size);
00147   this->rcvcurp = this->rcvend = this->rcvbuff;
00148 }
00149 
00150 void TCP_sock::configure_blocking() throw (TCP_sock::failure)
00151 {
00152   // Establish blocking/non-blocking mode
00153   bool non_blocking = (this->alerts_enabled || this->read_timeout_enabled);
00154   if (this->non_blocking != non_blocking) {
00155     // Convert to int type needed for FIONBIO parameter.
00156     int nb_param = non_blocking ? 1 : 0;
00157     if (ioctl(this->fd, FIONBIO, &nb_param) == SYSERROR)
00158       die_with_errno(TCP_sock::internal_trouble, "ioctl(FIONBIO) failed");
00159     this->non_blocking = non_blocking;
00160   }
00161 }
00162 
00163 void TCP_sock::controlled_wait()
00164   throw (TCP_sock::alerted, TCP_sock::failure)
00165 /* This procedure uses poll(2) to wait until one of four things happens:
00166 
00167    1. 'poll' returns indicating that data is available to be read
00168       on the file descriptor 'fd'.  In this case, the procedure
00169       simply returns.  There may also be data available to be read on
00170       'pipe_rd', but, if so, it is left untouched.  The procedure returns
00171       normally.
00172 
00173    2. 'poll' returns indicating that data is available to be read
00174       solely on the file descriptor 'pipe_rd'.  This means there may
00175       be a pending alert, represented by 'alert_flag'.  All available
00176       data is read from 'pipe_rd'.  Then, if 'alert_flag' is true,
00177       it is set to false and 'alerted' is thrown.  If 'alert_flag' is
00178       is false, 'poll' is retried.
00179 
00180    3. 'poll' returns 0, indicating that the timeout has elapsed.  If
00181    'read_timeout_enabled' is true, throw 'failure' with the reason
00182    'read_timeout'.
00183 
00184    4. None of the above.  The procedure throws 'failure'.
00185 
00186    Not all of these cases can occur in all configurations.  Case 1 is
00187    always possible.  Case 2 can arise only if 'alerts_enabled' is
00188    true.  Case 3 can arise only if 'read_timeout_enabled' is true.
00189    Case 4, of course, can always occur.
00190 
00191 */
00192 {
00193   // By default, wait indefinitely.
00194   int timeout = -1;
00195   time_t wait_start;
00196 
00197   // If read timeouts are enabled, use a positive timeout with poll.
00198   if(this->read_timeout_enabled)
00199     {
00200       // Convert to milliseconds
00201       timeout = this->read_timeout_secs * 1000;
00202 
00203       // If we have a time limit, remember when we start waiting in case
00204       // we get interrupted to try and avoid waiting indefinitely.  (We
00205       // could still be in trouble if we get interrupted after less than
00206       // a second.)
00207       wait_start = time(NULL);
00208     }
00209 
00210   while (true) {
00211     struct pollfd poll_data[2];
00212     unsigned int nfds = 1;
00213     poll_data[0].fd = this->fd;
00214     poll_data[0].events = POLLIN;
00215     if (this->alerts_enabled) {
00216       nfds = 2;
00217       poll_data[1].fd = this->pipe_rd;
00218       poll_data[1].events = POLLIN;
00219     }
00220 
00221     int nready = poll(poll_data, nfds, timeout);
00222 
00223     if (nready == SYSERROR) {
00224       // Case 4: error.  Deal with transient error cases.
00225       if (errno == EAGAIN)
00226         // Insufficient resources to perform the 'poll' (whatever
00227         // that means...).  Wait a while (with 'sleep', hoping that
00228         // it uses different resources!) and try again.
00229         sleep(1);
00230       else if (errno != EINTR) {
00231         this->state = TCP_sock::dead;
00232         die_with_errno(TCP_sock::internal_trouble, "Error on 'poll'");
00233       }
00234       // Loop and retry 'poll' if EINTR or EAGAIN.
00235     } else if(poll_data[0].revents & POLLNVAL) {
00236       // Case 4:  error
00237       this->state = TCP_sock::dead;
00238       die("Error on socket input");
00239     } else if(this->alerts_enabled && 
00240               (poll_data[1].revents & (POLLERR | POLLNVAL))) {
00241       // Case 4:  error
00242       this->state = TCP_sock::dead;
00243       die("Error on alert channel pipe");
00244     } else if(poll_data[0].revents & (POLLIN | POLLERR)) {
00245       // Case 1: data available on sock_rd.  (Note: we treat POLLERR
00246       // as readable so that we can get the error status on a
00247       // subsequent read.  This is also the way select(2) treats
00248       // errors.)
00249       break;
00250     } else if (this->alerts_enabled && (poll_data[1].revents & POLLIN)) {
00251       // Case 2:  data available on pipe_rd
00252       this->m.lock();
00253       // Drain the pipe by reading 'bytes_in_pipe' bytes of data
00254       // from it.  In principle, this number can be arbitrarily large;
00255       // in practice, it is 1.  Thus, the following inefficient code
00256       // is ok; it avoids allocating a buffer.
00257       while (this->bytes_in_pipe > 0) {
00258         char c;
00259         if (read(this->pipe_rd, &c, 1) == SYSERROR) {
00260           this->state = TCP_sock::dead;
00261           this->m.unlock();
00262           die_with_errno(TCP_sock::internal_trouble, "Error reading pipe");
00263         }
00264         this->bytes_in_pipe--;
00265       }
00266       if (this->alert_flag) {
00267         this->alert_flag = false;
00268         this->m.unlock();
00269         throw TCP_sock::alerted();
00270       }
00271       this->m.unlock();
00272     } else if(this->read_timeout_enabled && (nready == 0)) {
00273       // Case 3: no data from peer within the timeout period.
00274       throw
00275         TCP_sock::failure(TCP_sock::read_timeout,
00276                           "read timeout (no data from peer)");
00277     }
00278     // Go around the loop again, retrying the 'poll'.
00279 
00280     if(this->read_timeout_enabled)
00281       {
00282         // Check to see how much time there is left until we reach our
00283         // limit waiting for the peer.
00284         time_t now = time(NULL);
00285 
00286         // Technically, time can go backwards (thanks to things like
00287         // ntp), or we could get here less than a second after we
00288         // start waiting.
00289         if(now > wait_start)
00290           {
00291             if((now - wait_start) < this->read_timeout_secs)
00292               {
00293                 // Still some time left: decrease the timeout used with poll.
00294                 timeout = (this->read_timeout_secs - (now - wait_start)) * 1000;
00295               }
00296             else
00297               {
00298                 // Technically the timeout has elapsed, but maybe we
00299                 // got EINTR or EAGAIN, so give it one more chance
00300                 // with a minimal timeout.
00301                 timeout = 1;
00302               }
00303           }
00304       }
00305 
00306   }
00307 }
00308 
00309 //  ********************
00310 //  *  Initialization  *
00311 //  ********************
00312 
00313 /* static */
00314 void TCP_sock::init() throw (TCP_sock::failure)
00315 {
00316     {
00317       // If set, [TCP_sock]resolv_try_again_limit specifies the
00318       // maximum number of times we'll retry resolving a hostname with
00319       // the non-fatal TRY_AGAIN error.
00320       Text try_again_limit;
00321       if(VestaConfig::get("TCP_sock", "resolv_try_again_limit",
00322                           try_again_limit))
00323         {
00324           g_resolv_try_again_limit = atoi(try_again_limit.cchars());
00325         }
00326     }
00327 
00328     // Who am I?  Save the local host name.  (This is used in some
00329     // cases where one program needs to tell another "call me back".)
00330     char temp_hostname[MAXHOSTNAMELEN+1];
00331     if (gethostname(temp_hostname, MAXHOSTNAMELEN) == SYSERROR)
00332       die_with_errno(TCP_sock::internal_trouble,
00333                      "initial gethostname failed!");
00334     TCP_sock::my_hostname = Text(temp_hostname);
00335 
00336     // Default values for connections.  Use the wildcard address by
00337     // default, which let the OS chose the right SRC IP.
00338     memset((char *)(&(TCP_sock::defaultsock)), 0,
00339            sizeof(TCP_sock::defaultsock));
00340     TCP_sock::defaultsock.sin_family = AF_INET;
00341     TCP_sock::defaultsock.sin_addr.s_addr = INADDR_ANY;
00342 
00343     // Look up our hostname's IP address and remember it.
00344     try
00345       {
00346         TCP_sock::self_addr = TCP_sock::host_to_addr(TCP_sock::my_hostname);
00347       }
00348     catch(TCP_sock::failure f)
00349       {
00350         // If we have trouble, add a little text to emphasize that
00351         // this happened while trying to resolve our own hostname.
00352         f.msg += " (resolving our own hostname!)";
00353         throw f;
00354       }
00355 
00356     // Determine TCP protocol number.
00357 
00358     // This code used to use getprotobyname_r, but that's not standard
00359     // between platforms, and some don't even have it.  We could use
00360     // getprotobyname, but the TCP protocol number is available as a
00361     // constant, so we'll just use that.
00362 
00363     TCP_sock::tcp_protolevel = IPPROTO_TCP;
00364     endprotoent();
00365 }
00366 
00367 //  ******************
00368 //  *  Construction  *
00369 //  ******************
00370 
00371 TCP_sock::TCP_sock(u_short port) throw (TCP_sock::failure)
00372 {
00373   TCP_sock::ensure_init();
00374   this->instance_init();
00375   this->new_socket(port);
00376 }
00377 
00378 TCP_sock::TCP_sock(const empty_sock_marker &p_ignored)
00379   throw (TCP_sock::failure)
00380 {
00381   TCP_sock::ensure_init();
00382 
00383   this->instance_init();
00384 
00385   // No file descriptor here.
00386   this->fd = SYSERROR;
00387 }
00388 
00389 /* static */
00390 void TCP_sock::set_no_delay(int fd) throw (TCP_sock::failure)
00391 /* Tell kernel "send" operations on "fd" to send without delay, since
00392    we do our own output buffering. This disables Nagle's algorithm. */
00393 {
00394   int no_delay = 1;
00395   if (setsockopt(fd, TCP_sock::tcp_protolevel, TCP_NODELAY,
00396       (char *)(&no_delay), sizeof(no_delay)) == SYSERROR) {
00397     die_with_errno(TCP_sock::internal_trouble,
00398       "setsocketopt failed on TCP_NODELAY");
00399   }
00400 }
00401 
00402 void TCP_sock::instance_init() throw()
00403 {
00404   memset(&this->me, 0, sizeof(this->me));
00405 
00406   // Initialize other fields.
00407   this->sndbuff = (char *)NULL;
00408   this->sndbuff_len = 0;
00409 
00410   this->rcvbuff = (char *)NULL;
00411   this->rcvcurp = (char *)NULL;
00412   this->rcvend = (char *)NULL;
00413 
00414   this->sndbuff_size = TCP_sock::default_sndbuffer_size;
00415   this->rcvbuff_size = TCP_sock::default_rcvbuffer_size;
00416 
00417   this->non_blocking = false;
00418 
00419   this->alerts_enabled = false;
00420 
00421   this->read_timeout_enabled = false;
00422 
00423   this->state = TCP_sock::fresh;
00424 }
00425 
00426 void TCP_sock::new_socket(u_short port)
00427   throw (TCP_sock::failure)
00428 {
00429   // Initialize to default values.  This includes using the wildcard
00430   // address (INADDR_ANY) for the IP.
00431   this->me = TCP_sock::defaultsock;
00432 
00433   // Use the specified port
00434   this->me.sin_port = htons(port);
00435 
00436   // First, create an Internet domain socket for TCP.
00437   this->fd = socket(AF_INET, SOCK_STREAM, TCP_sock::tcp_protolevel);
00438   if (this->fd == SYSERROR) {
00439     int code = (errno == ENOBUFS || errno == ENOMEM)
00440       ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
00441     die_with_errno(code, "couldn't create TCP socket");
00442   }
00443 
00444   // We allow reuse of a previous socket name to allow listeners to come
00445   // back at a well-known address following a crash.
00446   int reuseaddr = 1;
00447   if (setsockopt(this->fd, SOL_SOCKET, SO_REUSEADDR,
00448       (char *)(&reuseaddr), sizeof(reuseaddr)) == SYSERROR)
00449     die_with_errno(TCP_sock::internal_trouble, "setsockopt(reuseaddr) failed");
00450 
00451   // Next, bind the specified address/port to the socket.
00452   if (bind(this->fd, (sockaddr *)(&me), sizeof(me)) == SYSERROR) {
00453     int code = (errno == EADDRINUSE) ? TCP_sock::environment_problem :
00454       (errno == EADDRNOTAVAIL || errno == EACCES)
00455       ? TCP_sock::invalid_parameter : TCP_sock::internal_trouble;
00456     die_with_errno(code,
00457       "bind socket to " + inet_ntoa_r(this->me.sin_addr) + " failed!");
00458   }
00459 
00460   // The socket is now has a name.  If the port was omitted, fill it in.
00461   if (this->me.sin_port == 0) {
00462     socklen_t size = sizeof(this->me);
00463     if (getsockname(this->fd, (sockaddr *)(&(this->me)), &size) == SYSERROR) {
00464       int code = (errno == ENOBUFS)
00465         ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
00466       die_with_errno(code,
00467         "getsockname failed on " + inet_ntoa_r(this->me.sin_addr));
00468     }
00469   };
00470 
00471   // Disable Nagle's algorithm
00472   TCP_sock::set_no_delay(this->fd);
00473 }
00474 
00475 TCP_sock::~TCP_sock() throw (TCP_sock::failure)
00476 {
00477   if (this->fd != SYSERROR) (void)close_fd(this->fd);
00478   if (this->alerts_enabled) {
00479     if (!close_fd(this->pipe_wr))
00480       die_with_errno(TCP_sock::internal_trouble, "close of pipe_wr failed!");
00481     if (!close_fd(this->pipe_rd))
00482       die_with_errno(TCP_sock::internal_trouble, "close of pipe_rd failed!");
00483     this->alerts_enabled = false;
00484   }
00485   if (this->sndbuff != (char *)NULL) delete[] this->sndbuff;
00486   if (this->rcvbuff != (char *)NULL) delete[] this->rcvbuff;
00487 }
00488 
00489 //  ****************************
00490 //  *  Optional configuration  *
00491 //  ****************************
00492 
00493 void TCP_sock::set_output_buffer_size(int bytes)
00494   throw (TCP_sock::failure)
00495 {
00496   if (bytes <= 0)
00497     throw(TCP_sock::failure(TCP_sock::invalid_parameter,
00498       "Illegal buffer size"));
00499   this->m.lock();
00500   try {
00501     if (this->sndbuff != (char *)NULL) {
00502       throw(TCP_sock::failure(TCP_sock::wrong_state,
00503         "Can't set output buffer size now"));
00504     }
00505     this->sndbuff_size = bytes;
00506   } catch(TCP_sock::failure &f) { this->m.unlock(); throw; }
00507   this->m.unlock();
00508 }
00509 
00510 void TCP_sock::set_input_buffer_size(int bytes)
00511   throw (TCP_sock::failure)
00512 {
00513   if (bytes <= 0)
00514     throw(TCP_sock::failure(TCP_sock::invalid_parameter,
00515       "Illegal buffer size"));
00516   this->m.lock();
00517   try {
00518     if (this->rcvbuff != (char *)NULL) {
00519       throw(TCP_sock::failure(TCP_sock::wrong_state,
00520         "Can't set input buffer size now"));
00521     }
00522     this->rcvbuff_size = bytes;
00523   } catch(failure &f) { this->m.unlock(); throw; }
00524   this->m.unlock();
00525 }
00526 
00527 void TCP_sock::enable_alerts() throw (TCP_sock::failure)
00528 {
00529   this->m.lock();
00530   try {
00531     if (!(this->alerts_enabled)) {
00532       fd_t pipe_fds[2];
00533       if (pipe(pipe_fds) == SYSERROR) {
00534         int code = (errno == ENFILE || errno == ENOMEM)
00535           ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
00536         die_with_errno(code, "pipe call failed!");
00537       }
00538       this->pipe_rd = pipe_fds[0];
00539       this->pipe_wr = pipe_fds[1];
00540       this->bytes_in_pipe = 0;
00541       this->alert_flag = false;
00542       this->alerts_enabled = true;
00543       this->configure_blocking();
00544     }
00545   } catch(...) { this->m.unlock(); throw; }
00546   this->m.unlock();
00547 }
00548 
00549 void TCP_sock::disable_alerts() throw (TCP_sock::failure)
00550 {
00551   this->m.lock();
00552   try {
00553     if (this->alerts_enabled) {
00554       if (!close_fd(this->pipe_wr))
00555         die_with_errno(TCP_sock::internal_trouble, "close of pipe_wr failed!");
00556       if (!close_fd(this->pipe_rd))
00557         die_with_errno(TCP_sock::internal_trouble, "close of pipe_rd failed!");
00558       this->alerts_enabled = false;
00559       this->configure_blocking();
00560     }
00561   } catch(...) { this->m.unlock(); throw; }
00562   this->m.unlock();
00563 }
00564 
00565 void TCP_sock::alert() throw (TCP_sock::failure)
00566 {
00567   this->m.lock();
00568   try {
00569     if (!(this->alerts_enabled)) {
00570       throw(TCP_sock::failure(TCP_sock::wrong_state,
00571         "alerts aren't enabled on this socket"));
00572     }
00573     if (!(this->alert_flag)) {
00574       char c = '!';
00575       if (write(this->pipe_wr, &c, 1) == SYSERROR)
00576         die_with_errno(TCP_sock::internal_trouble, "write to pipe failed!");
00577       this->bytes_in_pipe++;
00578       this->alert_flag = true;
00579     }
00580   } catch(...) { this->m.unlock(); throw; }
00581   this->m.unlock();
00582 }
00583 
00584 bool TCP_sock::test_alert() throw(TCP_sock::failure)
00585 {
00586   bool res;
00587   this->m.lock();
00588   try {
00589     if (!(this->alerts_enabled))
00590       throw failure(TCP_sock::wrong_state,
00591         "alerts aren't enabled on this socket.");
00592     res = this->alert_flag;
00593     this->alert_flag = false;
00594   } catch(...) { this->m.unlock(); throw; }
00595   this->m.unlock();
00596   return res;
00597 }
00598 
00599 void TCP_sock::set_keepalive(bool state)  throw (failure)
00600 {
00601   int keepalive = state ? 1 : 0;
00602 
00603   this->m.lock();
00604   int result = setsockopt(this->fd, SOL_SOCKET, SO_KEEPALIVE,
00605                           (char *)(&keepalive), sizeof(keepalive));
00606   this->m.unlock();
00607 
00608   if(result == SYSERROR)
00609     {
00610       die_with_errno(TCP_sock::internal_trouble,
00611                      "setsocketopt failed on SO_KEEPALIVE");
00612     }
00613 }
00614 
00615 bool TCP_sock::get_keepalive() throw (failure)
00616 {
00617   int keepalive = 0;
00618   socklen_t keepalive_size = sizeof(keepalive);
00619 
00620   this->m.lock();
00621   int result = getsockopt(this->fd, SOL_SOCKET, SO_KEEPALIVE,
00622                           (char *)(&keepalive), &keepalive_size);
00623   this->m.unlock();
00624 
00625   if(result == SYSERROR)
00626     {
00627       die_with_errno(TCP_sock::internal_trouble,
00628                      "getsocketopt failed on SO_KEEPALIVE");
00629     }
00630 
00631   return (keepalive != 0);
00632 }
00633 
00634 void TCP_sock::enable_read_timeout(unsigned int seconds)
00635   throw (TCP_sock::failure)
00636 {
00637   read_timeout_enabled = true;
00638   read_timeout_secs = seconds;
00639 
00640   // The read timeout enables non-blocking I/O, so we may need to turn
00641   // that on.
00642   configure_blocking();
00643 }
00644 
00645 void TCP_sock::disable_read_timeout() throw (TCP_sock::failure)
00646 {
00647   read_timeout_enabled = false;
00648 
00649   // The read timeout enables non-blocking I/O, so we may need to turn
00650   // that off.
00651   configure_blocking();
00652 }
00653 
00654 
00655 //  *******************
00656 //  *  Interrogation  *
00657 //  *******************
00658 
00659 Text TCP_sock::this_host() throw (TCP_sock::failure)
00660 {
00661   TCP_sock::ensure_init();
00662   return TCP_sock::my_hostname;
00663 }
00664 
00665 void TCP_sock::get_local_addr(/*OUT*/ sockaddr_in &addr)
00666   throw (TCP_sock::failure)
00667 {
00668   addr = this->me;
00669 
00670   // If we're a listener or not yet connected, we probably have the
00671   // wildcard address for our end.  In this case, return the IP
00672   // address that our hostname resolves to.
00673   if(addr.sin_addr.s_addr == INADDR_ANY)
00674     {
00675       addr.sin_addr = TCP_sock::self_addr;
00676     }
00677 }
00678 
00679 void TCP_sock::get_remote_addr(/*OUT*/ sockaddr_in &addr)
00680   throw (TCP_sock::failure)
00681 {
00682   if (this->state != TCP_sock::dead) {
00683     // Permit either connected or dead, though exception message will 
00684     // refer only to connected.
00685     this->ensure_state(TCP_sock::connected);
00686   }
00687   addr = this->him;
00688 }
00689 
00690 bool TCP_sock::alive() throw (TCP_sock::failure)
00691 {
00692   bool result = true;
00693   this->m.lock();
00694   try {
00695     // Use 'poll' with a short timeout to determine the state of
00696     // the socket.  If 'poll' returns indicating data available or
00697     // timeout, assume the socket is happy.
00698     while (true) {
00699       struct pollfd poll_data;
00700       poll_data.fd = this->fd;
00701       poll_data.events = POLLIN;
00702       int nready = poll(&poll_data, 1, 0);
00703 
00704       if (nready == 0) break;      // timeout; assume healthy
00705       else if (nready > 0) {
00706         if (poll_data.revents & (POLLERR | POLLNVAL)) {
00707           result = false;  // unhappy
00708         } else if (poll_data.revents & POLLIN) {
00709           int bytes_to_read;
00710           if (ioctl(this->fd, FIONREAD, &bytes_to_read) == SYSERROR)
00711             die_with_errno(TCP_sock::internal_trouble, "Error on 'ioctl'");
00712           result = (bytes_to_read != 0);
00713         } else {
00714           assert(false);
00715         }
00716         break;
00717       } else if (nready == SYSERROR) {
00718         // Deal with transient error cases.
00719         if (errno == EAGAIN) {
00720           // Insufficient resources to perform the 'poll' (whatever
00721           // that means...).  Wait a while (with 'sleep', hoping that
00722           // it uses different resources!) and try again.
00723           sleep(1);
00724           // Go around the loop again, retrying the 'poll'.
00725         } else if (errno != EINTR) {
00726           this->state = TCP_sock::dead;
00727           die_with_errno(TCP_sock::internal_trouble, "Error on 'poll'");
00728         }
00729         // Loop and retry 'poll' if EINTR/EAGAIN.
00730       } else {
00731         assert(false);
00732       }
00733     }
00734   } catch (failure &f) { this->m.unlock(); throw; }
00735   this->m.unlock();
00736   return result;
00737 }
00738 
00739 u_short TCP_sock::parse_port(const Text &port)
00740   throw (TCP_sock::failure)
00741 {
00742   u_short l_result = 0;
00743 
00744   // Parse port number or look it up (as service name) in network database.
00745   if (isdigit(port[0]))
00746     {
00747       l_result = atoi(port.cchars());
00748     }
00749   else
00750     {
00751 #if defined(__digital__) || defined(__linux__)
00752 #define USE_GETSERVBYNAME_R
00753 #endif
00754 
00755 #if defined(USE_GETSERVBYNAME_R)
00756       // Data structures for getservbyname_r
00757       struct servent sp;
00758 # if defined(__digital__)
00759       struct servent_data sd;
00760 # elif defined(__linux__)
00761       struct servent *sp_result;
00762       // getservbyname_r uses this storage when filling entries in the
00763       // struct servent.  The size of this array was an arbitrary
00764       // choice.  There may be better or worse values.  Somebody should
00765       // look into it.
00766       char sp_buf[1024];
00767 # endif
00768 
00769 # if defined(__digital__)
00770       memset((char *)(&sd), 0, sizeof(sd));
00771 # elif defined(__linux__)
00772       memset(sp_buf, 0, sizeof(sp_buf));
00773 # endif
00774 
00775 # if defined(__digital__)
00776       if (getservbyname_r(port.cchars(), TCP_NAME, &sp, &sd) == SYSOK)
00777 # elif defined(__linux__)
00778       ndb_mutex.lock(); 
00779       if (getservbyname_r(port.cchars(), TCP_NAME,
00780                              &sp,
00781                              sp_buf, sizeof(sp_buf),
00782                              &sp_result) == SYSOK)
00783 # else
00784 # error Do not know how to do getservbyname_r on this platform!
00785 # endif
00786         {
00787 # if defined(__linux__)
00788           ndb_mutex.unlock(); 
00789 # endif
00790           l_result = ntohs(sp.s_port);  // returned in network byte order
00791           endservent();
00792         } else {
00793 # if defined(__linux__)
00794           ndb_mutex.unlock(); 
00795 # endif
00796           throw(TCP_sock::failure(TCP_sock::unknown_port,
00797                                   "unknown port: " + port));
00798         }
00799 #else
00800       struct addrinfo hints, *res = 0;
00801       memset(&hints, 0, sizeof(hints));
00802       hints.ai_family = PF_INET;
00803       int err = getaddrinfo(0, port.cchars(), &hints, &res);
00804       if(err != 0)
00805         {
00806           throw(TCP_sock::failure(TCP_sock::unknown_port,
00807                                   "unknown port: " + port + ": " +
00808                                   gai_strerror(err)));
00809         }
00810       assert(res != 0);
00811 
00812       bool found = false;
00813       for(struct addrinfo *cur = res;
00814           (cur != 0) && !found;
00815           cur = cur->ai_next)
00816         {
00817           assert(cur->ai_addr != 0);
00818           if(cur->ai_addr->sa_family == AF_INET)
00819             {
00820               l_result = ((sockaddr_in *) cur->ai_addr)->sin_port;
00821               found = true;
00822             }
00823         }
00824 
00825       freeaddrinfo(res);
00826 
00827       if(!found)
00828         {
00829           throw(TCP_sock::failure(TCP_sock::unknown_port,
00830                                   "unknown port: " + port +
00831                                   " (getaddrinfo indicates success, "
00832                                   "but no usable result found)"));
00833         }
00834 #endif
00835     }
00836 
00837   // Return the port number.
00838   return l_result;
00839 }
00840 
00841 in_addr TCP_sock::host_to_addr(const Text &hostname)
00842   throw (TCP_sock::failure)
00843 {
00844   in_addr l_result;
00845 
00846   // Parse host name or look it up in network database.
00847   int dummy;
00848   if(isdigit(hostname[0]) &&
00849      (sscanf(hostname.chars(), "%d.%d.%d.%d",
00850              &dummy, &dummy, &dummy, &dummy) == 4))
00851     {
00852       l_result.s_addr = inet_addr(hostname.chars());
00853     }
00854   else
00855     {
00856 #if defined(__digital__) || defined(__linux__)
00857 #define USE_GETHOSTBYNAME_R
00858 #endif
00859 
00860 #if defined(USE_GETHOSTBYNAME_R)
00861       // Data structures for gethostbyname_r
00862       struct hostent hp;
00863 # if defined(__digital__)
00864       struct hostent_data hd;
00865 # elif defined(__linux__)
00866       struct hostent *hp_result;
00867       // gethostbyname_r uses this storage when filling entries in the
00868       // struct hostent.  The size of this array was an arbitrary
00869       // choice.  There may be better or worse values.  Somebody
00870       // should look into it.
00871       char hp_buf[1024];
00872 # endif
00873       int h_errno_r;
00874 
00875       memset((char *)&l_result, 0, sizeof(l_result));
00876 # if defined(__digital__)
00877       memset((char *)(&hd), 0, sizeof(hd));
00878 # elif defined(__linux__)
00879       memset(hp_buf, 0, sizeof(hp_buf));
00880 # endif
00881 
00882       // Used to count the number of times we've gotten "TRY_AGAIN".
00883       unsigned int try_again_count = 0;
00884       while(1)
00885         {
00886           // If we've been spinning with h_errno == TRY_AGAIN, the
00887           // name resolution setup is probably misconfigured.
00888           if(try_again_count > g_resolv_try_again_limit)
00889             {
00890               throw
00891                 TCP_sock::failure(TCP_sock::unknown_host,
00892                                   Text("misconfigured name resolution "
00893                                        "(spinning on non-fatal error) "
00894                                        "resolving host: ") +
00895                                   hostname);
00896               
00897             }
00898 # if defined(__digital__)
00899           if (gethostbyname_r(hostname.chars(), &hp, &hd) == SYSERROR)
00900 # elif defined(__linux__)
00901           ndb_mutex.lock();         
00902           if (gethostbyname_r(hostname.chars(), &hp,
00903                               hp_buf, sizeof(hp_buf),
00904                               &hp_result,
00905                               &h_errno_r) != SYSOK)
00906 # else
00907 # error Do not know how to do gethostbyname_r on this platform!
00908 # endif
00909             {
00910 # if defined(__digital__)
00911               h_errno_r = h_errno; 
00912 # elif defined(__linux__)
00913               ndb_mutex.unlock(); 
00914 # endif
00915               switch (h_errno_r)
00916                 {
00917                 case TRY_AGAIN:
00918                   // TRY_AGAIN means there was a non-fatal problem
00919                   // with resolving this name.  Loop around and give
00920                   // it another shot.  Count the number of times this
00921                   // happens.
00922                   try_again_count++;
00923                   break;
00924                 case NETDB_INTERNAL:
00925                   // An internal error, refer to errno.
00926                   die_with_errno(TCP_sock::unknown_host,
00927                                  Text("error resolving host: ") + hostname);
00928                 case HOST_NOT_FOUND:
00929                   // No such host.
00930                 case NO_DATA:
00931                   // No A record for host.
00932                   throw(TCP_sock::failure(TCP_sock::unknown_host,
00933                                           Text("unknown host: ") + hostname));
00934                 case NO_RECOVERY:
00935                   // Unrecoverable error in host lookup.
00936                 default:
00937                   // There shouldn't be any other cases that'll get us
00938                   // here, so stick with the generic message.
00939                   throw(TCP_sock::failure(TCP_sock::unknown_host,
00940                                           Text("error resolving host: ") +
00941                                           hostname));
00942                 }
00943             }
00944           else
00945             {
00946 # if defined(__linux__)
00947               ndb_mutex.unlock(); 
00948 # endif
00949 
00950               // At this time, we're not ready for IPv6 or any other
00951               // address family type.  At least TCP_sock.C and
00952               // probably the rest of the SRPC library would need a
00953               // significant audit before being ready for that.
00954               if(hp.h_addrtype != AF_INET)
00955                 {
00956                   throw(TCP_sock::failure(TCP_sock::unknown_host,
00957                                           Text("non-IPv4 host: ") + hostname));
00958                 }
00959 
00960               // I've seen this happen (and cause a crash) in some
00961               // cases.  (Why is gethostbyname_r so broken?)  Treat
00962               // this like TRY_AGAIN.
00963               if(hp.h_addr == 0)
00964                 {
00965                   try_again_count++;
00966                   continue;
00967                 }
00968 
00969               memcpy((char *)(&l_result), hp.h_addr, hp.h_length);
00970               endhostent();
00971               break;
00972             }
00973         }
00974 #else
00975       // Used to count the number of times we've gotten "TRY_AGAIN".
00976       unsigned int try_again_count = 0;
00977       while(1)
00978         {
00979           // If we've been spinning with err == EAI_AGAIN, the
00980           // name resolution setup is probably misconfigured.
00981           if(try_again_count > g_resolv_try_again_limit)
00982             {
00983               throw
00984                 TCP_sock::failure(TCP_sock::unknown_host,
00985                                   Text("misconfigured name resolution "
00986                                        "(spinning on non-fatal error) "
00987                                        "resolving host: ") +
00988                                   hostname);
00989               
00990             }
00991 
00992           struct addrinfo hints, *res = 0;
00993           memset(&hints, 0, sizeof(hints));
00994           hints.ai_family = PF_INET;
00995           int err = getaddrinfo(hostname.cchars(), 0, &hints, &res);
00996           if(err != 0)
00997             {
00998               switch (err)
00999                 {
01000                 case EAI_AGAIN:
01001                   // TRY_AGAIN means there was a non-fatal problem
01002                   // with resolving this name.  Loop around and give
01003                   // it another shot.  Count the number of times this
01004                   // happens.
01005                   try_again_count++;
01006                   break;
01007                 case EAI_SYSTEM:
01008                   // An internal error, refer to errno.
01009                   die_with_errno(TCP_sock::unknown_host,
01010                                  Text("error resolving host: ") + hostname);
01011                 case EAI_NONAME:
01012                   // No such host.
01013 #ifdef EAI_NODATA
01014                 case EAI_NODATA:
01015                   // No A record for host.
01016 #endif
01017                   throw(TCP_sock::failure(TCP_sock::unknown_host,
01018                                           Text("unknown host: ") + hostname));
01019                 case EAI_FAIL:
01020                   // Unrecoverable error in host lookup.
01021                 default:
01022                   // There shouldn't be any other cases that'll get us
01023                   // here, so stick with the generic message.
01024                   throw(TCP_sock::failure(TCP_sock::unknown_host,
01025                                           Text("error resolving host: ") +
01026                                           hostname + ": " +
01027                                           gai_strerror(err)));
01028                 }
01029             }
01030           else
01031             {
01032               assert(res != 0);
01033 
01034               bool found = false;
01035               for(struct addrinfo *cur = res;
01036                   (cur != 0) && !found;
01037                   cur = cur->ai_next)
01038                 {
01039                   assert(cur->ai_addr != 0);
01040                   if(cur->ai_addr->sa_family == AF_INET)
01041                     {
01042                       l_result = ((sockaddr_in *) cur->ai_addr)->sin_addr;
01043                       found = true;
01044                     }
01045                 }
01046 
01047               freeaddrinfo(res);
01048 
01049               if(!found)
01050                 {
01051                   throw(TCP_sock::failure(TCP_sock::unknown_host,
01052                                           Text("error resolving host: ") +
01053                                           hostname +
01054                                           " (getaddrinfo indicates success, "
01055                                           "but no usable result found)"));
01056                 }
01057 
01058               break;
01059             }
01060         }
01061 #endif
01062     }
01063 
01064   return l_result;
01065 }
01066 
01067 /* static */
01068 void TCP_sock::name_to_sockaddr(const Text &hostname, 
01069   const Text &port, /*OUT*/ sockaddr_in &s) throw (TCP_sock::failure)
01070 {
01071   // Make sure we've initialized static members.
01072   TCP_sock::ensure_init();
01073 
01074   // Start by clearing the structure we are to fill in.
01075   memset((char *)(&s), 0, sizeof(s));
01076 
01077   // Get the address of this hostname.  If no hostname was provided
01078   // default to the hostname of this machine.
01079   s.sin_addr = TCP_sock::host_to_addr(hostname.Empty()
01080                                       ? TCP_sock::my_hostname
01081                                       : hostname);
01082 
01083   // We assume AF_INET.  (If it were something else,
01084   // TCP_sock::host_to_addr would throw a failure anyway.)
01085   s.sin_family = AF_INET;
01086 
01087   // Parse port number or look it up (as service name) in network database.
01088   s.sin_port = htons(TCP_sock::parse_port(port));
01089 }
01090 
01091 /* static */
01092 void TCP_sock::name_to_sockaddr(const Text &host_and_port,
01093   /*OUT*/ sockaddr_in &s) throw(TCP_sock::failure)
01094 {
01095     int i = host_and_port.FindCharR(':');
01096     if (i < 1) throw(TCP_sock::failure(TCP_sock::invalid_parameter,
01097       "No colon separating hostname and port number"));
01098     name_to_sockaddr(host_and_port.Sub(0, i),
01099       host_and_port.Sub(i+1), /*OUT*/ s);
01100 }
01101 
01102 //  ********************
01103 //  *  Client support  *
01104 //  ********************
01105 
01106 void TCP_sock::connect_to(const Text &hostname, const Text &port)
01107   throw (TCP_sock::failure)
01108 {
01109   sockaddr_in addr;
01110   try {
01111     TCP_sock::name_to_sockaddr(hostname, port, /*OUT*/ addr);
01112   } catch (TCP_sock::failure &f) { this->state = TCP_sock::dead; throw; };
01113   this->connect_to(addr);
01114 }
01115 
01116 void TCP_sock::connect_to(const Text &host_and_port)
01117   throw (TCP_sock::failure)
01118 {
01119   sockaddr_in addr;
01120   try {
01121     TCP_sock::name_to_sockaddr(host_and_port, /*OUT*/ addr);
01122   } catch (TCP_sock::failure &f) { this->state = TCP_sock::dead; throw; };
01123   this->connect_to(addr);
01124 }
01125 
01126 void TCP_sock::connect_to(sockaddr_in &addr)
01127   throw (TCP_sock::failure)
01128 {
01129     this->ensure_state(TCP_sock::fresh);
01130 
01131     while (connect(this->fd, (sockaddr *)(&addr), sizeof(addr)) < 0) {
01132         // Capture errno in a local to make sure another system call
01133         // doesn't interfere.
01134         int l_errno = errno;
01135         if (l_errno == EISCONN) {
01136             // got it!
01137             break;
01138         } else if (l_errno == EINTR) {
01139             /* The case where connect(2) was interrupted is not fatal;
01140                just go around the loop and try again. */
01141         } else if ((l_errno == EWOULDBLOCK) || (l_errno == EINPROGRESS) ||
01142                    (l_errno == EALREADY) || (l_errno == EAGAIN)) {
01143             /* Wait for connection to finish (fd to become writable) */
01144             struct pollfd flist[1];
01145             flist[0].fd = this->fd;
01146             flist[0].events = POLLOUT;
01147             flist[0].revents = 0;
01148             poll(flist, 1, -1);
01149         } else if (l_errno == EADDRINUSE) {
01150             /* This should be a transient condition.  We can check for
01151                it going away by either executing a read that succeeds,
01152                or by retrying connect. */
01153             char b;
01154             if (read(fd, &b, 0) == 0) {
01155                 break;
01156             }
01157             /* Read failed: retry */
01158         } else {
01159             // Otherwise, the error is fatal
01160             this->state = TCP_sock::dead;
01161             int code = TCP_sock::internal_trouble; 
01162             switch (l_errno) {
01163             case ETIMEDOUT:
01164             case ENETUNREACH:
01165               code = TCP_sock::unknown_host;
01166               break;
01167             case ECONNREFUSED:
01168               code = TCP_sock::unknown_port;
01169               break;
01170             case EADDRNOTAVAIL:
01171               code = TCP_sock::invalid_parameter;
01172               break;
01173               // Note: default value already set above
01174             }
01175 
01176             Text l_msg = "Can't connect to ";
01177             l_msg += inet_ntoa_r(addr.sin_addr);
01178             l_msg += ":";
01179             
01180             char l_port_txt[6];
01181             sprintf(l_port_txt, "%d", ntohs(addr.sin_port));
01182 
01183             l_msg += l_port_txt;
01184 
01185             die_with_errno(code, l_msg, l_errno);
01186         }
01187     }
01188     this->him = addr;
01189     this->state = TCP_sock::connected;
01190 }
01191 
01192 //  **********************
01193 //  *  Listener support  *
01194 //  **********************
01195 
01196 void TCP_sock::set_waiters(int waiters) throw (TCP_sock::failure)
01197 {
01198     this->ensure_state(TCP_sock::fresh);
01199     if (listen(this->fd, waiters) == SYSERROR)
01200         die_with_errno(TCP_sock::internal_trouble,
01201           "can't set_waiters ('listen' failed)");
01202     this->state = TCP_sock::listening;
01203 }
01204 
01205 TCP_sock *TCP_sock::wait_for_connect()
01206   throw(TCP_sock::alerted, TCP_sock::failure)
01207 {
01208     socklen_t len;
01209 
01210     // Create a new socket object but not a socket (as that's handled
01211     // by accept below).
01212     TCP_sock *ns = NEW_CONSTR(TCP_sock, (empty_sock_marker()));
01213 
01214     if (this->state == TCP_sock::fresh) this->set_waiters();
01215 
01216     this->ensure_state(TCP_sock::listening);
01217 
01218     while (true) {
01219         len = sizeof(ns->him);
01220         ns->fd = accept(this->fd, (sockaddr *)(&(ns->him)), &len);
01221         if (ns->fd != SYSERROR) break;
01222         // Accept failed.
01223         if (errno == EWOULDBLOCK) {
01224             // Must use poll to wait (for possible alert)
01225             this->controlled_wait();
01226             // Loop and retry 'accept', which generally shouldn't fail.
01227         } else if (errno == EINTR) {
01228             /*SKIP*/   // go around the loop again.
01229         } else {
01230             this->state = TCP_sock::dead;
01231             int code = (errno == ENOMEM)
01232               ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
01233             die_with_errno(code,
01234               "accept failed on socket " + inet_ntoa_r(ns->me.sin_addr));
01235         }
01236     }
01237 
01238     // Disable Nagle's algorithm
01239     TCP_sock::set_no_delay(ns->fd);
01240 
01241     // Fill in complete socket name (mostly for cleanliness)
01242     len = sizeof(ns->me);
01243     if (getsockname(ns->fd, (sockaddr *)(&(ns->me)), &len) == SYSERROR) {
01244         this->state = TCP_sock::dead;
01245         int code = (errno == ENOBUFS)
01246           ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
01247         die_with_errno(code,
01248           "getsockname failed on " + inet_ntoa_r(ns->me.sin_addr));
01249     }
01250     ns->state = TCP_sock::connected;
01251     return ns;
01252 }
01253 
01254 //  ***********************
01255 //  *  Data transmission  *
01256 //  ***********************
01257 
01258 void TCP_sock::send_data(const char *buffer, int len, bool flush)
01259   throw (TCP_sock::failure)
01260 {
01261   int i = 0;
01262 
01263   this->ensure_state(TCP_sock::connected);
01264   this->ensure_sndbuffer();
01265 
01266   while (len > 0) {
01267     int n = min(len, this->sndbuff_size - this->sndbuff_len);
01268     memcpy(this->sndbuff + this->sndbuff_len, buffer + i, n);
01269     this->sndbuff_len += n;
01270     i += n; len -= n;
01271     if (len > 0 || flush) {
01272       int unsent = this->sndbuff_len, j = 0, sent;
01273       while (true) {
01274         sent = write(this->fd, sndbuff + j, unsent);
01275         if (sent == unsent) break;
01276         if (sent == SYSERROR) {
01277           if (errno == EINTR || errno == EWOULDBLOCK || errno == ENOBUFS) {
01278             //
01279             // A transient send error occurred due to lack of space
01280             // on the outgoing socket buffer or to a transient 
01281             // interruption. Just wait until it is ok to send using
01282             // poll, and retry the write.
01283             while (true) {
01284               struct pollfd poll_data[2];
01285               unsigned int nfds = 1;
01286               // Wait for our socket to be writable
01287               poll_data[0].fd = this->fd;
01288               poll_data[0].events = POLLOUT;
01289               if (this->alerts_enabled) {
01290                 nfds = 2;
01291                 poll_data[1].fd = this->pipe_rd;
01292                 // We're only interested in exceptions on this
01293                 // channel.
01294                 poll_data[1].events = 0;
01295               }
01296               // Use poll with a short timeout (1s)
01297               int nready = poll(poll_data, nfds, 1000);
01298               
01299               if (nready == 0) {
01300                 // Timeout. Retry the write.
01301                 break;
01302               } else if (nready == SYSERROR) {
01303                 // Deal with transient error cases.
01304                 if (errno == EAGAIN) {
01305                   // Insufficient resources to perform the 'poll'
01306                   // (whatever that means...).  Wait a while
01307                   // (with 'sleep', hoping that it uses different
01308                   // resources!) and try the write again.
01309                   sleep(1);
01310                   break;
01311                 } else if (errno != EINTR) {
01312                   this->state = TCP_sock::dead;
01313                   die_with_errno(TCP_sock::internal_trouble,
01314                     "Error during 'poll'");
01315                 }
01316                 /* Loop and retry 'poll' if EINTR. Strictly speaking,
01317                    we should decrement the waiting time by the amount
01318                    of time already lapsed, but it's not worth the trouble.
01319                 */
01320               } else if (poll_data[0].revents & POLLNVAL) {
01321                 // Error
01322                 this->state = TCP_sock::dead;
01323                 die("Error on socket output");
01324               } else if (this->alerts_enabled &&
01325                          (poll_data[1].revents & (POLLERR | POLLNVAL))) {
01326                 // Error
01327                 this->state = TCP_sock::dead;
01328                 die("Error on alert channel pipe");
01329               } else if (poll_data[0].revents & (POLLOUT |
01330                                                  POLLERR | POLLHUP)) {
01331                 // Case 1: possible to send data.  (Note: POLLHUP or
01332                 // POLLERR means that we *can't* write, but attempting
01333                 // to will get us an error that indicates what
01334                 // happened, so we treat it as writable.  This is also
01335                 // the way select(2) treats errors.)
01336                 break;
01337               } 
01338             }
01339           } else {
01340             this->state = TCP_sock::dead;
01341             int code = (errno == ENOBUFS)
01342               ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
01343             die_with_errno(code, "Unable to send data");
01344           }
01345         } else {
01346           j += sent;
01347           unsent -= sent;
01348         }
01349       }
01350       this->sndbuff_len = 0;
01351     }
01352   }
01353 }
01354 
01355 int TCP_sock::fill_rcvbuffer()
01356   throw(TCP_sock::alerted, TCP_sock::failure)
01357 {
01358   int bytes_read;
01359   int debug_errno = 0;
01360 
01361   if (this->non_blocking) {
01362     // Read with bounded delay
01363     while (true) {
01364       /* Socket is in non-blocking mode, so the following read won't
01365          block.  However, there is a possibility that the kernel will
01366          deliver a signal and abort the read, even though it isn't
01367          really waiting.  This is foolish, since the kernel could just
01368          as well return 0, but we guard against it anyway with a loop.
01369       */
01370       do {
01371         bytes_read = read(this->fd, this->rcvbuff,
01372           TCP_sock::default_rcvbuffer_size);
01373       } while (bytes_read == SYSERROR && errno == EINTR);
01374 
01375       debug_errno = errno;
01376 
01377       /* There are four possibilities now:
01378 
01379            (1) bytes_read > 0, implying data was read;
01380 
01381            (2a) bytes_read == 0, implying end-of-file (i.e., the
01382                 remote end closed its socket);
01383 
01384            (2b) bytes_read == SYSERROR and errno == ECONNRESET, which
01385                 sometimes occurs in situations in which one would expect
01386                 case 2a;
01387 
01388            (2c) bytes_read == SYSERROR and errno == ETIMEDOUT or
01389                 EHOSTUNREACH; these errors can occur when the TCP
01390                 keep-alive mechanism is enabled, and indicates that
01391                 the keep-alive time has elapsed and that the remote
01392                 host is either down or no longer reachable by the
01393                 network.
01394 
01395            (3) bytes_read == SYSERROR and errno == EWOULDBLOCK, implying
01396                no data available at the moment; or
01397 
01398            (4) bytes_read == SYSERROR and errno has some other value,
01399                implying a real error.
01400 
01401          If case (1), (2a), (2b), (2c), or (4), the outcome of
01402          recv_data is determined (i.e., success or failure), so we
01403          exit the loop.  If case (3), we need to wait using 'poll'
01404          for data to arrive or a condition to arise that terminates
01405          the waiting (an alert).
01406       */
01407       if (bytes_read > 0) {
01408         return bytes_read; // case (1)
01409       } else if (bytes_read == 0) {
01410         break; // case (2a)
01411       } else if (bytes_read == SYSERROR && errno != EWOULDBLOCK) {
01412         break; // case (2b) or case (4)
01413       }
01414 
01415       // Case (3):
01416       assert(bytes_read == SYSERROR && errno == EWOULDBLOCK);
01417       this->controlled_wait();
01418 
01419       // Loop and retry the read, which should immediately return with
01420       // a positive value of bytes_read and thus exit the loop.
01421     } /* while */
01422   } else {
01423     // Read with potentially indefinite wait
01424     do {
01425       bytes_read = read(fd, rcvbuff, TCP_sock::default_rcvbuffer_size);
01426     } while (bytes_read == SYSERROR && errno == EINTR);
01427     debug_errno = errno;
01428   } /* if */
01429 
01430   // Control comes here when data has been read or a fatal error occurred.
01431   if (bytes_read > 0) {
01432     // Case (1)
01433     return bytes_read;
01434   } else if (bytes_read == 0) {
01435     // case (2a)
01436     this->state = TCP_sock::dead;
01437     throw(TCP_sock::failure(TCP_sock::partner_went_away,
01438       "connection closed by peer"));
01439   } else if (bytes_read == SYSERROR && errno == ECONNRESET) { 
01440     // case (2b)
01441     this->state = TCP_sock::dead;
01442     throw(TCP_sock::failure(TCP_sock::partner_went_away,
01443       "connection reset by peer"));
01444   } else if (bytes_read == SYSERROR && (errno == ETIMEDOUT ||
01445                                         errno == EHOSTUNREACH)) { 
01446     // case (2c)
01447     this->state = TCP_sock::dead;
01448     throw(TCP_sock::failure(TCP_sock::partner_went_away,
01449                             (errno == ETIMEDOUT)
01450                             ? "connection timed out"
01451                             : "peer unreachable"));
01452   } else if (bytes_read == SYSERROR) {
01453     // case (4)
01454     this->state = TCP_sock::dead;
01455     die_with_errno(TCP_sock::internal_trouble, "Error receiving data");
01456   }
01457   return -1; //not reached
01458 }
01459 
01460 int TCP_sock::recv_data(char *buffer, int len)
01461   throw (TCP_sock::alerted, TCP_sock::failure)
01462 {
01463   int nread, bytes_read;
01464   this->ensure_state(TCP_sock::connected);
01465   this->ensure_rcvbuffer();
01466 
01467   if (this->rcvcurp >= this->rcvend) {
01468     assert(this->rcvcurp == this->rcvend);
01469     // The input buffer is empty, ask the kernel for more data.
01470     nread = this->fill_rcvbuffer();
01471     this->rcvcurp = this->rcvbuff;
01472     this->rcvend = this->rcvbuff + nread;
01473   }
01474   bytes_read = min(len, this->rcvend - this->rcvcurp);
01475   memcpy(buffer, this->rcvcurp, bytes_read);
01476   this->rcvcurp += bytes_read;
01477   return bytes_read;
01478 }

Generated on Mon May 8 00:48:57 2006 for Vesta by  doxygen 1.4.2