00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <sys/types.h>
00028 #include <dirent.h>
00029 #include <time.h>
00030 #include <unistd.h>
00031
00032
00033 #include <Basics.H>
00034 #include <FS.H>
00035 #include <VestaLog.H>
00036 #include <Recovery.H>
00037 #include <VestaLogSeq.H>
00038 #include <AtomicFile.H>
00039
00040 #include <FP.H>
00041 #include <UniqueId.H>
00042
00043
00044 #include <BitVector.H>
00045 #include <CacheConfig.H>
00046 #include <CacheIntf.H>
00047 #include <Debug.H>
00048 #include <PKPrefix.H>
00049 #include <Timer.H>
00050 #include <Model.H>
00051 #include <PKEpoch.H>
00052 #include <CacheIndex.H>
00053 #include <Derived.H>
00054 #include <FV.H>
00055 #include <VestaVal.H>
00056 #include <GraphLog.H>
00057
00058
00059 #include <CacheConfigServer.H>
00060 #include <Leases.H>
00061 #include <CacheEntry.H>
00062 #include <CacheLog.H>
00063 #include <Intvl.H>
00064 #include <VPKFile.H>
00065 #include <VMultiPKFile.H>
00066
00067
00068 #include "VCacheVersion.H"
00069 #include "PKPrefixSeq.H"
00070 #include "CacheS.H"
00071
00072 using std::ios;
00073 using std::fstream;
00074 using std::ifstream;
00075 using std::cout;
00076 using std::cerr;
00077 using std::endl;
00078
00079 CacheS::CacheS(CacheIntf::DebugLevel debug, bool noHits) throw ()
00080
00081 : debug(debug), noHits(noHits)
00082 {
00083 try {
00084
00085 this->mu.lock();
00086 try {
00087 this->leases =
00088 NEW_CONSTR(Leases, (&(this->mu),
00089 Config_LeaseTimeoutSecs,
00090 (this->debug >= CacheIntf::LeaseExp)));
00091 this->cache = NEW_CONSTR(CacheMap, (5, true));
00092 this->mpkTbl = NEW_CONSTR(MPKMap, (5, true));
00093 this->emptyPKLog = NEW_CONSTR(EmptyPKLog,
00094 (&(this->mu), this->debug));
00095 this->weederSRPC = (SRPC *)NULL;
00096 this->graphLogChkptVer = -1;
00097 this->freeMPKFileEpoch = 0;
00098 this->entryCnt = 0;
00099 this->idleFlushWorkers = (FlushWorkerList *)NULL;
00100 this->numActiveFlushWorkers = 0;
00101 } catch (...) { this->mu.unlock(); throw; }
00102 this->mu.unlock();
00103
00104 this->Recover();
00105
00106
00107 this->instanceFp = UniqueId();
00108
00109
00110 this->instanceFp.Extend((char *) &(this->entryCnt),
00111 sizeof(this->entryCnt));
00112
00113 if (this->debug >= CacheIntf::StatusMsgs)
00114 {
00115 Debug::Lock();
00116 cout << Debug::Timestamp()
00117 << "Instance fingerprint: "
00118 << this->instanceFp << endl;
00119 Debug::Unlock();
00120 }
00121
00122
00123 Basics::thread *th2 = NEW_PTRFREE(Basics::thread);
00124 th2->fork_and_detach(CacheS_DoFreeMPKFiles, (void *)this);
00125
00126
00127 Basics::thread *th1 = NEW_PTRFREE(Basics::thread);
00128 th1->fork_and_detach(CacheS_DoDeletions, (void *)this);
00129
00130
00131 this->cacheLogMu.lock();
00132 try {
00133 this->idleCleanWorker = NEW_CONSTR(CleanWorker, (this));
00134 this->cacheLogLen = 0;
00135 } catch (...) { this->cacheLogMu.unlock(); throw; }
00136 this->cacheLogMu.unlock();
00137
00138
00139 this->chkptMu.lock();
00140 this->availChkptWorkers = (ChkptWorker *)NULL;
00141 this->queuedChkptWorkers = (ChkptWorker *)NULL;
00142 this->chkptMu.unlock();
00143 this->chkptTh.fork_and_detach(ChkptWorker::MainLoop, (void *)this);
00144
00145
00146 typedef FlushWorker *FlushWorkerPtr;
00147 FlushWorkerPtr *workers = NEW_ARRAY(FlushWorkerPtr,
00148 Config_FlushWorkerCnt);
00149 this->mu.lock();
00150 int i;
00151 for (i = 0; i < Config_FlushWorkerCnt; i++) {
00152 workers[i] = this->NewFlushWorker( false);
00153 }
00154 this->mu.unlock();
00155 for (i = 0; i < Config_FlushWorkerCnt; i++) {
00156 RegisterIdleFlushWorker(workers[i]);
00157 workers[i] = NULL;
00158 }
00159 delete[] workers;
00160
00161
00162 this->mu.lock();
00163 this->startTime = time((time_t *)NULL);
00164 assert(this->startTime >= 0);
00165 this->mu.unlock();
00166 }
00167 catch (const VestaLog::Error &err) {
00168 cerr << "VestaLog fatal error: ";
00169 cerr << "failed recovering cache server" << endl;
00170 cerr << " num = " << err.r << "; " << err.msg << endl;
00171 cerr << "Exiting..." << endl;
00172 exit(1);
00173 }
00174 catch (VestaLog::Eof) {
00175 cerr << "VestaLog fatal error: ";
00176 cerr << "unexpected EOF recovering cache server; exiting..." << endl;
00177 exit(1);
00178 }
00179 catch (const FS::Failure &f) {
00180 cerr << "FS fatal error: ";
00181 cerr << "unexpected FS error recovering cache server; exiting..."
00182 << endl << f;
00183 exit(1);
00184 }
00185 }
00186
00187
00188
00189 bool CacheS::FindVPKFile(const FP::Tag &pk, VPKFile* &vpk) throw ()
00190
00191
00192
00193 {
00194 bool res = this->cache->Get(pk, vpk);
00195 if (!res) {
00196
00197 try {
00198 PKFile::Epoch delPKEpoch = 0;
00199 (void)(this->emptyPKLog->GetEpoch0(pk, delPKEpoch));
00200 FV::Epoch namesEpoch = 0;
00201 (void)evictedNamesEpochs.Delete(pk, namesEpoch);
00202 vpk = NEW_CONSTR(VPKFile, (pk, delPKEpoch, namesEpoch));
00203 }
00204 catch (const FS::Failure &f) {
00205
00206 cerr << f;
00207 assert(false);
00208 }
00209 }
00210 return res;
00211 }
00212
00213 PKFile::Epoch CacheS::PKFileEpoch(const FP::Tag &pk) throw ()
00214
00215 {
00216 VPKFile *vpk;
00217 (void)(this->FindVPKFile(pk, vpk));
00218 return vpk->PKEpoch();
00219 }
00220
00221 bool CacheS::GetVPKFile(const FP::Tag &pk, VPKFile* &vpk) throw ()
00222
00223
00224
00225 {
00226 bool res = this->FindVPKFile(pk, vpk);
00227 if (!res) {
00228
00229 bool inCache = cache->Put(pk, vpk); assert(!inCache);
00230
00231
00232 PKPrefix::T pfx(pk); VMultiPKFile *mpk;
00233 if (!(this->mpkTbl->Get(pfx, mpk))) {
00234
00235 mpk = NEW_CONSTR(VMultiPKFile, (pfx));
00236 bool inMPK = this->mpkTbl->Put(pfx, mpk); assert(!inMPK);
00237 }
00238 bool inVMPK = mpk->Put(pk, vpk); assert(!inVMPK);
00239 }
00240
00241 return res;
00242 }
00243
00244 void CacheS::FreeVariables(const FP::Tag& pk, VPKFile* &vf)
00245 throw ()
00246
00247 {
00248 this->mu.lock();
00249 this->cnt.freeVarsCnt++;
00250 (void)(this->GetVPKFile(pk, vf));
00251 int currentFreeEpoch = this->freeMPKFileEpoch;
00252 this->mu.unlock();
00253
00254
00255 vf->mu.lock();
00256
00257
00258
00259 while(vf->Evicted())
00260 {
00261
00262 vf->mu.unlock();
00263
00264
00265 this->mu.lock();
00266 (void)(this->GetVPKFile(pk, vf));
00267 this->mu.unlock();
00268
00269 vf->mu.lock();
00270 }
00271
00272 vf->UpdateFreeEpoch(currentFreeEpoch);
00273 vf->mu.unlock();
00274 }
00275
// Deliberately empty hook invoked at the end of CacheS::Lookup with the size
// of the fingerprint list, the lookup outcome and a value size.
// NOTE(review): presumably a tracing/instrumentation attachment point --
// confirm before removing.
static void Etp_CacheS_Lookup(int fps_bytes, int outcome, int val_bytes)
  throw ()
{
    (void)fps_bytes;
    (void)outcome;
    (void)val_bytes;
}
00285
00286 CacheIntf::LookupRes
00287 CacheS::Lookup(const FP::Tag &pk, FV::Epoch id, const FP::List &fps,
00288 CacheEntry::Index &ci, const VestaVal::T* &value) throw ()
00289
00290 {
00291 CacheIntf::LookupRes res;
00292 CacheIntf::LookupOutcome outcome;
00293
00294
00295 VPKFile *vf;
00296 this->mu.lock();
00297 this->cnt.lookupCnt++;
00298 (void)(this->GetVPKFile(pk, vf));
00299 int currentFreeEpoch = this->freeMPKFileEpoch;
00300 this->mu.unlock();
00301
00302 vf->mu.lock();
00303
00304
00305
00306 while(vf->Evicted())
00307 {
00308
00309 vf->mu.unlock();
00310
00311
00312 this->mu.lock();
00313 (void)(this->GetVPKFile(pk, vf));
00314 this->mu.unlock();
00315
00316 vf->mu.lock();
00317 }
00318
00319
00320 vf->UpdateFreeEpoch(currentFreeEpoch);
00321
00322
00323 try {
00324 res = vf->Lookup(id, fps,
00325 ci, value, outcome);
00326 }
00327 catch (const FS::Failure &f) {
00328
00329 cerr << f;
00330 assert(false);
00331 }
00332
00333 vf->mu.unlock();
00334
00335 if (res == CacheIntf::Hit) {
00336 if (this->noHits) {
00337
00338 res = CacheIntf::Miss;
00339 } else {
00340 this->mu.lock();
00341 try {
00342
00343 if (this->hitFilter.Read(ci) &&
00344 !(this->leases->IsLeased(ci))) {
00345 res = CacheIntf::Miss;
00346 } else {
00347
00348 this->leases->NewLease(ci);
00349 }
00350
00351
00352 if (outcome == CacheIntf::DiskHits) {
00353 this->state.oldEntryCnt++;
00354 this->state.oldPklSize += value->len;
00355 }
00356
00357
00358
00359
00360 if(!this->usedCIs.Read(ci))
00361 {
00362 Debug::Lock();
00363 cerr << Debug::Timestamp()
00364 << "INTERNAL ERROR: hit on unused CI" << endl
00365 << " ci = " << ci << endl
00366 << " pk = " << pk << endl
00367 << " (Please report this as a bug.)" << endl;
00368 Debug::Unlock();
00369 abort();
00370 }
00371 } catch (...) { this->mu.unlock(); throw; }
00372 this->mu.unlock();
00373 }
00374 }
00375 Etp_CacheS_Lookup(fps.Size(), outcome,
00376 (res == CacheIntf::Hit) ? value->Size() : sizeof(value));
00377 return res;
00378 }
00379
// Deliberately empty hook invoked at the end of CacheS::AddEntry with the
// combined size of the call's arguments.
// NOTE(review): presumably a tracing/instrumentation attachment point --
// confirm before removing.
static void Etp_CacheS_AddEntry(int arg_bytes) throw ()
{
    (void)arg_bytes;
}
00385
00386 CacheIntf::AddEntryRes
00387 CacheS::AddEntry(FP::Tag *pk, FV::List *names, FP::List *fps,
00388 VestaVal::T *value, Model::T model, CacheEntry::Indices *kids,
00389 const Text& sourceFunc, CacheEntry::Index& ci) throw ()
00390
00391 {
00392 GraphLog::Node *dummy_var;
00393 CacheIntf::AddEntryRes res = CacheIntf::BadAddEntryArgs;
00394 if (names->len != fps->len) {
00395 res = CacheIntf::BadAddEntryArgs;
00396 } else {
00397 int kid; VPKFile *vf;
00398 this->mu.lock();
00399 this->cnt.addEntryCnt++;
00400 int currentFreeEpoch = this->freeMPKFileEpoch;
00401 try {
00402
00403 (void)(this->GetVPKFile(*pk, vf));
00404
00405
00406 BitVector *except = this->deleting
00407 ? &(this->hitFilter) : (BitVector *)NULL;
00408 ci = this->usedCIs.NextAvailExcept(except);
00409 this->entryCnt++;
00410 this->LogCI( Intvl::Add, ci);
00411
00412
00413 this->leases->NewLease(ci);
00414
00415
00416 for (kid = 0; kid < kids->len; kid++) {
00417 if (!this->leases->IsLeased(kids->index[kid])) break;
00418 }
00419
00420
00421 if (kid == kids->len) {
00422
00423 this->LogGraphNode(ci, pk, model, kids, &(value->dis));
00424 }
00425 } catch (...) { this->mu.unlock(); throw; }
00426 this->mu.unlock();
00427
00428 if (kid < kids->len) {
00429
00430 res = CacheIntf::NoLease;
00431 } else {
00432 vf->mu.lock();
00433
00434
00435
00436 while(vf->Evicted())
00437 {
00438
00439 vf->mu.unlock();
00440
00441
00442 this->mu.lock();
00443 (void)(this->GetVPKFile(*pk, vf));
00444 this->mu.unlock();
00445
00446 vf->mu.lock();
00447 }
00448
00449
00450 vf->UpdateFreeEpoch(currentFreeEpoch);
00451
00452 try {
00453
00454 FP::Tag *commonFP;
00455 CE::T *entry = vf->NewEntry(ci, names, fps,
00456 value, model, kids,
00457 commonFP);
00458
00459
00460 this->mu.lock();
00461 try {
00462 this->LogCacheEntry(sourceFunc, pk, vf->PKEpoch(),
00463 ci, value, model, kids,
00464 names, fps);
00465 } catch (...) { this->mu.unlock(); throw; }
00466 this->mu.unlock();
00467
00468
00469 vf->AddEntry(NEW_CONSTR(Text, (sourceFunc)), entry, commonFP);
00470 this->mu.lock();
00471 this->state.newEntryCnt++;
00472 this->state.newPklSize += entry->Value()->len;
00473 this->mu.unlock();
00474 res = CacheIntf::EntryAdded;
00475 }
00476 catch (DuplicateNames) {
00477 res = CacheIntf::BadAddEntryArgs;
00478 }
00479 catch (...) { vf->mu.unlock(); throw; }
00480 vf->mu.unlock();
00481
00482
00483 PKPrefix::T pfx(*pk);
00484 this->AddEntryToMPKFile(pfx,
00485 "MultiPKFile threshold exceeded on CacheS::AddEntry");
00486 }
00487 }
00488 Etp_CacheS_AddEntry(sizeof(*pk) + names->CheapSize()
00489 + fps->Size() + value->Size() + kids->Size());
00490 return res;
00491 }
00492
00493
00494
00495 void CacheS::AddEntryToMPKFile(const PKPrefix::T &pfx, const char *reason)
00496 throw ()
00497
00498 {
00499 VMultiPKFile *mpk;
00500 this->mu.lock();
00501 try {
00502 bool inTbl = this->mpkTbl->Get(pfx, mpk); assert(inTbl);
00503 mpk->IncEntries();
00504 mpk->UpdateEpoch(this->freeMPKFileEpoch);
00505 if (mpk->IsFull()) {
00506 this->FlushMPKFile(pfx, reason, false);
00507 }
00508 } catch (...) { this->mu.unlock(); throw; }
00509 this->mu.unlock();
00510 }
00511
00512 void CacheS::FlushMPKFile(const PKPrefix::T &pfx,
00513 const char *reason, bool block) throw ()
00514
00515 {
00516 FlushWorker *worker = this->NewFlushWorker(block);
00517 worker->Start(pfx, reason);
00518 }
00519
00520 FlushWorker *CacheS::NewFlushWorker(bool block) throw ()
00521
00522 {
00523 FlushWorker *res;
00524 if (!block && this->idleFlushWorkers == (FlushWorkerList *)NULL) {
00525
00526 res = NEW_CONSTR(FlushWorker, (this));
00527 } else {
00528 while (this->idleFlushWorkers == NULL) {
00529 assert(block);
00530 this->availFlushWorker.wait(this->mu);
00531 }
00532
00533 res = this->idleFlushWorkers->worker;
00534 this->idleFlushWorkers = this->idleFlushWorkers->next;
00535 }
00536 this->numActiveFlushWorkers++;
00537 return res;
00538 }
00539
00540 void CacheS::RegisterIdleFlushWorker(FlushWorker *worker) throw ()
00541
00542 {
00543 FlushWorkerList *cons = NEW(FlushWorkerList);
00544 cons->worker = worker;
00545 this->mu.lock();
00546 cons->next = this->idleFlushWorkers;
00547 this->idleFlushWorkers = cons;
00548 this->numActiveFlushWorkers--;
00549 if (this->numActiveFlushWorkers == 0) {
00550 this->allFlushWorkersDone.signal();
00551 }
00552 this->mu.unlock();
00553 this->availFlushWorker.signal();
00554 }
00555
00556 void *CacheS_DoFreeMPKFiles(void *arg) throw ()
00557
00558 {
00559 CacheS *cache = (CacheS *)arg;
00560
00561 while (true) {
00562 MethodCnts cnt;
00563 if (!Config_FreeAggressively) {
00564
00565 cache->mu.lock();
00566 cnt = cache->cnt;
00567 cache->mu.unlock();
00568 }
00569
00570
00571 Basics::thread::pause(Config_FreePauseDur);
00572
00573
00574 cache->mu.lock();
00575 int lastFreeMPKFileEpoch = cache->freeMPKFileEpoch;
00576 cache->freeMPKFileEpoch++;
00577
00578 if (!Config_FreeAggressively) {
00579
00580 if (cnt != cache->cnt) {
00581 cache->mu.unlock();
00582 continue;
00583 }
00584 }
00585
00586
00587 CacheS::MPKIter it(cache->mpkTbl);
00588 PKPrefixSeq toFlush( cache->mpkTbl->Size() / 2);
00589 PKPrefixSeq toPurge( cache->mpkTbl->Size() / 2);
00590 PKPrefix::T pfx; VMultiPKFile *mpk;
00591 while (it.Next( pfx, mpk)) {
00592 if (mpk->IsStale(lastFreeMPKFileEpoch)) {
00593 toFlush.addhi(pfx);
00594 } else if (mpk->IsUnmodified()) {
00595 toPurge.addhi(pfx);
00596 }
00597 }
00598 cache->mu.unlock();
00599
00600 if (toFlush.size() > 0) {
00601
00602 if (cache->debug >= CacheIntf::MPKFileFlush) {
00603 Debug::Lock();
00604 cout << Debug::Timestamp()
00605 << "STARTED -- Flushing VMultiPKFiles" << endl;
00606 cout << " number = " << toFlush.size() << endl << endl;
00607 Debug::Unlock();
00608 }
00609
00610 cache->mu.lock();
00611 try {
00612
00613 while (toFlush.size() > 0) {
00614 cache->FlushMPKFile(toFlush.remlo(),
00615 "CacheS_DoFreeMPKFiles called",
00616 true);
00617 }
00618
00619
00620 while (cache->numActiveFlushWorkers > 0) {
00621 cache->allFlushWorkersDone.wait(cache->mu);
00622 }
00623 } catch (...) { cache->mu.unlock(); throw; }
00624 cache->mu.unlock();
00625
00626
00627 if (cache->debug >= CacheIntf::MPKFileFlush) {
00628 Debug::Lock();
00629 cout << Debug::Timestamp()
00630 << "FINISHED -- Flushing VMultiPKFiles" << endl << endl;
00631 Debug::Unlock();
00632 }
00633
00634
00635 cache->cacheLogMu.lock();
00636 try {
00637 cache->TryCleanCacheLog( -1,
00638 "CacheS_DoFreeMPKFiles thread");
00639 } catch (...) { cache->cacheLogMu.unlock(); throw; }
00640 cache->cacheLogMu.unlock();
00641 }
00642
00643
00644
00645 if(toPurge.size() > 0)
00646 {
00647
00648 if (cache->debug >= CacheIntf::MPKFileFlush)
00649 {
00650 Debug::Lock();
00651 cout << Debug::Timestamp()
00652 << "STARTED -- Freeing old entries from unmodified VMultiPKFiles" << endl;
00653 cout << " number = " << toPurge.size() << endl << endl;
00654 Debug::Unlock();
00655 }
00656
00657 EntryState dState;
00658
00659 while(toPurge.size() > 0)
00660 {
00661 pfx = toPurge.remhi();
00662
00663 cache->mu.lock();
00664
00665
00666 bool inTbl = cache->mpkTbl->Get(pfx, mpk);
00667 assert(inTbl && (mpk != 0));
00668
00669
00670 SMultiPKFile::VPKFileMap vpks_in_mpk(&(mpk->VPKFileTbl()));
00671
00672 cache->mu.unlock();
00673
00674
00675 SMultiPKFile::VPKFileIter it(&vpks_in_mpk);
00676 FP::Tag pk;
00677 VPKFile *vpk;
00678 while(it.Next( pk, vpk))
00679 {
00680
00681 vpk->mu.lock();
00682
00683
00684
00685 if(vpk->ReadyForPurgeWarm(lastFreeMPKFileEpoch))
00686 {
00687
00688
00689
00690 int sizeHint = vpk->OldEntries()->Size() / 2;
00691
00692
00693
00694 vpk->DeleteOldEntries(sizeHint, dState);
00695 }
00696
00697
00698 vpk->mu.unlock();
00699 }
00700 }
00701
00702
00703
00704 cache->mu.lock();
00705 cache->state += dState;
00706 cache->mu.unlock();
00707
00708
00709 if (cache->debug >= CacheIntf::MPKFileFlush)
00710 {
00711 Debug::Lock();
00712 cout << Debug::Timestamp()
00713 << "FINISHED -- Freeing old entries from unmodified VMultiPKFiles" << endl
00714 << " entries freed = " << -(dState.oldEntryCnt) << endl
00715 << " pickled bytes freed = " << -(dState.oldPklSize)
00716 << endl << endl;
00717 Debug::Unlock();
00718 }
00719 }
00720
00721
00722 if (cache->debug >= CacheIntf::MPKFileFlush)
00723 {
00724 Debug::Lock();
00725 cout << Debug::Timestamp()
00726 << "STARTED -- Freeing empty VPKFiles and VMultiPKFiles"
00727 << endl << endl;
00728 Debug::Unlock();
00729 }
00730
00731
00732
00733
00734
00735 cache->mu.lock();
00736 CacheS::CacheMap vpk_tbl_copy(cache->cache, true);
00737 cache->mu.unlock();
00738
00739
00740 unsigned int num_vpks_freed = 0;
00741 CacheS::CacheMapIter vpk_it(&vpk_tbl_copy);
00742 FP::Tag pk;
00743 VPKFile *vpk;
00744 while(vpk_it.Next( pk, vpk))
00745 {
00746 vpk->mu.lock();
00747
00748
00749
00750
00751 if(vpk->ReadyForEviction(lastFreeMPKFileEpoch))
00752 {
00753 cache->mu.lock();
00754
00755
00756 PKPrefix::T pfx(pk);
00757 VMultiPKFile *mpk;
00758 bool inTbl = cache->mpkTbl->Get(pfx, mpk);
00759 assert(inTbl && (mpk != 0));
00760
00761
00762
00763
00764
00765
00766 if(!mpk->FlushRunning() && !mpk->FlushPending())
00767 {
00768
00769
00770 VPKFile *removed;
00771 inTbl = cache->cache->Delete(pk, removed, false);
00772 assert(inTbl && (removed == vpk));
00773
00774
00775 inTbl = mpk->Delete(pk, removed);
00776 assert(inTbl && (removed == vpk));
00777
00778
00779
00780 vpk->Evict();
00781
00782
00783
00784
00785
00786
00787
00788
00789 if(vpk->IsStableEmpty() && (vpk->NamesEpoch() != 0))
00790 {
00791 inTbl =
00792 cache->evictedNamesEpochs.Put(pk,
00793 vpk->NamesEpoch());
00794 assert(!inTbl);
00795 }
00796
00797
00798
00799
00800 num_vpks_freed++;
00801 }
00802
00803 cache->mu.unlock();
00804 }
00805 vpk->mu.unlock();
00806 }
00807
00808
00809
00810 vpk_tbl_copy.Init();
00811 vpk = 0;
00812
00813
00814
00815
00816
00817 if(num_vpks_freed > 0)
00818 {
00819 cache->mu.lock();
00820 cache->cache->Resize();
00821 cache->mu.unlock();
00822 }
00823
00824
00825
00826
00827 cache->mu.lock();
00828
00829
00830 unsigned int num_vmpks_freed = 0;
00831 it.Reset();
00832 while (it.Next( pfx, mpk))
00833 {
00834
00835
00836
00837
00838 if((mpk->NumVPKFiles() == 0) &&
00839 !mpk->FlushRunning() && !mpk->FlushPending())
00840 {
00841 VMultiPKFile *removed;
00842 bool inTbl = cache->mpkTbl->Delete(pfx, removed, false);
00843 assert(inTbl && (removed == mpk));
00844
00845 num_vmpks_freed++;
00846 }
00847 }
00848
00849
00850 mpk = 0;
00851
00852
00853 if(num_vmpks_freed > 0)
00854 {
00855 cache->mpkTbl->Resize();
00856 }
00857
00858
00859
00860
00861 cache->mu.unlock();
00862
00863
00864 if (cache->debug >= CacheIntf::MPKFileFlush)
00865 {
00866 Debug::Lock();
00867 cout << Debug::Timestamp()
00868 << "FINISHED -- Freeing empty VPKFiles and VMultiPKFiles"
00869 << endl
00870 << " VPKFiles freed = " << num_vpks_freed << endl
00871 << " VMultiPKFiles freed = " << num_vmpks_freed
00872 << endl << endl;
00873 Debug::Unlock();
00874 }
00875 }
00876
00877
00878 }
00879
00880
00881
00882 bool CacheS::WeederRecovering(SRPC *srpc, bool doneMarking) throw ()
00883
00884 {
00885 this->mu.lock();
00886
00887
00888 if (this->weederSRPC != (SRPC *)NULL) {
00889 if (this->weederSRPC->alive()) {
00890 this->mu.unlock();
00891 return true;
00892 }
00893 }
00894
00895
00896 this->weederSRPC = srpc;
00897
00898
00899 try {
00900
00901 assert(this->leases->ExpirationIsEnabled() || !doneMarking);
00902 this->leases->EnableExpiration();
00903
00904
00905 if (!(this->hitFilter.IsEmpty()) && !this->deleting && !doneMarking) {
00906 this->ClearStableHitFilter();
00907 }
00908 } catch (...) { this->mu.unlock(); throw; }
00909 this->mu.unlock();
00910 return false;
00911 }
00912
00913 BitVector* CacheS::StartMark( int &newLogVer) throw ()
00914
00915 {
00916
00917 BitVector *res;
00918 int currChkptVer;
00919 this->mu.lock();
00920 try {
00921 while (this->deleting) {
00922 this->notDeleting.wait(this->mu);
00923 }
00924 res = NEW_CONSTR(BitVector, (&(this->usedCIs)));
00925 this->leases->DisableExpiration();
00926 currChkptVer = this->graphLogChkptVer;
00927 } catch (...) { this->mu.unlock(); throw; }
00928 this->mu.unlock();
00929
00930
00931 try {
00932 this->FlushGraphLog();
00933
00934
00935 if (currChkptVer >= 0) this->AbortGraphLogChkpt();
00936
00937
00938 newLogVer = this->ChkptGraphLog();
00939 assert(newLogVer >= 0);
00940 } catch (const FS::Failure &f) {
00941 cerr << "Fatal FS error: ";
00942 cerr << "while checkpointing \"graphLog\"; exiting..." << endl;
00943 cerr << f << endl;
00944 exit(1);
00945 } catch (const VestaLog::Error &err) {
00946 cerr << "Fatal VestaLog error: ";
00947 cerr << "while flushing \"graphLog\"" << endl
00948 << " " << err.msg;
00949 if(err.r != 0)
00950 {
00951 cerr << " (errno = " << err.r << ")";
00952 }
00953 cerr << endl << "Exiting..." << endl;
00954 exit(1);
00955 }
00956
00957
00958 this->mu.lock();
00959 this->graphLogChkptVer = newLogVer;
00960 this->mu.unlock();
00961
00962 return res;
00963 }
00964
00965 void CacheS::SetHitFilter(const BitVector &cis) throw ()
00966
00967 {
00968 this->mu.lock();
00969 try {
00970 assert(!this->deleting);
00971 this->SetStableHitFilter(cis);
00972 } catch (...) { this->mu.unlock(); throw; }
00973 this->mu.unlock();
00974 }
00975
00976 BitVector* CacheS::GetLeases() throw ()
00977
00978 {
00979 BitVector *res;
00980 this->mu.lock();
00981 try {
00982 res = leases->LeaseSet();
00983 } catch (...) { this->mu.unlock(); throw; }
00984 this->mu.unlock();
00985 return res;
00986 }
00987
00988 void CacheS::ResumeLeaseExp() throw ()
00989
00990 {
00991 this->mu.lock();
00992 try {
00993 leases->EnableExpiration();
00994 } catch (...) { this->mu.unlock(); throw; }
00995 this->mu.unlock();
00996 }
00997
00998 int CacheS::EndMark(const BitVector &cis, const PKPrefix::List &pfxs) throw ()
00999
01000
01001
01002
01003
01004 {
01005 int chkptVer;
01006
01007
01008 assert(!(cis.IsEmpty()));
01009 bool emptyHF, deleting2;
01010 this->mu.lock();
01011 try {
01012 emptyHF = this->hitFilter.IsEmpty();
01013 deleting2 = this->deleting;
01014 chkptVer = this->graphLogChkptVer;
01015 assert(emptyHF || (cis <= this->hitFilter));
01016 } catch (...) { this->mu.unlock(); throw; }
01017 this->mu.unlock();
01018
01019
01020 if (chkptVer < 0) {
01021 this->graphLogMu.lock();
01022 try {
01023 chkptVer = this->graphLog->logVersion();
01024 } catch (const VestaLog::Error &err) {
01025 cerr << "VestaLog fatal error: ";
01026 cerr << "failed calling 'logVersion' in 'CacheS::EndMark" << endl;
01027 cerr << " num = " << err.r << "; " << err.msg << endl;
01028 cerr << "Exiting..." << endl;
01029 exit(1);
01030 }
01031 this->graphLogMu.unlock();
01032 this->mu.lock();
01033 this->graphLogChkptVer = chkptVer;
01034 this->mu.unlock();
01035 }
01036
01037
01038 if (!emptyHF && !deleting2) {
01039 this->mu.lock();
01040 try {
01041
01042 this->SetStableHitFilter(cis);
01043
01044
01045 this->SetMPKsToWeed(pfxs);
01046 this->ResetWeededMPKs();
01047
01048
01049 this->SetStableDeleting(true);
01050 this->doDeleting.signal();
01051 } catch (...) { this->mu.unlock(); throw; }
01052 this->mu.unlock();
01053 }
01054 assert(chkptVer >= 0);
01055 return chkptVer;
01056 }
01057
01058 bool CacheS::CommitChkpt(const Text &chkptFileName) throw ()
01059
01060 {
01061
01062
01063
01064 Text temp_chkptFileName(Config_GraphLogPath);
01065 temp_chkptFileName += '/';
01066 temp_chkptFileName += chkptFileName;
01067
01068
01069 this->mu.lock();
01070 int chkptVer = this->graphLogChkptVer;
01071 this->mu.unlock();
01072
01073
01074
01075
01076 if(chkptVer >= 0)
01077 {
01078
01079
01080 if(chkptFileName[0] == '/')
01081 {
01082 return false;
01083 }
01084
01085
01086 Text final_chkptFileName(Config_GraphLogPath);
01087
01088
01089 char buff[26];
01090 int spres = sprintf(buff, "/%d.ckp", chkptVer);
01091 assert((spres >= 0) && (spres <= 25));
01092 final_chkptFileName += buff;
01093
01094
01095
01096 if((temp_chkptFileName.Length() <= final_chkptFileName.Length()) ||
01097 (temp_chkptFileName.Sub(0, final_chkptFileName.Length())
01098 != final_chkptFileName))
01099 {
01100 return false;
01101 }
01102
01103
01104
01105 if(!FS::Exists(temp_chkptFileName))
01106 {
01107 return false;
01108 }
01109
01110
01111
01112 if(rename(temp_chkptFileName.cchars(),
01113 final_chkptFileName.cchars()) != 0)
01114 {
01115
01116
01117
01118
01119
01120 cerr << "CacheS::CommitChkpt fatal error: rename(2) failed!" << endl;
01121 cerr << " errno = " << errno << endl;
01122 cerr << " from = " << temp_chkptFileName << endl;
01123 cerr << " to = " << final_chkptFileName << endl;
01124 assert(false);
01125 }
01126 else
01127 {
01128
01129 this->graphLogMu.lock();
01130 try {
01131 this->graphLog->checkpointEnd();
01132 this->graphLog->prune( 1);
01133 this->mu.lock();
01134 this->graphLogChkptVer = -1;
01135 this->weederSRPC = NULL;
01136 this->mu.unlock();
01137 } catch (const VestaLog::Error &err) {
01138 cerr << "Fatal VestaLog error: ";
01139 cerr << "while committing \"graphLog\" checkpoint" << endl
01140 << " " << err.msg;
01141 if(err.r != 0)
01142 {
01143 cerr << " (errno = " << err.r << ")";
01144 }
01145 cerr << endl << "Exiting..." << endl;
01146 exit(1);
01147 }
01148 this->graphLogMu.unlock();
01149
01150
01151 return true;
01152 }
01153 }
01154
01155
01156 else
01157 {
01158 try
01159 {
01160 FS::Delete(temp_chkptFileName);
01161 }
01162 catch(FS::Failure)
01163 {
01164
01165 }
01166
01167
01168
01169
01170 }
01171
01172
01173
01174
01175 return false;
01176 }
01177
01178 void *CacheS_DoDeletions(void *arg) throw ()
01179
01180 {
01181 CacheS *cs = (CacheS *)arg;
01182
01183 while (true) {
01184
01185 cs->mu.lock();
01186 while (!(cs->deleting)) {
01187 cs->doDeleting.wait(cs->mu);
01188 }
01189
01190
01191 if (cs->debug >= CacheIntf::WeederOps) {
01192 Debug::Lock();
01193 cout << Debug::Timestamp() << "STARTED -- DoDeletions" << endl;
01194 cout << endl;
01195 Debug::Unlock();
01196 }
01197
01198
01199 int cnt = 0;
01200 while (cs->nextMPKToWeed < cs->mpksToWeed.len) {
01201
01202
01203
01204 int next = cs->nextMPKToWeed;
01205 cs->mu.unlock();
01206 cs->VToSCache(cs->mpksToWeed.pfx[next], &(cs->hitFilter));
01207 cs->mu.lock();
01208 cs->nextMPKToWeed++;
01209
01210
01211 if (++cnt == 10) {
01212 cs->AddToWeededMPKs(cnt);
01213 cnt = 0;
01214 }
01215 }
01216 if (cnt > 0) cs->AddToWeededMPKs(cnt);
01217 cs->mu.unlock();
01218
01219
01220 try {
01221 cs->ChkptUsedCIs(cs->hitFilter);
01222 } catch (const VestaLog::Error &err) {
01223 cerr << "Fatal VestaLog error: ";
01224 cerr << "while checkpointing \"usedCIs\"" << endl
01225 << " " << err.msg;
01226 if(err.r != 0)
01227 {
01228 cerr << " (errno = " << err.r << ")";
01229 }
01230 cerr << endl << "Exiting..." << endl;
01231 exit(1);
01232 } catch (const FS::Failure &f) {
01233 cerr << "Fatal FS error: ";
01234 cerr << "while checkpointing \"usedCIs\"; exiting..." << endl;
01235 cerr << f << endl;
01236 exit(1);
01237 }
01238
01239
01240 cs->mu.lock();
01241 cs->ClearStableHitFilter();
01242
01243
01244 cs->SetStableDeleting(false);
01245 cs->notDeleting.signal();
01246 cs->mu.unlock();
01247
01248
01249 if (cs->debug >= CacheIntf::WeederOps) {
01250 Debug::Lock();
01251 cout << Debug::Timestamp() << "FINISHED -- DoDeletions" << endl;
01252 cout << endl;
01253 Debug::Unlock();
01254 }
01255
01256
01257 cs->cacheLogMu.lock();
01258 try {
01259 cs->TryCleanCacheLog( -1,
01260 "CacheS_DoDeletions thread");
01261 } catch (...) { cs->cacheLogMu.unlock(); throw; }
01262 cs->cacheLogMu.unlock();
01263 }
01264
01265
01266 }
01267
01268
01269
01270 void CacheS::Checkpoint(const FP::Tag &pkgVersion, Model::T model,
01271 CacheEntry::Indices *cis, bool done) throw ()
01272
01273 {
01274
01275 ChkptWorker *ckpt = this->QueueChkpt(pkgVersion, model, cis, done);
01276
01277
01278 if (done) {
01279 ckpt->WaitUntilDone();
01280 this->FinishChkpt(ckpt);
01281 }
01282 }
01283
01284 ChkptWorker *CacheS::QueueChkpt(const FP::Tag &pkgVersion, Model::T model,
01285 CacheEntry::Indices *cis, bool done) throw ()
01286
01287 {
01288 ChkptWorker *res;
01289 this->chkptMu.lock();
01290 try {
01291
01292 if (this->availChkptWorkers != (ChkptWorker *)NULL) {
01293 res = this->availChkptWorkers;
01294 this->availChkptWorkers = res->next;
01295 res->Init(pkgVersion, model, cis, done);
01296 } else {
01297 res = NEW_CONSTR(ChkptWorker, (pkgVersion, model, cis, done));
01298 }
01299
01300
01301 ChkptWorker *last = (ChkptWorker *)NULL;
01302 ChkptWorker *curr = this->queuedChkptWorkers;
01303 while (curr != (ChkptWorker *)NULL) {
01304 last = curr; curr = curr->next;
01305 }
01306 if (last == (ChkptWorker *)NULL)
01307 this->queuedChkptWorkers = res;
01308 else
01309 last->next = res;
01310 } catch (...) { this->chkptMu.unlock(); throw; }
01311 this->chkptMu.unlock();
01312
01313
01314 this->waitingChkptWorker.signal();
01315 return res;
01316 }
01317
01318 void CacheS::FinishChkpt(ChkptWorker *cw) throw ()
01319
01320 {
01321 this->chkptMu.lock();
01322 try {
01323
01324 cw->Reset();
01325 cw->next = this->availChkptWorkers;
01326 this->availChkptWorkers = cw;
01327 } catch (...) { this->chkptMu.unlock(); throw; }
01328 this->chkptMu.unlock();
01329 }
01330
01331 void *ChkptWorker::MainLoop(void *arg) throw ()
01332
01333 {
01334 CacheS *cache = (CacheS *)arg;
01335 while (true) {
01336
01337 cache->chkptMu.lock();
01338 while (cache->queuedChkptWorkers == (ChkptWorker *)NULL) {
01339 cache->waitingChkptWorker.wait(cache->chkptMu);
01340 }
01341 ChkptWorker *curr = cache->queuedChkptWorkers;
01342
01343 while (!(curr->done) && curr->next != (ChkptWorker *)NULL) {
01344
01345 ChkptWorker *avail = curr;
01346 curr = curr->next;
01347 avail->Reset();
01348 avail->next = cache->availChkptWorkers;
01349 cache->availChkptWorkers = avail;
01350 }
01351 assert(curr != (ChkptWorker *)NULL);
01352 cache->queuedChkptWorkers = curr->next;
01353 curr->next = (ChkptWorker *)NULL;
01354 cache->chkptMu.unlock();
01355
01356
01357 cache->DoCheckpoint(curr->pkgVersion, curr->model,
01358 curr->cis, curr->done);
01359
01360
01361 if (curr->done) {
01362
01363 curr->mu.lock();
01364 curr->chkptComplete = true;
01365 curr->mu.unlock();
01366 curr->isDone.signal();
01367 } else {
01368
01369 cache->FinishChkpt(curr);
01370 }
01371 }
01372
01373
01374 }
01375
01376 void CacheS::DoCheckpoint(const FP::Tag &pkgVersion, Model::T model,
01377 CacheEntry::Indices *cis, bool done) throw ()
01378
01379 {
01380
01381 try {
01382 this->FlushCacheLog();
01383 }
01384 catch (const VestaLog::Error &err) {
01385 cerr << "Fatal VestaLog error: ";
01386 cerr << "while flushing \"cacheLog\"" << endl
01387 << " " << err.msg;
01388 if(err.r != 0)
01389 {
01390 cerr << " (errno = " << err.r << ")";
01391 }
01392 cerr << endl << "Exiting..." << endl;
01393 exit(1);
01394 }
01395
01396
01397 GraphLog::Root root(pkgVersion, model, cis, done);
01398
01399 int i;
01400 this->mu.lock();
01401
01402
01403 for(i = 0; i < cis->len; i++) {
01404 if(!this->leases->IsLeased(cis->index[i])) break;
01405 }
01406 this->mu.unlock();
01407 if(i < cis->len) {
01408
01409 Debug::Lock();
01410 cerr << Debug::Timestamp()\
01411 << "Checkpint rejected: ci " << cis->index[i]
01412 << " is not leased." << endl << endl;
01413
01414 root.DebugFull(cerr);
01415 Debug::Unlock();
01416 }
01417 else {
01418
01419 if (this->debug >= CacheIntf::LogFlush) {
01420 Debug::Lock();
01421 try {
01422 cout << Debug::Timestamp() << "Writing root to GraphLog:" << endl;
01423 root.Debug(cout);
01424 cout << endl;
01425 }
01426 catch (...) { Debug::Unlock(); throw; }
01427 Debug::Unlock();
01428 }
01429
01430
01431 this->graphLogMu.lock();
01432 try {
01433 this->graphLog->start();
01434 root.Log(*(this->graphLog));
01435 this->graphLog->commit();
01436 }
01437 catch (const VestaLog::Error &err) {
01438 cerr << "Fatal VestaLog error: ";
01439 cerr << "writing root node to \"graphLog\"" << endl
01440 << " " << err.msg;
01441 if(err.r != 0)
01442 {
01443 cerr << " (errno = " << err.r << ")";
01444 }
01445 cerr << endl << "Exiting..." << endl;
01446 exit(1);
01447 }
01448 catch (...) { this->graphLogMu.unlock(); throw; }
01449 this->graphLogMu.unlock();
01450 }
01451 }
01452
01453 void ChkptWorker::Init(const FP::Tag &pkgVersion, Model::T model,
01454 CacheEntry::Indices *cis, bool done) throw ()
01455 {
01456 this->next = (ChkptWorker *)NULL;
01457 this->pkgVersion = pkgVersion;
01458 this->model = model;
01459 this->cis = cis;
01460 this->done = done;
01461 this->chkptComplete = false;
01462 }
01463
01464 void ChkptWorker::Reset() throw ()
01465 {
01466 this->cis = (CacheEntry::Indices *)NULL;
01467 }
01468
01469 void ChkptWorker::WaitUntilDone() throw ()
01470
01471 {
01472 this->mu.lock();
01473 while (!(this->chkptComplete)) {
01474 this->isDone.wait(this->mu);
01475 }
01476 this->mu.unlock();
01477 }
01478
01479 void CacheS::FlushAll() throw ()
01480
01481
01482
01483 {
01484
01485 this->mu.lock();
01486 int mpkCnt = this->mpkTbl->Size();
01487 this->mu.unlock();
01488
01489
01490 if (mpkCnt == 0) return;
01491
01492
01493 PKPrefix::T *toFlush = NEW_PTRFREE_ARRAY(PKPrefix::T, mpkCnt);
01494 this->mu.lock();
01495 try {
01496
01497 MPKIter it(this->mpkTbl);
01498 PKPrefix::T pfx; VMultiPKFile *mpk;
01499 int i;
01500 for (i = 0; it.Next( pfx, mpk); i++) {
01501 assert(i < mpkCnt);
01502 toFlush[i] = pfx;
01503 }
01504
01505
01506 for (i = 0; i < mpkCnt; i++) {
01507 this->FlushMPKFile(toFlush[i],
01508 "CacheS::FlushAll called", true);
01509 }
01510
01511
01512 while (this->numActiveFlushWorkers > 0) {
01513 this->allFlushWorkersDone.wait(this->mu);
01514 }
01515 } catch (...) { this->mu.unlock(); throw; }
01516 this->mu.unlock();
01517
01518
01519 this->cacheLogMu.lock();
01520 try {
01521 this->TryCleanCacheLog( -1,
01522 "CacheS::FlushAll called");
01523 } catch (...) { this->cacheLogMu.unlock(); throw; }
01524 this->cacheLogMu.unlock();
01525 }
01526
01527 void CacheS::GetCacheId( CacheId &id) throw ()
01528
01529 {
01530 char name[MAXHOSTNAMELEN];
01531 int res = gethostname(name, MAXHOSTNAMELEN);
01532 id.host = (res == 0) ? Text(name) : Text("<unknown host>");
01533 id.port = Config_Port;
01534 id.stableDir = Config_SCachePath;
01535 id.cacheVersion = VCacheVersion;
01536 id.intfVersion = CacheIntf::Version;
01537 this->mu.lock();
01538 id.startTime = this->startTime;
01539 this->mu.unlock();
01540 }
01541
01542 void CacheS::GetCacheState( CacheState &state) throw ()
01543
01544 {
01545 unsigned long total, resident;
01546 OS::GetProcessSize(total, resident);
01547 state.virtualSize = total;
01548 state.physicalSize = resident;
01549
01550 this->mu.lock();
01551 state.vmpkCnt = this->mpkTbl->Size();
01552 state.vpkCnt = this->cache->Size();
01553 state.entryCnt = this->entryCnt;
01554 state.cnt = this->cnt;
01555 state.s = this->state;
01556 state.hitFilterCnt = this->hitFilter.Cardinality();
01557 if (this->deleting) {
01558 state.delEntryCnt = state.hitFilterCnt;
01559 state.mpkWeedCnt = this->mpksToWeed.len - this->nextMPKToWeed;
01560 } else {
01561 state.delEntryCnt = 0;
01562 state.mpkWeedCnt = 0;
01563 }
01564 this->mu.unlock();
01565 }
01566
01567 const FP::Tag &CacheS::GetCacheInstance()
01568 const
01569 throw ()
01570
01571
01572 {
01573 return this->instanceFp;
01574 }
01575
01576 bool CacheS::RenewLeases(CacheEntry::Indices *cis) throw ()
01577
01578 {
01579 bool res = true;
01580 this->mu.lock();
01581 try {
01582 for (int i = 0; i < cis->len; i++) {
01583 CacheEntry::Index ci = cis->index[i];
01584 if (this->usedCIs.Read(ci)) {
01585 try {
01586 this->leases->RenewLease(ci);
01587 } catch (Leases::NoLease) {
01588 res = false;
01589 }
01590 } else {
01591 res = false;
01592 }
01593 }
01594 } catch (...) { this->mu.unlock(); throw; }
01595 this->mu.unlock();
01596 return res;
01597 }
01598
01599 void CacheS::VToSCache(const PKPrefix::T &pfx, const BitVector *toDelete)
01600 throw ()
01601
01602
01603
01604 {
01605
01606 VMultiPKFile *mpk = (VMultiPKFile *)NULL;
01607 this->mu.lock();
01608 try {
01609 if (!this->mpkTbl->Get(pfx, mpk)) {
01610
01611
01612
01613 if (toDelete != (BitVector *)NULL) {
01614
01615 mpk = NEW_CONSTR(VMultiPKFile, (pfx));
01616 bool inMPK = this->mpkTbl->Put(pfx, mpk); assert(!inMPK);
01617 }
01618 } else {
01619 assert((mpk != NULL) && (mpk->Prefix() == pfx));
01620 }
01621 } catch (...) { this->mu.unlock(); throw; }
01622
01623
01624 if (mpk == (VMultiPKFile *)NULL)
01625 {
01626 this->mu.unlock();
01627 return;
01628 }
01629
01630
01631
01632 if(mpk->LockForWrite(this->mu, toDelete))
01633 {
01634 this->mu.unlock();
01635
01636
01637 if (this->debug >= CacheIntf::MPKFileFlush) {
01638 Debug::Lock();
01639 cout << Debug::Timestamp() << "STARTED -- Flushing MPKFile" << endl;
01640 cout << " prefix = " << pfx << endl;
01641 cout << endl;
01642 Debug::Unlock();
01643 }
01644
01645
01646 try {
01647
01648
01649
01650
01651
01652
01653
01654 ifstream mpkfile_ifs;
01655 SMultiPKFileRep::Header *mpkfile_hdr;
01656 bool mpkFileExists =
01657 SMultiPKFile::PrepareForRewrite(pfx, mpkfile_ifs, mpkfile_hdr);
01658
01659
01660 for (unsigned int i = 0; i < mpkfile_hdr->num; i++)
01661 {
01662 FP::Tag *pki = mpkfile_hdr->pkSeq[i];
01663 assert(PKPrefix::T(*pki) == pfx);
01664
01665 VPKFile *vpk;
01666
01667
01668
01669 this->mu.lock();
01670 if(!this->FindVPKFile(*pki, vpk))
01671 {
01672
01673 bool inCache = cache->Put(*pki, vpk); assert(!inCache);
01674 bool inVMPK = mpk->Put(*pki, vpk); assert(!inVMPK);
01675 }
01676 this->mu.unlock();
01677 }
01678
01679
01680
01681
01682
01683
01684
01685
01686
01687 SMultiPKFile::VPKFileMap vpksToFlush;
01688 SMultiPKFile::ChkPtTbl vpkChkptTbl;
01689 if(mpk->ChkptForWrite(this->mu, toDelete, vpksToFlush, vpkChkptTbl))
01690 {
01691
01692
01693 try
01694 {
01695 this->FlushGraphLog();
01696 }
01697
01698 catch(...)
01699 {
01700
01701
01702 mpk->ReleaseWriteLock(this->mu);
01703
01704
01705 if(mpkFileExists) FS::Close(mpkfile_ifs);
01706
01707 throw;
01708 }
01709
01710
01711 EntryState dState;
01712 mpk->ToSCache(this->mu,
01713
01714
01715 mpkFileExists, mpkfile_ifs, mpkfile_hdr,
01716
01717
01718 vpksToFlush, vpkChkptTbl,
01719
01720 toDelete, this->emptyPKLog,
01721 dState);
01722
01723
01724
01725
01726
01727 this->mu.lock();
01728 this->state += dState;
01729 this->mu.unlock();
01730 }
01731 else
01732 {
01733
01734
01735 if(mpkFileExists) FS::Close(mpkfile_ifs);
01736 }
01737 }
01738 catch (const VestaLog::Error &e) {
01739
01740 cerr << "VestaLog::Error:" << endl;
01741 cerr << " " << e.msg;
01742 if (e.r != 0) cerr << ", errno = " << e.r;
01743 cerr << endl;
01744 assert(false);
01745 }
01746 catch (const FS::Failure &f) {
01747
01748 cerr << f;
01749 assert(false);
01750 }
01751 catch (const FS::EndOfFile &f) {
01752
01753 cerr << "CacheS::VToSCache: Unexpected end of file" << endl;
01754 assert(false);
01755 }
01756
01757
01758 if (this->debug >= CacheIntf::MPKFileFlush) {
01759 Debug::Lock();
01760 cout << Debug::Timestamp() << "FINISHED -- Flushing MPKFile" << endl;
01761 cout << " prefix = " << pfx << endl;
01762 cout << endl;
01763 Debug::Unlock();
01764 }
01765 }
01766
01767
01768 else
01769 {
01770 this->mu.unlock();
01771 }
01772 }
01773
01774 void CacheS::Recover() throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
01775
01776 {
01777 this->RecoverCILog();
01778 this->RecoverDeleting();
01779 this->RecoverHitFilter();
01780 this->mu.lock();
01781 bool localDeleting;
01782 localDeleting = this->deleting;
01783 this->mu.unlock();
01784 if (localDeleting) {
01785
01786
01787 this->RecoverMPKsToWeed();
01788 }
01789 this->RecoverWeededMPKs();
01790 this->RecoverGraphLog();
01791 this->RecoverCacheLog();
01792 }
01793
01794
01795
01796 void CacheS::RecoverDeleting() throw (FS::Failure)
01797
01798 {
01799 ifstream ifs;
01800 bool delVal;
01801 try {
01802 FS::OpenReadOnly(Config_DeletingFile, ifs);
01803 this->mu.lock();
01804 try {
01805 FS::Read(ifs, (char *)(&(this->deleting)), sizeof(this->deleting));
01806 delVal = this->deleting;
01807 } catch (...) {
01808 this->mu.unlock();
01809 FS::Close(ifs);
01810 throw;
01811 }
01812 this->mu.unlock();
01813 FS::Close(ifs);
01814 }
01815 catch (FS::DoesNotExist) {
01816 this->mu.lock();
01817 delVal = this->deleting = false;
01818 this->mu.unlock();
01819 }
01820 catch (FS::EndOfFile) {
01821
01822 assert(false);
01823 }
01824
01825
01826 if (this->debug >= CacheIntf::LogRecover) {
01827 Debug::Lock();
01828 cout << Debug::Timestamp() << "RECOVERED -- Deleting" << endl;
01829 cout << " deleting = " << BoolName[delVal] << endl;
01830 cout << endl;
01831 Debug::Unlock();
01832 }
01833 }
01834
01835 void CacheS::SetStableDeleting(bool del) throw ()
01836
01837 {
01838
01839 this->deleting = del;
01840
01841
01842 AtomicFile ofs;
01843 try {
01844 ofs.open(Config_DeletingFile.chars(), ios::out);
01845 if (!ofs) {
01846 throw (FS::Failure(Text("CacheS::SetStableDeleting"),
01847 Config_DeletingFile));
01848 }
01849 FS::Write(ofs, (char *)(&(this->deleting)), sizeof(this->deleting));
01850 FS::Close(ofs);
01851 }
01852 catch (const FS::Failure &f) {
01853 cerr << "unexpected FS error saving \"deleting\"; exiting..." << endl;
01854 cerr << f << endl;
01855 exit(1);
01856 }
01857 }
01858
01859 void CacheS::RecoverHitFilter() throw (FS::Failure)
01860
01861 {
01862 ifstream ifs;
01863 try {
01864 FS::OpenReadOnly(Config_HitFilterFile, ifs);
01865 this->mu.lock();
01866 try {
01867 this->hitFilter.Read(ifs);
01868 } catch (...) {
01869 this->mu.unlock();
01870 FS::Close(ifs);
01871 throw;
01872 }
01873 this->mu.unlock();
01874 FS::Close(ifs);
01875 }
01876 catch (FS::DoesNotExist) {
01877
01878 }
01879 catch (FS::EndOfFile) {
01880
01881 assert(false);
01882 }
01883
01884
01885 if (this->debug >= CacheIntf::LogRecover) {
01886 this->mu.lock();
01887 try {
01888 Debug::Lock();
01889 try {
01890 cout << Debug::Timestamp() << "RECOVERED -- HitFilter" << endl;
01891 cout << " hitFilter = " << this->hitFilter << endl << endl;
01892 } catch (...) { Debug::Unlock(); throw; }
01893 Debug::Unlock();
01894 } catch (...) { this->mu.unlock(); throw; }
01895 this->mu.unlock();
01896 }
01897 }
01898
01899 void CacheS::WriteHitFilter() throw ()
01900
01901 {
01902
01903
01904 AtomicFile ofs;
01905 try {
01906 ofs.open(Config_HitFilterFile.chars(), ios::out);
01907 if (ofs.fail()) {
01908 throw (FS::Failure(Text("CacheS::WriteHitFilter"),
01909 Config_HitFilterFile));
01910 }
01911 this->hitFilter.Write(ofs);
01912 FS::Close(ofs);
01913 }
01914 catch (const FS::Failure &f) {
01915 cerr << "unexpected FS error saving \"hitFilter\"; exiting..." << endl;
01916 cerr << f;
01917 exit(1);
01918 }
01919 }
01920
01921 void CacheS::ClearStableHitFilter() throw ()
01922
01923 {
01924 this->hitFilter.ResetAll( true);
01925 this->WriteHitFilter();
01926 }
01927
01928 void CacheS::SetStableHitFilter(const BitVector &hf) throw ()
01929
01930 {
01931 this->hitFilter = hf;
01932 this->WriteHitFilter();
01933 }
01934
01935 void CacheS::SetMPKsToWeed(const PKPrefix::List &pfxs) throw ()
01936
01937 {
01938
01939 this->mpksToWeed = pfxs;
01940
01941
01942 AtomicFile ofs;
01943 try {
01944 ofs.open(Config_MPKsToWeedFile.chars(), ios::out);
01945 if (ofs.fail()) {
01946 throw (FS::Failure(Text("CacheS::SetMPKsToWeed"),
01947 Config_MPKsToWeedFile));
01948 }
01949 this->mpksToWeed.Write(ofs);
01950 FS::Close(ofs);
01951 }
01952 catch (const FS::Failure &f) {
01953 cerr << "unexpected FS error saving \"mpksToWeed\"; exiting..." <<endl;
01954 cerr << f;
01955 exit(1);
01956 }
01957 }
01958
01959 void CacheS::RecoverMPKsToWeed() throw (FS::Failure)
01960
01961 {
01962 ifstream ifs;
01963 try {
01964 FS::OpenReadOnly(Config_MPKsToWeedFile, ifs);
01965 this->mu.lock();
01966 try {
01967 mpksToWeed.Read(ifs);
01968 } catch (...) {
01969 this->mu.unlock();
01970 FS::Close(ifs);
01971 throw;
01972 }
01973 this->mu.unlock();
01974 FS::Close(ifs);
01975 }
01976 catch (FS::DoesNotExist) {
01977
01978 }
01979 catch (FS::EndOfFile) {
01980
01981 assert(false);
01982 }
01983
01984
01985 if (this->debug >= CacheIntf::LogRecover) {
01986 this->mu.lock();
01987 try {
01988 Debug::Lock();
01989 try {
01990 cout << Debug::Timestamp() << "RECOVERED -- MPKsToWeed" <<endl;
01991 if (this->mpksToWeed.len > 0) {
01992 for (int i = 0; i < this->mpksToWeed.len; i++) {
01993 cout << " pfx[" << i << "] = "
01994 << this->mpksToWeed.pfx[i] << endl;
01995 }
01996 } else {
01997 cout << " <<Empty>>" << endl;
01998 }
01999 cout << endl;
02000 } catch (...) { Debug::Unlock(); throw; }
02001 Debug::Unlock();
02002 } catch (...) { this->mu.unlock(); throw; }
02003 this->mu.unlock();
02004 }
02005 }
02006
02007 static int CacheS_ReadLogVersion(const Text &dirName)
02008 throw (FS::Failure, FS::DoesNotExist)
02009
02010
02011 {
02012 ifstream ifs;
02013 FS::OpenReadOnly(dirName + "/version", ifs);
02014 int res;
02015 ifs >> res;
02016 assert(ifs.good());
02017 FS::Close(ifs);
02018 return res;
02019 }
02020
02021 void CacheS::ResetWeededMPKs() throw ()
02022
02023 {
02024
02025 this->nextMPKToWeed = 0;
02026 assert(this->weededMPKsLog != (VestaLog *)NULL);
02027 try {
02028 fstream *chkptFile = weededMPKsLog->checkpointBegin();
02029
02030 FS::Close(*chkptFile);
02031 this->weededMPKsLog->checkpointEnd();
02032 this->weededMPKsLog->prune( 1);
02033 } catch (const FS::Failure &f) {
02034 cerr << "Fatal FS error: ";
02035 cerr << "while checkpointing \"weededMPKsLog\"; exiting..." << endl;
02036 cerr << f << endl;
02037 exit(1);
02038 }
02039 catch (const VestaLog::Error &err) {
02040 cerr << "Fatal VestaLog error: ";
02041 cerr << "reseting \"weededMPKsLog\"" << endl
02042 << " " << err.msg;
02043 if(err.r != 0)
02044 {
02045 cerr << " (errno = " << err.r << ")";
02046 }
02047 cerr << endl << "Exiting..." << endl;
02048 exit(1);
02049 }
02050 }
02051
02052 void CacheS::AddToWeededMPKs(int num) throw ()
02053
02054 {
02055 try {
02056 this->weededMPKsLog->start();
02057 this->weededMPKsLog->write((char *)(&num), sizeof(num));
02058 this->weededMPKsLog->commit();
02059 }
02060 catch (const VestaLog::Error &err) {
02061 cerr << "Fatal VestaLog error: ";
02062 cerr << "adding entry to \"weededMPKsLog\"" << endl
02063 << " " << err.msg;
02064 if(err.r != 0)
02065 {
02066 cerr << " (errno = " << err.r << ")";
02067 }
02068 cerr << endl << "Exiting..." << endl;
02069 exit(1);
02070 }
02071 }
02072
02073 void CacheS::RecoverWeededMPKs() throw (VestaLog::Eof, VestaLog::Error)
02074
02075 {
02076 this->mu.lock();
02077 try {
02078
02079 this->weededMPKsLog = NEW(VestaLog);
02080 this->nextMPKToWeed = 0;
02081 this->weededMPKsLog->open(Config_WeededLogPath.chars());
02082
02083
02084 do {
02085 while (!this->weededMPKsLog->eof()) {
02086 int delta;
02087 this->weededMPKsLog->readAll((char *)(&delta), sizeof(delta));
02088 this->nextMPKToWeed += delta;
02089 }
02090 } while (this->weededMPKsLog->nextLog());
02091 this->weededMPKsLog->loggingBegin();
02092 } catch (...) { this->mu.lock(); throw; }
02093 this->mu.unlock();
02094
02095
02096 if (this->debug >= CacheIntf::LogRecover) {
02097 this->mu.lock();
02098 try {
02099 Debug::Lock();
02100 try {
02101 cout << Debug::Timestamp() << "RECOVERED -- WeededLog" << endl;
02102 if (this->nextMPKToWeed > 0) {
02103 cout << " Weeded MultiPKFiles = " << this->nextMPKToWeed;
02104 } else {
02105 cout << " <<Empty>>";
02106 }
02107 cout << endl << endl;
02108 } catch (...) { Debug::Unlock(); throw; }
02109 Debug::Unlock();
02110 } catch (...) { this->mu.unlock(); throw; }
02111 this->mu.unlock();
02112 }
02113 }
02114
02115
02116
02117 void CacheS::LogCacheEntry(const Text& sourceFunc, FP::Tag *pk,
02118 PKFile::Epoch pkEpoch,
02119 CacheEntry::Index ci, VestaVal::T *value,
02120 Model::T model, CacheEntry::Indices *kids,
02121 FV::List *names, FP::List *fps)
02122 throw ()
02123
02124 {
02125 CacheLog::Entry *logEntry;
02126
02127
02128 if (this->vCacheAvail == (CacheLog::Entry *)NULL) {
02129 logEntry = NEW_CONSTR(CacheLog::Entry, (sourceFunc, pk, pkEpoch,
02130 ci, value, model, kids,
02131 names, fps));
02132 } else {
02133 logEntry = this->vCacheAvail;
02134 this->vCacheAvail = logEntry->next;
02135
02136 logEntry->Init(sourceFunc, pk, pkEpoch,
02137 ci, value, model, kids, names, fps);
02138 }
02139
02140
02141 if (this->vCacheLogTail == (CacheLog::Entry *)NULL)
02142 this->vCacheLog = logEntry;
02143 else
02144 this->vCacheLogTail->next = logEntry;
02145 this->vCacheLogTail = logEntry;
02146 }
02147
02148 void CacheS::FlushCacheLog() throw (VestaLog::Error)
02149
02150 {
02151 CacheLog::Entry *vLog, *curr;
02152
02153
02154 this->mu.lock();
02155 vLog = this->vCacheLog;
02156 this->vCacheLog = this->vCacheLogTail = (CacheLog::Entry *)NULL;
02157 this->cacheLogFlushQ->Enqueue();
02158 this->mu.unlock();
02159
02160
02161 this->FlushGraphLog();
02162
02163
02164
02165
02166
02167 if (vLog == (CacheLog::Entry *)NULL)
02168 {
02169 this->mu.lock();
02170 this->cacheLogFlushQ->Dequeue();
02171 this->mu.unlock();
02172 return;
02173 }
02174
02175
02176 if (this->debug >= CacheIntf::LogFlush) {
02177 Debug::Lock();
02178 cout << Debug::Timestamp() << "STARTED -- Flushing CacheLog" << endl;
02179 cout << endl;
02180 Debug::Unlock();
02181 }
02182
02183
02184 CacheLog::Entry *last;
02185 int numFlushed = 0;
02186 this->cacheLogMu.lock();
02187 try {
02188 this->cacheLog->start();
02189 for (curr = vLog; curr != NULL; curr = curr->next) {
02190 last = curr;
02191 if (numFlushed % 10 == 0 && numFlushed > 0) {
02192
02193
02194 this->cacheLog->commit();
02195 this->cacheLog->start();
02196 }
02197 curr->Log(*(this->cacheLog));
02198 numFlushed++;
02199 if (this->debug >= CacheIntf::LogFlushEntries) {
02200 Debug::Lock();
02201 curr->Debug(cout);
02202 Debug::Unlock();
02203 }
02204 curr->Reset();
02205
02206 }
02207 this->cacheLog->commit();
02208
02209
02210 this->cacheLogLen += numFlushed;
02211 this->TryCleanCacheLog(Config_MaxCacheLogCnt,
02212 "CacheS::FlushCacheLog called");
02213 } catch (...) { this->cacheLogMu.unlock(); throw; }
02214 this->cacheLogMu.unlock();
02215
02216
02217 if (this->debug >= CacheIntf::LogFlushEntries) {
02218 Debug::Lock();
02219 cout << endl;
02220 Debug::Unlock();
02221 }
02222 if (this->debug >= CacheIntf::LogFlush) {
02223 Debug::Lock();
02224 cout << Debug::Timestamp() << "FINISHED -- Flushing CacheLog" << endl;
02225 cout << " Entries flushed = " << numFlushed << endl << endl;
02226 Debug::Unlock();
02227 }
02228
02229
02230 this->mu.lock();
02231 last->next = this->vCacheAvail;
02232 this->vCacheAvail = vLog;
02233 this->cacheLogFlushQ->Dequeue();
02234 this->mu.unlock();
02235 }
02236
02237 void CacheS::CleanCacheLogEntries(RecoveryReader &rd, fstream &ofs,
02238 EmptyPKLog::PKEpochTbl &pkEpochTbl,
02239 int &oldCnt, int &newCnt)
02240 throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02241
02242 {
02243 while (!rd.eof()) {
02244
02245 CacheLog::Entry logEntry(rd);
02246 FP::Tag *pk = logEntry.pk;
02247 PKFile::Epoch pkEpoch;
02248 oldCnt++;
02249 this->mu.lock();
02250 try {
02251
02252 if (!pkEpochTbl.Get(*pk, pkEpoch)) {
02253
02254 pkEpoch = PKFileEpoch(*pk);
02255 bool inTbl = pkEpochTbl.Put(*pk, pkEpoch); assert(!inTbl);
02256 }
02257
02258
02259 if (pkEpoch == 0) {
02260 (void) this->emptyPKLog->GetEpoch0(*pk, pkEpoch);
02261 }
02262
02263
02264 if (logEntry.pkEpoch >= pkEpoch) {
02265 logEntry.Write(ofs);
02266 newCnt++;
02267 }
02268 } catch (...) { this->mu.unlock(); throw; }
02269 this->mu.unlock();
02270 }
02271 }
02272
02273 void CacheS::CleanCacheLog()
02274 throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02275
02276 {
02277
02278 int oldCnt = 0, newCnt = 0;
02279
02280
02281 if (this->debug >= CacheIntf::LogFlush) {
02282 Debug::Lock();
02283 cout << Debug::Timestamp() << "STARTED -- CleanCacheLog" << endl;
02284 cout << endl;
02285 Debug::Unlock();
02286 }
02287
02288
02289
02290
02291
02292 this->emptyPKLog->CheckpointBegin();
02293 fstream *cacheLogChkpt;
02294 int newLogVersion;
02295 this->cacheLogMu.lock();
02296 try {
02297 cacheLogChkpt = this->cacheLog->checkpointBegin();
02298 newLogVersion = this->cacheLog->logVersion();
02299 } catch (...) { this->cacheLogMu.unlock(); throw; }
02300 this->cacheLogMu.unlock();
02301
02302
02303
02304
02305 RecoveryReader *rd;
02306 VestaLogSeq currCacheLogSeq(Config_CacheLogPath.chars());
02307 currCacheLogSeq.Open( -1, true);
02308
02309
02310
02311
02312
02313
02314 EmptyPKLog::PKEpochTbl pkEpochTbl;
02315
02316
02317 try {
02318 while ((rd = currCacheLogSeq.Next(newLogVersion)) != NULL) {
02319 this->CleanCacheLogEntries(*rd, *cacheLogChkpt,
02320 pkEpochTbl, oldCnt, newCnt);
02321 }
02322 } catch (...) { currCacheLogSeq.Close(); throw; }
02323 currCacheLogSeq.Close();
02324
02325
02326 FS::Close(*cacheLogChkpt);
02327
02328
02329 this->cacheLogMu.lock();
02330 try {
02331 this->cacheLog->checkpointEnd();
02332 this->cacheLog->prune( 1);
02333 this->emptyPKLog->CheckpointEnd();
02334 } catch (...) { this->cacheLogMu.unlock(); throw; }
02335 this->cacheLogMu.unlock();
02336
02337
02338 if (this->debug >= CacheIntf::LogFlush) {
02339 Debug::Lock();
02340 cout << Debug::Timestamp() << "FINISHED -- CleanCacheLog" << endl;
02341 cout << " old log size = " << oldCnt << endl;
02342 cout << " new log size = " << newCnt << endl;
02343 cout << endl;
02344 Debug::Unlock();
02345 }
02346 }
02347
02348 void* CacheS_CleanCacheLogWorker(void *arg) throw ()
02349
02350
02351
02352 {
02353 CleanWorker *cw = (CleanWorker *)arg;
02354 try {
02355 while (true) {
02356
02357 cw->mu.lock();
02358 while (!(cw->argsReady)) {
02359 cw->workToDo.wait(cw->mu);
02360 }
02361 cw->mu.unlock();
02362
02363
02364 cw->cs->CleanCacheLog();
02365
02366
02367 cw->Finish();
02368 cw->cs->RegisterIdleCleanWorker(cw);
02369 }
02370 }
02371 catch (const VestaLog::Error &err) {
02372 cerr << "VestaLog fatal error: ";
02373 cerr << "failed cleaning cache log; exiting..." << endl;
02374 cerr << "num = " << err.r << "; " << err.msg << endl;
02375 exit(1);
02376 }
02377 catch (VestaLog::Eof) {
02378 cerr << "VestaLog fatal error: ";
02379 cerr << "unexpected EOF cleaning cache log; exiting..." << endl;
02380 exit(1);
02381 }
02382 catch (const FS::Failure &f) {
02383 cerr << "FS fatal error: ";
02384 cerr << "unexpected FS error cleaning cache log; exiting..." << endl;
02385 cerr << f;
02386 exit(1);
02387 }
02388
02389 assert(false);
02390 return (void *)NULL;
02391 }
02392
02393 CleanWorker *CacheS::NewCleanWorker() throw ()
02394
02395 {
02396
02397 while (this->idleCleanWorker == NULL) {
02398 this->availCleanWorker.wait(this->cacheLogMu);
02399 }
02400 CleanWorker *res = this->idleCleanWorker;
02401 this->idleCleanWorker = NULL;
02402 return res;
02403 }
02404
02405 void CacheS::RegisterIdleCleanWorker(CleanWorker *cw) throw ()
02406
02407 {
02408 this->cacheLogMu.lock();
02409 assert(this->idleCleanWorker == NULL);
02410 this->idleCleanWorker = cw;
02411 this->cacheLogMu.unlock();
02412 this->availCleanWorker.signal();
02413 }
02414
02415 void CacheS::TryCleanCacheLog(int upper_bound, const char *reason) throw ()
02416
02417 {
02418
02419 if (this->cacheLogLen > upper_bound) {
02420 CleanWorker *worker = this->NewCleanWorker();
02421 this->cacheLogLen = 0;
02422 worker->Start(reason);
02423 }
02424 }
02425
02426 void CacheS::RecoverEmptyPKLog( bool &empty)
02427 throw (VestaLog::Error, VestaLog::Eof)
02428
02429 {
02430 while (!this->emptyPKLog->EndOfFile()) {
02431
02432 FP::Tag pk; PKFile::Epoch logPKEpoch;
02433 this->emptyPKLog->Read( pk, logPKEpoch);
02434 assert(logPKEpoch > 0);
02435
02436
02437 if (this->debug >= CacheIntf::LogRecover) {
02438 Debug::Lock();
02439 cout << " pk = " << pk;
02440 cout << ", pkEpoch = " << logPKEpoch << endl;
02441 empty = false;
02442 Debug::Unlock();
02443 }
02444 }
02445 }
02446
02447 void CacheS::RecoverCacheLogEntries(RecoveryReader &rd,
02448 EmptyPKLog::PKEpochTbl &pkEpochTbl, bool &empty)
02449 throw (VestaLog::Error, VestaLog::Eof)
02450
02451 {
02452 while (!rd.eof()) {
02453
02454 CacheLog::Entry logEntry(rd);
02455 this->cacheLogLen++;
02456
02457
02458
02459
02460
02461
02462
02463
02464
02465
02466
02467 FP::Tag *pk = logEntry.pk;
02468 PKFile::Epoch pkEpoch;
02469 this->mu.lock();
02470 try {
02471
02472
02473
02474
02475
02476
02477
02478
02479
02480
02481 if (!pkEpochTbl.Get(*pk, pkEpoch)) {
02482
02483 pkEpoch = this->PKFileEpoch(*pk);
02484 bool inTbl = pkEpochTbl.Put(*pk, pkEpoch); assert(!inTbl);
02485 }
02486
02487
02488 if (pkEpoch == 0) {
02489 (void) this->emptyPKLog->GetEpoch0(*pk, pkEpoch);
02490 }
02491 } catch (...) { this->mu.unlock(); throw; }
02492 this->mu.unlock();
02493
02494
02495 if (logEntry.pkEpoch >= pkEpoch) {
02496
02497 VPKFile *vf;
02498 this->mu.lock();
02499
02500
02501
02502
02503 if(!this->usedCIs.Read(logEntry.ci))
02504 {
02505 Debug::Lock();
02506
02507 cerr << Debug::Timestamp()
02508 << "INTERNAL ERROR: unused CI in cache log:" << endl
02509 << " pk = " << *pk << endl
02510 << " ci = " << logEntry.ci << endl
02511 << " (Please report this as a bug.)" << endl;
02512
02513 Debug::Unlock();
02514 abort();
02515 }
02516
02517 (void)(this->GetVPKFile(*pk, vf));
02518 this->state.newEntryCnt++;
02519 this->state.newPklSize += logEntry.value->len;
02520 int currentFreeEpoch = this->freeMPKFileEpoch;
02521 this->mu.unlock();
02522
02523
02524 this->cacheLogMu.unlock();
02525 try {
02526
02527 vf->mu.lock();
02528
02529 vf->UpdateFreeEpoch(currentFreeEpoch);
02530 try {
02531 FP::Tag *commonFP;
02532 CE::T *entry = vf->NewEntry(logEntry.ci,
02533 logEntry.names, logEntry.fps,
02534 logEntry.value,
02535 logEntry.model, logEntry.kids,
02536 commonFP);
02537
02538 vf->AddEntry(NEW_CONSTR(Text, (logEntry.sourceFunc)), entry,
02539 commonFP, logEntry.pkEpoch);
02540 } catch (...) { vf->mu.unlock(); throw; }
02541 vf->mu.unlock();
02542
02543
02544 PKPrefix::T pfx(*pk);
02545 this->AddEntryToMPKFile(pfx,
02546 "MultiPKFile threshold exceeded during recovery");
02547
02548 } catch (...) { this->cacheLogMu.lock(); throw; }
02549 this->cacheLogMu.lock();
02550
02551
02552 if (this->debug >= CacheIntf::LogRecover) {
02553 Debug::Lock();
02554 logEntry.Debug(cout);
02555 empty = false;
02556 Debug::Unlock();
02557 }
02558 }
02559 }
02560 }
02561
02562 void CacheS::RecoverCacheLog()
02563 throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02564
02565 {
02566
02567 bool empty = true;
02568 if (this->debug >= CacheIntf::LogRecover) {
02569 Debug::Lock();
02570 cout << Debug::Timestamp() << "RECOVERING -- EmptyPKLog" << endl;
02571 Debug::Unlock();
02572 }
02573
02574
02575 this->mu.lock();
02576 try {
02577
02578
02579
02580 do {
02581 this->RecoverEmptyPKLog( empty);
02582 } while (this->emptyPKLog->NextLog());
02583 this->emptyPKLog->EndRecovery();
02584 } catch (...) { this->mu.unlock(); throw; }
02585 this->mu.unlock();
02586
02587
02588 if (this->debug >= CacheIntf::LogRecover) {
02589 Debug::Lock();
02590 if (empty) cout << " <<Empty>>" << endl;
02591 cout << endl;
02592 cout << Debug::Timestamp() << "RECOVERING -- CacheLog" << endl;
02593 Debug::Unlock();
02594 }
02595
02596
02597 this->mu.lock();
02598 this->cacheLog = NEW(VestaLog);
02599 this->cacheLogFlushQ = NEW_CONSTR(FlushQueue, (&(this->mu)));
02600 this->vCacheLog = this->vCacheAvail = (CacheLog::Entry *)NULL;
02601 this->vCacheLogTail = (CacheLog::Entry *)NULL;
02602 this->mu.unlock();
02603
02604
02605
02606
02607
02608 EmptyPKLog::PKEpochTbl pkEpochTbl;
02609
02610
02611 empty = true;
02612 this->cacheLogMu.lock();
02613 try {
02614 this->cacheLog->open(Config_CacheLogPath.chars());
02615
02616
02617 fstream *chkpt = this->cacheLog->openCheckpoint();
02618 if (chkpt != (fstream *)NULL) {
02619 RecoveryReader rd(chkpt);
02620 this->RecoverCacheLogEntries(rd,
02621 pkEpochTbl, empty);
02622 FS::Close(*chkpt);
02623 }
02624
02625
02626 do {
02627 RecoveryReader rd(this->cacheLog);
02628 this->RecoverCacheLogEntries(rd,
02629 pkEpochTbl, empty);
02630 } while (this->cacheLog->nextLog());
02631 this->cacheLog->loggingBegin();
02632
02633 } catch (...) { this->cacheLogMu.unlock(); throw; }
02634 this->cacheLogMu.unlock();
02635
02636
02637 if (this->debug >= CacheIntf::LogRecover) {
02638 Debug::Lock();
02639 if (empty) cout << " <<Empty>>" << endl;
02640 cout << endl;
02641 Debug::Unlock();
02642 }
02643 }
02644
02645
02646
02647 void CacheS::LogGraphNode(CacheEntry::Index ci, FP::Tag *loc, Model::T model,
02648 CacheEntry::Indices *kids, Derived::Indices *refs) throw ()
02649
02650 {
02651 GraphLog::Node *logNode;
02652
02653
02654 if (this->vGraphNodeAvail == (GraphLog::Node *)NULL) {
02655 logNode = NEW_CONSTR(GraphLog::Node, (ci, loc, model, kids, refs));
02656 } else {
02657 logNode = this->vGraphNodeAvail;
02658 assert(logNode->kind == GraphLog::NodeKind);
02659 this->vGraphNodeAvail = logNode->next;
02660 this->vGraphNodeAvailLen--;
02661 logNode->Init(ci, loc, model, kids, refs);
02662 }
02663
02664
02665 if (this->vGraphLogTail == (GraphLog::Node *)NULL)
02666 this->vGraphLog = logNode;
02667 else
02668 this->vGraphLogTail->next = logNode;
02669 this->vGraphLogTail = logNode;
02670 }
02671
02672 void CacheS::FlushGraphLog() throw (VestaLog::Error)
02673
02674 {
02675 GraphLog::Node *vLog;
02676
02677
02678 this->mu.lock();
02679 vLog = this->vGraphLog;
02680 this->vGraphLog = this->vGraphLogTail = (GraphLog::Node *)NULL;
02681 this->graphLogFlushQ->Enqueue();
02682 this->mu.unlock();
02683
02684
02685 this->FlushUsedCIs();
02686
02687
02688 if (vLog == (GraphLog::Node *)NULL)
02689 {
02690 this->mu.lock();
02691 this->graphLogFlushQ->Dequeue();
02692 this->mu.unlock();
02693 return;
02694 }
02695
02696
02697 if (this->debug >= CacheIntf::LogFlush) {
02698 Debug::Lock();
02699 cout << Debug::Timestamp() << "STARTED -- Flushing GraphLog" << endl;
02700 cout << endl;
02701 Debug::Unlock();
02702 }
02703
02704
02705 int numFlushed = 0;
02706 GraphLog::Node *last;
02707 this->graphLogMu.lock();
02708 try {
02709 this->graphLog->start();
02710 for (GraphLog::Node *curr = vLog; curr != NULL; curr = curr->next) {
02711 last = curr;
02712 if (numFlushed % 20 == 0 && numFlushed > 0) {
02713
02714
02715 this->graphLog->commit();
02716 this->graphLog->start();
02717 }
02718 curr->Log(*(this->graphLog));
02719 numFlushed++;
02720 if (this->debug >= CacheIntf::LogFlushEntries) {
02721 Debug::Lock();
02722 curr->Debug(cout);
02723 Debug::Unlock();
02724 }
02725 curr->Reset();
02726
02727 }
02728 this->graphLog->commit();
02729 } catch (...) { this->graphLogMu.unlock(); throw; }
02730 this->graphLogMu.unlock();
02731
02732
02733 if (this->debug >= CacheIntf::LogFlushEntries) {
02734 Debug::Lock();
02735 cout << endl;
02736 Debug::Unlock();
02737 }
02738 if (this->debug >= CacheIntf::LogFlush) {
02739 Debug::Lock();
02740 cout << Debug::Timestamp() << "FINISHED -- Flushing GraphLog" << endl;
02741 cout << " Nodes flushed = " << numFlushed << endl;
02742 cout << endl;
02743 Debug::Unlock();
02744 }
02745
02746
02747 this->mu.lock();
02748 last->next = this->vGraphNodeAvail;
02749 this->vGraphNodeAvail = vLog;
02750 this->vGraphNodeAvailLen += numFlushed;
02751 this->graphLogFlushQ->Dequeue();
02752 this->mu.unlock();
02753 }
02754
02755 int CacheS::ChkptGraphLog() throw (FS::Failure, VestaLog::Error)
02756
02757 {
02758 int res;
02759 fstream *fs;
02760 this->graphLogMu.lock();
02761 try {
02762 fs = this->graphLog->checkpointBegin();
02763 FS::Close(*fs);
02764 res = this->graphLog->logVersion();
02765 } catch (...) { this->graphLogMu.unlock(); throw; }
02766 this->graphLogMu.unlock();
02767 return res;
02768 }
02769
02770 void CacheS::AbortGraphLogChkpt() throw (VestaLog::Error)
02771
02772 {
02773 this->graphLogMu.lock();
02774 try {
02775 this->graphLog->checkpointAbort();
02776 } catch (...) { this->graphLogMu.unlock(); throw; }
02777 this->graphLogMu.unlock();
02778 }
02779
02780 void CacheS::RecoverGraphLog()
02781 throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02782
02783
02784
02785
02786 {
02787
02788 this->mu.lock();
02789 this->graphLog = NEW(VestaLog);
02790 this->graphLogFlushQ = NEW_CONSTR(FlushQueue, (&(this->mu)));
02791 this->vGraphLog = this->vGraphNodeAvail = (GraphLog::Node *)NULL;
02792 this->vGraphLogTail = (GraphLog::Node *)NULL;
02793 this->vGraphNodeAvailLen = 0;
02794 this->mu.unlock();
02795
02796 this->graphLogMu.lock();
02797 try {
02798
02799 this->graphLog->open(Config_GraphLogPath.chars());
02800
02801
02802
02803
02804
02805
02806 char c;
02807 do {
02808 while (!this->graphLog->eof()) this->graphLog->get(c);
02809 } while (this->graphLog->nextLog());
02810
02811
02812 fstream *ofs = this->graphLog->checkpointResume( ios::out);
02813 if (ofs != (fstream *)NULL) {
02814 FS::Close(*ofs);
02815 this->mu.lock();
02816 this->graphLogChkptVer = this->graphLog->logVersion();
02817 this->mu.unlock();
02818 }
02819
02820
02821 this->graphLog->loggingBegin();
02822 } catch (...) { this->graphLogMu.unlock(); throw; }
02823 this->graphLogMu.unlock();
02824 }
02825
02826
02827
02828 void CacheS::LogCI(Intvl::Op op, CacheEntry::Index ci) throw ()
02829
02830 {
02831 if (this->vCILog != (Intvl::List *)NULL && this->vCILog->i.op == op
02832 && this->vCILog->i.hi + 1 == ci) {
02833 this->vCILog->i.hi = ci;
02834 } else {
02835
02836 Intvl::List *il;
02837 if (this->vCIAvail == (Intvl::List *)NULL) {
02838 il = NEW(Intvl::List);
02839 } else {
02840 il = this->vCIAvail;
02841 this->vCIAvail = il->next;
02842 }
02843
02844
02845 il->i.op = op;
02846 il->i.lo = il->i.hi = ci;
02847 il->next = this->vCILog;
02848 this->vCILog = il;
02849 }
02850 }
02851
02852 void CacheS::FlushUsedCIs() throw (VestaLog::Error)
02853
02854 {
02855 Intvl::List *vLog, *curr, *last;
02856
02857
02858 this->mu.lock();
02859 vLog = this->vCILog;
02860 this->vCILog = (Intvl::List *)NULL;
02861 this->ciLogFlushQ->Enqueue();
02862 this->mu.unlock();
02863
02864 if (vLog == (Intvl::List *)NULL)
02865 {
02866 this->mu.lock();
02867 this->ciLogFlushQ->Dequeue();
02868 this->mu.unlock();
02869
02870 return;
02871 }
02872
02873
02874 if (this->debug >= CacheIntf::LogFlush) {
02875 Debug::Lock();
02876 cout << Debug::Timestamp() << "STARTED -- Flushing Used CI's Log"
02877 << endl << endl;
02878 Debug::Unlock();
02879 }
02880
02881
02882 int numFlushed;
02883 this->ciLogMu.lock();
02884 try {
02885 last = this->FlushUsedCIsList(vLog, numFlushed);
02886 } catch (...) { this->ciLogMu.unlock(); throw; }
02887 ciLogMu.unlock();
02888
02889
02890 if (this->debug >= CacheIntf::LogFlush) {
02891 Debug::Lock();
02892 cout << Debug::Timestamp() << "FINISHED -- Flushing Used CI's Log"
02893 << endl;
02894 cout << " Intervals flushed = " << numFlushed << endl;
02895 cout << endl;
02896 Debug::Unlock();
02897 }
02898
02899
02900 this->mu.lock();
02901 last->next = this->vCIAvail;
02902 this->vCIAvail = vLog;
02903 this->ciLogFlushQ->Dequeue();
02904 this->mu.unlock();
02905 }
02906
02907 Intvl::List *CacheS::FlushUsedCIsList(Intvl::List *vLog,
02908 int &numFlushed) throw (VestaLog::Error)
02909
02910 {
02911
02912 assert(vLog != (Intvl::List *)NULL);
02913 Intvl::List *curr, *last;
02914 numFlushed = 0;
02915 this->ciLog->start();
02916 for (curr = vLog; curr != (Intvl::List *)NULL; curr = curr->next) {
02917 last = curr;
02918 curr->i.Log(*(this->ciLog));
02919 numFlushed++;
02920 if (this->debug >= CacheIntf::LogFlushEntries) {
02921 Debug::Lock();
02922 curr->i.Debug(cout);
02923 Debug::Unlock();
02924 }
02925
02926 }
02927 this->ciLog->commit();
02928 if (this->debug >= CacheIntf::LogFlushEntries) {
02929 Debug::Lock();
02930 cout << endl;
02931 Debug::Unlock();
02932 }
02933 return last;
02934 }
02935
02936 void CacheS::ChkptUsedCIs(const BitVector &del)
02937 throw (VestaLog::Error, FS::Failure)
02938
02939
02940
02941
02942
02943
02944
02945
02946
02947
02948
02949
02950
02951
02952
02953
02954
02955
02956
02957
02958 {
02959 BitVector *usedCIsCopy;
02960 fstream *chkptFS;
02961
02962
02963 this->FlushUsedCIs();
02964
02965
02966 if (this->debug >= CacheIntf::LogFlush) {
02967 Debug::Lock();
02968 cout << Debug::Timestamp()<< "STARTED -- Flushing Used CI's Log"
02969 << endl << endl;
02970 Debug::Unlock();
02971 }
02972
02973
02974 int numFlushed;
02975 this->ciLogMu.lock();
02976 try {
02977 this->mu.lock();
02978 try {
02979
02980
02981
02982 Intvl::List *vLog = this->vCILog;
02983 this->vCILog = (Intvl::List *)NULL;
02984 if (vLog != (Intvl::List *)NULL) {
02985
02986 Intvl::List *last = this->FlushUsedCIsList(vLog,numFlushed);
02987
02988
02989 last->next = this->vCIAvail;
02990 this->vCIAvail = vLog;
02991 }
02992
02993
02994 this->usedCIs -= del;
02995 this->entryCnt = this->usedCIs.Cardinality();
02996
02997
02998 usedCIsCopy = NEW_CONSTR(BitVector, (&(this->usedCIs)));
02999
03000 } catch (...) { this->mu.unlock(); this->ciLogMu.unlock(); throw; }
03001 this->mu.unlock();
03002
03003
03004 chkptFS = this->ciLog->checkpointBegin();
03005 } catch (...) { this->ciLogMu.unlock(); throw; }
03006 this->ciLogMu.unlock();
03007
03008
03009 if (this->debug >= CacheIntf::LogFlush) {
03010 Debug::Lock();
03011 cout << Debug::Timestamp()<< "FINISHED -- Flushing Used CI's Log"
03012 << endl;
03013 cout << " Intervals flushed = " << numFlushed << endl;
03014 cout << endl;
03015 Debug::Unlock();
03016 }
03017
03018
03019 try {
03020 usedCIsCopy->Write(*chkptFS);
03021 FS::Close(*chkptFS);
03022 } catch (...) {
03023
03024 this->ciLogMu.lock();
03025 try {
03026 this->ciLog->checkpointAbort();
03027 } catch (...) { this->ciLogMu.unlock(); throw; }
03028 this->ciLogMu.unlock();
03029 throw;
03030 }
03031
03032
03033 this->ciLogMu.lock();
03034 try {
03035 this->ciLog->checkpointEnd();
03036 this->ciLog->prune( 1);
03037 } catch (...) { this->ciLogMu.unlock(); throw; }
03038 this->ciLogMu.unlock();
03039 }
03040
03041 void CacheS::RecoverCILog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
03042
03043 {
03044
03045 bool empty;
03046 if (this->debug >= CacheIntf::LogRecover) {
03047 Debug::Lock();
03048 cout << Debug::Timestamp() << "RECOVERING -- Used CI's Log" << endl;
03049 empty = true;
03050 Debug::Unlock();
03051 }
03052
03053
03054 this->mu.lock();
03055 this->ciLog = NEW(VestaLog);
03056 this->ciLogFlushQ = NEW_CONSTR(FlushQueue, (&(this->mu)));
03057 this->usedCIs.ResetAll();
03058 this->entryCnt = 0;
03059 this->vCILog = this->vCIAvail = (Intvl::List *)NULL;
03060 this->mu.unlock();
03061
03062
03063 fstream *chkpt = (fstream *)NULL;
03064 this->ciLogMu.lock();
03065 try {
03066
03067 this->ciLog->open(Config_CILogPath.chars());
03068
03069
03070 chkpt = this->ciLog->openCheckpoint();
03071 if (chkpt != (fstream *)NULL) {
03072 RecoveryReader rd(chkpt);
03073 this->mu.lock();
03074 try {
03075 this->usedCIs.Recover(rd);
03076 } catch (...) { this->mu.unlock(); throw; }
03077 this->mu.unlock();
03078 FS::Close(*chkpt);
03079 }
03080 } catch (...) { this->ciLogMu.unlock(); throw; }
03081 this->ciLogMu.unlock();
03082
03083
03084 if (chkpt != (fstream *)NULL && this->debug >= CacheIntf::LogRecover) {
03085 Debug::Lock();
03086 cout << " usedCIs = " << usedCIs << endl;
03087 empty = false;
03088 Debug::Unlock();
03089 }
03090
03091
03092 this->ciLogMu.lock();
03093 try {
03094
03095 RecoveryReader rd(this->ciLog);
03096 do {
03097 while (!rd.eof()) {
03098 Intvl::T intvl(rd);
03099 this->mu.lock();
03100 try {
03101 this->usedCIs.WriteInterval(intvl.lo, intvl.hi,
03102 (intvl.op == Intvl::Add));
03103 } catch (...) { this->mu.unlock(); throw; }
03104 this->mu.unlock();
03105 if (this->debug >= CacheIntf::LogRecover) {
03106 intvl.Debug(cout);
03107 empty = false;
03108 }
03109 }
03110 } while (this->ciLog->nextLog());
03111 this->ciLog->loggingBegin();
03112 } catch (...) { this->ciLogMu.unlock(); throw; }
03113 this->ciLogMu.unlock();
03114
03115
03116 this->mu.lock();
03117 this->entryCnt = this->usedCIs.Cardinality();
03118 this->mu.unlock();
03119
03120
03121 if (this->debug >= CacheIntf::LogRecover) {
03122 Debug::Lock();
03123 if (empty) cout << " <<Empty>>" << endl;
03124 cout << endl;
03125 Debug::Unlock();
03126 }
03127 }
03128
03129
03130
03131 CacheWorker::CacheWorker(CacheS *cs) throw ()
03132 : cs(cs), reason("idle")
03133 {
03134
03135 this->mu.lock();
03136 this->argsReady = false;
03137 this->mu.unlock();
03138 }
03139
03140 void CacheWorker::Start(const char *reason) throw ()
03141 {
03142
03143 this->mu.lock();
03144 this->argsReady = true;
03145 this->reason = reason;
03146 this->mu.unlock();
03147 this->workToDo.signal();
03148 }
03149
03150 void CacheWorker::Finish() throw ()
03151 {
03152 this->mu.lock();
03153 this->argsReady = false;
03154 this->reason = "idle";
03155 this->mu.unlock();
03156 }
03157
03158 FlushWorker::FlushWorker(CacheS *cs) throw ()
03159 : CacheWorker(cs)
03160 {
03161
03162 this->th.fork_and_detach(CacheS_FlushWorker, (void *)this);
03163 }
03164
03165 void* CacheS_FlushWorker(void *arg) throw ()
03166
03167 {
03168 FlushWorker *fw = (FlushWorker *)arg;
03169 while (true) {
03170
03171 fw->mu.lock();
03172 while (!(fw->argsReady)) {
03173 fw->workToDo.wait(fw->mu);
03174 }
03175 fw->mu.unlock();
03176
03177
03178 fw->cs->VToSCache(fw->pfx);
03179
03180
03181 fw->Finish();
03182 fw->cs->RegisterIdleFlushWorker(fw);
03183 }
03184
03185
03186
03187 }
03188
03189 void FlushWorker::Start(const PKPrefix::T &pfx, const char *reason) throw ()
03190 {
03191
03192 this->pfx = pfx;
03193
03194
03195 ((CacheWorker *)this)->Start(reason);
03196 }
03197
03198 CleanWorker::CleanWorker(CacheS *cs) throw ()
03199 : CacheWorker(cs)
03200 {
03201
03202 this->th.fork_and_detach(CacheS_CleanCacheLogWorker, (void *)this);
03203 }