00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "VDirVolatileRoot.H"
00030 #include "VDirChangeable.H"
00031 #include "VDirEvaluator.H"
00032 #include "VMemPool.H"
00033 #include "VestaLog.H"
00034 #include "Recovery.H"
00035 #include "VRConcurrency.H"
00036 #include "IndexKey.H"
00037 #include "logging.H"
00038 #include "ShortIdRefCount.H"
00039
00040 #include "lock_timing.H"
00041
00042 #include <BufStream.H>
00043
00044 using std::endl;
00045 using Basics::OBufStream;
00046
00047 class VolRootEntry {
00048 public:
00049
00050
00051 static VolRootEntry *head;
00052 static VolRootEntry *tail;
00053
00054
00055 VolRootEntry *next, *prev;
00056
00057 Bit32 srep;
00058 bool readOnlyExisting;
00059 time_t creationTime;
00060 ReadersWritersLock lock;
00061 ShortIdRefCount sidref;
00062
00063
00064 unsigned int index;
00065
00066 FP::Tag fptag;
00067
00068
00069 bool alive;
00070
00071
00072
00073
00074
00075
00076 void link()
00077 {
00078
00079 assert(next == 0);
00080 assert(prev == 0);
00081
00082
00083 prev = VolRootEntry::tail;
00084 VolRootEntry::tail = this;
00085
00086 if(prev != 0)
00087 {
00088
00089 assert(VolRootEntry::head != 0);
00090 assert(prev->next == 0);
00091 prev->next = this;
00092 }
00093 else
00094 {
00095
00096 assert(VolRootEntry::head == 0);
00097 assert(prev == 0);
00098 VolRootEntry::head = this;
00099 }
00100 }
00101 void unlink()
00102 {
00103
00104 if(next != 0)
00105 {
00106 next->prev = prev;
00107 }
00108 if(prev != 0)
00109 {
00110 prev->next = next;
00111 }
00112
00113
00114 if(VolRootEntry::head == this)
00115 {
00116 VolRootEntry::head = next;
00117 }
00118 if(VolRootEntry::tail == this)
00119 {
00120 VolRootEntry::tail = prev;
00121 }
00122
00123
00124 next = 0;
00125 prev = 0;
00126
00127
00128
00129 assert(((VolRootEntry::head != 0) && (VolRootEntry::tail != 0)) ||
00130 ((VolRootEntry::head == 0) && (VolRootEntry::tail == 0)));
00131 }
00132
00133 VolRootEntry(unsigned int myIndex)
00134 : lock(true), next(0), prev(0), index(myIndex),
00135 fptag((char *) &myIndex, sizeof(myIndex)), alive(true)
00136 {
00137 }
00138
00139 ~VolRootEntry()
00140 {
00141
00142 assert(next == 0);
00143 assert(prev == 0);
00144 assert(VolRootEntry::head != this);
00145 assert(VolRootEntry::tail != this);
00146 }
00147 };
00148
00149 VolRootEntry *VolRootEntry::head = 0;
00150 VolRootEntry *VolRootEntry::tail = 0;
00151
00152 typedef Table<IndexKey, VolRootEntry*>::Default VolRootTable;
00153 typedef Table<IndexKey, VolRootEntry*>::Iterator VolRootIter;
00154
00155
00156
00157 static VolRootTable vrtTable;
00158 static Basics::thread gardener;
00159 #define INDEX_BLOCKSIZE 1024
00160 #define GARDENER_SLEEP (60*60)
00161
00162
00163 static unsigned nextIndex = 0;
00164
00165
00166
00167 VestaSource::errorCode
00168 VDirVolatileRoot::lookupIndex(unsigned int index, VestaSource*& result,
00169 char* arcbuf) throw ()
00170 {
00171
00172
00173 result = 0;
00174
00175 VolRootEntry* vre;
00176
00177
00178 if (vrtTable.Get(index, vre) == 0) {
00179 return VestaSource::notFound;
00180 }
00181
00182 result = NEW_CONSTR(VDirChangeable,
00183 (vre->readOnlyExisting ?
00184 VestaSource::volatileROEDirectory :
00185 VestaSource::volatileDirectory,
00186 (Bit8*) VMemPool::lengthenPointer(vre->srep),
00187 &(vre->sidref)));
00188 result->master = true;
00189 result->attribs = NULL;
00190 result->longid = longid.append(index);
00191 result->pseudoInode = indexToPseudoInode(index);
00192 result->ac = this->ac;
00193 result->VestaSource::fptag = vre->fptag;
00194
00195 if (arcbuf) {
00196 sprintf(arcbuf, "%08x", index);
00197 }
00198
00199 return VestaSource::ok;
00200 }
00201
00202
00203
00204 VestaSource::errorCode
00205 VDirVolatileRoot::lookupIndexAndLock(unsigned int index, VestaSource*& result,
00206 LongId::lockKindTag lockKind,
00207 ReadersWritersLock** lock) throw ()
00208 {
00209
00210
00211 result = 0;
00212
00213 VolRootEntry* vre;
00214
00215
00216 if (vrtTable.Get(index, vre) == 0) {
00217 if (lock && lockKind != LongId::checkLock) *lock = NULL;
00218 return VestaSource::notFound;
00219 }
00220
00221 result = NEW_CONSTR(VDirChangeable,
00222 (vre->readOnlyExisting ?
00223 VestaSource::volatileROEDirectory :
00224 VestaSource::volatileDirectory,
00225 (Bit8*) VMemPool::lengthenPointer(vre->srep),
00226 &(vre->sidref)));
00227 result->master = true;
00228 result->attribs = NULL;
00229 result->longid = longid.append(index);
00230 result->pseudoInode = indexToPseudoInode(index);
00231 result->ac = this->ac;
00232 result->VestaSource::fptag = vre->fptag;
00233
00234 switch (lockKind) {
00235 case LongId::noLock:
00236 if (lock != NULL) *lock = NULL;
00237 break;
00238 case LongId::readLock:
00239 case LongId::readLockV:
00240 vre->lock.acquireRead();
00241 *lock = &(vre->lock);
00242 break;
00243 case LongId::writeLock:
00244 case LongId::writeLockV:
00245 vre->lock.acquireWrite();
00246 *lock = &(vre->lock);
00247 break;
00248 case LongId::checkLock:
00249 if (*lock != &(vre->lock)) return VestaSource::invalidArgs;
00250 break;
00251 }
00252
00253 return VestaSource::ok;
00254 }
00255
00256
00257
00258 VestaSource::errorCode
00259 VDirVolatileRoot::lookup(Arc arc, VestaSource*& result,
00260 AccessControl::Identity who, unsigned int indexOffset)
00261 throw ()
00262 {
00263
00264
00265 result = 0;
00266
00267 VolRootEntry* vre;
00268 unsigned int index;
00269 char *endp;
00270 assert(indexOffset == 0);
00271
00272
00273 index = strtol(arc, &endp, 16);
00274 if (*endp != '\0' || endp - arc != 8) {
00275 return VestaSource::notFound;
00276 }
00277 if (vrtTable.Get(index, vre) == 0) {
00278 return VestaSource::notFound;
00279 }
00280
00281 result = NEW_CONSTR(VDirChangeable,
00282 (vre->readOnlyExisting ?
00283 VestaSource::volatileROEDirectory :
00284 VestaSource::volatileDirectory,
00285 (Bit8*) VMemPool::lengthenPointer(vre->srep),
00286 &(vre->sidref)));
00287 result->master = true;
00288 result->attribs = NULL;
00289 result->longid = longid.append(index);
00290 result->pseudoInode = indexToPseudoInode(index);
00291 result->ac = this->ac;
00292 result->VestaSource::fptag = vre->fptag;
00293
00294 return VestaSource::ok;
00295 }
00296
00297
00298 VestaSource::errorCode
00299 VDirVolatileRoot::list(unsigned int firstIndex,
00300 VestaSource::listCallback callback, void* closure,
00301 AccessControl::Identity who,
00302 bool deltaOnly, unsigned int indexOffset) throw ()
00303 {
00304 assert(indexOffset == 0);
00305
00306
00307
00308 VolRootEntry *vre = VolRootEntry::head;
00309 if((VolRootEntry::head != 0) && (firstIndex != 0))
00310 {
00311
00312
00313 if(!vrtTable.Get(firstIndex, vre))
00314 {
00315 if(VolRootEntry::tail->index >= VolRootEntry::head->index)
00316 {
00317
00318
00319
00320 if(firstIndex > VolRootEntry::tail->index)
00321 {
00322
00323
00324
00325
00326
00327 vre = 0;
00328 }
00329 else if(firstIndex <= VolRootEntry::head->index)
00330 {
00331
00332
00333 vre = VolRootEntry::head;
00334 }
00335 else
00336 {
00337
00338
00339
00340 vre = VolRootEntry::head;
00341 while((vre != 0) && (vre->index < firstIndex))
00342 {
00343 vre = vre->next;
00344 }
00345 }
00346 }
00347 else
00348 {
00349
00350
00351
00352 if((firstIndex > VolRootEntry::tail->index) &&
00353 (firstIndex < VolRootEntry::head->index))
00354 {
00355
00356
00357 vre = 0;
00358 }
00359 else if(firstIndex >= VolRootEntry::head->index)
00360 {
00361
00362
00363 vre = VolRootEntry::head;
00364 while((vre != 0) &&
00365 (vre->index < firstIndex) &&
00366 (vre->index >= VolRootEntry::head->index))
00367 {
00368 vre = vre->next;
00369 }
00370 }
00371 else if(firstIndex <= VolRootEntry::tail->index)
00372 {
00373
00374
00375 vre = VolRootEntry::head;
00376 while((vre != 0) &&
00377 (vre->index >= VolRootEntry::head->index))
00378 {
00379 vre = vre->next;
00380 }
00381
00382
00383 while((vre != 0) &&
00384 (vre->index < firstIndex))
00385 {
00386 vre = vre->next;
00387 }
00388 }
00389 }
00390 }
00391 }
00392
00393
00394 while(vre != 0)
00395 {
00396 char arcbuf[MAX_ARC_LEN+1];
00397 sprintf(arcbuf, "%08x", vre->index);
00398 if (!callback(closure, vre->readOnlyExisting ?
00399 VestaSource::volatileROEDirectory :
00400 VestaSource::volatileDirectory, arcbuf, vre->index,
00401 indexToPseudoInode(vre->index), NullShortId, true))
00402 {
00403 break;
00404 }
00405 vre = vre->next;
00406 }
00407 return VestaSource::ok;
00408 }
00409
00410
00411
00412
00413
00414 VestaSource::errorCode
00415 VDirVolatileRoot::
00416 createVolatileDirectory(char* hostname, char* port, Bit64 handle,
00417 VestaSource*& result,
00418 time_t timestamp,
00419 LongId::lockKindTag lockKind,
00420 ReadersWritersLock** lock,
00421 bool readOnlyExisting) throw ()
00422 {
00423 StableLock.acquireWrite();
00424 RWLOCK_LOCKED_REASON(&StableLock, "createVolatileDirectory (logging)");
00425 if (nextIndex % INDEX_BLOCKSIZE == 0) {
00426 char logrec[512];
00427 OBufStream ost(logrec, sizeof(logrec));
00428
00429 ost << "(vidx " << nextIndex << ")\n";
00430 VRLog.start();
00431 VRLog.put(ost.str());
00432 VRLog.commit();
00433
00434
00435
00436
00437 if ((nextIndex >= 0x7ffffc00) || (nextIndex == 0)) {
00438 nextIndex = 1;
00439 }
00440 }
00441 unsigned int index = nextIndex++;
00442 StableLock.releaseWrite();
00443
00444
00445 if (timestamp == 0) timestamp = time(NULL);
00446 VolRootEntry* vre = NEW_CONSTR(VolRootEntry, (index));
00447 vre->readOnlyExisting = readOnlyExisting;
00448 vre->creationTime = time(NULL);
00449
00450
00451
00452
00453
00454 assert(lockKind != LongId::readLockV && lockKind != LongId::writeLockV);
00455 ReadersWritersLock* vrootlock;
00456 VestaSource* vroot =
00457 VestaSource::volatileRoot(LongId::writeLock, &vrootlock);
00458
00459 RWLOCK_LOCKED_REASON(vrootlock, "createVolatileDirectory");
00460 VDirEvaluator edir(readOnlyExisting ? VestaSource::evaluatorROEDirectory
00461 : VestaSource::evaluatorDirectory,
00462 hostname, port, handle, &vre->alive, timestamp);
00463
00464
00465
00466
00467 VDirChangeable* vdir =
00468 NEW_CONSTR(VDirChangeable,
00469 (readOnlyExisting ? VestaSource::volatileROEDirectory
00470 : VestaSource::volatileDirectory));
00471 vdir->setIsMoreOrBase(vdir->rep, VDirChangeable::isBase);
00472 vdir->setMoreOrBase(vdir->rep, VMemPool::shortenPointer(edir.rep));
00473 vdir->baseCache = edir.rep;
00474 vdir->setTimestamp(timestamp);
00475 vdir->setID(index);
00476 vdir->sidref = &(vre->sidref);
00477
00478
00479 vre->srep = VMemPool::shortenPointer(vdir->rep);
00480
00481 vre->link();
00482 vrtTable.Put(index, vre);
00483
00484 vdir->longid = VolatileRootLongId.append(index);
00485 vdir->VestaSource::master = true;
00486 vdir->pseudoInode = index;
00487 vdir->ac = vroot->ac;
00488 vdir->VestaSource::fptag = vre->fptag;
00489
00490 result = vdir;
00491
00492 switch (lockKind) {
00493 case LongId::noLock:
00494 vrootlock->releaseWrite();
00495 break;
00496 case LongId::readLock:
00497 vre->lock.acquireRead();
00498 vrootlock->releaseWrite();
00499 *lock = &(vre->lock);
00500 break;
00501 case LongId::writeLock:
00502 vre->lock.acquireWrite();
00503 vrootlock->releaseWrite();
00504 *lock = &(vre->lock);
00505 break;
00506 case LongId::checkLock:
00507 case LongId::readLockV:
00508 case LongId::writeLockV:
00509 assert(false);
00510 break;
00511 }
00512
00513 return VestaSource::ok;
00514 }
00515
00516
00517
00518
00519
00520 VestaSource::errorCode
00521 VDirVolatileRoot::deleteIndex(unsigned int index) throw ()
00522 {
00523 VolRootEntry* vre;
00524
00525 bool ok = vrtTable.Delete(index, vre, false);
00526 if (!ok) return VestaSource::notFound;
00527
00528
00529
00530 vre->unlink();
00531
00532
00533 vre->lock.acquireWrite();
00534
00535
00536
00537
00538 VDirChangeable vdc((vre->readOnlyExisting
00539 ? VestaSource::volatileROEDirectory
00540 : VestaSource::volatileDirectory),
00541 (Bit8*) VMemPool::lengthenPointer(vre->srep));
00542 VestaSource* vs;
00543 VestaSource::errorCode err = vdc.getBase(vs);
00544 assert(err == VestaSource::ok);
00545 VDirEvaluator* vde = (VDirEvaluator*) vs;
00546 vde->freeTree();
00547
00548
00549
00550
00551
00552
00553 vdc.freeTree();
00554
00555 vre->lock.releaseWrite();
00556 delete vre;
00557 delete vs;
00558
00559 return VestaSource::ok;
00560 }
00561
00562 void
00563 VDirVolatileRoot::lockAll()
00564 {
00565 VolatileRootLock.acquireWrite();
00566 VolRootIter iter(&vrtTable);
00567 IndexKey key;
00568 VolRootEntry* vre;
00569 while (iter.Next(key, vre)) {
00570 vre->lock.acquireWrite();
00571 }
00572 }
00573
00574 void
00575 VDirVolatileRoot::unlockAll()
00576 {
00577 VolRootIter iter(&vrtTable);
00578 IndexKey key;
00579 VolRootEntry* vre;
00580 while (iter.Next(key, vre)) {
00581 vre->lock.releaseWrite();
00582 }
00583 VolatileRootLock.releaseWrite();
00584 }
00585
00586 static void
00587 VidxCallback(RecoveryReader* rr, char& c)
00588 throw(VestaLog::Error, VestaLog::Eof)
00589 {
00590 unsigned long ulindex;
00591 rr->getULong(c, ulindex);
00592
00593
00594
00595
00596
00597 nextIndex = ulindex + INDEX_BLOCKSIZE;
00598 }
00599
00600
00601
00602 void
00603 VDirVolatileRoot::mark(bool byName, ArcTable* hidden) throw ()
00604 {
00605 assert(byName);
00606 assert(hidden == NULL);
00607 VolRootIter iter(&vrtTable);
00608 IndexKey key;
00609 VolRootEntry* val;
00610 while (iter.Next(key, val)) {
00611 VDirChangeable vs(VestaSource::volatileDirectory,
00612 (Bit8*) VMemPool::lengthenPointer(val->srep));
00613 vs.setHasName(true);
00614 vs.mark();
00615 }
00616 }
00617
00618
00619 Bit32
00620 VDirVolatileRoot::checkpoint(Bit32& nextSP, std::fstream& ckpt) throw ()
00621 {
00622
00623
00624
00625
00626 VolRootIter iter(&vrtTable);
00627 IndexKey key;
00628 VolRootEntry* val;
00629
00630 while (iter.Next(key, val)) {
00631 VDirChangeable vs(VestaSource::volatileDirectory,
00632 (Bit8*) VMemPool::lengthenPointer(val->srep));
00633 val->srep = vs.checkpoint(nextSP, ckpt);
00634
00635 if (!ckpt.good()) {
00636 Repos::dprintf(DBG_ALWAYS,
00637 "write to checkpoint file failed: errno %d\n", errno);
00638 assert(ckpt.good());
00639 }
00640 }
00641 return nextSP;
00642 }
00643
00644 void
00645 VDirVolatileRoot::finishCheckpoint(std::fstream& ckpt) throw ()
00646 {
00647
00648 ckpt << "(vidx " << nextIndex - (nextIndex % INDEX_BLOCKSIZE) << ")" << endl;
00649 }
00650
00651
00652
00653 void*
00654 GardenerThread(void* arg) throw ()
00655 {
00656 signal(SIGPIPE, SIG_IGN);
00657 signal(SIGQUIT, SIG_DFL);
00658 signal(SIGSEGV, SIG_DFL);
00659 signal(SIGABRT, SIG_DFL);
00660 signal(SIGBUS, SIG_DFL);
00661 signal(SIGILL, SIG_DFL);
00662
00663 for (;;) {
00664 sleep(GARDENER_SLEEP);
00665
00666
00667
00668
00669 VolatileRootLock.acquireRead();
00670 RWLOCK_LOCKED_REASON(&VolatileRootLock, "GardenerThread, 1st pass");
00671 unsigned int *dead_index_list = NEW_PTRFREE_ARRAY(unsigned int,
00672 vrtTable.Size());
00673 unsigned int dead_index_count = 0;
00674 VolRootIter iter(&vrtTable);
00675 IndexKey key;
00676 VolRootEntry* vre;
00677 time_t now = time(NULL);
00678 while (iter.Next(key, vre)) {
00679
00680 if (now - vre->creationTime < GARDENER_SLEEP) continue;
00681
00682 if (!vre->lock.tryWrite()) continue;
00683 VDirChangeable vdc(VestaSource::volatileDirectory,
00684 (Bit8*) VMemPool::lengthenPointer(vre->srep));
00685 VestaSource* vs;
00686 VestaSource::errorCode err = vdc.getBase(vs);
00687 assert(err == VestaSource::ok);
00688 VDirEvaluator* vde = (VDirEvaluator*) vs;
00689 if (!vde->alive()) {
00690 dead_index_list[dead_index_count++] = key.index;
00691 }
00692 vre->lock.releaseWrite();
00693 delete vs;
00694 }
00695 assert(dead_index_count <= vrtTable.Size());
00696 VolatileRootLock.releaseRead();
00697
00698
00699
00700
00701 for(unsigned int i = 0; i < dead_index_count; i++)
00702 {
00703 VolatileRootLock.acquireWrite();
00704 RWLOCK_LOCKED_REASON(&VolatileRootLock, "GardenerThread, 2nd pass");
00705 key.index = dead_index_list[i];
00706
00707
00708
00709 if(vrtTable.Delete(key, vre, false))
00710 {
00711
00712
00713 vre->unlink();
00714
00715
00716 vre->lock.acquireWrite();
00717 VDirChangeable vdc((vre->readOnlyExisting
00718 ? VestaSource::volatileROEDirectory
00719 : VestaSource::volatileDirectory),
00720 (Bit8*) VMemPool::lengthenPointer(vre->srep));
00721 VestaSource* vs;
00722 VestaSource::errorCode err = vdc.getBase(vs);
00723 assert(err == VestaSource::ok);
00724 VDirEvaluator* vde = (VDirEvaluator*) vs;
00725
00726 assert(!vde->alive());
00727 vde->purge();
00728 vde->freeTree();
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740 vdc.freeTree();
00741 vre->lock.releaseWrite();
00742 delete vre;
00743 delete vs;
00744 }
00745 VolatileRootLock.releaseWrite();
00746
00747
00748 sleep(1);
00749 }
00750 delete [] dead_index_list;
00751 }
00752
00753
00754 }
00755
00756 void VDirVolatileRoot::init() throw ()
00757 {
00758 static int done = false;
00759 assert(!done);
00760 done = true;
00761 RegisterRecoveryCallback("vidx", VidxCallback);
00762 Basics::thread_attr gardener_attr;
00763 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined(__linux__)
00764
00765 gardener_attr.set_schedpolicy(SCHED_RR);
00766 gardener_attr.set_inheritsched(PTHREAD_EXPLICIT_SCHED);
00767 gardener_attr.set_sched_priority(sched_get_priority_min(SCHED_RR));
00768 #endif
00769
00770 gardener.fork(GardenerThread, NULL, gardener_attr);
00771 }
00772
00773 VestaSource *VDirVolatileRoot::copy() throw()
00774 {
00775 VestaSource *result = NEW(VDirVolatileRoot);
00776 *result = *this;
00777 return result;
00778 }