00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "UniqueId.H"
00028 #include "FP.H"
00029 #include "VestaSource.H"
00030 #include "VDirSurrogate.H"
00031 #include "logging.H"
00032 #include "VRConcurrency.H"
00033 #include "Mastership.H"
00034 #include "VLogHelp.H"
00035
00036 #include "lock_timing.H"
00037
00038 using std::fstream;
00039 using std::endl;
00040 using std::cerr;
00041 using std::cout;
00042
// Identity strings for this repository; both are assigned in
// MastershipInit1().  myHostPort is "<default host>:<default port>".
// myMasterHint is the value written into "master-repository" hint
// attributes; it may be empty, in which case the code in this file
// falls back to myHostPort or skips setting the hint.
Text myHostPort, myMasterHint;


// Forward declarations of the file-local protocol steps and of the
// recovery thread body, all defined later in this file.
static VestaSource::errorCode
AcceptMastership(VestaSource* svs, const char* pathname, char pathnameSep,
		 const char* requestid, const char* grantid) throw ();
static VestaSource::errorCode
AcknowledgeMastership(VestaSource* svs,
		      const char* grantid,
		      AccessControl::Identity swho = NULL) throw ();
// Note: the definition names the last parameter "id"; it is called
// with either a requestid or a grantid.
static VestaSource::errorCode
FinishMastership(const char* pathname, char pathnameSep,
		 const char* grantid) throw ();
static void* RecoverMastershipThread(void *arg) throw ();
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 bool
00069 MastershipAccessCheck(VestaSource* vs, const char* direction,
00070 const char* hostport) throw ()
00071 {
00072 bool free_vs = false;
00073 for (;;) {
00074 if (vs == NULL) return false;
00075 if (vs->getAttribConst(direction)) break;
00076 VestaSource *vs_parent = vs->longid.getParent().lookup();
00077 if(free_vs)
00078 delete vs;
00079 vs = vs_parent;
00080
00081 free_vs = true;
00082 }
00083 bool result = (vs->inAttribs(direction, hostport) ||
00084 vs->inAttribs(direction, "*"));
00085 if(free_vs)
00086 delete vs;
00087 return result;
00088 }
00089
00090
00091
00092 static bool
00093 TypeCheck(VestaSource::typeTag fromType, VestaSource::typeTag toType)
00094 {
00095 switch (toType) {
00096 case VestaSource::appendableDirectory:
00097
00098
00099 case VestaSource::stub:
00100
00101 case VestaSource::ghost:
00102
00103
00104 if (fromType != toType) {
00105 return false;
00106 }
00107 break;
00108
00109 default:
00110
00111
00112
00113
00114 if (fromType != toType && fromType != VestaSource::ghost) {
00115 return false;
00116 }
00117 break;
00118 }
00119 return true;
00120 }
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141 VestaSource::errorCode
00142 AcquireMastership(const char* pathname,
00143 const char* srcHost, const char* srcPort, char pathnameSep,
00144 AccessControl::Identity dwho, AccessControl::Identity swho)
00145 throw ()
00146 {
00147 VestaSource* sroot = NULL;
00148 VestaSource* droot = NULL;
00149 VestaSource* dvs = NULL;
00150 VestaSource* svs = NULL;
00151 ReadersWritersLock* lock = NULL;
00152 const char* grantid = NULL;
00153 VestaSource::errorCode err = VestaSource::ok;
00154 char arcbuf[MAX_ARC_LEN+1], hostbuf[MAX_ARC_LEN+1], portbuf[MAX_ARC_LEN+1];
00155 char hpbuf[2*MAX_ARC_LEN+2];
00156 bool needCommit = false;
00157 bool needRecovery = false;
00158 FP::Tag uid;
00159 time_t now = 0;
00160 char *xp = NULL;
00161 unsigned char* uidbytes = NULL;
00162
00163 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: %s\n", pathname);
00164
00165 if (*srcHost == '\000' || *srcPort == '\000') {
00166 Repos::dprintf(DBG_MASTERSHIP,
00167 "AcquireMastership: srcHost:srcPort must be supplied\n");
00168 err = VestaSource::notMaster;
00169 goto done;
00170 }
00171
00172 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: trying master %s:%s\n",
00173 srcHost, srcPort);
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184 if (strchr(srcHost, ' ') || strchr(srcHost, '/') ||
00185 strchr(srcPort, ' ') || strchr(srcPort, '/') ||
00186 !strchr(srcHost, '.') || strchr(srcPort, ':')) {
00187 Repos::dprintf(DBG_MASTERSHIP,
00188 "AcquireMastership: bad srcHost:srcPort %s:%s\n",
00189 srcHost, srcPort);
00190 err = VestaSource::notMaster;
00191 goto done;
00192 }
00193
00194 try {
00195
00196 sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00197 err = sroot->lookupPathname(pathname, svs, swho, pathnameSep);
00198
00199 } catch (SRPC::failure f) {
00200 Repos::dprintf(DBG_MASTERSHIP,
00201 "AcquireMastership: initial RPC to %s:%s failed: %s (%d)\n",
00202 srcHost, srcPort, f.msg.cchars(), f.r);
00203
00204 err = VestaSource::rpcFailure;
00205 goto done;
00206 }
00207
00208 if (err != VestaSource::ok) {
00209 Repos::dprintf(DBG_MASTERSHIP,
00210 "AcquireMastership: remote lookup failed: %s\n",
00211 VestaSource::errorCodeString(err));
00212 if (err == VestaSource::notFound) err = VestaSource::notMaster;
00213 goto done;
00214 }
00215
00216 if (!svs->hasAttribs()) {
00217 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: not an independent object\n");
00218 err = VestaSource::inappropriateOp;
00219 goto done;
00220 }
00221
00222 if (!svs->master) {
00223 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: %s:%s is not master\n",
00224 srcHost, srcPort);
00225 err = VestaSource::notMaster;
00226 goto done;
00227 }
00228
00229
00230
00231
00232
00233
00234 droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00235 RWLOCK_LOCKED_REASON(lock, "AcquireMastership (step A2)");
00236 VRLog.start();
00237 needCommit = true;
00238
00239 err = droot->lookupPathname(pathname, dvs, dwho, pathnameSep);
00240 if (err != VestaSource::ok) {
00241 Repos::dprintf(DBG_MASTERSHIP,
00242 "AcquireMastership: local lookup failed: %s)\n",
00243 VestaSource::errorCodeString(err));
00244 goto done;
00245 }
00246
00247
00248 assert(RootLongId.isAncestorOf(dvs->longid));
00249
00250
00251 if (dvs->master) {
00252 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: already master\n");
00253 err = VestaSource::ok;
00254 goto done;
00255 }
00256
00257
00258
00259 if (!TypeCheck(svs->type, dvs->type)) {
00260 Repos::dprintf(DBG_MASTERSHIP,
00261 "AcquireMastership: type %s -> %s disallowed\n",
00262 VestaSource::typeTagString(svs->type),
00263 VestaSource::typeTagString(dvs->type));
00264 err = VestaSource::inappropriateOp;
00265 goto done;
00266 }
00267
00268
00269
00270 sprintf(hpbuf, "%s:%s", srcHost, srcPort);
00271 if (!dvs->ac.check(dwho, AccessControl::ownership) ||
00272 !MastershipAccessCheck(dvs, "#mastership-from", hpbuf)) {
00273 Repos::dprintf(DBG_MASTERSHIP,
00274 "AcquireMastership: noPermission in local repository\n");
00275 err = VestaSource::noPermission;
00276 goto done;
00277 }
00278
00279
00280
00281 uid = UniqueId();
00282 now = time(NULL);
00283 char requestid[1024];
00284 xp = requestid;
00285 int i;
00286 uidbytes = (unsigned char*) uid.Words();
00287 for (i=0; i<FP::ByteCnt; i++) {
00288 sprintf(xp, "%02x", uidbytes[i]);
00289 xp += 2;
00290 }
00291 *xp = '\000';
00292 sprintf(xp, " %u %s:%s %s", now, srcHost, srcPort,
00293 (myMasterHint.Empty()
00294 ? myHostPort.cchars()
00295 : myMasterHint.cchars()));
00296 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: requestid \"%s\"\n", requestid);
00297
00298
00299 err = dvs->addAttrib("#master-request", requestid, dwho, now);
00300 if (err != VestaSource::ok) {
00301 Repos::dprintf(DBG_MASTERSHIP,
00302 "AcquireMastership: record requestid failed: %s\n",
00303 VestaSource::errorCodeString(err));
00304 goto done;
00305 }
00306
00307
00308 needRecovery = true;
00309
00310
00311 VRLog.put("(acqm ");
00312 LogPutQuotedString(VRLog, pathname);
00313 VRLog.put(" ");
00314 char sep[2];
00315 sep[0] = pathnameSep;
00316 sep[1] = '\000';
00317 LogPutQuotedString(VRLog, sep);
00318 VRLog.put(" ");
00319 LogPutQuotedString(VRLog, requestid);
00320 VRLog.put(")\n");
00321
00322
00323 delete dvs;
00324 dvs = droot = NULL;
00325 VRLog.commit();
00326 needCommit = false;
00327 lock->releaseWrite();
00328 lock = NULL;
00329
00330
00331
00332
00333
00334 try {
00335 err = svs->cedeMastership(requestid, &grantid, swho);
00336
00337 } catch (SRPC::failure f) {
00338
00339
00340 err = VestaSource::rpcFailure;
00341 goto done;
00342 }
00343
00344
00345
00346
00347 if (err != VestaSource::ok) {
00348
00349
00350
00351 Repos::dprintf(DBG_MASTERSHIP,
00352 "AcquireMastership: cedeMastership refused: %s\n",
00353 VestaSource::errorCodeString(err));
00354 (void) FinishMastership(pathname, pathnameSep, requestid);
00355 needRecovery = false;
00356 goto done;
00357 }
00358
00359 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: grantid \"%s\"\n", grantid);
00360
00361
00362
00363
00364 err = AcceptMastership(svs, pathname, pathnameSep, requestid, grantid);
00365 if (err != VestaSource::ok) {
00366 Repos::dprintf(DBG_MASTERSHIP,
00367 "AcquireMastership: AcceptMastership (A4) failed: %s\n",
00368 VestaSource::errorCodeString(err));
00369
00370 (void) FinishMastership(pathname, pathnameSep, requestid);
00371 needRecovery = false;
00372 goto done;
00373 }
00374
00375
00376
00377
00378 err = AcknowledgeMastership(svs, grantid, swho);
00379 if (err != VestaSource::ok && err != VestaSource::rpcFailure) {
00380 Repos::dprintf(DBG_MASTERSHIP,
00381 "AcquireMastership: AcknowledgeMastership (A5) failed: %s\n",
00382 VestaSource::errorCodeString(err));
00383
00384 (void) FinishMastership(pathname, pathnameSep, requestid);
00385 needRecovery = false;
00386 goto done;
00387 }
00388
00389
00390
00391
00392 err = FinishMastership(pathname, pathnameSep, grantid);
00393
00394 done:
00395 if (err == VestaSource::rpcFailure && needRecovery) {
00396
00397 QueueRecoverMastership(pathname, pathnameSep, requestid);
00398 }
00399 if (grantid) delete[] grantid;
00400 if (sroot) delete sroot;
00401 if (svs) delete svs;
00402 if (dvs) delete dvs;
00403 if (needCommit) VRLog.commit();
00404 if (lock) lock->release();
00405 Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership returns %s\n",
00406 VestaSource::errorCodeString(err));
00407 return err;
00408 }
00409
00410
00411
00412
00413
00414 static VestaSource::errorCode
00415 AcceptMastership(VestaSource* svs,
00416 const char* pathname, char pathnameSep,
00417 const char* requestid, const char* grantid) throw ()
00418 {
00419 VestaSource* droot = NULL;
00420 ReadersWritersLock* lock = NULL;
00421 VestaSource::errorCode err = VestaSource::ok;
00422 VestaSource* dvs = NULL;
00423
00424
00425 droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00426 RWLOCK_LOCKED_REASON(lock, "AcceptMastership");
00427 VRLog.start();
00428
00429
00430 int reqlen = strlen(requestid);
00431 bool ok = (strncmp(requestid, grantid, reqlen) == 0);
00432 if (!ok || grantid[reqlen] != ' ') {
00433 Repos::dprintf(DBG_MASTERSHIP, "AcceptMastership: bad grantid \"%s\"\n", grantid);
00434 err = VestaSource::invalidArgs;
00435 goto done;
00436 }
00437
00438
00439 err = droot->lookupPathname(pathname, dvs, NULL, pathnameSep);
00440 if (err != VestaSource::ok) {
00441
00442
00443 Repos::dprintf(DBG_MASTERSHIP,
00444 "AcceptMastership: error on 2nd local lookup: %s\n",
00445 VestaSource::errorCodeString(err));
00446 goto done;
00447 }
00448
00449
00450
00451
00452
00453
00454 if (!TypeCheck(svs->type, dvs->type)) {
00455 Repos::dprintf(DBG_MASTERSHIP,
00456 "AcceptMastership: type %s -> %s disallowed\n",
00457 VestaSource::typeTagString(svs->type),
00458 VestaSource::typeTagString(dvs->type));
00459 err = VestaSource::inappropriateOp;
00460 goto done;
00461 }
00462
00463
00464
00465
00466 if (!dvs->inAttribs("#master-request", requestid)) {
00467 Repos::dprintf(DBG_MASTERSHIP,
00468 "AcceptMastership: grantid \"%s\" has no requestid\n", grantid);
00469 err = VestaSource::ok;
00470 goto done;
00471 }
00472
00473
00474 if(!myMasterHint.Empty())
00475 {
00476 err = dvs->setAttrib("master-repository", myMasterHint.cchars(), NULL);
00477 assert(err == VestaSource::ok);
00478 }
00479
00480
00481
00482 if (dvs->type == VestaSource::appendableDirectory) {
00483 const char* p;
00484 char arcbuf[MAX_ARC_LEN+1];
00485 char hintbuf[MAX_ARC_LEN+1];
00486 time_t hinttime;
00487 p = grantid + strlen(requestid) + 1;
00488 while (*p) {
00489
00490 const char* q = strchr(p, '/');
00491 if (q == NULL || q-p > MAX_ARC_LEN) {
00492 Repos::dprintf(DBG_MASTERSHIP,
00493 "AcceptMastership: bad grantid (arc) \"%s\"\n", grantid);
00494 err = VestaSource::invalidArgs;
00495 goto done;
00496 }
00497 memcpy(arcbuf, p, q-p);
00498 arcbuf[q-p] = '\000';
00499 p = q+1;
00500
00501
00502 q = strchr(p, '/');
00503 if (q == NULL || q-p > MAX_ARC_LEN) {
00504 Repos::dprintf(DBG_MASTERSHIP,
00505 "AcceptMastership: bad grantid (hint) \"%s\"\n", grantid);
00506 err = VestaSource::invalidArgs;
00507 goto done;
00508 }
00509 memcpy(hintbuf, p, q-p);
00510 hintbuf[q-p] = '\000';
00511 p = q+1;
00512
00513
00514 q = strchr(p, '/');
00515 if (q == NULL || !isdigit(*p)) {
00516 Repos::dprintf(DBG_MASTERSHIP,
00517 "AcceptMastership: bad grantid (htime) \"%s\"\n", grantid);
00518 err = VestaSource::invalidArgs;
00519 goto done;
00520 }
00521 hinttime = (time_t) atoi(p);
00522 p = q + 1;
00523
00524
00525 VestaSource* child;
00526 err = dvs->lookup(arcbuf, child, NULL);
00527 if (err == VestaSource::notFound) {
00528
00529 err = dvs->insertStub(arcbuf, false, NULL,
00530 VestaSource::dontReplace, &child);
00531 Repos::dprintf(DBG_MASTERSHIP,
00532 "AcceptMastership: inserted stub %s\n", arcbuf);
00533 }
00534 assert(err == VestaSource::ok);
00535
00536 if (child->master) {
00537
00538
00539
00540
00541 err = child->clearAttrib("master-repository");
00542 } else {
00543
00544
00545
00546
00547
00548 if (hinttime > 0) {
00549 err = child->setAttrib("master-repository", hintbuf, NULL, hinttime);
00550 }
00551 }
00552 assert(err == VestaSource::ok);
00553 delete child;
00554 }
00555 }
00556
00557
00558
00559
00560
00561
00562 err = dvs->removeAttrib("#master-request", requestid, NULL);
00563 assert(err == VestaSource::ok);
00564 err = dvs->addAttrib("#master-request", grantid, NULL);
00565 assert(err == VestaSource::ok);
00566
00567
00568 err = dvs->setMaster(true, NULL);
00569 assert(err == VestaSource::ok);
00570
00571 done:
00572 if (dvs) delete dvs;
00573 if (lock) {
00574 VRLog.commit();
00575 lock->releaseWrite();
00576 }
00577 return err;
00578 }
00579
00580
00581
00582
00583 static VestaSource::errorCode
00584 AcknowledgeMastership(VestaSource* svs, const char* grantid,
00585 AccessControl::Identity swho) throw ()
00586 {
00587 try {
00588 return svs->removeAttrib("#master-grant", grantid, swho);
00589 } catch (SRPC::failure f) {
00590 Repos::dprintf(DBG_MASTERSHIP,
00591 "AcquireMastership: RPC to erase grantid failed\n");
00592 return VestaSource::rpcFailure;
00593 }
00594 }
00595
00596
00597
00598
00599
00600
00601
00602
00603 static VestaSource::errorCode
00604 FinishMastership(const char* pathname, char pathnameSep,
00605 const char* id) throw ()
00606 {
00607 VestaSource* droot = NULL;
00608 ReadersWritersLock* lock = NULL;
00609 VestaSource::errorCode err = VestaSource::ok;
00610 VestaSource* dvs = NULL;
00611
00612
00613 droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00614 RWLOCK_LOCKED_REASON(lock, "FinishMastership");
00615 VRLog.start();
00616
00617
00618
00619
00620 char sep[2];
00621 sep[0] = pathnameSep;
00622 sep[1] = '\000';
00623 VRLog.put("(finm ");
00624 LogPutQuotedString(VRLog, pathname);
00625 VRLog.put(" ");
00626 LogPutQuotedString(VRLog, sep);
00627 VRLog.put(" ");
00628 LogPutQuotedString(VRLog, id);
00629 VRLog.put(")\n");
00630
00631
00632 err = droot->lookupPathname(pathname, dvs, NULL, pathnameSep);
00633 if (err != VestaSource::ok) {
00634
00635
00636
00637 Repos::dprintf(DBG_MASTERSHIP,
00638 "AcceptMastership: error on 3rd local lookup: %s\n",
00639 VestaSource::errorCodeString(err));
00640 err = VestaSource::ok;
00641 goto done;
00642 }
00643
00644
00645 err = dvs->removeAttrib("#master-request", id, NULL);
00646 assert(err == VestaSource::ok);
00647
00648 done:
00649 if (dvs) delete dvs;
00650 if (lock) {
00651 VRLog.commit();
00652 lock->releaseWrite();
00653 }
00654 return err;
00655 }
00656
00657
00658
00659
// One pending recovery: a transfer whose request was recorded locally
// but whose outcome is not yet known.  Nodes form a doubly linked list
// headed by recoverMastershipList.  pathname and requestid are
// strdup()ed in QueueRecoverMastership and free()d in
// DequeueRecoverMastershipL.
struct RecoverMastershipInfo {
  RecoverMastershipInfo *next;
  RecoverMastershipInfo *prev;
  const char* pathname;
  char pathnameSep;
  const char* requestid;
};
// Mutex locked around modifications of the recovery list.
static Basics::mutex recoverMastershipMu;
// Signalled when an entry is queued; the recovery thread waits on it.
static Basics::cond recoverMastershipCond;
// The recovery thread itself, forked in MastershipInit2().
static Basics::thread recoverMastershipTh;
// Head of the recovery list (new entries are pushed at the front).
static RecoverMastershipInfo* recoverMastershipList;
// Seconds the recovery thread waits before retrying entries that failed
// with an RPC failure.
#define RECOVER_MASTERSHIP_SLEEP 3600 // one hour
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
// Closure for CheckMatchCallback: look for an attribute value that
// starts with the first requestidlen characters of requestid.
struct CheckMatchClosure {
  const char* requestid;   // in: prefix to look for
  int requestidlen;        // in: number of characters of requestid to compare
  const char* matchedid;   // out: copy of the first matching value, or
                           // unchanged if none; allocated with new[] and
                           // owned by whoever set up the closure
};

// Attribute-value callback.  If value begins with the closure's
// requestid prefix, store a copy of it in matchedid and return false;
// otherwise return true.  (Presumably a false return stops the
// enumeration - confirm against the getAttrib callback contract.)
//
// The copy is made with new[] rather than strdup(): RecoverMastership
// releases matchedid with delete[], and releasing malloc()ed storage
// that way is undefined behaviour.
static bool
CheckMatchCallback(void* closure, const char* value)
{
  CheckMatchClosure* cl = (CheckMatchClosure*) closure;
  if (strncmp(cl->requestid, value, cl->requestidlen) != 0) {
    return true;
  }

  size_t len = strlen(value);
  char* copy = new char[len + 1];
  memcpy(copy, value, len + 1);   // includes the terminating NUL
  cl->matchedid = copy;
  return false;
}
00710
00711 static VestaSource::errorCode
00712 RecoverMastership(const char* pathname, char pathnameSep,
00713 const char* requestid) throw ()
00714 {
00715 Repos::dprintf(DBG_MASTERSHIP, "RecoverMastership of %s: requestid %s\n",
00716 pathname, requestid);
00717
00718 VestaSource* sroot = NULL;
00719 VestaSource* svs = NULL;
00720 VestaSource::errorCode err = VestaSource::ok;
00721 char srcHost[MAX_ARC_LEN+1];
00722 char srcPort[MAX_ARC_LEN+1];
00723 const char* p;
00724 const char* q;
00725 const char* srcid = NULL;
00726 const char* dstid = NULL;
00727 ReadersWritersLock* lock = NULL;
00728 VestaSource* droot = NULL;
00729 VestaSource* dvs = NULL;
00730
00731
00732
00733 p = strchr(requestid, ' ');
00734 p++;
00735
00736 p = strchr(p, ' ');
00737 p++;
00738
00739 q = strchr(p, ':');
00740 memcpy(srcHost, p, q-p);
00741 srcHost[q-p] = '\000';
00742 p = q+1;
00743
00744 q = strchr(p, ' ');
00745 memcpy(srcPort, p, q-p);
00746 srcPort[q-p] = '\000';
00747 p = q+1;
00748
00749
00750 droot = VestaSource::repositoryRoot(LongId::readLock, &lock);
00751 RWLOCK_LOCKED_REASON(lock, "RecoverMastership");
00752 err = droot->lookupPathname(pathname, dvs, NULL, pathnameSep);
00753 if (err != VestaSource::ok) {
00754 Repos::dprintf(DBG_MASTERSHIP,
00755 "RecoverMastership: local lookup failed: %s\n",
00756 VestaSource::errorCodeString(err));
00757
00758 goto done;
00759 }
00760
00761 CheckMatchClosure cl;
00762 cl.requestid = requestid;
00763 cl.requestidlen = strlen(requestid);
00764 cl.matchedid = NULL;
00765 dvs->getAttrib("#master-request", CheckMatchCallback, &cl);
00766 dstid = cl.matchedid;
00767
00768 dvs = droot = NULL;
00769 lock->releaseRead();
00770 lock = NULL;
00771
00772 if (dstid == NULL) {
00773
00774
00775 Repos::dprintf(DBG_MASTERSHIP,
00776 "RecoverMastership: #master-request was manually deleted\n");
00777 (void) FinishMastership(pathname, pathnameSep, requestid);
00778 err = VestaSource::ok;
00779 goto done;
00780 }
00781
00782 try {
00783
00784 sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00785 err = sroot->lookupPathname(pathname, svs, NULL, pathnameSep);
00786
00787
00788 CheckMatchClosure cl;
00789 cl.requestid = requestid;
00790 cl.requestidlen = strlen(requestid);
00791 cl.matchedid = NULL;
00792 svs->getAttrib("#master-grant", CheckMatchCallback, &cl);
00793 srcid = cl.matchedid;
00794
00795 } catch (SRPC::failure f) {
00796 Repos::dprintf(DBG_MASTERSHIP,
00797 "RecoverMastership: RPC to %s:%s failed: %s (%d)\n",
00798 srcHost, srcPort, f.msg.cchars(), f.r);
00799 err = VestaSource::rpcFailure;
00800 goto done;
00801 }
00802
00803
00804 if (strlen(dstid) == strlen(requestid)) {
00805
00806 if (srcid == NULL) {
00807
00808 Repos::dprintf(DBG_MASTERSHIP,
00809 "RecoverMastership: case R1 (aborting)\n");
00810 err = FinishMastership(pathname, pathnameSep, dstid);
00811 } else {
00812
00813 Repos::dprintf(DBG_MASTERSHIP,
00814 "RecoverMastership: case R2 (accepting)\n");
00815 err = AcceptMastership(svs, pathname, pathnameSep, requestid, srcid);
00816 if (err != VestaSource::ok) {
00817
00818 Repos::dprintf(DBG_MASTERSHIP,
00819 "RecoverMastership: failed to accept transfer: %s\n",
00820 VestaSource::errorCodeString(err));
00821 (void) FinishMastership(pathname, pathnameSep, requestid);
00822 goto done;
00823 }
00824 err = AcknowledgeMastership(svs, srcid);
00825 if (err != VestaSource::ok && err != VestaSource::rpcFailure) {
00826
00827 Repos::dprintf(DBG_MASTERSHIP,
00828 "RecoverMastership: failed to acknowledge transfer: %s\n",
00829 VestaSource::errorCodeString(err));
00830 (void) FinishMastership(pathname, pathnameSep, requestid);
00831 goto done;
00832 }
00833 err = FinishMastership(pathname, pathnameSep, srcid);
00834 }
00835 } else {
00836
00837 if (srcid != NULL) {
00838
00839 Repos::dprintf(DBG_MASTERSHIP, "RecoverMastership: case R3 (acknowledging)\n");
00840 err = AcknowledgeMastership(svs, srcid);
00841 if (err != VestaSource::ok && err != VestaSource::rpcFailure) {
00842
00843 Repos::dprintf(DBG_MASTERSHIP,
00844 "RecoverMastership: failed to acknowledge transfer: %s\n",
00845 VestaSource::errorCodeString(err));
00846 (void) FinishMastership(pathname, pathnameSep, requestid);
00847 goto done;
00848 }
00849 err = FinishMastership(pathname, pathnameSep, srcid);
00850 } else {
00851
00852 Repos::dprintf(DBG_MASTERSHIP,
00853 "RecoverMastership: case R4 (finishing)\n");
00854 err = FinishMastership(pathname, pathnameSep, dstid);
00855 }
00856 }
00857
00858 done:
00859 if (sroot) delete sroot;
00860 if (svs) delete svs;
00861 if (lock) lock->release();
00862 if (dvs) delete dvs;
00863 if (srcid) delete[] srcid;
00864 if (dstid) delete[] dstid;
00865 Repos::dprintf(DBG_MASTERSHIP, "RecoverMastership returns %s\n",
00866 VestaSource::errorCodeString(err));
00867 return err;
00868 }
00869
00870 void
00871 QueueRecoverMastership(const char* pathname, char pathnameSep,
00872 const char* requestid) throw ()
00873 {
00874 recoverMastershipMu.lock();
00875 RecoverMastershipInfo* info = NEW(RecoverMastershipInfo);
00876 info->pathname = strdup(pathname);
00877 info->pathnameSep = pathnameSep;
00878 info->requestid = strdup(requestid);
00879
00880 info->next = recoverMastershipList;
00881 info->prev = NULL;
00882 if (recoverMastershipList) recoverMastershipList->prev = info;
00883 recoverMastershipList = info;
00884 recoverMastershipMu.unlock();
00885 recoverMastershipCond.signal();
00886 }
00887
00888
00889 void
00890 DequeueRecoverMastershipL(RecoverMastershipInfo* rminfo)
00891 {
00892 if (rminfo->prev) {
00893 rminfo->prev->next = rminfo->next;
00894 } else {
00895 recoverMastershipList = rminfo->next;
00896 }
00897 if (rminfo->next) {
00898 rminfo->next->prev = rminfo->prev;
00899 }
00900 free((void*)rminfo->pathname);
00901 free((void*)rminfo->requestid);
00902 delete rminfo;
00903 }
00904
00905 void
00906 DequeueRecoverMastership(RecoverMastershipInfo* rminfo)
00907 {
00908 recoverMastershipMu.lock();
00909 DequeueRecoverMastershipL(rminfo);
00910 recoverMastershipMu.unlock();
00911 }
00912
00913 static void*
00914 RecoverMastershipThread(void *arg) throw ()
00915 {
00916 VestaSource::errorCode err;
00917 struct timespec waittime;
00918 waittime.tv_sec = RECOVER_MASTERSHIP_SLEEP;
00919 waittime.tv_nsec = 0;
00920
00921 recoverMastershipMu.lock();
00922 for (;;) {
00923 while (recoverMastershipList == NULL) {
00924 recoverMastershipCond.wait(recoverMastershipMu);
00925 }
00926
00927
00928 RecoverMastershipInfo* first = recoverMastershipList;
00929 RecoverMastershipInfo* cur = first;
00930
00931 while (cur) {
00932
00933 recoverMastershipMu.unlock();
00934
00935
00936 err = RecoverMastership(cur->pathname, cur->pathnameSep, cur->requestid);
00937
00938
00939 recoverMastershipMu.lock();
00940 RecoverMastershipInfo* next = cur->next;
00941 if (err != VestaSource::rpcFailure) {
00942
00943 DequeueRecoverMastershipL(cur);
00944 }
00945 cur = next;
00946 }
00947
00948 if (recoverMastershipList == first) {
00949
00950 struct timespec abstime;
00951 #if 0
00952 pthread_get_expiration_np(&waittime, &abstime);
00953 #else
00954 struct timeval now;
00955 gettimeofday(&now, NULL);
00956 abstime.tv_sec = now.tv_sec + waittime.tv_sec;
00957 abstime.tv_nsec = now.tv_usec * 1000 + waittime.tv_nsec;
00958 if (abstime.tv_nsec >= 1000000000) {
00959 abstime.tv_nsec -= 1000000000;
00960 abstime.tv_sec++;
00961 }
00962 #endif
00963 recoverMastershipCond.timedwait(recoverMastershipMu, &abstime);
00964 }
00965 }
00966 }
00967
00968
00969
00970 static void
00971 AcqmCallback(RecoveryReader* rr, char& c)
00972 throw(VestaLog::Error, VestaLog::Eof)
00973 {
00974 char* pathname;
00975 char sep[2];
00976 char *requestid;
00977
00978 rr->getNewQuotedString(c, pathname);
00979 rr->getQuotedString(c, sep, sizeof(sep));
00980 rr->getNewQuotedString(c, requestid);
00981 QueueRecoverMastership(pathname, sep[0], requestid);
00982 delete[] pathname;
00983 delete[] requestid;
00984 }
00985
00986
00987 static void
00988 FinmCallback(RecoveryReader* rr, char& c)
00989 throw(VestaLog::Error, VestaLog::Eof)
00990 {
00991 char* pathname;
00992 char sep[2];
00993 char *finid;
00994
00995 rr->getNewQuotedString(c, pathname);
00996 rr->getQuotedString(c, sep, sizeof(sep));
00997 rr->getNewQuotedString(c, finid);
00998
00999 RecoverMastershipInfo* rminfo = recoverMastershipList;
01000 bool removed = false;
01001 while (rminfo && !removed) {
01002 RecoverMastershipInfo* next = rminfo->next;
01003 if (strcmp(pathname, rminfo->pathname) == 0 &&
01004 sep[0] == rminfo->pathnameSep &&
01005 strncmp(finid, rminfo->requestid, strlen(rminfo->requestid)) == 0) {
01006 DequeueRecoverMastership(rminfo);
01007 removed = true;
01008 }
01009 rminfo = next;
01010 }
01011 assert(removed);
01012
01013 delete[] pathname;
01014 delete[] finid;
01015 }
01016
01017
01018
01019
01020 void
01021 MastershipCheckpoint(fstream& ckpt) throw ()
01022 {
01023 recoverMastershipMu.lock();
01024 RecoverMastershipInfo* rminfo = recoverMastershipList;
01025 while (rminfo) {
01026 ckpt << "(acqm ";
01027 PutQuotedString(ckpt, rminfo->pathname);
01028 ckpt << " ";
01029 char sep[2];
01030 sep[0] = rminfo->pathnameSep;
01031 sep[1] = '\000';
01032 PutQuotedString(ckpt, sep);
01033 ckpt << " ";
01034 PutQuotedString(ckpt, rminfo->requestid);
01035 ckpt << ")\n";
01036
01037 rminfo = rminfo->next;
01038 }
01039 recoverMastershipMu.unlock();
01040 }
01041
01042
01043
01044
01045
01046
01047
01048
01049
01050 struct AttribTimestampClosure {
01051 const char* name;
01052 const char* value;
01053 time_t timestamp;
01054 };
01055
01056 static bool
01057 AttribTimestampCallback(void* closure, VestaSource::attribOp op,
01058 const char* name, const char* value, time_t timestamp)
01059 {
01060 AttribTimestampClosure *cl = (AttribTimestampClosure *) closure;
01061 if ((op == VestaSource::opSet || op == VestaSource::opAdd) &&
01062 strcmp(name, cl->name) == 0 &&
01063 strcmp(value, cl->value) == 0) {
01064 cl->timestamp = timestamp;
01065 return false;
01066 } else {
01067 return true;
01068 }
01069 }
01070
01071
01072
01073
01074 struct HintsClosure {
01075 VestaSource* vs;
01076 Text list;
01077 time_t now;
01078 Text nowtxt;
01079 };
01080
01081 static bool
01082 HintsCallback(void* closure, VestaSource::typeTag type, Arc arc,
01083 unsigned int index, Bit32 pseudoInode, ShortId sid, bool master)
01084 {
01085 HintsClosure* cl = (HintsClosure*) closure;
01086 VestaSource* child;
01087 VestaSource::errorCode err = cl->vs->lookupIndex(index, child);
01088 assert(err == VestaSource::ok);
01089 if (master) {
01090 if(!myMasterHint.Empty())
01091 child->setAttrib("master-repository", myMasterHint.cchars(),
01092 NULL, cl->now);
01093 cl->list = (cl->list + arc + "/" +
01094 (myMasterHint.Empty()
01095 ? myHostPort.cchars()
01096 : myMasterHint.cchars()) + "/" +
01097 cl->nowtxt + "/");
01098 } else {
01099 const char* hint = child->getAttrib("master-repository");
01100 time_t hintts;
01101 if (hint == NULL) {
01102 hint = "";
01103 hintts = 0;
01104 } else {
01105 AttribTimestampClosure atcl;
01106 atcl.name = "master-repository";
01107 atcl.value = hint;
01108 atcl.timestamp = 0;
01109 child->getAttribHistory(AttribTimestampCallback, &atcl);
01110 hintts = atcl.timestamp;
01111 }
01112 char hinttsChars[32];
01113 sprintf(hinttsChars, "%d", hintts);
01114 cl->list = cl->list + arc + "/" + hint + "/" + hinttsChars + "/";
01115 }
01116 delete child;
01117 return true;
01118 }
01119
01120
01121
01122
01123
01124
01125
01126
01127
01128
01129
01130
01131
01132
01133
01134
01135
01136
01137
01138
01139
01140
01141
01142
01143 VestaSource::errorCode
01144 VestaSource::cedeMastership(const char* requestid, const char **grantidOut,
01145 AccessControl::Identity who)
01146 throw (SRPC::failure )
01147 {
01148 VestaSource::errorCode err;
01149
01150 if (Repos::isDebugLevel(DBG_MASTERSHIP)) {
01151 char buf[100];
01152 Repos::pr_nfs_fh(buf, (nfs_fh*) longid.value.byte);
01153 Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: longid %s, requestid \"%s\"\n",
01154 buf, requestid);
01155 }
01156
01157
01158 if (!RootLongId.isAncestorOf(longid))
01159 return VestaSource::invalidArgs;
01160 if (!master) return VestaSource::notMaster;
01161 const char* tstmp = strchr(requestid, ' ');
01162 if (tstmp == NULL) {
01163
01164 Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: bad requestid \"%s\"\n",
01165 requestid);
01166 return VestaSource::invalidArgs;
01167 }
01168 tstmp++;
01169 const char* srcHP = strchr(tstmp, ' ');
01170 if (srcHP == NULL) {
01171
01172 Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: bad requestid \"%s\"\n",
01173 requestid);
01174 return VestaSource::invalidArgs;
01175 }
01176 srcHP++;
01177 const char* dstHP = strrchr(srcHP, ' ');
01178 if (dstHP == NULL) {
01179
01180 Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: bad requestid \"%s\"\n",
01181 requestid);
01182 return VestaSource::invalidArgs;
01183 }
01184 dstHP++;
01185 int srclen = dstHP - srcHP - 1;
01186
01187
01188 if (!(srclen == myHostPort.Length() &&
01189 strncmp(srcHP, myHostPort.cchars(), srclen) == 0) &&
01190 !(!myMasterHint.Empty() &&
01191 srclen == myMasterHint.Length() &&
01192 strncmp(srcHP, myMasterHint.cchars(), srclen) == 0)) {
01193
01194 if(myMasterHint.Empty())
01195 Repos::dprintf(DBG_MASTERSHIP,
01196 "cedeMastership: bad requestid \"%s\"; srcHostPort != %s\n",
01197 requestid, myHostPort.cchars());
01198 else
01199 Repos::dprintf(DBG_MASTERSHIP,
01200 "cedeMastership: bad requestid \"%s\"; srcHostPort != %s or %s\n",
01201 requestid, myHostPort.cchars(), myMasterHint.cchars());
01202 return VestaSource::invalidArgs;
01203 }
01204 if (!ac.check(who, AccessControl::ownership) ||
01205 !MastershipAccessCheck(this, "#mastership-to", dstHP)) {
01206 return VestaSource::noPermission;
01207 }
01208
01209
01210 VRLog.start();
01211
01212
01213 HintsClosure cl;
01214 cl.vs = this;
01215 cl.list = " ";
01216 cl.now = time(NULL);
01217 char nowtxt[32];
01218 sprintf(nowtxt, "%d", cl.now);
01219 cl.nowtxt = nowtxt;
01220
01221 err = setAttrib("master-repository", dstHP, NULL, cl.now);
01222 assert(err == VestaSource::ok);
01223 if (type == VestaSource::appendableDirectory) {
01224 err = list(0, HintsCallback, &cl);
01225 assert(err == VestaSource::ok);
01226 }
01227
01228 int reqlen = strlen(requestid);
01229 char* grantid = NEW_PTRFREE_ARRAY(char, reqlen + cl.list.Length() + 1);
01230 strcpy(grantid, requestid);
01231 strcpy(grantid + reqlen, cl.list.cchars());
01232
01233
01234 err = addAttrib("#master-grant", grantid, NULL, cl.now);
01235 assert(err == VestaSource::ok);
01236
01237
01238 err = setMaster(false);
01239 assert(err == VestaSource::ok);
01240
01241
01242 Repos::dprintf(DBG_MASTERSHIP, "cedeMastership succeeded, grantid \"%s\"\n",
01243 grantid);
01244 *grantidOut = grantid;
01245 VRLog.commit();
01246 return VestaSource::ok;
01247 }
01248
01249 void
01250 MastershipInit1() throw ()
01251 {
01252 myHostPort =
01253 VDirSurrogate::defaultHost() + ":" + VDirSurrogate::defaultPort();
01254
01255
01256 if(myHostPort.FindChar('/') != -1)
01257 {
01258 Repos::dprintf(DBG_ALWAYS,
01259 "Fatal: Default repository host:port contains a slash\n");
01260 abort();
01261 }
01262 if(myHostPort.FindChar(' ') != -1)
01263 {
01264 Repos::dprintf(DBG_ALWAYS,
01265 "Fatal: Default repository host:port contains a space\n");
01266 abort();
01267 }
01268
01269
01270 myMasterHint = myHostPort;
01271 bool defaultMasterHint = true;
01272 try
01273 {
01274 defaultMasterHint =
01275 !VestaConfig::get("Repository", "master_hint", myMasterHint);
01276 }
01277 catch (VestaConfig::failure f)
01278 {
01279
01280
01281 cerr << "VestaConfig::failure " << f.msg << endl;
01282 abort();
01283 }
01284
01285
01286 if(!defaultMasterHint && !myMasterHint.Empty())
01287 {
01288 bool badMasterHint = false;
01289
01290 if(myMasterHint.FindChar('/') != -1)
01291 {
01292 Repos::dprintf(DBG_ALWAYS,
01293 "Fatal: [Repository]master_hint contains a slash\n");
01294 badMasterHint = true;
01295 }
01296
01297 if(myMasterHint.FindChar(' ') != -1)
01298 {
01299 Repos::dprintf(DBG_ALWAYS,
01300 "Fatal: [Repository]master_hint contains a space\n");
01301 badMasterHint = true;
01302 }
01303
01304 int colonPos = myMasterHint.FindChar(':');
01305 if(colonPos == -1)
01306 {
01307 Repos::dprintf(DBG_ALWAYS,
01308 "Fatal: [Repository]master_hint doesn't contain a colon\n");
01309 badMasterHint = true;
01310 }
01311 else
01312 {
01313
01314
01315 for(int i = colonPos+1; i < myMasterHint.Length(); i++)
01316 {
01317 if(!isdigit(myMasterHint[i]))
01318 {
01319 Repos::dprintf(DBG_ALWAYS,
01320 "Fatal: Non-digit after colon in [Repository]master_hint\n");
01321 badMasterHint = true;
01322 break;
01323 }
01324 }
01325 }
01326 if(badMasterHint)
01327 abort();
01328 }
01329
01330
01331 if(defaultMasterHint)
01332 {
01333 try
01334 {
01335 in_addr my_addr =
01336 TCP_sock::host_to_addr(VDirSurrogate::defaultHost());
01337 cout << "Repository host resolves to " << inet_ntoa_r(my_addr)
01338 << endl;
01339 if((my_addr.s_addr & htonl(0xff000000)) ==
01340 htonl(127 << 24))
01341 {
01342
01343
01344 Repos::dprintf(DBG_ALWAYS,
01345 "Warning: Repository address is in 127.0.0.0/8; "
01346 "master-repository hint attributes will not be set\n");
01347 myMasterHint="";
01348 }
01349 }
01350 catch(...)
01351 {
01352
01353 Repos::dprintf(DBG_ALWAYS,
01354 "Fatal: Can't resolve the repository's host (%s)!\n",
01355 VDirSurrogate::defaultHost().cchars());
01356 abort();
01357 }
01358 }
01359
01360 RegisterRecoveryCallback("acqm", AcqmCallback);
01361 RegisterRecoveryCallback("finm", FinmCallback);
01362 }
01363
01364 void
01365 MastershipInit2() throw ()
01366 {
01367 Basics::thread_attr recoverMastership_attr;
01368 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined(__linux__)
01369
01370 recoverMastership_attr.set_schedpolicy(SCHED_RR);
01371 recoverMastership_attr.set_inheritsched(PTHREAD_EXPLICIT_SCHED);
01372 recoverMastership_attr.set_sched_priority(sched_get_priority_min(SCHED_RR));
01373 #endif
01374
01375 recoverMastershipTh.fork(RecoverMastershipThread, NULL,
01376 recoverMastership_attr);
01377 }