00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <Basics.H>
00024
00025
00026 #include <SRPC.H>
00027 #include <MultiSRPC.H>
00028
00029
00030 #include <FP.H>
00031
00032
00033 #include <CacheConfig.H>
00034 #include <CacheIntf.H>
00035 #include <ReadConfig.H>
00036 #include <BitVector.H>
00037 #include <FV.H>
00038 #include <CacheIndex.H>
00039 #include <Model.H>
00040 #include <VestaVal.H>
00041 #include <Debug.H>
00042 #include <FV2.H>
00043 #include <CompactFV.H>
00044
00045
00046 #include "ParCacheC.H"
00047 #include "CacheC.H"
00048
00049 using std::cout;
00050 using std::endl;
00051
00052
00053
00054
00055
00056 unsigned int g_debug_call_count = 1;
00057
00058
00059 CacheC::CacheC(CacheIntf::DebugLevel debug) throw (SRPC::failure)
00060 : debug(debug)
00061 {
00062 Text serverPort(ReadConfig::TextVal(Config_CacheSection, "Port"));
00063 this->conns = NEW_CONSTR(MultiSRPC, (serverPort));
00064
00065
00066 init_server_instance();
00067 }
00068
00069
00070 CacheC::~CacheC() throw ()
00071 {
00072 if (this->conns != (MultiSRPC *)NULL)
00073 delete this->conns;
00074 }
00075
00076 void CacheC::init_server_instance() throw(SRPC::failure)
00077 {
00078 unsigned int l_debug_call_number;
00079
00080
00081
00082 MultiSRPC::ConnId cid = -1;
00083 try {
00084
00085 SRPC *srpc;
00086 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00087
00088
00089 if (this->debug >= CacheIntf::OtherOps) {
00090 Debug::Lock();
00091 l_debug_call_number = g_debug_call_count++;
00092 cout << Debug::Timestamp() << "CALLING -- GetCacheInstance"
00093 << " (call #" << l_debug_call_number << ")"
00094 << endl << endl;
00095 Debug::Unlock();
00096 }
00097
00098
00099 srpc->start_call(CacheIntf::GetCacheInstanceProc, CacheIntf::Version);
00100 srpc->send_end();
00101
00102
00103 this->server_instance.Recv(*srpc);
00104 srpc->recv_end();
00105
00106
00107 if (this->debug >= CacheIntf::OtherOps) {
00108 Debug::Lock();
00109 cout << Debug::Timestamp() << "RETURNED -- GetCacheInstance"
00110 << " (call #" << l_debug_call_number << ")"
00111 << endl;
00112 cout << " instance fingerprint = " << server_instance << endl;
00113 cout << endl;
00114 Debug::Unlock();
00115 }
00116
00117
00118 this->conns->End(cid);
00119 }
00120 catch (SRPC::failure) {
00121 this->conns->End(cid);
00122 throw;
00123 }
00124 }
00125
00126 void CacheC::recv_server_instance_check(SRPC *srpc, const char *func_name)
00127 const
00128 throw(SRPC::failure)
00129 {
00130
00131 bool l_server_instance_match = (bool)(srpc->recv_int());
00132 if(!l_server_instance_match)
00133 {
00134
00135 srpc->recv_end();
00136
00137
00138 if (this->debug >= CacheIntf::OtherOps) {
00139 Debug::Lock();
00140 cout << Debug::Timestamp()
00141 << "RETURNED -- " << func_name << endl;
00142 cout << " server instance mismatch, call aborted" << endl;
00143 cout << endl;
00144 Debug::Unlock();
00145 }
00146
00147
00148
00149 throw SRPC::failure(1, ("Cache server instance mismatch "
00150 "(cache server must have restarted)"));
00151 }
00152 }
00153
00154 FV::Epoch CacheC::FreeVariables(const FP::Tag& pk,
00155 CompactFV::List& names, bool &isEmpty)
00156 const throw (SRPC::failure)
00157 {
00158 FV::Epoch res;
00159 MultiSRPC::ConnId cid = -1;
00160 unsigned int l_debug_call_number;
00161
00162 try {
00163
00164 SRPC *srpc;
00165 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00166
00167
00168 if (this->debug >= CacheIntf::OtherOps) {
00169 Debug::Lock();
00170 l_debug_call_number = g_debug_call_count++;
00171 cout << Debug::Timestamp() << "CALLING -- FreeVariables"
00172 << " (call #" << l_debug_call_number << ")" << endl;
00173 cout << " pk = " << pk << endl;
00174 cout << endl;
00175 Debug::Unlock();
00176 }
00177
00178
00179 srpc->start_call(CacheIntf::FreeVarsProc, CacheIntf::Version);
00180
00181 server_instance.Send(*srpc);
00182
00183 pk.Send(*srpc);
00184 srpc->send_end();
00185
00186
00187 recv_server_instance_check(srpc, "FreeVariables");
00188
00189
00190 isEmpty = (bool)(srpc->recv_int());
00191 names.Recv(*srpc);
00192 res = (FV::Epoch)(srpc->recv_int());
00193 srpc->recv_end();
00194
00195
00196 if (this->debug >= CacheIntf::OtherOps) {
00197 Debug::Lock();
00198 cout << Debug::Timestamp() << "RETURNED -- FreeVariables"
00199 << " (call #" << l_debug_call_number << ")" << endl;
00200 cout << " epoch = " << res << endl;
00201 cout << " isEmpty = " << BoolName[isEmpty] << endl;
00202 cout << " names =" << endl; names.Print(cout, 4);
00203 cout << " names.tbl =";
00204 if (names.tbl.NumArcs() > 0) {
00205 cout << endl; names.tbl.Print(cout, 4);
00206 } else {
00207 cout << " <empty>" << endl;
00208 }
00209 cout << endl;
00210 Debug::Unlock();
00211 }
00212
00213
00214 this->conns->End(cid);
00215 }
00216 catch (SRPC::failure) {
00217 this->conns->End(cid);
00218 throw;
00219 }
00220 return res;
00221 }
00222
00223 CacheIntf::LookupRes CacheC::Lookup(
00224 const FP::Tag& pk, const FV::Epoch id, const FP::List& fps,
00225 CacheEntry::Index& ci, VestaVal::T& value)
00226 const throw (SRPC::failure)
00227 {
00228 CacheIntf::LookupRes res;
00229 MultiSRPC::ConnId cid = -1;
00230 unsigned int l_debug_call_number;
00231
00232 try {
00233
00234 SRPC *srpc;
00235 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00236
00237
00238 if (this->debug >= CacheIntf::OtherOps) {
00239 Debug::Lock();
00240 l_debug_call_number = g_debug_call_count++;
00241 cout << Debug::Timestamp() << "CALLING -- Lookup"
00242 << " (call #" << l_debug_call_number << ")" << endl;
00243 cout << " pk = " << pk << endl;
00244 cout << " epoch = " << id << endl;
00245 cout << " fps =" << endl; fps.Print(cout, 4);
00246 cout << endl;
00247 Debug::Unlock();
00248 }
00249
00250
00251 srpc->start_call(CacheIntf::LookupProc, CacheIntf::Version);
00252
00253 server_instance.Send(*srpc);
00254
00255 pk.Send(*srpc);
00256 srpc->send_int(id);
00257 fps.Send(*srpc);
00258 srpc->send_end();
00259
00260
00261 recv_server_instance_check(srpc, "Lookup");
00262
00263
00264 res = (CacheIntf::LookupRes)(srpc->recv_int());
00265 if (res == CacheIntf::Hit) {
00266 ci = srpc->recv_int();
00267 value.Recv(*srpc);
00268 }
00269 srpc->recv_end();
00270
00271
00272 if (this->debug >= CacheIntf::OtherOps) {
00273 Debug::Lock();
00274 cout << Debug::Timestamp() << "RETURNED -- Lookup"
00275 << " (call #" << l_debug_call_number << ")" << endl;
00276 cout << " result = " << CacheIntf::LookupResName(res) << endl;
00277 if (res == CacheIntf::Hit) {
00278 cout << " index = " << ci << endl;
00279 cout << " value =" << endl; value.Print(cout, 4);
00280 }
00281 cout << endl;
00282 Debug::Unlock();
00283 }
00284
00285
00286 this->conns->End(cid);
00287 }
00288 catch (SRPC::failure) {
00289 this->conns->End(cid);
00290 throw;
00291 }
00292 return res;
00293 }
00294
00295 CacheIntf::AddEntryRes CacheC::AddEntry(
00296 const FP::Tag& pk, const char *types, const FV2::List& names,
00297 const FP::List& fps, const VestaVal::T& value, const Model::T model,
00298 const CacheEntry::Indices& kids, const Text& sourceFunc,
00299 CacheEntry::Index& ci) throw (SRPC::failure)
00300 {
00301 CacheIntf::AddEntryRes res;
00302 MultiSRPC::ConnId cid = -1;
00303 unsigned int l_debug_call_number;
00304
00305 try {
00306
00307 SRPC *srpc;
00308 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00309
00310
00311 if (this->debug >= CacheIntf::AddEntryOp) {
00312 Debug::Lock();
00313 l_debug_call_number = g_debug_call_count++;
00314 cout << Debug::Timestamp() << "CALLING -- AddEntry"
00315 << " (call #" << l_debug_call_number << ")" << endl;
00316 cout << " pk = " << pk << endl;
00317 cout << " names ="<<endl; names.Print(cout, 4, types);
00318 cout << " fps =" << endl; fps.Print(cout, 4);
00319 cout << " value =" << endl; value.Print(cout, 4);
00320 cout << " model = " << model << endl;
00321 cout << " kids = " << kids << endl;
00322 cout << " sourceFunc = " << sourceFunc << endl;
00323 cout << endl;
00324 Debug::Unlock();
00325 }
00326
00327
00328 srpc->start_call(CacheIntf::AddEntryProc, CacheIntf::Version);
00329
00330 server_instance.Send(*srpc);
00331
00332 pk.Send(*srpc);
00333 names.Send(*srpc, types);
00334 fps.Send(*srpc);
00335 value.Send(*srpc);
00336 Model::Send(model, *srpc);
00337 kids.Send(*srpc);
00338 srpc->send_Text(sourceFunc);
00339 srpc->send_end();
00340
00341
00342 recv_server_instance_check(srpc, "AddEntry");
00343
00344
00345 ci = srpc->recv_int();
00346 res = (CacheIntf::AddEntryRes)(srpc->recv_int());
00347 srpc->recv_end();
00348
00349
00350 if (this->debug >= CacheIntf::AddEntryOp) {
00351 Debug::Lock();
00352 cout << Debug::Timestamp() << "RETURNED -- AddEntry"
00353 << " (call #" << l_debug_call_number << ")" << endl;
00354 cout << " result = " << CacheIntf::AddEntryResName(res) << endl;
00355 if (res == CacheIntf::EntryAdded) {
00356 cout << " ci = " << ci << endl;
00357 }
00358 cout << endl;
00359 Debug::Unlock();
00360 }
00361
00362
00363 this->conns->End(cid);
00364 }
00365 catch (SRPC::failure) {
00366 this->conns->End(cid);
00367 throw;
00368 }
00369 return res;
00370 }
00371
00372 void CacheC::Checkpoint (const FP::Tag &pkgVersion, Model::T model,
00373 const CacheEntry::Indices& cis, bool done) throw (SRPC::failure)
00374 {
00375 MultiSRPC::ConnId cid = -1;
00376 unsigned int l_debug_call_number;
00377
00378 try {
00379
00380 SRPC *srpc;
00381 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00382
00383
00384 if (this->debug >= CacheIntf::LogFlush) {
00385 Debug::Lock();
00386 l_debug_call_number = g_debug_call_count++;
00387 cout << Debug::Timestamp() << "CALLING -- Checkpoint"
00388 << " (call #" << l_debug_call_number << ")" << endl;
00389 cout << " pkgVersion = " << pkgVersion << endl;
00390 cout << " model = " << model << endl;
00391 cout << " cis = " << cis << endl;
00392 cout << " done = " << BoolName[done] << endl;
00393 cout << endl;
00394 Debug::Unlock();
00395 }
00396
00397
00398 srpc->start_call(CacheIntf::CheckpointProc, CacheIntf::Version);
00399
00400 server_instance.Send(*srpc);
00401
00402 pkgVersion.Send(*srpc);
00403 Model::Send(model, *srpc);
00404 cis.Send(*srpc);
00405 srpc->send_int((int)done);
00406 srpc->send_end();
00407
00408
00409 recv_server_instance_check(srpc, "Checkpoint");
00410
00411
00412 srpc->recv_end();
00413
00414
00415 if (this->debug >= CacheIntf::LogFlush) {
00416 Debug::Lock();
00417 cout << Debug::Timestamp() << "RETURNED Checkpoint"
00418 << " (call #" << l_debug_call_number << ")" << endl;
00419 cout << endl;
00420 Debug::Unlock();
00421 }
00422
00423
00424 this->conns->End(cid);
00425 }
00426 catch (SRPC::failure) {
00427 this->conns->End(cid);
00428 throw;
00429 }
00430 }
00431
00432 bool CacheC::RenewLeases(const CacheEntry::Indices& cis)
00433 throw (SRPC::failure)
00434 {
00435 bool res;
00436 MultiSRPC::ConnId cid = -1;
00437 unsigned int l_debug_call_number;
00438
00439 try {
00440
00441 SRPC *srpc;
00442 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00443
00444
00445 if (this->debug >= CacheIntf::LeaseExp) {
00446 Debug::Lock();
00447 l_debug_call_number = g_debug_call_count++;
00448 cout << Debug::Timestamp() << "CALLING -- RenewLeases"
00449 << " (call #" << l_debug_call_number << ")" << endl;
00450 cout << " cis = " << cis << endl;
00451 cout << endl;
00452 Debug::Unlock();
00453 }
00454
00455
00456 srpc->start_call(CacheIntf::RenewLeasesProc, CacheIntf::Version);
00457
00458 server_instance.Send(*srpc);
00459
00460 cis.Send(*srpc);
00461 srpc->send_end();
00462
00463
00464 recv_server_instance_check(srpc, "RenewLeases");
00465
00466
00467 res = (bool)(srpc->recv_int());
00468 srpc->recv_end();
00469
00470
00471 if (this->debug >= CacheIntf::LeaseExp) {
00472 Debug::Lock();
00473 cout << Debug::Timestamp() << "RETURNED -- RenewLeases"
00474 << " (call #" << l_debug_call_number << ")" << endl;
00475 cout << " res = " << (res ? "true" : "false") << endl;
00476 cout << endl;
00477 Debug::Unlock();
00478 }
00479
00480
00481 this->conns->End(cid);
00482 }
00483 catch (SRPC::failure) {
00484 this->conns->End(cid);
00485 throw;
00486 }
00487 return res;
00488 }