00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include <Basics.H>
00029 #include <Atom.H>
00030 #include <SRPC.H>
00031 #include <LimService.H>
00032 #include <VestaConfig.H>
00033 #include "ToolDirectoryServer.H"
00034 #include "EvalBasics.H"
00035 #include "Val.H"
00036 #include "Expr.H"
00037 #include "Evaluator_Dir_SRPC.H"
00038
00039 #include <sstream>
00040
00041 using std::ostringstream;
00042 using std::cin;
00043 using std::cerr;
00044 using std::endl;
00045 using std::flush;
00046
00047 static Basics::mutex TDSMu;
00048
00049 static Text tmpName("tmp");
00050
00051
00052 const long CallbackStackSize = 0x20000;
00053
00054
00055 RunToolCalls runToolCalls(maxThreads);
00056
00057 static pthread_once_t init_block = PTHREAD_ONCE_INIT;
00058
00059
00060 static Bit32 toolDirServerReqs = 0UL;
00061
00062
00063 static Bit64 toolCalls = 0UL;
00064
00065 Word GetNewHandle() {
00066 TDSMu.lock();
00067 toolDirServerReqs++;
00068 toolCalls++;
00069 Word newHandle = ((toolCalls<<20) |
00070 (toolDirServerReqs & 0xFFFFF));
00071 TDSMu.unlock();
00072 return newHandle;
00073 }
00074
00075 Word GetNewHandle(Word handle) {
00076 TDSMu.lock();
00077 toolDirServerReqs++;
00078 Word newHandle = ((handle & CONST_INT_64(0xFFFFFFFFFFF00000)) |
00079 (toolDirServerReqs & 0xFFFFF));
00080 TDSMu.unlock();
00081 return newHandle;
00082 }
00083
00084
00085 void send_dir(SRPC *srpc, Word dirHandle) {
00086 DirInfos *dirInfos;
00087 assert(runToolCalls.Get(WordKey(dirHandle>>20), dirInfos));
00088 srpc->send_bytes((char*)&dirHandle, DIR_HANDLE_BYTES);
00089 }
00090
00091 DirInfo *recv_dir(SRPC *srpc, Word& dirHandle, DirInfos*& dirInfos) {
00092 int len = DIR_HANDLE_BYTES;
00093 srpc->recv_bytes_here((char*)&dirHandle, len);
00094 if (len != DIR_HANDLE_BYTES) {
00095 throw(SRPC::failure(SRPC::invalid_parameter, "Invalid directory handle"));
00096 }
00097 if (!runToolCalls.Get(WordKey(dirHandle>>20), dirInfos)) {
00098 throw(SRPC::failure(SRPC::invalid_parameter, "No runtool for this directory handle"));
00099 }
00100 DirInfo *dirInfo;
00101 dirInfos->mu.lock();
00102 bool inTbl = dirInfos->dirInfoTbl->Get(WordKey(dirHandle), dirInfo);
00103 dirInfos->mu.unlock();
00104 if (!inTbl) {
00105 throw(SRPC::failure(SRPC::invalid_parameter, "No such directory handle"));
00106 }
00107 return dirInfo;
00108 }
00109
00110 void DirInfos::AddDpnd(DirInfo *dir, const Text& name, Val v, const PathKind pk) {
00111 if (dir->coarseDep) return;
00112
00113 dir->child_lookup = true;
00114
00115 DepPath newDepPath(dir->path.content);
00116 if (pk == BLenPK) {
00117 newDepPath.content->pKind = BLenPK;
00118 v = ((BindingVC*)v)->Names();
00119 }
00120 else {
00121 newDepPath.Extend(name, pk);
00122 }
00123 if (fsDeps) {
00124 outputMu.lock();
00125 cerr << "FS dependency: ";
00126 newDepPath.Print(&cerr);
00127 cerr << "\n";
00128 outputMu.unlock();
00129 }
00130 DepPathTbl::KVPairPtr pr;
00131 this->mu.lock();
00132 this->dep->Put(newDepPath, v, pr);
00133 this->mu.unlock();
00134
00135 }
00136
00137 Word DirInfos::LookupDir(DirInfo *dir, const Text& name, Val v, Word dirHandle) {
00138
00139
00140
00141
00142 DepPath newPath(dir->path.content);
00143
00144 newPath.Extend(name);
00145 DirInfo *newDirInfo = NEW(DirInfo);
00146 newDirInfo->b = (BindingVC*)v;
00147 newDirInfo->path = newPath;
00148 newDirInfo->isRoot = false;
00149 newDirInfo->coarseDep = (dir->isRoot && name == tmpName);
00150
00151 newDirInfo->child_lookup = false;
00152 dir->child_lookup = true;
00153
00154 this->mu.lock();
00155
00156 if (newDirInfo->coarseDep) {
00157
00158 DepPath newDepPath(newDirInfo->path.content);
00159 if (fsDeps) {
00160 outputMu.lock();
00161 cerr << "FS dependency: ";
00162 newDepPath.Print(&cerr);
00163 cerr << "\n";
00164 outputMu.unlock();
00165 }
00166 DepPathTbl::KVPairPtr pr;
00167 this->dep->Put(newDepPath, v, pr);
00168 }
00169
00170
00171 Word newDirHandle = GetNewHandle(dirHandle);
00172 this->dirInfoTbl->Put(WordKey(newDirHandle), newDirInfo);
00173
00174 this->mu.unlock();
00175
00176 return newDirHandle;
00177 }
00178
00179
00180 void Evaluator_Dir_Server_Inner(SRPC *srpc, int intfVersion, int procId, void *arg)
00181 throw(SRPC::failure, Evaluator::failure)
00182 {
00183 int index;
00184 Val v;
00185 DirInfo *dir;
00186 Word dirHandle;
00187 DirInfos *dirs;
00188
00189 if (intfVersion != EVALUATOR_DIR_SRPC_VERSION) {
00190 srpc->send_failure(SRPC::version_skew, "Unrecognized intfVersion");
00191 }
00192
00193
00194 switch(procId) {
00195 case ed_lookup:
00196 {
00197 dir = recv_dir(srpc, dirHandle, dirs);
00198
00199
00200 Atom id(srpc->recv_chars(), (void*)1);
00201 srpc->recv_end();
00202 if (evalCalls) {
00203 outputMu.lock();
00204 cerr << "EvalDir lookup: ";
00205 dir->path.Print(&cerr);
00206 cerr << '/' << id << '\n';
00207 outputMu.unlock();
00208 }
00209 v = dir->b->GetElemNoDpnd(id, index);
00210 if (v->vKind != ErrorVK) {
00211 switch (v->vKind) {
00212 case BindingVK:
00213 srpc->send_int(ed_directory);
00214 srpc->send_int(index);
00215 send_dir(srpc, dirs->LookupDir(dir, id, v, dirHandle));
00216 break;
00217 case TextVK:
00218 dirs->AddDpnd(dir, id, v, NormPK);
00219 srpc->send_int(ed_file);
00220 srpc->send_int(index);
00221 srpc->send_int(((TextVC*)v)->Sid());
00222 ((TextVC*)v)->FingerPrint().Send(*srpc);
00223 break;
00224 case ModelVK:
00225 dirs->AddDpnd(dir, id, v, NormPK);
00226 srpc->send_int(ed_file);
00227 srpc->send_int(index);
00228 srpc->send_int(((ModelVC*)v)->Sid());
00229 ((ModelVC*)v)->FingerPrintFile().Send(*srpc);
00230 break;
00231 case IntegerVK:
00232 dirs->AddDpnd(dir, id, v, NormPK);
00233 srpc->send_int(ed_device);
00234 srpc->send_int(index);
00235 srpc->send_int(((IntegerVC*)v)->num);
00236 break;
00237 default:
00238 throw(SRPC::failure(SRPC::invalid_parameter,
00239 id + " is not a file or directory"));
00240 }
00241 }
00242 else {
00243 dirs->AddDpnd(dir, id, valFalse, BangPK);
00244 srpc->send_int(ed_none);
00245 }
00246 srpc->send_end();
00247 break;
00248 }
00249 case ed_lookup_index:
00250 {
00251 Text id;
00252 dir = recv_dir(srpc, dirHandle, dirs);
00253 index = srpc->recv_int();
00254 srpc->recv_end();
00255 if (evalCalls) {
00256 outputMu.lock();
00257 cerr << "EvalDir index: ";
00258 dir->path.Print(&cerr);
00259 cerr << '/' << index << '\n';
00260 outputMu.unlock();
00261 }
00262 v = dir->b->GetElemNoDpnd(index, id);
00263 if (v->vKind != ErrorVK) {
00264 switch (v->vKind) {
00265 case BindingVK:
00266 srpc->send_int(ed_directory);
00267 srpc->send_chars(id.chars());
00268 send_dir(srpc, dirs->LookupDir(dir, id, v, dirHandle));
00269 break;
00270 case TextVK:
00271 dirs->AddDpnd(dir, id, v, NormPK);
00272 srpc->send_int(ed_file);
00273 srpc->send_chars(id.chars());
00274 srpc->send_int(((TextVC*)v)->Sid());
00275 ((TextVC*)v)->FingerPrint().Send(*srpc);
00276 break;
00277 case ModelVK:
00278 dirs->AddDpnd(dir, id, v, NormPK);
00279 srpc->send_int(ed_file);
00280 srpc->send_chars(id.chars());
00281 srpc->send_int(((ModelVC*)v)->Sid());
00282 ((ModelVC*)v)->FingerPrintFile().Send(*srpc);
00283 break;
00284 case IntegerVK:
00285 dirs->AddDpnd(dir, id, v, NormPK);
00286 srpc->send_int(ed_device);
00287 srpc->send_chars(id.chars());
00288 srpc->send_int(((IntegerVC*)v)->num);
00289 break;
00290 default:
00291 throw(SRPC::failure(SRPC::invalid_parameter,
00292 id + " is not a file or directory"));
00293 }
00294 }
00295 else {
00296 dirs->AddDpnd(dir, id, valFalse, BangPK);
00297 srpc->send_int(ed_none);
00298 }
00299 srpc->send_end();
00300 break;
00301 }
00302 case ed_oldlist:
00303 case ed_list:
00304 {
00305 Text id;
00306 dir = recv_dir(srpc, dirHandle, dirs);
00307 index = srpc->recv_int();
00308 int limit = srpc->recv_int();
00309 int overhead = srpc->recv_int();
00310 srpc->recv_end();
00311 if (evalCalls) {
00312 outputMu.lock();
00313 cerr << "EvalDir list: ";
00314 dir->path.Print(&cerr);
00315 cerr << '/' << index << '\n';
00316 outputMu.unlock();
00317 }
00318 dirs->AddDpnd(dir, emptyText, dir->b, BLenPK);
00319 srpc->send_seq_start();
00320 while (true) {
00321 v = dir->b->GetElemNoDpnd(index, id);
00322 if (v->vKind == ErrorVK) {
00323
00324 srpc->send_chars("");
00325 srpc->send_int(ed_none);
00326 if (procId != ed_oldlist) {
00327 srpc->send_int(NullShortId);
00328 }
00329 break;
00330 }
00331
00332 limit -= id.Length()+overhead;
00333 if (limit < 0) break;
00334
00335 srpc->send_chars(id.chars());
00336 ed_entry_type etype =
00337 (v->vKind == TextVK) ? ed_file :
00338 (v->vKind == BindingVK) ? ed_directory :
00339 (v->vKind == IntegerVK) ? ed_device : ed_none;
00340 if (etype == ed_none)
00341 throw(SRPC::failure(SRPC::invalid_parameter,
00342 id + " is not a file or directory"));
00343 srpc->send_int(etype);
00344 if (procId != ed_oldlist) {
00345 if (etype == ed_file) {
00346 srpc->send_int(((TextVC*)v)->Sid());
00347 } else {
00348 srpc->send_int(NullShortId);
00349 }
00350 }
00351 index++;
00352 }
00353 srpc->send_seq_end();
00354 srpc->send_end();
00355 break;
00356 }
00357 default:
00358 srpc->send_failure(SRPC::invalid_parameter, "Unrecognized procId");
00359 break;
00360 }
00361 }
00362
00363 void Evaluator_Dir_Server(SRPC *srpc, int intfVersion, int procId, void *arg)
00364 throw(SRPC::failure)
00365
00366
00367
00368 {
00369 try {
00370 Evaluator_Dir_Server_Inner(srpc, intfVersion, procId, arg);
00371 }
00372 catch (Evaluator::failure f) {
00373 outputMu.lock();
00374 cerr << "Evaluator failure occured in tool directory lookup.\n";
00375 outputMu.unlock();
00376 exit(1);
00377 }
00378 }
00379
00380 Text convert_failure(const SRPC::failure& f) {
00381 ostringstream s;
00382 s << "SRPC failure in ToolDirServer (";
00383 switch (f.r) {
00384 case SRPC::unknown_host:
00385 s << "unknown_host"; break;
00386 case SRPC::unknown_interface:
00387 s << "unknown_interface"; break;
00388 case SRPC::version_skew:
00389 s << "version_skew"; break;
00390 case SRPC::protocol_violation:
00391 s << "protocol_violation"; break;
00392 case SRPC::buffer_too_small:
00393 s << "buffer_too_small"; break;
00394 case SRPC::transport_failure:
00395 s << "transport_failure"; break;
00396 case SRPC::internal_trouble:
00397 s << "internal_trouble"; break;
00398 case SRPC::invalid_parameter:
00399 s << "invalid_parameter"; break;
00400 case SRPC::partner_went_away:
00401 s << "partner_went_away"; break;
00402 case SRPC::not_implemented:
00403 s << "not_implemented"; break;
00404 default:
00405 s << f.r; break;
00406 }
00407 s << "): " << f.msg.chars();
00408 Text t(s.str());
00409 return t;
00410 }
00411
00412 void Evaluator_Dir_Server_Failure(SRPC *srpc, const SRPC::failure& f,
00413 void *arg) {
00414
00415 if (f.r == SRPC::partner_went_away) return;
00416 outputMu.lock();
00417 cerr << convert_failure(f).chars() << " (continuing...)\n";
00418 outputMu.unlock();
00419 }
00420
00421 extern "C"
00422 {
00423 void init() {
00424 toolCalls = ((Bit64) time(NULL))<<20;
00425 try {
00426
00427
00428 int max_running = VestaConfig::get_int("Evaluator", "server_max_threads");
00429
00430
00431
00432 Text max_pending_t;
00433 int max_pending;
00434 if(!VestaConfig::get("Evaluator", "server_max_pending", max_pending_t) ||
00435 (max_pending_t == "auto"))
00436 {
00437
00438
00439
00440 int max_needed = max(maxThreads,
00441 VestaConfig::get_int("Repository", "threads"));
00442
00443 max_pending = max_needed - max_running;
00444 }
00445 else
00446 {
00447
00448
00449 max_pending = VestaConfig::get_int("Evaluator", "server_max_pending");
00450 }
00451
00452 LimService *ls =
00453 NEW_CONSTR(LimService, (max_running, max_pending,
00454 Evaluator_Dir_Server, Evaluator_Dir_Server_Failure,
00455 NULL, CallbackStackSize));
00456 toolDirServerIntfName = ls->IntfName();
00457 if (getenv("DEBUG_TOOL_DIR_SERVER")) {
00458 outputMu.lock();
00459 cerr << "Tool directory server listening on port "
00460 << toolDirServerIntfName << endl
00461 << "Hit enter to continue: " << flush;
00462 char enter;
00463 cin.get(enter);
00464 outputMu.unlock();
00465 }
00466 (void) ls->Forked_Run();
00467 } catch (VestaConfig::failure f) {
00468 outputMu.lock();
00469 cerr << endl
00470 << "Configuration error starting tool directory server:" << endl
00471 << f.msg << endl;
00472 exit(1);
00473 } catch (SRPC::failure f) {
00474 outputMu.lock();
00475 cerr << endl
00476 << "Error starting tool directory server:" << endl
00477 << convert_failure(f) << endl;
00478 outputMu.unlock();
00479 exit(1);
00480 } catch(...) {
00481 outputMu.lock();
00482 cerr << endl
00483 << "Unexpected exception starting tool directory server!" << endl;
00484 outputMu.unlock();
00485 exit(1);
00486 }
00487 }
00488 }
00489
00490 void create_tool_directory_server() {
00491 pthread_once(&init_block, &init);
00492 }