Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

RunToolClient.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Mon May 23 22:16:20 EDT 2005 by ken@xorian.net         
00020 //      modified on Mon Aug 12 13:16:09 EDT 2002 by kcschalk@shr.intel.com 
00021 //      modified on Tue Jan  9 16:21:10 PST 2001 by mann   
00022 //      modified on Fri Feb  5 17:14:18 PST 1999 by heydon 
00023 //      modified on Wed Apr 10 17:16:09 PDT 1996 by levin
00024 
00025 #include <Basics.H>
00026 #include <VestaConfig.H>
00027 
00028 // SRPC stuff
00029 #include <TCP_sock.H>
00030     // for name_to_sockaddr
00031 
00032 // Repository stuff
00033 #include <VDirSurrogate.H>
00034     // for getNFSInfo
00035 
00036 #include "RunToolClient.H"
00037 
00038 using std::ostream;
00039 using std::flush;
00040 
// Class global variables

// Control block passed to pthread_once() by RunTool::ensure_init so
// that RunTool::init runs at most once.
pthread_once_t RunTool::init_block = PTHREAD_ONCE_INIT;

// Connection cache used by do_it and get_info; allocated in
// RunTool::init.
MultiSRPC *RunTool::srpc_cache = NULL;
// Port used when a host string contains no ':'; read from the
// "SRPC_port" setting of RUN_TOOL_CONFIG_SECTION in RunTool::init.
Text default_port;

// By default, we give a RunToolServer a maximum of 2 minutes to
// respond to an inquiry for its status.  This keeps a client from
// getting stuck talking to a misbehaving RunToolServer (such as an
// overloaded or wedged system that fails to schedule the
// RunToolServer to respond).  RunTool::init replaces this value when
// the "info_read_timeout" configuration setting is present.
static unsigned int get_info_read_timeout = 120;
00054 
00055 //  ********************
00056 //  *  Initialization  *
00057 //  ********************
00058 
00059 extern "C"
00060 {
00061   void RunTool_init()
00062   {
00063     RunTool::init();
00064   }
00065 }
00066 
00067 void RunTool::ensure_init() throw (SRPC::failure)
00068 {
00069   if (pthread_once(&init_block, RunTool_init) != 0)
00070     throw(SRPC::failure(implementation_error, "pthread_once failed!"));
00071 }
00072 
00073 void RunTool::init() throw(SRPC::failure)
00074 {
00075   try {
00076     default_port = VestaConfig::get_Text(RUN_TOOL_CONFIG_SECTION,
00077                                          "SRPC_port");
00078     Text info_timeout_t;
00079     if(VestaConfig::get(RUN_TOOL_CONFIG_SECTION, "info_read_timeout", info_timeout_t))
00080       {
00081         get_info_read_timeout = strtoul(info_timeout_t.cchars(), NULL, 0);
00082       }
00083     srpc_cache = NEW(MultiSRPC);
00084   } catch (VestaConfig::failure f) {
00085     throw(SRPC::failure(implementation_error, f.msg));
00086   }
00087 }
00088 
00089 //  ***************
00090 //  *  Utilities  *
00091 //  ***************
00092 
00093 
00094 void RunTool::send_relay_socket(Tool_Relay &r, SRPC *srpc)
00095 {
00096   sockaddr_in sock;
00097   r.get_sockaddr(sock);
00098   srpc->send_socket(sock);
00099 }
00100 
00101 void RunTool::send_vdir(LongId &fsroot, SRPC *srpc)
00102 {
00103   // The server needs only the information required to establish a mount
00104   // point for the 'fsroot' directory.  This consists of a socket/port
00105   // and an NFS file handle understood by the Vesta repository, which it
00106   // calls a LongId.
00107   char buffer[100];      // ugh
00108   char *nfs_sock_and_port = buffer;
00109   LongId root, muRoot;        // unused here
00110   sockaddr_in sock;
00111   VDirSurrogate::getNFSInfo(nfs_sock_and_port, root, muRoot);
00112   
00113   TCP_sock::name_to_sockaddr(nfs_sock_and_port, sock);
00114   srpc->send_socket(sock);
00115   srpc->send_bytes((char *)&fsroot, sizeof(fsroot));
00116 }
00117 
00118 
00119 //  ********************
00120 //  *  The main event  *
00121 //  ********************
00122 
00123 
00124 RunTool::Tool_result
00125 RunTool::do_it(
00126                 const Text &host,
00127                 chars_seq *argv,
00128                 LongId &fsroot,
00129                 const Text &wd,
00130                 chars_seq *ev,
00131                 ostream *report_out,
00132                 ostream *value_out,
00133                 ostream *report_err,
00134                 ostream *value_err,
00135                 Basics::mutex *mu,
00136                 const Text &label,
00137                 const Text &stdin_name
00138                 ) throw(SRPC::failure, FS::Failure)
00139 {
00140   MultiSRPC::ConnId host_conn = -1;
00141   Tool_Relay out;
00142   Tool_Relay err;
00143   Tool_result res;
00144   SRPC *srpc;
00145   SRPC::failure f(0, "");
00146 
00147   bool write_failed = false; // Has there been a write failure?
00148   FS::Failure write_failure; // Saved write failure
00149 
00150   ensure_init();
00151 
00152   res.status = res.sigval = 0;
00153   res.stdout_written = res.stderr_written = false;
00154 
00155   try {
00156     int colon = host.FindChar(':');
00157     if (colon == -1) {
00158       // Host name only
00159       host_conn = srpc_cache->Start(host, default_port, srpc);
00160     } else {
00161       Text host_only = host.Sub(0, colon);
00162       Text port = host.Sub(colon + 1);
00163       host_conn = srpc_cache->Start(host_only, port, srpc);
00164     }
00165 
00166     out.setup(report_out, value_out, mu, label);
00167     err.setup(report_err, value_err, mu, label);
00168 
00169     // Establish call
00170     srpc->start_call(RUN_TOOL_DOIT, RUN_TOOL_INTERFACE_VERSION);
00171 
00172     // Send arguments
00173     srpc->send_Text(stdin_name);
00174     send_relay_socket(out, srpc);
00175     send_relay_socket(err, srpc);
00176     send_vdir(fsroot, srpc);
00177     srpc->send_Text(wd);
00178     if (ev == NULL) {
00179       ev = NEW(chars_seq);
00180       srpc->send_chars_seq(*ev);
00181       delete ev;
00182     } else
00183       srpc->send_chars_seq(*ev);
00184     srpc->send_chars_seq(*argv);
00185     srpc->send_end();
00186 
00187     // Receive results
00188     res.status = srpc->recv_int();
00189     res.sigval = srpc->recv_int();
00190     res.dumped_core = srpc->recv_bool();
00191     srpc->recv_end();
00192   } catch(SRPC::failure ff) {
00193     // srpc is no longer usable; MultiSRPC will deal with it.
00194     f = ff;
00195   } catch(TCP_sock::failure tf) {
00196     // happens only on 'setup' and 'send_vdir' calls
00197     f = SRPC::convert_TCP_failure(tf);
00198   }
00199 
00200   try {
00201     res.stdout_written = out.finish(f.r == 0);
00202   } catch (TCP_sock::failure tf) {
00203     if (f.r == 0) f = SRPC::convert_TCP_failure(tf);
00204   } catch (FS::Failure f) {
00205     // Write failure (e.g. disk full writing value stream)
00206     if(!write_failed)
00207       {
00208         // Note that we only save the first write failure.
00209         write_failure = f;
00210         write_failed = true;
00211       }
00212   }
00213   try {
00214     res.stderr_written = err.finish(f.r == 0);
00215   } catch (TCP_sock::failure tf) {
00216     if (f.r == 0) f = SRPC::convert_TCP_failure(tf);
00217   } catch (FS::Failure f) {
00218     // Write failure (e.g. disk full writing value stream)
00219     if(!write_failed)
00220       {
00221         // Note that we only save the first write failure.
00222         write_failure = f;
00223         write_failed = true;
00224       }
00225   }
00226   srpc = NULL;                    // cleanliness
00227   srpc_cache->End(host_conn);
00228   if (f.r != 0) throw f;
00229   if(write_failed) throw write_failure;
00230 
00231   return res;
00232 }
00233 
00234 void 
00235 RunTool::get_info(const Text &host,
00236                   /*OUT*/ Host_info& hinfo) throw(SRPC::failure)
00237 {
00238   MultiSRPC::ConnId host_conn = -1;
00239   SRPC* srpc;
00240   ensure_init();
00241 
00242   try {
00243     int colon = host.FindChar(':');
00244     if (colon == -1) {
00245       // Host name only
00246       host_conn = srpc_cache->Start(host, default_port, srpc);
00247     } else {
00248       Text host_only = host.Sub(0, colon);
00249       Text port = host.Sub(colon + 1);
00250       host_conn = srpc_cache->Start(host_only, port, srpc);
00251     }
00252 
00253     // Avoid RunToolServer "tar pits" by setting a limit on how long
00254     // we'll wait for a response when trying to get information.
00255     srpc->enable_read_timeout(get_info_read_timeout);
00256 
00257     // Establish call
00258     srpc->start_call(RUN_TOOL_INFO, RUN_TOOL_INTERFACE_VERSION);
00259 
00260     // No arguments
00261     srpc->send_end();
00262 
00263     // Receive results
00264     srpc->recv_Text(hinfo.sysname);
00265     srpc->recv_Text(hinfo.release);
00266     srpc->recv_Text(hinfo.version);
00267     srpc->recv_Text(hinfo.machine);
00268     hinfo.cpus = srpc->recv_int();
00269     hinfo.cpuMHz = srpc->recv_int();
00270     hinfo.memKB = srpc->recv_int();
00271     hinfo.max_tools = srpc->recv_int();
00272     hinfo.cur_tools = srpc->recv_int();
00273     hinfo.load = srpc->recv_float();
00274     hinfo.cur_pending = srpc->recv_int();
00275     hinfo.max_pending = srpc->recv_int();
00276     hinfo.uniqueid.Recv(*srpc);
00277     srpc->recv_end();
00278 
00279     // Turn the read tiemout back off, as we don't want to put a bound
00280     // on the time until the RunToolServer sends us the result of a
00281     // tool invocation.
00282     srpc->disable_read_timeout();
00283 
00284   } catch(SRPC::failure f) {
00285     // srpc is no longer usable; MultiSRPC will deal with it.
00286     srpc_cache->End(host_conn);
00287     throw;
00288   }
00289   srpc_cache->End(host_conn);
00290 }
00291 
00292 
00293 // =====================================================================
00294 
00295 //  *******************************
00296 //  *  Tool_Relay implementation  *
00297 //  *******************************
00298 
00299 Tool_Relay::Tool_Relay():tcp_f(0, ""), label("")
00300 {
00301   listener = NULL;
00302   s = NULL;
00303   buffer = NULL;
00304   report = NULL;
00305   value = NULL;
00306   mu = NULL;
00307   buff_len = 0;
00308   remaining = 0;
00309   write_failed = false;
00310 }
00311 
00312 Tool_Relay::~Tool_Relay()
00313 {
00314   if (buffer != NULL) delete[] buffer;
00315   if (s != NULL) delete s;
00316   if (listener != NULL) delete listener;
00317 }
00318 
00319 void *relay_thread(void *arg)
00320 {
00321   Tool_Relay *r = (Tool_Relay *)arg;
00322   r->body();
00323   return NULL;
00324 }
00325 
00326 void Tool_Relay::body()
00327 {
00328   try {
00329     TCP_sock *ss = listener->wait_for_connect();
00330     ss->enable_alerts();
00331     m.lock();
00332     // Prevent race between body() and finish()
00333     s = ss;  // finish will now alert s instead of listener
00334     bool quit = listener->test_alert();
00335     m.unlock();
00336     if (quit) {
00337         TCP_sock::alerted a;
00338         throw a;
00339     }
00340     while (true) {  // loop exits with TCP_sock::alerted exception
00341       // Read data and write to out_fd
00342       if (buff_len - remaining < RELAY_BUFFER_SIZE/2) {
00343         // Too much stuff left over from last read; grow buffer
00344         buff_len *= 2;
00345         char* newbuff = NEW_PTRFREE_ARRAY(char, buff_len);
00346         memcpy(newbuff, buffer, remaining);
00347         delete[] buffer;
00348         buffer = newbuff;
00349       }
00350       int len = s->recv_data(buffer + remaining, buff_len - remaining);
00351       if (value) {
00352         FS::Write(*value, buffer + remaining, len);
00353       }
00354       if (report) {
00355         if (label == "") {
00356           // Single-threaded case, no labeling or locking
00357           report->write(buffer + remaining, len);
00358         } else {
00359           mu->lock();
00360           // Multi-threaded case.  Label each line, and don't
00361           // output a partial line.
00362           char* pos = buffer;
00363           char* end;
00364           remaining += len;
00365           while (remaining) {
00366             end = (char *) memchr(pos, '\n', remaining);
00367             if (end == NULL) {
00368               if (buffer != pos) memmove(buffer, pos, remaining);
00369               break;
00370             } else {
00371               *report << label;
00372               end++;
00373               int linelen = end - pos;
00374               report->write(pos, linelen);
00375               remaining -= linelen;
00376               pos = end;
00377             }
00378           }
00379           *report << flush;
00380           mu->unlock();
00381         }
00382       }
00383       written = true;
00384     }
00385   } catch(TCP_sock::failure f) {
00386     if (f.reason != TCP_sock::end_of_file) tcp_f = f;
00387   } catch (TCP_sock::alerted) {
00388     /* forced termination */
00389   } catch (FS::Failure f) {
00390     // Write failure (e.g. disk full writing value stream)
00391     if(!write_failed)
00392       {
00393         // Note that we only save the first write failure.
00394         write_failure = f;
00395         write_failed = true;
00396       }
00397   }
00398 
00399   // Final cleanup
00400   if (report) {
00401     if (label == "") {
00402       // Single-threaded case; we haven't been flushing
00403       *report << flush;
00404     } else {
00405       // Multi-threaded case; each full line was flushed
00406       if (remaining) {
00407         // Output remaining partial line, adding a \n for neatness
00408         mu->lock();
00409         *report << label;
00410         report->write(buffer, remaining);
00411         *report << '\n' << flush;
00412         mu->unlock();
00413       }
00414     }
00415   }
00416 }
00417 
00418 void Tool_Relay::setup(ostream* report, ostream* value, Basics::mutex* mu,
00419                        const Text& label)
00420      throw (TCP_sock::failure)
00421 {
00422   if (report == NULL && value == NULL) {
00423       // no relay needed; server will get socket address of
00424       // zero (see get_sockaddr).
00425       return;
00426   }
00427   listener = NEW(TCP_sock);
00428   listener->enable_alerts();
00429   // Make sure that we're listening in case the RunToolServer contacts
00430   // us before the relay thread starts.
00431   listener->set_waiters();
00432   this->report = report;
00433   this->value = value;
00434   this->mu = mu;
00435   this->label = label;
00436   buff_len = RELAY_BUFFER_SIZE;
00437   buffer = NEW_PTRFREE_ARRAY(char, buff_len);
00438   written = false;
00439   relay.fork(&relay_thread, (void *)this);
00440 }
00441 
00442 bool Tool_Relay::finish(bool wait) throw(TCP_sock::failure, FS::Failure)
00443 {
00444   if (listener == NULL) {
00445       // 'setup' was never called, or fd == NO_FD
00446       return false;
00447   }
00448   if (!wait) {
00449       // The relay thread might be waiting either to establish the connection
00450       // or for data to arrive.  In the former case, it is waiting on the
00451       // listener socket, and the regular socket is NULL.  In the latter case,
00452       // it is waiting on the regular socket.
00453       m.lock();
00454       (s == NULL ? listener : s)->alert();
00455       m.unlock();
00456   }
00457   (void) relay.join();
00458   if (tcp_f.reason != 0) throw tcp_f;
00459   if(write_failed) throw write_failure;
00460   return written;
00461 }
00462 
00463 void Tool_Relay::get_sockaddr(sockaddr_in &sock)
00464 {
00465   if (listener == NULL) bzero((char *)&sock, sizeof(sock));
00466   else if (s == NULL) listener->get_local_addr(sock);
00467   else s->get_local_addr(sock);
00468 }

Generated on Mon May 8 00:48:56 2006 for Vesta by  doxygen 1.4.2