00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <Basics.H>
00025 #include <Table.H>
00026
00027
00028 #include <SRPC.H>
00029 #include <MultiSRPC.H>
00030
00031
00032 #include <CacheConfig.H>
00033 #include <CacheIntf.H>
00034 #include <BitVector.H>
00035 #include <ReadConfig.H>
00036 #include <Debug.H>
00037
00038
00039 #include "ParCacheC.H"
00040 #include "WeederC.H"
00041
00042 using std::cout;
00043 using std::endl;
00044
00045
00046 WeederC::WeederC(CacheIntf::DebugLevel debug) throw (SRPC::failure)
00047 : debug(debug)
00048 {
00049 Text serverPort(ReadConfig::TextVal(Config_CacheSection, "Port"));
00050 this->conns = NEW_CONSTR(MultiSRPC, (serverPort));
00051
00052
00053 init_server_instance();
00054 }
00055
00056
00057 WeederC::~WeederC() throw ()
00058 {
00059 if (this->conns != (MultiSRPC *)NULL)
00060 delete this->conns;
00061 }
00062
00063 void WeederC::init_server_instance() throw(SRPC::failure)
00064 {
00065
00066
00067 MultiSRPC::ConnId cid = -1;
00068 try {
00069
00070 SRPC *srpc;
00071 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00072
00073
00074 if (this->debug >= CacheIntf::OtherOps) {
00075 Debug::Lock();
00076 cout << Debug::Timestamp() << "CALLING -- GetCacheInstance"
00077 << endl << endl;
00078 Debug::Unlock();
00079 }
00080
00081
00082 srpc->start_call(CacheIntf::GetCacheInstanceProc, CacheIntf::Version);
00083 srpc->send_end();
00084
00085
00086 this->server_instance.Recv(*srpc);
00087 srpc->recv_end();
00088
00089
00090 if (this->debug >= CacheIntf::OtherOps) {
00091 Debug::Lock();
00092 cout << Debug::Timestamp() << "RETURNED -- GetCacheInstance"
00093 << endl;
00094 cout << " instance fingerprint = " << server_instance << endl;
00095 cout << endl;
00096 Debug::Unlock();
00097 }
00098
00099
00100 this->conns->End(cid);
00101 }
00102 catch (SRPC::failure) {
00103 this->conns->End(cid);
00104 throw;
00105 }
00106 }
00107
00108 void WeederC::recv_server_instance_check(SRPC *srpc, const char *func_name)
00109 const
00110 throw(SRPC::failure)
00111 {
00112
00113 bool l_server_instance_match = (bool)(srpc->recv_int());
00114 if(!l_server_instance_match)
00115 {
00116
00117 srpc->recv_end();
00118
00119
00120 if (this->debug >= CacheIntf::OtherOps) {
00121 Debug::Lock();
00122 cout << Debug::Timestamp()
00123 << "RETURNED -- " << func_name << endl;
00124 cout << " server instance mismatch, call aborted" << endl;
00125 cout << endl;
00126 Debug::Unlock();
00127 }
00128
00129
00130
00131 throw SRPC::failure(1, ("Cache server instance mismatch "
00132 "(cache server must have restarted)"));
00133 }
00134 }
00135
00136
00137
00138
00139
00140
00141 void WeederC::NullCall(CacheIntf::ProcIds id, char *name,
00142 CacheIntf::DebugLevel level) const throw (SRPC::failure)
00143 {
00144 MultiSRPC::ConnId cid = -1;
00145
00146 try {
00147
00148 SRPC *srpc;
00149 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00150
00151
00152 if (this->debug >= level) {
00153 Debug::Lock();
00154 cout << Debug::Timestamp() << "CALLED -- " << name << endl;
00155 cout << endl;
00156 Debug::Unlock();
00157 }
00158
00159
00160 srpc->start_call(id, CacheIntf::Version);
00161
00162 server_instance.Send(*srpc);
00163
00164 srpc->send_end();
00165
00166
00167 recv_server_instance_check(srpc, name);
00168
00169
00170 srpc->recv_end();
00171
00172
00173 if (this->debug >= level) {
00174 Debug::Lock();
00175 cout << Debug::Timestamp() << "RETURNED -- " << name << endl;
00176 cout << endl;
00177 Debug::Unlock();
00178 }
00179
00180
00181 this->conns->End(cid);
00182 }
00183 catch (SRPC::failure) {
00184 this->conns->End(cid);
00185 throw;
00186 }
00187 }
00188
00189 bool WeederC::WeederRecovering(bool doneMarking) const throw (SRPC::failure)
00190 {
00191 MultiSRPC::ConnId cid = -1;
00192
00193 try {
00194
00195 SRPC *srpc;
00196 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00197
00198
00199 if (this->debug >= CacheIntf::WeederOps) {
00200 Debug::Lock();
00201 cout << Debug::Timestamp() << "CALLING -- WeederRecovering" <<endl;
00202 cout << " doneMarking = " << BoolName[doneMarking] << endl;
00203 cout << endl;
00204 Debug::Unlock();
00205 }
00206
00207
00208 srpc->start_call(CacheIntf::WeedRecoverProc, CacheIntf::Version);
00209
00210 server_instance.Send(*srpc);
00211
00212 srpc->send_int((int)doneMarking);
00213 srpc->send_end();
00214
00215
00216
00217
00218
00219
00220
00221 recv_server_instance_check(srpc, "WeederRecovering");
00222
00223
00224 bool err = srpc->recv_int();
00225 srpc->recv_end();
00226
00227
00228 if (this->debug >= CacheIntf::WeederOps) {
00229 Debug::Lock();
00230 cout << Debug::Timestamp() << "RETURNED -- WeederRecovering"<<endl;
00231 cout << " err = " << BoolName[err] << endl;
00232 cout << endl;
00233 Debug::Unlock();
00234 }
00235
00236
00237 this->conns->End(cid);
00238 return err;
00239 }
00240 catch (SRPC::failure) {
00241 this->conns->End(cid);
00242 throw;
00243 }
00244 }
00245
00246 BitVector* WeederC::StartMark( int &newLogVer) const
00247 throw (SRPC::failure)
00248 {
00249 BitVector* res;
00250 MultiSRPC::ConnId cid = -1;
00251
00252 try {
00253
00254 SRPC *srpc;
00255 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00256
00257
00258 if (this->debug >= CacheIntf::WeederOps) {
00259 Debug::Lock();
00260 cout << Debug::Timestamp() << "CALLING -- StartMark" << endl;
00261 cout << endl;
00262 Debug::Unlock();
00263 }
00264
00265
00266 srpc->start_call(CacheIntf::StartMarkProc, CacheIntf::Version);
00267
00268 server_instance.Send(*srpc);
00269
00270 srpc->send_end();
00271
00272
00273 recv_server_instance_check(srpc, "StartMark");
00274
00275
00276 res = NEW_CONSTR(BitVector, (*srpc));
00277 newLogVer = srpc->recv_int();
00278 srpc->recv_end();
00279
00280
00281 if (this->debug >= CacheIntf::WeederOps) {
00282 Debug::Lock();
00283 cout << Debug::Timestamp() << "RETURNED -- StartMark" << endl;
00284 cout << " initCIs = " << endl;
00285 res->PrintAll(cout, 6);
00286 cout << " (" << res->Cardinality() << " total)" << endl;
00287 cout << " newLogVer = " << newLogVer << endl;
00288 cout << endl;
00289 Debug::Unlock();
00290 }
00291
00292
00293 this->conns->End(cid);
00294 }
00295 catch (SRPC::failure) {
00296 this->conns->End(cid);
00297 throw;
00298 }
00299 return res;
00300 }
00301
00302 void WeederC::SetHitFilter(const BitVector &cis) const throw (SRPC::failure)
00303 {
00304 MultiSRPC::ConnId cid = -1;
00305
00306 try {
00307
00308 SRPC *srpc;
00309 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00310
00311
00312 if (this->debug >= CacheIntf::WeederOps) {
00313 Debug::Lock();
00314 cout << Debug::Timestamp() << "CALLING -- SetHitFilter" << endl;
00315 cout << " cis = " << endl;
00316 cis.PrintAll(cout, 6);
00317 cout << " (" << cis.Cardinality() << " total)" << endl;
00318 cout << endl;
00319 Debug::Unlock();
00320 }
00321
00322
00323 srpc->start_call(CacheIntf::SetHitFilterProc, CacheIntf::Version);
00324
00325 server_instance.Send(*srpc);
00326
00327 cis.Send(*srpc);
00328 srpc->send_end();
00329
00330
00331 recv_server_instance_check(srpc, "SetHitFilter");
00332
00333
00334 srpc->recv_end();
00335
00336
00337 if (this->debug >= CacheIntf::WeederOps) {
00338 Debug::Lock();
00339 cout << Debug::Timestamp() << "RETURNED -- SetHitFilter" << endl;
00340 cout << endl;
00341 Debug::Unlock();
00342 }
00343
00344
00345 this->conns->End(cid);
00346 }
00347 catch (SRPC::failure) {
00348 this->conns->End(cid);
00349 throw;
00350 }
00351 }
00352
00353 BitVector* WeederC::GetLeases() const throw (SRPC::failure)
00354 {
00355 BitVector* res;
00356 MultiSRPC::ConnId cid = -1;
00357
00358 try {
00359
00360 SRPC *srpc;
00361 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00362
00363
00364 if (this->debug >= CacheIntf::WeederOps) {
00365 Debug::Lock();
00366 cout << Debug::Timestamp() << "CALLING -- GetLeases" << endl;
00367 cout << endl;
00368 Debug::Unlock();
00369 }
00370
00371
00372 srpc->start_call(CacheIntf::GetLeasesProc, CacheIntf::Version);
00373
00374 server_instance.Send(*srpc);
00375
00376 srpc->send_end();
00377
00378
00379 recv_server_instance_check(srpc, "GetLeases");
00380
00381
00382 res = NEW_CONSTR(BitVector, (*srpc));
00383 srpc->recv_end();
00384
00385
00386 if (this->debug >= CacheIntf::WeederOps) {
00387 Debug::Lock();
00388 cout << Debug::Timestamp() << "RETURNED -- GetLeases" << endl;
00389 cout << " cis = " << endl;
00390 res->PrintAll(cout, 6);
00391 cout << " (" << res->Cardinality() << " total)" << endl;
00392 cout << endl;
00393 Debug::Unlock();
00394 }
00395
00396
00397 this->conns->End(cid);
00398 }
00399 catch (SRPC::failure) {
00400 this->conns->End(cid);
00401 throw;
00402 }
00403 return res;
00404 }
00405
00406 void WeederC::ResumeLeaseExp() const throw (SRPC::failure)
00407 {
00408 NullCall(CacheIntf::ResumeLeasesProc, "ResumeLeaseExp",
00409 CacheIntf::WeederOps);
00410 }
00411
00412 static void SendPKPrefixes(SRPC &srpc, const PKPrefixTbl &pfxs)
00413 throw (SRPC::failure)
00414
00415
00416 {
00417
00418 srpc.send_seq_start(pfxs.Size());
00419
00420
00421 PKPrefix::T pfx; char dummy;
00422 PKPrefixIter it(&pfxs);
00423 while (it.Next( pfx, dummy)) {
00424 pfx.Send(srpc);
00425 }
00426 srpc.send_seq_end();
00427 }
00428
00429 int WeederC::EndMark(const BitVector &cis, const PKPrefixTbl &pfxs) const
00430 throw (SRPC::failure)
00431 {
00432 int chkptVer;
00433 MultiSRPC::ConnId cid = -1;
00434
00435 try {
00436
00437 SRPC *srpc;
00438 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00439
00440
00441 if (this->debug >= CacheIntf::WeederOps) {
00442 Debug::Lock();
00443 cout << Debug::Timestamp() << "CALLING -- EndMark" << endl;
00444 cout << " cis = " << endl;
00445 cis.PrintAll(cout, 6);
00446 cout << " (" << cis.Cardinality() << " total)" << endl;
00447 cout << " prefixes = " << pfxs.Size() << " total" << endl;
00448 cout << endl;
00449 Debug::Unlock();
00450 }
00451
00452
00453 srpc->start_call(CacheIntf::EndMarkProc, CacheIntf::Version);
00454
00455 server_instance.Send(*srpc);
00456
00457 cis.Send(*srpc);
00458 SendPKPrefixes(*srpc, pfxs);
00459 srpc->send_end();
00460
00461
00462 recv_server_instance_check(srpc, "EndMark");
00463
00464
00465 chkptVer = srpc->recv_int();
00466 srpc->recv_end();
00467
00468
00469 if (this->debug >= CacheIntf::WeederOps) {
00470 Debug::Lock();
00471 cout << Debug::Timestamp() << "RETURNED -- EndMark" << endl;
00472 cout << " chkptVer = " << chkptVer << endl;
00473 cout << endl;
00474 Debug::Unlock();
00475 }
00476
00477
00478 this->conns->End(cid);
00479 }
00480 catch (SRPC::failure) {
00481 this->conns->End(cid);
00482 throw;
00483 }
00484 return chkptVer;
00485 }
00486
00487 bool WeederC::CommitChkpt(const Text &chkptFileName)
00488 const throw (SRPC::failure)
00489 {
00490 MultiSRPC::ConnId cid = -1;
00491
00492 try {
00493
00494 SRPC *srpc;
00495 cid = this->conns->Start(*ParCacheC::Locate(), srpc);
00496
00497
00498 if (this->debug >= CacheIntf::WeederOps) {
00499 Debug::Lock();
00500 cout << Debug::Timestamp() << "CALLED -- CommitChkpt" << endl;
00501 cout << " chkptFileName = " << chkptFileName << endl;
00502 cout << endl;
00503 Debug::Unlock();
00504 }
00505
00506
00507 srpc->start_call(CacheIntf::CommitChkptProc, CacheIntf::Version);
00508
00509 server_instance.Send(*srpc);
00510
00511 srpc->send_Text(chkptFileName);
00512 srpc->send_end();
00513
00514
00515 recv_server_instance_check(srpc, "CommitChkpt");
00516
00517
00518 bool l_result = srpc->recv_int();
00519 srpc->recv_end();
00520
00521
00522 if (this->debug >= CacheIntf::WeederOps) {
00523 Debug::Lock();
00524 cout << Debug::Timestamp() << "RETURNED -- CommitChkpt" << endl;
00525 cout << " result: checkpoint " << (l_result?"accepted":"rejected")
00526 << endl;
00527 cout << endl;
00528 Debug::Unlock();
00529 }
00530
00531
00532 this->conns->End(cid);
00533 return l_result;
00534 }
00535 catch (SRPC::failure) {
00536 this->conns->End(cid);
00537 throw;
00538 }
00539 }