00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <Basics.H>
00026 #include <VestaConfig.H>
00027
00028
00029 #include <TCP_sock.H>
00030
00031
00032
00033 #include <VDirSurrogate.H>
00034
00035
00036 #include "RunToolClient.H"
00037
00038 using std::ostream;
00039 using std::flush;
00040
00041
00042
00043 pthread_once_t RunTool::init_block = PTHREAD_ONCE_INIT;
00044
00045 MultiSRPC *RunTool::srpc_cache = NULL;
00046 Text default_port;
00047
00048
00049
00050
00051
00052
00053 static unsigned int get_info_read_timeout = 120;
00054
00055
00056
00057
00058
00059 extern "C"
00060 {
00061 void RunTool_init()
00062 {
00063 RunTool::init();
00064 }
00065 }
00066
00067 void RunTool::ensure_init() throw (SRPC::failure)
00068 {
00069 if (pthread_once(&init_block, RunTool_init) != 0)
00070 throw(SRPC::failure(implementation_error, "pthread_once failed!"));
00071 }
00072
00073 void RunTool::init() throw(SRPC::failure)
00074 {
00075 try {
00076 default_port = VestaConfig::get_Text(RUN_TOOL_CONFIG_SECTION,
00077 "SRPC_port");
00078 Text info_timeout_t;
00079 if(VestaConfig::get(RUN_TOOL_CONFIG_SECTION, "info_read_timeout", info_timeout_t))
00080 {
00081 get_info_read_timeout = strtoul(info_timeout_t.cchars(), NULL, 0);
00082 }
00083 srpc_cache = NEW(MultiSRPC);
00084 } catch (VestaConfig::failure f) {
00085 throw(SRPC::failure(implementation_error, f.msg));
00086 }
00087 }
00088
00089
00090
00091
00092
00093
00094 void RunTool::send_relay_socket(Tool_Relay &r, SRPC *srpc)
00095 {
00096 sockaddr_in sock;
00097 r.get_sockaddr(sock);
00098 srpc->send_socket(sock);
00099 }
00100
00101 void RunTool::send_vdir(LongId &fsroot, SRPC *srpc)
00102 {
00103
00104
00105
00106
00107 char buffer[100];
00108 char *nfs_sock_and_port = buffer;
00109 LongId root, muRoot;
00110 sockaddr_in sock;
00111 VDirSurrogate::getNFSInfo(nfs_sock_and_port, root, muRoot);
00112
00113 TCP_sock::name_to_sockaddr(nfs_sock_and_port, sock);
00114 srpc->send_socket(sock);
00115 srpc->send_bytes((char *)&fsroot, sizeof(fsroot));
00116 }
00117
00118
00119
00120
00121
00122
00123
00124 RunTool::Tool_result
00125 RunTool::do_it(
00126 const Text &host,
00127 chars_seq *argv,
00128 LongId &fsroot,
00129 const Text &wd,
00130 chars_seq *ev,
00131 ostream *report_out,
00132 ostream *value_out,
00133 ostream *report_err,
00134 ostream *value_err,
00135 Basics::mutex *mu,
00136 const Text &label,
00137 const Text &stdin_name
00138 ) throw(SRPC::failure, FS::Failure)
00139 {
00140 MultiSRPC::ConnId host_conn = -1;
00141 Tool_Relay out;
00142 Tool_Relay err;
00143 Tool_result res;
00144 SRPC *srpc;
00145 SRPC::failure f(0, "");
00146
00147 bool write_failed = false;
00148 FS::Failure write_failure;
00149
00150 ensure_init();
00151
00152 res.status = res.sigval = 0;
00153 res.stdout_written = res.stderr_written = false;
00154
00155 try {
00156 int colon = host.FindChar(':');
00157 if (colon == -1) {
00158
00159 host_conn = srpc_cache->Start(host, default_port, srpc);
00160 } else {
00161 Text host_only = host.Sub(0, colon);
00162 Text port = host.Sub(colon + 1);
00163 host_conn = srpc_cache->Start(host_only, port, srpc);
00164 }
00165
00166 out.setup(report_out, value_out, mu, label);
00167 err.setup(report_err, value_err, mu, label);
00168
00169
00170 srpc->start_call(RUN_TOOL_DOIT, RUN_TOOL_INTERFACE_VERSION);
00171
00172
00173 srpc->send_Text(stdin_name);
00174 send_relay_socket(out, srpc);
00175 send_relay_socket(err, srpc);
00176 send_vdir(fsroot, srpc);
00177 srpc->send_Text(wd);
00178 if (ev == NULL) {
00179 ev = NEW(chars_seq);
00180 srpc->send_chars_seq(*ev);
00181 delete ev;
00182 } else
00183 srpc->send_chars_seq(*ev);
00184 srpc->send_chars_seq(*argv);
00185 srpc->send_end();
00186
00187
00188 res.status = srpc->recv_int();
00189 res.sigval = srpc->recv_int();
00190 res.dumped_core = srpc->recv_bool();
00191 srpc->recv_end();
00192 } catch(SRPC::failure ff) {
00193
00194 f = ff;
00195 } catch(TCP_sock::failure tf) {
00196
00197 f = SRPC::convert_TCP_failure(tf);
00198 }
00199
00200 try {
00201 res.stdout_written = out.finish(f.r == 0);
00202 } catch (TCP_sock::failure tf) {
00203 if (f.r == 0) f = SRPC::convert_TCP_failure(tf);
00204 } catch (FS::Failure f) {
00205
00206 if(!write_failed)
00207 {
00208
00209 write_failure = f;
00210 write_failed = true;
00211 }
00212 }
00213 try {
00214 res.stderr_written = err.finish(f.r == 0);
00215 } catch (TCP_sock::failure tf) {
00216 if (f.r == 0) f = SRPC::convert_TCP_failure(tf);
00217 } catch (FS::Failure f) {
00218
00219 if(!write_failed)
00220 {
00221
00222 write_failure = f;
00223 write_failed = true;
00224 }
00225 }
00226 srpc = NULL;
00227 srpc_cache->End(host_conn);
00228 if (f.r != 0) throw f;
00229 if(write_failed) throw write_failure;
00230
00231 return res;
00232 }
00233
00234 void
00235 RunTool::get_info(const Text &host,
00236 Host_info& hinfo) throw(SRPC::failure)
00237 {
00238 MultiSRPC::ConnId host_conn = -1;
00239 SRPC* srpc;
00240 ensure_init();
00241
00242 try {
00243 int colon = host.FindChar(':');
00244 if (colon == -1) {
00245
00246 host_conn = srpc_cache->Start(host, default_port, srpc);
00247 } else {
00248 Text host_only = host.Sub(0, colon);
00249 Text port = host.Sub(colon + 1);
00250 host_conn = srpc_cache->Start(host_only, port, srpc);
00251 }
00252
00253
00254
00255 srpc->enable_read_timeout(get_info_read_timeout);
00256
00257
00258 srpc->start_call(RUN_TOOL_INFO, RUN_TOOL_INTERFACE_VERSION);
00259
00260
00261 srpc->send_end();
00262
00263
00264 srpc->recv_Text(hinfo.sysname);
00265 srpc->recv_Text(hinfo.release);
00266 srpc->recv_Text(hinfo.version);
00267 srpc->recv_Text(hinfo.machine);
00268 hinfo.cpus = srpc->recv_int();
00269 hinfo.cpuMHz = srpc->recv_int();
00270 hinfo.memKB = srpc->recv_int();
00271 hinfo.max_tools = srpc->recv_int();
00272 hinfo.cur_tools = srpc->recv_int();
00273 hinfo.load = srpc->recv_float();
00274 hinfo.cur_pending = srpc->recv_int();
00275 hinfo.max_pending = srpc->recv_int();
00276 hinfo.uniqueid.Recv(*srpc);
00277 srpc->recv_end();
00278
00279
00280
00281
00282 srpc->disable_read_timeout();
00283
00284 } catch(SRPC::failure f) {
00285
00286 srpc_cache->End(host_conn);
00287 throw;
00288 }
00289 srpc_cache->End(host_conn);
00290 }
00291
00292
00293
00294
00295
00296
00297
00298
00299 Tool_Relay::Tool_Relay():tcp_f(0, ""), label("")
00300 {
00301 listener = NULL;
00302 s = NULL;
00303 buffer = NULL;
00304 report = NULL;
00305 value = NULL;
00306 mu = NULL;
00307 buff_len = 0;
00308 remaining = 0;
00309 write_failed = false;
00310 }
00311
00312 Tool_Relay::~Tool_Relay()
00313 {
00314 if (buffer != NULL) delete[] buffer;
00315 if (s != NULL) delete s;
00316 if (listener != NULL) delete listener;
00317 }
00318
00319 void *relay_thread(void *arg)
00320 {
00321 Tool_Relay *r = (Tool_Relay *)arg;
00322 r->body();
00323 return NULL;
00324 }
00325
00326 void Tool_Relay::body()
00327 {
00328 try {
00329 TCP_sock *ss = listener->wait_for_connect();
00330 ss->enable_alerts();
00331 m.lock();
00332
00333 s = ss;
00334 bool quit = listener->test_alert();
00335 m.unlock();
00336 if (quit) {
00337 TCP_sock::alerted a;
00338 throw a;
00339 }
00340 while (true) {
00341
00342 if (buff_len - remaining < RELAY_BUFFER_SIZE/2) {
00343
00344 buff_len *= 2;
00345 char* newbuff = NEW_PTRFREE_ARRAY(char, buff_len);
00346 memcpy(newbuff, buffer, remaining);
00347 delete[] buffer;
00348 buffer = newbuff;
00349 }
00350 int len = s->recv_data(buffer + remaining, buff_len - remaining);
00351 if (value) {
00352 FS::Write(*value, buffer + remaining, len);
00353 }
00354 if (report) {
00355 if (label == "") {
00356
00357 report->write(buffer + remaining, len);
00358 } else {
00359 mu->lock();
00360
00361
00362 char* pos = buffer;
00363 char* end;
00364 remaining += len;
00365 while (remaining) {
00366 end = (char *) memchr(pos, '\n', remaining);
00367 if (end == NULL) {
00368 if (buffer != pos) memmove(buffer, pos, remaining);
00369 break;
00370 } else {
00371 *report << label;
00372 end++;
00373 int linelen = end - pos;
00374 report->write(pos, linelen);
00375 remaining -= linelen;
00376 pos = end;
00377 }
00378 }
00379 *report << flush;
00380 mu->unlock();
00381 }
00382 }
00383 written = true;
00384 }
00385 } catch(TCP_sock::failure f) {
00386 if (f.reason != TCP_sock::end_of_file) tcp_f = f;
00387 } catch (TCP_sock::alerted) {
00388
00389 } catch (FS::Failure f) {
00390
00391 if(!write_failed)
00392 {
00393
00394 write_failure = f;
00395 write_failed = true;
00396 }
00397 }
00398
00399
00400 if (report) {
00401 if (label == "") {
00402
00403 *report << flush;
00404 } else {
00405
00406 if (remaining) {
00407
00408 mu->lock();
00409 *report << label;
00410 report->write(buffer, remaining);
00411 *report << '\n' << flush;
00412 mu->unlock();
00413 }
00414 }
00415 }
00416 }
00417
00418 void Tool_Relay::setup(ostream* report, ostream* value, Basics::mutex* mu,
00419 const Text& label)
00420 throw (TCP_sock::failure)
00421 {
00422 if (report == NULL && value == NULL) {
00423
00424
00425 return;
00426 }
00427 listener = NEW(TCP_sock);
00428 listener->enable_alerts();
00429
00430
00431 listener->set_waiters();
00432 this->report = report;
00433 this->value = value;
00434 this->mu = mu;
00435 this->label = label;
00436 buff_len = RELAY_BUFFER_SIZE;
00437 buffer = NEW_PTRFREE_ARRAY(char, buff_len);
00438 written = false;
00439 relay.fork(&relay_thread, (void *)this);
00440 }
00441
00442 bool Tool_Relay::finish(bool wait) throw(TCP_sock::failure, FS::Failure)
00443 {
00444 if (listener == NULL) {
00445
00446 return false;
00447 }
00448 if (!wait) {
00449
00450
00451
00452
00453 m.lock();
00454 (s == NULL ? listener : s)->alert();
00455 m.unlock();
00456 }
00457 (void) relay.join();
00458 if (tcp_f.reason != 0) throw tcp_f;
00459 if(write_failed) throw write_failure;
00460 return written;
00461 }
00462
00463 void Tool_Relay::get_sockaddr(sockaddr_in &sock)
00464 {
00465 if (listener == NULL) bzero((char *)&sock, sizeof(sock));
00466 else if (s == NULL) listener->get_local_addr(sock);
00467 else s->get_local_addr(sock);
00468 }