Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

VestaLog.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 //
00020 // VestaLog.C
00021 // Last modified on Sat May 28 16:47:33 EDT 2005 by ken@xorian.net         
00022 //      modified on Fri Aug  9 12:25:11 EDT 2002 by kcschalk@shr.intel.com 
00023 //      modified on Wed Sep  6 16:27:41 PDT 2000 by mann   
00024 //      modified on Tue May  4 10:47:23 PDT 1999 by heydon 
00025 //
00026 // Log changes to the repository state
00027 //
00028 
00029 #include "VestaLog.H"
00030 #include "VestaLogPrivate.H"
00031 
00032 #include <unistd.h>
00033 #include <sys/stat.h>
00034 #include <assert.h>
00035 #include <fcntl.h>
00036 #include <errno.h>
00037 #include <dirent.h>
00038 // add declaration to fix broken <dirent.h> header file
00039 extern "C" int _Preaddir_r(DIR *, struct dirent *, struct dirent **);
00040 
00041 #include <FS.H>
00042 #include <FdStream.H>
00043 
00044 using std::ifstream;
00045 using std::fstream;
00046 using std::ios;
00047 using std::endl;
00048 
// ---- Names of the bookkeeping files kept inside a log directory ----

// Holds the number of the highest committed checkpoint (read by open()).
static const char VersionFileName[] = "version";
// Presumably the temporary name used while a new version file is being
// written -- not referenced in this part of the file; TODO confirm.
static const char NewVersionFileName[] = "version.new";
static const char PrunedFileName[] = "pruned";
static const char NewPrunedFileName[] = "pruned.new";
// File on which open() takes its advisory fcntl() lock.
static const char LockFileName[] = "lock";

// ---- Suffixes appended to the decimal version number ----

static const char LogExtension[] = ".log";         // <version>.log
static const char CheckpointExtension[] = ".ckp";  // <version>.ckp

// ---- Size limits ----

static const int MaxFileNameLen = 4096;

#define COPY_SIZE 8192

// Protection bits for log files hardwired here:
// owner read/write, group read, others read.
static const int LOG_PROT = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
00062 
00063 VestaLog::VestaLog() throw()
00064 {
00065     int i;
00066 
00067     vlp = NEW(VestaLogPrivate);
00068     vlp->state = VestaLogPrivate::initial;
00069 }
00070 
00071 void VestaLog::open(char* dir, int ver, bool readonly, bool lock,
00072                     char* dir2, bool bakckp) throw (VestaLog::Error)
00073 {
00074     // state: initial -> recovering
00075     assert(vlp->state == VestaLogPrivate::initial);
00076     vlp->directory = NEW_PTRFREE_ARRAY(char, strlen(dir) + 1);
00077     strcpy(vlp->directory, dir);
00078     vlp->readonly = readonly;
00079     if (dir2) {
00080       vlp->directory2 = NEW_PTRFREE_ARRAY(char, strlen(dir2) + 1);
00081       strcpy(vlp->directory2, dir2);
00082     } else {
00083       vlp->directory2 = NULL;
00084       assert(!bakckp);
00085     }
00086     vlp->bakckp = bakckp;
00087 
00088     vlp->lockfd = -1;
00089     vlp->lockfd2 = -1;
00090     if (lock) {
00091         // Acquire advisory lock on this log
00092         Text lockfn = Text(vlp->directory) + PathnameSep + LockFileName;
00093         vlp->lockfd = ::open(lockfn.cchars(), O_RDWR | O_CREAT, 0666);
00094         if (vlp->lockfd == -1) {
00095             vlp->state = VestaLogPrivate::bad;
00096             throw Error(errno, Text("VestaLog::open got \"") +
00097                         Basics::errno_Text(errno) + "\" opening " + lockfn);
00098         }
00099         struct flock fl;
00100         fl.l_type = readonly ? F_RDLCK : F_WRLCK;
00101         fl.l_whence = SEEK_SET;
00102         fl.l_start = 0;
00103         fl.l_len = 0;
00104         fl.l_pid = 0;
00105         int ok = ::fcntl(vlp->lockfd, F_SETLK, &fl);
00106         if (ok == -1) {
00107             vlp->state = VestaLogPrivate::bad;
00108             throw Error(errno, Text("VestaLog::open got \"") +
00109                         Basics::errno_Text(errno) + "\" locking " + lockfn);
00110         }
00111         // Acquire advisory lock on backup log
00112         if (dir2) {
00113           Text lockfn2 = Text(vlp->directory2) + PathnameSep + LockFileName;
00114           vlp->lockfd2 = ::open(lockfn2.cchars(), O_RDWR | O_CREAT, 0666);
00115           if (vlp->lockfd2 == -1) {
00116             vlp->state = VestaLogPrivate::bad;
00117             throw Error(errno, Text("VestaLog::open got \"") +
00118                         Basics::errno_Text(errno) + "\" opening " + lockfn2);
00119           }
00120           ok = ::fcntl(vlp->lockfd, F_SETLK, &fl);
00121           if (ok == -1) {
00122             vlp->state = VestaLogPrivate::bad;
00123             throw Error(errno, Text("VestaLog::open got \"") +
00124                         Basics::errno_Text(errno) + "\" locking " + lockfn2);
00125           }
00126         }
00127     }
00128 
00129     // Determine the highest committed checkpoint version number,
00130     //  and the log version to open
00131     Text vfn = Text(vlp->directory) + PathnameSep + VersionFileName;
00132     ifstream vf;
00133     do {
00134       // Retry loop to work around Tru64 NFS weirdness.  If the
00135       // version file has just been updated on another host, our NFS
00136       // client may have a mapping to the filehandle of the old version
00137       // file in its cache.  Arrgh!
00138       vf.open(vfn.cchars());
00139     } while (vf.fail() && errno == ESTALE);
00140 
00141     if (vf.fail()) {
00142         vlp->ccVersion = 0;
00143     } else {
00144         vf >> vlp->ccVersion;
00145     }
00146     vf.close();
00147     if (ver == -1) {
00148         vlp->version = vlp->ccVersion;
00149     } else {
00150         vlp->version = ver;
00151     }
00152 
00153     // Find highest committed checkpoint in backup directory
00154     if (vlp->directory2) {
00155       Text bvfn = Text(vlp->directory2) + PathnameSep + VersionFileName;
00156       ifstream bvf;
00157       do {
00158         bvf.open(bvfn.cchars());
00159       } while (bvf.fail() && errno == ESTALE);
00160 
00161       if (bvf.fail()) {
00162         vlp->ccVersion2 = 0;
00163       } else {
00164         bvf >> vlp->ccVersion2;
00165       }
00166       bvf.close();
00167     }
00168 
00169     // Check for invalid ver parameter
00170     if (vlp->version > vlp->ccVersion) {
00171         vlp->state = VestaLogPrivate::bad;
00172         throw Error(0,
00173           "VestaLog::open parameter error: ver > last committed checkpoint");
00174     }
00175 
00176     // Open the logfile
00177     char vstr[16];
00178     sprintf(vstr, "%d", vlp->version);
00179     Text lfn = Text(vlp->directory) + PathnameSep + vstr + LogExtension;
00180     vlp->fd = ::open(lfn.cchars(),
00181                      (vlp->readonly ? O_RDONLY : O_RDWR) |
00182                      (vlp->version == 0 ? O_CREAT : 0), LOG_PROT);
00183     if (vlp->fd == -1) {
00184         vlp->state = VestaLogPrivate::bad;
00185         throw Error(errno, Text("VestaLog::open got \"") +
00186                     Basics::errno_Text(errno) + "\" opening " + lfn);
00187     }
00188 
00189     // Open the backup logfile if there is a backup
00190     vlp->fd2 = -1;
00191     if (vlp->directory2) {
00192       // Check if the primary was 0-length; if so, ok to create the backup
00193       struct stat statbuf;
00194       if (fstat(vlp->fd, &statbuf) != 0) {
00195         vlp->state = VestaLogPrivate::bad;
00196         throw VestaLog::Error(errno, Text("VestaLog::open got \"")+
00197                               Basics::errno_Text(errno) + "\" on fstat");
00198       }
00199       Text lfn2 =
00200         Text(vlp->directory2) + PathnameSep + vstr + LogExtension;
00201       vlp->fd2 = ::open(lfn2.cchars(), O_RDWR |
00202                         (statbuf.st_size == 0 ? O_CREAT | O_TRUNC : 0),
00203                         LOG_PROT);
00204       if (vlp->fd2 == -1) {
00205         vlp->state = VestaLogPrivate::bad;
00206         throw Error(errno, Text("VestaLog::open got \"") +
00207                     Basics::errno_Text(errno) + "\" opening " + lfn2);
00208       }
00209     }
00210 
00211     // Initialize for reading
00212     vlp->curSeq = vlp->curLen = vlp->nextSeq = vlp->nextPhy = 0;
00213     vlp->hitEOF = vlp->usePocket = false;
00214     vlp->commSeq = vlp->commPhy = vlp->commPocketPhy = 0;
00215     vlp->commUsePocket = false;
00216     vlp->cur = vlp->last = vlp->pocket = vlp->free = NULL;
00217     vlp->nesting = 0;
00218 
00219     // Ready to go
00220     vlp->state = VestaLogPrivate::recovering;
00221     vlp->checkpointing = false;
00222 }
00223 
00224 // Return the version number of the logfile currently open.
00225 int VestaLog::logVersion() throw (VestaLog::Error)
00226 {
00227     assert(vlp->state != VestaLogPrivate::initial);
00228     assert(vlp->state != VestaLogPrivate::bad);
00229     return vlp->version;
00230 }
00231 
00232 // Open the checkpoint that this log starts from.  
00233 // NULL for the first log.
00234 fstream* VestaLog::openCheckpoint() throw (VestaLog::Error)
00235 {
00236     assert(vlp->state == VestaLogPrivate::recovering);
00237     if (vlp->version == 0) return NULL;
00238     char vstr[16];
00239     sprintf(vstr, "%d", vlp->version);
00240     Text cfn = Text(vlp->directory) + PathnameSep + vstr + CheckpointExtension;
00241     fstream* ret = NEW_CONSTR(fstream, (cfn.cchars(), ios::in));
00242     if (ret->fail()) {
00243         vlp->state = VestaLogPrivate::bad;
00244         throw Error(errno, Text("VestaLog::openCheckpoint got \"") +
00245                     Basics::errno_Text(errno) + "\" opening " + cfn);
00246     } else {
00247         return ret;
00248     }
00249 }
00250 
00251 void VestaLog::get(char& c) throw (VestaLog::Eof, VestaLog::Error)
00252 {
00253     assert(vlp->state == VestaLogPrivate::recovering);
00254     
00255     vlp->makeBytesAvail();
00256     c = vlp->cur->data->bytes[vlp->curLen++];
00257 }
00258 
00259 inline static Bit32 HashSeq(int seq)
00260 {
00261     return ((unsigned int) seq + 12345u) * 715827881u;
00262 }
00263 
00264 // Common routine used by makeBytesAvail and extendCur to read the
00265 // next block.  Returns NULL if we hit EOF.  If log has a backup,
00266 // reads both copies and checks that they match; if they don't, the
00267 // log effectively ends here.
00268 VLogBlock* VestaLogPrivate::readBlock() throw(VestaLog::Error)
00269 {
00270   // Read primary
00271   VLogBlock* block = balloc();
00272   ssize_t res = ::read(fd, (char*) block->data, DiskBlockSize);
00273   if (res < DiskBlockSize) {
00274     bfree(block);
00275     if (res != 0) {
00276       // Error reading primary
00277       state = bad;
00278       throw
00279         VestaLog::Error(errno, Text("VestaLog::readBlock got \"") +
00280                         Basics::errno_Text(errno) + "\" on read" +
00281                         (fd2 == -1 ? "" : " from primary"));
00282     }
00283     // Hit end of file on primary
00284     block = NULL;
00285   } else {
00286     // Read primary OK
00287     assert(res == DiskBlockSize);
00288   }
00289 
00290   // Done if no backup
00291   if (fd2 == -1) {
00292     if (block == NULL) hitEOF = true;
00293     return block;
00294   }
00295 
00296   // Read backup
00297   VLogBlock* block2 = balloc();
00298   res = ::read(fd2, (char*) block2->data, DiskBlockSize);
00299   if (res < DiskBlockSize) {
00300     bfree(block2);
00301     if (res != 0) {
00302       // Error reading backup
00303       bfree(block);
00304       state = bad;
00305       throw
00306         VestaLog::Error(errno, Text("VestaLog::readBlock got \"") +
00307                         Basics::errno_Text(errno) + "\" on read from backup");
00308     }
00309     // Hit end of file on backup
00310     block2 = NULL;
00311   } else {
00312     // Read backup OK
00313     assert(res == DiskBlockSize);
00314   }
00315 
00316   bool invalidate = false;
00317   if (block == NULL) {
00318     // Hit EOF on primary
00319     if (block2 == NULL) {
00320       // Hit EOF on both primary and backup
00321       hitEOF = true;
00322     } else {
00323       // Hit EOF on primary only; treat as invalid block
00324       block = block2;
00325       invalidate = true;
00326     }
00327   } else {
00328     // Read primary OK
00329     if (block2 == NULL) {
00330       // Hit EOF on backup only; treat as invalid block
00331       invalidate = true;
00332     } else {
00333       // Read both logs OK; check if they match
00334       if (block->data->getSeq() != block2->data->getSeq() ||
00335           block->data->getLen() != block2->data->getLen() ||
00336           block->data->getVer() != block2->data->getVer()) {
00337         // Blocks do not match; treat as invalid block
00338         invalidate = true;
00339       } else {
00340         // All is well
00341       }
00342       bfree(block2);
00343     }
00344   }
00345 
00346   if (invalidate) {
00347     block->data->setSeq(HashSeq((Bit32) 0xffffffff));
00348     block->data->setLen((Bit16) 0xffff);
00349     block->data->setVer((Bit16) 0xffff);
00350   }
00351   return block;
00352 }
00353 
00354 void VestaLogPrivate::extendCur() throw(VestaLog::Eof, VestaLog::Error)
00355 {
00356     // Extend the cur chain by one block, if possible
00357     // Precondition: on entry, there is a block in pocket unless we
00358     // have already hit EOF.
00359 
00360     VLogBlock* block = NULL;
00361 
00362     if (!hitEOF) {
00363       // Need to read a block
00364       block = readBlock();
00365       if (block) {
00366         block->phy = nextPhy++;
00367       } else {
00368         hitEOF = true;
00369       }
00370     }
00371     Bit32 hNextSeq = HashSeq(nextSeq);
00372 
00373     if (block == NULL) {
00374         if (pocket == NULL) {
00375             throw VestaLog::Eof();
00376         }
00377         // Only one block to consider
00378         if (pocket->data->getSeq() != hNextSeq) {
00379             // This is not the block we need
00380             throw VestaLog::Eof();
00381         }
00382         block = pocket;
00383         pocket = NULL;
00384         block->pocketPhy = nextPhy;
00385     } else if (pocket == NULL) {
00386         // Only one block to consider
00387         if (block->data->getSeq() != hNextSeq) {
00388             bfree(block);  // forget we read this block
00389             nextPhy--;
00390             hitEOF = true;
00391             throw VestaLog::Eof();
00392         }
00393         block->pocketPhy = nextPhy;
00394     } else {
00395         // Two blocks to consider
00396         if (block->data->getSeq() == hNextSeq) {
00397             // block is a version of the block we need
00398             if (pocket->data->getSeq() == hNextSeq) {
00399                 // Both are versions of the block we need
00400                 if (pocket->data->getVer() ==
00401                     ((block->data->getVer() + 1) % 4)) {
00402                     // Pocket block is current; swap
00403                     VLogBlock* temp = pocket;
00404                     pocket = block;
00405                     block = temp;
00406                 }
00407             }
00408         } else {
00409             // block is not the block we need
00410             if (pocket->data->getSeq() == hNextSeq) {
00411                 // pocket is the block we need
00412                 // Need to swap blocks
00413                 VLogBlock* temp = pocket;
00414                 pocket = block;
00415                 block = temp;
00416             } else {
00417                 // Neither is the block we need
00418                 bfree(block);  // forget we read this block
00419                 nextPhy--;
00420                 hitEOF = true;
00421                 throw VestaLog::Eof();
00422             }
00423         }
00424         block->pocketPhy = pocket->phy;
00425     }
00426 
00427     // Link block into the chain
00428     block->next = NULL;
00429     if (last == NULL) {
00430         cur = last = block;
00431     } else {
00432         last->next = block;
00433         last = block;
00434     }
00435     block->tailCommitted = false;
00436     nextSeq++;
00437 }
00438 
00439 void VestaLogPrivate::makeBytesAvail() throw (VestaLog::Eof, VestaLog::Error)
00440 {
00441     // Get a block in pocket if possible
00442     if (pocket == NULL && !hitEOF) {
00443       pocket = readBlock();
00444       if (pocket) {
00445         pocket->phy = nextPhy++;
00446         pocket->pocketPhy = -1;  // not meaningful yet
00447       } else {
00448         hitEOF = true;
00449       }
00450     }
00451 
00452     // Discard fully read block if any
00453     if (cur != NULL && curLen >= sizeof(cur->data->bytes)) {
00454         VLogBlock* temp = cur;
00455         cur = cur->next;
00456         if (cur == NULL) last = NULL;
00457         if (temp->tailCommitted && cur != NULL && cur->data->getLen() == 0) {
00458             cur->tailCommitted = true;
00459         }
00460         bfree(temp);
00461         curSeq++;
00462         curLen = 0;
00463     }
00464     
00465     // Get a block to read from if none
00466     if (cur == NULL) {
00467         extendCur();
00468     }
00469     
00470     // If necessary, look ahead to see whether bytes are committed
00471     if (!cur->tailCommitted &&
00472         curLen >= cur->data->getLen()) {
00473         VLogBlock* block = cur;
00474         do {
00475             block = block->next;
00476             if (block == NULL) {
00477                 extendCur();
00478                 block = last;
00479             }
00480         } while (block->data->getLen() == 0);
00481         // if we get here, the bytes are committed
00482         cur->tailCommitted = true;
00483     }
00484 }
00485 
00486 
00487 void VestaLog::get(char* p, int n, char term)
00488   throw (VestaLog::Eof, VestaLog::Error)
00489 {
00490     assert(vlp->state == VestaLogPrivate::recovering);
00491     
00492     do {
00493         vlp->makeBytesAvail();
00494 
00495         // Return as many bytes from this block as we can
00496         while (n > 1
00497                && (vlp->cur->tailCommitted
00498                    || vlp->curLen < vlp->cur->data->getLen())
00499                && (vlp->curLen < sizeof(vlp->cur->data->bytes))) {
00500             *p = vlp->cur->data->bytes[vlp->curLen];
00501             if (*p == term) break;
00502             vlp->curLen++;
00503             p++;
00504             n--;
00505         }
00506         
00507     } while (n > 1 && *p != term);
00508     
00509     *p = '\0';
00510 }
00511 
00512 int VestaLog::read(char* p, int n) throw (VestaLog::Error)
00513 {
00514     assert(vlp->state == VestaLogPrivate::recovering);
00515     
00516     int count = 0;
00517     do {
00518         try {
00519             vlp->makeBytesAvail();
00520         } catch (Eof) {
00521             return count;
00522         }
00523 
00524         // Return as many bytes from this block as we can
00525         while (count < n
00526                && (vlp->cur->tailCommitted
00527                    || vlp->curLen < vlp->cur->data->getLen())
00528                && (vlp->curLen < sizeof(vlp->cur->data->bytes))) {
00529             *p = vlp->cur->data->bytes[vlp->curLen];
00530             vlp->curLen++;
00531             p++;
00532             count++;
00533         }
00534         
00535     } while (count < n);
00536 
00537     return count;
00538 }
00539 
00540 void VestaLog::readAll(char* p, int n) throw (VestaLog::Eof, VestaLog::Error)
00541 {
00542     assert(vlp->state == VestaLogPrivate::recovering);
00543     
00544     int count = 0;
00545     do {
00546         vlp->makeBytesAvail();
00547 
00548         // Return as many bytes from this block as we can
00549         while (count < n
00550                && (vlp->cur->tailCommitted
00551                    || vlp->curLen < vlp->cur->data->getLen())
00552                && (vlp->curLen < sizeof(vlp->cur->data->bytes))) {
00553             *p = vlp->cur->data->bytes[vlp->curLen];
00554             vlp->curLen++;
00555             p++;
00556             count++;
00557         }
00558         
00559     } while (count < n);
00560 }
00561 
00562 bool VestaLog::eof() throw (VestaLog::Error)
00563 {
00564     assert(vlp->state == VestaLogPrivate::recovering);
00565     try {
00566         vlp->makeBytesAvail();
00567     } catch (Eof) {
00568         return true;
00569     }
00570     return false;
00571 }
00572 
00573 void VestaLogPrivate::eraseUncommitted(int fd) throw (VestaLog::Error)
00574 {
00575     // If there are any blocks in the file beyond the last one that
00576     // contains valid log data, overwrite them with invalid log
00577     // sequence numbers.  This is needed because some of the blocks
00578     // could contain valid sequence numbers left from a write that was
00579     // in progress at the time of the last crash or abort.  We need to
00580     // make sure these are not made to look like valid blocks by the
00581     // new writes we do.  If we had a block left in pocket, we
00582     // must overwrite it too.
00583     //
00584         
00585     VLogBlock* inval = balloc();
00586     inval->data->setSeq(HashSeq((Bit32) 0xffffffff));
00587     inval->data->setLen((Bit16) 0xffff);
00588     inval->data->setVer((Bit16) 0xffff);
00589 
00590     struct stat statbuf;
00591     if (fstat(fd, &statbuf) != 0) {
00592         state = bad;
00593         throw VestaLog::Error(errno, Text("VestaLog::eraseUncommitted got \"")+
00594                               Basics::errno_Text(errno) + "\" on fstat");
00595     }
00596     int clearstart = (cur->phy + (usePocket ? 1 : 0)) * DiskBlockSize;
00597     if (usePocket && cur->pocketPhy * DiskBlockSize < clearstart) {
00598         (void) lseek(fd, cur->pocketPhy * DiskBlockSize, SEEK_SET);
00599         ssize_t res =
00600           ::write(fd, (const char*) inval->data, DiskBlockSize);
00601         if (res != DiskBlockSize) {
00602             state = bad;
00603             throw VestaLog::Error(errno,
00604                                   Text("VestaLog::eraseUncommitted got \"") +
00605                                   Basics::errno_Text(errno) + "\" on write");
00606         }
00607     }
00608     (void) lseek(fd, clearstart, SEEK_SET);
00609     while (clearstart < statbuf.st_size) {
00610         ssize_t res =
00611           ::write(fd, (const char*) inval->data, DiskBlockSize);
00612         if (res != DiskBlockSize) {
00613             state = bad;
00614             throw VestaLog::Error(errno,
00615                                   Text("VestaLog::eraseUncommitted got \"") +
00616                                   Basics::errno_Text(errno) + "\" on write");
00617         }
00618         clearstart += DiskBlockSize;
00619     }
00620     bfree(inval);
00621     if (fsync(fd) != 0) {
00622         state = bad;
00623         throw VestaLog::Error(errno, Text("VestaLog::eraseUncommitted got \"")+
00624                               Basics::errno_Text(errno) + "\" on fsync");
00625     }
00626 }
00627 
00628 bool VestaLog::nextLog() throw (VestaLog::Error)
00629 {
00630     // state: recovering -> (recovering | recovered)
00631     assert(vlp->state == VestaLogPrivate::recovering);
00632     assert(vlp->hitEOF);  // recovery has to have read everything
00633     assert(vlp->cur == NULL || !vlp->cur->tailCommitted);
00634 
00635     // open next log file if it exists
00636     char vstr[16];
00637     sprintf(vstr, "%d", vlp->version + 1);
00638     Text lfn = Text(vlp->directory) + PathnameSep + vstr + LogExtension;
00639     int fd =
00640       ::open(lfn.cchars(), (vlp->readonly ? O_RDONLY : O_RDWR), LOG_PROT);
00641     if (fd == -1) {
00642         if (errno == ENOENT) {
00643             vlp->state = VestaLogPrivate::recovered;
00644             return false;  // no more logs
00645         } else {
00646             vlp->state = VestaLogPrivate::bad;
00647             throw Error(errno, Text("VestaLog::nextLog got \"") +
00648                         Basics::errno_Text(errno) + "\" opening " + lfn);
00649         }
00650     }
00651 
00652     // Open the next backup logfile if there is a backup
00653     int fd2 = -1;
00654     if (vlp->directory2) {
00655       // Check if the primary was 0-length; if so, ok to create the backup
00656       struct stat statbuf;
00657       if (fstat(fd, &statbuf) != 0) {
00658         vlp->state = VestaLogPrivate::bad;
00659         throw VestaLog::Error(errno, Text("VestaLog::nextLog got \"")+
00660                               Basics::errno_Text(errno) + "\" on fstat");
00661       }
00662       Text lfn2 = Text(vlp->directory2) + PathnameSep + vstr + LogExtension;
00663 
00664       fd2 = ::open(lfn2.cchars(), (vlp->readonly ? O_RDONLY : O_RDWR),
00665                    (statbuf.st_size == 0 ? O_CREAT | O_TRUNC : 0), LOG_PROT);
00666       if (fd2 == -1) {
00667         vlp->state = VestaLogPrivate::bad;
00668         throw Error(errno, Text("VestaLog::nextLog got \"") +
00669                     Basics::errno_Text(errno) + "\" opening " + lfn2);
00670       }
00671     }
00672 
00673     // copied from close()
00674     while (vlp->cur != NULL) {
00675         VLogBlock* temp = vlp->cur->next;
00676         delete vlp->cur;
00677         vlp->cur = temp;
00678     }
00679     vlp->last = NULL;
00680     if (vlp->pocket != NULL) {
00681         delete vlp->pocket;
00682         vlp->pocket = NULL;
00683     }
00684     while (vlp->free != NULL) {
00685         VLogBlock* temp = vlp->free->next;
00686         delete vlp->free;
00687         vlp->free = temp;
00688     }
00689 
00690     (void) ::close(vlp->fd);
00691     vlp->fd = fd;
00692     vlp->version++;
00693 
00694     if (vlp->directory2) {
00695       (void) ::close(vlp->fd2);
00696       vlp->fd2 = fd2;
00697     }      
00698 
00699     // Initialize for reading
00700     vlp->curSeq = vlp->curLen = vlp->nextSeq = vlp->nextPhy = 0;
00701     vlp->hitEOF = vlp->usePocket = false;
00702     vlp->commSeq = vlp->commPhy = vlp->commPocketPhy = 0;
00703     vlp->commUsePocket = false;
00704     vlp->cur = vlp->last = vlp->pocket = vlp->free = NULL;
00705     vlp->nesting = 0;
00706 
00707     return true;
00708 }
00709 
00710 void VestaLog::loggingBegin() throw (VestaLog::Error)
00711 {
00712     // state: recovered -> ready
00713     assert(vlp->state == VestaLogPrivate::recovered);
00714     assert(vlp->hitEOF);  // recovery has to have read everything
00715     assert(vlp->cur == NULL || !vlp->cur->tailCommitted);
00716     
00717     // Initialize for writing.
00718     assert(!vlp->readonly);
00719 
00720     // Note: vlp->curLen > 0 implies a partial version of the current 
00721     // logical block is already on disk, in which case vlp->cur is a
00722     // copy of it.   Otherwise the current logical block is empty and
00723     // not yet on disk, in which case cur may be NULL or may contain a
00724     // garbage block. 
00725 
00726     // Establish new invariants on cur->phy and cur->pocketPhy.  Now
00727     // cur->phy will be one possible physical address for the current
00728     // logical block, and it will be greater than or equal to the
00729     // highest physical address that contains a valid block.
00730     // cur->pocketPhy will be the other possible physical address for
00731     // the current logical block, and cur->pocketPhy < cur->phy.
00732     // Also, set vlp->usePocket = true if cur->phy contains a valid
00733     // copy of the block (i.e., if it must not be overwritten).
00734     //
00735     if (vlp->curLen > 0) {
00736         // The current logical block is already partially written on
00737         // disk, and vlp->cur is a copy of the latest version of it.
00738         assert(vlp->cur != NULL);
00739         assert(vlp->cur->pocketPhy >= 0);
00740         if (vlp->cur->pocketPhy >= vlp->cur->phy) {
00741             int temp = vlp->cur->phy;
00742             vlp->cur->phy = vlp->cur->pocketPhy;
00743             vlp->cur->pocketPhy = temp;
00744             vlp->usePocket = false; // pocketPhy is now where block came from
00745         } else {
00746             vlp->usePocket = true;  // phy is still where block came from
00747         }
00748     } else {
00749         // The current logical block is empty.  Either vlp->cur is
00750         // NULL or it is a block of uncommitted data.
00751         if (vlp->cur == NULL) {
00752             vlp->cur = vlp->balloc();
00753             if (vlp->pocket == NULL) {
00754                 vlp->cur->pocketPhy = vlp->nextPhy;
00755                 vlp->cur->phy = vlp->nextPhy + 1;
00756             } else {
00757                 vlp->cur->pocketPhy = vlp->pocket->phy;
00758                 vlp->cur->phy = vlp->nextPhy;
00759             }
00760             vlp->usePocket = true;  // always use lower block first
00761         } else {
00762             assert(vlp->cur->pocketPhy >= 0);
00763             if (vlp->cur->pocketPhy >= vlp->cur->phy) {
00764                 int temp = vlp->cur->phy;
00765                 vlp->cur->phy = vlp->cur->pocketPhy;
00766                 vlp->cur->pocketPhy = temp;
00767             }
00768             vlp->usePocket = true;  // always use lower block first
00769         }
00770         vlp->cur->data->setSeq(HashSeq(vlp->curSeq));
00771         vlp->cur->data->setLen(0);
00772     }
00773     assert(vlp->cur->pocketPhy < vlp->cur->phy);
00774 
00775     // Erase leftover uncommitted data from before crash
00776     vlp->eraseUncommitted(vlp->fd);
00777     if (vlp->fd2 != -1) {
00778       vlp->eraseUncommitted(vlp->fd2);
00779     }
00780 
00781     // Free probably-unneeded block buffers
00782     while (vlp->cur->next != NULL) {
00783         VLogBlock* temp = vlp->cur->next->next;
00784         delete vlp->cur->next;
00785         vlp->cur->next = temp;
00786     }
00787     vlp->last = NULL;
00788     if (vlp->pocket != NULL) {
00789         delete vlp->pocket;
00790         vlp->pocket = NULL;
00791     }
00792     while (vlp->free != NULL) {
00793         VLogBlock* temp = vlp->free->next;
00794         delete vlp->free;
00795         vlp->free = temp;
00796     }
00797 
00798     // Save info to allow abort()
00799     vlp->commSeq = vlp->curSeq;
00800     vlp->commPhy = vlp->cur->phy;
00801     vlp->commPocketPhy = vlp->cur->pocketPhy;
00802     vlp->commUsePocket = vlp->usePocket;
00803 
00804     vlp->state = VestaLogPrivate::ready;
00805 }
00806 
00807 void VestaLog::start() throw (VestaLog::Error)
00808 {
00809     // Start a record, or increment start nesting level
00810     // state: ready -> logging
00811     //        logging -> logging
00812     if (vlp->state == VestaLogPrivate::ready) {
00813         vlp->nesting = 1;
00814         vlp->state = VestaLogPrivate::logging;
00815     } else {
00816         assert(vlp->state == VestaLogPrivate::logging);
00817         vlp->nesting++;
00818     }
00819 }
00820 
00821 int VestaLog::nesting() throw ()
00822 {
00823     // Return the nesting level
00824     // state: *
00825     return vlp->nesting;
00826 }
00827 
00828 void VestaLogPrivate::writeCur() throw (VestaLog::Error)
00829 {
00830     cur->data->setVer(cur->data->getVer() + 1);
00831     long byteAddr;
00832     if (usePocket) {
00833         byteAddr = ((long) cur->pocketPhy) * ((long) DiskBlockSize);
00834     } else {
00835         byteAddr = ((long) cur->phy) * ((long) DiskBlockSize);
00836     }
00837     (void) lseek(fd, byteAddr, SEEK_SET);
00838     ssize_t res = ::write(fd, (const char*) cur->data, DiskBlockSize);
00839     if (res != DiskBlockSize) {
00840         state = bad;
00841         throw VestaLog::Error(errno, Text("VestaLogPrivate::writeCur got \"") +
00842                               Basics::errno_Text(errno) + "\" on write" +
00843                               (fd2 == -1 ? "" : " + to primary"));
00844     }
00845     if (fd2 != -1) {
00846       (void) lseek(fd2, byteAddr, SEEK_SET);
00847       res = ::write(fd2, (const char*) cur->data, DiskBlockSize);
00848       if (res != DiskBlockSize) {
00849         state = bad;
00850         throw VestaLog::Error(errno, Text("VestaLogPrivate::writeCur got \"") +
00851                               Basics::errno_Text(errno) +
00852                               "\" on write to backup");
00853       }
00854     }
00855 }
00856 
00857 void VestaLogPrivate::makeSpaceAvail() throw (VestaLog::Error)
00858 {
00859     // Make space to write in cur buffer
00860     assert(curLen >= sizeof(cur->data->bytes));
00861     
00862     // Write out the full block
00863     writeCur();
00864 
00865     // Prepare cur to receive next block
00866     curSeq++;
00867     curLen = 0;
00868     if (usePocket) {
00869         // writeCur() used cur->pocketPhy
00870         // cur->phy is still available
00871         cur->pocketPhy = cur->phy;
00872         cur->phy++;
00873     } else {
00874         // writeCur() used cur->phy 
00875         // cur->pocketPhy is still available
00876         cur->phy++;
00877     }
00878     // Do not overwrite the block holding the previous stable commit
00879     usePocket = (bool) (cur->pocketPhy !=
00880                            (commUsePocket ? commPhy : commPocketPhy));
00881 
00882     cur->data->setSeq(HashSeq(curSeq));
00883     cur->data->setLen(0);
00884     cur->data->setVer(0);
00885 }
00886 
00887 void VestaLog::put(char c) throw (VestaLog::Error)
00888 {
00889     // state: logging
00890     assert(vlp->state == VestaLogPrivate::logging);
00891     if (vlp->curLen >= sizeof(vlp->cur->data->bytes)) {
00892         vlp->makeSpaceAvail();
00893     }
00894     vlp->cur->data->bytes[vlp->curLen++] = c;
00895 }
00896 
00897 void VestaLog::put(const char* p) throw (VestaLog::Error)
00898 {
00899     // Put null-terminated string
00900     // state: logging
00901     assert(vlp->state == VestaLogPrivate::logging);
00902     while (*p != '\0') {
00903         if (vlp->curLen >= sizeof(vlp->cur->data->bytes)) {
00904             vlp->makeSpaceAvail();
00905         }
00906         vlp->cur->data->bytes[vlp->curLen++] = *p++;
00907     }
00908 }
00909 
00910 void VestaLog::write(const char* p, int n) throw (VestaLog::Error)
00911 {
00912     // state: logging
00913     assert(vlp->state == VestaLogPrivate::logging);
00914     int count = 0;
00915     while (count < n) {
00916         if (vlp->curLen >= sizeof(vlp->cur->data->bytes)) {
00917             vlp->makeSpaceAvail();
00918         }
00919         vlp->cur->data->bytes[vlp->curLen++] = *p++;
00920         count++;
00921     }
00922 }
00923 
00924 void VestaLog::commit() throw (VestaLog::Error)
00925 {
00926     // Commit the current record
00927     // state: logging -> ready or logging
00928     assert(vlp->state == VestaLogPrivate::logging);
00929 
00930     if (--vlp->nesting > 0) {
00931         return;
00932     }
00933 
00934     vlp->cur->data->setLen(vlp->curLen);
00935     vlp->writeCur();
00936     vlp->usePocket = (bool) !vlp->usePocket;
00937     if (fsync(vlp->fd) != 0) {
00938         vlp->state = VestaLogPrivate::bad;
00939         throw VestaLog::Error(errno, Text("VestaLog::commit got \"") +
00940                               Basics::errno_Text(errno) + "\" on fsync" +
00941                               (vlp->fd2 == -1 ? "" : " of primary"));
00942     }
00943     if (vlp->fd2 != -1 && fsync(vlp->fd2) != 0) {
00944         vlp->state = VestaLogPrivate::bad;
00945         throw VestaLog::Error(errno, Text("VestaLog::commit got \"") +
00946                               Basics::errno_Text(errno) + "\" on fsync of backup");
00947     }
00948 
00949     // Save info to allow abort() and to prevent this block from
00950     //  being overwritten until after the next commit
00951     vlp->commSeq = vlp->curSeq;
00952     vlp->commPhy = vlp->cur->phy;
00953     vlp->commPocketPhy = vlp->cur->pocketPhy;
00954     vlp->commUsePocket = vlp->usePocket;
00955 
00956     vlp->state = VestaLogPrivate::ready;
00957 }
00958 
00959 void VestaLog::abort() throw (VestaLog::Error)
00960 {
00961     // Abort the current record
00962     // state: logging -> ready
00963     assert(vlp->state == VestaLogPrivate::logging);
00964 
00965     if (vlp->curSeq != vlp->commSeq) {
00966         // Buffer has a new block in it; need to get back old one
00967         long byteAddr;
00968         if (!vlp->commUsePocket) {
00969             // pocketPhy was used last
00970             byteAddr =
00971               ((long) vlp->commPocketPhy) * ((long) DiskBlockSize);
00972         } else {
00973             // phy was used last
00974             byteAddr =
00975               ((long) vlp->commPhy) * ((long) DiskBlockSize);
00976         }
00977         (void) lseek(vlp->fd, byteAddr, SEEK_SET);
00978         ssize_t res = ::read(vlp->fd, (char*) vlp->cur->data,
00979                            DiskBlockSize);
00980         if (res < DiskBlockSize) {
00981             vlp->state = VestaLogPrivate::bad;
00982             throw VestaLog::Error(errno, Text("VestaLog::abort got \"") +
00983                                   Basics::errno_Text(errno) + "\" on read");
00984         }
00985         vlp->curSeq = vlp->commSeq;
00986         vlp->cur->phy = vlp->commPhy;
00987         vlp->cur->pocketPhy = vlp->commPocketPhy;
00988         vlp->usePocket = vlp->commUsePocket;
00989     }
00990     vlp->curLen = vlp->cur->data->getLen();
00991 
00992     // Erase uncommitted blocks
00993     vlp->eraseUncommitted(vlp->fd);
00994     if (vlp->fd2 != -1) {
00995       vlp->eraseUncommitted(vlp->fd2);
00996     }
00997 
00998     vlp->nesting = 0;
00999     vlp->state = VestaLogPrivate::ready;
01000 }
01001 
01002 fstream* VestaLog::checkpointBegin(ios::openmode mode)
01003      throw(VestaLog::Error)
01004 {
01005     // state: ready, !checkpointing -> ready, checkpointing
01006     assert(vlp->state == VestaLogPrivate::ready);
01007     assert(!vlp->checkpointing);
01008 
01009     // Clean up any uncommitted checkpoints.
01010     int ver;
01011     for (ver = vlp->ccVersion + 1; ver <= vlp->version + 1; ver++) {
01012       char vstr[16];
01013       sprintf(vstr, "%d", ver);
01014       Text cfn =
01015         Text(vlp->directory) + PathnameSep + vstr + CheckpointExtension;
01016       (void) ::unlink(cfn.cchars());
01017     }
01018 
01019     if (vlp->bakckp) {
01020       // Clean up any uncommitted checkpoints in backup.
01021       for (ver = vlp->ccVersion2 + 1; ver <= vlp->version + 1; ver++) {
01022         char vstr[16];
01023         sprintf(vstr, "%d", ver);
01024         Text cfn2 =
01025           Text(vlp->directory2) + PathnameSep + vstr + CheckpointExtension;
01026         (void) ::unlink(cfn2.cchars());
01027       }
01028     }
01029 
01030     // Open a file to receive a new checkpoint
01031     char vstr[16];
01032     sprintf(vstr, "%d", vlp->version + 1);
01033     Text cfn = Text(vlp->directory) + PathnameSep + vstr + CheckpointExtension;
01034     // If the checkpoint file doesn;t exist, create it with the right
01035     // permissions.
01036     if(!FS::Exists(cfn))
01037       {
01038         try
01039           {
01040             FS::Touch(cfn, LOG_PROT, false);
01041           }
01042         catch(FS::Failure f)
01043           {
01044             throw VestaLog::Error(f.get_errno(),
01045                                   Text("VestaLog::checkpointBegin got \"") +
01046                                   Basics::errno_Text(f.get_errno()) +
01047                                   "\" creating " + cfn);
01048           }
01049       }
01050     fstream* ret = NEW_CONSTR(fstream, (cfn.cchars(), mode));
01051     if (!ret->good()) {
01052         vlp->state = VestaLogPrivate::bad;
01053         throw VestaLog::Error(errno, Text("VestaLog::checkpointBegin got \"") +
01054                               Basics::errno_Text(errno) + "\" opening " + cfn);
01055     }
01056 
01057     // Start a new log, preserving the old one
01058     if (::close(vlp->fd) != 0) {
01059         vlp->state = VestaLogPrivate::bad;
01060         throw VestaLog::Error(errno, Text("VestaLog::checkpointBegin got \"") +
01061                               Basics::errno_Text(errno) + "\" on close" +
01062                               (vlp->fd2 == -1 ? "" : " of primary"));
01063     }
01064     Text lfn = Text(vlp->directory) + PathnameSep + vstr + LogExtension;
01065     vlp->fd = ::open(lfn.cchars(), O_RDWR | O_CREAT | O_TRUNC, LOG_PROT);
01066     if (vlp->fd == -1) {
01067         vlp->state = VestaLogPrivate::bad;
01068         throw VestaLog::Error(errno, Text("VestaLog::checkpointBegin got \"") +
01069                               Basics::errno_Text(errno) + "\" creating " + lfn);
01070     }
01071     vlp->version++;
01072 
01073     // Create backup log if there is a backup
01074     if (vlp->fd2 != -1) {
01075       if (::close(vlp->fd2) != 0) {
01076         vlp->state = VestaLogPrivate::bad;
01077         throw VestaLog::Error(errno, Text("VestaLog::checkpointBegin got \"") +
01078                               Basics::errno_Text(errno) + "\" on close of backup");
01079       }
01080       Text lfn2 = Text(vlp->directory2) + PathnameSep + vstr + LogExtension;
01081       vlp->fd2 = ::open(lfn2.cchars(), O_RDWR | O_CREAT | O_TRUNC, LOG_PROT);
01082       if (vlp->fd2 == -1) {
01083         vlp->state = VestaLogPrivate::bad;
01084         throw VestaLog::Error(errno, Text("VestaLog::checkpointBegin got \"") +
01085                               Basics::errno_Text(errno) + "\" creating " + lfn2);
01086       }
01087     }
01088 
01089     // Initialize for writing next log
01090     if (vlp->cur == NULL) {
01091         vlp->cur = vlp->balloc();
01092     }
01093     vlp->curSeq = vlp->curLen = 0;
01094     vlp->cur->pocketPhy = 0;
01095     vlp->cur->phy = 1;
01096     vlp->usePocket = true;
01097     vlp->cur->data->setSeq(HashSeq(0));
01098     vlp->cur->data->setLen(0);
01099 
01100     // Save info to allow abort()
01101     vlp->commSeq = vlp->curSeq;
01102     vlp->commPhy = vlp->cur->phy;
01103     vlp->commPocketPhy = vlp->cur->pocketPhy;
01104     vlp->commUsePocket = vlp->usePocket;
01105 
01106     vlp->checkpointing = true;
01107     return ret;
01108 }
01109 
01110 
01111 void VestaLog::checkpointEnd()
01112      throw(VestaLog::Error)
01113 {
01114     // state: ready, checkpointing -> ready, !checkpointing
01115     assert(vlp->state == VestaLogPrivate::ready);
01116     assert(vlp->checkpointing);
01117     
01118     // Commit the current checkpoint
01119     Text nvfn = Text(vlp->directory) + PathnameSep + NewVersionFileName;
01120     FS::OFdStream vf(nvfn.cchars());
01121     vf << vlp->version << endl;
01122     if (fsync(vf.fd()) != 0) {
01123         vf.close();
01124         vlp->state = VestaLogPrivate::bad;
01125         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01126                               Basics::errno_Text(errno) + "\" on fsync");
01127     }
01128     if (!vf.good()) {
01129         vf.close();
01130         vlp->state = VestaLogPrivate::bad;
01131         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01132                               Basics::errno_Text(errno) + "\" writing(?)");
01133     }
01134     vf.close();
01135     Text vfn = Text(vlp->directory) + PathnameSep + VersionFileName;
01136     if (rename(nvfn.cchars(), vfn.cchars()) != 0) {
01137         vlp->state = VestaLogPrivate::bad;
01138         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01139                               Basics::errno_Text(errno) + "\" renaming \"" +
01140                               nvfn + "\" to \"" + vfn +"\"");
01141     }
01142     vlp->ccVersion = vlp->version;
01143 
01144     // Back up the checkpoint file if requested
01145     if (vlp->bakckp) {
01146       // Open the new primary checkpoint
01147       char vstr[16];
01148       sprintf(vstr, "%d", vlp->version);
01149       Text cfn =
01150         Text(vlp->directory) + PathnameSep + vstr + CheckpointExtension;
01151       fstream* cstream = NEW_CONSTR(fstream, (cfn.cchars(), ios::in));
01152       if (cstream->fail()) {
01153         vlp->state = VestaLogPrivate::bad;
01154         throw Error(errno, Text("VestaLog::checkpointEnd got \"") +
01155                     Basics::errno_Text(errno) + "\" opening " + cfn);
01156       }
01157 
01158       // Create the new backup checkpoint
01159       Text bfn =
01160         Text(vlp->directory2) + PathnameSep + vstr + CheckpointExtension;
01161       FS::FdStream* bstream = NEW_CONSTR(FS::FdStream, (bfn.cchars(), ios::out, LOG_PROT));
01162       if (bstream->fail()) {
01163         vlp->state = VestaLogPrivate::bad;
01164         throw Error(errno, Text("VestaLog::checkpointEnd got \"") +
01165                     Basics::errno_Text(errno) + "\" opening " + bfn);
01166       }
01167 
01168       // Copy the data
01169       do {
01170         char buf[COPY_SIZE];
01171         cstream->read(buf, COPY_SIZE);
01172         int count = cstream->gcount();
01173         if (cstream->bad()) {
01174           vlp->state = VestaLogPrivate::bad;
01175           throw Error(errno, Text("VestaLog::checkpointEnd got \"") +
01176                       Basics::errno_Text(errno) + "\" reading " + cfn);
01177         }
01178         bstream->write(buf, count);
01179         if (bstream->fail()) {
01180           vlp->state = VestaLogPrivate::bad;
01181           throw Error(errno, Text("VestaLog::checkpointEnd got \"") +
01182                       Basics::errno_Text(errno) + "\" opening " + bfn);
01183         }
01184       } while (!cstream->fail());
01185 
01186       cstream->close();
01187       bstream->flush();
01188       if (fsync(bstream->fd()) != 0) {
01189         bstream->close();
01190         vlp->state = VestaLogPrivate::bad;
01191         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01192                               Basics::errno_Text(errno) + "\" on fsync");
01193       }
01194       bstream->close();
01195 
01196       // Record the commit in the backup directory
01197       Text bnvfn = Text(vlp->directory2) + PathnameSep + NewVersionFileName;
01198       FS::OFdStream bnvf(bnvfn.cchars());
01199       bnvf << vlp->version << endl;
01200       if (fsync(bnvf.fd()) != 0) {
01201         bnvf.close();
01202         vlp->state = VestaLogPrivate::bad;
01203         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01204                               Basics::errno_Text(errno) + "\" on fsync");
01205       }
01206       if (!bnvf.good()) {
01207         bnvf.close();
01208         vlp->state = VestaLogPrivate::bad;
01209         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01210                               Basics::errno_Text(errno) + "\" writing(?)");
01211       }
01212       bnvf.close();
01213       Text bvfn = Text(vlp->directory2) + PathnameSep + VersionFileName;
01214       if (rename(bnvfn.cchars(), bvfn.cchars()) != 0) {
01215         vlp->state = VestaLogPrivate::bad;
01216         throw VestaLog::Error(errno, Text("VestaLog::checkpointEnd got \"") +
01217                               Basics::errno_Text(errno) + "\" renaming \"" +
01218                               bnvfn + "\" to \"" + bvfn +"\"");
01219       }
01220       vlp->ccVersion2 = vlp->version;
01221     }
01222 
01223     vlp->checkpointing = false;
01224 }
01225 
01226 
01227 void VestaLog::checkpointAbort()
01228      throw(VestaLog::Error)
01229 {
01230     // state: ready, checkpointing -> ready, !checkpointing
01231     assert(vlp->state == VestaLogPrivate::ready);
01232     assert(vlp->checkpointing);
01233     
01234     // Remove the current uncommitted checkpoint.  Needed to prevent
01235     // checkpointResume from being able to find it.
01236     assert(vlp->version > vlp->ccVersion);
01237     char vstr[16];
01238     sprintf(vstr, "%d", vlp->version);
01239     Text cfn = Text(vlp->directory) + PathnameSep + vstr + CheckpointExtension;
01240     (void) ::unlink(cfn.cchars());
01241 
01242     vlp->checkpointing = false;
01243 }
01244 
01245 fstream *VestaLog::checkpointResume(ios::openmode mode)
01246      throw(VestaLog::Error)
01247 {
01248     // state: recovered, !checkpointing -> recovered, checkpointing
01249     assert(vlp->state == VestaLogPrivate::recovered);
01250     assert(!vlp->checkpointing);
01251     assert(!vlp->readonly);
01252 
01253     // Return NULL if last checkpoint was committed
01254     if (vlp->version <= vlp->ccVersion) return NULL;
01255 
01256     // Try to open the possibly-existing uncommitted checkpoint
01257     char vstr[16];
01258     sprintf(vstr, "%d", vlp->version);
01259     Text cfn = Text(vlp->directory) + PathnameSep + vstr + CheckpointExtension;
01260     if(!FS::Exists(cfn))
01261       // Return NULL if checkpoint was not in progress
01262       return NULL;
01263     fstream* ret = NEW_CONSTR(fstream, (cfn.cchars(), mode));
01264     if (!ret->good()) {
01265       int saved_errno = errno;
01266       vlp->state = VestaLogPrivate::bad;
01267       throw VestaLog::Error(errno, Text("VestaLog::checkpointResume got \"") +
01268                             Basics::errno_Text(saved_errno) + "\" creating " + cfn);
01269     }
01270     
01271     vlp->checkpointing = true;
01272     return ret;
01273 }
01274 
01275 // Internal routine for VestaLog::prune.
01276 // Called twice, to prune the primary and (if any) the backup.
01277 static void
01278 doPrune(char* directory, int ckpcommitted, int ckpkeep, bool logkeep)
01279 {
01280     // Get the highest pruned checkpoint version number, to avoid
01281     // searching back beyond that point for versions to keep.
01282     Text pfn = Text(directory) + PathnameSep + PrunedFileName;
01283     int prunedver = -1;
01284     ifstream pfi(pfn.cchars());
01285     if (!pfi.fail()) {
01286         pfi >> prunedver;
01287     }
01288     pfi.close();
01289 
01290     // Find checkpoints to keep
01291     int delver = ckpcommitted; // start at highest committed version
01292     int nkept = 0;
01293     // cerr << "highest committed = " << delver << endl;
01294     while (nkept < ckpkeep && delver > prunedver) {
01295         // Keep this version
01296         // cerr << "keeping version " << delver << endl;
01297         delver--;
01298         nkept++;
01299         while (nkept < ckpkeep && delver > prunedver && delver > 0) {
01300             // Probe for the next lower committed version.
01301             char cfn[MaxFileNameLen];       
01302             sprintf(cfn, "%s%c%d%s", directory, PathnameSep,
01303                     delver, CheckpointExtension);
01304             struct stat junk;
01305             if (stat(cfn, &junk) == 0) {
01306                 // Found another committed version
01307                 // cerr << "found version " << delver << endl;
01308                 break;
01309             } else {
01310                 if (errno == ENOENT) {
01311                     // No committed version by this number
01312                     // cerr << "no version " << delver << endl;
01313                     delver--;
01314                 } else {
01315                     throw
01316                       VestaLog::Error(errno, Text("VestaLog::prune got \"") +
01317                                       Basics::errno_Text(errno) + "\" on stat of \"" +
01318                                       cfn + "\"");
01319                 }
01320             }
01321         }
01322     }
01323 
01324     // Delete all checkpoint versions <= delver, and if !logkeep, all
01325     //  log versions <= delver.
01326     if (delver < 0) {
01327         // Nothing to do
01328         // cerr << "nothing to prune" << endl;
01329         return;
01330     }
01331     DIR* dir = opendir(directory);
01332     if (!dir) {
01333         throw VestaLog::Error(errno, Text("VestaLog::prune got \"") +
01334                               Basics::errno_Text(errno) + "\" opening directory \"" +
01335                               directory +"\"");
01336     }   
01337     struct dirent de, *done;
01338     while (readdir_r(dir, /*OUT*/ &de, /*OUT*/ &done) == 0 && done != NULL) {
01339         int num;
01340         char ext[4], junk;
01341         if (sscanf(de.d_name, "%d.%3c%c", &num, ext, &junk) == 2) {
01342             ext[3] = '\0';
01343             if (num <= delver
01344                 && (strcmp(ext, "ckp") == 0
01345                     || (!logkeep && strcmp(ext, "log") == 0))) {
01346                 // Delete it!
01347                 char delname[MaxFileNameLen];
01348                 sprintf(delname, "%s%c%s", directory,
01349                         PathnameSep, de.d_name);
01350                 // cerr << "pruning " << delname << endl;
01351                 if (::unlink(delname) < 0) {
01352                     throw
01353                       VestaLog::Error(errno, Text("VestaLog::prune got \"") +
01354                                       Basics::errno_Text(errno) + "\" unlinking \"" +
01355                                       delname +"\"");
01356                 }
01357             }
01358         }
01359     }   
01360     closedir(dir);
01361 
01362     // Done, record what we did.  This keeps us from counting down all
01363     //  the way to 0 in the probe loop if the user asks us to keep more
01364     //  versions than there currently are.  This code is a bit over-
01365     //  engineered; I adapted it from writing the version file.
01366     Text npfn = Text(directory) + PathnameSep + NewPrunedFileName;
01367     FS::OFdStream pfo(npfn.cchars());
01368     pfo << delver << endl;
01369     if (fsync(pfo.fd()) != 0 || !pfo.good()) {
01370         pfo.close();
01371         return;  // ignore it, who really cares?
01372     }
01373     pfo.close();
01374     (void) ::rename(npfn.cchars(), pfn.cchars());
01375 }
01376 
01377 void VestaLog::prune(int ckpkeep, bool logkeep, bool prunebak)
01378   throw (VestaLog::Error)
01379 {
01380     // state: !initial & !bad
01381     assert(vlp->state != VestaLogPrivate::initial);
01382     assert(vlp->state != VestaLogPrivate::bad);
01383     assert(!vlp->readonly);
01384 
01385     try {
01386       // Prune the primary
01387       doPrune(vlp->directory, vlp->ccVersion, ckpkeep, logkeep);
01388 
01389       // Prune the backup
01390       if (prunebak && vlp->directory2) {
01391         doPrune(vlp->directory2, vlp->ccVersion2, ckpkeep, logkeep);
01392       }
01393     } catch (Error) {
01394       vlp->state = VestaLogPrivate::bad;
01395       throw;
01396     }
01397 }
01398 
01399 void VestaLog::close() throw ()
01400 {
01401     // state: * -> initial
01402     switch (vlp->state) {
01403       case VestaLogPrivate::initial:
01404         break;
01405 
01406       case VestaLogPrivate::recovering:
01407       case VestaLogPrivate::ready:
01408       case VestaLogPrivate::logging:
01409       case VestaLogPrivate::recovered:
01410       case VestaLogPrivate::bad:
01411         if (vlp->fd != -1) {
01412             (void) ::close(vlp->fd);
01413         }
01414         if (vlp->fd2 != -1) {
01415             (void) ::close(vlp->fd2);
01416         }
01417         if (vlp->lockfd != -1) {
01418             (void) ::close(vlp->lockfd);
01419         }
01420         if (vlp->lockfd2 != -1) {
01421             (void) ::close(vlp->lockfd2);
01422         }
01423         break;
01424     }
01425     while (vlp->cur != NULL) {
01426         VLogBlock* temp = vlp->cur->next;
01427         delete vlp->cur;
01428         vlp->cur = temp;
01429     }
01430     vlp->last = NULL;
01431     if (vlp->pocket != NULL) {
01432         delete vlp->pocket;
01433         vlp->pocket = NULL;
01434     }
01435     while (vlp->free != NULL) {
01436         VLogBlock* temp = vlp->free->next;
01437         delete vlp->free;
01438         vlp->free = temp;
01439     }
01440     vlp->state = VestaLogPrivate::initial;
01441     if (vlp->directory != NULL) {
01442         delete vlp->directory;
01443         vlp->directory = NULL;
01444     }
01445 }
01446 

Generated on Mon May 8 00:48:42 2006 for Vesta by  doxygen 1.4.2