CacheS.C

00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Thu Aug 11 23:43:29 EDT 2005 by ken@xorian.net         
00020 //      modified on Fri Jul 29 11:27:46 EDT 2005 by irina.furman@intel.com 
00021 //      modified on Tue Jun 27 17:37:43 PDT 2000 by mann  
00022 //      modified on Fri Feb  4 17:46:52 PST 2000 by heydon
00023 //      modified on Tue Feb 10 16:12:36 PST 1998 by yuanyu
00024 
00025 // CacheS.C -- the Vesta-2 cache server
00026 
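// Overview of this file: construction and log recovery (CacheS::CacheS),
// the evaluator client methods (FreeVariables, Lookup, AddEntry), methods
// for flushing VMultiPKFiles to the stable cache, the weeder client
// methods and the background deletion thread (CacheS_DoDeletions), the
// checkpoint worker, and miscellaneous methods such as FlushAll,
// GetCacheId, and GetCacheState.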
00027 #include <sys/types.h>
00028 #include <dirent.h>
00029 #include <time.h>
00030 #include <unistd.h>
00031 
00032 // #include <gc.h>
00033 #include <Basics.H>
00034 #include <FS.H>
00035 #include <VestaLog.H>
00036 #include <Recovery.H>
00037 #include <VestaLogSeq.H>
00038 #include <AtomicFile.H>
00039 // #include <Drd.H>
00040 #include <FP.H>
00041 #include <UniqueId.H>
00042 
00043 // cache-common
00044 #include <BitVector.H>
00045 #include <CacheConfig.H>
00046 #include <CacheIntf.H>
00047 #include <Debug.H>
00048 #include <PKPrefix.H>
00049 #include <Timer.H>
00050 #include <Model.H>
00051 #include <PKEpoch.H>
00052 #include <CacheIndex.H>
00053 #include <Derived.H>
00054 #include <FV.H>
00055 #include <VestaVal.H>
00056 #include <GraphLog.H>
00057 
00058 // cache-server
00059 #include <CacheConfigServer.H>
00060 #include <Leases.H>
00061 #include <CacheEntry.H>
00062 #include <CacheLog.H>
00063 #include <Intvl.H>
00064 #include <VPKFile.H>
00065 #include <VMultiPKFile.H>
00066 
00067 // local
00068 #include "VCacheVersion.H"
00069 #include "PKPrefixSeq.H"
00070 #include "CacheS.H"
00071 
00072 using std::ios;
00073 using std::fstream;
00074 using std::ifstream;
00075 using std::cout;
00076 using std::cerr;
00077 using std::endl;
00078 
00079 CacheS::CacheS(CacheIntf::DebugLevel debug, bool noHits) throw ()
00080 /* REQUIRES LL = Empty */
00081 : debug(debug), noHits(noHits)
00082 {
00083     try {
00084         // initialize cache server values
00085         this->mu.lock();
00086         try {
00087           this->leases =
00088             NEW_CONSTR(Leases, (&(this->mu),
00089                                 Config_LeaseTimeoutSecs,
00090                                 (this->debug >= CacheIntf::LeaseExp)));
00091           this->cache = NEW_CONSTR(CacheMap, (5, /*useGC=*/ true));
00092           this->mpkTbl = NEW_CONSTR(MPKMap, (5, /*useGC=*/ true));
00093           this->emptyPKLog = NEW_CONSTR(EmptyPKLog,
00094                                         (&(this->mu), this->debug));
00095           this->weederSRPC = (SRPC *)NULL;
00096           this->graphLogChkptVer = -1;
00097           this->freeMPKFileEpoch = 0;
00098           this->entryCnt = 0;
00099           this->idleFlushWorkers = (FlushWorkerList *)NULL;
00100           this->numActiveFlushWorkers = 0;
00101         } catch (...) { this->mu.unlock(); throw; }
00102         this->mu.unlock();
00103 
00104         this->Recover();
00105 
00106         // Record instance fingerprint.
00107         this->instanceFp = UniqueId();
00108         // This is probably excessive paranoia, but include the
00109         // current total number of cache entries in the fingerprint.
00110         this->instanceFp.Extend((char *) &(this->entryCnt),
00111                                 sizeof(this->entryCnt));
00112         // Debugging
00113         if (this->debug >= CacheIntf::StatusMsgs)
00114           {
00115             Debug::Lock();
00116             cout << Debug::Timestamp()
00117                  << "Instance fingerprint: "
00118                  << this->instanceFp << endl;
00119             Debug::Unlock();
00120           }
00121 
00122         // fork "DoFreeMPKFiles" thread
00123         Basics::thread *th2 = NEW_PTRFREE(Basics::thread);
00124         th2->fork_and_detach(CacheS_DoFreeMPKFiles, (void *)this);
00125 
00126         // fork "DoDeletions" thread
00127         Basics::thread *th1 = NEW_PTRFREE(Basics::thread);
00128         th1->fork_and_detach(CacheS_DoDeletions, (void *)this);
00129 
00130         // allocate single worker thread to clean cache log
00131         this->cacheLogMu.lock();
00132         try {
00133           this->idleCleanWorker = NEW_CONSTR(CleanWorker, (this));
00134           this->cacheLogLen = 0;
00135         } catch (...) { this->cacheLogMu.unlock(); throw; }
00136         this->cacheLogMu.unlock();
00137 
00138         // allocate a thread to handle Checkpoint calls
00139         this->chkptMu.lock();
00140         this->availChkptWorkers = (ChkptWorker *)NULL;
00141         this->queuedChkptWorkers = (ChkptWorker *)NULL;
00142         this->chkptMu.unlock();
00143         this->chkptTh.fork_and_detach(ChkptWorker::MainLoop, (void *)this);
00144 
00145         // allocate an initial set of FlushWorker threads
00146         typedef FlushWorker *FlushWorkerPtr;
00147         FlushWorkerPtr *workers = NEW_ARRAY(FlushWorkerPtr,
00148                                             Config_FlushWorkerCnt);
00149         this->mu.lock();
00150         int i;
00151         for (i = 0; i < Config_FlushWorkerCnt; i++) {
00152             workers[i] = this->NewFlushWorker(/*block=*/ false);
00153         }
00154         this->mu.unlock();
00155         for (i = 0; i < Config_FlushWorkerCnt; i++) {
00156             RegisterIdleFlushWorker(workers[i]);
00157             workers[i] = NULL;
00158         }
00159         delete[] workers;
00160 
00161         // record start time
00162         this->mu.lock();
00163         this->startTime = time((time_t *)NULL);
00164         assert(this->startTime >= 0);
00165         this->mu.unlock();
00166     }
00167     catch (const VestaLog::Error &err) {
00168         cerr << "VestaLog fatal error: ";
00169         cerr << "failed recovering cache server" << endl;
00170         cerr << "  num = " << err.r << "; " << err.msg << endl;
00171         cerr << "Exiting..." << endl;
00172         exit(1);
00173     }
00174     catch (VestaLog::Eof) {
00175         cerr << "VestaLog fatal error: ";
00176         cerr << "unexpected EOF recovering cache server; exiting..." << endl;
00177         exit(1);
00178     }
00179     catch (const FS::Failure &f) {
00180         cerr << "FS fatal error: ";
00181         cerr << "unexpected FS error recovering cache server; exiting..."
00182              << endl << f;
00183         exit(1);
00184     }
00185 }
00186 
00187 /* Evaluator client methods ------------------------------------------------ */
00188 
00189 bool CacheS::FindVPKFile(const FP::Tag &pk, /*OUT*/ VPKFile* &vpk) throw ()
00190 /* REQUIRES Sup(LL) = SELF.mu */
00191 /* Note: This operation will lead to a disk access if the VPKFile for "pk" is
00192    not currently in the cache. */
00193 {
00194     bool res = this->cache->Get(pk, /*OUT*/ vpk);
00195     if (!res) {
00196         // create the VPKFile; this attempts to read the SPKFile on disk
00197         try {
00198             PKFile::Epoch delPKEpoch = 0;
00199             (void)(this->emptyPKLog->GetEpoch0(pk, /*OUT*/ delPKEpoch));
00200             FV::Epoch namesEpoch = 0;
00201             (void)evictedNamesEpochs.Delete(pk, namesEpoch);
00202             vpk = NEW_CONSTR(VPKFile, (pk, delPKEpoch, namesEpoch));
00203         }
00204         catch (const FS::Failure &f) {
00205             // fatal error
00206             cerr << f;
00207             assert(false);
00208         }
00209     }
00210     return res;
00211 }
00212 
00213 PKFile::Epoch CacheS::PKFileEpoch(const FP::Tag &pk) throw ()
00214 /* REQUIRES Sup(LL) = SELF.mu */
00215 {
00216     VPKFile *vpk;
00217     (void)(this->FindVPKFile(pk, /*OUT*/ vpk));
00218     return vpk->PKEpoch();
00219 }
00220 
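/* Note: "GetVPKFile" differs from "FindVPKFile" above in that a newly
   created VPKFile is also installed in the cache table and in the
   VMultiPKFile for its PK prefix, so later calls on the same "pk" find
   it in memory. */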
00221 bool CacheS::GetVPKFile(const FP::Tag &pk, /*OUT*/ VPKFile* &vpk) throw ()
00222 /* REQUIRES Sup(LL) = SELF.mu */
00223 /* Note: This operation will lead to a disk access if the VPKFile for "pk" is
00224    not currently in the cache. */
00225 {
00226     bool res = this->FindVPKFile(pk, /*OUT*/ vpk);
00227     if (!res) {
00228         // install it in the cache
00229         bool inCache = cache->Put(pk, vpk); assert(!inCache);
00230 
00231         // add it to the appropriate VMultiPKFile
00232         PKPrefix::T pfx(pk); VMultiPKFile *mpk;
00233         if (!(this->mpkTbl->Get(pfx, /*OUT*/ mpk))) {
00234           // create the new VMultiPKFile and install it
00235           mpk = NEW_CONSTR(VMultiPKFile, (pfx));
00236           bool inMPK = this->mpkTbl->Put(pfx, mpk); assert(!inMPK);
00237         }
00238         bool inVMPK = mpk->Put(pk, vpk); assert(!inVMPK);
00239     }
00240 
00241     return res;
00242 }
00243 
00244 void CacheS::FreeVariables(const FP::Tag& pk, /*OUT*/ VPKFile* &vf)
00245   throw ()
00246 /* REQUIRES Sup(LL) < SELF.mu */
00247 {
00248     this->mu.lock();
00249     this->cnt.freeVarsCnt++;
00250     (void)(this->GetVPKFile(pk, /*OUT*/ vf));
00251     int currentFreeEpoch = this->freeMPKFileEpoch;
00252     this->mu.unlock();
00253 
00254     // Note that this PKFile has been accessed.
00255     vf->mu.lock();
00256 
00257     // Ensure that we don't have an evicted VPKFile (needed to
00258     // avoid a race with freeing empty VPKFiles).
00259     while(vf->Evicted())
00260       {
00261         // Unlock the evicted one, as we won't be using it.
00262         vf->mu.unlock();
00263         // Get another VPKFile object for this PK (probably
00264         // creating it).
00265         this->mu.lock();
00266         (void)(this->GetVPKFile(pk, /*OUT*/ vf));
00267         this->mu.unlock();
00268         // Lock our new VPKFile object.
00269         vf->mu.lock();
00270       }
00271 
00272     vf->UpdateFreeEpoch(currentFreeEpoch);
00273     vf->mu.unlock();
00274 }
00275 
00276 static void Etp_CacheS_Lookup(int fps_bytes, int outcome, int val_bytes)
00277   throw ()
00278 /* This procedure is a no-op. It's for etp(1) logging purposes only.
00279    It reflects the size of the arguments and result returned by the
00280    CacheS::Lookup method. "fps_bytes" is the size of the fingerprint
00281    argument (in bytes). "outcome" is the lookup outcome as defined in the
00282    "CacheIntf" interface. Finally, "val_bytes" is the total size of the
00283    returned value in bytes. */
00284 { /* SKIP */ }
00285 
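/* Sketch of the lookup path below: the VPKFile for "pk" is fetched (and
   re-fetched if it was evicted in the meantime), its "Lookup" method does
   the real work, and then a hit is screened: it is turned into a miss if
   "-noHits" was specified, or if the hit CI is in the hit filter and not
   currently leased; otherwise a lease is taken out on the CI.  A hit on a
   CI outside "usedCIs" is treated as cache corruption and aborts. */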
00286 CacheIntf::LookupRes
00287 CacheS::Lookup(const FP::Tag &pk, FV::Epoch id, const FP::List &fps,
00288   /*OUT*/ CacheEntry::Index &ci, /*OUT*/ const VestaVal::T* &value) throw ()
00289 /* REQUIRES (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
00290 {
00291     CacheIntf::LookupRes res;
00292     CacheIntf::LookupOutcome outcome;
00293 
00294     // get VPKFile
00295     VPKFile *vf;
00296     this->mu.lock();
00297     this->cnt.lookupCnt++;
00298     (void)(this->GetVPKFile(pk, /*OUT*/ vf));
00299     int currentFreeEpoch = this->freeMPKFileEpoch;
00300     this->mu.unlock();
00301 
00302     vf->mu.lock();
00303 
00304     // Ensure that we don't have an evicted VPKFile (needed to
00305     // avoid a race with freeing empty VPKFiles).
00306     while(vf->Evicted())
00307       {
00308         // Unlock the evicted one, as we won't be using it.
00309         vf->mu.unlock();
00310         // Get another VPKFile object for this PK (probably
00311         // creating it).
00312         this->mu.lock();
00313         (void)(this->GetVPKFile(pk, /*OUT*/ vf));
00314         this->mu.unlock();
00315         // Lock our new VPKFile object.
00316         vf->mu.lock();
00317       }
00318 
00319     // Note that this PKFile has been accessed.
00320     vf->UpdateFreeEpoch(currentFreeEpoch);
00321 
00322     // actual work
00323     try {
00324       res = vf->Lookup(id, fps,
00325                        /*OUT*/ ci, /*OUT*/ value, /*OUT*/ outcome);
00326     }
00327     catch (const FS::Failure &f) {
00328       // fatal error
00329       cerr << f;
00330       assert(false);
00331     }
00332 
00333     vf->mu.unlock();
00334 
00335     if (res == CacheIntf::Hit) {
00336       if (this->noHits) {
00337         // don't allow a hit if "-noHits" was specified
00338         res = CacheIntf::Miss;
00339       } else {
00340         this->mu.lock();
00341         try {
00342           // do "hitFilter" screening
00343           if (this->hitFilter.Read(ci) &&
00344               !(this->leases->IsLeased(ci))) {
00345             res = CacheIntf::Miss;
00346           } else {
00347             // otherwise, take out lease on "ci"
00348             this->leases->NewLease(ci);
00349           }
00350 
00351           // update stats if a new entry was paged in
00352           if (outcome == CacheIntf::DiskHits) {
00353             this->state.oldEntryCnt++;
00354             this->state.oldPklSize += value->len;
00355           }
00356 
00357           // Make sure that the CI we're returning is a used CI.
00358           // Getting a cache hit on an unused CI indicates cache
00359           // corruption, possibly a weeder bug.
00360           if(!this->usedCIs.Read(ci))
00361             {
00362               Debug::Lock();
00363               cerr << Debug::Timestamp()
00364                    << "INTERNAL ERROR: hit on unused CI" << endl
00365                    << "    ci = " << ci << endl
00366                    << "    pk = " << pk << endl
00367                    << " (Please report this as a bug.)" << endl;
00368               Debug::Unlock();
00369               abort();
00370             }
00371         } catch (...) { this->mu.unlock(); throw; }
00372         this->mu.unlock();
00373       }
00374     }
00375     Etp_CacheS_Lookup(fps.Size(), outcome,
00376       (res == CacheIntf::Hit) ? value->Size() : sizeof(value));
00377     return res;
00378 }
00379 
00380 static void Etp_CacheS_AddEntry(int arg_bytes) throw ()
00381 /* This procedure is a no-op. It's for etp(1) logging purposes only.
00382    It reflects the size of the arguments of the CacheS::AddEntry method.
00383    "arg_bytes" is the total size of the arguments (in bytes). */
00384 { /* SKIP */ }
00385 
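/* Sketch of the steps in "AddEntry" below: under "SELF.mu" a new cache
   index is allocated from "usedCIs" (excluding the hit filter while a
   weed is deleting), the allocation is logged, a lease is taken out on
   the new CI, and the leases of all child CIs are checked.  If any child
   lease is missing the result is "NoLease"; otherwise a node is written
   to the graph log, the entry is created and added to the VPKFile (and
   logged in the cache log), and the enclosing MultiPKFile is flushed if
   it has become full. */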
00386 CacheIntf::AddEntryRes
00387 CacheS::AddEntry(FP::Tag *pk, FV::List *names, FP::List *fps,
00388   VestaVal::T *value, Model::T model, CacheEntry::Indices *kids,
00389   const Text& sourceFunc, /*OUT*/ CacheEntry::Index& ci) throw ()
00390 /* REQUIRES (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
00391 {
00392     GraphLog::Node *dummy_var; // causes GC-related crash not to occur?
00393     CacheIntf::AddEntryRes res = CacheIntf::BadAddEntryArgs;
00394     if (names->len != fps->len) {
00395         res = CacheIntf::BadAddEntryArgs;
00396     } else {
00397         int kid; VPKFile *vf;
00398         this->mu.lock();
00399         this->cnt.addEntryCnt++;
00400         int currentFreeEpoch = this->freeMPKFileEpoch;
00401         try {
00402             // find VPKFile for this PK
00403             (void)(this->GetVPKFile(*pk, /*OUT*/ vf));
00404 
00405             // get next available CI and log it
00406             BitVector *except = this->deleting
00407               ? &(this->hitFilter) : (BitVector *)NULL;
00408             ci = this->usedCIs.NextAvailExcept(except);
00409             this->entryCnt++;
00410             this->LogCI(/*op=*/ Intvl::Add, ci);
00411 
00412             // take out lease on "ci"
00413             this->leases->NewLease(ci);
00414 
00415             // check for necessary cache entry leases
00416             for (kid = 0; kid < kids->len; kid++) {
00417               if (!this->leases->IsLeased(kids->index[kid])) break;
00418             }
00419 
00420             // If all child CIs are leased...
00421             if (kid == kids->len) {
00422               // add node to "vGraphLog"
00423               this->LogGraphNode(ci, pk, model, kids, &(value->dis));
00424             }
00425         } catch (...) { this->mu.unlock(); throw; }
00426         this->mu.unlock();
00427 
00428         if (kid < kids->len) {
00429             // exited loop prematurely
00430             res = CacheIntf::NoLease;
00431         } else {
00432             vf->mu.lock();
00433 
00434             // Ensure that we don't have an evicted VPKFile (needed to
00435             // avoid a race with freeing empty VPKFiles).
00436             while(vf->Evicted())
00437               {
00438                 // Unlock the evicted one, as we won't be using it.
00439                 vf->mu.unlock();
00440                 // Get another VPKFile object for this PK (probably
00441                 // creating it).
00442                 this->mu.lock();
00443                 (void)(this->GetVPKFile(*pk, /*OUT*/ vf));
00444                 this->mu.unlock();
00445                 // Lock our new VPKFile object.
00446                 vf->mu.lock();
00447               }
00448 
00449             // Note that this PKFile has been accessed.
00450             vf->UpdateFreeEpoch(currentFreeEpoch);
00451 
00452             try {
00453                 // create cache entry
00454                 FP::Tag *commonFP;
00455                 CE::T *entry = vf->NewEntry(ci, names, fps,
00456                                             value, model, kids,
00457                                             /*OUT*/ commonFP);
00458 
00459                 // add node to "vCacheLog"
00460                 this->mu.lock();
00461                 try {
00462                     this->LogCacheEntry(sourceFunc, pk, vf->PKEpoch(),
00463                                         ci, value, model, kids,
00464                                         names, fps);
00465                 } catch (...) { this->mu.unlock(); throw; }
00466                 this->mu.unlock();
00467 
00468                 // add entry to VPKFile
00469                 vf->AddEntry(NEW_CONSTR(Text, (sourceFunc)), entry, commonFP);
00470                 this->mu.lock();
00471                 this->state.newEntryCnt++;
00472                 this->state.newPklSize += entry->Value()->len;
00473                 this->mu.unlock();
00474                 res = CacheIntf::EntryAdded;
00475             }
00476             catch (DuplicateNames) {
00477                 res = CacheIntf::BadAddEntryArgs;
00478             }
00479             catch (...) { vf->mu.unlock(); throw; }
00480             vf->mu.unlock();
00481 
00482             // flush the MultiPKFile if necessary
00483             PKPrefix::T pfx(*pk);
00484             this->AddEntryToMPKFile(pfx,
00485               "MultiPKFile threshold exceeded on CacheS::AddEntry");
00486         }
00487     }
00488     Etp_CacheS_AddEntry(sizeof(*pk) + names->CheapSize()
00489       + fps->Size() + value->Size() + kids->Size());
00490     return res;
00491 } // CacheS::AddEntry
00492 
00493 // Flushing MPKFile methods ---------------------------------------------------
00494 
00495 void CacheS::AddEntryToMPKFile(const PKPrefix::T &pfx, const char *reason)
00496   throw ()
00497 /* REQUIRES Sup(LL) < SELF.mu */
00498 {
00499     VMultiPKFile *mpk;
00500     this->mu.lock();
00501     try {
00502         bool inTbl = this->mpkTbl->Get(pfx, /*OUT*/ mpk); assert(inTbl);
00503         mpk->IncEntries();
00504         mpk->UpdateEpoch(this->freeMPKFileEpoch);
00505         if (mpk->IsFull()) {
00506             this->FlushMPKFile(pfx, reason, /*block=*/ false);
00507         }
00508     } catch (...) { this->mu.unlock(); throw; }
00509     this->mu.unlock();
00510 }
00511 
00512 void CacheS::FlushMPKFile(const PKPrefix::T &pfx,
00513   const char *reason, bool block) throw ()
00514 /* REQUIRES Sup(LL) = SELF.mu */
00515 {
00516     FlushWorker *worker = this->NewFlushWorker(block);
00517     worker->Start(pfx, reason);
00518 }
00519 
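/* The flush workers form a simple pool.  "NewFlushWorker" hands out a
   worker from the "idleFlushWorkers" list; if the list is empty, a
   non-blocking caller gets a freshly allocated worker, while a blocking
   caller waits on "availFlushWorker".  "RegisterIdleFlushWorker" returns
   a worker to the list, signals "availFlushWorker", and signals
   "allFlushWorkersDone" once the number of active workers drops to
   zero. */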
00520 FlushWorker *CacheS::NewFlushWorker(bool block) throw ()
00521 /* REQUIRES Sup(LL) = SELF.mu */
00522 {
00523     FlushWorker *res;
00524     if (!block && this->idleFlushWorkers == (FlushWorkerList *)NULL) {
00525       // create new FlushWorker object
00526       res = NEW_CONSTR(FlushWorker, (this));
00527     } else {
00528         while (this->idleFlushWorkers == NULL) {
00529             assert(block); // we'll only get here if in blocking call
00530             this->availFlushWorker.wait(this->mu);
00531         }
00532         // get FlushWorker off avail list
00533         res = this->idleFlushWorkers->worker;
00534         this->idleFlushWorkers = this->idleFlushWorkers->next;
00535     }
00536     this->numActiveFlushWorkers++;
00537     return res;
00538 }
00539 
00540 void CacheS::RegisterIdleFlushWorker(FlushWorker *worker) throw ()
00541 /* REQUIRES Sup(LL) < SELF.mu */
00542 {
00543   FlushWorkerList *cons = NEW(FlushWorkerList);
00544   cons->worker = worker;
00545   this->mu.lock();
00546   cons->next = this->idleFlushWorkers;
00547   this->idleFlushWorkers = cons;
00548   this->numActiveFlushWorkers--;
00549   if (this->numActiveFlushWorkers == 0) {
00550     this->allFlushWorkersDone.signal();
00551   }
00552   this->mu.unlock();
00553   this->availFlushWorker.signal();
00554 }
00555 
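/* Background thread.  Each pass of the loop below: (1) pauses for
   "Config_FreePauseDur"; (2) advances "freeMPKFileEpoch" (and, unless
   "Config_FreeAggressively" is set, skips the rest of the pass if the
   cache was used during the pause); (3) flushes VMultiPKFiles that have
   gone stale since the previous epoch and then cleans the cache log;
   (4) purges old entries from unmodified VMultiPKFiles; and (5) evicts
   VPKFiles and VMultiPKFiles that have become empty. */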
00556 void *CacheS_DoFreeMPKFiles(void *arg) throw ()
00557 /* REQUIRES Sup(LL) < ((CacheS *)arg)->mu */
00558 {
00559     CacheS *cache = (CacheS *)arg;
00560     
00561     while (true) {
00562         MethodCnts cnt;
00563         if (!Config_FreeAggressively) {
00564             // snapshot cache load
00565             cache->mu.lock();
00566             cnt = cache->cnt;
00567             cache->mu.unlock();
00568         }
00569 
00570         // pause
00571         Basics::thread::pause(Config_FreePauseDur);
00572 
00573         // increment the flush epoch
00574         cache->mu.lock();
00575         int lastFreeMPKFileEpoch = cache->freeMPKFileEpoch;
00576         cache->freeMPKFileEpoch++;
00577 
00578         if (!Config_FreeAggressively) {
00579             // see if the cache is lightly loaded; if not, try again later
00580             if (cnt != cache->cnt) {
00581                 cache->mu.unlock();
00582                 continue;
00583             }
00584         }
00585 
00586         // collect list of MPKFiles that have not been accessed this epoch
00587         CacheS::MPKIter it(cache->mpkTbl);
00588         PKPrefixSeq toFlush(/*sizeHint=*/ cache->mpkTbl->Size() / 2);
00589         PKPrefixSeq toPurge(/*sizeHint=*/ cache->mpkTbl->Size() / 2);
00590         PKPrefix::T pfx; VMultiPKFile *mpk;
00591         while (it.Next(/*OUT*/ pfx, /*OUT*/ mpk)) {
00592             if (mpk->IsStale(lastFreeMPKFileEpoch)) {
00593                 toFlush.addhi(pfx);
00594             } else if (mpk->IsUnmodified()) {
00595                 toPurge.addhi(pfx);
00596             }
00597         }
00598         cache->mu.unlock();
00599 
00600         if (toFlush.size() > 0) {
00601             // pre debugging
00602             if (cache->debug >= CacheIntf::MPKFileFlush) {
00603                 Debug::Lock();
00604                 cout << Debug::Timestamp()
00605                      << "STARTED -- Flushing VMultiPKFiles" << endl;
00606                 cout << "  number = " << toFlush.size() << endl << endl;
00607                 Debug::Unlock();
00608             }
00609 
00610             cache->mu.lock();
00611             try {
00612                 // free the cache entries consumed by them
00613                 while (toFlush.size() > 0) {
00614                     cache->FlushMPKFile(toFlush.remlo(),
00615                       /*reason=*/ "CacheS_DoFreeMPKFiles called",
00616                       /*block=*/ true);
00617                 }
00618 
00619                 // wait for *all* flush workers to finish
00620                 while (cache->numActiveFlushWorkers > 0) {
00621                     cache->allFlushWorkersDone.wait(cache->mu);
00622                 }
00623             } catch (...) { cache->mu.unlock(); throw; }
00624             cache->mu.unlock();
00625 
00626             // post debugging
00627             if (cache->debug >= CacheIntf::MPKFileFlush) {
00628                 Debug::Lock();
00629                 cout << Debug::Timestamp()
00630                      << "FINISHED -- Flushing VMultiPKFiles" << endl << endl;
00631                 Debug::Unlock();
00632             }
00633 
00634             // clean the cache log
00635             cache->cacheLogMu.lock();
00636             try {
00637                 cache->TryCleanCacheLog(/*upper_bound=*/ -1,
00638                   /*reason=*/ "CacheS_DoFreeMPKFiles thread");
00639             } catch (...) { cache->cacheLogMu.unlock(); throw; }
00640             cache->cacheLogMu.unlock();
00641         }
00642 
00643         // Loop over any unmodified MPKFiles purging old entries from
00644         // memory.
00645         if(toPurge.size() > 0)
00646           {
00647             // pre debugging
00648             if (cache->debug >= CacheIntf::MPKFileFlush)
00649               {
00650                 Debug::Lock();
00651                 cout << Debug::Timestamp()
00652                      << "STARTED -- Freeing old entries from unmodified VMultiPKFiles" << endl;
00653                 cout << "  number = " << toPurge.size() << endl << endl;
00654                 Debug::Unlock();
00655               }
00656 
00657             EntryState dState; // initialized all 0
00658 
00659             while(toPurge.size() > 0)
00660               {
00661                 pfx = toPurge.remhi();
00662 
00663                 cache->mu.lock();
00664 
00665                 // Look up this VMultiPKFile
00666                 bool inTbl = cache->mpkTbl->Get(pfx, mpk);
00667                 assert(inTbl && (mpk != 0));
00668 
00669                 // Take a copy of its VPKFile table
00670                 SMultiPKFile::VPKFileMap vpks_in_mpk(&(mpk->VPKFileTbl()));
00671 
00672                 cache->mu.unlock();
00673 
00674                 // Loop over the VPKFiles in the VMultiPKFile
00675                 SMultiPKFile::VPKFileIter it(&vpks_in_mpk);
00676                 FP::Tag pk;
00677                 VPKFile *vpk;
00678                 while(it.Next(/*OUT*/ pk, /*OUT*/ vpk))
00679                   {
00680                     // Lock this one.
00681                     vpk->mu.lock();
00682 
00683                     // If this VPKFile is ready to have its warm
00684                     // entries purged...
00685                     if(vpk->ReadyForPurgeWarm(lastFreeMPKFileEpoch))
00686                       {
00687                         // As a size hint (for the size of the table
00688                         // after all entries are removed), use half
00689                         // its current size.
00690                         int sizeHint = vpk->OldEntries()->Size() / 2;
00691 
00692                         // Have this VPKFile purge all its old
00693                         // entries.
00694                         vpk->DeleteOldEntries(sizeHint, /*INOUT*/ dState);
00695                       }
00696 
00697                     // Unlock it now that we're done with it.
00698                     vpk->mu.unlock();
00699                   }
00700               }
00701 
00702             // Update the cache server state based on the entries we
00703             // just freed.
00704             cache->mu.lock();
00705             cache->state += dState;
00706             cache->mu.unlock();
00707 
00708             // post debugging
00709             if (cache->debug >= CacheIntf::MPKFileFlush)
00710               {
00711                 Debug::Lock();
00712                 cout << Debug::Timestamp()
00713                      << "FINISHED -- Freeing old entries from unmodified VMultiPKFiles" << endl
00714                      << "  entries freed = " << -(dState.oldEntryCnt) << endl
00715                      << "  pickled bytes freed = " << -(dState.oldPklSize)
00716                      << endl << endl;
00717                 Debug::Unlock();
00718               }
00719           }
00720 
00721         // Pre-debugging for the freeing step
00722         if (cache->debug >= CacheIntf::MPKFileFlush)
00723           {
00724             Debug::Lock();
00725             cout << Debug::Timestamp()
00726                  << "STARTED -- Freeing empty VPKFiles and VMultiPKFiles"
00727                  << endl << endl;
00728             Debug::Unlock();
00729           }
00730 
00731         // We need to iterate over all VPKFiles.  However, due to
00732         // locking order constraints, we can't hold cache->mu while
00733         // acquiring the individual VPKFile locks.  So, we take a copy
00734         // of the current table of VPKFiles and iterate over that.
00735         cache->mu.lock();
00736         CacheS::CacheMap vpk_tbl_copy(cache->cache, /*useGC=*/ true);
00737         cache->mu.unlock();
00738 
00739         // Look for VPKFiles to free.
00740         unsigned int num_vpks_freed = 0;
00741         CacheS::CacheMapIter vpk_it(&vpk_tbl_copy);
00742         FP::Tag pk;
00743         VPKFile *vpk;
00744         while(vpk_it.Next(/*OUT*/ pk, /*OUT*/ vpk))
00745           {
00746             vpk->mu.lock();
00747 
00748             // If this VPKFile hasn't been touched recently, and it
00749             // has neither new entries yet to be flushed to disk nor
00750             // old entries, then we may be able to remove it.
00751             if(vpk->ReadyForEviction(lastFreeMPKFileEpoch))
00752               {
00753                 cache->mu.lock();
00754 
00755                 // Find its VMultiPKFile.
00756                 PKPrefix::T pfx(pk);
00757                 VMultiPKFile *mpk;
00758                 bool inTbl = cache->mpkTbl->Get(pfx, mpk);
00759                 assert(inTbl && (mpk != 0));
00760 
00761                 // Last check: is this VMultiPKFile being re-written?
00762                 // If so, we can't evict the VPKFile.  Also, if it
00763                 // will be re-written soon, don't evict the VPKFile,
00764                 // as in that case it would be re-created in the near
00765                 // future.
00766                 if(!mpk->FlushRunning() && !mpk->FlushPending())
00767                   {
00768                     // Remove the VPKFile from the cache table of
00769                     // VPKFiles.
00770                     VPKFile *removed;
00771                     inTbl = cache->cache->Delete(pk, removed, false);
00772                     assert(inTbl && (removed == vpk));
00773 
00774                     // Remove it from its VMultiPKFile too.
00775                     inTbl = mpk->Delete(pk, removed);
00776                     assert(inTbl && (removed == vpk));
00777 
00778                     // Mark this VPKFile as evicted (to avoid a race with
00779                     // adding entries).
00780                     vpk->Evict();
00781 
00782                     // If the stable PKFile for this VPKFile is empty,
00783                     // and it has a non-zero namesEpoch, remember its
00784                     // namesEpoch to prevent its namesEpoch from
00785                     // decreasing.  (If there's an evaluator between a
00786                     // FreeVars and Lookup call for this PK, and the
00787                     // namesEpoch decreases, the Lookup call will
00788                     // return "bad lookup args".)
00789                     if(vpk->IsStableEmpty() && (vpk->NamesEpoch() != 0))
00790                       {
00791                         inTbl =
00792                           cache->evictedNamesEpochs.Put(pk,
00793                                                         vpk->NamesEpoch());
00794                         assert(!inTbl);
00795                       }
00796 
00797                     // After the end of this iteration, the
00798                     // VPKFile will not be referenced, and the
00799                     // garbage collector can reclaim its storage.
00800                     num_vpks_freed++;
00801                   }
00802 
00803                 cache->mu.unlock();
00804               }
00805             vpk->mu.unlock();
00806           }
00807 
00808         // Help out the garbage collector by removing our copy of
00809         // references to the VPKFiles.
00810         vpk_tbl_copy.Init();
00811         vpk = 0;
00812 
00813         // If we've removed any VPKFiles from the cache table, resize
00814         // it.  (We need to do this because we call Delete with
00815         // resize=false, which is necessary because we're iterating
00816         // over the table.)
00817         if(num_vpks_freed > 0)
00818           {
00819             cache->mu.lock();
00820             cache->cache->Resize();
00821             cache->mu.unlock();
00822           }
00823 
00824         // To free VMultiPKFiles, we must iterate over and modify the
00825         // cache->mpkTbl.  Therefore we hold the cache server lock the
00826         // entire time.
00827         cache->mu.lock();
00828 
00829         // Look for VMultiPKFiles to free.
00830         unsigned int num_vmpks_freed = 0;
00831         it.Reset();
00832         while (it.Next(/*OUT*/ pfx, /*OUT*/ mpk))
00833           {
00834             // If this VMultiPKFile has no VPKFiles (because we
00835             // removed its last one above), and there is no re-write
00836             // of it running now or pending in the near future, remove
00837             // it.
00838             if((mpk->NumVPKFiles() == 0) &&
00839                !mpk->FlushRunning() && !mpk->FlushPending())
00840               {
00841                 VMultiPKFile *removed;
00842                 bool inTbl = cache->mpkTbl->Delete(pfx, removed, false);
00843                 assert(inTbl && (removed == mpk));
00844 
00845                 num_vmpks_freed++;
00846               }
00847           }
00848 
00849         // Help the garbage collector.
00850         mpk = 0;
00851 
00852         // If we've removed any VMultiPKFiles from mpkTbl, resize it.
00853         if(num_vmpks_freed > 0)
00854           {
00855             cache->mpkTbl->Resize();
00856           }
00857 
00858         // Now that we're done freeing VPKFiles and VMultiPKFiles (and
00859         // thus modifying the cache's data structures), we can unlock
00860         // its mutex.
00861         cache->mu.unlock();
00862 
00863         // Post-debugging for the freeing step
00864         if (cache->debug >= CacheIntf::MPKFileFlush)
00865           {
00866             Debug::Lock();
00867             cout << Debug::Timestamp()
00868                  << "FINISHED -- Freeing empty VPKFiles and VMultiPKFiles"
00869                  << endl
00870                  << "  VPKFiles freed = " << num_vpks_freed << endl
00871                  << "  VMultiPKFiles freed = " << num_vmpks_freed
00872                  << endl << endl;
00873             Debug::Unlock();
00874           }
00875     }
00876     //assert(false); // not reached
00877     //return (void *)NULL;
00878 } // CacheS_DoFreeMPKFiles
00879 
00880 // Weeder client methods ------------------------------------------------------
00881 
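/* The methods below implement the cache server's half of the weeder
   protocol, roughly in call order: "WeederRecovering" records the
   weeder's SRPC connection and reports whether another weed is already
   in progress; "StartMark" snapshots "usedCIs", disables lease
   expiration, and starts a fresh graph-log checkpoint; "SetHitFilter"
   and "EndMark" install the hit filter, record the MultiPKFiles to
   weed, and wake the deletion thread; and "CommitChkpt" accepts the
   weeder's pruned graph-log checkpoint. */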
00882 bool CacheS::WeederRecovering(SRPC *srpc, bool doneMarking) throw ()
00883 /* REQUIRES Sup(LL) < SELF.mu */
00884 {
00885     this->mu.lock();
00886 
00887     // check if a weed is already in progress
00888     if (this->weederSRPC != (SRPC *)NULL) {
00889         if (this->weederSRPC->alive()) {
00890             this->mu.unlock();
00891             return true;
00892         }
00893     }
00894 
00895     // cache this srpc connection to indicate that a weed is in progress
00896     this->weederSRPC = srpc;
00897 
00898     // resume lease expiration
00899     try {
00900         // if lease expiration is disabled, then "doneMarking" must be "false"
00901         assert(this->leases->ExpirationIsEnabled() || !doneMarking);
00902         this->leases->EnableExpiration();
00903 
00904         // test if we need to back out to the idle state
00905         if (!(this->hitFilter.IsEmpty()) && !this->deleting && !doneMarking) {
00906             this->ClearStableHitFilter();
00907         }
00908     } catch (...) { this->mu.unlock(); throw; }
00909     this->mu.unlock();
00910     return false;
00911 }
00912 
00913 BitVector* CacheS::StartMark(/*OUT*/ int &newLogVer) throw ()
00914 /* REQUIRES Sup(LL) < SELF.graphLogMu */
00915 {
00916     // main work
00917     BitVector *res;
00918     int currChkptVer;
00919     this->mu.lock();
00920     try {
00921         while (this->deleting) {
00922             this->notDeleting.wait(this->mu);
00923         }
00924         res = NEW_CONSTR(BitVector, (&(this->usedCIs)));
00925         this->leases->DisableExpiration();
00926         currChkptVer = this->graphLogChkptVer;
00927     } catch (...) { this->mu.unlock(); throw; }
00928     this->mu.unlock();
00929 
00930     // flush & checkpoint the GraphLog
00931     try {
00932         this->FlushGraphLog();
00933 
00934         // abort an old checkpoint if one is outstanding
00935         if (currChkptVer >= 0) this->AbortGraphLogChkpt();
00936 
00937         // start a new checkpoint
00938         newLogVer = this->ChkptGraphLog();
00939         assert(newLogVer >= 0);
00940     } catch (const FS::Failure &f) {
00941         cerr << "Fatal FS error: ";
00942         cerr << "while checkpointing \"graphLog\"; exiting..." << endl;
00943         cerr << f << endl;
00944         exit(1);
00945     } catch (const VestaLog::Error &err) {
00946         cerr << "Fatal VestaLog error: ";
00947         cerr << "while flushing \"graphLog\"" << endl
00948              << "  " << err.msg;
00949         if(err.r != 0)
00950           {
00951             cerr << " (errno = " << err.r << ")";
00952           }
00953         cerr << endl << "Exiting..." << endl;
00954         exit(1);
00955     }
00956 
00957     // record the outstanding graphLog checkpoint number
00958     this->mu.lock();
00959     this->graphLogChkptVer = newLogVer;
00960     this->mu.unlock();
00961 
00962     return res;
00963 }
00964 
00965 void CacheS::SetHitFilter(const BitVector &cis) throw ()
00966 /* REQUIRES Sup(LL) < SELF.mu */
00967 {
00968     this->mu.lock();
00969     try {
00970         assert(!this->deleting);
00971         this->SetStableHitFilter(cis);
00972     } catch (...) { this->mu.unlock(); throw; }
00973     this->mu.unlock();
00974 }
00975 
00976 BitVector* CacheS::GetLeases() throw ()
00977 /* REQUIRES Sup(LL) < SELF.mu */
00978 {
00979     BitVector *res;
00980     this->mu.lock();
00981     try {
00982         res = leases->LeaseSet();
00983     } catch (...) { this->mu.unlock(); throw; }
00984     this->mu.unlock();
00985     return res;
00986 }
00987 
00988 void CacheS::ResumeLeaseExp() throw ()
00989 /* REQUIRES Sup(LL) < SELF.mu */
00990 {
00991     this->mu.lock();
00992     try {
00993         leases->EnableExpiration();
00994     } catch (...) { this->mu.unlock(); throw; }
00995     this->mu.unlock();
00996 }
00997 
00998 int CacheS::EndMark(const BitVector &cis, const PKPrefix::List &pfxs) throw ()
00999 /* REQUIRES
01000      Sup(LL) < SELF.mu AND
01001      !cis.IsEmpty() AND
01002      (this->hitFilter.IsEmpty() OR (cis <= this->hitFilter))
01003 */
01004 {
01005     int chkptVer;
01006 
01007     // verify preconditions; record whether "hitFilter" is empty, "deleting"
01008     assert(!(cis.IsEmpty()));
01009     bool emptyHF, deleting2;
01010     this->mu.lock();
01011     try {
01012         emptyHF = this->hitFilter.IsEmpty();
01013         deleting2 = this->deleting;
01014         chkptVer = this->graphLogChkptVer;
01015         assert(emptyHF || (cis <= this->hitFilter));
01016     } catch (...) { this->mu.unlock(); throw; }
01017     this->mu.unlock();
01018 
01019     // record result if necessary
01020     if (chkptVer < 0) {
01021         this->graphLogMu.lock();
01022         try {
01023             chkptVer = this->graphLog->logVersion();
01024         } catch (const VestaLog::Error &err) {
01025             cerr << "VestaLog fatal error: ";
01026             cerr << "failed calling 'logVersion' in 'CacheS::EndMark'" << endl;
01027             cerr << "  num = " << err.r << "; " << err.msg << endl;
01028             cerr << "Exiting..." << endl;
01029             exit(1);
01030         }
01031         this->graphLogMu.unlock();
01032         this->mu.lock();
01033         this->graphLogChkptVer = chkptVer;
01034         this->mu.unlock();
01035     }
01036 
01037     // only advance to state 2 if currently in state 1
01038     if (!emptyHF && !deleting2) {
01039         this->mu.lock();
01040         try {
01041             // set hitFilter
01042             this->SetStableHitFilter(cis);
01043 
01044             // set "pksToWeed", reset "pksWeeded"
01045             this->SetMPKsToWeed(pfxs);
01046             this->ResetWeededMPKs();
01047 
01048             // enable deletions
01049             this->SetStableDeleting(true);
01050             this->doDeleting.signal();
01051         } catch (...) { this->mu.unlock(); throw; }
01052         this->mu.unlock();
01053     }
01054     assert(chkptVer >= 0);
01055     return chkptVer;
01056 } // CacheS::EndMark
01057 
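/* "CommitChkpt" accepts a graph-log checkpoint written by the weeder.
   It only proceeds if a checkpoint is actually outstanding and the
   supplied name is a relative path that extends the expected
   "<version>.ckp" name; it then renames the pruned checkpoint into
   place, completes and prunes the graph log, and returns true.  In all
   other cases it returns false (deleting the file if no checkpoint was
   expected). */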
01058 bool CacheS::CommitChkpt(const Text &chkptFileName) throw ()
01059 /* REQUIRES Sup(LL) < SELF.graphLogMu */
01060 {
01061   // Construct the full path of the checkpoint file that the
01062   // weeder has provided us with.  It comes in relative to the
01063   // graph log root, so we need to prepend that to it.
01064   Text temp_chkptFileName(Config_GraphLogPath);
01065   temp_chkptFileName += '/';
01066   temp_chkptFileName += chkptFileName;
01067 
01068   // Get the graph log checkpoint version we're about to commit.
01069   this->mu.lock();
01070   int chkptVer = this->graphLogChkptVer;
01071   this->mu.unlock();
01072 
01073   // If it's non-negative, then we are actually in the middle of
01074   // checkpointing it and we should continue.  Otherwise, this is some
01075   // kind of spurious call.
01076   if(chkptVer >= 0)
01077     {
01078       // The checkpoint filename should be relative, not absolute.  If
01079       // it looks like an absolute filename, reject it.
01080       if(chkptFileName[0] == '/')
01081         {
01082           return false;
01083         }
01084 
01085       // Determine the full filename that the checkpoint should have.
01086       Text final_chkptFileName(Config_GraphLogPath);
01087       // WARNING: this is just asking for a buffer overrun.  Should be
01088       // OK for 64-bit ints though.
01089       char buff[26];
01090       int spres = sprintf(buff, "/%d.ckp", chkptVer);
01091       assert((spres >= 0) && (spres <= 25));
01092       final_chkptFileName += buff;
01093 
01094       // We expect the checkpoint to be the same as the real name but
01095       // with a suffix for uniqueness.  If it's not, reject it.
01096       if((temp_chkptFileName.Length() <= final_chkptFileName.Length()) ||
01097          (temp_chkptFileName.Sub(0, final_chkptFileName.Length())
01098           != final_chkptFileName))
01099         {
01100           return false;
01101         }
01102 
01103       // Check to make sure that the new checkpoint exists.  If it
01104       // doesn't, reject it.
01105       if(!FS::Exists(temp_chkptFileName))
01106         {
01107           return false;
01108         }
01109 
01110       // Rename the [pruned] checkpoint of the graph log to the name
01111       // it should really have.
01112       if(rename(temp_chkptFileName.cchars(),
01113                 final_chkptFileName.cchars()) != 0)
01114         {
01115           // If the rename fails, we treat it as a fatal error.  It
01116           // would be nice to handle it a little more robustly, but
01117           // this puts us in kind of a tough situation.  If this ever
01118           // actually comes up, it could leave the cache in a somewhat
01119           // invalid state.
01120           cerr << "CacheS::CommitChkpt fatal error: rename(2) failed!" << endl;
01121           cerr << "  errno = " << errno << endl;
01122           cerr << "  from  = " << temp_chkptFileName << endl;
01123           cerr << "  to    = " << final_chkptFileName << endl;
01124           assert(false);
01125         }
01126       else
01127         {
01128           // Now actually complete the checkpoint.
01129           this->graphLogMu.lock();
01130           try {
01131             this->graphLog->checkpointEnd();
01132             this->graphLog->prune(/*ckpkeep=*/ 1);
01133             this->mu.lock();
01134             this->graphLogChkptVer = -1;
01135             this->weederSRPC = NULL; // weeder is done; allow a new weed
01136             this->mu.unlock();
01137           } catch (const VestaLog::Error &err) {
01138             cerr << "Fatal VestaLog error: ";
01139             cerr << "while committing \"graphLog\" checkpoint" << endl
01140                  << "  " << err.msg;
01141             if(err.r != 0)
01142               {
01143                 cerr << " (errno = " << err.r << ")";
01144               }
01145             cerr << endl << "Exiting..." << endl;
01146             exit(1);
01147           }
01148           this->graphLogMu.unlock();
01149 
01150           // Indicate that we have accepted the checkpoint.
01151           return true;
01152         }
01153     }
01154   // If we know that we're not going to use this checkpointed graph
01155   // log, delete it.
01156   else
01157     {
01158       try
01159         {
01160           FS::Delete(temp_chkptFileName);
01161         }
01162       catch(FS::Failure)
01163         {
01164           // We don't really care if that operation failed.
01165         }
01166 
01167       // Perhaps we should do something to tell the caller that we've
01168       // decided not to accept their checkpoint?
01169 
01170     }
01171 
01172   // If we make it here, we did not accept the checkpoint.  (Perhaps
01173   // we weren't expecting a graph log checkpoint right now?)  Indicate
01174   // to the caller that we have declined their checkpoint.
01175   return false;
01176 }
01177 
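/* Deletion thread.  Each iteration below waits until "deleting" is set
   (by "CacheS::EndMark"), flushes every MultiPKFile named in
   "mpksToWeed" through "VToSCache" with the hit filter applied (noting
   progress via "AddToWeededMPKs" every ten prefixes), checkpoints the
   resulting "usedCIs", clears the stable hit filter, resets "deleting",
   and finally tries to clean the cache log. */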
01178 void *CacheS_DoDeletions(void *arg) throw ()
01179 /* REQUIRES LL = 0 */
01180 {
01181     CacheS *cs = (CacheS *)arg; // unchecked NARROW
01182 
01183     while (true) {
01184         // wait until "deleting"
01185         cs->mu.lock();
01186         while (!(cs->deleting)) {
01187             cs->doDeleting.wait(cs->mu);
01188         }
01189 
01190         // pre debugging
01191         if (cs->debug >= CacheIntf::WeederOps) {
01192             Debug::Lock();
01193             cout << Debug::Timestamp() << "STARTED -- DoDeletions" << endl;
01194             cout << endl;
01195             Debug::Unlock();
01196         }
01197 
01198         // flush necessary "VPKFiles"
01199         int cnt = 0;
01200         while (cs->nextMPKToWeed < cs->mpksToWeed.len) {
01201             // Invariant: "cs->mu" is locked
01202 
01203             // flush the next VMPKFile
01204             int next = cs->nextMPKToWeed;
01205             cs->mu.unlock();
01206             cs->VToSCache(cs->mpksToWeed.pfx[next], &(cs->hitFilter));
01207             cs->mu.lock();
01208             cs->nextMPKToWeed++;
01209 
01210             // log every 10th one
01211             if (++cnt == 10) {
01212                 cs->AddToWeededMPKs(cnt);
01213                 cnt = 0;
01214             }
01215         }
01216         if (cnt > 0) cs->AddToWeededMPKs(cnt);
01217         cs->mu.unlock();
01218 
01219         // checkpoint the new value of "usedCIs"
01220         try {
01221             cs->ChkptUsedCIs(cs->hitFilter);
01222         } catch (const VestaLog::Error &err) {
01223             cerr << "Fatal VestaLog error: ";
01224             cerr << "while checkpointing \"usedCIs\"" << endl
01225                  << "  " << err.msg;
01226             if(err.r != 0)
01227               {
01228                 cerr << " (errno = " << err.r << ")";
01229               }
01230             cerr << endl << "Exiting..." << endl;
01231             exit(1);
01232         } catch (const FS::Failure &f) {
01233             cerr << "Fatal FS error: ";
01234             cerr << "while checkpointing \"usedCIs\"; exiting..." << endl;
01235             cerr << f << endl;
01236             exit(1);
01237         }
01238 
01239         // set "hitFilter = {}"
01240         cs->mu.lock();
01241         cs->ClearStableHitFilter();
01242 
01243         // done deleting
01244         cs->SetStableDeleting(false);
01245         cs->notDeleting.signal();
01246         cs->mu.unlock();
01247 
01248         // post debugging
01249         if (cs->debug >= CacheIntf::WeederOps) {
01250             Debug::Lock();
01251             cout << Debug::Timestamp() << "FINISHED -- DoDeletions" << endl;
01252             cout << endl;
01253             Debug::Unlock();
01254         }
01255 
01256         // clean the cache log
01257         cs->cacheLogMu.lock();
01258         try {
01259             cs->TryCleanCacheLog(/*upper_bound=*/ -1,
01260               /*reason=*/ "CacheS_DoDeletions thread");
01261         } catch (...) { cs->cacheLogMu.unlock(); throw; }
01262         cs->cacheLogMu.unlock();
01263     }
01264     //assert(false);  // not reached
01265     //return (void *)NULL;
01266 } // CacheS_DoDeletions
01267 
01268 /* Extra methods ----------------------------------------------------------- */
01269 
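/* Checkpoint requests are queued for a single worker thread
   ("ChkptWorker::MainLoop" below).  A synchronous request ("done" is
   true) blocks the caller in "WaitUntilDone" until the worker has
   written the GraphLog root; asynchronous requests may be skipped when
   newer requests are already queued. */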
01270 void CacheS::Checkpoint(const FP::Tag &pkgVersion, Model::T model,
01271   CacheEntry::Indices *cis, bool done) throw ()
01272 /* REQUIRES Sup(LL) < SELF.chkptMu */
01273 {
01274     // queue the checkpoint work to be done by the checkpoint thread
01275     ChkptWorker *ckpt = this->QueueChkpt(pkgVersion, model, cis, done);
01276 
01277     // if call is synchronous, block until the thread completes
01278     if (done) {
01279         ckpt->WaitUntilDone();
01280         this->FinishChkpt(ckpt); // return object to avail list
01281     }
01282 }
01283 
01284 ChkptWorker *CacheS::QueueChkpt(const FP::Tag &pkgVersion, Model::T model,
01285   CacheEntry::Indices *cis, bool done) throw ()
01286 /* REQUIRES Sup(LL) < SELF.chkptMu */
01287 {
01288     ChkptWorker *res;
01289     this->chkptMu.lock();
01290     try {
01291         // get the next available ChkptWorker object
01292         if (this->availChkptWorkers != (ChkptWorker *)NULL) {
01293             res = this->availChkptWorkers;
01294             this->availChkptWorkers = res->next;
01295             res->Init(pkgVersion, model, cis, done);
01296         } else {
01297           res = NEW_CONSTR(ChkptWorker, (pkgVersion, model, cis, done));
01298         }
01299 
01300         // add it to the end of the queue
01301         ChkptWorker *last = (ChkptWorker *)NULL;
01302         ChkptWorker *curr = this->queuedChkptWorkers;
01303         while (curr != (ChkptWorker *)NULL) {
01304             last = curr; curr = curr->next;
01305         }
01306         if (last == (ChkptWorker *)NULL)
01307             this->queuedChkptWorkers = res;
01308         else
01309             last->next = res;
01310     } catch (...) { this->chkptMu.unlock(); throw; }
01311     this->chkptMu.unlock();
01312 
01313     // signal the worker thread that there is work to do
01314     this->waitingChkptWorker.signal();
01315     return res;
01316 }
01317 
01318 void CacheS::FinishChkpt(ChkptWorker *cw) throw ()
01319 /* REQUIRES Sup(LL) < SELF.chkptMu */
01320 {
01321     this->chkptMu.lock();
01322     try {
01323         // return the ChkptWorker object to the avail list
01324         cw->Reset();
01325         cw->next = this->availChkptWorkers;
01326         this->availChkptWorkers = cw;
01327     } catch (...) { this->chkptMu.unlock(); throw; }
01328     this->chkptMu.unlock();
01329 }
01330 
01331 void *ChkptWorker::MainLoop(void *arg) throw ()
01332 /* REQUIRES Sup(LL) < CacheS::ciLogMu */
01333 {
01334     CacheS *cache = (CacheS *)arg;
01335     while (true) {
01336         // wait until there is more work to do
01337         cache->chkptMu.lock();
01338         while (cache->queuedChkptWorkers == (ChkptWorker *)NULL) {
01339             cache->waitingChkptWorker.wait(cache->chkptMu);
01340         }
01341         ChkptWorker *curr = cache->queuedChkptWorkers;
01342         // skip asynchronous checkpoints if more are pending
01343         while (!(curr->done) && curr->next != (ChkptWorker *)NULL) {
01344             // return skipped object to avail list
01345             ChkptWorker *avail = curr;
01346             curr = curr->next;
01347             avail->Reset();
01348             avail->next = cache->availChkptWorkers;
01349             cache->availChkptWorkers = avail;
01350         }
01351         assert(curr != (ChkptWorker *)NULL);
01352         cache->queuedChkptWorkers = curr->next;
01353         curr->next = (ChkptWorker *)NULL;
01354         cache->chkptMu.unlock();
01355 
01356         // do the work
01357         cache->DoCheckpoint(curr->pkgVersion, curr->model,
01358           curr->cis, curr->done);
01359 
01360         // finish
01361         if (curr->done) {
01362             // signal the waiting thread in the synchronous case
01363             curr->mu.lock();
01364             curr->chkptComplete = true;
01365             curr->mu.unlock();
01366             curr->isDone.signal();
01367         } else {
01368             // in the asynchronous case, return the worker to the avail list
01369             cache->FinishChkpt(curr);
01370         }
01371     }
01372     //assert(false);  // not reached
01373     //return (void *)NULL;
01374 } // ChkptWorker::MainLoop
01375 
01376 void CacheS::DoCheckpoint(const FP::Tag &pkgVersion, Model::T model,
01377   CacheEntry::Indices *cis, bool done) throw ()
01378 /* REQUIRES Sup(LL) < SELF.ciLogMu */
01379 {
01380   // flush all logs
01381   try {
01382     this->FlushCacheLog();
01383   }
01384   catch (const VestaLog::Error &err) {
01385     cerr << "Fatal VestaLog error: ";
01386     cerr << "while flushing \"cacheLog\"" << endl
01387          << "  " << err.msg;
01388     if(err.r != 0)
01389       {
01390         cerr << " (errno = " << err.r << ")";
01391       }
01392     cerr << endl << "Exiting..." << endl;
01393     exit(1);
01394   }
01395   
01396   // create root node
01397   GraphLog::Root root(pkgVersion, model, cis, done);
01398 
01399   int i;
01400   this->mu.lock();
01401 
01402   // check for necessary cache entry leases
01403   for(i = 0; i < cis->len; i++) {
01404     if(!this->leases->IsLeased(cis->index[i])) break;
01405   }
01406   this->mu.unlock();
01407   if(i < cis->len) {
01408     // exited loop prematurely
01409     Debug::Lock();
01410     cerr << Debug::Timestamp()
01411          << "Checkpoint rejected: ci " << cis->index[i]
01412          << " is not leased." << endl << endl;
01413     // Print the full GraphLog root we're rejecting
01414     root.DebugFull(cerr);
01415     Debug::Unlock();
01416   }
01417   else {
01418     // debugging
01419     if (this->debug >= CacheIntf::LogFlush) {
01420       Debug::Lock();
01421       try {
01422         cout << Debug::Timestamp() << "Writing root to GraphLog:" << endl;
01423         root.Debug(cout);
01424         cout << endl;
01425       } 
01426       catch (...) { Debug::Unlock(); throw; }
01427       Debug::Unlock();
01428     }
01429     
01430     // write GraphLog root
01431     this->graphLogMu.lock();
01432     try {
01433       this->graphLog->start();
01434       root.Log(*(this->graphLog));
01435       this->graphLog->commit();
01436     }
01437     catch (const VestaLog::Error &err) {
01438       cerr << "Fatal VestaLog error: ";
01439       cerr << "writing root node to \"graphLog\"" << endl
01440            << "  " << err.msg;
01441       if(err.r != 0)
01442         {
01443           cerr << " (errno = " << err.r << ")";
01444         }
01445       cerr << endl << "Exiting..." << endl;
01446       exit(1);
01447     }
01448     catch (...) { this->graphLogMu.unlock(); throw; }
01449     this->graphLogMu.unlock();
01450   }
01451 } // CacheS::DoCheckpoint
01452 
01453 void ChkptWorker::Init(const FP::Tag &pkgVersion, Model::T model,
01454   CacheEntry::Indices *cis, bool done) throw ()
01455 {
01456     this->next = (ChkptWorker *)NULL;
01457     this->pkgVersion = pkgVersion;
01458     this->model = model;
01459     this->cis = cis;
01460     this->done = done;
01461     this->chkptComplete = false;
01462 }
01463 
01464 void ChkptWorker::Reset() throw ()
01465 {
01466     this->cis = (CacheEntry::Indices *)NULL; // drop pointer on floor
01467 }
01468 
01469 void ChkptWorker::WaitUntilDone() throw ()
01470 /* REQUIRES Sup(LL) < SELF.mu */
01471 {
01472     this->mu.lock();
01473     while (!(this->chkptComplete)) {
01474         this->isDone.wait(this->mu);
01475     }
01476     this->mu.unlock();
01477 }
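/* Editor's sketch: the hand-off between ChkptWorker::MainLoop and
   WaitUntilDone is the standard monitor idiom: the worker sets a flag and
   signals under the worker's mutex, and the waiter re-checks the flag in a
   loop so an early or spurious wakeup is harmless.  A minimal sketch of the
   same idiom follows; the "Basics::mutex"/"Basics::cond" type names are an
   assumption of this sketch, not taken from this file:

       struct DoneFlag {
           Basics::mutex mu;
           Basics::cond  isDone;
           bool done;
           DoneFlag() throw () : done(false) { }
           void Signal() throw () {
               this->mu.lock(); this->done = true; this->mu.unlock();
               this->isDone.signal();
           }
           void Wait() throw () {
               this->mu.lock();
               while (!this->done) this->isDone.wait(this->mu);
               this->mu.unlock();
           }
       };
*/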
01478 
01479 void CacheS::FlushAll() throw ()
01480 /* REQUIRES 
01481      Sup(LL) < SELF.ciLogMu AND
01482      (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
01483 {
01484     // determine size of MultiPKFile table
01485     this->mu.lock();
01486     int mpkCnt = this->mpkTbl->Size();
01487     this->mu.unlock();
01488 
01489     // return if there is no work
01490     if (mpkCnt == 0) return;
01491 
01492     // save prefixes to flush with "this->mu" held
01493     PKPrefix::T *toFlush = NEW_PTRFREE_ARRAY(PKPrefix::T, mpkCnt);
01494     this->mu.lock();
01495     try {
01496         // copy the prefixes in "mpkTbl" into "toFlush"
01497         MPKIter it(this->mpkTbl);
01498         PKPrefix::T pfx; VMultiPKFile *mpk;
01499         int i;
01500         for (i = 0; it.Next(/*OUT*/ pfx, /*OUT*/ mpk); i++) {
01501             assert(i < mpkCnt);
01502             toFlush[i] = pfx;
01503         }
01504 
01505         // flush all the prefixes to the stable cache
01506         for (i = 0; i < mpkCnt; i++) {
01507             this->FlushMPKFile(toFlush[i],
01508               /*reason=*/ "CacheS::FlushAll called", /*block=*/ true);
01509         }
01510 
01511         // wait for *all* flush workers to finish
01512         while (this->numActiveFlushWorkers > 0) {
01513             this->allFlushWorkersDone.wait(this->mu);
01514         }
01515     } catch (...) { this->mu.unlock(); throw; }
01516     this->mu.unlock();
01517 
01518     // clean the cache log
01519     this->cacheLogMu.lock();
01520     try {
01521         this->TryCleanCacheLog(/*upper_bound=*/ -1,
01522           /*reason=*/ "CacheS::FlushAll called");
01523     } catch (...) { this->cacheLogMu.unlock(); throw; }
01524     this->cacheLogMu.unlock();
01525 } // CacheS::FlushAll
01526 
01527 void CacheS::GetCacheId(/*OUT*/ CacheId &id) throw ()
01528 /* REQUIRES Sup(LL) < SELF.mu */
01529 {
01530     char name[MAXHOSTNAMELEN];
01531     int res = gethostname(name, MAXHOSTNAMELEN);
01532     id.host = (res == 0) ? Text(name) : Text("<unknown host>");
01533     id.port = Config_Port;
01534     id.stableDir = Config_SCachePath;
01535     id.cacheVersion = VCacheVersion;
01536     id.intfVersion = CacheIntf::Version;
01537     this->mu.lock();
01538     id.startTime = this->startTime;
01539     this->mu.unlock();
01540 }
01541 
01542 void CacheS::GetCacheState(/*OUT*/ CacheState &state) throw ()
01543 /* REQUIRES Sup(LL) < SELF.mu */
01544 {
01545     unsigned long total, resident;
01546     OS::GetProcessSize(total, resident);
01547     state.virtualSize = total;
01548     state.physicalSize = resident;
01549 
01550     this->mu.lock();
01551     state.vmpkCnt = this->mpkTbl->Size();
01552     state.vpkCnt = this->cache->Size();
01553     state.entryCnt = this->entryCnt;
01554     state.cnt = this->cnt;
01555     state.s = this->state;
01556     state.hitFilterCnt = this->hitFilter.Cardinality();
01557     if (this->deleting) {
01558         state.delEntryCnt = state.hitFilterCnt;
01559         state.mpkWeedCnt = this->mpksToWeed.len - this->nextMPKToWeed;
01560     } else {
01561         state.delEntryCnt = 0;
01562         state.mpkWeedCnt = 0;
01563     }
01564     this->mu.unlock();
01565 }
01566 
01567 const FP::Tag &CacheS::GetCacheInstance()
01568      const
01569      throw ()
01570 /* Must only be called after the instance is initialized, as read
01571    access to the instance fingerprint is not protected by a mutex. */
01572 {
01573     return this->instanceFp;
01574 }
01575 
01576 bool CacheS::RenewLeases(CacheEntry::Indices *cis) throw ()
01577 /* REQUIRES Sup(LL) < SELF.mu */
01578 {
01579     bool res = true;
01580     this->mu.lock();
01581     try {
01582         for (int i = 0; i < cis->len; i++) {
01583             CacheEntry::Index ci = cis->index[i];
01584             if (this->usedCIs.Read(ci)) {
01585                 try {
01586                     this->leases->RenewLease(ci);
01587                 } catch (Leases::NoLease) {
01588                     res = false;
01589                 }
01590             } else {
01591                 res = false;
01592             }
01593         }
01594     } catch (...) { this->mu.unlock(); throw; }
01595     this->mu.unlock();
01596     return res;
01597 }
01598 
01599 void CacheS::VToSCache(const PKPrefix::T &pfx, const BitVector *toDelete)
01600   throw ()
01601 /* REQUIRES 
01602      Sup(LL) < SELF.ciLogMu AND
01603      (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
01604 {
01605     // see if the MultiPKFile needs to be rewritten
01606     VMultiPKFile *mpk = (VMultiPKFile *)NULL;
01607     this->mu.lock();
01608     try {
01609         if (!this->mpkTbl->Get(pfx, /*OUT*/ mpk)) {
01610             /* There are no volatile cache entries for this MultiPKFile, but
01611                we may still have to rewrite the MultiPKFile if deletions are
01612                pending. */
01613             if (toDelete != (BitVector *)NULL) {
01614               // create a new, empty VMultiPKfile for "pfx"
01615               mpk = NEW_CONSTR(VMultiPKFile, (pfx));
01616               bool inMPK = this->mpkTbl->Put(pfx, mpk); assert(!inMPK);
01617             }
01618         } else {
01619           assert((mpk != NULL) && (mpk->Prefix() == pfx));
01620         }
01621     } catch (...) { this->mu.unlock(); throw; }
01622 
01623     // return if there is no work to do
01624     if (mpk == (VMultiPKFile *)NULL)
01625       {
01626         this->mu.unlock();
01627         return;
01628       }
01629 
01630     // Acquire an exclusive lock to write the MPKFile.  This ensures
01631     // that this is the only thread writing it.
01632     if(mpk->LockForWrite(this->mu, toDelete))
01633       {
01634         this->mu.unlock();
01635 
01636         // pre debugging
01637         if (this->debug >= CacheIntf::MPKFileFlush) {
01638           Debug::Lock();
01639           cout << Debug::Timestamp() << "STARTED -- Flushing MPKFile" << endl;
01640           cout << "  prefix = " << pfx << endl;
01641           cout << endl;
01642           Debug::Unlock();
01643         }
01644 
01645         // actual work
01646         try {
01647 
01648           // Next we need to read the header of the stable MultiPKFile.
01649           // We do this to determine the set of PKs in the stable copy,
01650           // so that we can ensure that there is a VPKFile for each.  (If
01651           // we leave any without VPKFiles, entries added by clients
01652           // during the rewrite may not be properly updated by the
01653           // changes made during the rewrite.)
01654           ifstream mpkfile_ifs;
01655           SMultiPKFileRep::Header *mpkfile_hdr;
01656           bool mpkFileExists =
01657             SMultiPKFile::PrepareForRewrite(pfx, mpkfile_ifs, mpkfile_hdr);
01658 
01659           // Now make sure we have VPKFiles
01660           for (unsigned int i = 0; i < mpkfile_hdr->num; i++)
01661             {
01662               FP::Tag *pki = mpkfile_hdr->pkSeq[i];
01663               assert(PKPrefix::T(*pki) == pfx);
01664 
01665               VPKFile *vpk;
01666               // (We could use CacheS::GetVPKFile here, but since we
01667               // already know which VMultiPKFile any new ones would
01668               // belong in, this is slightly more efficient.)
01669               this->mu.lock();
01670               if(!this->FindVPKFile(*pki, /*OUT*/ vpk))
01671                 {
01672                   // Install it in the cache and the MultiPKFile.
01673                   bool inCache = cache->Put(*pki, vpk); assert(!inCache);
01674                   bool inVMPK = mpk->Put(*pki, vpk); assert(!inVMPK);
01675                 }
01676               this->mu.unlock();
01677             }
01678 
01679           // Prepare to write the MPKFile, capturing all the VPKFiles
01680           // to be written and their current state.  Note that we must
01681           // do this *before* flushing the graph log, as new entry
01682           // additions may take place in parallel with us, and entries
01683           // must be written to the graph log before being written to
01684           // their MultiPKFiles.  (This may return false indicating
01685           // that nothing has changed and there's therefore no work to
01686           // do.)
01687           SMultiPKFile::VPKFileMap vpksToFlush;
01688           SMultiPKFile::ChkPtTbl vpkChkptTbl;
01689           if(mpk->ChkptForWrite(this->mu, toDelete, vpksToFlush, vpkChkptTbl))
01690             {
01691               // Before actually rewriting the MultiPKFile, flush the
01692               // GraphLog and UsedCIs log
01693               try
01694                 {
01695                   this->FlushGraphLog();
01696                 }
01697               // If anything goes wrong with flushing the graph log...
01698               catch(...)
01699                 {
01700                   // Release the writing lock for this MPK acquired by
01701                   // VMultiPKFile::LockForWrite
01702                   mpk->ReleaseWriteLock(this->mu);
01703 
01704                   // Close the old MultiPKFile if we opened one.
01705                   if(mpkFileExists) FS::Close(mpkfile_ifs);
01706 
01707                   throw;
01708                 }
01709 
01710               // rewrite the MultiPKFile corresponding to "mpk"
01711               EntryState dState; // initialized all 0
01712               mpk->ToSCache(this->mu,
01713 
01714                             // From SMultiPKFile::PrepareForRewrite
01715                             mpkFileExists, mpkfile_ifs, mpkfile_hdr,
01716 
01717                             // From VMultiPKFile::ChkptForWrite
01718                             vpksToFlush, vpkChkptTbl,
01719 
01720                             toDelete, this->emptyPKLog,
01721                             /*INOUT*/ dState);
01722 
01723               // delete the "VMultiPKFile" in the cache if it is empty
01724               /*** NYI ***/
01725 
01726               // update "this->state" from "dState"
01727               this->mu.lock();
01728               this->state += dState;
01729               this->mu.unlock();
01730             }
01731           else
01732             {
01733               // Since we decided there were no changes to commit, close
01734               // the old MultiPKFile.
01735               if(mpkFileExists) FS::Close(mpkfile_ifs);
01736             }
01737         }
01738         catch (const VestaLog::Error &e) {
01739           // fatal error
01740           cerr << "VestaLog::Error:" << endl;
01741           cerr << "  " << e.msg;
01742           if (e.r != 0) cerr << ", errno = " << e.r;
01743           cerr << endl;
01744           assert(false);
01745         }
01746         catch (const FS::Failure &f) {
01747           // fatal error
01748           cerr << f;
01749           assert(false);
01750         }
01751         catch (const FS::EndOfFile &f) {
01752           // fatal error
01753           cerr << "CacheS::VToSCache: Unexpected end of file" << endl;
01754           assert(false);
01755         }
01756 
01757         // post debugging
01758         if (this->debug >= CacheIntf::MPKFileFlush) {
01759           Debug::Lock();
01760           cout << Debug::Timestamp() << "FINISHED -- Flushing MPKFile" << endl;
01761           cout << "  prefix = " << pfx << endl;
01762           cout << endl;
01763           Debug::Unlock();
01764         }
01765       }
01766     // If VMultiPKFile::LockForWrite indicates that there's nothing to
01767     // do, we still need to release CacheS.mu.
01768     else
01769       {
01770         this->mu.unlock();
01771       }
01772 } // CacheS::VToSCache
01773 
01774 void CacheS::Recover() throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
01775 /* REQUIRES LL = Empty */
01776 {
01777     this->RecoverCILog();
01778     this->RecoverDeleting();
01779     this->RecoverHitFilter();
01780     this->mu.lock();
01781     bool localDeleting;
01782     localDeleting = this->deleting;
01783     this->mu.unlock();
01784     if (localDeleting) {
01785         /* The list of MPKFiles to weed only needs to be recovered
01786            if we are in the process of deleting cache entries. */
01787         this->RecoverMPKsToWeed();
01788     }
01789     this->RecoverWeededMPKs();
01790     this->RecoverGraphLog();
01791     this->RecoverCacheLog();
01792 }
01793 
01794 // Stable variables -----------------------------------------------------------
01795 
01796 void CacheS::RecoverDeleting() throw (FS::Failure)
01797 /* REQUIRES LL = Empty */
01798 {
01799     ifstream ifs;
01800     bool delVal;  // for debugging
01801     try {
01802         FS::OpenReadOnly(Config_DeletingFile, /*OUT*/ ifs);
01803         this->mu.lock();
01804         try {
01805             FS::Read(ifs, (char *)(&(this->deleting)), sizeof(this->deleting));
01806             delVal = this->deleting;
01807         } catch (...) {
01808             this->mu.unlock();
01809             FS::Close(ifs);
01810             throw;
01811         }
01812         this->mu.unlock();
01813         FS::Close(ifs);
01814     }
01815     catch (FS::DoesNotExist) {
01816         this->mu.lock();
01817         delVal = this->deleting = false;
01818         this->mu.unlock();
01819     }
01820     catch (FS::EndOfFile) {
01821         // programming error
01822         assert(false);
01823     }
01824 
01825     // debugging
01826     if (this->debug >= CacheIntf::LogRecover) {
01827         Debug::Lock();
01828         cout << Debug::Timestamp() << "RECOVERED -- Deleting" << endl;
01829         cout << "  deleting = " << BoolName[delVal] << endl;
01830         cout << endl;
01831         Debug::Unlock();
01832     }
01833 }
01834 
01835 void CacheS::SetStableDeleting(bool del) throw ()
01836 /* REQUIRES Sup(LL) = SELF.mu */
01837 {
01838     // set volatile "deleting" variable
01839     this->deleting = del;
01840 
01841     // write "deleting" atomically
01842     AtomicFile ofs;
01843     try {
01844         ofs.open(Config_DeletingFile.chars(), ios::out);
01845         if (!ofs) {
01846             throw (FS::Failure(Text("CacheS::SetStableDeleting"),
01847                                Config_DeletingFile));
01848         }
01849         FS::Write(ofs, (char *)(&(this->deleting)), sizeof(this->deleting));
01850         FS::Close(ofs); // swing file pointer atomically
01851     }
01852     catch (const FS::Failure &f) {
01853         cerr << "unexpected FS error saving \"deleting\"; exiting..." << endl;
01854         cerr << f << endl;
01855         exit(1);
01856     }
01857 }
01858 
01859 void CacheS::RecoverHitFilter() throw (FS::Failure)
01860 /* REQUIRES LL = Empty */
01861 {
01862     ifstream ifs;
01863     try {
01864         FS::OpenReadOnly(Config_HitFilterFile, /*OUT*/ ifs);
01865         this->mu.lock();
01866         try {
01867             this->hitFilter.Read(ifs);
01868         } catch (...) {
01869             this->mu.unlock();
01870             FS::Close(ifs);
01871             throw;
01872         }
01873         this->mu.unlock();
01874         FS::Close(ifs);
01875     }
01876     catch (FS::DoesNotExist) {
01877         /* SKIP -- use initial (empty) value for "hitFilter" */
01878     }
01879     catch (FS::EndOfFile) {
01880         // programming error
01881         assert(false);
01882     }
01883 
01884     // debugging
01885     if (this->debug >= CacheIntf::LogRecover) {
01886         this->mu.lock(); // required to protect read of "hitFilter"
01887         try {
01888             Debug::Lock();
01889             try {
01890                 cout << Debug::Timestamp() << "RECOVERED -- HitFilter" << endl;
01891                 cout << "  hitFilter = " << this->hitFilter << endl << endl;
01892               } catch (...) { Debug::Unlock(); throw; }
01893             Debug::Unlock();
01894         } catch (...) { this->mu.unlock(); throw; }
01895         this->mu.unlock();
01896     }
01897 }
01898 
01899 void CacheS::WriteHitFilter() throw ()
01900 /* REQUIRES Sup(LL) = SELF.mu */
01901 {
01902     /* For now, we do this atomically, although the spec says we do not
01903        have to. ***/
01904     AtomicFile ofs;
01905     try {
01906         ofs.open(Config_HitFilterFile.chars(), ios::out);
01907         if (ofs.fail()) {
01908             throw (FS::Failure(Text("CacheS::WriteHitFilter"),
01909                                Config_HitFilterFile));
01910         }
01911         this->hitFilter.Write(ofs);
01912         FS::Close(ofs); // swing file pointer atomically
01913     }
01914     catch (const FS::Failure &f) {
01915         cerr << "unexpected FS error saving \"hitFilter\"; exiting..." << endl;
01916         cerr << f;
01917         exit(1);
01918     }
01919 }
01920 
01921 void CacheS::ClearStableHitFilter() throw ()
01922 /* REQUIRES Sup(LL) = SELF.mu */
01923 {
01924     this->hitFilter.ResetAll(/*freeMem =*/ true);
01925     this->WriteHitFilter();
01926 }
01927 
01928 void CacheS::SetStableHitFilter(const BitVector &hf) throw ()
01929 /* REQUIRES Sup(LL) = SELF.mu */
01930 {
01931     this->hitFilter = hf;
01932     this->WriteHitFilter();
01933 }
01934 
01935 void CacheS::SetMPKsToWeed(const PKPrefix::List &pfxs) throw ()
01936 /* REQUIRES Sup(LL) = SELF.mu */
01937 {
01938     // set "mpksToWeed"
01939     this->mpksToWeed = pfxs;
01940 
01941     // write "mpksToWeed" to a stable log
01942     AtomicFile ofs;
01943     try {
01944         ofs.open(Config_MPKsToWeedFile.chars(), ios::out);
01945         if (ofs.fail()) {
01946             throw (FS::Failure(Text("CacheS::SetMPKsToWeed"),
01947                                Config_MPKsToWeedFile));
01948         }
01949         this->mpksToWeed.Write(ofs);
01950         FS::Close(ofs); // swing file pointer atomically
01951     }
01952     catch (const FS::Failure &f) {
01953         cerr << "unexpected FS error saving \"mpksToWeed\"; exiting..." <<endl;
01954         cerr << f;
01955         exit(1);
01956     }
01957 }
01958 
01959 void CacheS::RecoverMPKsToWeed() throw (FS::Failure)
01960 /* REQUIRES LL = Empty */
01961 {
01962     ifstream ifs;
01963     try {
01964         FS::OpenReadOnly(Config_MPKsToWeedFile, /*OUT*/ ifs);
01965         this->mu.lock();
01966         try {
01967             mpksToWeed.Read(ifs);
01968         } catch (...) {
01969             this->mu.unlock();
01970             FS::Close(ifs);
01971             throw;
01972         }
01973         this->mu.unlock();
01974         FS::Close(ifs);
01975     }
01976     catch (FS::DoesNotExist) {
01977         /* SKIP -- use initial (empty) value for "mpksToWeed" */
01978     }
01979     catch (FS::EndOfFile) {
01980         // programming error
01981         assert(false);
01982     }
01983 
01984     // debugging
01985     if (this->debug >= CacheIntf::LogRecover) {
01986         this->mu.lock(); // required to protect read of "mpksToWeed"
01987         try {
01988             Debug::Lock();
01989             try {
01990                 cout << Debug::Timestamp() << "RECOVERED -- MPKsToWeed" <<endl;
01991                 if (this->mpksToWeed.len > 0) {
01992                     for (int i = 0; i < this->mpksToWeed.len; i++) {
01993                         cout << "  pfx[" << i << "] = "
01994                              << this->mpksToWeed.pfx[i] << endl;
01995                     }
01996                 } else {
01997                     cout << "  <<Empty>>" << endl;
01998                 }
01999                 cout << endl;
02000             } catch (...) { Debug::Unlock(); throw; }
02001             Debug::Unlock();
02002         } catch (...) { this->mu.unlock(); throw; }
02003         this->mu.unlock();
02004     }
02005 }
02006 
02007 static int CacheS_ReadLogVersion(const Text &dirName)
02008   throw (FS::Failure, FS::DoesNotExist)
02009 /* Read the file named "version" in the directory "dirName". If it exists,
02010    return the value of the first integer it contains (in ASCII). */
02011 {
02012     ifstream ifs;
02013     FS::OpenReadOnly(dirName + "/version", /*OUT*/ ifs);
02014     int res;
02015     ifs >> res;
02016     assert(ifs.good());
02017     FS::Close(ifs);
02018     return res;
02019 }
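/* Editor's sketch: the "version" file read above holds a single integer in
   ASCII.  For illustration only, a hypothetical helper that would write such
   a file in the same format (not part of the cache server; it merely shows
   the on-disk format CacheS_ReadLogVersion expects):

       static void CacheS_WriteLogVersion_Sketch(const Text &dirName, int ver)
         throw (FS::Failure)
       {
           std::ofstream ofs((dirName + "/version").chars());
           if (!ofs) {
               throw FS::Failure(Text("CacheS_WriteLogVersion_Sketch"),
                                 dirName + "/version");
           }
           ofs << ver << endl;   // the first (and only) integer, in ASCII
           ofs.close();
       }
*/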
02020 
02021 void CacheS::ResetWeededMPKs() throw ()
02022 /* REQUIRES Sup(LL) = SELF.mu */
02023 {
02024     // reset the log recording which MPKs have been weeded
02025     this->nextMPKToWeed = 0;
02026     assert(this->weededMPKsLog != (VestaLog *)NULL);
02027     try {
02028         fstream *chkptFile = weededMPKsLog->checkpointBegin();
02029         // empty checkpoint; just start new log
02030         FS::Close(*chkptFile);
02031         this->weededMPKsLog->checkpointEnd();
02032         this->weededMPKsLog->prune(/*ckpkeep=*/ 1);
02033     } catch (const FS::Failure &f) {
02034         cerr << "Fatal FS error: ";
02035         cerr << "while checkpointing \"weededMPKsLog\"; exiting..." << endl;
02036         cerr << f << endl;
02037         exit(1);
02038     }
02039     catch (const VestaLog::Error &err) {
02040         cerr << "Fatal VestaLog error: ";
02041         cerr << "resetting \"weededMPKsLog\"" << endl
02042              << "  " << err.msg;
02043         if(err.r != 0)
02044           {
02045             cerr << " (errno = " << err.r << ")";
02046           }
02047         cerr << endl << "Exiting..." << endl;
02048         exit(1);
02049     }
02050 }
02051 
02052 void CacheS::AddToWeededMPKs(int num) throw ()
02053 /* REQUIRES Sup(LL) = SELF.mu /\ weededMPKsLog is in ``ready'' state */
02054 {
02055     try {
02056         this->weededMPKsLog->start();
02057         this->weededMPKsLog->write((char *)(&num), sizeof(num));
02058         this->weededMPKsLog->commit();
02059     }
02060     catch (const VestaLog::Error &err) {
02061         cerr << "Fatal VestaLog error: ";
02062         cerr << "adding entry to \"weededMPKsLog\"" << endl
02063              << "  " << err.msg;
02064         if(err.r != 0)
02065           {
02066             cerr << " (errno = " << err.r << ")";
02067           }
02068         cerr << endl << "Exiting..." << endl;
02069         exit(1);
02070     }
02071 }
02072 
02073 void CacheS::RecoverWeededMPKs() throw (VestaLog::Eof, VestaLog::Error)
02074 /* REQUIRES LL = Empty */
02075 {
02076     this->mu.lock();
02077     try {
02078       // open log and initialize volatile values
02079       this->weededMPKsLog = NEW(VestaLog);
02080       this->nextMPKToWeed = 0;
02081       this->weededMPKsLog->open(Config_WeededLogPath.chars());
02082 
02083       // recover log
02084       do {
02085         while (!this->weededMPKsLog->eof()) {
02086           int delta;
02087           this->weededMPKsLog->readAll((char *)(&delta), sizeof(delta));
02088           this->nextMPKToWeed += delta;
02089         }
02090       } while (this->weededMPKsLog->nextLog());
02091       this->weededMPKsLog->loggingBegin();
02092     } catch (...) { this->mu.unlock(); throw; }
02093     this->mu.unlock();
02094 
02095     // debugging
02096     if (this->debug >= CacheIntf::LogRecover) {
02097         this->mu.lock(); // required to protect read of "nextMPKToWeed"
02098         try {
02099             Debug::Lock();
02100             try {
02101                 cout << Debug::Timestamp() << "RECOVERED -- WeededLog" << endl;
02102                 if (this->nextMPKToWeed > 0) {
02103                     cout << "  Weeded MultiPKFiles = " << this->nextMPKToWeed;
02104                 } else {
02105                     cout << "  <<Empty>>";
02106                 }
02107                 cout << endl << endl;
02108             } catch (...) { Debug::Unlock(); throw; }
02109             Debug::Unlock();
02110         } catch (...) { this->mu.unlock(); throw; }
02111         this->mu.unlock();
02112     }
02113 }
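/* Editor's note: "weededMPKsLog" records increments, not absolute values:
   AddToWeededMPKs appends a delta, RecoverWeededMPKs sums all deltas to
   reconstruct "nextMPKToWeed", and ResetWeededMPKs truncates the log with an
   empty checkpoint.  An illustrative call sequence:

       cs->AddToWeededMPKs(3);   // log now contains: 3
       cs->AddToWeededMPKs(2);   // log now contains: 3, 2
       // after a crash, RecoverWeededMPKs yields nextMPKToWeed == 5
       cs->ResetWeededMPKs();    // log emptied; nextMPKToWeed == 0
*/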
02114 
02115 // CacheLog methods ---------------------------------------------------------
02116 
02117 void CacheS::LogCacheEntry(const Text& sourceFunc, FP::Tag *pk,
02118                            PKFile::Epoch pkEpoch,
02119                            CacheEntry::Index ci, VestaVal::T *value,
02120                            Model::T model, CacheEntry::Indices *kids,
02121                            FV::List *names, FP::List *fps)
02122   throw ()
02123 /* REQUIRES Sup(LL) = SELF.mu */
02124 {
02125     CacheLog::Entry *logEntry;
02126 
02127     // get new entry and fill it in
02128     if (this->vCacheAvail == (CacheLog::Entry *)NULL) {
02129       logEntry = NEW_CONSTR(CacheLog::Entry, (sourceFunc, pk, pkEpoch,
02130                                               ci, value, model, kids,
02131                                               names, fps));
02132     } else {
02133         logEntry = this->vCacheAvail;
02134         this->vCacheAvail = logEntry->next;
02135         // Note: Init resets next to NULL.
02136         logEntry->Init(sourceFunc, pk, pkEpoch,
02137                        ci, value, model, kids, names, fps);
02138     }
02139 
02140     // append new entry to end of list
02141     if (this->vCacheLogTail == (CacheLog::Entry *)NULL)
02142         this->vCacheLog = logEntry;
02143     else
02144         this->vCacheLogTail->next = logEntry;
02145     this->vCacheLogTail = logEntry;
02146 }
02147 
02148 void CacheS::FlushCacheLog() throw (VestaLog::Error)
02149 /* REQUIRES Sup(LL) < SELF.ciLogMu */
02150 {
02151     CacheLog::Entry *vLog, *curr;
02152 
02153     // capture "vCacheLog" in "vLog" local and reset "vCacheLog"
02154     this->mu.lock();
02155     vLog = this->vCacheLog;
02156     this->vCacheLog = this->vCacheLogTail = (CacheLog::Entry *)NULL;
02157     this->cacheLogFlushQ->Enqueue();
02158     this->mu.unlock();
02159 
02160     // flush lower-level logs
02161     this->FlushGraphLog();
02162 
02163     // make deriveds stable??
02164     /*** NYI ***/
02165 
02166     // return if there is nothing to flush
02167     if (vLog == (CacheLog::Entry *)NULL)
02168       {
02169         this->mu.lock();
02170         this->cacheLogFlushQ->Dequeue();
02171         this->mu.unlock();
02172         return;
02173       }
02174 
02175     // debugging start
02176     if (this->debug >= CacheIntf::LogFlush) {
02177         Debug::Lock();
02178         cout << Debug::Timestamp() << "STARTED -- Flushing CacheLog" << endl;
02179         cout << endl;
02180         Debug::Unlock();
02181     }
02182 
02183     // flush log
02184     CacheLog::Entry *last;
02185     int numFlushed = 0;
02186     this->cacheLogMu.lock();
02187     try {
02188         this->cacheLog->start();
02189         for (curr = vLog; curr != NULL; curr = curr->next) {
02190             last = curr;
02191             if (numFlushed % 10 == 0 && numFlushed > 0) {
02192                 /* Commit after every tenth log entry so the cache log
02193                    does not get very big between commit points. */
02194                 this->cacheLog->commit();
02195                 this->cacheLog->start();
02196             }
02197             curr->Log(*(this->cacheLog));
02198             numFlushed++;
02199             if (this->debug >= CacheIntf::LogFlushEntries) {
02200                 Debug::Lock();
02201                 curr->Debug(cout);
02202                 Debug::Unlock();
02203             }
02204             curr->Reset();
02205             // DrdReuse((caddr_t)curr, sizeof(*curr));
02206         }
02207         this->cacheLog->commit();
02208     
02209         // clean the cache log if it is too long
02210         this->cacheLogLen += numFlushed;
02211         this->TryCleanCacheLog(Config_MaxCacheLogCnt,
02212           /*reason=*/ "CacheS::FlushCacheLog called");
02213     } catch (...) { this->cacheLogMu.unlock(); throw; }
02214     this->cacheLogMu.unlock();
02215 
02216     // debugging end
02217     if (this->debug >= CacheIntf::LogFlushEntries) {
02218         Debug::Lock();
02219         cout << endl;
02220         Debug::Unlock();
02221     }
02222     if (this->debug >= CacheIntf::LogFlush) {
02223         Debug::Lock();
02224         cout << Debug::Timestamp() << "FINISHED -- Flushing CacheLog" << endl;
02225         cout << "  Entries flushed = " << numFlushed << endl << endl;
02226         Debug::Unlock();
02227     }
02228     
02229     // return "vLog" entries to avail list
02230     this->mu.lock();
02231     last->next = this->vCacheAvail;
02232     this->vCacheAvail = vLog;
02233     this->cacheLogFlushQ->Dequeue();
02234     this->mu.unlock();
02235 } // CacheS::FlushCacheLog
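/* Editor's sketch: FlushCacheLog (above) and FlushGraphLog (below) share a
   batched-commit pattern: several records are grouped into one VestaLog
   transaction so the log does not pay one commit per entry, but the
   transaction is committed periodically so it never gets very large.  A
   stand-alone sketch of the pattern, using only the VestaLog operations
   already used in this file (start/write/commit); the helper itself is
   hypothetical:

       static void BatchedLogWrite_Sketch(VestaLog &log, const int *vals,
                                          int n, int batch = 10)
         throw (VestaLog::Error)
       {
           log.start();
           for (int i = 0; i < n; i++) {
               if (i > 0 && i % batch == 0) {
                   log.commit();   // close the current transaction
                   log.start();    // and open a new one
               }
               log.write((char *)(&vals[i]), sizeof(vals[i]));
           }
           log.commit();
       }
*/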
02236 
02237 void CacheS::CleanCacheLogEntries(RecoveryReader &rd, fstream &ofs,
02238   /*INOUT*/ EmptyPKLog::PKEpochTbl &pkEpochTbl,
02239   /*INOUT*/ int &oldCnt, /*INOUT*/ int &newCnt)
02240   throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02241 /* REQUIRES Sup(LL) < SELF.cacheLogMu */
02242 {
02243     while (!rd.eof()) {
02244         // read entry
02245         CacheLog::Entry logEntry(rd);
02246         FP::Tag *pk = logEntry.pk;          // alias for "logEntry.pk"
02247         PKFile::Epoch pkEpoch;
02248         oldCnt++;
02249         this->mu.lock();
02250         try {
02251             // find the pkEpoch of the on-disk SPKFile
02252             if (!pkEpochTbl.Get(*pk, /*OUT*/ pkEpoch)) {
02253                 // if not in the table, read the epoch from disk
02254                 pkEpoch = PKFileEpoch(*pk);
02255                 bool inTbl = pkEpochTbl.Put(*pk, pkEpoch); assert(!inTbl);
02256             }
02257 
02258             // if no stable PKFile, consult the emptyPKEpochTbl
02259             if (pkEpoch == 0) {
02260                 (void) this->emptyPKLog->GetEpoch0(*pk, /*OUT*/ pkEpoch);
02261             }
02262 
02263             // copy the entry if its "pkEpoch" is large enough
02264             if (logEntry.pkEpoch >= pkEpoch) {
02265                 logEntry.Write(ofs);
02266                 newCnt++;
02267             }
02268         } catch (...) { this->mu.unlock(); throw; }
02269         this->mu.unlock();
02270     }
02271 } // CacheS::CleanCacheLogEntries
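/* Editor's note: the keep/drop rule applied above (and again during recovery
   in RecoverCacheLogEntries) is: find the pkEpoch of the stable PKFile for
   the entry's PK, consulting the emptyPKLog only when no stable PKFile
   exists (stable epoch 0), and then keep the log entry iff

       logEntry.pkEpoch >= stable pkEpoch

   Entries with a smaller pkEpoch were already flushed into a stable PKFile,
   so copying or replaying them would be redundant. */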
02272 
02273 void CacheS::CleanCacheLog()
02274   throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02275 /* REQUIRES Sup(LL) < SELF.cacheLogMu */
02276 {
02277     // number of entries in old and new log (for debugging output only)
02278     int oldCnt = 0, newCnt = 0;
02279 
02280     // debugging start
02281     if (this->debug >= CacheIntf::LogFlush) {
02282         Debug::Lock();
02283         cout << Debug::Timestamp() << "STARTED -- CleanCacheLog" << endl;
02284         cout << endl;
02285         Debug::Unlock();
02286     }
02287 
02288     // checkpoint the current "cacheLog" and "emptyPKLog"
02289     /* Note: The "CheckpointBegin" method of the emptyPKLog begins an empty
02290        checkpoint, and moves all old in-memory entries to a staging area
02291        from which they will be deleted when the checkpoint commits. */
02292     this->emptyPKLog->CheckpointBegin();
02293     fstream *cacheLogChkpt;
02294     int newLogVersion;
02295     this->cacheLogMu.lock();
02296     try {
02297         cacheLogChkpt = this->cacheLog->checkpointBegin();
02298         newLogVersion = this->cacheLog->logVersion();
02299     } catch (...) { this->cacheLogMu.unlock(); throw; }
02300     this->cacheLogMu.unlock();
02301 
02302     /* Next, we open a new read-only "VestaLog" on the current cache log.
02303        Since this log is local and will only be read by this thread,
02304        operations on it do not need to be protected by a lock. */
02305     RecoveryReader *rd;
02306     VestaLogSeq currCacheLogSeq(Config_CacheLogPath.chars());
02307     currCacheLogSeq.Open(/*startVer=*/ -1, /*readonly=*/ true);
02308 
02309     /* We keep a temporary table mapping PKs of existing SPKFiles to their
02310        corresponding "pkEpoch"s so that we do not have to go to disk more
02311        than once for each PK. This is a simple optimization to speed the
02312        process of cleaning the cache log. The table is discarded once the
02313        cacheLog has been cleaned. */ 
02314     EmptyPKLog::PKEpochTbl pkEpochTbl;
02315 
02316     // read the last valid checkpoint and log(s), copying to the checkpoint
02317     try {
02318         while ((rd = currCacheLogSeq.Next(newLogVersion)) != NULL) {
02319             this->CleanCacheLogEntries(*rd, *cacheLogChkpt,
02320               /*INOUT*/ pkEpochTbl, /*INOUT*/ oldCnt, /*INOUT*/ newCnt);
02321         }
02322     } catch (...) { currCacheLogSeq.Close(); throw; }
02323     currCacheLogSeq.Close();
02324 
02325     // close the checkpoint file being written
02326     FS::Close(*cacheLogChkpt);
02327 
02328     // commit the checkpoints in the proper order
02329     this->cacheLogMu.lock();
02330     try {
02331         this->cacheLog->checkpointEnd();
02332         this->cacheLog->prune(/*ckpkeep=*/ 1);
02333         this->emptyPKLog->CheckpointEnd();
02334     } catch (...) { this->cacheLogMu.unlock(); throw; }
02335     this->cacheLogMu.unlock();
02336 
02337     // debugging end
02338     if (this->debug >= CacheIntf::LogFlush) {
02339         Debug::Lock();
02340         cout << Debug::Timestamp() << "FINISHED -- CleanCacheLog" << endl;
02341         cout << "  old log size = " << oldCnt << endl;
02342         cout << "  new log size = " << newCnt << endl;
02343         cout << endl;
02344         Debug::Unlock();
02345     }
02346 } // CacheS::CleanCacheLog
02347 
02348 void* CacheS_CleanCacheLogWorker(void *arg) throw ()
02349 /* REQUIRES Empty(LL) */
02350 /* This is the apply function for a thread that is forked to clean
02351    the CacheLog. */
02352 {
02353     CleanWorker *cw = (CleanWorker *)arg;
02354     try {
02355         while (true) {
02356             // wait for work to do
02357             cw->mu.lock();
02358             while (!(cw->argsReady)) {
02359                 cw->workToDo.wait(cw->mu);
02360             }
02361             cw->mu.unlock();
02362 
02363             // do the work
02364             cw->cs->CleanCacheLog();
02365 
02366             // restore thread to avail list
02367             cw->Finish();
02368             cw->cs->RegisterIdleCleanWorker(cw);
02369         }
02370     }
02371     catch (const VestaLog::Error &err) {
02372         cerr << "VestaLog fatal error: ";
02373         cerr << "failed cleaning cache log; exiting..." << endl;
02374         cerr << "num = " << err.r << "; " << err.msg << endl;
02375         exit(1);
02376     }
02377     catch (VestaLog::Eof) {
02378         cerr << "VestaLog fatal error: ";
02379         cerr << "unexpected EOF cleaning cache log; exiting..." << endl;
02380         exit(1);
02381     }
02382     catch (const FS::Failure &f) {
02383         cerr << "FS fatal error: ";
02384         cerr << "unexpected FS error cleaning cache log; exiting..." << endl;
02385         cerr << f;
02386         exit(1);
02387     }
02388     // make compiler happy
02389     assert(false);
02390     return (void *)NULL;
02391 } // CacheS_CleanCacheLogWorker
02392 
02393 CleanWorker *CacheS::NewCleanWorker() throw ()
02394 /* REQUIRES Sup(LL) = SELF.cacheLogMu */
02395 {
02396     // only allow one clean worker at a time
02397     while (this->idleCleanWorker == NULL) {
02398         this->availCleanWorker.wait(this->cacheLogMu);
02399     }
02400     CleanWorker *res = this->idleCleanWorker;
02401     this->idleCleanWorker = NULL;
02402     return res;
02403 }
02404 
02405 void CacheS::RegisterIdleCleanWorker(CleanWorker *cw) throw ()
02406 /* REQUIRES Sup(LL) < SELF.cacheLogMu */
02407 {
02408     this->cacheLogMu.lock();
02409     assert(this->idleCleanWorker == NULL); // only one clean worker
02410     this->idleCleanWorker = cw;
02411     this->cacheLogMu.unlock();
02412     this->availCleanWorker.signal();
02413 }
02414 
02415 void CacheS::TryCleanCacheLog(int upper_bound, const char *reason) throw ()
02416 /* REQUIRES Sup(LL) = SELF.cacheLogMu */
02417 {
02418     // if log is too long, fork a thread to clean it
02419     if (this->cacheLogLen > upper_bound) {
02420         CleanWorker *worker = this->NewCleanWorker();
02421         this->cacheLogLen = 0;
02422         worker->Start(reason);
02423     }
02424 }
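/* Editor's note: "upper_bound" is a threshold, not a target: a cleaner
   thread is forked only when "cacheLogLen" exceeds it, and "cacheLogLen" is
   then reset so at most one clean is triggered per crossing.  The two call
   sites in this file illustrate both uses (each with "cacheLogMu" held, as
   required):

       // routine flushing: clean only when the log has grown too long
       this->TryCleanCacheLog(Config_MaxCacheLogCnt,
         "CacheS::FlushCacheLog called");

       // CacheS::FlushAll: an upper_bound of -1 always forces a clean,
       // since cacheLogLen >= 0 > -1
       this->TryCleanCacheLog(-1, "CacheS::FlushAll called");
*/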
02425 
02426 void CacheS::RecoverEmptyPKLog(/*INOUT*/ bool &empty)
02427   throw (VestaLog::Error, VestaLog::Eof)
02428 /* REQUIRES Sup(LL) = SELF.mu */
02429 {
02430     while (!this->emptyPKLog->EndOfFile()) {
02431         // read the value from the log
02432         FP::Tag pk; PKFile::Epoch logPKEpoch;
02433         this->emptyPKLog->Read(/*OUT*/ pk, /*OUT*/ logPKEpoch);
02434         assert(logPKEpoch > 0);
02435 
02436         // print it (if necessary)
02437         if (this->debug >= CacheIntf::LogRecover) {
02438             Debug::Lock();
02439             cout << "  pk = " << pk;
02440             cout << ", pkEpoch = " << logPKEpoch << endl;
02441             empty = false;
02442             Debug::Unlock();
02443         }
02444     }
02445 } // CacheS::RecoverEmptyPKLog
02446 
02447 void CacheS::RecoverCacheLogEntries(RecoveryReader &rd,
02448   /*INOUT*/ EmptyPKLog::PKEpochTbl &pkEpochTbl, /*INOUT*/ bool &empty)
02449   throw (VestaLog::Error, VestaLog::Eof)
02450 /* REQUIRES Sup(LL) = SELF.cacheLogMu */
02451 {
02452     while (!rd.eof()) {
02453         // read entry
02454         CacheLog::Entry logEntry(rd);
02455         this->cacheLogLen++;
02456 
02457         // Note: you might think that it would be OK to suppress replay
02458         // of entries on the list of pending deletions, but you'd be
02459         // wrong.  This causes subtle bugs due to the way pkEpoch gets
02460         // updated and how entries get purged from the cache log.
02461 
02462         /* We only need to read in those cache entries that have
02463            not been flushed to stable PK files. We can tell if an
02464            entry is stale by comparing its "pkEpoch" to that of
02465            the epoch in the header of the stable PK file, and if
02466            necessary, in the emptyPKLog. Stale entries are ignored. */
02467         FP::Tag *pk = logEntry.pk;          // alias for "logEntry.pk"
02468         PKFile::Epoch pkEpoch; // epoch in stable cache or emptyPKLog
02469         this->mu.lock();
02470         try {
02471             /* Implementation note: We first consult the stable PKFile,
02472                and *then* the emptyPKLog (but only if no stable PKFile
02473                exists for this PK). This order is important because the
02474                emptyPKLog is committed before the SMultiPKFile is committed.
02475                If the cache server crashes after the former has been
02476                committed, but before the latter one has, there will be an
02477                entry in the emptyPKLog for a PKFile that was actually not
02478                deleted. So if a PKFile exists, we ignore the emptyPKLog. */
02479 
02480             // find the pkEpoch of the on-disk SPKFile
02481             if (!pkEpochTbl.Get(*pk, /*OUT*/ pkEpoch)) {
02482                 // if not in table, try reading epoch from disk
02483                 pkEpoch = this->PKFileEpoch(*pk);
02484                 bool inTbl = pkEpochTbl.Put(*pk, pkEpoch); assert(!inTbl);
02485             }
02486 
02487             // if there is no stable PKFile, consult the emptyPKLog
02488             if (pkEpoch == 0) {
02489                 (void) this->emptyPKLog->GetEpoch0(*pk, /*OUT*/ pkEpoch);
02490             }
02491         } catch (...) { this->mu.unlock(); throw; }
02492         this->mu.unlock();
02493 
02494         // process the entry only if its "pkEpoch" is large enough
02495         if (logEntry.pkEpoch >= pkEpoch) {
02496             // read header of stable PK file
02497             VPKFile *vf;
02498             this->mu.lock();
02499 
02500             // Make sure that the CI of this entry is in the used set.
02501             // Recovering a cache log entry for an unused CI indicates
02502             // a bug.
02503             if(!this->usedCIs.Read(logEntry.ci))
02504               {
02505                 Debug::Lock();
02506 
02507                 cerr << Debug::Timestamp()
02508                      << "INTERNAL ERROR: unused CI in cache log:" << endl
02509                      << "  pk           = " << *pk << endl
02510                      << "  ci           = " << logEntry.ci << endl
02511                      << " (Please report this as a bug.)" << endl;
02512 
02513                 Debug::Unlock();
02514                 abort();
02515               }
02516 
02517             (void)(this->GetVPKFile(*pk, /*OUT*/ vf));
02518             this->state.newEntryCnt++;
02519             this->state.newPklSize += logEntry.value->len;
02520             int currentFreeEpoch = this->freeMPKFileEpoch;
02521             this->mu.unlock();
02522 
02523             // release lock on cacheLog to read "vf"
02524             this->cacheLogMu.unlock();
02525             try {
02526                 // add entry to cache
02527                 vf->mu.lock();
02528                 // Note that this PKFile has been accessed.
02529                 vf->UpdateFreeEpoch(currentFreeEpoch);
02530                 try {
02531                   FP::Tag *commonFP;
02532                   CE::T *entry = vf->NewEntry(logEntry.ci,
02533                                               logEntry.names, logEntry.fps,
02534                                               logEntry.value,
02535                                               logEntry.model, logEntry.kids,
02536                                               /*OUT*/ commonFP);
02537 
02538                   vf->AddEntry(NEW_CONSTR(Text, (logEntry.sourceFunc)), entry,
02539                                commonFP, logEntry.pkEpoch);
02540                 } catch (...) { vf->mu.unlock(); throw; }
02541                 vf->mu.unlock();
02542 
02543                 // flush MPKFile if necessary
02544                 PKPrefix::T pfx(*pk);
02545                 this->AddEntryToMPKFile(pfx,
02546                   "MultiPKFile threshold exceeded during recovery");
02547             // re-acquire "cacheLogMu"
02548             } catch (...) { this->cacheLogMu.lock(); throw; }
02549             this->cacheLogMu.lock();
02550 
02551             // debugging output
02552             if (this->debug >= CacheIntf::LogRecover) {
02553                 Debug::Lock();
02554                 logEntry.Debug(cout);
02555                 empty = false;
02556                 Debug::Unlock();
02557             }
02558         } // if
02559     } // while
02560 } // CacheS::RecoverCacheLogEntries
02561 
02562 void CacheS::RecoverCacheLog()
02563   throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02564 /* REQUIRES LL = Empty */
02565 {
02566     // debugging start
02567     bool empty = true; // used only for debugging output
02568     if (this->debug >= CacheIntf::LogRecover) {
02569         Debug::Lock();
02570         cout << Debug::Timestamp() << "RECOVERING -- EmptyPKLog" << endl;
02571         Debug::Unlock();
02572     }
02573 
02574     // read EmptyPKLog into "this->emptyPKLog"'s internal pkEpoch table
02575     this->mu.lock();
02576     try {
02577         /* Note: The "EmptyPKLog" always has an empty checkpoint, so we
02578            don't have to call the log's "openCheckpoint" method to read
02579            it. We simply recover from the log files. */
02580         do {
02581             this->RecoverEmptyPKLog(/*INOUT*/ empty);
02582         } while (this->emptyPKLog->NextLog());
02583         this->emptyPKLog->EndRecovery();
02584     } catch (...) { this->mu.unlock(); throw; }
02585     this->mu.unlock();
02586 
02587     // debugging end / start
02588     if (this->debug >= CacheIntf::LogRecover) {
02589         Debug::Lock();
02590         if (empty) cout << "  <<Empty>>" << endl;
02591         cout << endl;
02592         cout << Debug::Timestamp() << "RECOVERING -- CacheLog" << endl;
02593         Debug::Unlock();
02594     }
02595 
02596     // open log and initialize volatile values
02597     this->mu.lock();
02598     this->cacheLog = NEW(VestaLog);
02599     this->cacheLogFlushQ = NEW_CONSTR(FlushQueue, (&(this->mu)));
02600     this->vCacheLog = this->vCacheAvail = (CacheLog::Entry *)NULL;
02601     this->vCacheLogTail = (CacheLog::Entry *)NULL;
02602     this->mu.unlock();
02603 
02604     /* We keep a temporary table mapping PKs of existing SPKFiles to their
02605        corresponding "pkEpoch"s so that we do not have to go to disk more
02606        than once for each PK. This is a simple optimization to speed recovery.
02607        The table is discarded once the cacheLog has been recovered. */
02608     EmptyPKLog::PKEpochTbl pkEpochTbl;
02609 
02610     // open the log file
02611     empty = true;
02612     this->cacheLogMu.lock();
02613     try {
02614         this->cacheLog->open(Config_CacheLogPath.chars());
02615 
02616         // recover from the checkpoint (if any)
02617         fstream *chkpt = this->cacheLog->openCheckpoint();
02618         if (chkpt != (fstream *)NULL) {
02619             RecoveryReader rd(chkpt);
02620             this->RecoverCacheLogEntries(rd,
02621               /*INOUT*/ pkEpochTbl, /*INOUT*/ empty);
02622             FS::Close(*chkpt);
02623         }
02624 
02625         // recover from log files
02626         do {
02627             RecoveryReader rd(this->cacheLog);
02628             this->RecoverCacheLogEntries(rd,
02629               /*INOUT*/ pkEpochTbl, /*INOUT*/ empty);
02630         } while (this->cacheLog->nextLog());
02631         this->cacheLog->loggingBegin(); // switch log to ``ready'' state
02632 
02633     } catch (...) { this->cacheLogMu.unlock(); throw; }
02634     this->cacheLogMu.unlock();
02635 
02636     // debugging end
02637     if (this->debug >= CacheIntf::LogRecover) {
02638         Debug::Lock();
02639         if (empty) cout << "  <<Empty>>" << endl;
02640         cout << endl;
02641         Debug::Unlock();
02642     }
02643 } // CacheS::RecoverCacheLog
02644 
02645 // GraphLog methods ---------------------------------------------------------
02646 
02647 void CacheS::LogGraphNode(CacheEntry::Index ci, FP::Tag *loc, Model::T model,
02648   CacheEntry::Indices *kids, Derived::Indices *refs) throw ()
02649 /* REQUIRES Sup(LL) = SELF.mu */
02650 {
02651     GraphLog::Node *logNode;
02652 
02653     // get new entry and fill it in
02654     if (this->vGraphNodeAvail == (GraphLog::Node *)NULL) {
02655       logNode = NEW_CONSTR(GraphLog::Node, (ci, loc, model, kids, refs));
02656     } else {
02657         logNode = this->vGraphNodeAvail;
02658         assert(logNode->kind == GraphLog::NodeKind);
02659         this->vGraphNodeAvail = logNode->next;
02660         this->vGraphNodeAvailLen--;
02661         logNode->Init(ci, loc, model, kids, refs);
02662     }
02663 
02664     // append new entry to end of list
02665     if (this->vGraphLogTail == (GraphLog::Node *)NULL)
02666         this->vGraphLog = logNode;
02667     else
02668         this->vGraphLogTail->next = logNode;
02669     this->vGraphLogTail = logNode;
02670 }
02671 
02672 void CacheS::FlushGraphLog() throw (VestaLog::Error)
02673 /* REQUIRES Sup(LL) < SELF.ciLogMu */
02674 {
02675     GraphLog::Node *vLog;
02676 
02677     // capture "vGraphLog" in "vLog" local
02678     this->mu.lock();
02679     vLog = this->vGraphLog;
02680     this->vGraphLog = this->vGraphLogTail = (GraphLog::Node *)NULL;
02681     this->graphLogFlushQ->Enqueue();
02682     this->mu.unlock();
02683 
02684     // flush lower-level logs
02685     this->FlushUsedCIs();
02686 
02687     // return if there is nothing to flush
02688     if (vLog == (GraphLog::Node *)NULL)
02689       {
02690         this->mu.lock();
02691         this->graphLogFlushQ->Dequeue();
02692         this->mu.unlock();
02693         return;
02694       }
02695 
02696     // debugging start
02697     if (this->debug >= CacheIntf::LogFlush) {
02698         Debug::Lock();
02699         cout << Debug::Timestamp() << "STARTED -- Flushing GraphLog" << endl;
02700         cout << endl;
02701         Debug::Unlock();
02702     }
02703 
02704     // flush log
02705     int numFlushed = 0;
02706     GraphLog::Node *last;
02707     this->graphLogMu.lock();
02708     try {
02709         this->graphLog->start();
02710         for (GraphLog::Node *curr = vLog; curr != NULL; curr = curr->next) {
02711             last = curr;
02712             if (numFlushed % 20 == 0 && numFlushed > 0) {
02713                 /* Commit after every 20th log entry so the graph log
02714                    does not get very big between commit points. */
02715                 this->graphLog->commit();
02716                 this->graphLog->start();
02717             }
02718             curr->Log(*(this->graphLog));
02719             numFlushed++;
02720             if (this->debug >= CacheIntf::LogFlushEntries) {
02721                 Debug::Lock();
02722                 curr->Debug(cout);
02723                 Debug::Unlock();
02724             }
02725             curr->Reset();
02726             // DrdReuse((caddr_t)curr, sizeof(*curr));
02727         }
02728         this->graphLog->commit();
02729     } catch (...) { this->graphLogMu.unlock(); throw; }
02730     this->graphLogMu.unlock();
02731 
02732     // debugging end
02733     if (this->debug >= CacheIntf::LogFlushEntries) {
02734         Debug::Lock();
02735         cout << endl;
02736         Debug::Unlock();
02737     }
02738     if (this->debug >= CacheIntf::LogFlush) {
02739         Debug::Lock();
02740         cout << Debug::Timestamp() << "FINISHED -- Flushing GraphLog" << endl;
02741         cout << "  Nodes flushed = " << numFlushed << endl;
02742         cout << endl;
02743         Debug::Unlock();
02744     }
02745 
02746     // return "vLog" entries to avail list
02747     this->mu.lock();
02748     last->next = this->vGraphNodeAvail;
02749     this->vGraphNodeAvail = vLog;
02750     this->vGraphNodeAvailLen += numFlushed;
02751     this->graphLogFlushQ->Dequeue();
02752     this->mu.unlock();
02753 }
02754 
02755 int CacheS::ChkptGraphLog() throw (FS::Failure, VestaLog::Error)
02756 /* REQUIRES Sup(LL) < SELF.graphLogMu */
02757 {
02758     int res;
02759     fstream *fs;
02760     this->graphLogMu.lock();
02761     try {
02762         fs = this->graphLog->checkpointBegin();
02763         FS::Close(*fs);
02764         res = this->graphLog->logVersion();
02765     } catch (...) { this->graphLogMu.unlock(); throw; }
02766     this->graphLogMu.unlock();
02767     return res;
02768 }
02769 
02770 void CacheS::AbortGraphLogChkpt() throw (VestaLog::Error)
02771 /* REQUIRES Sup(LL) < SELF.graphLogMu */
02772 {
02773     this->graphLogMu.lock();
02774     try {
02775         this->graphLog->checkpointAbort();
02776     } catch (...) { this->graphLogMu.unlock(); throw; }
02777     this->graphLogMu.unlock();
02778 }
02779 
02780 void CacheS::RecoverGraphLog()
02781   throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
02782 /* REQUIRES LL = Empty */
02783 /* This procedure ignores the current log, since it was already written to
02784    disk. However, it must advance the log's file pointer to the end of the log
02785    so appending will work correctly. */
02786 {
02787     // open log and initialize volatile values
02788     this->mu.lock();
02789     this->graphLog = NEW(VestaLog);
02790     this->graphLogFlushQ = NEW_CONSTR(FlushQueue, (&(this->mu)));
02791     this->vGraphLog = this->vGraphNodeAvail = (GraphLog::Node *)NULL;
02792     this->vGraphLogTail = (GraphLog::Node *)NULL;
02793     this->vGraphNodeAvailLen = 0;
02794     this->mu.unlock();
02795 
02796     this->graphLogMu.lock();
02797     try {
02798         // open the log file
02799         this->graphLog->open(Config_GraphLogPath.chars());
02800 
02801         /* Note: we do not even bother opening & reading the checkpoint
02802            file, since the cache server never *reads* the graphLog, it
02803            just appends to it. */
02804 
02805         // read log until EOF
02806         char c;
02807         do {
02808             while (!this->graphLog->eof()) this->graphLog->get(c);
02809         } while (this->graphLog->nextLog());
02810 
02811         // recover the checkpoint if one was in progress
02812         fstream *ofs = this->graphLog->checkpointResume(/*mode=*/ ios::out);
02813         if (ofs != (fstream *)NULL) {
02814             FS::Close(*ofs);
02815             this->mu.lock();
02816             this->graphLogChkptVer = this->graphLog->logVersion();
02817             this->mu.unlock();
02818         }
02819 
02820         // begin logging
02821         this->graphLog->loggingBegin();
02822     } catch (...) { this->graphLogMu.unlock(); throw; }
02823     this->graphLogMu.unlock();
02824 } // CacheS::RecoverGraphLog
02825 
02826 // UsedCI methods -----------------------------------------------------------
02827 
02828 void CacheS::LogCI(Intvl::Op op, CacheEntry::Index ci) throw ()
02829 /* REQUIRES Sup(LL) = SELF.mu */
02830 {
02831     if (this->vCILog != (Intvl::List *)NULL && this->vCILog->i.op == op
02832         && this->vCILog->i.hi + 1 == ci) {
02833         this->vCILog->i.hi = ci;
02834     } else {
02835         // set "il" to new "Intvl::List"
02836         Intvl::List *il;
02837         if (this->vCIAvail == (Intvl::List *)NULL) {
02838           il = NEW(Intvl::List);
02839         } else {
02840             il = this->vCIAvail;
02841             this->vCIAvail = il->next;
02842         }
02843 
02844         // fill it in and prepend it to "vCILog"
02845         il->i.op = op;
02846         il->i.lo = il->i.hi = ci;
02847         il->next = this->vCILog;
02848         this->vCILog = il;
02849     }
02850 }
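/* Editor's note: LogCI coalesces runs of consecutive indices with the same
   operation into a single Intvl, extending "hi" rather than allocating a new
   list cell.  An illustrative sequence, for some operation "op":

       cs->LogCI(op, 10);   // vCILog head is the interval [10,10]
       cs->LogCI(op, 11);   // same cell, extended to [10,11]
       cs->LogCI(op, 12);   // same cell, extended to [10,12]
       cs->LogCI(op, 20);   // not contiguous: a new cell [20,20] is prepended
*/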
02851 
02852 void CacheS::FlushUsedCIs() throw (VestaLog::Error)
02853 /* REQUIRES Sup(LL) < SELF.ciLogMu */
02854 {
02855     Intvl::List *vLog, *curr, *last;
02856 
02857     // capture "vCILog" in "vLog" local and reset "vCILog"
02858     this->mu.lock();
02859     vLog = this->vCILog;
02860     this->vCILog = (Intvl::List *)NULL;
02861     this->ciLogFlushQ->Enqueue();
02862     this->mu.unlock();
02863 
02864     if (vLog == (Intvl::List *)NULL)
02865       {
02866         this->mu.lock();
02867         this->ciLogFlushQ->Dequeue();
02868         this->mu.unlock();
02869 
02870         return;
02871       }
02872 
02873     // debugging start
02874     if (this->debug >= CacheIntf::LogFlush) {
02875         Debug::Lock();
02876         cout << Debug::Timestamp() << "STARTED -- Flushing Used CI's Log"
02877              << endl << endl;
02878         Debug::Unlock();
02879     }
02880 
02881     // flush the entries in "vLog"
02882     int numFlushed;
02883     this->ciLogMu.lock();
02884     try {
02885         last = this->FlushUsedCIsList(vLog, /*OUT*/ numFlushed);
02886     } catch (...) { this->ciLogMu.unlock(); throw; }
02887     this->ciLogMu.unlock();
02888 
02889     // debugging end
02890     if (this->debug >= CacheIntf::LogFlush) {
02891         Debug::Lock();
02892         cout << Debug::Timestamp() << "FINISHED -- Flushing Used CI's Log"
02893              << endl;
02894         cout << "  Intervals flushed = " << numFlushed << endl;
02895         cout << endl;
02896         Debug::Unlock();
02897     }
02898 
02899     // return "vLog" entries to avail list
02900     this->mu.lock();
02901     last->next = this->vCIAvail;
02902     this->vCIAvail = vLog;
02903     this->ciLogFlushQ->Dequeue();
02904     this->mu.unlock();
02905 }
02906 
02907 Intvl::List *CacheS::FlushUsedCIsList(Intvl::List *vLog,
02908   /*OUT*/ int &numFlushed) throw (VestaLog::Error)
02909 /* REQUIRES SELF.ciLogMu IN LL */
02910 {
02911     // flush log
02912     assert(vLog != (Intvl::List *)NULL);
02913     Intvl::List *curr, *last;
02914     numFlushed = 0;
02915     this->ciLog->start();
02916     for (curr = vLog; curr != (Intvl::List *)NULL; curr = curr->next) {
02917         last = curr;
02918         curr->i.Log(*(this->ciLog));
02919         numFlushed++;
02920         if (this->debug >= CacheIntf::LogFlushEntries) {
02921             Debug::Lock();
02922             curr->i.Debug(cout);
02923             Debug::Unlock();
02924         }
02925         // DrdReuse((caddr_t)curr, sizeof(*curr));
02926     }
02927     this->ciLog->commit();
02928     if (this->debug >= CacheIntf::LogFlushEntries) {
02929         Debug::Lock();
02930         cout << endl;
02931         Debug::Unlock();
02932     }
02933     return last;
02934 }
02935 
02936 void CacheS::ChkptUsedCIs(const BitVector &del)
02937   throw (VestaLog::Error, FS::Failure)
02938 /* REQUIRES Sup(LL) < SELF.ciLogMu */
02939 /* NOTE: The locking in this method is a bit tricky. To guarantee that the
02940    checkpoint contains the exact set of CIs in use after subtracting off the
02941    entries "del", we have to hold the central lock "SELF.mu" while the CILog
02942    is flushed, while the entries "del" are subtracted from "this->usedCIs",
02943    and while a copy of that bit vector is made. This prevents any new
02944    entries from being created during that interval. Since the lock that
02945    protects the CILog ("SELF.ciLogMu") precedes "SELF.mu" in locking order,
02946    we have to first acquire that lock before acquiring "SELF.mu".
02947 
02948    So as not to hold "SELF.mu" for very long, we first flush the CILog using
02949    the normal "FlushUsedCIs" method, which only holds "SELF.mu" long enough to
02950    get a local copy of the pointer to the head of the (in-memory) list of
02951    CILog entries. Once the log has been flushed in this way, the hope is that
02952    it will be nearly empty when the log is flushed with "SELF.mu" held, so
02953    that central lock will not be held for very long. We then flush the log,
02954    subtract off the entries "del" and make a copy of "this->usedCIs", all
02955    with "SELF.mu" held. When the new CILog checkpoint is created, only
02956    "SELF.ciLogMu" needs to be held, since holding that lock will prevent
02957    other threads from writing to the CILog. */
02958 {
02959     BitVector *usedCIsCopy;  // copy of "this->usedCIs" after subtracting "del"
02960     fstream *chkptFS;        // file to which checkpoint is written
02961 
02962     // first, flush the "usedCIs" log
02963     this->FlushUsedCIs();
02964 
02965     // debugging start
02966     if (this->debug >= CacheIntf::LogFlush) {
02967         Debug::Lock();
02968         cout << Debug::Timestamp() << "STARTED -- Checkpointing Used CI's Log"
02969              << endl << endl;
02970         Debug::Unlock();
02971     }
02972 
02973     // now checkpoint the new "usedCIs" value w/ appropriate locks
02974     int numFlushed;
02975     this->ciLogMu.lock();
02976     try {
02977         this->mu.lock();
02978         try {
02979 
02980         // see if there are any more entries to flush
02981         // this code is a lot like "FlushUsedCIs", but with different locking
02982         Intvl::List *vLog = this->vCILog;
02983         this->vCILog = (Intvl::List *)NULL;
02984         if (vLog != (Intvl::List *)NULL) {
02985             // flush any remaining "usedCIs", but with "this->mu" held
02986             Intvl::List *last = this->FlushUsedCIsList(vLog,/*OUT*/numFlushed);
02987 
02988             // return "vLog" entries to avail list
02989             last->next = this->vCIAvail;
02990             this->vCIAvail = vLog;
02991         }
02992 
02993         // subtract off the entries in "del"
02994         this->usedCIs -= del;
02995         this->entryCnt = this->usedCIs.Cardinality();
02996 
02997         // make a copy of "this->usedCIs" (with "this->mu" held)
02998         usedCIsCopy = NEW_CONSTR(BitVector, (&(this->usedCIs)));
02999 
03000         } catch (...) { this->mu.unlock(); this->ciLogMu.unlock(); throw; }
03001         this->mu.unlock();
03002 
03003         // start a new checkpoint
03004         chkptFS = this->ciLog->checkpointBegin();
03005     } catch (...) { this->ciLogMu.unlock(); throw; }
03006     this->ciLogMu.unlock();
03007 
03008     // debugging end
03009     if (this->debug >= CacheIntf::LogFlush) {
03010         Debug::Lock();
03011         cout << Debug::Timestamp() << "FINISHED -- Checkpointing Used CI's Log"
03012              << endl;
03013         cout << "  Intervals flushed = " << numFlushed << endl;
03014         cout << endl;
03015         Debug::Unlock();
03016     }
03017 
03018     // write "usedCIsCopy" to checkpoint file
03019     try {
03020         usedCIsCopy->Write(*chkptFS); // may be long-running operation
03021         FS::Close(*chkptFS);
03022     } catch (...) {
03023         // in event of any failure while writing checkpoint, abort it
03024         this->ciLogMu.lock();
03025         try {
03026             this->ciLog->checkpointAbort();
03027         } catch (...) { this->ciLogMu.unlock(); throw; }
03028         this->ciLogMu.unlock();
03029         throw;
03030     }
03031 
03032     // commit the checkpoint and prune the log
03033     this->ciLogMu.lock();
03034     try {
03035         this->ciLog->checkpointEnd();
03036         this->ciLog->prune(/*ckpkeep=*/ 1);
03037     } catch (...) { this->ciLogMu.unlock(); throw; }
03038     this->ciLogMu.unlock();
03039 }
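
// --- Illustrative sketch (not part of CacheS.C) -----------------------------
// The lock discipline described in the NOTE above: "SELF.ciLogMu" precedes
// "SELF.mu" in the locking order, so a thread needing both takes the log lock
// first. The snapshot is made quickly under both locks; the long-running
// checkpoint write then proceeds with neither one held (CacheS brackets it
// with checkpointBegin/checkpointEnd, made under ciLogMu alone). std::mutex
// and std::vector<bool> stand in for the Basics mutexes and the BitVector,
// and the re-flush of pending intervals is omitted; names are illustrative
// only.

#include <cstddef>
#include <mutex>
#include <vector>

class CheckpointSketch {
    std::mutex logMu;               // acquired first   (plays SELF.ciLogMu)
    std::mutex mu;                  // acquired second  (plays SELF.mu)
    std::vector<bool> used;         // plays "usedCIs"

    void writeCheckpoint(const std::vector<bool> &snap) { /* slow I/O here */ }

public:
    void checkpoint(const std::vector<bool> &del)
    {
        std::vector<bool> snapshot;
        {
            std::lock_guard<std::mutex> logLk(logMu);   // respect the order:
            std::lock_guard<std::mutex> lk(mu);         // logMu before mu
            for (std::size_t i = 0; i < used.size() && i < del.size(); i++)
                if (del[i]) used[i] = false;            // usedCIs -= del
            snapshot = used;        // copy the exact post-deletion state
        }
        // Both locks released; "snapshot" is the exact set in use at the
        // moment "del" was subtracted, so the slow write can proceed here.
        writeCheckpoint(snapshot);
    }
};
// --- End sketch --------------------------------------------------------------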
03040 
03041 void CacheS::RecoverCILog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure)
03042 /* REQUIRES LL = Empty */
03043 {
03044     // debugging start
03045     bool empty;              // only set if "debug >= CacheIntf::LogRecover"
03046     if (this->debug >= CacheIntf::LogRecover) {
03047         Debug::Lock();
03048         cout << Debug::Timestamp() << "RECOVERING -- Used CI's Log" << endl;
03049         empty = true;
03050         Debug::Unlock();
03051     }
03052 
03053     // allocate log and initialize volatile values
03054     this->mu.lock();
03055     this->ciLog = NEW(VestaLog);
03056     this->ciLogFlushQ = NEW_CONSTR(FlushQueue, (&(this->mu)));
03057     this->usedCIs.ResetAll();
03058     this->entryCnt = 0;
03059     this->vCILog = this->vCIAvail = (Intvl::List *)NULL;
03060     this->mu.unlock();
03061 
03062     // open log and recover checkpoint (if any)
03063     fstream *chkpt = (fstream *)NULL;
03064     this->ciLogMu.lock();
03065     try {
03066         // open the log file
03067         this->ciLog->open(Config_CILogPath.chars());
03068 
03069         // recover from checkpoint
03070         chkpt = this->ciLog->openCheckpoint();
03071         if (chkpt != (fstream *)NULL) {
03072             RecoveryReader rd(chkpt);
03073             this->mu.lock();
03074             try {
03075                 this->usedCIs.Recover(rd);
03076             } catch (...) { this->mu.unlock(); throw; }
03077             this->mu.unlock();
03078             FS::Close(*chkpt);
03079         }
03080     } catch (...) { this->ciLogMu.unlock(); throw; }
03081     this->ciLogMu.unlock();
03082 
03083     // more debugging
03084     if (chkpt != (fstream *)NULL && this->debug >= CacheIntf::LogRecover) {
03085         Debug::Lock();
03086         cout << "  usedCIs = " << usedCIs << endl;
03087         empty = false;
03088         Debug::Unlock();
03089     }
03090 
03091     // recover from log file(s)
03092     this->ciLogMu.lock();
03093     try {
03094         // recover from logs
03095         RecoveryReader rd(this->ciLog);
03096         do {
03097             while (!rd.eof()) {
03098                 Intvl::T intvl(rd);
03099                 this->mu.lock();
03100                 try {
03101                     this->usedCIs.WriteInterval(intvl.lo, intvl.hi,
03102                       (intvl.op == Intvl::Add));
03103                 } catch (...) { this->mu.unlock(); throw; }
03104                 this->mu.unlock();
03105                 if (this->debug >= CacheIntf::LogRecover) {
03106                     intvl.Debug(cout);
03107                     empty = false;
03108                 }
03109             }
03110         } while (this->ciLog->nextLog());
03111         this->ciLog->loggingBegin();
03112     } catch (...) { this->ciLogMu.unlock(); throw; }
03113     this->ciLogMu.unlock();
03114 
03115     // update "this->entryCnt"
03116     this->mu.lock();
03117     this->entryCnt = this->usedCIs.Cardinality();
03118     this->mu.unlock();
03119 
03120     // debugging end
03121     if (this->debug >= CacheIntf::LogRecover) {
03122         Debug::Lock();
03123         if (empty) cout << "  <<Empty>>" << endl;
03124         cout << endl;
03125         Debug::Unlock();
03126     }
03127 }
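
// --- Illustrative sketch (not part of CacheS.C) -----------------------------
// The recovery shape of "RecoverCILog" above: restore the state saved by the
// last checkpoint in full, then replay every interval record logged since.
// A std::set<int> stands in for the BitVector and an in-memory vector of
// records stands in for the on-disk log; names are illustrative only.

#include <set>
#include <vector>

enum class RecOp { Add, Remove };
struct IntvlRec { RecOp op; int lo, hi; };

std::set<int> recoverUsedCIs(const std::set<int> &checkpoint,
                             const std::vector<IntvlRec> &log)
{
    std::set<int> used = checkpoint;              // state as of the checkpoint
    for (const IntvlRec &r : log) {               // replay records in order
        for (int ci = r.lo; ci <= r.hi; ci++) {
            if (r.op == RecOp::Add) used.insert(ci);  // WriteInterval(..., true)
            else                    used.erase(ci);   // WriteInterval(..., false)
        }
    }
    return used;    // "entryCnt" would then be used.size() (cf. Cardinality())
}
// --- End sketch --------------------------------------------------------------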
03128 
03129 // CacheWorker ----------------------------------------------------------------
03130 
03131 CacheWorker::CacheWorker(CacheS *cs) throw ()
03132   : cs(cs), reason("idle")
03133 {
03134     // initially, the thread is not runnable
03135     this->mu.lock();
03136     this->argsReady = false;
03137     this->mu.unlock();
03138 }
03139 
03140 void CacheWorker::Start(const char *reason) throw ()
03141 {
03142     // signal the thread to go
03143     this->mu.lock();
03144     this->argsReady = true;
03145     this->reason = reason;
03146     this->mu.unlock();
03147     this->workToDo.signal();
03148 }
03149 
03150 void CacheWorker::Finish() throw ()
03151 {
03152     this->mu.lock();
03153     this->argsReady = false;
03154     this->reason = "idle";
03155     this->mu.unlock();
03156 }
03157 
03158 FlushWorker::FlushWorker(CacheS *cs) throw ()
03159   : CacheWorker(cs)
03160 {
03161     // fork the thread that does the actual work
03162     this->th.fork_and_detach(CacheS_FlushWorker, (void *)this);
03163 }
03164 
03165 void* CacheS_FlushWorker(void *arg) throw ()
03166 /* REQUIRES LL = 0 */
03167 {
03168     FlushWorker *fw = (FlushWorker *)arg;
03169     while (true) {
03170         // wait for work to do
03171         fw->mu.lock();
03172         while (!(fw->argsReady)) {
03173             fw->workToDo.wait(fw->mu);
03174         }
03175         fw->mu.unlock();
03176 
03177         // do the work
03178         fw->cs->VToSCache(fw->pfx);
03179 
03180         // restore thread to avail list
03181         fw->Finish();
03182         fw->cs->RegisterIdleFlushWorker(fw);
03183     }
03184 
03185     //assert(false); // not reached
03186     //return (void *)NULL;
03187 }
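
// --- Illustrative sketch (not part of CacheS.C) -----------------------------
// The worker pattern of CacheWorker/FlushWorker above: a long-lived, detached
// thread blocks on a condition variable until "Start" publishes an argument
// and sets "argsReady"; it does the work, marks itself idle again ("Finish"),
// and waits for the next request. std::thread and std::condition_variable
// stand in for the Basics threading package, and a std::string argument plays
// the role of "FlushWorker::pfx"; names are illustrative only.

#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

class WorkerSketch {
    std::mutex mu;
    std::condition_variable workToDo;
    bool argsReady = false;
    std::string arg;                       // plays FlushWorker::pfx

    void run()
    {
        while (true) {
            std::string localArg;
            {   // wait until start() has published an argument
                std::unique_lock<std::mutex> lk(mu);
                workToDo.wait(lk, [this] { return argsReady; });
                localArg = arg;
            }
            doWork(localArg);              // the real worker calls cs->VToSCache(pfx)
            {   // Finish(): mark the worker idle again
                std::lock_guard<std::mutex> lk(mu);
                argsReady = false;
            }
            // (The real worker also re-registers itself via
            //  cs->RegisterIdleFlushWorker(fw) at this point.)
        }
    }

    void doWork(const std::string &) { /* flush one VMultiPKFile, say */ }

public:
    WorkerSketch() { std::thread(&WorkerSketch::run, this).detach(); }

    void start(const std::string &a)       // cf. FlushWorker::Start
    {
        {
            std::lock_guard<std::mutex> lk(mu);
            arg = a;
            argsReady = true;
        }
        workToDo.notify_one();
    }
};
// --- End sketch --------------------------------------------------------------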
03188 
03189 void FlushWorker::Start(const PKPrefix::T &pfx, const char *reason) throw ()
03190 {
03191     // save argument
03192     this->pfx = pfx;
03193 
03194     // invoke "Start" method of supertype
03195     this->CacheWorker::Start(reason);
03196 }
03197 
03198 CleanWorker::CleanWorker(CacheS *cs) throw ()
03199   : CacheWorker(cs)
03200 {
03201     // fork the thread that does the actual work
03202     this->th.fork_and_detach(CacheS_CleanCacheLogWorker, (void *)this);
03203 }
