Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

CacheS.H

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Mon May 23 22:58:16 EDT 2005 by ken@xorian.net  
00020 //      modified on Sat Feb 12 13:21:38 PST 2000 by mann  
00021 //      modified on Wed Feb 23 17:18:43 PST 2000 by mann  
00022 //      modified on Fri Feb  4 17:35:06 PST 2000 by heydon
00023 
00024 // CacheS -- the server interface to the Vesta-2 cache server
00025 
00026 /* The class "CacheS" provides the callback methods for a running instance of
00027    a Vesta-2 cache server. A new "CacheS" object is passed to the constructor
00028    of an "ExpCache" object. The "ExpCache" object listens for cache server
00029    SRPC connections. When a call is made by the client, the "ExpCache" object
00030    unmarshals the arguments and invokes the corresponding method of the
00031    "CacheS" object. */
00032 
00033 #ifndef _CACHES_H
00034 #define _CACHES_H
00035 
00036 #include <time.h>
00037 #include <Basics.H>
00038 #include <FS.H>
00039 #include <VestaLog.H>
00040 #include <FP.H>
00041 
00042 // cache-common
00043 #include <CacheIntf.H>
00044 #include <BitVector.H>
00045 #include <CacheState.H>
00046 #include <Model.H>
00047 #include <PKEpoch.H>
00048 #include <CacheIndex.H>
00049 #include <Derived.H>
00050 #include <FV.H>
00051 #include <VestaVal.H>
00052 #include <GraphLog.H>
00053 #include <PKPrefix.H>
00054 
00055 // cache-server
00056 #include <EmptyPKLog.H>
00057 #include <Leases.H>
00058 #include <CacheEntry.H>
00059 #include <CacheLog.H>
00060 #include <Intvl.H>
00061 #include <VPKFile.H>
00062 #include <VMultiPKFile.H>
00063 #include "FlushQueue.H"
00064 
00065 // forward declarations
00066 class FlushWorker;
00067 class CleanWorker;
00068 class ChkptWorker;
00069 
00070 class CacheS {
00071   public:
00072     CacheS(CacheIntf::DebugLevel debug = CacheIntf::None,
00073       bool noHits = false) throw ();
00074     /* REQUIRES LL = Empty */
00075     /* Initialize a new cache server object. Debugging information is printed
00076        to the standard output for all debugging messages having level at most
00077        "debug". If "noHits" is true, then the "Lookup" method will never
00078        return "Hit" as a result. */
00079 
00080     int Version() throw () { return CacheIntf::Version; }
00081     /* REQUIRES LL = Any */
00082     /* Returns the version number of this cache server interface. */
00083 
00084     CacheIntf::DebugLevel DebugLevel() const throw () { return this->debug; }
00085     /* REQUIRES LL = Any */
00086     /* Return the current debug level. */
00087 
00088     void FreeVariables(const FP::Tag& pk, /*OUT*/ VPKFile* &vf) throw ();
00089     /* REQUIRES Sup(LL) < SELF.mu */
00090     /* Set "vf" to the VPKFile for "pk". */
00091 
00092     CacheIntf::LookupRes
00093     Lookup(const FP::Tag &pk, FV::Epoch id, const FP::List &fps,
00094       /*OUT*/ CacheEntry::Index &ci, /*OUT*/ const VestaVal::T* &value)
00095       throw ();
00096     /* REQUIRES (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
00097     /* Look up the cache entry with primary key "pk", free variables set with
00098        epoch "id", and corresponding value fingerprints "fps".
00099   
00100        In the event of a hit, "Hit" is returned, a lease is taken out (or
00101        renewed) on the matching cache entry, "ci" is set to the index of that
00102        entry, and "value" is set to its result value. For all other return
00103        values, "ci" and "value" are unchanged. 
00104 
00105        In the event of a cache miss, "Miss" is returned. In the event that "id"
00106        is an old epoch for "pk", "FVMismatch" is returned. In this case, the
00107        caller should call the "FreeVariables" method again to get the latest
00108        epoch and names associated with "pk", and then try "Lookup" again. */
00109 
00110     CacheIntf::AddEntryRes
00111     AddEntry(FP::Tag *pk, FV::List *names, FP::List *fps,
00112       VestaVal::T *value, Model::T model, CacheEntry::Indices *kids,
00113       const Text& sourceFunc, /*OUT*/ CacheEntry::Index& ci) throw ();
00114     /* REQUIRES (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
00115     /* Add a new entry to the cache under the primary key "pk" corresponding
00116        to the evaluation of some function; "names" is the list of names that
00117        are free during the evaluation of the function, and "fps" are the
00118        fingerprints of the values associated with those names. If the lengths
00119        of these two lists do not agree, or if "names" contains any duplicate
00120        names, "CacheIntf::BadAddEntryArgs" is immediately returned; this
00121        return result indicates a programming error on the part of the client.
00122 
00123        "value" is the value produced by the evaluation of the function,
00124        "model" is the model in which the function is defined, and "kids" are
00125        the indices of the cache entries corresponding to function evaluations
00126        performed directly on behalf of this evaluation. Finally, "sourceFunc"
00127        is a text denoting the source location of the function definition
00128        for the new cache entry.
00129 
00130        In the event of success, "CacheIntf::EntryAdded" is returned, a lease
00131        is taken out on the new cache entry, and "ci" is set to the index of
00132        the new cache entry. 
00133 
00134        If any of the "kids" lacks a lease, then "CacheIntf::NoLease" is
00135        returned, and the value of "ci" is unchanged. */
00136 
00137     void Checkpoint(const FP::Tag &pkgVersion, Model::T model,
00138       CacheEntry::Indices *cis, bool done) throw ();
00139     /* REQUIRES Sup(LL) < SELF.chkptMu */
00140     /* Make stable all cache entries and deriveds reachable from the entries
00141        corresponding to "cis". If the cache entries corresponding to "cis"
00142        all have leases, then also write a "Root" entry to the graph log that
00143        protects them from weeding. The arguments "pkgVersion" and "model"
00144        identify the top-level model on which the evaluator was invoked:
00145        "pkgVersion" is the fingerprint of the immutable directory in which
00146        the model resides, and "model" is the ShortId of the model file
00147        itself. The "done" argument should be true if the checkpoint is for
00148        a completed model evaluation, false if it represents an intermediate
00149        checkpoint. The checkpoint is done synchronously iff "done" is true. */
00150 
00151     void FlushAll() throw ();
00152     /* REQUIRES
00153          Sup(LL) < SELF.ciLogMu AND
00154          (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
00155     /* Flush all VMultiPKFiles in the cache, even if they don't have any new
00156        entries to be flushed. Before returning, this method also forks a
00157        thread to clean the cache log. */
00158 
00159     void GetCacheId(/*OUT*/ CacheId &id) throw ();
00160     /* REQUIRES Sup(LL) < SELF.mu */
00161     /* Set "id" to the identifying information of the cache server. */
00162 
00163     void GetCacheState(/*OUT*/ CacheState &state) throw ();
00164     /* REQUIRES Sup(LL) < SELF.mu */
00165     /* Set "state" to the current state information of the cache server. */
00166 
00167     const FP::Tag &GetCacheInstance() const throw ();
00168     /* Returns the unique instance fingerprint of the cache server.
00169 
00170        Must only be called after the instance is initialized, as read
00171        access to the instance fingerprint is not protected by a mutex. */
00172 
00173     bool RenewLeases(CacheEntry::Indices *cis) throw ();
00174     /* REQUIRES Sup(LL) < SELF.mu */
00175     /* Renew the leases on the cache entries with indexes "cis". Returns true
00176        if all the cache entries named by "cis" exist and all currently have
00177        valid leases; false otherwise. Even if false is returned, the leases of
00178        valid leases; false otherwise. Even if false is returned, every
00179        existing cache entry named in "cis" that had a valid lease will
00180        have had its lease renewed. */
00181     bool WeederRecovering(SRPC *srpc, bool doneMarking) throw ();
00182     /* REQUIRES Sup(LL) < SELF.mu */
00183     /* Indicate that the weeder is in the process of recovering, and set the
00184        cache server's state accordingly. "doneMarking" should be true iff the
00185        weeder's "weeded" set is non-empty, i.e., if the weeder had completed
00186        its mark phase but not the subsequent deletion phase. "srpc" should be
00187        a pointer to the SRPC connection making this call; the connection is
00188        used on subsequent calls to detect if a weed is currently in progress.
00189        Returns true iff there is already a weeder running, in which case no
00190        action is taken and the weeder should immediately abort. */
00191 
00192     BitVector* StartMark(/*OUT*/ int &newLogVer) throw ();
00193     /* REQUIRES Sup(LL) < SELF.graphLogMu */
00194     /* Indicate that the weeder is starting its mark phase. This blocks if the
00195        cache server is still in the middle of doing deletions from a previous
00196        weed. Then it disables lease expiration, and flushes the graphLog.
00197        It then checkpoints the graph log, and sets "newLogVer" to the version
00198        number of the new graphLog file. The weeder should read all versions up
00199        to (but not including) "newLogVer". Finally, this method returns the
00200        bit vector of cache indices in use at the time of the call. */
00201 
00202     void SetHitFilter(const BitVector &cis) throw ();
00203     /* REQUIRES Sup(LL) < SELF.mu */
00204     /* Set the cache server's hit filter set to "cis". The hit filter is the
00205        set of cache entries that may soon be deleted, so hits on them by the
00206        cache server's "Lookup" method will be disabled. This method requires
00207        that the cache server is not currently doing deletions from a previous
00208        weed. */
00209 
00210     BitVector* GetLeases() throw ();
00211     /* REQUIRES Sup(LL) < SELF.mu */
00212     /* Return a newly allocated bit vector corresponding to the cache entries
00213        that are currently leased. */
00214 
00215     void ResumeLeaseExp() throw ();
00216     /* REQUIRES Sup(LL) < SELF.mu */
00217     /* Re-enable lease expiration disabled by the "StartMark" method above. */
00218 
00219     int EndMark(const BitVector &cis, const PKPrefix::List &pfxs) throw ();
00220     /* REQUIRES Sup(LL) < SELF.mu */
00221     /* Indicate that the weeder's mark phase has completed, and that "cis" are
00222        the indices of the cache entries to delete. The list "pfxs" contains
00223        the union of the prefixes of the PK's of all these entries.
00224 
00225        The cache server's hit filter is set to "cis", and the cache server
00226        will fork a background thread to delete these entries. The number of
00227        the checkpoint file into which the weeder should write the ``clean''
00228        version of the graphLog is returned. */
00229 
00230     bool CommitChkpt(const Text &chkptFileName) throw ();
00231     /* REQUIRES Sup(LL) < SELF.graphLogMu */
00232     /* Commit the currently outstanding graphLog checkpoint. This is a no-op
00233        if there is no outstanding checkpoint.
00234 
00235        The file to which the graphLog checkpoint was written, which is
00236        the "real" checkpoint name plus a suffix to make it unique, is
00237        passed as an argument.  This file is renamed to take the place
00238        of the actual checkpoint.  These extra steps guard against a
00239        case where two weeders are both trying to checkpoint the graph
00240        log of the same cache server.
00241 
00242        If for any reason the cache server decides not to accept the
00243        checkpoint (if, for example, it doesn't think anyone should be
00244        doing a checkpoint right now), it will return false.
00245        Otherwise, it will return true. */
00246 
00247   private:
00248     // dictionary types
00249     typedef Table<FP::Tag,VPKFile*>::Default CacheMap;
00250     typedef Table<FP::Tag,VPKFile*>::Iterator CacheMapIter;
00251     typedef Table<PKPrefix::T,VMultiPKFile*>::Default MPKMap;
00252     typedef Table<PKPrefix::T,VMultiPKFile*>::Iterator MPKIter;
00253     typedef Table<FP::Tag,FV::Epoch>::Default NamesEpochTbl;
00254     typedef Table<FP::Tag,FV::Epoch>::Iterator NamesEpochIter;
00255 
00256     // Data fields ----------------------------------------------------------
00257 
00258     // read-only after initialization
00259     CacheIntf::DebugLevel debug;
00260     bool noHits;
00261     
00262     // shared
00263     Basics::mutex mu;                // protects the following fields:
00264     Leases *leases;                  // cache entry leases
00265     CacheMap *cache;                 // cache: PK -> VPKFile
00266     MPKMap *mpkTbl;                  // mpkTbl: PKPrefix::T -> VMultiPKFile
00267     BitVector usedCIs;               // set of used cache indices
00268     EmptyPKLog *emptyPKLog;          // disk log of deleted PKFiles (see below)
00269     bool deleting;                   // deleting cache entries? (STABLE)
00270     SRPC *weederSRPC;                // cached connection to latest weeder
00271     Basics::cond doDeleting;         // "deleting"
00272     Basics::cond notDeleting;        // "!deleting"
00273     BitVector hitFilter;             // set of weeded cache indices (STABLE)
00274     PKPrefix::List mpksToWeed;       // which MPKFiles need weeding (STABLE)
00275     int nextMPKToWeed;               // index into mpksToWeed (STABLE)
00276     int graphLogChkptVer;            // version of outstanding graphLog chkpt
00277     VestaLog *weededMPKsLog;         // log of weeded VMultiPKfiles
00278     FlushQueue *cacheLogFlushQ;      // queue for flushing cache log
00279     CacheLog::Entry *vCacheLog;      // volatile cache log
00280     CacheLog::Entry *vCacheLogTail;  // tail of volatile cache log
00281     CacheLog::Entry *vCacheAvail;    // available vCacheLog entries
00282     FlushQueue *graphLogFlushQ;      // queue for flushing graph log
00283     GraphLog::Node *vGraphLog;       // volatile graph log
00284     GraphLog::Node *vGraphLogTail;   // tail of volatile graph log
00285     GraphLog::Node *vGraphNodeAvail; // available vGraphLog "Node" entries
00286     int vGraphNodeAvailLen;          // length of vGraphNodeAvail (debugging)
00287     FlushQueue *ciLogFlushQ;         // queue for flushing ciLog
00288     Intvl::List *vCILog, *vCIAvail;  // log of usedCI's, available objects
00289     NamesEpochTbl evictedNamesEpochs;// The namesEpochs of evicted
00290                                      // empty PKFiles.
00291 
00292     /* Note: the field "emptyPKLog" is actually read-only after initialization,
00293        but the fields of the object it points to are protected by "mu". */
00294 
00295     // field for bkgrnd VMultiPKFile deletion thread; also protected by "mu"
00296     int freeMPKFileEpoch;
00297 
00298     // fields for supporting "GetCacheState" method; also protected by "mu"
00299     time_t startTime;                // time at which cache came up
00300     int entryCnt;                    // = usedCIs.Cardinality()
00301     MethodCnts cnt;                  // # of calls on various cache methods
00302     EntryState state;                // state of entries in memory
00303 
00304     // Cache server instance identifier, used to prevent an evaluator
00305     // from continuing across cache server restarts; also protected by
00306     // "mu"
00307     FP::Tag instanceFp;
00308 
00309     // fields for flushing MultiPKFiles; also protected by "mu"
00310     struct FlushWorkerList {
00311         FlushWorker *worker;
00312         FlushWorkerList *next;
00313     };
00314     FlushWorkerList *idleFlushWorkers; // linked list of worker threads
00315     Basics::cond availFlushWorker;     // == (idleFlushWorkers != NULL)
00316     int numActiveFlushWorkers;         // number of allocated flush workers
00317     Basics::cond allFlushWorkersDone;  // == (numActiveFlushWorkers == 0)
00318 
00319     // fields for queueing up checkpoint requests; protected by "mu"
00320     Basics::thread chkptTh;            // thread object for thread
00321     Basics::mutex chkptMu;             // protects following fields
00322     ChkptWorker *availChkptWorkers;    // linked list of worker objects
00323     ChkptWorker *queuedChkptWorkers;   // linked list of objects w/ work to do
00324     Basics::cond waitingChkptWorker;   // == (queuedChkptWorkers != NULL)
00325 
00326     /* The "cache" is a map from PK's to VPKFile objects. The pair
00327        "(pk, vpkfile)" is in "cache" iff "vpkfile" is the VPKFile for the
00328        primary key "pk".
00329 
00330        The "mpkTbl" is a map from PK prefixes (i.e., names of MultiPKFiles)
00331        to "VMultiPKFile" objects. The pair "(pfx, vmpkfile)" appears in this
00332        table iff "vmpkfile" is the VMultiPKFile corresponding to prefix "pfx"
00333        and if the "VToSCache" method should be called to flush that
00334        MultiPKFile to the stable cache. The "vmpkfile" contains pointers to
00335        all of the VPKFile's stored in "cache".
00336 
00337        In general, the cache maintains the following invariant:
00338        
00339                                            pfx = PKPrefix::T(pk)
00340          (pk, vpkfile) IN cache  <==>   /\ (pfx, vmpkfile) IN mpkTbl
00341                                         /\ (pk, vpkfile) IN vmpkfile
00342     */
00343 
00344     // stable logs
00345     Basics::mutex cacheLogMu;        // protects writing/reading "cacheLog"
00346     int cacheLogLen;                 // current length of stable cache log (*)
00347     CleanWorker *idleCleanWorker;    // available worker thread
00348     Basics::cond availCleanWorker;   // == (idleCleanWorker != NULL)
00349     VestaLog *cacheLog;              // disk log of new entries
00350     Basics::mutex graphLogMu;        // protects writing/reading "graphLog"
00351     VestaLog *graphLog;              // disk log of new graph nodes
00352     Basics::mutex ciLogMu;           // protects writing/reading "ciLog"
00353     VestaLog *ciLog;                 // disk log of usedCI's
00354 
00355     /* (*) Actually, "cacheLogLen" records the number of entries added to the
00356            stable cache log *since it was last flushed*. */
00357 
00358     /* Locking levels:
00359 |        ciLogMu < graphLogMu < cacheLogMu < mu
00360     */
00361 
00362     // Stable variables -------------------------------------------------------
00363 
00364     void RecoverDeleting() throw (FS::Failure);
00365     /* REQUIRES LL = Empty */
00366     /* Initialize "deleting" from stable storage. */
00367 
00368     void SetStableDeleting(bool del) throw ();
00369     /* REQUIRES Sup(LL) = SELF.mu */
00370     /* Set the "deleting" value to "del", and write its value persistently. */
00371 
00372     void RecoverHitFilter() throw (FS::Failure);
00373     /* REQUIRES LL = Empty */
00374     /* Initialize "hitFilter" from stable storage. */
00375 
00376     void WriteHitFilter() throw ();
00377     /* REQUIRES Sup(LL) = SELF.mu */
00378     /* Write the current value of "hitFilter" to stable storage. */
00379 
00380     void ClearStableHitFilter() throw ();
00381     /* REQUIRES Sup(LL) = SELF.mu */
00382     /* Set the "hitFilter" value to the empty set, and write its value
00383        persistently. */
00384     
00385     void SetStableHitFilter(const BitVector &hf) throw ();
00386     /* REQUIRES Sup(LL) = SELF.mu */
00387     /* Set the "hitFilter" value to "hf", and write its value persistently. */
00388 
00389     void SetMPKsToWeed(const PKPrefix::List &pfxs) throw ();
00390     /* REQUIRES Sup(LL) = SELF.mu */
00391     /* Set "mpksToWeed" and write its value persistently. */
00392 
00393     void RecoverMPKsToWeed() throw (FS::Failure);
00394     /* REQUIRES LL = Empty */
00395     /* Initialize "mpksToWeed" from stable storage. */
00396 
00397     void ResetWeededMPKs() throw ();
00398     /* REQUIRES Sup(LL) = SELF.mu */
00399     /* Set "nextMPKToWeed" to 0, and reset the persistent log of the
00400        MultiPKFiles that have been weeded to the empty set. */
00401 
00402     void AddToWeededMPKs(int num) throw ();
00403     /* REQUIRES Sup(LL) = SELF.mu */
00404     /* Atomically record that "num" more MultiPKFiles in the "mpksToWeed"
00405        list have been flushed via "VToSCache". */
00406 
00407     void RecoverWeededMPKs() throw (VestaLog::Eof, VestaLog::Error);
00408     /* REQUIRES LL = Empty */
00409     /* Initialize "nextMPKToWeed" from stable storage. */
00410 
00411     // (Multi)PKFiles ---------------------------------------------------------
00412 
00413     bool FindVPKFile(const FP::Tag &pk, /*OUT*/ VPKFile* &vpk) throw ();
00414     /* REQUIRES Sup(LL) = SELF.mu */
00415     /* If there is a "VPKFile" in the cache under the primary key
00416        "pk", set "vpk" to point to it and return true.  Otherwise, try
00417        creating a new "VPKFile" (with no "entries") from the stable
00418        cache and return false. If there is no "SPKFile" for "pk",
00419        create a new empty "VPKFile". */
00420 
00421     PKFile::Epoch PKFileEpoch(const FP::Tag &pk) throw ();
00422     /* REQUIRES Sup(LL) = SELF.mu */
00423     /* Return the "pkEpoch" of the PKFile for "pk" in the volatile
00424        cache. If no PKFile exists for "pk", the result will be 0. */
00425 
00426     bool GetVPKFile(const FP::Tag &pk, /*OUT*/ VPKFile* &vpk) throw ();
00427     /* REQUIRES Sup(LL) = SELF.mu */
00428     /* Like "FindVPKFile" above, but also adds the new "VPKFile" to
00429        the cache and sets "vpk" to point to it. Returns "true" iff a "VPKFile"
00430        already existed in the cache under "pk". This method is analogous to
00431        the "GetVF" function in "SVCache.spec". */
00432 
00433     void VToSCache(const PKPrefix::T &pfx, const BitVector *toDelete = NULL)
00434       throw ();
00435     /* REQUIRES 
00436          Sup(LL) < SELF.ciLogMu AND
00437          (FORALL vpk: VPKFile :: Sup(LL) < vpk.mu) */
00438     /* If there are any new entries for the MultiPKFile with prefix "pfx",
00439        flush them to the stable cache. If "toDelete" is non-NULL, then any
00440        cache entries contained in the same MultiPKFile as "pfx" whose
00441        indices correspond to set bits in "toDelete" are deleted from the
00442        cache. */
00443 
00444     void AddEntryToMPKFile(const PKPrefix::T &pfx, const char *reason)
00445       throw ();
00446     /* REQUIRES Sup(LL) < SELF.mu */
00447     /* Called to indicate that a new entry has been added to a VPKFile
00448        in the MultiPKFile with prefix "pfx". If heuristics indicate that the
00449        MultiPKFile needs flushing, the "FlushMPKFile" method below is called with
00450        "reason" and with "block = false". */
00451 
00452     void FlushMPKFile(const PKPrefix::T &pfx,
00453       const char *reason, bool block=false) throw ();
00454     /* REQUIRES Sup(LL) = SELF.mu */
00455     /* Fork a background thread to flush the MultiPKFile with prefix "pfx".
00456        The "reason" argument is for debugging purposes; it should indicate
00457        why the MultiPKFile is being flushed. If "block" is true, then this
00458        method will block until a FlushWorker thread becomes available if
00459        none currently are. */
00460 
00461     friend void* CacheS_DoFreeMPKFiles(void *arg) throw ();
00462     /* REQUIRES Sup(LL) < ((CacheS *)arg)->mu */
00463     /* A background thread that attempts to free memory used by
00464        MultiPKFiles that have not been accessed recently. The "arg"
00465        is actually of type "CacheS *". */
00466 
00467     FlushWorker *NewFlushWorker(bool block = false) throw ();
00468     /* REQUIRES Sup(LL) = SELF.mu */
00469     /* Return a new "FlushWorker" object that is ready to run to flush a
00470        MultiPKFile to disk. If "block" is "true", block until a previously-
00471        created flush worker is available to do the work. Once this method
00472        returns, the caller should invoke the worker's "Start" method to run it
00473        on a specified MPKFile. */
00474 
00475     void RegisterIdleFlushWorker(FlushWorker *fw) throw ();
00476     /* REQUIRES Sup(LL) < SELF.mu */
00477     /* Return "fw" to the list of idle flush worker threads. This
00478        should be called by the worker when it has completed its job. */
00479 
00480     friend void* CacheS_FlushWorker(void *arg) throw ();
00481     /* REQUIRES Sup(LL) < SELF.mu */
00482     /* The apply function for threads forked to flush a MultiPKFile.
00483        "arg" is actually of type "(FlushWorker *)". */
00484 
00485     // Checkpoint -----------------------------------------------------------
00486 
00487     void DoCheckpoint(const FP::Tag &pkgVersion, Model::T model,
00488       CacheEntry::Indices *cis, bool done) throw ();
00489     /* REQUIRES Sup(LL) < SELF.ciLogMu */
00490     /* This method does the actual work of the "Checkpoint" method. */
00491 
00492     ChkptWorker *QueueChkpt(const FP::Tag &pkgVersion, Model::T model,
00493       CacheEntry::Indices *cis, bool done) throw ();
00494     /* REQUIRES Sup(LL) < SELF.chkptMu */
00495     /* Add a new checkpoint object (whose fields are initialized
00496        to "pkgVersion", "model", "cis", and "done") to the queue of
00497        checkpoints to perform. */ 
00498 
00499     void FinishChkpt(ChkptWorker *cw) throw ();
00500     /* REQUIRES Sup(LL) < SELF.chkptMu */
00501     /* Reset the checkpoint object "cw" and return it to the list of
00502        available checkpoint objects. */
00503 
00504     friend class ChkptWorker;
00505 
00506     // Recovery -------------------------------------------------------------
00507 
00508     void Recover() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
00509     /* REQUIRES LL = Empty */
00510     /* Read logs to restore in-memory cache. */
00511 
00512     friend void* CacheS_DoDeletions(void *cacheS) throw ();
00513     /* REQUIRES LL = Empty */
00514     /* The body of the background thread for deleting cache entries. "cacheS"
00515        will actually be of type "CacheS*". */
00516 
00517     // Methods related to "cacheLog" ----------------------------------------
00518 
00519     void LogCacheEntry(const Text& sourceFunc, FP::Tag *pk,
00520                        PKFile::Epoch pkEpoch,
00521                        CacheEntry::Index ci, VestaVal::T *value,
00522                        Model::T model, CacheEntry::Indices *kids,
00523                        FV::List *names, FP::List *fps) throw ();
00524     /* REQUIRES Sup(LL) = SELF.mu */
00525     /* Append the tuple "(sourceFunc, pk, pkEpoch, ci, value, model,
00526        kids, names, fps)" to the "vCacheLog". */
00527 
00528     void FlushCacheLog() throw (VestaLog::Error);
00529     /* REQUIRES Sup(LL) < SELF.ciLogMu */
00530     /* Flush the "vCacheLog" to disk ("cacheLog"). This flushes the "vCILog"
00531        and "vGraphLog" first. At most one thread can be flushing the cache
00532        log at a time, so the caller will be blocked if another thread is
00533        currently flushing the log. */
00534 
00535     void CleanCacheLog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
00536     /* REQUIRES Sup(LL) < SELF.cacheLogMu */
00537     /* Remove unnecessary entries from the cache log. The only entries that
00538        are preserved are those that are new entries in the volatile cache
00539        that have not been flushed to stable PKFiles. This method writes the
00540        preserved entries to the cache log checkpoint, and it writes an empty
00541        checkpoint for the emptyPKLog. */
00542 
00543     CleanWorker *NewCleanWorker() throw ();
00544     /* REQUIRES Sup(LL) = SELF.cacheLogMu */
00545     /* Block until the background thread for cleaning the cache log is
00546        available; then return it. */
00547 
00548     void RegisterIdleCleanWorker(CleanWorker *cw) throw ();
00549     /* REQUIRES Sup(LL) < SELF.cacheLogMu */
00550     /* Return "cw" to the list of idle clean worker threads. This
00551        should be called by the worker when it has completed its job. */
00552 
00553     friend void* CacheS_CleanCacheLogWorker(void *arg) throw ();
00554     /* REQUIRES LL = Empty */
00555     /* This is the apply function for a thread that is forked to clean
00556        the CacheLog. */
00557 
00558     void CleanCacheLogEntries(RecoveryReader &rd, std::fstream &ofs,
00559       /*INOUT*/ EmptyPKLog::PKEpochTbl &pkEpochTbl,
00560       /*INOUT*/ int &oldCnt, /*INOUT*/ int &newCnt)
00561       throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
00562     /* REQUIRES Sup(LL) < SELF.cacheLogMu */
00563 
00564     void TryCleanCacheLog(int upper_bound, const char *reason) throw ();
00565     /* REQUIRES Sup(LL) = SELF.cacheLogMu */
00566     /* If the number of entries written to the stable cache log since it was
00567        last cleaned exceeds "upper_bound", fork a thread to ``clean'' it.
00568        The "reason" is a string used for debugging to indicate why the log is
00569        being cleaned. */
00570 
00571     void RecoverEmptyPKLog(/*INOUT*/ bool &empty)
00572       throw (VestaLog::Error, VestaLog::Eof);
00573     /* REQUIRES Sup(LL) = SELF.mu */
00574     /* Read "this->emptyPKLog", adding epochs to its associated table.
00575        Sets "empty" to "false" if any entries are processed. */
00576 
00577     void RecoverCacheLogEntries(RecoveryReader &rd,
00578       /*INOUT*/ EmptyPKLog::PKEpochTbl &pkEpochTbl, /*INOUT*/ bool &empty)
00579       throw (VestaLog::Error, VestaLog::Eof);
00580     /* REQUIRES Sup(LL) = SELF.cacheLogMu */
00581     /* Read cache log entries from "rd" until end of file. "pkEpochTbl" is a
00582        table mapping PKs to their known (from disk) "pkEpoch"s. It is
00583        augmented to include any newly-discovered associations. If the
00584        debugging level is high enough, "empty" is set to "false" if any
00585        entries are read. */
00586 
00587     void RecoverCacheLog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
00588     /* REQUIRES LL = Empty */
00589 
00590     // Methods related to "graphLog" ----------------------------------------
00591 
00592     void LogGraphNode(CacheEntry::Index ci, FP::Tag *loc, Model::T model,
00593       CacheEntry::Indices *kids, Derived::Indices *refs) throw ();
00594     /* REQUIRES Sup(LL) = SELF.mu */
00595     /* Append the quintuple "(ci, loc, model, kids, refs)" to the
00596        "vGraphLog". */
00597 
00598     void FlushGraphLog() throw (VestaLog::Error);
00599     /* REQUIRES Sup(LL) < SELF.ciLogMu */
00600     /* Flush the in-memory "vGraphLog" to disk ("graphLog"). The "vCILog"
00601        is flushed first. At most one thread can be flushing the graph log
00602        at a time, so the caller will block if another thread is currently
00603        flushing it. */
00604 
00605     int ChkptGraphLog() throw (FS::Failure, VestaLog::Error);
00606     /* REQUIRES Sup(LL) < SELF.graphLogMu */
00607     /* Checkpoint the graphLog; return the number of the new checkpoint and
00608        log file. (See "AbortGraphLogChkpt" for abandoning a checkpoint.) */
00609 
00610     void AbortGraphLogChkpt() throw (VestaLog::Error);
00611     /* REQUIRES Sup(LL) < SELF.graphLogMu */
00612     /* Abort the currently outstanding graphLog checkpoint (see "ChkptGraphLog"). */
00613 
00614     void RecoverGraphLog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
00615     /* REQUIRES LL = Empty */  // NOTE(review): presumably replays the on-disk "graphLog" during recovery -- confirm
00616 
00617     // Methods related to the "ciLog" ---------------------------------------
00618 
00619     void LogCI(Intvl::Op op, CacheEntry::Index ci) throw ();
00620     /* REQUIRES Sup(LL) = SELF.mu */
00621     /* Append "ci" to the in-memory "vCILog"; "op" indicates whether the index
00622        should be added to or removed from the "usedCIs" set. */
00623 
00624     void FlushUsedCIs() throw (VestaLog::Error);
00625     /* REQUIRES Sup(LL) < SELF.ciLogMu */
00626     /* Flush the in-memory "vCILog" to disk ("ciLog"). */
00627 
00628     Intvl::List *FlushUsedCIsList(Intvl::List *vLog, /*OUT*/ int &numFlushed)
00629       throw (VestaLog::Error);
00630     /* REQUIRES SELF.ciLogMu IN LL */
00631     /* Flush the entries in "vLog" (which must be non-NULL) to the "usedCIs"
00632        log on disk, and return a pointer to the last node in the list. Sets
00633        "numFlushed" to the number of intervals flushed to the log. */
00634 
00635     void ChkptUsedCIs(const BitVector &del)
00636       throw (VestaLog::Error, FS::Failure);
00637     /* REQUIRES Sup(LL) < SELF.ciLogMu */
00638     /* Atomically: flush all volatile (in-memory) "usedCIs" entries to the
00639        "usedCIs" disk log, subtract the entries in "del" from "this->usedCIs",
00640        and checkpoint that disk log so it contains the new value of "usedCIs". */
00641 
00642     void RecoverCILog() throw (VestaLog::Error, VestaLog::Eof, FS::Failure);
00643     /* REQUIRES LL = Empty */
00644     /* Set "usedCIs" by reading the "ciLog" on disk. */
00645 
00646     // make copy constructor inaccessible to clients
00647     CacheS(const CacheS&);
00648 };
00649 
00650 // Worker thread classes ------------------------------------------------------
00651 
00652 // base class for the cache server's worker threads (see FlushWorker, CleanWorker)
00653 class CacheWorker {  // NOTE(review): no virtual destructor is declared; never delete a subclass through a CacheWorker*
00654   public:
00655     CacheWorker(CacheS *cs) throw ();
00656     /* Initialize a new worker object ready to run on "cs".
00657        Call the associated "Start" method to actually run it
00658        on particular arguments. */
00659 
00660     void Start(const char *reason) throw ();
00661     /* Signal the thread to go. Subclasses may declare their own "Start"
00662        methods (which hide this one) to pass arguments to the thread.
00663        "reason" is for debugging only; it names the task on
00664        behalf of which the worker thread is acting. */
00665 
00666     void Finish() throw ();
00667     /* Indicate that the worker thread is idle. */
00668   protected:
00669     // the thread and associated cache server
00670     Basics::thread th;
00671     CacheS *cs;
00672 
00673     // synchronization fields
00674     Basics::mutex mu;      // presumably protects the fields below -- confirm
00675     bool argsReady;        // presumably set once "Start" has supplied arguments -- confirm
00676     Basics::cond workToDo; // presumably signalled when "argsReady" becomes true -- confirm
00677     const char *reason;    // for debugging info only
00678 };
00679 
00680 // worker thread for flushing a specified MultiPKFile to the stable cache
00681 class FlushWorker : public CacheWorker {
00682   public:
00683     FlushWorker(CacheS *cs) throw ();
00684 
00685     void Start(const PKPrefix::T &pfx, const char *reason) throw ();
00686     /* Start the worker thread flushing the MultiPKFile named by "pfx".
00687        The "reason" argument indicates why this MultiPKFile is being
00688        flushed. Note: this overload hides "CacheWorker::Start". */
00689 
00690   private:
00691     friend void* CacheS_FlushWorker(void *arg) throw ();
00692     /* REQUIRES LL = 0 */
00693     /* Callback function for the cache server worker thread
00694        that flushes the MultiPKFile named by "arg->pfx".
00695        "arg" will actually be of type "FlushWorker *". */ 
00696 
00697     // thread args: prefix naming the MultiPKFile to flush
00698     PKPrefix::T pfx;
00699 };
00700 
00701 // worker thread for cleaning the cache log; it takes no extra arguments, so it is run via "CacheWorker::Start"
00702 class CleanWorker : public CacheWorker {
00703   public:
00704     CleanWorker(CacheS *cs) throw ();
00705 
00706   private:
00707     friend void* CacheS_CleanCacheLogWorker(void *arg) throw ();
00708     /* REQUIRES LL = 0 */
00709     /* Callback function for the cache server worker thread
00710        that cleans the cache log. "arg" will actually be of
00711        type "CleanWorker *". */ 
00712 };
00713 
00714 // class for checkpointing an evaluation
00715 class ChkptWorker {
00716   /* As opposed to the other "*Worker" objects, there is not a separate
00717      thread per "ChkptWorker" object. Instead, the cache forks a single
00718      thread running the "ChkptWorker::MainLoop" method below. Each
00719      "ChkptWorker" object is simply a container of arguments on which
00720      the "CacheS::DoCheckpoint" method is invoked. */
00721   public:
00722     // constructors/initializers (the constructor simply forwards to "Init")
00723     ChkptWorker(const FP::Tag &pkgVersion, Model::T model,
00724       CacheEntry::Indices *cis, bool done) throw ()
00725           { Init(pkgVersion, model, cis, done); }
00726     void Init(const FP::Tag &pkgVersion, Model::T model,
00727       CacheEntry::Indices *cis, bool done) throw ();
00728 
00729     // reset object fields after using it
00730     void Reset() throw ();
00731 
00732     // block until this checkpoint completes
00733     void WaitUntilDone() throw ();
00734     /* REQUIRES Sup(LL) < SELF.mu */
00735 
00736     static void* MainLoop(void *arg) throw ();
00737     /* REQUIRES Sup(LL) < CacheS::ciLogMu */
00738     /* Repeatedly invoke "DoCheckpoint" whenever there is another worker
00739        in the "CacheS::queuedChkptWorkers" queue. "arg" is actually of
00740        type "CacheS *". */
00741 
00742     // next object in linked list (presumably "CacheS::queuedChkptWorkers" -- confirm)
00743     ChkptWorker *next;  // NOTE(review): the inline constructor only calls "Init"; confirm "Init" sets this
00744 
00745   private:
00746     // arguments on which "CacheS::DoCheckpoint" is invoked
00747     FP::Tag pkgVersion;
00748     Model::T model;
00749     CacheEntry::Indices *cis;
00750     bool done;
00751 
00752     // synchronization fields
00753     Basics::mutex mu;           // protects following fields
00754     bool chkptComplete; // did the checkpoint finish?
00755     Basics::cond isDone;        // == (chkptComplete)
00756 };
00757 
00758 #endif // _CACHES_H

Generated on Mon May 8 00:48:34 2006 for Vesta by  doxygen 1.4.2