Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

Mastership.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // 
00020 // Mastership.C
00021 // Last modified on Mon May 23 21:58:58 EDT 2005 by ken@xorian.net
00022 //      modified on Tue Aug  7 17:40:52 PDT 2001 by mann
00023 //
00024 // Implements AcquireMastership and cedeMastership, etc.
00025 //
00026 
00027 #include "UniqueId.H"
00028 #include "FP.H"
00029 #include "VestaSource.H"
00030 #include "VDirSurrogate.H"
00031 #include "logging.H"
00032 #include "VRConcurrency.H"
00033 #include "Mastership.H"
00034 #include "VLogHelp.H"
00035 
00036 #include "lock_timing.H"
00037 
00038 using std::fstream;
00039 using std::endl;
00040 using std::cerr;
00041 using std::cout;
00042 
00043 Text myHostPort, myMasterHint;
00044 
00045 // Forward
00046 static VestaSource::errorCode
00047 AcceptMastership(VestaSource* svs, const char* pathname, char pathnameSep,
00048                  const char* requestid, const char* grantid) throw ();
00049 static VestaSource::errorCode
00050 AcknowledgeMastership(VestaSource* svs,
00051                       const char* grantid,
00052                       AccessControl::Identity swho = NULL) throw ();
00053 static VestaSource::errorCode
00054 FinishMastership(const char* pathname, char pathnameSep,
00055                  const char* grantid) throw ();
00056 static void* RecoverMastershipThread(void *arg) throw ();
00057 
00058 
00059 //
00060 // Check whether the local repository is permitted to acquire
00061 // mastership from (direction="#mastership-from") or cede mastership
00062 // to (direction="#mastership-to") the specified repository
00063 // (hostport).  The vs parameter is the local copy of the object.
00064 // Implementation: search upward for a #mastership-from or
00065 // #mastership-to attribute; if found, return true if it lists the
00066 // specified repository or "*".  If not, or none found, return false.
00067 //
00068 bool
00069 MastershipAccessCheck(VestaSource* vs, const char* direction,
00070                       const char* hostport) throw ()
00071 {
00072   bool free_vs = false;
00073   for (;;) {
00074     if (vs == NULL) return false;
00075     if (vs->getAttribConst(direction)) break;
00076     VestaSource *vs_parent = vs->longid.getParent().lookup();
00077     if(free_vs) 
00078       delete vs;
00079     vs = vs_parent;
00080     // We allocated vs, we need to free it.
00081     free_vs = true;
00082   }
00083   bool result = (vs->inAttribs(direction, hostport) ||
00084                  vs->inAttribs(direction, "*"));
00085   if(free_vs)
00086     delete vs;
00087   return result;
00088 }
00089 
00090 // Check whether mastership transfer from a master of type fromType to
00091 // a nonmaster of type toType is allowed.
00092 static bool
00093 TypeCheck(VestaSource::typeTag fromType, VestaSource::typeTag toType)
00094 {
00095   switch (toType) {
00096   case VestaSource::appendableDirectory:
00097     // Transferring to an appendable directory is safe only from another
00098     // appendable directory.
00099   case VestaSource::stub:
00100     // Transferring to a stub is safe only from another stub.
00101   case VestaSource::ghost:
00102     // Transferring to a ghost from something of a different type is
00103     // safe but probably a mistake, so we disallow it too.
00104     if (fromType != toType) {
00105       return false;
00106     }
00107     break;
00108 
00109   default:
00110     // Otherwise, any transfer is safe, assuming the replication
00111     // invariant holds.  If svs is not either a ghost or of the same
00112     // type as dvs, however, the replication invariant is already
00113     // violated, so we disallow the transfer.
00114     if (fromType != toType && fromType != VestaSource::ghost) {
00115       return false;
00116     }
00117     break;
00118   }
00119   return true;
00120 }
00121 
00122 //
00123 // Main driver routine to acquire mastership for pathname.  Major
00124 // steps, each separately atomic:
00125 //
00126 // A1: Check that srcHost:srcPort can be contacted and is master.
00127 // A2: Choose a requestid and record it locally.
00128 // A3: Ask srcHost:srcPort to cede mastership, recording and returning grantid.
00129 //  A3x: If srcHost:srcPort is up but says no, erase requestid and quit.
00130 // A4: Fill in missing children, set master flag, and record grantid.
00131 // A5: Ask srcHost:srcPort to erase grantid.
00132 // A6: Erase grantid.
00133 //
00134 // Brief motivation: We want to always have some state recorded at the
00135 // destination repository until the transfer is entirely finished, so
00136 // that we can always drive recovery of transfers that have failed
00137 // partway through from the destination.  If we didn't need this, we
00138 // could merge step A6 into A4.
00139 // 
00140 
00141 VestaSource::errorCode
00142 AcquireMastership(const char* pathname,
00143                   const char* srcHost, const char* srcPort, char pathnameSep,
00144                   AccessControl::Identity dwho, AccessControl::Identity swho)
00145      throw ()
00146 {
00147   VestaSource* sroot = NULL;
00148   VestaSource* droot = NULL;
00149   VestaSource* dvs = NULL;
00150   VestaSource* svs = NULL;
00151   ReadersWritersLock* lock = NULL;
00152   const char* grantid = NULL;
00153   VestaSource::errorCode err = VestaSource::ok;
00154   char arcbuf[MAX_ARC_LEN+1], hostbuf[MAX_ARC_LEN+1], portbuf[MAX_ARC_LEN+1];
00155   char hpbuf[2*MAX_ARC_LEN+2];
00156   bool needCommit = false;
00157   bool needRecovery = false;
00158   FP::Tag uid;
00159   time_t now = 0;
00160   char *xp = NULL;
00161   unsigned char* uidbytes = NULL;
00162 
00163   Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: %s\n", pathname);
00164 
00165   if (*srcHost == '\000' || *srcPort == '\000') {
00166     Repos::dprintf(DBG_MASTERSHIP,
00167                    "AcquireMastership: srcHost:srcPort must be supplied\n");
00168     err = VestaSource::notMaster;
00169     goto done;
00170   }    
00171 
00172   Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: trying master %s:%s\n",
00173                  srcHost, srcPort);
00174 
00175   //
00176   // Step A1: Check that srcHost:srcPort can be contacted and is master.
00177   //
00178 
00179   // A few sanity checks.  The srcHost must be a fully qualified
00180   // domain name, while the srcPort can be a decimal number or a
00181   // service name (!).  We check for certain illegal characters that
00182   // would cause trouble in the code below or in SRPC.
00183   //
00184   if (strchr(srcHost, ' ') || strchr(srcHost, '/') ||
00185       strchr(srcPort, ' ') || strchr(srcPort, '/') ||
00186       !strchr(srcHost, '.') || strchr(srcPort, ':')) { 
00187     Repos::dprintf(DBG_MASTERSHIP,
00188                    "AcquireMastership: bad srcHost:srcPort %s:%s\n",
00189                    srcHost, srcPort);
00190     err = VestaSource::notMaster;
00191     goto done;
00192   }
00193 
00194   try {
00195     // Look up the remote object
00196     sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00197     err = sroot->lookupPathname(pathname, svs, swho, pathnameSep);
00198 
00199   } catch (SRPC::failure f) {
00200     Repos::dprintf(DBG_MASTERSHIP,
00201                    "AcquireMastership: initial RPC to %s:%s failed: %s (%d)\n", 
00202                    srcHost, srcPort, f.msg.cchars(), f.r);
00203     // In this case there can be no partially-done transfer
00204     err = VestaSource::rpcFailure;
00205     goto done;
00206   }
00207 
00208   if (err != VestaSource::ok) {
00209     Repos::dprintf(DBG_MASTERSHIP,
00210                    "AcquireMastership: remote lookup failed: %s\n", 
00211                    VestaSource::errorCodeString(err));
00212     if (err == VestaSource::notFound) err = VestaSource::notMaster;
00213     goto done;
00214   }
00215 
00216   if (!svs->hasAttribs()) {
00217     Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: not an independent object\n");
00218     err = VestaSource::inappropriateOp;
00219     goto done;
00220   }
00221 
00222   if (!svs->master) {
00223     Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: %s:%s is not master\n",
00224                    srcHost, srcPort);
00225     err = VestaSource::notMaster;
00226     goto done;
00227   }      
00228 
00229   //
00230   // Step A2: Choose a requestid and record it locally.
00231   //
00232 
00233   // Look up the object locally, get write lock, start atomic action
00234   droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00235   RWLOCK_LOCKED_REASON(lock, "AcquireMastership (step A2)");
00236   VRLog.start();
00237   needCommit = true;
00238 
00239   err = droot->lookupPathname(pathname, dvs, dwho, pathnameSep);
00240   if (err != VestaSource::ok) {
00241     Repos::dprintf(DBG_MASTERSHIP,
00242                    "AcquireMastership: local lookup failed: %s)\n", 
00243                    VestaSource::errorCodeString(err));
00244     goto done;
00245   }
00246 
00247   // Had better be under the appendable root
00248   assert(RootLongId.isAncestorOf(dvs->longid));
00249 
00250   // Nothing to do if already master
00251   if (dvs->master) {
00252     Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: already master\n");
00253     err = VestaSource::ok;
00254     goto done;
00255   }    
00256 
00257   // Check type compatibility.  We'll have to do this again just before
00258   // completing the transfer, in case something else is racing against us.
00259   if (!TypeCheck(svs->type, dvs->type)) {
00260     Repos::dprintf(DBG_MASTERSHIP,
00261                    "AcquireMastership: type %s -> %s disallowed\n",
00262                    VestaSource::typeTagString(svs->type),
00263                    VestaSource::typeTagString(dvs->type));
00264     err = VestaSource::inappropriateOp;
00265     goto done;
00266   }
00267 
00268   // Check permission locally.  If it changes hereafter, we'll succeed
00269   // anyway, because we do all local changes with who=NULL.
00270   sprintf(hpbuf, "%s:%s", srcHost, srcPort);
00271   if (!dvs->ac.check(dwho, AccessControl::ownership) ||
00272       !MastershipAccessCheck(dvs, "#mastership-from", hpbuf)) {
00273     Repos::dprintf(DBG_MASTERSHIP,
00274                    "AcquireMastership: noPermission in local repository\n");
00275     err = VestaSource::noPermission;
00276     goto done;
00277   }
00278 
00279   // Generate a requestid for the transfer, of the form
00280   //  "uid timestamp srcHost:srcPort dstHost:dstPort"
00281   uid = UniqueId();
00282   now = time(NULL);
00283   char requestid[1024];
00284   xp = requestid;
00285   int i;
00286   uidbytes = (unsigned char*) uid.Words();
00287   for (i=0; i<FP::ByteCnt; i++) {
00288     sprintf(xp, "%02x", uidbytes[i]);
00289     xp += 2;
00290   }
00291   *xp = '\000';
00292   sprintf(xp, " %u %s:%s %s", now, srcHost, srcPort, 
00293           (myMasterHint.Empty()
00294            ? myHostPort.cchars()
00295            : myMasterHint.cchars()));
00296   Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: requestid \"%s\"\n", requestid);
00297 
00298   // Record requestid locally; marks transfer as being in progress
00299   err = dvs->addAttrib("#master-request", requestid, dwho, now);
00300   if (err != VestaSource::ok) {
00301     Repos::dprintf(DBG_MASTERSHIP,
00302                    "AcquireMastership: record requestid failed: %s\n", 
00303                    VestaSource::errorCodeString(err));
00304     goto done;
00305   }
00306 
00307   // Recovery must run if we get an SRPC::failure henceforth
00308   needRecovery = true;
00309   // Log to ensure recovery runs if we crash
00310   // ('acqm' pathname pathnameSep requestid)
00311   VRLog.put("(acqm ");
00312   LogPutQuotedString(VRLog, pathname);
00313   VRLog.put(" ");
00314   char sep[2];
00315   sep[0] = pathnameSep;
00316   sep[1] = '\000';
00317   LogPutQuotedString(VRLog, sep);
00318   VRLog.put(" ");
00319   LogPutQuotedString(VRLog, requestid);
00320   VRLog.put(")\n");
00321 
00322   // Must end atomic action and release lock to do RPC, invalidating dvs
00323   delete dvs;
00324   dvs = droot = NULL;
00325   VRLog.commit();
00326   needCommit = false;
00327   lock->releaseWrite();
00328   lock = NULL;
00329 
00330   //
00331   // Step A3: Ask the remote repository to cede mastership, recording
00332   // and returning grantid.  See cedeMastership() below for details.
00333   //
00334   try {
00335     err = svs->cedeMastership(requestid, /*OUT*/&grantid, swho);
00336 
00337   } catch (SRPC::failure f) {
00338     // This is a bad case: we don't know for sure whether the remote
00339     // repository ceded mastership or not.  See recovery description below.
00340     err = VestaSource::rpcFailure;
00341     goto done;
00342   }
00343 
00344   //
00345   // Step A3x: Clean up if cedeMastership returned an error
00346   //
00347   if (err != VestaSource::ok) {
00348     // This is an annoying case.  We recorded the request, then it was
00349     // denied.  We have to delete the requestid locally before returning
00350     // the error.
00351     Repos::dprintf(DBG_MASTERSHIP,
00352                    "AcquireMastership: cedeMastership refused: %s\n", 
00353                    VestaSource::errorCodeString(err));
00354     (void) FinishMastership(pathname, pathnameSep, requestid);
00355     needRecovery = false;
00356     goto done;
00357   }
00358 
00359   Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership: grantid \"%s\"\n", grantid);
00360 
00361   //
00362   // Step A4: Fill in missing children, set master flag, and record grantid.
00363   //
00364   err = AcceptMastership(svs, pathname, pathnameSep, requestid, grantid);
00365   if (err != VestaSource::ok) {
00366      Repos::dprintf(DBG_MASTERSHIP, 
00367                     "AcquireMastership: AcceptMastership (A4) failed: %s\n",  
00368                     VestaSource::errorCodeString(err)); 
00369     // Give up on recovery
00370     (void) FinishMastership(pathname, pathnameSep, requestid);
00371     needRecovery = false;
00372     goto done;
00373   }
00374 
00375   //
00376   // Step A5: Ask srcHost:srcPort to erase grantid.
00377   //
00378   err = AcknowledgeMastership(svs, grantid, swho);
00379   if (err != VestaSource::ok && err != VestaSource::rpcFailure) {
00380     Repos::dprintf(DBG_MASTERSHIP,  
00381                    "AcquireMastership: AcknowledgeMastership (A5) failed: %s\n",   
00382                    VestaSource::errorCodeString(err));
00383     // Give up on recovery
00384     (void) FinishMastership(pathname, pathnameSep, requestid);
00385     needRecovery = false;
00386     goto done;
00387   }
00388 
00389   //
00390   // Step A6: Erase grantid locally.
00391   //
00392   err = FinishMastership(pathname, pathnameSep, grantid);
00393 
00394  done:
00395   if (err == VestaSource::rpcFailure && needRecovery) {
00396     // Schedule recovery
00397     QueueRecoverMastership(pathname, pathnameSep, requestid);
00398   }
00399   if (grantid) delete[] grantid;
00400   if (sroot) delete sroot;
00401   if (svs) delete svs;
00402   if (dvs) delete dvs;
00403   if (needCommit) VRLog.commit();
00404   if (lock) lock->release();
00405   Repos::dprintf(DBG_MASTERSHIP, "AcquireMastership returns %s\n",
00406                  VestaSource::errorCodeString(err));
00407   return err;
00408 }
00409 
00410 //
00411 // Do step A4 of mastership acquisition: Fill in missing children, set
00412 // master flag, and record grantid in place of requestid.
00413 //
00414 static VestaSource::errorCode
00415 AcceptMastership(VestaSource* svs,
00416                   const char* pathname, char pathnameSep,
00417                   const char* requestid, const char* grantid) throw ()
00418 {
00419   VestaSource* droot = NULL;
00420   ReadersWritersLock* lock = NULL;
00421   VestaSource::errorCode err = VestaSource::ok;
00422   VestaSource* dvs = NULL;
00423 
00424   // Start second atomic action
00425   droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00426   RWLOCK_LOCKED_REASON(lock, "AcceptMastership");
00427   VRLog.start();
00428 
00429   // Sanity check on grantid
00430   int reqlen = strlen(requestid);
00431   bool ok = (strncmp(requestid, grantid, reqlen) == 0);
00432   if (!ok || grantid[reqlen] != ' ') {
00433     Repos::dprintf(DBG_MASTERSHIP, "AcceptMastership: bad grantid \"%s\"\n", grantid);
00434     err = VestaSource::invalidArgs;
00435     goto done;
00436   }
00437 
00438   // Re-lookup dvs
00439   err = droot->lookupPathname(pathname, dvs, NULL, pathnameSep);
00440   if (err != VestaSource::ok) {
00441     // An error can occur here due to the local repository being
00442     // changed out from under us while we had the lock released.
00443     Repos::dprintf(DBG_MASTERSHIP,
00444                    "AcceptMastership: error on 2nd local lookup: %s\n", 
00445                    VestaSource::errorCodeString(err));
00446     goto done;
00447   }
00448 
00449   // A type check error can occur here due to the local repository
00450   // being changed out from under us while we had the lock released.
00451   // (Note: if svs's type changes after the initial check,
00452   // cedeMastership will fail with invalidArgs; we won't even get to
00453   // here.  We need this check only to cover changes to dvs->type.)
00454   if (!TypeCheck(svs->type, dvs->type)) {
00455     Repos::dprintf(DBG_MASTERSHIP,
00456                    "AcceptMastership: type %s -> %s disallowed\n",
00457                    VestaSource::typeTagString(svs->type),
00458                    VestaSource::typeTagString(dvs->type));
00459     err = VestaSource::inappropriateOp;
00460     goto done;
00461   }
00462 
00463   // If the local repository no longer has requestid recorded, then
00464   // don't proceed.  Could happen if someone changed it out from under
00465   // us while we had the lock released, but no one should do that.
00466   if (!dvs->inAttribs("#master-request", requestid)) {
00467     Repos::dprintf(DBG_MASTERSHIP,
00468                    "AcceptMastership: grantid \"%s\" has no requestid\n", grantid);
00469     err = VestaSource::ok; 
00470     goto done;
00471   }
00472 
00473   // Update the master-repository hint on dvs
00474   if(!myMasterHint.Empty())
00475     {
00476       err = dvs->setAttrib("master-repository", myMasterHint.cchars(), NULL);
00477       assert(err == VestaSource::ok);
00478     }
00479 
00480   // Update master-repository hints on children and install stubs for
00481   // any missing children.
00482   if (dvs->type == VestaSource::appendableDirectory) {
00483     const char* p;
00484     char arcbuf[MAX_ARC_LEN+1];
00485     char hintbuf[MAX_ARC_LEN+1]; // seems a safe size
00486     time_t hinttime;
00487     p = grantid + strlen(requestid) + 1;
00488     while (*p) {
00489       // Extract the child arc
00490       const char* q = strchr(p, '/');
00491       if (q == NULL || q-p > MAX_ARC_LEN) {
00492         Repos::dprintf(DBG_MASTERSHIP,
00493                        "AcceptMastership: bad grantid (arc) \"%s\"\n", grantid);
00494         err = VestaSource::invalidArgs;
00495         goto done;
00496       }
00497       memcpy(arcbuf, p, q-p);
00498       arcbuf[q-p] = '\000';
00499       p = q+1;
00500 
00501       // Extract the child master hint
00502       q = strchr(p, '/');
00503       if (q == NULL || q-p > MAX_ARC_LEN) {
00504         Repos::dprintf(DBG_MASTERSHIP,
00505                        "AcceptMastership: bad grantid (hint) \"%s\"\n", grantid);
00506         err = VestaSource::invalidArgs;
00507         goto done;
00508       }
00509       memcpy(hintbuf, p, q-p);
00510       hintbuf[q-p] = '\000';
00511       p = q+1;
00512 
00513       // Extract the hint's timestamp
00514       q = strchr(p, '/');
00515       if (q == NULL || !isdigit(*p)) {
00516         Repos::dprintf(DBG_MASTERSHIP,
00517                        "AcceptMastership: bad grantid (htime) \"%s\"\n", grantid);
00518         err = VestaSource::invalidArgs;
00519         goto done;
00520       }
00521       hinttime = (time_t) atoi(p);
00522       p = q + 1;
00523 
00524       // Look up the child
00525       VestaSource* child;
00526       err = dvs->lookup(arcbuf, child, NULL);
00527       if (err == VestaSource::notFound) {
00528         // Need to insert it as a nonmaster stub
00529         err = dvs->insertStub(arcbuf, false, NULL,
00530                               VestaSource::dontReplace, &child);
00531         Repos::dprintf(DBG_MASTERSHIP,
00532                        "AcceptMastership: inserted stub %s\n", arcbuf);
00533       }
00534       assert(err == VestaSource::ok);
00535 
00536       if (child->master) {
00537         // Any master child can have its hint deleted, since the hint on
00538         // the parent now implicitly covers it.  We ignore what the
00539         // remote repository had as a hint here; we are sure the local
00540         // repository is master.
00541         err = child->clearAttrib("master-repository");
00542       } else {
00543         // Child not mastered here; need to make sure its hint is
00544         // explicit. We copy the hint from the remote repository, if
00545         // any, using the supplied timestamp; if we have a more recent
00546         // hint already, that one will dominate.  If neither we nor
00547         // the remote repository has a hint, it will remain unset.
00548         if (hinttime > 0) {
00549           err = child->setAttrib("master-repository", hintbuf, NULL, hinttime);
00550         }
00551       }
00552       assert(err == VestaSource::ok);
00553       delete child;
00554     }
00555   }
00556 
00557   // Change #master-request to record the grantid, signifying that we
00558   // have finished accepting mastership and now have only to
00559   // acknowledge to the old master.  Note: The tiebreak rules
00560   // correctly order the add/remove/add/remove sequence on
00561   // #master-request even if all four get the same timestamp.
00562   err = dvs->removeAttrib("#master-request", requestid, NULL);
00563   assert(err == VestaSource::ok);
00564   err = dvs->addAttrib("#master-request", grantid, NULL);
00565   assert(err == VestaSource::ok);
00566 
00567   // At last we get to turn on the master flag!
00568   err = dvs->setMaster(true, NULL);
00569   assert(err == VestaSource::ok);
00570 
00571  done:
00572   if (dvs) delete dvs;
00573   if (lock) {
00574     VRLog.commit();
00575     lock->releaseWrite();
00576   }
00577   return err;
00578 }
00579 
00580 //
00581 // Do step A5 of mastership acquisition: Ask srcHost:srcPort to erase grantid.
00582 //
00583 static VestaSource::errorCode
00584 AcknowledgeMastership(VestaSource* svs, const char* grantid,
00585                       AccessControl::Identity swho) throw ()
00586 {
00587   try {
00588     return svs->removeAttrib("#master-grant", grantid, swho);
00589   } catch (SRPC::failure f) {
00590     Repos::dprintf(DBG_MASTERSHIP,
00591                    "AcquireMastership: RPC to erase grantid failed\n");
00592     return VestaSource::rpcFailure;
00593   }
00594 }
00595 
00596 //
00597 // Do step A6 of mastership acquisition: Erase grantid.
00598 // This code also serves for step A3x, with requestid in place of grantid.
00599 //
00600 // !! Maybe this should generally run if we fail with an error other
00601 // than rpcFailure, since there is no point in running recovery then.
00602 //
00603 static VestaSource::errorCode
00604 FinishMastership(const char* pathname, char pathnameSep,
00605                  const char* id) throw ()
00606 {
00607   VestaSource* droot = NULL;
00608   ReadersWritersLock* lock = NULL;
00609   VestaSource::errorCode err = VestaSource::ok;
00610   VestaSource* dvs = NULL;
00611 
00612   // Start third atomic action
00613   droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00614   RWLOCK_LOCKED_REASON(lock, "FinishMastership");
00615   VRLog.start();
00616 
00617   // Log to cancel out the acqm and prevent recovery running after
00618   // next restart.  
00619   // ('finm' pathname pathnameSep id)
00620   char sep[2];
00621   sep[0] = pathnameSep;
00622   sep[1] = '\000';
00623   VRLog.put("(finm ");
00624   LogPutQuotedString(VRLog, pathname);
00625   VRLog.put(" ");
00626   LogPutQuotedString(VRLog, sep);
00627   VRLog.put(" ");
00628   LogPutQuotedString(VRLog, id);
00629   VRLog.put(")\n");
00630 
00631   // Re-lookup dvs
00632   err = droot->lookupPathname(pathname, dvs, NULL, pathnameSep);
00633   if (err != VestaSource::ok) {
00634     // An error can occur here due to the local repository being
00635     // changed out from under us while we had the lock released.
00636     // At this point we are essentially done, so forget the error
00637     Repos::dprintf(DBG_MASTERSHIP,
00638                    "AcceptMastership: error on 3rd local lookup: %s\n", 
00639                    VestaSource::errorCodeString(err));
00640     err = VestaSource::ok;
00641     goto done;
00642   }
00643 
00644   // Delete the #master-request attribute, signifying that we're done.
00645   err = dvs->removeAttrib("#master-request", id, NULL);
00646   assert(err == VestaSource::ok);
00647 
00648  done:
00649   if (dvs) delete dvs;
00650   if (lock) {
00651     VRLog.commit();
00652     lock->releaseWrite();
00653   }
00654   return err;
00655 }
00656 
00657 //
00658 // Support for failure recovery
00659 //
// Description of one mastership transfer awaiting recovery.  The
// three data members mirror the arguments of RecoverMastership.
struct RecoverMastershipInfo {
  // Links to neighbouring entries; presumably these chain the entries
  // into the list headed by recoverMastershipList -- TODO confirm.
  RecoverMastershipInfo *next, *prev;
  const char* pathname;   // pathname of the object being transferred
  char pathnameSep;       // separator character used within pathname
  const char* requestid;  // requestid identifying the transfer
};
00667 static Basics::mutex recoverMastershipMu;
00668 static Basics::cond recoverMastershipCond;
00669 static Basics::thread recoverMastershipTh;
00670 static RecoverMastershipInfo* recoverMastershipList;
00671 #define RECOVER_MASTERSHIP_SLEEP 3600  // one hour
00672 
00673 // Recovery for failure cases.  This must not be invoked twice on the
00674 // same object+requestid, or concurrently with AcquireMastership on
00675 // it.  It causes a background thread to persistently retry the
00676 // recovery until either it succeeds or someone manually removes the
00677 // matching #master-request attribute.
00678 //
00679 // R1: #master-request is a requestid, but has no corresponding
00680 // #master-grant.  AcquireMastership failed before step A3; delete the
00681 // #master-request using FinishMastership.
00682 //
00683 // R2: #master-request is a requestid and has a corresponding
00684 // #master-grant.  AcquireMastership failed before step A4; resume there.
00685 //
00686 // R3: #master-request is a grantid and has a corresponding #master-grant.
00687 // AcquireMastership failed before step A5; resume there.
00688 //
00689 // R4: #master-request is a grantid and has no corresponding #master-grant.
00690 // AcquireMastership failed before step A6; call FinishMastership to do it.
00691 //
00692 
// Closure handed to CheckMatchCallback through its void* argument.
struct CheckMatchClosure {
  const char* requestid;  // in:  prefix to look for
  int requestidlen;       // in:  number of characters of requestid to compare
  const char* matchedid;  // out: strdup'd copy of the first matching value
};

// Attribute-value callback: tests whether value starts with the
// first requestidlen characters of the closure's requestid.
//
// On a match, a heap copy of value (made with strdup) is stored in
// the closure's matchedid and false is returned; otherwise the
// closure is left untouched and true is returned.  Presumably the
// return value tells the enumerating caller whether to keep going --
// TODO confirm.  Releasing the copy is left to whoever reads matchedid.
static bool
CheckMatchCallback(void* closure, const char* value)
{
  CheckMatchClosure* match = static_cast<CheckMatchClosure*>(closure);
  const bool hit =
    (strncmp(match->requestid, value, match->requestidlen) == 0);
  if (hit) {
    // Found a matching grantid
    match->matchedid = strdup(value);
  }
  return !hit;
}
00710 
00711 static VestaSource::errorCode
00712 RecoverMastership(const char* pathname, char pathnameSep,
00713                   const char* requestid) throw ()
00714 {
00715   Repos::dprintf(DBG_MASTERSHIP, "RecoverMastership of %s: requestid %s\n",
00716                  pathname, requestid);
00717 
00718   VestaSource* sroot = NULL;
00719   VestaSource* svs = NULL;
00720   VestaSource::errorCode err = VestaSource::ok;
00721   char srcHost[MAX_ARC_LEN+1];
00722   char srcPort[MAX_ARC_LEN+1];
00723   const char* p;
00724   const char* q;
00725   const char* srcid = NULL;
00726   const char* dstid = NULL;
00727   ReadersWritersLock* lock = NULL;
00728   VestaSource* droot = NULL;
00729   VestaSource* dvs = NULL;
00730 
00731   // Parse stuff out of requestid
00732   // Skip uid
00733   p = strchr(requestid, ' ');
00734   p++;
00735   // Skip timestamp
00736   p = strchr(p, ' ');        // point to timestamp
00737   p++;
00738   // Find end of srcHost
00739   q = strchr(p, ':');
00740   memcpy(srcHost, p, q-p);
00741   srcHost[q-p] = '\000';
00742   p = q+1;
00743   // Find end of srcPort
00744   q = strchr(p, ' ');
00745   memcpy(srcPort, p, q-p);
00746   srcPort[q-p] = '\000';
00747   p = q+1;
00748   
00749   // Look up the local object to read #master-request
00750   droot = VestaSource::repositoryRoot(LongId::readLock, &lock);
00751   RWLOCK_LOCKED_REASON(lock, "RecoverMastership");
00752   err = droot->lookupPathname(pathname, dvs, NULL, pathnameSep);
00753   if (err != VestaSource::ok) {
00754     Repos::dprintf(DBG_MASTERSHIP,
00755                    "RecoverMastership: local lookup failed: %s\n", 
00756                    VestaSource::errorCodeString(err));
00757     // This causes recovery of this request to be abandoned
00758     goto done;
00759   }
00760   // Get matching local id, or NULL if none
00761   CheckMatchClosure cl;
00762   cl.requestid = requestid;
00763   cl.requestidlen = strlen(requestid);
00764   cl.matchedid = NULL;
00765   dvs->getAttrib("#master-request", CheckMatchCallback, &cl);
00766   dstid = cl.matchedid;
00767 
00768   dvs = droot = NULL;
00769   lock->releaseRead();
00770   lock = NULL;
00771 
00772   if (dstid == NULL) {
00773     // Someone deleted #master-request.  Quit.  Need to call
00774     // FinishMastership just to log the completion.
00775     Repos::dprintf(DBG_MASTERSHIP,
00776                    "RecoverMastership: #master-request was manually deleted\n");
00777     (void) FinishMastership(pathname, pathnameSep, requestid);
00778     err = VestaSource::ok;
00779     goto done;
00780   }
00781 
00782   try {
00783     // Look up the remote object to read #master-grant
00784     sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00785     err = sroot->lookupPathname(pathname, svs, NULL, pathnameSep);
00786 
00787     // Get matching grantid, or NULL if none
00788     CheckMatchClosure cl;
00789     cl.requestid = requestid;
00790     cl.requestidlen = strlen(requestid);
00791     cl.matchedid = NULL;
00792     svs->getAttrib("#master-grant", CheckMatchCallback, &cl);
00793     srcid = cl.matchedid;
00794 
00795   } catch (SRPC::failure f) {
00796     Repos::dprintf(DBG_MASTERSHIP,
00797                    "RecoverMastership: RPC to %s:%s failed: %s (%d)\n", 
00798                    srcHost, srcPort, f.msg.cchars(), f.r);
00799     err = VestaSource::rpcFailure;
00800     goto done;
00801   }
00802 
00803   // Is #master-request set to the requestid or the grantid?
00804   if (strlen(dstid) == strlen(requestid)) {
00805     // #master-request is the requestid
00806     if (srcid == NULL) {
00807       // Case R1: Do step A3x
00808       Repos::dprintf(DBG_MASTERSHIP,
00809                      "RecoverMastership: case R1 (aborting)\n");
00810       err = FinishMastership(pathname, pathnameSep, dstid);
00811     } else {
00812       // Case R2: Resume at step A4
00813       Repos::dprintf(DBG_MASTERSHIP,
00814                      "RecoverMastership: case R2 (accepting)\n");
00815       err = AcceptMastership(svs, pathname, pathnameSep, requestid, srcid);
00816       if (err != VestaSource::ok) {
00817         // Give up on recovery
00818         Repos::dprintf(DBG_MASTERSHIP,
00819                        "RecoverMastership: failed to accept transfer: %s\n",
00820                        VestaSource::errorCodeString(err));
00821         (void) FinishMastership(pathname, pathnameSep, requestid);
00822         goto done;
00823       }
00824       err = AcknowledgeMastership(svs, srcid);
00825       if (err != VestaSource::ok && err != VestaSource::rpcFailure) {
00826         // Give up on recovery
00827         Repos::dprintf(DBG_MASTERSHIP,
00828                        "RecoverMastership: failed to acknowledge transfer: %s\n",
00829                        VestaSource::errorCodeString(err));
00830         (void) FinishMastership(pathname, pathnameSep, requestid);
00831         goto done;
00832       }
00833       err = FinishMastership(pathname, pathnameSep, srcid);
00834     }
00835   } else {
00836     // #master-request is the grantid
00837     if (srcid != NULL) {
00838       // Case R3: Resume at step A5
00839       Repos::dprintf(DBG_MASTERSHIP, "RecoverMastership: case R3 (acknowledging)\n");
00840       err = AcknowledgeMastership(svs, srcid);
00841       if (err != VestaSource::ok && err != VestaSource::rpcFailure) {
00842         // Give up on recovery
00843         Repos::dprintf(DBG_MASTERSHIP,
00844                        "RecoverMastership: failed to acknowledge transfer: %s\n",
00845                        VestaSource::errorCodeString(err));
00846         (void) FinishMastership(pathname, pathnameSep, requestid);
00847         goto done;
00848       }
00849       err = FinishMastership(pathname, pathnameSep, srcid);
00850     } else {
00851       // Case R4:
00852       Repos::dprintf(DBG_MASTERSHIP,
00853                      "RecoverMastership: case R4 (finishing)\n");
00854       err = FinishMastership(pathname, pathnameSep, dstid);
00855     }
00856   }
00857 
00858  done:
00859   if (sroot) delete sroot;
00860   if (svs) delete svs;
00861   if (lock) lock->release();
00862   if (dvs) delete dvs;
00863   if (srcid) delete[] srcid;
00864   if (dstid) delete[] dstid;
00865   Repos::dprintf(DBG_MASTERSHIP, "RecoverMastership returns %s\n",
00866                  VestaSource::errorCodeString(err));      
00867   return err;
00868 }
00869 
00870 void
00871 QueueRecoverMastership(const char* pathname, char pathnameSep,
00872                        const char* requestid) throw ()
00873 {
00874   recoverMastershipMu.lock();
00875   RecoverMastershipInfo* info = NEW(RecoverMastershipInfo);
00876   info->pathname = strdup(pathname);
00877   info->pathnameSep = pathnameSep;
00878   info->requestid = strdup(requestid);
00879   // Add to front of list
00880   info->next = recoverMastershipList;
00881   info->prev = NULL;
00882   if (recoverMastershipList) recoverMastershipList->prev = info;
00883   recoverMastershipList = info;
00884   recoverMastershipMu.unlock();
00885   recoverMastershipCond.signal();
00886 }
00887 
00888 // Requires mutex
00889 void
00890 DequeueRecoverMastershipL(RecoverMastershipInfo* rminfo)
00891 {
00892   if (rminfo->prev) {
00893     rminfo->prev->next = rminfo->next;
00894   } else {
00895     recoverMastershipList = rminfo->next;
00896   }
00897   if (rminfo->next) {
00898     rminfo->next->prev = rminfo->prev;
00899   }
00900   free((void*)rminfo->pathname);
00901   free((void*)rminfo->requestid);
00902   delete rminfo;
00903 }
00904 
// Locking wrapper around DequeueRecoverMastershipL: removes rminfo from
// recoverMastershipList while holding recoverMastershipMu.  rminfo is
// deleted by the call and must not be used afterwards.
00905 void
00906 DequeueRecoverMastership(RecoverMastershipInfo* rminfo)
00907 {
00908   recoverMastershipMu.lock();        // DequeueRecoverMastershipL requires the mutex
00909   DequeueRecoverMastershipL(rminfo); // unlinks rminfo and frees it and its strings
00910   recoverMastershipMu.unlock();
00911 }
00912 
00913 static void*
00914 RecoverMastershipThread(void *arg) throw ()
00915 {
00916   VestaSource::errorCode err;
00917   struct timespec waittime;
00918   waittime.tv_sec = RECOVER_MASTERSHIP_SLEEP;
00919   waittime.tv_nsec = 0;
00920 
00921   recoverMastershipMu.lock();
00922   for (;;) {
00923     while (recoverMastershipList == NULL) {
00924       recoverMastershipCond.wait(recoverMastershipMu);
00925     }
00926     // Make one pass down the list.  New work can be added above where
00927     // we started while we are busy.
00928     RecoverMastershipInfo* first = recoverMastershipList;
00929     RecoverMastershipInfo* cur = first;
00930 
00931     while (cur) {
00932       // Allow for more work to be added
00933       recoverMastershipMu.unlock();
00934 
00935       // Process cur
00936       err = RecoverMastership(cur->pathname, cur->pathnameSep, cur->requestid);
00937 
00938       // Reacquire lock
00939       recoverMastershipMu.lock();
00940       RecoverMastershipInfo* next = cur->next;
00941       if (err != VestaSource::rpcFailure) {
00942         // Recovery is either done or hopeless
00943         DequeueRecoverMastershipL(cur);
00944       }
00945       cur = next;
00946     }
00947 
00948     if (recoverMastershipList == first) {
00949       // No new work has been added
00950       struct timespec abstime;      
00951 #if 0
00952       pthread_get_expiration_np(&waittime, &abstime);
00953 #else
00954       struct timeval now;
00955       gettimeofday(&now, NULL);
00956       abstime.tv_sec = now.tv_sec + waittime.tv_sec;
00957       abstime.tv_nsec = now.tv_usec * 1000 + waittime.tv_nsec;
00958       if (abstime.tv_nsec >= 1000000000) {
00959         abstime.tv_nsec -= 1000000000;
00960         abstime.tv_sec++;
00961       }
00962 #endif
00963       recoverMastershipCond.timedwait(recoverMastershipMu, &abstime);
00964     }
00965   }
00966 }
00967 
00968 
00969 // Unpack log record and requeue request on recoverMastershipList
00970 static void
00971 AcqmCallback(RecoveryReader* rr, char& c)
00972      throw(VestaLog::Error, VestaLog::Eof)
00973 {
00974   char* pathname;
00975   char sep[2];
00976   char *requestid;
00977 
00978   rr->getNewQuotedString(c, pathname);
00979   rr->getQuotedString(c, sep, sizeof(sep));
00980   rr->getNewQuotedString(c, requestid);
00981   QueueRecoverMastership(pathname, sep[0], requestid);
00982   delete[] pathname;
00983   delete[] requestid;
00984 }
00985 
00986 // Unpack log record and dequeue request from recoverMastershipList
00987 static void
00988 FinmCallback(RecoveryReader* rr, char& c)
00989      throw(VestaLog::Error, VestaLog::Eof)
00990 {
00991   char* pathname;
00992   char sep[2];
00993   char *finid;
00994 
00995   rr->getNewQuotedString(c, pathname);
00996   rr->getQuotedString(c, sep, sizeof(sep));
00997   rr->getNewQuotedString(c, finid);
00998 
00999   RecoverMastershipInfo* rminfo = recoverMastershipList;
01000   bool removed = false;
01001   while (rminfo && !removed) {
01002     RecoverMastershipInfo* next = rminfo->next;
01003     if (strcmp(pathname, rminfo->pathname) == 0 &&
01004         sep[0] == rminfo->pathnameSep &&
01005         strncmp(finid, rminfo->requestid, strlen(rminfo->requestid)) == 0) {
01006       DequeueRecoverMastership(rminfo);
01007       removed = true;
01008     }
01009     rminfo = next;
01010   }
01011   assert(removed);
01012 
01013   delete[] pathname;
01014   delete[] finid;
01015 }
01016 
01017 // Write log records for all unfinished recoveries into the checkpoint
01018 // file.  Would be nice to have a RecoveryWriter so we could factor
01019 // out the common code, sigh.
01020 void
01021 MastershipCheckpoint(fstream& ckpt) throw ()
01022 {
01023   recoverMastershipMu.lock();
01024   RecoverMastershipInfo* rminfo = recoverMastershipList;
01025   while (rminfo) {
01026     ckpt << "(acqm ";
01027     PutQuotedString(ckpt, rminfo->pathname);
01028     ckpt << " ";
01029     char sep[2];
01030     sep[0] = rminfo->pathnameSep;
01031     sep[1] = '\000';
01032     PutQuotedString(ckpt, sep);
01033     ckpt << " ";
01034     PutQuotedString(ckpt, rminfo->requestid);
01035     ckpt << ")\n";
01036 
01037     rminfo = rminfo->next;
01038   }
01039   recoverMastershipMu.unlock();
01040 }
01041 
01042 
01043 //
01044 // Helpers for cedeMastership
01045 //
01046 
01047 // Closure and callback for VestaSource::getAttribHistory to find the
01048 // timestamp of a particular name, value pair.
01049 
01050 struct AttribTimestampClosure {
01051   const char* name;   // in: attribute name to look for
01052   const char* value;  // in: attribute value to look for
01053   time_t timestamp;   // out: timestamp of the matching set/add record; left untouched by the callback if none matches
01054 };
01055 
01056 static bool
01057 AttribTimestampCallback(void* closure, VestaSource::attribOp op,
01058                         const char* name, const char* value, time_t timestamp)
01059 {
01060   AttribTimestampClosure *cl = (AttribTimestampClosure *) closure;
01061   if ((op == VestaSource::opSet || op == VestaSource::opAdd) &&
01062       strcmp(name, cl->name) == 0 &&
01063       strcmp(value, cl->value) == 0) {
01064     cl->timestamp = timestamp;
01065     return false;
01066   } else {
01067     return true;
01068   }
01069 }
01070 
01071 // Closure and callback for VestaSource::list to update the hints on
01072 // children and form the arc/hint/ts/... list of a grantid.
01073 
01074 struct HintsClosure {
01075   VestaSource* vs;  // directory being listed; children are looked up through it
01076   Text list;        // accumulates one "arc/hint/ts/" triple per child
01077   time_t now;       // timestamp given to hints set on mastered children
01078   Text nowtxt;      // text form of now, as written into list
01079 };
01080 
01081 static bool
01082 HintsCallback(void* closure, VestaSource::typeTag type, Arc arc,
01083               unsigned int index, Bit32 pseudoInode, ShortId sid, bool master)
01084 {
01085   HintsClosure* cl = (HintsClosure*) closure;
01086   VestaSource* child;
01087   VestaSource::errorCode err = cl->vs->lookupIndex(index, child);
01088   assert(err == VestaSource::ok);
01089   if (master) {
01090     if(!myMasterHint.Empty())
01091       child->setAttrib("master-repository", myMasterHint.cchars(),
01092                        NULL, cl->now);
01093     cl->list = (cl->list + arc + "/" +
01094                 (myMasterHint.Empty()
01095                  ? myHostPort.cchars()
01096                  : myMasterHint.cchars()) + "/" +
01097                 cl->nowtxt + "/");
01098   } else {
01099     const char* hint = child->getAttrib("master-repository");
01100     time_t hintts;
01101     if (hint == NULL) {
01102       hint = "";
01103       hintts = 0;
01104     } else {
01105       AttribTimestampClosure atcl;
01106       atcl.name = "master-repository";
01107       atcl.value = hint;
01108       atcl.timestamp = 0;
01109       child->getAttribHistory(AttribTimestampCallback, &atcl);
01110       hintts = atcl.timestamp;
01111     }
01112     char hinttsChars[32];
01113     sprintf(hinttsChars, "%d", hintts);
01114     cl->list = cl->list + arc + "/" + hint + "/" + hinttsChars + "/";
01115   }
01116   delete child;
01117   return true;
01118 }
01119 
01120 // Do the following atomically: 
01121 //
01122 //  C1: Check that we have mastership for this object.  If not,
01123 //  return notMaster.  Also check that "who" has permission to take
01124 //  mastership.  (!!For now at least, use ownership.)
01125 //
01126 //  C2: Update master-repository hint and (if object is an appendable
01127 //  directory) those of its children.
01128 //
01129 //  C3: If object is an appendable directory, make a list of its
01130 //  children, their new master-repository hints, and the timestamp of
01131 //  each hint.  Append the list to requestid (separated by a space),
01132 //  and return the result as grantid.  If a child has no hint (i.e.,
01133 //  its master is unknown), return an empty hint string with timestamp
01134 //  0.  List syntax: arc1/hint1/t1/arc2/hint2/t2/.../arcn/hintn/tn/
01135 //
01136 //  C4: Record grantid in #master-grant attribute.
01137 //
01138 //  C5: Turn off master flag.
01139 //
01140 // The caller is responsible for freeing grantid.
01141 // Requires StableLock.write on entry.
01142 //
01143 VestaSource::errorCode
01144 VestaSource::cedeMastership(const char* requestid, const char **grantidOut,
01145                             AccessControl::Identity who)
01146      throw (SRPC::failure /*can't really happen*/)
01147 {
01148   VestaSource::errorCode err;
01149 
01150   if (Repos::isDebugLevel(DBG_MASTERSHIP)) {
01151     char buf[100];
01152     Repos::pr_nfs_fh(buf, (nfs_fh*) longid.value.byte);
01153     Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: longid %s, requestid \"%s\"\n",
01154                    buf, requestid);
01155   }
01156 
01157   // Step C1: checking
01158   if (!RootLongId.isAncestorOf(longid))
01159     return VestaSource::invalidArgs;
01160   if (!master) return VestaSource::notMaster;
01161   const char* tstmp = strchr(requestid, ' ');
01162   if (tstmp == NULL) {
01163     // Bad requestid
01164     Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: bad requestid \"%s\"\n",
01165                    requestid);
01166     return VestaSource::invalidArgs;
01167   }
01168   tstmp++;
01169   const char* srcHP = strchr(tstmp, ' ');
01170   if (srcHP == NULL) {
01171     // Bad requestid
01172     Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: bad requestid \"%s\"\n",
01173                    requestid);
01174     return VestaSource::invalidArgs;
01175   }
01176   srcHP++;
01177   const char* dstHP = strrchr(srcHP, ' ');
01178   if (dstHP == NULL) {
01179     // Bad requestid
01180     Repos::dprintf(DBG_MASTERSHIP, "cedeMastership: bad requestid \"%s\"\n",
01181                    requestid);
01182     return VestaSource::invalidArgs;
01183   }  
01184   dstHP++;
01185   int srclen = dstHP - srcHP - 1;
01186   // If srcHP matches neither myHostPort nor myMasterHint, this is an
01187   // invalid request.
01188   if (!(srclen == myHostPort.Length() &&
01189         strncmp(srcHP, myHostPort.cchars(), srclen) == 0) &&
01190       !(!myMasterHint.Empty() &&
01191         srclen == myMasterHint.Length() &&
01192         strncmp(srcHP, myMasterHint.cchars(), srclen) == 0)) {
01193     // Bad srcHost:srcPort in requestid
01194     if(myMasterHint.Empty())
01195       Repos::dprintf(DBG_MASTERSHIP,
01196                      "cedeMastership: bad requestid \"%s\"; srcHostPort != %s\n",
01197                      requestid, myHostPort.cchars());
01198     else
01199       Repos::dprintf(DBG_MASTERSHIP,
01200                      "cedeMastership: bad requestid \"%s\"; srcHostPort != %s or %s\n",
01201                      requestid, myHostPort.cchars(), myMasterHint.cchars());
01202     return VestaSource::invalidArgs;
01203   }
01204   if (!ac.check(who, AccessControl::ownership) ||
01205       !MastershipAccessCheck(this, "#mastership-to", dstHP)) {
01206     return VestaSource::noPermission;
01207   }
01208 
01209   // Atomically group log entries
01210   VRLog.start();
01211 
01212   // Steps C2 and C3: Update master-repository hints, form grantid
01213   HintsClosure cl;
01214   cl.vs = this;
01215   cl.list = " ";
01216   cl.now = time(NULL);
01217   char nowtxt[32];
01218   sprintf(nowtxt, "%d", cl.now);
01219   cl.nowtxt = nowtxt;
01220 
01221   err = setAttrib("master-repository", dstHP, NULL, cl.now);
01222   assert(err == VestaSource::ok);
01223   if (type == VestaSource::appendableDirectory) {
01224     err = list(0, HintsCallback, &cl);
01225     assert(err == VestaSource::ok);
01226   }
01227 
01228   int reqlen = strlen(requestid);
01229   char* grantid = NEW_PTRFREE_ARRAY(char, reqlen + cl.list.Length() + 1);
01230   strcpy(grantid, requestid);
01231   strcpy(grantid + reqlen, cl.list.cchars());
01232 
01233   // Step C4: record grantid
01234   err = addAttrib("#master-grant", grantid, NULL, cl.now);
01235   assert(err == VestaSource::ok);
01236 
01237   // Step C5: turn off master flag
01238   err = setMaster(false);
01239   assert(err == VestaSource::ok);
01240 
01241   // Done
01242   Repos::dprintf(DBG_MASTERSHIP, "cedeMastership succeeded, grantid \"%s\"\n",
01243                  grantid);
01244   *grantidOut = grantid;
01245   VRLog.commit();
01246   return VestaSource::ok;
01247 }
01248 
01249 void 
01250 MastershipInit1() throw ()
01251 {
01252   myHostPort =
01253     VDirSurrogate::defaultHost() + ":" + VDirSurrogate::defaultPort();
01254   // Sanity check on the default host:port.  (Note: some sanity checks
01255   // have already been performed in VDirSurrogate_init.)
01256   if(myHostPort.FindChar('/') != -1)
01257     {
01258       Repos::dprintf(DBG_ALWAYS,
01259                      "Fatal: Default repository host:port contains a slash\n");
01260       abort();
01261     }
01262   if(myHostPort.FindChar(' ') != -1)
01263     {
01264       Repos::dprintf(DBG_ALWAYS,
01265                      "Fatal: Default repository host:port contains a space\n");
01266       abort();
01267     }
01268   // Default myMasterHint to myHostPort, but allow a config variable
01269   // to override it.
01270   myMasterHint = myHostPort;
01271   bool defaultMasterHint = true;
01272   try
01273     {
01274       defaultMasterHint =
01275         !VestaConfig::get("Repository", "master_hint", myMasterHint);
01276     }
01277    catch (VestaConfig::failure f)
01278      {
01279        // Probably can't actually happen, as config file parsing
01280        // errors would have ben caught in VDirSurrogate_init.
01281        cerr << "VestaConfig::failure " << f.msg << endl;
01282        abort();
01283      }
01284   // If we have a non-default non-empty master hint, make sure it's
01285   // OK.
01286   if(!defaultMasterHint && !myMasterHint.Empty())
01287     {
01288       bool badMasterHint = false;
01289       // The master hint must not contain a slash
01290       if(myMasterHint.FindChar('/') != -1)
01291         {
01292           Repos::dprintf(DBG_ALWAYS,
01293                          "Fatal: [Repository]master_hint contains a slash\n");
01294           badMasterHint = true;
01295         }
01296       // The master hint must not contain a space
01297       if(myMasterHint.FindChar(' ') != -1)
01298         {
01299           Repos::dprintf(DBG_ALWAYS,
01300                          "Fatal: [Repository]master_hint contains a space\n");
01301           badMasterHint = true;
01302         }
01303       // The master hint must contain a colon
01304       int colonPos = myMasterHint.FindChar(':');
01305       if(colonPos == -1)
01306         {
01307           Repos::dprintf(DBG_ALWAYS,
01308                          "Fatal: [Repository]master_hint doesn't contain a colon\n");
01309           badMasterHint = true;
01310         }
01311       else
01312         {
01313           // Check that the part after the colon is numeric and
01314           // non-empty.
01315           for(int i = colonPos+1; i < myMasterHint.Length(); i++)
01316             {
01317               if(!isdigit(myMasterHint[i]))
01318                 {
01319                   Repos::dprintf(DBG_ALWAYS,
01320                                  "Fatal: Non-digit after colon in [Repository]master_hint\n");
01321                   badMasterHint = true;
01322                   break;
01323                 }
01324             }
01325         }
01326       if(badMasterHint)
01327         abort();
01328     }
01329   // If we have a default master hint, check to see if it's the
01330   // loopback address.
01331   if(defaultMasterHint)
01332     {
01333       try
01334         {
01335           in_addr my_addr =
01336             TCP_sock::host_to_addr(VDirSurrogate::defaultHost());
01337           cout << "Repository host resolves to " << inet_ntoa_r(my_addr)
01338                << endl;
01339           if((my_addr.s_addr &  htonl(0xff000000)) ==
01340              htonl(127 << 24))
01341             {
01342               // The repository is in loopback space: turn off
01343               // master-repository hint attributes.
01344               Repos::dprintf(DBG_ALWAYS,
01345                              "Warning: Repository address is in 127.0.0.0/8; "
01346                              "master-repository hint attributes will not be set\n");
01347               myMasterHint="";
01348             }
01349         }
01350       catch(...)
01351         {
01352           // Can't resolve the hostname of the repository!  
01353           Repos::dprintf(DBG_ALWAYS,
01354                          "Fatal: Can't resolve the repository's host (%s)!\n",
01355                          VDirSurrogate::defaultHost().cchars());
01356           abort();
01357         }
01358     }
01359 
01360   RegisterRecoveryCallback("acqm", AcqmCallback);
01361   RegisterRecoveryCallback("finm", FinmCallback);
01362 }
01363 
01364 void 
01365 MastershipInit2() throw ()
01366 {
01367   Basics::thread_attr recoverMastership_attr;
01368 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined(__linux__)
01369   // Linux only allows the superuser to use SCHED_RR
01370   recoverMastership_attr.set_schedpolicy(SCHED_RR);
01371   recoverMastership_attr.set_inheritsched(PTHREAD_EXPLICIT_SCHED);
01372   recoverMastership_attr.set_sched_priority(sched_get_priority_min(SCHED_RR));
01373 #endif
01374 
01375   recoverMastershipTh.fork(RecoverMastershipThread, NULL,
01376                            recoverMastership_attr);
01377 }

Generated on Mon May 8 00:48:45 2006 for Vesta by  doxygen 1.4.2