00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include <Basics.H>
00027 #include <VestaConfig.H>
00028 #include <RunToolClient.H>
00029 #include <fnmatch.h>
00030 #include "RunToolHost.H"
00031 #include "RunToolClient.H"
00032 #include "ThreadData.H"
00033
00034 using std::cerr;
00035 using std::endl;
00036
00037 #define DEBUG(s)
00038
00039
00040
00041 struct Host {
00042 Text name;
00043 FP::Tag uniqueid;
00044 int usecount;
00045 bool bad;
00046
00047 Host() : usecount(0), bad(false) { }
00048 };
00049
00050
00051 struct Platform {
00052 Platform* next;
00053 Text name;
00054
00055 Text sysname, release, version, machine;
00056
00057 int cpus, cpuMHz, memKB;
00058 int nhosts;
00059 Host* hosts;
00060
00061 bool anchorfirst;
00062
00063
00064 bool lowSlotsMessagePrinted;
00065 };
00066
00067
00068 static Basics::mutex mu;
00069 static Platform* platforms;
00070 static bool initialized = false;
00071 static float negligibleExternalLoadPerCPU = 0.75;
00072 static unsigned int myhosthash;
00073 static Basics::cond toolDone;
00074 static unsigned int usecountTotal = 0;
00075
00076
00077 static Platform* LookupPlatform(TextVC *platform, SrcLoc *loc);
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133 Text
00134 RunToolHost(TextVC *platform, SrcLoc *loc, void*& handle)
00135 {
00136 Host* host = NULL;
00137 mu.lock();
00138 try {
00139
00140 Platform* plat = LookupPlatform(platform, loc);
00141 int id = ThreadDataGet()->id;
00142 DEBUG(cout << "thread " << id << ": ");
00143
00144
00145
00146 while(host == NULL)
00147 {
00148
00149
00150
00151 float lightload = 3.4e38;
00152
00153 int lighthost = 0;
00154 RunTool::Host_info light_hinfo;
00155
00156 for (unsigned int ii=0; ii<plat->nhosts; ii++) {
00157 unsigned int i;
00158
00159 if (plat->anchorfirst) {
00160 i = ii ? ((ii + myhosthash) % (plat->nhosts - 1)) + 1 : 0;
00161 } else {
00162 i = (ii + myhosthash) % plat->nhosts;
00163 }
00164 DEBUG(cout << "host " << i << " ");
00165
00166
00167 if (plat->hosts[i].bad) continue;
00168 RunTool::Host_info hinfo;
00169 try {
00170 RunTool::get_info(plat->hosts[i].name, hinfo);
00171 } catch (SRPC::failure f) {
00172 outputMu.lock();
00173 cerr << "Warning: failed to contact runtool server on host \""
00174 << plat->hosts[i].name << "\": " << f.msg << endl;
00175 outputMu.unlock();
00176 plat->hosts[i].bad = true;
00177 continue;
00178 }
00179
00180
00181 if (hinfo.uniqueid != plat->hosts[i].uniqueid) {
00182 int j;
00183 plat->hosts[i].uniqueid = hinfo.uniqueid;
00184 for (j=0; j<plat->nhosts; j++) {
00185 if (j != i && !plat->hosts[j].bad &&
00186 plat->hosts[j].uniqueid == plat->hosts[i].uniqueid) {
00187 DEBUG(cout << "(duplicate of " << j << ") ");
00188 plat->hosts[i].bad = true;
00189 break;
00190 }
00191 }
00192 if (plat->hosts[i].bad) continue;
00193 }
00194
00195
00196 if(fnmatch(plat->sysname.cchars(), hinfo.sysname.cchars(), 0) != 0 ||
00197 fnmatch(plat->release.cchars(), hinfo.release.cchars(), 0) != 0 ||
00198 fnmatch(plat->version.cchars(), hinfo.version.cchars(), 0) != 0 ||
00199 fnmatch(plat->machine.cchars(), hinfo.machine.cchars(), 0) != 0 ||
00200 hinfo.cpus < plat->cpus ||
00201 hinfo.cpuMHz < plat->cpuMHz ||
00202 hinfo.memKB < plat->memKB) {
00203
00204
00205 if (i > 0) {
00206 outputMu.lock();
00207 cerr << "Warning: runtool server on host \""
00208 << plat->hosts[i].name << "\" does not match platform \""
00209 << plat->name << "\"." << endl;
00210 outputMu.unlock();
00211 }
00212 plat->hosts[i].bad = true;
00213 continue;
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 int cur_tools = hinfo.cur_tools + plat->hosts[i].usecount;
00225 DEBUG(cout << plat->hosts[i].name << " tools=" << cur_tools << endl);
00226
00227
00228
00229
00230 float max_load = hinfo.cpus * negligibleExternalLoadPerCPU;
00231 if (cur_tools == 0 && hinfo.load < max_load) {
00232 host = &plat->hosts[i];
00233 break;
00234 }
00235
00236
00237 float tool_load = ((float) cur_tools)/((float) hinfo.max_tools);
00238 float host_load = hinfo.load / hinfo.cpus;
00239 float load;
00240 if(host_load < tool_load) {
00241 load = tool_load;
00242 } else {
00243 load = host_load;
00244 }
00245 if (load < lightload) {
00246 lighthost = i;
00247 lightload = load;
00248 light_hinfo = hinfo;
00249 }
00250 }
00251
00252 if (host == NULL) {
00253 if (plat->hosts[lighthost].bad) {
00254
00255
00256 outputMu.lock();
00257 Error("No runtool server found for platform ", loc);
00258 platform->PrintD(&cerr);
00259 ErrorDetail(".\n");
00260 outputMu.unlock();
00261 throw(Evaluator::failure(Text("exiting"), false));
00262 }
00263
00264
00265
00266
00267
00268
00269
00270 int total_tools = (light_hinfo.cur_tools + light_hinfo.cur_pending +
00271 plat->hosts[lighthost].usecount);
00272
00273
00274
00275
00276
00277 if(total_tools >= (light_hinfo.max_tools + light_hinfo.max_pending))
00278 {
00279
00280
00281
00282 if(usecountTotal > 0)
00283 {
00284
00285
00286
00287 if(!plat->lowSlotsMessagePrinted)
00288 {
00289 outputMu.lock();
00290 cerr << "Warning: Difficulty finding an available "
00291 << "runtool server slot for platform \""
00292 << plat->name << "\". You may see less "
00293 << "parallelism than expected." << endl;
00294 outputMu.unlock();
00295 plat->lowSlotsMessagePrinted = true;
00296 }
00297
00298
00299 toolDone.wait(mu);
00300 }
00301 else
00302 {
00303
00304
00305
00306 Text message =
00307 Text("With no tools running, no available runtool server "
00308 "slots for platform \"") +
00309 plat->name + "\"";
00310 outputMu.lock();
00311 Error(message, loc);
00312 outputMu.unlock();
00313 throw(Evaluator::failure(Text("can't start tool"), false));
00314 }
00315 }
00316
00317 else
00318 {
00319 host = &plat->hosts[lighthost];
00320 }
00321 }
00322 }
00323
00324 } catch (...) {
00325 mu.unlock();
00326 throw;
00327 }
00328 host->usecount++;
00329 usecountTotal++;
00330 mu.unlock();
00331 handle = (void*) host;
00332 return host->name;
00333 }
00334
00335 void
00336 RunToolDone(void* handle)
00337 {
00338 Host* host = (Host*) handle;
00339 mu.lock();
00340 host->usecount--;
00341 usecountTotal--;
00342 mu.unlock();
00343
00344
00345
00346
00347
00348 toolDone.broadcast();
00349 }
00350
00351
00352
00353
00354 static Platform*
00355 LookupPlatform(TextVC *platform, SrcLoc *loc)
00356 {
00357 Text platname = platform->NDS().chars();
00358 Platform* plat = platforms;
00359
00360
00361 while (plat) {
00362 if (platname == plat->name) return plat;
00363 plat = plat->next;
00364 }
00365
00366
00367 plat = NEW(Platform);
00368 plat->name = platname;
00369 Text hosts;
00370 try {
00371 plat->sysname = VestaConfig::get_Text(platname, "sysname");
00372 plat->release = VestaConfig::get_Text(platname, "release");
00373 plat->version = VestaConfig::get_Text(platname, "version");
00374 plat->machine = VestaConfig::get_Text(platname, "machine");
00375 plat->cpus = VestaConfig::get_int (platname, "cpus");
00376 plat->cpuMHz = VestaConfig::get_int (platname, "cpuMHz");
00377 plat->memKB = VestaConfig::get_int (platname, "memKB");
00378 hosts = VestaConfig::get_Text(platname, "hosts");
00379 } catch (VestaConfig::failure f) {
00380 outputMu.lock();
00381 Error("Unknown platform ", loc);
00382 platform->PrintD(&cerr);
00383 ErrorDetail(": " + f.msg + ".\n");
00384 outputMu.unlock();
00385 throw(Evaluator::failure(Text("exiting"), false));
00386 }
00387
00388
00389 plat->nhosts = 0;
00390 int len = hosts.Length();
00391 int i = 0;
00392 for (;;) {
00393 while (i < len && isspace(hosts[i])) i++;
00394 if (i >= len) break;
00395 plat->nhosts++;
00396 while (i < len && !isspace(hosts[i])) i++;
00397 }
00398
00399
00400 plat->hosts = NEW_ARRAY(Host, plat->nhosts);
00401 int h = 0;
00402 i = 0;
00403 for (;;) {
00404 while (i < len && isspace(hosts[i])) i++;
00405 if (i >= len) break;
00406 int j = i;
00407 while (j < len && !isspace(hosts[j])) j++;
00408 plat->hosts[h].name = hosts.Sub(i, j-i);
00409 i = j + 1;
00410 h++;
00411 }
00412
00413 if (plat->anchorfirst = ((plat->nhosts > 0) &&
00414 (plat->hosts[0].name == Text("localhost")))) {
00415
00416 plat->hosts[0].name = TCP_sock::this_host();
00417 }
00418
00419 plat->lowSlotsMessagePrinted = false;
00420
00421 plat->next = platforms;
00422 platforms = plat;
00423 return plat;
00424 }
00425
00426
00427
00428
00429 void
00430 RunToolHostInit()
00431 {
00432 mu.lock();
00433 if (!initialized) {
00434 initialized = true;
00435 Word full_hash = TCP_sock::this_host().Hash();
00436 myhosthash = (unsigned int) full_hash ^ (full_hash >> (sizeof(unsigned int)*8));
00437
00438 try
00439 {
00440
00441
00442
00443 if(VestaConfig::is_set("Evaluator", "NegligibleExternalLoadPerCPU"))
00444 {
00445 negligibleExternalLoadPerCPU = VestaConfig::get_float("Evaluator",
00446 "NegligibleExternalLoadPerCPU");
00447 }
00448 }
00449 catch (VestaConfig::failure f)
00450 {
00451 Error(f.msg);
00452 }
00453 }
00454 mu.unlock();
00455 }