// Copyright (C) 2001, Compaq Computer Corporation
//
// This file is part of Vesta.
//
// Vesta is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// Vesta is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Vesta; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

// Last modified on Mon May 23 22:58:16 EDT 2005 by ken@xorian.net
//      modified on Sat Feb 12 13:21:38 PST 2000 by mann
//      modified on Wed Feb 23 17:18:43 PST 2000 by mann
//      modified on Fri Feb 4 17:35:06 PST 2000 by heydon

// CacheS -- the server interface to the Vesta-2 cache server

/* The class "CacheS" provides the callback methods for a running instance of
   a Vesta-2 cache server. A new "CacheS" object is passed to the constructor
   of an "ExpCache" object. The "ExpCache" object listens for cache server
   SRPC connections. When a call is made by the client, the "ExpCache" object
   unmarshals the arguments and invokes the corresponding method of the
   "CacheS" object.

   Conventions used in the comments of this file: each method is followed
   by a "REQUIRES" clause stating the locking level ("LL") the caller must
   hold, and then by a description of the method. The ordering of the
   cache server's mutexes is given in the "Locking levels" comment in the
   private section of "CacheS". */

#ifndef _CACHES_H
#define _CACHES_H

#include <time.h>
#include <Basics.H>
#include <FS.H>
#include <VestaLog.H>
#include <FP.H>

// cache-common
#include <CacheIntf.H>
#include <BitVector.H>
#include <CacheState.H>
#include <Model.H>
#include <PKEpoch.H>
#include <CacheIndex.H>
#include <Derived.H>
#include <FV.H>
#include <VestaVal.H>
#include <GraphLog.H>
#include <PKPrefix.H>

// cache-server
#include <EmptyPKLog.H>
#include <Leases.H>
#include <CacheEntry.H>
#include <CacheLog.H>
#include <Intvl.H>
#include <VPKFile.H>
#include <VMultiPKFile.H>
#include "FlushQueue.H"

// forward declarations (the worker classes are defined at the end of this
// file, after "CacheS")
class FlushWorker;
class CleanWorker;
class ChkptWorker;

class CacheS {
  public:
    CacheS(CacheIntf::DebugLevel debug = CacheIntf::None,
      bool noHits = false) throw ();
    /* REQUIRES LL = Empty */
    /* Initialize a new cache server object. Debugging information is printed
       to the standard output for all debugging messages having level at most
       "debug". If "noHits" is true, then the "Lookup" method will never
       return "Hit" as a result. */

    int Version() throw () { return CacheIntf::Version; }
    /* REQUIRES LL = Any */
    /* Returns the version number of this cache server interface. */

    CacheIntf::DebugLevel DebugLevel() const throw () { return this->debug; }
    /* REQUIRES LL = Any */
    /* Return the current debug level. */

    void FreeVariables(const FP::Tag& pk, /*OUT*/ VPKFile* &vf) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Set "vf" to the VPKFile for "pk". */

    CacheIntf::LookupRes
    Lookup(const FP::Tag &pk, FV::Epoch id, const FP::List &fps,
      /*OUT*/ CacheEntry::Index &ci, /*OUT*/ const VestaVal::T* &value)
      throw ();
    /* REQUIRES (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
    /* Look up the cache entry with primary key "pk", free variables set with
       epoch "id", and corresponding value fingerprints "fps".

       In the event of a hit, "Hit" is returned, a lease is taken out (or
       renewed) on the matching cache entry, "ci" is set to the index of that
       entry, and "value" is set to its result value. For all other return
       values, "ci" and "value" are unchanged.

       In the event of a cache miss, "Miss" is returned. In the event that "id"
       is an old epoch for "pk", "FVMismatch" is returned. In this case, the
       caller should call the "FreeVariables" method again to get the latest
       epoch and names associated with "pk", and then try "Lookup" again. */

    CacheIntf::AddEntryRes
    AddEntry(FP::Tag *pk, FV::List *names, FP::List *fps,
      VestaVal::T *value, Model::T model, CacheEntry::Indices *kids,
      const Text& sourceFunc, /*OUT*/ CacheEntry::Index& ci) throw ();
    /* REQUIRES (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
    /* Add a new entry to the cache under the primary key "pk" corresponding
       to the evaluation of some function; "names" is the list of names that
       are free during the evaluation of the function, and "fps" are the
       fingerprints of the values associated with those names. If the lengths
       of these two lists do not agree, or if "names" contains any duplicate
       names, "CacheIntf::BadAddEntryArgs" is immediately returned; this
       return result indicates a programming error on the part of the client.

       "value" is the value produced by the evaluation of the function,
       "model" is the model in which the function is defined, and "kids" are
       the indices of the cache entries corresponding to function evaluations
       performed directly on behalf of this evaluation. Finally, "sourceFunc"
       is a text denoting the source location of the function definition
       for the new cache entry.

       In the event of success, "CacheIntf::EntryAdded" is returned, a lease
       is taken out on the new cache entry, and "ci" is set to the index of
       the new cache entry.

       If any of the "kids" lacks a lease, then "CacheIntf::NoLease" is
       returned, and the value of "ci" is unchanged. */

    void Checkpoint(const FP::Tag &pkgVersion, Model::T model,
      CacheEntry::Indices *cis, bool done) throw ();
    /* REQUIRES Sup(LL) < SELF.chkptMu */
    /* Make stable all cache entries and deriveds reachable from the entries
       corresponding to "cis". If the cache entries corresponding to "cis"
       all have leases, then also write a "Root" entry to the graph log that
       protects them from weeding. The arguments "pkgVersion" and "model"
       identify the top-level model on which the evaluator was invoked:
       "pkgVersion" is the fingerprint of the immutable directory in which
       the model resides, and "model" is the ShortId of the model file
       itself. The "done" argument should be true if the checkpoint is for
       a completed model evaluation, false if it represents an intermediate
       checkpoint. The checkpoint is done synchronously iff "done" is true. */

    void FlushAll() throw ();
    /* REQUIRES
         Sup(LL) < SELF.ciLogMu AND
         (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
    /* Flush all VMultiPKFiles in the cache, even if they don't have any new
       entries to be flushed. Before returning, this method also forks a
       thread to clean the cache log. */

    void GetCacheId(/*OUT*/ CacheId &id) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Set "id" to the identifying information of the cache server. */

    void GetCacheState(/*OUT*/ CacheState &state) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Set "state" to the current state information of the cache server. */

    const FP::Tag &GetCacheInstance() const throw ();
    /* Returns the unique instance fingerprint of the cache server.

       Must only be called after the instance is initialized, as read
       access to the instance fingerprint is not protected by a mutex. */

    bool RenewLeases(CacheEntry::Indices *cis) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Renew the leases on the cache entries with indexes "cis". Returns true
       if all the cache entries named by "cis" exist and all currently have
       valid leases; false otherwise. Even if false is returned, the leases of
       all existing cache entries with valid leases named in "cis" will have
       had their leases renewed. */

    bool WeederRecovering(SRPC *srpc, bool doneMarking) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Indicate that the weeder is in the process of recovering, and set the
       cache server's state accordingly. "doneMarking" should be true iff the
       weeder's "weeded" set is non-empty, i.e., if the weeder had completed
       its mark phase but not the subsequent deletion phase. "srpc" should be
       a pointer to the SRPC connection making this call; the connection is
       used on subsequent calls to detect if a weed is currently in progress.
       Returns true iff there is already a weeder running, in which case no
       action is taken and the weeder should immediately abort. */

    BitVector* StartMark(/*OUT*/ int &newLogVer) throw ();
    /* REQUIRES Sup(LL) < SELF.graphLogMu */
    /* Indicate that the weeder is starting its mark phase. This blocks if the
       cache server is still in the middle of doing deletions from a previous
       weed. Then it disables lease expiration, and flushes the graphLog.
       It then checkpoints the graph log, and sets "newLogVer" to the version
       number of the new graphLog file. The weeder should read all versions up
       to (but not including) "newLogVer". Finally, this method returns the
       bit vector of cache indices in use at the time of the call. */

    void SetHitFilter(const BitVector &cis) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Set the cache server's hit filter set to "cis". The hit filter is the
       set of cache entries that may soon be deleted, so hits on them by the
       cache server's "Lookup" method will be disabled. This method requires
       that the cache server is not currently doing deletions from a previous
       weed. */

    BitVector* GetLeases() throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Return a newly allocated bit vector corresponding to the cache entries
       that are currently leased. */

    void ResumeLeaseExp() throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Re-enable lease expiration disabled by the "StartMark" method above. */

    int EndMark(const BitVector &cis, const PKPrefix::List &pfxs) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Indicate that the weeder's mark phase has completed, and that "cis" are
       the indices of the cache entries to delete. The list "pfxs" contains
       the union of the prefixes of the PK's of all these entries.

       The cache server's hit filter is set to "cis", and the cache server
       will fork a background thread to delete these entries. The number of
       the checkpoint file into which the weeder should write the ``clean''
       version of the graphLog is returned. */

    bool CommitChkpt(const Text &chkptFileName) throw ();
    /* REQUIRES Sup(LL) < SELF.graphLogMu */
    /* Commit the currently outstanding graphLog checkpoint. This is a no-op
       if there is no outstanding checkpoint.

       The file to which the graphLog checkpoint was written, which is
       the "real" checkpoint name plus a suffix to make it unique, is
       passed as an argument. This file is renamed to take the place
       of the actual checkpoint. These extra steps guard against a
       case where two weeders are both trying to checkpoint the graph
       log of the same cache server.

       If for any reason the cache server decides not to accept the
       checkpoint (if, for example, it doesn't think anyone should be
       doing a checkpoint right now), it will return false.
       Otherwise, it will return true. */

  private:
    // dictionary types
    typedef Table<FP::Tag,VPKFile*>::Default CacheMap;
    typedef Table<FP::Tag,VPKFile*>::Iterator CacheMapIter;
    typedef Table<PKPrefix::T,VMultiPKFile*>::Default MPKMap;
    typedef Table<PKPrefix::T,VMultiPKFile*>::Iterator MPKIter;
    typedef Table<FP::Tag,FV::Epoch>::Default NamesEpochTbl;
    typedef Table<FP::Tag,FV::Epoch>::Iterator NamesEpochIter;

    // Data fields ----------------------------------------------------------

    // read-only after initialization
    CacheIntf::DebugLevel debug;
    bool noHits;

    // shared
    Basics::mutex mu;                // protects the following fields:
    Leases *leases;                  // cache entry leases
    CacheMap *cache;                 // cache: PK -> VPKFile
    MPKMap *mpkTbl;                  // mpkTbl: PKPrefix::T -> VMultiPKFile
    BitVector usedCIs;               // set of used cache indices
    EmptyPKLog *emptyPKLog;          // disk log of deleted PKFiles (see below)
    bool deleting;                   // deleting cache entries? (STABLE)
    SRPC *weederSRPC;                // cached connection to latest weeder
    Basics::cond doDeleting;         // "deleting"
    Basics::cond notDeleting;        // "!deleting"
    BitVector hitFilter;             // set of weeded cache indices (STABLE)
    PKPrefix::List mpksToWeed;       // which MPKFiles need weeding (STABLE)
    int nextMPKToWeed;               // index into mpksToWeed (STABLE)
    int graphLogChkptVer;            // version of outstanding graphLog chkpt
    VestaLog *weededMPKsLog;         // log of weeded VMultiPKFiles
    FlushQueue *cacheLogFlushQ;      // queue for flushing cache log
    CacheLog::Entry *vCacheLog;      // volatile cache log
    CacheLog::Entry *vCacheLogTail;  // tail of volatile cache log
    CacheLog::Entry *vCacheAvail;    // available vCacheLog entries
    FlushQueue *graphLogFlushQ;      // queue for flushing graph log
    GraphLog::Node *vGraphLog;       // volatile graph log
    GraphLog::Node *vGraphLogTail;   // tail of volatile graph log
    GraphLog::Node *vGraphNodeAvail; // available vGraphLog "Node" entries
    int vGraphNodeAvailLen;          // length of vGraphNodeAvail (debugging)
    FlushQueue *ciLogFlushQ;         // queue for flushing CI log
    Intvl::List *vCILog, *vCIAvail;  // log of usedCI's, available objects
    NamesEpochTbl evictedNamesEpochs;// The namesEpochs of evicted
                                     // empty PKFiles.

    /* Note: the field "emptyPKLog" is actually read-only after initialization,
       but the fields of the object it points to are protected by "mu". */

    // field for bkgrnd VMultiPKFile deletion thread; also protected by "mu"
    int freeMPKFileEpoch;

    // fields for supporting "GetCacheState" method; also protected by "mu"
    time_t startTime;                // time at which cache came up
    int entryCnt;                    // = usedCIs.Cardinality()
    MethodCnts cnt;                  // # of calls on various cache methods
    EntryState state;                // state of entries in memory

    // Cache server instance identifier, used to prevent an evaluator
    // from continuing across cache server restarts; also protected by
    // "mu"
    FP::Tag instanceFp;

    // fields for flushing MultiPKFiles; also protected by "mu"
    struct FlushWorkerList {
      FlushWorker *worker;
      FlushWorkerList *next;
    };
    FlushWorkerList *idleFlushWorkers; // linked list of worker threads
    Basics::cond availFlushWorker;     // == (idleFlushWorkers != NULL)
    int numActiveFlushWorkers;         // number of allocated flush workers
    Basics::cond allFlushWorkersDone;  // == (numActiveFlushWorkers == 0)

    // fields for queueing up checkpoint requests; protected by "mu"
    // NOTE(review): the line above says "mu", but "chkptMu" below is
    // documented as protecting the fields that follow it, and "Checkpoint",
    // "QueueChkpt" and "FinishChkpt" are specified relative to "chkptMu".
    // Confirm which mutex guards "chkptTh" and the queue fields.
    Basics::thread chkptTh;            // thread object for thread
    Basics::mutex chkptMu;             // protects following fields
    ChkptWorker *availChkptWorkers;    // linked list of worker objects
    ChkptWorker *queuedChkptWorkers;   // linked list of objects w/ work to do
    Basics::cond waitingChkptWorker;   // == (queuedChkptWorkers != NULL)

    /* The "cache" is a map from PK's to VPKFile objects. The pair
       "(pk, vpkfile)" is in "cache" iff "vpkfile" is the VPKFile for the
       primary key "pk".

       The "mpkTbl" is a map from PK prefixes (i.e., names of MultiPKFiles)
       to "VMultiPKFile" objects. The pair "(pfx, vmpkfile)" appears in this
       table iff "vmpkfile" is the VMultiPKFile corresponding to prefix "pfx"
       and if the "VToSCache" method should be called to flush that
       MultiPKFile to the stable cache. The "vmpkfile" contains pointers to
       all of the VPKFile's stored in "cache".

       In general, the cache maintains the following invariant:

                                       pfx = PKPrefix::T(pk)
         (pk, vpkfile) IN cache <==> /\ (pfx, vmpkfile) IN mpkTbl
                                     /\ (pk, vpkfile) IN vmpkfile
    */

    // stable logs
    Basics::mutex cacheLogMu;        // protects writing/reading "cacheLog"
    int cacheLogLen;                 // current length of stable cache log (*)
    CleanWorker *idleCleanWorker;    // available worker thread
    Basics::cond availCleanWorker;   // == (idleCleanWorker != NULL)
    VestaLog *cacheLog;              // disk log of new entries
    Basics::mutex graphLogMu;        // protects writing/reading "graphLog"
    VestaLog *graphLog;              // disk log of new graph nodes
    Basics::mutex ciLogMu;           // protects writing/reading "ciLog"
    VestaLog *ciLog;                 // disk log of usedCI's

    /* (*) Actually, "cacheLogLen" records the number of entries added to the
       stable cache log *since it was last flushed*. */

    /* Locking levels:
    |    ciLogMu < graphLogMu < cacheLogMu < mu
    */

    // Stable variables -------------------------------------------------------

    void RecoverDeleting() throw (FS::Failure);
    /* REQUIRES LL = Empty */
    /* Initialize "deleting" from stable storage. */

    void SetStableDeleting(bool del) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Set the "deleting" value to "del", and write its value persistently. */

    void RecoverHitFilter() throw (FS::Failure);
    /* REQUIRES LL = Empty */
    /* Initialize "hitFilter" from stable storage. */

    void WriteHitFilter() throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Write the current value of "hitFilter" to stable storage. */

    void ClearStableHitFilter() throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Set the "hitFilter" value to the empty set, and write its value
       persistently. */

    void SetStableHitFilter(const BitVector &hf) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Set the "hitFilter" value to "hf", and write its value persistently. */

    void SetMPKsToWeed(const PKPrefix::List &pfxs) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Set "mpksToWeed" and write its value persistently. */

    void RecoverMPKsToWeed() throw (FS::Failure);
    /* REQUIRES LL = Empty */
    /* Initialize "mpksToWeed" from stable storage. */

    void ResetWeededMPKs() throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Set "nextMPKToWeed" to 0, and reset the persistent log of the
       MultiPKFiles that have been weeded to the empty set. */

    void AddToWeededMPKs(int num) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Atomically record that "num" more MultiPKFiles in the "mpksToWeed"
       list have been flushed via "VToSCache". */

    void RecoverWeededMPKs() throw (VestaLog::Eof, VestaLog::Error);
    /* REQUIRES LL = Empty */
    /* Initialize "nextMPKToWeed" from stable storage. */

    // (Multi)PKFiles ---------------------------------------------------------

    bool FindVPKFile(const FP::Tag &pk, /*OUT*/ VPKFile* &vpk) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* If there is a "VPKFile" in the cache under the primary key
       "pk", set "vpk" to point to it and return true. Otherwise, try
       creating a new "VPKFile" (with no "entries") from the stable
       cache and return false. If there is no "SPKFile" for "pk",
       create a new empty "VPKFile". */

    PKFile::Epoch PKFileEpoch(const FP::Tag &pk) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Return the "pkEpoch" of the PKFile for "pk" in the volatile
       cache. If no PKFile exists for "pk", the result will be 0. */

    bool GetVPKFile(const FP::Tag &pk, /*OUT*/ VPKFile* &vpk) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Like "FindVPKFile" above, but also adds the new "VPKFile" to
       the cache and sets "vpk" to point to it. Returns "true" iff a "VPKFile"
       already existed in the cache under "pk". This method is analogous to
       the "GetVF" function in "SVCache.spec". */

    void VToSCache(const PKPrefix::T &pfx, const BitVector *toDelete = NULL)
      throw ();
    /* REQUIRES
         Sup(LL) < SELF.ciLogMu AND
         (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
    /* If there are any new entries for the MultiPKFile with prefix "pfx",
       flush them to the stable cache. If "toDelete" is non-NULL, then any
       cache entries contained in the same MultiPKFile as "pfx" whose
       indices correspond to set bits in "toDelete" are deleted from the
       cache. */

    void AddEntryToMPKFile(const PKPrefix::T &pfx, const char *reason)
      throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Called to indicate that a new entry has been added to a VPKFile
       in the MultiPKFile with prefix "pfx". If heuristics indicate that the
       MultiPKFile needs flushing, the "FlushMPKFile" method below is called
       with "reason" and with "block = false". */

    void FlushMPKFile(const PKPrefix::T &pfx,
      const char *reason, bool block=false) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Fork a background thread to flush the MultiPKFile with prefix "pfx".
       The "reason" argument is for debugging purposes; it should indicate
       why the MultiPKFile is being flushed. If "block" is true, then this
       method will block until a FlushWorker thread becomes available if
       none currently are. */

    friend void* CacheS_DoFreeMPKFiles(void *arg) throw ();
    /* REQUIRES Sup(LL) < ((CacheS *)arg)->mu */
    /* A background thread that attempts to free memory used by
       MultiPKFiles that have not been accessed recently. The "arg"
       is actually of type "CacheS *". */

    FlushWorker *NewFlushWorker(bool block = false) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Return a new "FlushWorker" object that is ready to run to flush a
       MultiPKFile to disk. If "block" is "true", block until a previously-
       created flush worker is available to do the work. Once this method
       returns, you call the worker's "Start" method to start it running
       on a specified MPKFile. */

    void RegisterIdleFlushWorker(FlushWorker *fw) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* Return "fw" to the list of idle flush worker threads. This
       should be called by the worker when it has completed its job. */

    friend void* CacheS_FlushWorker(void *arg) throw ();
    /* REQUIRES Sup(LL) < SELF.mu */
    /* The apply function for threads forked to flush a MultiPKFile.
       "arg" is actually of type "(FlushWorker *)". */

    // Checkpoint -----------------------------------------------------------

    void DoCheckpoint(const FP::Tag &pkgVersion, Model::T model,
      CacheEntry::Indices *cis, bool done) throw ();
    /* REQUIRES Sup(LL) < SELF.ciLogMu */
    /* This method does the actual work of the "Checkpoint" method. */

    ChkptWorker *QueueChkpt(const FP::Tag &pkgVersion, Model::T model,
      CacheEntry::Indices *cis, bool done) throw ();
    /* REQUIRES Sup(LL) < SELF.chkptMu */
    /* Add a new checkpoint object (whose fields are initialized
       to "pkgVersion", "model", "cis", and "done") to the queue of
       checkpoints to perform. */

    void FinishChkpt(ChkptWorker *cw) throw ();
    /* REQUIRES Sup(LL) < SELF.chkptMu */
    /* Reset the checkpoint object "cw" and return it to the list of
       available checkpoint objects. */

    friend class ChkptWorker;

    // Recovery -------------------------------------------------------------

    void Recover() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
    /* REQUIRES LL = Empty */
    /* Read logs to restore in-memory cache. */

    friend void* CacheS_DoDeletions(void *cacheS) throw ();
    /* REQUIRES LL = 0 */
    /* The body of the background thread for deleting cache entries. "cacheS"
       will actually be of type "CacheS*". */

    // Methods related to "cacheLog" ----------------------------------------

    void LogCacheEntry(const Text& sourceFunc, FP::Tag *pk,
      PKFile::Epoch pkEpoch,
      CacheEntry::Index ci, VestaVal::T *value,
      Model::T model, CacheEntry::Indices *kids,
      FV::List *names, FP::List *fps) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Append the tuple "(sourceFunc, pk, pkEpoch, ci, value, model,
       kids, names, fps)" to the "vCacheLog". */

    void FlushCacheLog() throw (VestaLog::Error);
    /* REQUIRES Sup(LL) < SELF.ciLogMu */
    /* Flush the "vCacheLog" to disk ("cacheLog"). This flushes the "vCILog"
       and "vGraphLog" first. At most one thread can be flushing the cache
       log at a time, so the caller will be blocked if another thread is
       currently flushing the log. */

    void CleanCacheLog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
    /* REQUIRES Sup(LL) < SELF.cacheLogMu */
    /* Remove unnecessary entries from the cache log. The only entries that
       are preserved are those that are new entries in the volatile cache
       that have not been flushed to stable PKFiles. This method writes the
       preserved entries to the cache log checkpoint, and it writes an empty
       checkpoint for the emptyPKLog. */

    CleanWorker *NewCleanWorker() throw ();
    /* REQUIRES Sup(LL) = SELF.cacheLogMu */
    /* Block until the background thread for cleaning the cache log is
       available; then return it. */

    void RegisterIdleCleanWorker(CleanWorker *cw) throw ();
    /* REQUIRES Sup(LL) < SELF.cacheLogMu */
    /* Return "cw" to the list of idle clean worker threads. This
       should be called by the worker when it has completed its job. */

    friend void* CacheS_CleanCacheLogWorker(void *arg) throw ();
    /* REQUIRES Empty(LL) */
    /* This is the apply function for a thread that is forked to clean
       the CacheLog. */

    void CleanCacheLogEntries(RecoveryReader &rd, std::fstream &ofs,
      /*INOUT*/ EmptyPKLog::PKEpochTbl &pkEpochTbl,
      /*INOUT*/ int &oldCnt, /*INOUT*/ int &newCnt)
      throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
    /* REQUIRES Sup(LL) < SELF.cacheLogMu */
    /* NOTE(review): not otherwise documented in this header. Presumably a
       helper for "CleanCacheLog" that reads cache log entries from "rd" and
       writes the ones to preserve to "ofs", updating "pkEpochTbl" and the
       "oldCnt"/"newCnt" counters; confirm against the implementation. */

    void TryCleanCacheLog(int upper_bound, const char *reason) throw ();
    /* REQUIRES Sup(LL) = SELF.cacheLogMu */
    /* If the number of entries written to the stable cache log since it was
       last cleaned exceeds "upper_bound", fork a thread to ``clean'' it.
       The "reason" is a string used for debugging to indicate why the log is
       being cleaned. */

    void RecoverEmptyPKLog(/*INOUT*/ bool &empty)
      throw (VestaLog::Error, VestaLog::Eof);
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Read "this->emptyPKLog", adding epochs to its associated table.
       Sets "empty" to "false" if any entries are processed. */

    void RecoverCacheLogEntries(RecoveryReader &rd,
      /*INOUT*/ EmptyPKLog::PKEpochTbl &pkEpochTbl, /*INOUT*/ bool &empty)
      throw (VestaLog::Error, VestaLog::Eof);
    /* REQUIRES Sup(LL) = SELF.cacheLogMu */
    /* Read cache log entries from "rd" until end of file. "pkEpochTbl" is a
       table mapping PKs to their known (from disk) "pkEpoch"s. It is
       augmented to include any newly-discovered associations. If the
       debugging level is high enough, "empty" is set to "false" if any
       entries are read. */

    void RecoverCacheLog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
    /* REQUIRES LL = Empty */
    /* NOTE(review): not otherwise documented in this header. Presumably the
       recovery-time entry point that restores state from the stable
       "cacheLog"; confirm against the implementation. */

    // Methods related to "graphLog" ----------------------------------------

    void LogGraphNode(CacheEntry::Index ci, FP::Tag *loc, Model::T model,
      CacheEntry::Indices *kids, Derived::Indices *refs) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Append the quintuple "(ci, loc, model, kids, refs)" to the
       "vGraphLog". */

    void FlushGraphLog() throw (VestaLog::Error);
    /* REQUIRES Sup(LL) < SELF.ciLogMu */
    /* Flush the "vGraphLog" to disk ("graphLog"). This flushes the "vCILog"
       first. At most one thread can be flushing the graph log at a time, so
       the caller will be blocked if the log is currently being flushed by
       another thread. */

    int ChkptGraphLog() throw (FS::Failure, VestaLog::Error);
    /* REQUIRES Sup(LL) < SELF.graphLogMu */
    /* Checkpoint the graphLog and return the number of the new checkpoint
       and log file. */

    void AbortGraphLogChkpt() throw (VestaLog::Error);
    /* REQUIRES Sup(LL) < SELF.graphLogMu */
    /* Abort the currently outstanding graphLog checkpoint. */

    void RecoverGraphLog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
    /* REQUIRES LL = Empty */
    /* NOTE(review): not otherwise documented in this header. Presumably the
       recovery-time entry point that restores state from the stable
       "graphLog"; confirm against the implementation. */

    // Methods related to the "ciLog" ---------------------------------------

    void LogCI(Intvl::Op op, CacheEntry::Index ci) throw ();
    /* REQUIRES Sup(LL) = SELF.mu */
    /* Append "ci" to the "vCILog"; "op" indicates if the index should be
       added or removed from the "usedCIs" set. */

    void FlushUsedCIs() throw (VestaLog::Error);
    /* REQUIRES Sup(LL) < SELF.ciLogMu */
    /* Flush the "vCILog" to disk ("ciLog"). */

    Intvl::List *FlushUsedCIsList(Intvl::List *vLog, /*OUT*/ int &numFlushed)
      throw (VestaLog::Error);
    /* REQUIRES SELF.ciLogMu IN LL */
    /* Flush the entries in "vLog" to the "usedCIs" log on disk, and return
       a pointer to the last node in the list. Requires "vLog != NULL". Sets
       "numFlushed" to the number of intervals flushed to the log. */

    void ChkptUsedCIs(const BitVector &del)
      throw (VestaLog::Error, FS::Failure);
    /* REQUIRES Sup(LL) < SELF.ciLogMu */
    /* Atomically flush all volatile "usedCIs" entries to the "usedCIs" disk
       log, subtract the entries in "del" from "this->usedCIs", and checkpoint
       the "usedCIs" disk log so it contains the new value of "usedCIs". */

    void RecoverCILog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
    /* REQUIRES LL = Empty */
    /* Set "usedCIs" from the "ciLog" on disk. */

    // make copy constructor inaccessible to clients
    // NOTE(review): no copy-assignment operator is declared alongside it in
    // this class, so assignment is not hidden in the same way; confirm
    // whether that is intentional.
    CacheS(const CacheS&);
};

// Worker thread classes ------------------------------------------------------

// base class for various kinds of worker threads
class CacheWorker {
  public:
    CacheWorker(CacheS *cs) throw ();
    /* Initialize a new worker object ready to run on "cs".
       Call the associated "Start" method to actually run it
       on particular arguments. */

    void Start(const char *reason) throw ();
    /* Signal the thread to go. Subclasses may have their own Start
       methods to allow passing arguments to the thread.
       The "reason" argument is for debugging; it indicates
       the task on behalf of which the worker thread is acting.
       Note that this method is not virtual, so a subclass "Start"
       (such as "FlushWorker::Start") hides it rather than overriding it. */

    void Finish() throw ();
    /* Indicate that the worker thread is idle. */
  protected:
    // the thread and associated cache server
    Basics::thread th;
    CacheS *cs;

    // synchronization fields
    Basics::mutex mu;
    bool argsReady;
    Basics::cond workToDo;
    const char *reason;   // for debugging info only
};

// worker thread for flushing a specified MultiPKFile to the stable cache
class FlushWorker : public CacheWorker {
  public:
    FlushWorker(CacheS *cs) throw ();
    /* Initialize a new flush worker object for the cache server "cs"
       (presumably as described for the "CacheWorker" constructor). */

    void Start(const PKPrefix::T &pfx, const char *reason) throw ();
    /* Start the worker thread flushing the MultiPKFile
       named by "pfx". The "reason" argument indicates why
       this MultiPKFile is being flushed. */

  private:
    friend void* CacheS_FlushWorker(void *arg) throw ();
    /* REQUIRES LL = 0 */
    /* Callback function for the cache server worker thread
       that flushes the MultiPKFile named by "arg->pfx".
       "arg" will actually be of type "FlushWorker *". */

    // thread args
    PKPrefix::T pfx;
};

// worker thread for cleaning the cache log
class CleanWorker : public CacheWorker {
  public:
    CleanWorker(CacheS *cs) throw ();
    /* Initialize a new clean worker object for the cache server "cs"
       (presumably as described for the "CacheWorker" constructor). */

  private:
    friend void* CacheS_CleanCacheLogWorker(void *arg) throw ();
    /* REQUIRES LL = 0 */
    /* Callback function for the cache server worker thread
       that cleans the cache log. "arg" will actually be of
       type "CleanWorker *". */
};

// class for checkpointing an evaluation
class ChkptWorker {
    /* As opposed to the other "*Worker" objects, there is not a separate
       thread per "ChkptWorker" object. Instead, the cache forks a single
       thread running the "ChkptWorker::MainLoop" method below. Each
       "ChkptWorker" object is simply a container of arguments on which
       the "CacheS::DoCheckpoint" method is invoked. */
  public:
    // constructors/initializers
    ChkptWorker(const FP::Tag &pkgVersion, Model::T model,
      CacheEntry::Indices *cis, bool done) throw ()
      { Init(pkgVersion, model, cis, done); }
    /* Construct a checkpoint object by forwarding all four arguments
       to "Init". */
    void Init(const FP::Tag &pkgVersion, Model::T model,
      CacheEntry::Indices *cis, bool done) throw ();
    /* (Re)initialize this object from the given checkpoint arguments;
       presumably they are stored in the like-named private fields below. */

    // reset object fields after using it
    void Reset() throw ();

    // block until this checkpoint completes
    void WaitUntilDone() throw ();
    /* REQUIRES Sup(LL) < SELF.mu */

    static void* MainLoop(void *arg) throw ();
    /* REQUIRES Sup(LL) < CacheS::ciLogMu */
    /* Repeatedly invoke "DoCheckpoint" whenever there is another worker
       in the "CacheS::queuedChkptWorkers" queue. "arg" is actually of
       type "CacheS *". */

    // next object in linked list
    ChkptWorker *next;

  private:
    // arguments
    FP::Tag pkgVersion;
    Model::T model;
    CacheEntry::Indices *cis;
    bool done;

    // synchronization fields
    Basics::mutex mu;     // protects following fields
    bool chkptComplete;   // did the checkpoint finish?
    Basics::cond isDone;  // == (chkptComplete)
};

#endif // _CACHES_H