Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

VRWeed.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 //
00020 // VRWeed.C
00021 //
00022 // Miscellaneous repository weeding code.  The mark-and-sweep work
00023 // for source weeding is done in VDirChangeable.C and VMemPool.C.
00024 // Actual SourceOrDerived deletion is done in ShortIdImpl.C.
00025 
00026 #if __linux__
00027 #include <stdint.h>
00028 #endif
00029 #include "VRWeed.H"
00030 #include "VMemPool.H"
00031 #include "VestaSource.H"
00032 #include "VDirChangeable.H"
00033 #include "VDirEvaluator.H"
00034 #include "VForward.H"
00035 #include "ReadersWritersLock.H"
00036 #include "SourceOrDerived.H"
00037 #include "VestaLog.H"
00038 #include "Recovery.H"
00039 #include "VestaConfig.H"
00040 #include "ShortIdImpl.H"
00041 #include "VRConcurrency.H"
00042 #include "VestaAttribsRep.H"
00043 #include "DirShortId.H"
00044 #include "Mastership.H"
00045 #include "logging.H"
00046 #include <Thread.H>
00047 #include <fstream>
00048 #include <sys/time.h>
00049 #include <BufStream.H>
00050 #include "MutableSidref.H"
00051 
00052 using Basics::OBufStream;
00053 using std::hex;
00054 using std::dec;
00055 using std::ifstream;
00056 using std::fstream;
00057 using std::ios;
00058 
00059 #include "lock_timing.H"
00060 
00061 // Module globals
00062 static Basics::mutex state_mu; // protects the following:
00063 static ShortId keepSourceSid = NullShortId, keepDerivedSid = NullShortId;
00064 static time_t keepSourceTime = 0, keepDerivedTime = 0;
00065 static bool deletionsDone = true;
00066 static bool sourceWeedDone = false;
00067 static bool checkpointInProgress = false;
00068 static bool checkpointRequested = false;
00069 static bool deletionsInProgress = false;
00070 static bool sourceWeedInProgress = false; // logical lock, protects:
00071 static FILE* keepSourceSidFile = NULL;
00072 // end protected by sourceWeedInProgress
00073 static Basics::cond state_cond; // wait for an inProgress or requested
00074                                 // flag to change
00075 // end protected by state_mutex
00076 
00077 // Recovery code
00078 static void
00079 KeepCallback(RecoveryReader* rr, char& c)
00080      throw(VestaLog::Error, VestaLog::Eof)
00081 {
00082     unsigned long ulsid;
00083     long lsec;
00084     char kind;
00085     rr->skipWhite(c);
00086     kind = c;
00087     rr->get(c);
00088     rr->getULong(c, ulsid);
00089     rr->getLong(c, lsec);
00090     switch (kind) {
00091       case 's':
00092         keepSourceSid = (ShortId) ulsid;
00093         keepSourceTime = lsec;
00094         break;
00095       case 'd':
00096         keepDerivedSid = (ShortId) ulsid;
00097         keepDerivedTime = lsec;
00098         keepSourceSid = NullShortId;
00099         keepSourceTime = 0;
00100         break;
00101       default:
00102         assert(false);
00103     }
00104     deletionsDone = false;
00105 }
00106 
00107 static void
00108 DdunCallback(RecoveryReader* rr, char& c)
00109      throw(VestaLog::Error, VestaLog::Eof)
00110 {
00111     deletionsDone = true;
00112 }
00113 
00114 static void
00115 NpinCallback(RecoveryReader* rr, char& c)
00116      throw(VestaLog::Error, VestaLog::Eof)
00117 {
00118     // Obsolete; read and ignore
00119     unsigned long ulnpin;
00120     rr->getULong(c, ulnpin);
00121 }
00122 
00123 static void
00124 FsidCallback(RecoveryReader* rr, char& c)
00125      throw(VestaLog::Error, VestaLog::Eof)
00126 {
00127     unsigned long ulsid;
00128     rr->skipWhite(c);
00129     while (c != ')') {
00130         rr->getULong(c, ulsid);
00131         DeleteDirShortId((ShortId) ulsid);
00132         rr->skipWhite(c);
00133     }
00134 }
00135 
00136 void
00137 InitVRWeed()
00138 {
00139     RegisterRecoveryCallback("keep", KeepCallback);
00140     RegisterRecoveryCallback("ddun", DdunCallback);
00141     RegisterRecoveryCallback("npin", NpinCallback);
00142     RegisterRecoveryCallback("fsid", FsidCallback);
00143 
00144     // Register garbage collection routines
00145     VMemPool::registerCallbacks(VMemPool::vDirChangeable,
00146                                 VDirChangeable::markCallback, NULL,
00147                                 VDirChangeable::sweepCallback,
00148                                 (void*) &keepSourceSidFile,
00149                                 VDirChangeable::rebuildCallback, NULL);
00150 
00151     VMemPool::registerCallbacks(VMemPool::vDirImmutable,
00152                                 VDirChangeable::markCallback, NULL,
00153                                 VDirChangeable::sweepCallback,
00154                                 (void*) &keepSourceSidFile,
00155                                 VDirChangeable::rebuildCallback, NULL);
00156     
00157     VMemPool::registerCallbacks(VMemPool::vDirAppendable,
00158                                 VDirChangeable::markCallback, NULL,
00159                                 VDirChangeable::sweepCallback,
00160                                 (void*) &keepSourceSidFile,
00161                                 VDirChangeable::rebuildCallback, NULL);
00162     
00163     VMemPool::registerCallbacks(VMemPool::vForward,
00164                                 VForward::markCallback, NULL,
00165                                 VForward::sweepCallback, NULL,
00166                                 VForward::rebuildCallback, NULL);
00167     
00168     VMemPool::registerCallbacks(VMemPool::vDirEvaluator,
00169                                 VDirEvaluator::markCallback, NULL,
00170                                 VDirEvaluator::sweepCallback, NULL,
00171                                 VDirEvaluator::rebuildCallback, NULL);
00172 
00173     VMemPool::registerCallbacks(VMemPool::vAttrib,
00174                                 VestaAttribsRep::markCallback, NULL,
00175                                 VestaAttribsRep::sweepCallback, NULL,
00176                                 VestaAttribsRep::rebuildCallback, NULL);
00177 }
00178 
00179 static void
00180 LogKeep(char kind, ShortId sid, time_t time)
00181 {
00182     StableLock.acquireWrite();
00183     RWLOCK_LOCKED_REASON(&StableLock, "Weeding:LogKeep");
00184     {
00185         char logrec[512];
00186         OBufStream ost(logrec, sizeof(logrec));
00187         // ('keep' kind sid time)
00188         ost << "(keep " << kind
00189           << " 0x" << hex << sid << " " << dec
00190           << time << ")\n";
00191         VRLog.start();
00192         VRLog.put(ost.str());
00193         VRLog.commit();
00194     }
00195     StableLock.releaseWrite();
00196 }
00197 
00198 int
00199 KeepDerived(ShortIdsFile ds, time_t dt, bool force)
00200 {
00201     if (ds != NullShortId) {
00202       // Sanity check that ds exists
00203       int fd = SourceOrDerived::fdopen(ds, O_RDONLY);
00204       if (fd == -1) {
00205         assert(errno != 0);
00206         return errno;
00207       }
00208       close(fd);
00209     }
00210 
00211     // Log the new ds, dt
00212     state_mu.lock();
00213     while (checkpointInProgress || sourceWeedInProgress ||
00214            sourceWeedDone || deletionsInProgress) {
00215       state_cond.wait(state_mu);
00216     }
00217     if (!force && keepDerivedSid == ds && keepDerivedTime == dt &&
00218         deletionsDone) {
00219       // This weed already succeeded
00220       state_mu.unlock();
00221       return 0;
00222     }
00223     Repos::dprintf(DBG_ALWAYS, 
00224                    "keep derived: keepDerivedTime %d, sid 0x%08x\n", dt, ds);
00225     keepDerivedSid = ds;
00226     keepDerivedTime = dt;
00227     keepSourceSid = NullShortId;
00228     keepSourceTime = 0;
00229     deletionsDone = false;
00230     LogKeep('d', ds, dt);
00231     state_mu.unlock();
00232 
00233     // Do source weed and deletions
00234     int err = SourceWeed();
00235     if (err) return err;
00236     return DoDeletions();
00237 }
00238 
00239 int
00240 SourceWeed() throw ()
00241 {
00242     int err = 0, res;
00243     state_mu.lock();
00244     while (checkpointInProgress ||
00245            sourceWeedInProgress || deletionsInProgress) {
00246         state_cond.wait(state_mu);
00247     }   
00248     if (deletionsDone) {
00249         // Nothing to do
00250         state_mu.unlock();
00251         return 0;
00252     } 
00253     sourceWeedInProgress = true;
00254     state_mu.unlock();
00255     state_cond.broadcast();
00256 
00257     time_t newKeepSourceTime = time(NULL) - 30;  // fudge to be extra safe
00258     ShortId newKeepSourceSid;
00259     int fd = SourceOrDerived::fdcreate(newKeepSourceSid);
00260     if (fd < 0) {
00261         assert (errno != 0);
00262         err = errno;
00263         goto done;
00264     }
00265     keepSourceSidFile = fdopen(fd, "w");
00266     if (fd < 0 || keepSourceSidFile == NULL) {
00267         assert (errno != 0);
00268         err = errno;
00269         goto done;
00270     }
00271     Repos::dprintf(DBG_ALWAYS, "source weed: keepSourceTime %d, sid 0x%08x\n",
00272                    newKeepSourceTime, newKeepSourceSid);
00273     // Keep this ShortIdsFile itself, because we want the deletion phase
00274     //  to be restartable after a crash.
00275     res = fprintf(keepSourceSidFile, "%08x\n", newKeepSourceSid);
00276     if (res < 0) {
00277         assert (errno != 0);
00278         err = errno;
00279         goto done;
00280     }
00281 
00282     VDirVolatileRoot::lockAll();
00283     StableLock.acquireWrite();
00284     RWLOCK_LOCKED_REASON(&VolatileRootLock, "Weeding:SourceWeed");
00285     RWLOCK_LOCKED_REASON(&StableLock, "Weeding:SourceWeed");
00286     VMemPool::gc(keepDerivedSid);
00287     StableLock.releaseWrite();
00288     VDirVolatileRoot::unlockAll();
00289 
00290     res = fflush(keepSourceSidFile);
00291     if (res == EOF) {
00292         assert (errno != 0);
00293         err = errno;
00294         goto done;
00295     }
00296     res = fsync(fd);
00297     if (res < 0) {
00298         assert (errno != 0);
00299         err = errno;
00300         goto done;
00301     }
00302     res = fclose(keepSourceSidFile);
00303     if (res == EOF) {
00304         assert (errno != 0);
00305         err = errno;
00306         goto done;
00307     }
00308 
00309   done:
00310     state_mu.lock();
00311     if (err == 0) {
00312         keepSourceTime = newKeepSourceTime;
00313         keepSourceSid = newKeepSourceSid;
00314         deletionsDone = false;
00315         LogKeep('s', newKeepSourceSid, newKeepSourceTime);
00316         Repos::dprintf(DBG_ALWAYS, "source weed done\n");
00317     } else {
00318         Repos::dprintf(DBG_ALWAYS, "source weed failed: errno %d\n", err);
00319     }
00320     sourceWeedInProgress = false;
00321     sourceWeedDone = true;
00322     state_mu.unlock();
00323     state_cond.broadcast();
00324     return err;
00325 }
00326 
// Sanity check for the file of shortids to keep: returns true only
// when p_fname can be stat()ed and reports a size greater than zero.
// A file that is missing or cannot be stat()ed counts as empty.
static bool file_is_non_empty(const char *p_fname) throw()
{
  struct stat l_status;

  if(stat(p_fname, &l_status) != 0)
    {
      // Couldn't stat it, so we can't call it non-empty.
      return false;
    }

  return (l_status.st_size > 0);
}
00338 
00339 // Check whether the final character in a file is a newline.  Used for
00340 // sanity check on the file of shortids to keep.
00341 static bool file_ends_with_newline(const char *p_fname) throw()
00342 {
00343   // Try to open the file
00344   ifstream l_file(p_fname);
00345   if(l_file.good())
00346     {
00347       // Go to one character before the end of the file.
00348       l_file.seekg(-1, fstream::end);
00349 
00350       Repos::dprintf(DBG_ALWAYS,
00351                      "file_ends_with_newline: position after seek: %d\n",
00352                      (unsigned int) l_file.tellg());
00353 
00354       if(l_file.good())
00355         {
00356           // If it's a newline, return true.
00357           int l_next = l_file.peek();
00358           Repos::dprintf(DBG_ALWAYS,
00359                          "file_ends_with_newline: last character in file: %d\n",
00360                          l_next);
00361           if(l_next == '\n')
00362             {
00363               return true;
00364             }
00365         }
00366       else
00367         {
00368           Repos::dprintf(DBG_ALWAYS,
00369                          "file_ends_with_newline: seek caused error "
00370                          "(l_file.good() is false after seek)\n");
00371         }
00372     }
00373   else
00374     {
00375       Repos::dprintf(DBG_ALWAYS,
00376                      "file_ends_with_newline: couldn't open %s "
00377                      "(l_file.good() is false after open)\n", p_fname);
00378     }
00379 
00380   return false;
00381 }
00382 
00383 int
00384 DoDeletions() throw ()
00385 {
00386     state_mu.lock();
00387     while (checkpointInProgress ||
00388            sourceWeedInProgress || deletionsInProgress) {
00389         state_cond.wait(state_mu);
00390     }   
00391     if (deletionsDone) {
00392         // Nothing to do
00393         state_mu.unlock();
00394         return 0;
00395     } 
00396     ShortIdsFile ds = keepDerivedSid;
00397     time_t dt = keepDerivedTime;
00398     ShortIdsFile ss = keepSourceSid;
00399     time_t st = keepSourceTime;
00400     deletionsInProgress = true;
00401     sourceWeedDone = false;
00402     state_mu.unlock();
00403     state_cond.broadcast();
00404     
00405     int ret = 0;
00406     int status;
00407     Text cmd;
00408     char* oname;
00409     // Sort sids and eliminate duplicates in a separate process.
00410     // Eventually this should run on a separate machine to avoid
00411     // memory contention with the server.
00412     Repos::dprintf(DBG_ALWAYS, "starting deletions\n");
00413     Text sidsort;
00414     if (!VestaConfig::get("Repository", "ShortId_sorter", sidsort)) {
00415         sidsort = "sidsort";
00416     }
00417     const char* sname = (ss == NullShortId) ? ""
00418       : SourceOrDerived::shortIdToName(ss);
00419     const char* dname = (ds == NullShortId) ? ""
00420       : SourceOrDerived::shortIdToName(ds);
00421     ShortIdsFile osidfile;
00422     int fd = SourceOrDerived::fdcreate(osidfile);
00423     if (fd < 0) {
00424         ret = errno;
00425         Repos::dprintf(DBG_ALWAYS, 
00426                        "failed to create combined keepSidFile: errno %d\n", ret);
00427         assert(ret != 0);
00428         goto finish;
00429     }
00430     close(fd);
00431     oname = SourceOrDerived::shortIdToName(osidfile);
00432     cmd = sidsort +" "+ oname +" "+ sname +" "+ dname;
00433     delete [] sname;
00434     delete [] dname;
00435     status = system(cmd.cchars());
00436     if (status != 0) {
00437         if (status < 0) {
00438             ret = errno;
00439             Repos::dprintf(DBG_ALWAYS, "system() failed: errno %d\n", ret);
00440             assert(ret != 0);
00441         } else {
00442             Repos::dprintf(DBG_ALWAYS, "ShortId_sorter failed: status %d\n", status);
00443             ret = EINVAL;
00444         }
00445         goto finish;
00446     }
00447 
00448     // Perform two sanitry checks on the combined file of shortids to
00449     // keep:
00450 
00451     // 1. If it's empty (indicating that all shortids are to be
00452     // deleted), refuse to do deletions.  The only legitimate way for
00453     // this to happen is a degenerate case.  If you really want to
00454     // delete everything, delete the repository's metadata and start
00455     // fresh.  For the sake of safety, we assume that an empty keep
00456     // list means that something went wrong with sidsort or the
00457     // generation of one of the files fed to it.
00458 
00459     // 2. If the final character in the file is not a newline
00460     // (suggesting that it was truncated), refuse to do deletions.  If
00461     // there's any chance that the keep list is incomplete, we err on
00462     // the side of caution and don't proceed with deletions.
00463     if(!file_is_non_empty(oname))
00464       {
00465         Repos::dprintf(DBG_ALWAYS,
00466                        "combined keepSidFile (0x%08x) is empty;"
00467                        " not doing deletions\n", osidfile);
00468         ret = ENODATA;
00469         goto finish;
00470       }
00471     else if(!file_ends_with_newline(oname))
00472       {
00473         Repos::dprintf(DBG_ALWAYS,
00474                        "combined keepSidFile (0x%08x) doen't end with a newline "
00475                        "(may be truncated); not doing deletions\n", osidfile);
00476         ret = EINVAL;
00477         goto finish;
00478         
00479       }
00480 
00481     delete [] oname;
00482 
00483     Repos::dprintf(DBG_ALWAYS, "wrote combined keepSidFile 0x%08x\n", osidfile);
00484     
00485     ret = DeleteAllShortIdsBut(osidfile, (st < dt) ? st : dt);
00486     
00487   finish:
00488     state_mu.lock();
00489     if (ret == 0 && ds == keepDerivedSid && dt == keepDerivedTime &&
00490         ss == keepSourceSid && st == keepSourceTime) {
00491         StableLock.acquireWrite();
00492         RWLOCK_LOCKED_REASON(&StableLock, "Weeding:deletions done");
00493         VRLog.start();
00494         VRLog.put("(ddun)\n");
00495         VRLog.commit();
00496         StableLock.releaseWrite();
00497         deletionsDone = true;
00498         Repos::dprintf(DBG_ALWAYS, "deletions done\n");
00499     } else {
00500         Repos::dprintf(DBG_ALWAYS, "deletions failed\n");
00501     }
00502     deletionsInProgress = false;
00503     state_mu.unlock();
00504     state_cond.broadcast();
00505     return ret;
00506 }
00507 
00508 // Implementation of the StartCheckpoint SRPC call.  Write a checkpoint,
00509 //  then immediately read back the VMemPool part of it.  This uses a
00510 //  "server" thread that is known to have a large enough stack to handle
00511 //  deep recursion (namely, the main thread).
00512 void
00513 Checkpoint() throw ()
00514 {
00515     // Kick the CheckpointServer and wait for it to finish
00516     state_mu.lock();
00517     while (checkpointRequested) {
00518         state_cond.wait(state_mu);
00519     }
00520     checkpointRequested = true;
00521     state_cond.broadcast();
00522     while (checkpointInProgress || checkpointRequested) {
00523         state_cond.wait(state_mu);
00524     }
00525     state_mu.unlock();
00526     return;
00527 }
00528 
00529 void
00530 CheckpointServer() throw ()
00531 {
00532     for (;;) {
00533         state_mu.lock();
00534         while (!checkpointRequested ||
00535                sourceWeedInProgress || deletionsInProgress) {
00536             state_cond.wait(state_mu);
00537         }
00538         Repos::dprintf(DBG_ALWAYS, "starting checkpoint\n");
00539         assert(!checkpointInProgress);
00540         checkpointInProgress = true;
00541         checkpointRequested = false;
00542         state_mu.unlock();
00543         state_cond.broadcast();
00544 
00545         VDirVolatileRoot::lockAll();
00546         StableLock.acquireWrite();
00547 
00548         RWLOCK_LOCKED_REASON(&VolatileRootLock, "Weeding:CheckpointServer");
00549         RWLOCK_LOCKED_REASON(&StableLock, "Weeding:CheckpointServer");  
00550 
00551         fstream& ckpt = *VRLog.checkpointBegin(ios::in|ios::out);
00552         if (!ckpt.good()) {
00553             Repos::dprintf(DBG_ALWAYS, 
00554                            "creation of checkpoint file failed: errno %d\n",errno);
00555             assert(ckpt.good());        // crash
00556         }
00557         
00558         VMemPool::writeCheckpoint(ckpt);
00559         if (!ckpt.good()) {
00560             Repos::dprintf(DBG_ALWAYS, 
00561                            "write to checkpoint file failed: errno %d\n", errno);
00562             assert(ckpt.good());        // crash
00563         }
00564         
00565         // Checkpoint log version
00566         ckpt << "(vers " << VRLogVersion << ")\n";
00567 
00568         ShortIdBlockCheckpoint(ckpt);
00569         if (!ckpt.good()) {
00570             Repos::dprintf(DBG_ALWAYS,
00571                            "write to checkpoint file failed: errno %d\n", errno);
00572             assert(ckpt.good());        // crash
00573         }
00574         
00575         MastershipCheckpoint(ckpt);
00576         if (!ckpt.good()) {
00577             Repos::dprintf(DBG_ALWAYS,
00578                            "write to checkpoint file failed: errno %d\n", errno);
00579             assert(ckpt.good());        // crash
00580         }
00581 
00582         // The two keeps and the ddun must be in this order!
00583         ckpt << "(keep d 0x" << hex << keepDerivedSid << " "
00584           << dec << keepDerivedTime << ")\n";
00585         ckpt << "(keep s 0x" << hex << keepSourceSid << " "
00586           << dec << keepSourceTime << ")\n";
00587         if (deletionsDone) {
00588             ckpt << "(ddun)\n";
00589         }
00590         
00591         VDirVolatileRoot::finishCheckpoint(ckpt);
00592 
00593         if (!ckpt.good()) {
00594             Repos::dprintf(DBG_ALWAYS,
00595                            "write to checkpoint file failed: errno %d\n", errno);
00596             assert(ckpt.good());        // crash
00597         }
00598         
00599         ckpt.seekg(0, fstream::beg);
00600         VMemPool::readCheckpoint(ckpt, true);
00601         ckpt.close();
00602         
00603         VRLog.checkpointEnd();
00604         
00605         // Paranoid check of the mutable root shortid reference count
00606         MutableSidrefCheck(0, LongId::noLock);
00607 
00608         StableLock.releaseWrite();
00609         VDirVolatileRoot::unlockAll();
00610 
00611         Repos::dprintf(DBG_ALWAYS, "checkpoint done\n");
00612         state_mu.lock();
00613         checkpointInProgress = false;
00614         state_mu.unlock();
00615         state_cond.broadcast();
00616     }
00617 }
00618 
00619 void GetWeedingState(ShortIdsFile& ds, time_t& dt,
00620                      ShortIdsFile& ss, time_t& st,
00621                      bool& sourceWeedInProgress,
00622                      bool& deletionsInProgress,
00623                      bool& deletionsDone,
00624                      bool& checkpointInProgress) throw ()
00625 {
00626     state_mu.lock();
00627     ds = keepDerivedSid;
00628     dt = keepDerivedTime;
00629     ss = keepSourceSid;
00630     st = keepSourceTime;
00631     sourceWeedInProgress = ::sourceWeedInProgress;
00632     deletionsInProgress = ::deletionsInProgress;
00633     deletionsDone = ::deletionsDone;
00634     checkpointInProgress = ::checkpointInProgress;
00635     state_mu.unlock();
00636 }

Generated on Mon May 8 00:48:54 2006 for Vesta by  doxygen 1.4.2