00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #if __linux__
00027 #include <stdint.h>
00028 #endif
00029 #include "VRWeed.H"
00030 #include "VMemPool.H"
00031 #include "VestaSource.H"
00032 #include "VDirChangeable.H"
00033 #include "VDirEvaluator.H"
00034 #include "VForward.H"
00035 #include "ReadersWritersLock.H"
00036 #include "SourceOrDerived.H"
00037 #include "VestaLog.H"
00038 #include "Recovery.H"
00039 #include "VestaConfig.H"
00040 #include "ShortIdImpl.H"
00041 #include "VRConcurrency.H"
00042 #include "VestaAttribsRep.H"
00043 #include "DirShortId.H"
00044 #include "Mastership.H"
00045 #include "logging.H"
00046 #include <Thread.H>
00047 #include <fstream>
00048 #include <sys/time.h>
00049 #include <BufStream.H>
00050 #include "MutableSidref.H"
00051
00052 using Basics::OBufStream;
00053 using std::hex;
00054 using std::dec;
00055 using std::ifstream;
00056 using std::fstream;
00057 using std::ios;
00058
00059 #include "lock_timing.H"
00060
00061
00062 static Basics::mutex state_mu;
00063 static ShortId keepSourceSid = NullShortId, keepDerivedSid = NullShortId;
00064 static time_t keepSourceTime = 0, keepDerivedTime = 0;
00065 static bool deletionsDone = true;
00066 static bool sourceWeedDone = false;
00067 static bool checkpointInProgress = false;
00068 static bool checkpointRequested = false;
00069 static bool deletionsInProgress = false;
00070 static bool sourceWeedInProgress = false;
00071 static FILE* keepSourceSidFile = NULL;
00072
00073 static Basics::cond state_cond;
00074
00075
00076
00077
00078 static void
00079 KeepCallback(RecoveryReader* rr, char& c)
00080 throw(VestaLog::Error, VestaLog::Eof)
00081 {
00082 unsigned long ulsid;
00083 long lsec;
00084 char kind;
00085 rr->skipWhite(c);
00086 kind = c;
00087 rr->get(c);
00088 rr->getULong(c, ulsid);
00089 rr->getLong(c, lsec);
00090 switch (kind) {
00091 case 's':
00092 keepSourceSid = (ShortId) ulsid;
00093 keepSourceTime = lsec;
00094 break;
00095 case 'd':
00096 keepDerivedSid = (ShortId) ulsid;
00097 keepDerivedTime = lsec;
00098 keepSourceSid = NullShortId;
00099 keepSourceTime = 0;
00100 break;
00101 default:
00102 assert(false);
00103 }
00104 deletionsDone = false;
00105 }
00106
00107 static void
00108 DdunCallback(RecoveryReader* rr, char& c)
00109 throw(VestaLog::Error, VestaLog::Eof)
00110 {
00111 deletionsDone = true;
00112 }
00113
00114 static void
00115 NpinCallback(RecoveryReader* rr, char& c)
00116 throw(VestaLog::Error, VestaLog::Eof)
00117 {
00118
00119 unsigned long ulnpin;
00120 rr->getULong(c, ulnpin);
00121 }
00122
00123 static void
00124 FsidCallback(RecoveryReader* rr, char& c)
00125 throw(VestaLog::Error, VestaLog::Eof)
00126 {
00127 unsigned long ulsid;
00128 rr->skipWhite(c);
00129 while (c != ')') {
00130 rr->getULong(c, ulsid);
00131 DeleteDirShortId((ShortId) ulsid);
00132 rr->skipWhite(c);
00133 }
00134 }
00135
00136 void
00137 InitVRWeed()
00138 {
00139 RegisterRecoveryCallback("keep", KeepCallback);
00140 RegisterRecoveryCallback("ddun", DdunCallback);
00141 RegisterRecoveryCallback("npin", NpinCallback);
00142 RegisterRecoveryCallback("fsid", FsidCallback);
00143
00144
00145 VMemPool::registerCallbacks(VMemPool::vDirChangeable,
00146 VDirChangeable::markCallback, NULL,
00147 VDirChangeable::sweepCallback,
00148 (void*) &keepSourceSidFile,
00149 VDirChangeable::rebuildCallback, NULL);
00150
00151 VMemPool::registerCallbacks(VMemPool::vDirImmutable,
00152 VDirChangeable::markCallback, NULL,
00153 VDirChangeable::sweepCallback,
00154 (void*) &keepSourceSidFile,
00155 VDirChangeable::rebuildCallback, NULL);
00156
00157 VMemPool::registerCallbacks(VMemPool::vDirAppendable,
00158 VDirChangeable::markCallback, NULL,
00159 VDirChangeable::sweepCallback,
00160 (void*) &keepSourceSidFile,
00161 VDirChangeable::rebuildCallback, NULL);
00162
00163 VMemPool::registerCallbacks(VMemPool::vForward,
00164 VForward::markCallback, NULL,
00165 VForward::sweepCallback, NULL,
00166 VForward::rebuildCallback, NULL);
00167
00168 VMemPool::registerCallbacks(VMemPool::vDirEvaluator,
00169 VDirEvaluator::markCallback, NULL,
00170 VDirEvaluator::sweepCallback, NULL,
00171 VDirEvaluator::rebuildCallback, NULL);
00172
00173 VMemPool::registerCallbacks(VMemPool::vAttrib,
00174 VestaAttribsRep::markCallback, NULL,
00175 VestaAttribsRep::sweepCallback, NULL,
00176 VestaAttribsRep::rebuildCallback, NULL);
00177 }
00178
00179 static void
00180 LogKeep(char kind, ShortId sid, time_t time)
00181 {
00182 StableLock.acquireWrite();
00183 RWLOCK_LOCKED_REASON(&StableLock, "Weeding:LogKeep");
00184 {
00185 char logrec[512];
00186 OBufStream ost(logrec, sizeof(logrec));
00187
00188 ost << "(keep " << kind
00189 << " 0x" << hex << sid << " " << dec
00190 << time << ")\n";
00191 VRLog.start();
00192 VRLog.put(ost.str());
00193 VRLog.commit();
00194 }
00195 StableLock.releaseWrite();
00196 }
00197
00198 int
00199 KeepDerived(ShortIdsFile ds, time_t dt, bool force)
00200 {
00201 if (ds != NullShortId) {
00202
00203 int fd = SourceOrDerived::fdopen(ds, O_RDONLY);
00204 if (fd == -1) {
00205 assert(errno != 0);
00206 return errno;
00207 }
00208 close(fd);
00209 }
00210
00211
00212 state_mu.lock();
00213 while (checkpointInProgress || sourceWeedInProgress ||
00214 sourceWeedDone || deletionsInProgress) {
00215 state_cond.wait(state_mu);
00216 }
00217 if (!force && keepDerivedSid == ds && keepDerivedTime == dt &&
00218 deletionsDone) {
00219
00220 state_mu.unlock();
00221 return 0;
00222 }
00223 Repos::dprintf(DBG_ALWAYS,
00224 "keep derived: keepDerivedTime %d, sid 0x%08x\n", dt, ds);
00225 keepDerivedSid = ds;
00226 keepDerivedTime = dt;
00227 keepSourceSid = NullShortId;
00228 keepSourceTime = 0;
00229 deletionsDone = false;
00230 LogKeep('d', ds, dt);
00231 state_mu.unlock();
00232
00233
00234 int err = SourceWeed();
00235 if (err) return err;
00236 return DoDeletions();
00237 }
00238
00239 int
00240 SourceWeed() throw ()
00241 {
00242 int err = 0, res;
00243 state_mu.lock();
00244 while (checkpointInProgress ||
00245 sourceWeedInProgress || deletionsInProgress) {
00246 state_cond.wait(state_mu);
00247 }
00248 if (deletionsDone) {
00249
00250 state_mu.unlock();
00251 return 0;
00252 }
00253 sourceWeedInProgress = true;
00254 state_mu.unlock();
00255 state_cond.broadcast();
00256
00257 time_t newKeepSourceTime = time(NULL) - 30;
00258 ShortId newKeepSourceSid;
00259 int fd = SourceOrDerived::fdcreate(newKeepSourceSid);
00260 if (fd < 0) {
00261 assert (errno != 0);
00262 err = errno;
00263 goto done;
00264 }
00265 keepSourceSidFile = fdopen(fd, "w");
00266 if (fd < 0 || keepSourceSidFile == NULL) {
00267 assert (errno != 0);
00268 err = errno;
00269 goto done;
00270 }
00271 Repos::dprintf(DBG_ALWAYS, "source weed: keepSourceTime %d, sid 0x%08x\n",
00272 newKeepSourceTime, newKeepSourceSid);
00273
00274
00275 res = fprintf(keepSourceSidFile, "%08x\n", newKeepSourceSid);
00276 if (res < 0) {
00277 assert (errno != 0);
00278 err = errno;
00279 goto done;
00280 }
00281
00282 VDirVolatileRoot::lockAll();
00283 StableLock.acquireWrite();
00284 RWLOCK_LOCKED_REASON(&VolatileRootLock, "Weeding:SourceWeed");
00285 RWLOCK_LOCKED_REASON(&StableLock, "Weeding:SourceWeed");
00286 VMemPool::gc(keepDerivedSid);
00287 StableLock.releaseWrite();
00288 VDirVolatileRoot::unlockAll();
00289
00290 res = fflush(keepSourceSidFile);
00291 if (res == EOF) {
00292 assert (errno != 0);
00293 err = errno;
00294 goto done;
00295 }
00296 res = fsync(fd);
00297 if (res < 0) {
00298 assert (errno != 0);
00299 err = errno;
00300 goto done;
00301 }
00302 res = fclose(keepSourceSidFile);
00303 if (res == EOF) {
00304 assert (errno != 0);
00305 err = errno;
00306 goto done;
00307 }
00308
00309 done:
00310 state_mu.lock();
00311 if (err == 0) {
00312 keepSourceTime = newKeepSourceTime;
00313 keepSourceSid = newKeepSourceSid;
00314 deletionsDone = false;
00315 LogKeep('s', newKeepSourceSid, newKeepSourceTime);
00316 Repos::dprintf(DBG_ALWAYS, "source weed done\n");
00317 } else {
00318 Repos::dprintf(DBG_ALWAYS, "source weed failed: errno %d\n", err);
00319 }
00320 sourceWeedInProgress = false;
00321 sourceWeedDone = true;
00322 state_mu.unlock();
00323 state_cond.broadcast();
00324 return err;
00325 }
00326
00327
00328
// Returns true iff p_fname can be stat()ed and its size is greater than zero.
// Any stat() failure (missing file, no permission, ...) yields false.
static bool file_is_non_empty(const char *p_fname) throw()
{
  struct stat st_buf;
  if (stat(p_fname, &st_buf) != 0)
    return false;
  return st_buf.st_size > 0;
}
00338
00339
00340
00341 static bool file_ends_with_newline(const char *p_fname) throw()
00342 {
00343
00344 ifstream l_file(p_fname);
00345 if(l_file.good())
00346 {
00347
00348 l_file.seekg(-1, fstream::end);
00349
00350 Repos::dprintf(DBG_ALWAYS,
00351 "file_ends_with_newline: position after seek: %d\n",
00352 (unsigned int) l_file.tellg());
00353
00354 if(l_file.good())
00355 {
00356
00357 int l_next = l_file.peek();
00358 Repos::dprintf(DBG_ALWAYS,
00359 "file_ends_with_newline: last character in file: %d\n",
00360 l_next);
00361 if(l_next == '\n')
00362 {
00363 return true;
00364 }
00365 }
00366 else
00367 {
00368 Repos::dprintf(DBG_ALWAYS,
00369 "file_ends_with_newline: seek caused error "
00370 "(l_file.good() is false after seek)\n");
00371 }
00372 }
00373 else
00374 {
00375 Repos::dprintf(DBG_ALWAYS,
00376 "file_ends_with_newline: couldn't open %s "
00377 "(l_file.good() is false after open)\n", p_fname);
00378 }
00379
00380 return false;
00381 }
00382
00383 int
00384 DoDeletions() throw ()
00385 {
00386 state_mu.lock();
00387 while (checkpointInProgress ||
00388 sourceWeedInProgress || deletionsInProgress) {
00389 state_cond.wait(state_mu);
00390 }
00391 if (deletionsDone) {
00392
00393 state_mu.unlock();
00394 return 0;
00395 }
00396 ShortIdsFile ds = keepDerivedSid;
00397 time_t dt = keepDerivedTime;
00398 ShortIdsFile ss = keepSourceSid;
00399 time_t st = keepSourceTime;
00400 deletionsInProgress = true;
00401 sourceWeedDone = false;
00402 state_mu.unlock();
00403 state_cond.broadcast();
00404
00405 int ret = 0;
00406 int status;
00407 Text cmd;
00408 char* oname;
00409
00410
00411
00412 Repos::dprintf(DBG_ALWAYS, "starting deletions\n");
00413 Text sidsort;
00414 if (!VestaConfig::get("Repository", "ShortId_sorter", sidsort)) {
00415 sidsort = "sidsort";
00416 }
00417 const char* sname = (ss == NullShortId) ? ""
00418 : SourceOrDerived::shortIdToName(ss);
00419 const char* dname = (ds == NullShortId) ? ""
00420 : SourceOrDerived::shortIdToName(ds);
00421 ShortIdsFile osidfile;
00422 int fd = SourceOrDerived::fdcreate(osidfile);
00423 if (fd < 0) {
00424 ret = errno;
00425 Repos::dprintf(DBG_ALWAYS,
00426 "failed to create combined keepSidFile: errno %d\n", ret);
00427 assert(ret != 0);
00428 goto finish;
00429 }
00430 close(fd);
00431 oname = SourceOrDerived::shortIdToName(osidfile);
00432 cmd = sidsort +" "+ oname +" "+ sname +" "+ dname;
00433 delete [] sname;
00434 delete [] dname;
00435 status = system(cmd.cchars());
00436 if (status != 0) {
00437 if (status < 0) {
00438 ret = errno;
00439 Repos::dprintf(DBG_ALWAYS, "system() failed: errno %d\n", ret);
00440 assert(ret != 0);
00441 } else {
00442 Repos::dprintf(DBG_ALWAYS, "ShortId_sorter failed: status %d\n", status);
00443 ret = EINVAL;
00444 }
00445 goto finish;
00446 }
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463 if(!file_is_non_empty(oname))
00464 {
00465 Repos::dprintf(DBG_ALWAYS,
00466 "combined keepSidFile (0x%08x) is empty;"
00467 " not doing deletions\n", osidfile);
00468 ret = ENODATA;
00469 goto finish;
00470 }
00471 else if(!file_ends_with_newline(oname))
00472 {
00473 Repos::dprintf(DBG_ALWAYS,
00474 "combined keepSidFile (0x%08x) doen't end with a newline "
00475 "(may be truncated); not doing deletions\n", osidfile);
00476 ret = EINVAL;
00477 goto finish;
00478
00479 }
00480
00481 delete [] oname;
00482
00483 Repos::dprintf(DBG_ALWAYS, "wrote combined keepSidFile 0x%08x\n", osidfile);
00484
00485 ret = DeleteAllShortIdsBut(osidfile, (st < dt) ? st : dt);
00486
00487 finish:
00488 state_mu.lock();
00489 if (ret == 0 && ds == keepDerivedSid && dt == keepDerivedTime &&
00490 ss == keepSourceSid && st == keepSourceTime) {
00491 StableLock.acquireWrite();
00492 RWLOCK_LOCKED_REASON(&StableLock, "Weeding:deletions done");
00493 VRLog.start();
00494 VRLog.put("(ddun)\n");
00495 VRLog.commit();
00496 StableLock.releaseWrite();
00497 deletionsDone = true;
00498 Repos::dprintf(DBG_ALWAYS, "deletions done\n");
00499 } else {
00500 Repos::dprintf(DBG_ALWAYS, "deletions failed\n");
00501 }
00502 deletionsInProgress = false;
00503 state_mu.unlock();
00504 state_cond.broadcast();
00505 return ret;
00506 }
00507
00508
00509
00510
00511
00512 void
00513 Checkpoint() throw ()
00514 {
00515
00516 state_mu.lock();
00517 while (checkpointRequested) {
00518 state_cond.wait(state_mu);
00519 }
00520 checkpointRequested = true;
00521 state_cond.broadcast();
00522 while (checkpointInProgress || checkpointRequested) {
00523 state_cond.wait(state_mu);
00524 }
00525 state_mu.unlock();
00526 return;
00527 }
00528
00529 void
00530 CheckpointServer() throw ()
00531 {
00532 for (;;) {
00533 state_mu.lock();
00534 while (!checkpointRequested ||
00535 sourceWeedInProgress || deletionsInProgress) {
00536 state_cond.wait(state_mu);
00537 }
00538 Repos::dprintf(DBG_ALWAYS, "starting checkpoint\n");
00539 assert(!checkpointInProgress);
00540 checkpointInProgress = true;
00541 checkpointRequested = false;
00542 state_mu.unlock();
00543 state_cond.broadcast();
00544
00545 VDirVolatileRoot::lockAll();
00546 StableLock.acquireWrite();
00547
00548 RWLOCK_LOCKED_REASON(&VolatileRootLock, "Weeding:CheckpointServer");
00549 RWLOCK_LOCKED_REASON(&StableLock, "Weeding:CheckpointServer");
00550
00551 fstream& ckpt = *VRLog.checkpointBegin(ios::in|ios::out);
00552 if (!ckpt.good()) {
00553 Repos::dprintf(DBG_ALWAYS,
00554 "creation of checkpoint file failed: errno %d\n",errno);
00555 assert(ckpt.good());
00556 }
00557
00558 VMemPool::writeCheckpoint(ckpt);
00559 if (!ckpt.good()) {
00560 Repos::dprintf(DBG_ALWAYS,
00561 "write to checkpoint file failed: errno %d\n", errno);
00562 assert(ckpt.good());
00563 }
00564
00565
00566 ckpt << "(vers " << VRLogVersion << ")\n";
00567
00568 ShortIdBlockCheckpoint(ckpt);
00569 if (!ckpt.good()) {
00570 Repos::dprintf(DBG_ALWAYS,
00571 "write to checkpoint file failed: errno %d\n", errno);
00572 assert(ckpt.good());
00573 }
00574
00575 MastershipCheckpoint(ckpt);
00576 if (!ckpt.good()) {
00577 Repos::dprintf(DBG_ALWAYS,
00578 "write to checkpoint file failed: errno %d\n", errno);
00579 assert(ckpt.good());
00580 }
00581
00582
00583 ckpt << "(keep d 0x" << hex << keepDerivedSid << " "
00584 << dec << keepDerivedTime << ")\n";
00585 ckpt << "(keep s 0x" << hex << keepSourceSid << " "
00586 << dec << keepSourceTime << ")\n";
00587 if (deletionsDone) {
00588 ckpt << "(ddun)\n";
00589 }
00590
00591 VDirVolatileRoot::finishCheckpoint(ckpt);
00592
00593 if (!ckpt.good()) {
00594 Repos::dprintf(DBG_ALWAYS,
00595 "write to checkpoint file failed: errno %d\n", errno);
00596 assert(ckpt.good());
00597 }
00598
00599 ckpt.seekg(0, fstream::beg);
00600 VMemPool::readCheckpoint(ckpt, true);
00601 ckpt.close();
00602
00603 VRLog.checkpointEnd();
00604
00605
00606 MutableSidrefCheck(0, LongId::noLock);
00607
00608 StableLock.releaseWrite();
00609 VDirVolatileRoot::unlockAll();
00610
00611 Repos::dprintf(DBG_ALWAYS, "checkpoint done\n");
00612 state_mu.lock();
00613 checkpointInProgress = false;
00614 state_mu.unlock();
00615 state_cond.broadcast();
00616 }
00617 }
00618
00619 void GetWeedingState(ShortIdsFile& ds, time_t& dt,
00620 ShortIdsFile& ss, time_t& st,
00621 bool& sourceWeedInProgress,
00622 bool& deletionsInProgress,
00623 bool& deletionsDone,
00624 bool& checkpointInProgress) throw ()
00625 {
00626 state_mu.lock();
00627 ds = keepDerivedSid;
00628 dt = keepDerivedTime;
00629 ss = keepSourceSid;
00630 st = keepSourceTime;
00631 sourceWeedInProgress = ::sourceWeedInProgress;
00632 deletionsInProgress = ::deletionsInProgress;
00633 deletionsDone = ::deletionsDone;
00634 checkpointInProgress = ::checkpointInProgress;
00635 state_mu.unlock();
00636 }