Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

Replication.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // 
00020 // Replication.C
00021 // Last modified on Wed Apr 20 09:44:33 EDT 2005 by ken@xorian.net
00022 //      modified on Tue Aug  7 17:40:51 PDT 2001 by mann
00023 //
00024 // Implements Replicate()
00025 //
00026 
00027 #include "VestaSource.H"
00028 #include "VDirSurrogate.H"
00029 #include "VRConcurrency.H"
00030 #include "Mastership.H"
00031 #include "Replication.H"
00032 #include "UniqueId.H"
00033 #include "logging.H"
00034 #include "FPShortId.H"
00035 
00036 #include "lock_timing.H"
00037 
00038 #include <FdStream.H>
00039 
00040 using FS::OFdStream;
00041 
00042 /*#define COPY_SIZE 8192*/
00043 #define COPY_SIZE (128*1024)
00044 #define REPLICATOR_DIR ".replicator"
00045 
00046 //
00047 // Check whether this repository is permitted to take a replica of an
00048 // object from the specified repository.  The vs parameter is either
00049 // the existing local copy of the object (perhaps a stub or ghost), or
00050 // the object's parent directory if no such copy exists.
00051 // Implementation: search upward for the first "which" attribute; if
00052 // found, return true if it lists the source repository or "*".
00053 // If not, or none found, return false.
00054 //
00055 bool
00056 ReplicationAccessCheck(VestaSource* vs, const char* which,
00057                        const char* srcHost, const char* srcPort) throw ()
00058 {
00059   bool free_vs = false;
00060   for (;;) {
00061     if (vs == NULL) return false;
00062     if (vs->getAttribConst(which)) break;
00063     VestaSource *vs_parent = vs->longid.getParent().lookup();
00064     if(free_vs) 
00065       delete vs;
00066     vs = vs_parent;
00067     // We allocated vs, we need to free it.
00068     free_vs = true;
00069   }
00070   char hostport[2*MAX_ARC_LEN+2];
00071   sprintf(hostport, "%s:%s", srcHost, srcPort);
00072   bool result = vs->inAttribs(which, hostport) || vs->inAttribs(which, "*");
00073   if(free_vs)
00074     delete vs;
00075   return result;
00076 }
00077 
00078 // We use this to remember hosts that didn't seem to support readWhole
00079 // when we tried it previously.
00080 static Table<Text, time_t>::Default bad_readWhole_peers;
00081 
00082 // We'll refrain from trying readWhole for an hour after it fails
00083 #define BAD_READWHOLE_TTL (60*60)
00084 
00085 static bool Should_readWhole(const Text &host, const Text &port)
00086 {
00087   Text hostport = host;
00088   hostport += ":";
00089   hostport += port;
00090 
00091   time_t failed;
00092 
00093   // Did an attempt to call readWhole on this peer fail in the past?
00094   if(bad_readWhole_peers.Get(hostport, failed))
00095     {
00096       time_t now = time(0);
00097       // Was it less than the TTL ago?
00098       if((now - failed) < BAD_READWHOLE_TTL)
00099         return false;
00100     }
00101 
00102   return true;
00103 }
00104 
00105 static void Failed_readWhole(const Text &host, const Text &port)
00106 {
00107   Text hostport = host;
00108   hostport += ":";
00109   hostport += port;
00110 
00111   time_t now = time(0);
00112 
00113   // Remember that this peer had problems with readWhole
00114   (void) bad_readWhole_peers.Put(hostport, now);
00115 }
00116 
00117 //
00118 // Get a local shortid for a copy of the remote immutableFile svs.
00119 // This can be either an existing shortid with the same fingerprint,
00120 // or a newly created copy.
00121 //
00122 // !How to properly protect this sid from weeding?  Perhaps should
00123 // enter this routine with a lock held, release it if copying is
00124 // needed, then reacquire it.  The repository's internal short-term
00125 // lease would protect the sid between the end of the copying and the
00126 // reacquisition.
00127 //
00128 static VestaSource::errorCode
00129 ReplicateImmFile(VestaSource* svs, AccessControl::Identity swho,
00130                  ShortId* sid) throw ()
00131 {
00132   ShortId newsid;
00133   VestaSource::errorCode err;
00134 
00135   // Check for existing copy
00136   newsid = GetFPShortId(svs->fptag);
00137   if (newsid != NullShortId) {
00138     *sid = newsid;
00139     return VestaSource::ok;
00140   }
00141 
00142   // Make new copy
00143   int newfd;
00144   try
00145     {
00146       newfd = SourceOrDerived::fdcreate(newsid);
00147     }
00148   catch (SourceOrDerived::Fatal f)
00149     {
00150       Repos::dprintf(DBG_ALWAYS,
00151                      "ReplicateImmFile: file creation error: %s "
00152                      "(maybe the repository server is running as "
00153                      "the wrong user?)\n",
00154                      f.msg.cchars());
00155 
00156       // Really it would be better to return some kind of "internal
00157       // error" here.  We use noPermission because the only case where
00158       // this can happen currently is if the repository is denied
00159       // permission to create a new shortid, which may mean that it's
00160       // running as the wrong user.
00161       return VestaSource::noPermission;
00162     }
00163   if (newfd < 0) {
00164     int saved_errno = errno;
00165     Text etxt = Basics::errno_Text(saved_errno);
00166     Repos::dprintf(DBG_REPLICATION,
00167                    "ReplicateImmFile: file creation error: %s\n", etxt.cchars());
00168     *sid = NullShortId;
00169     return Repos::errno_to_errorCode(saved_errno);
00170   }
00171 
00172   int res;
00173   bool did_readWhole = false;
00174 
00175   if(Should_readWhole(svs->host(), svs->port()))
00176     try
00177       {
00178         // Try the "send whole file" call (which also compresses on the
00179         // wire).
00180         OFdStream new_stream(newfd);
00181         err = svs->readWhole(new_stream, swho);
00182         did_readWhole = true;
00183       }
00184     catch(SRPC::failure f)
00185       {
00186         if((f.r == SRPC::version_skew) &&
00187            (f.msg == "VestaSourceSRPC: Unknown proc_id"))
00188           {
00189             // We seem to be talking to an older repository without
00190             // support for readWhole.  Fall back on the older (slow)
00191             // method.  Remember that this peer doesn't support
00192             // readWhole so we won't try it again for a while.
00193             Repos::dprintf(DBG_REPLICATION,
00194                            "ReplicateImmFile: %s:%s doesn't seem to support "
00195                            "readWhole, falling back to old method\n",
00196                            svs->host().cchars(), svs->port().cchars());
00197             Failed_readWhole(svs->host(), svs->port());
00198           
00199           }
00200         else
00201           {
00202             // Treat other failures as fatal.  (zlib problems could even
00203             // have given us a partially written or currupted file in
00204             // the new shortid.)
00205             Repos::dprintf(DBG_REPLICATION,
00206                            "ReplicateImmFile: readWhole failed at %s:%s: %s\n",
00207                            svs->host().cchars(), svs->port().cchars(),
00208                            f.msg.cchars());
00209             do
00210               res = close(newfd);
00211             while ((res == -1) && (errno == EINTR));
00212             *sid = NullShortId;
00213             return VestaSource::rpcFailure;
00214           }
00215       }
00216 
00217   // If we should fall back on the old method..
00218   if(!did_readWhole)
00219     {
00220       try
00221         {
00222           Basics::uint64 n = svs->size();
00223           Basics::uint64 offset = 0;
00224           while (n > 0) {
00225             char buf[COPY_SIZE];
00226             int len = COPY_SIZE;
00227             if (len > n) len = n;
00228             err = svs->read(buf, &len, offset, swho);
00229             if (err != VestaSource::ok) {
00230               Repos::dprintf(DBG_REPLICATION,
00231                              "ReplicateImmFile: file read error: %s\n",
00232                              VestaSource::errorCodeString(err));
00233               do
00234                 res = close(newfd);
00235               while ((res == -1) && (errno == EINTR));
00236               *sid = NullShortId;
00237               return err;
00238             }
00239             if (len == 0) break;
00240             do
00241               len = write(newfd, buf, len);
00242             while((len == -1) && (errno == EINTR));
00243             if (len < 0) {
00244               int saved_errno = errno;
00245               Text etxt = Basics::errno_Text(saved_errno);
00246               Repos::dprintf(DBG_REPLICATION,
00247                              "ReplicateImmFile: file write error: %s\n",
00248                              etxt.cchars());
00249               do
00250                 res = close(newfd);
00251               while ((res == -1) && (errno == EINTR));
00252               *sid = NullShortId;
00253               return Repos::errno_to_errorCode(saved_errno);
00254             }
00255             n -= len;
00256             offset += len;
00257           }
00258         }
00259       catch (SRPC::failure f)
00260         {
00261           Repos::dprintf(DBG_REPLICATION,
00262                          "ReplicateImmFile: file read SRPC failure: %s\n",
00263                          f.msg.cchars());
00264           do
00265             res = close(newfd);
00266           while ((res == -1) && (errno == EINTR));
00267           *sid = NullShortId;
00268           return VestaSource::rpcFailure;
00269         }
00270     }
00271 
00272   // Set shortid file mode, timestamp, etc.
00273 
00274   struct stat st;
00275   res = fstat(newfd, &st);
00276   assert(res != -1);
00277   st.st_mode &= ~0222;  // make immutable
00278   if (svs->executable()) {
00279     st.st_mode |= 0111;  // make executable
00280   }
00281   res = fchmod(newfd, st.st_mode);
00282   assert(res != -1);
00283   // Before closing the file, flush all the data we just wrote to disk.
00284   res = fsync(newfd);
00285   assert(res != -1);
00286   do
00287     res = close(newfd);
00288   while ((res == -1) && (errno == EINTR));
00289   assert(res != -1);
00290   time_t ts = svs->timestamp();
00291   char *path = ShortIdBlock::shortIdToName(newsid);
00292   struct timeval tvp[2];
00293   tvp[0].tv_sec = ts;
00294   tvp[0].tv_usec = 0;
00295   tvp[1].tv_sec = ts;
00296   tvp[1].tv_usec = 0;
00297   res = utimes(path, tvp);
00298   assert(res != -1);
00299   *sid = newsid;
00300   delete [] path;
00301   return VestaSource::ok;
00302 }
00303 
00304 struct ReplicateImmDirClosure {
00305   VestaSource* spar;
00306   AccessControl::Identity swho;
00307   VestaSource* mpar;
00308   VestaSource::errorCode err;
00309 };
00310 
00311 /*forward*/ static VestaSource::errorCode
00312 ReplicateImmDir(VestaSource* svs, AccessControl::Identity swho,
00313                 VestaSource* mpar, Arc arc, VestaSource** mvsret) throw ();
00314 
00315 static bool
00316 ReplicateImmDirCallback(void* closure, VestaSource::typeTag type,
00317                         Arc arc, unsigned int index, Bit32 pseudoInode,
00318                         ShortId filesid, bool master) throw ()
00319 {
00320   ReplicateImmDirClosure* cl = (ReplicateImmDirClosure*) closure;
00321   VestaSource* svs = NULL;
00322   VestaSource* mvs = NULL;
00323   bool ret = false;
00324   ReadersWritersLock* lock = NULL;
00325 
00326   if (type != VestaSource::deleted) {
00327     try {
00328       cl->err = cl->spar->lookupIndex(index, svs);
00329     } catch (SRPC::failure f) {
00330       Repos::dprintf(DBG_REPLICATION,
00331                      "ReplicateImmDir: lookupIndex SRPC failure: %s\n",
00332                      f.msg.cchars());
00333       cl->err = VestaSource::rpcFailure;
00334       goto done;
00335     }
00336     if (cl->err != VestaSource::ok) {
00337       Repos::dprintf(DBG_REPLICATION,
00338                      "ReplicateImmDirCallback: lookupIndex error: %s\n",
00339                      VestaSource::errorCodeString(cl->err));
00340       goto done;
00341     }
00342   }
00343 
00344   switch (type) {
00345   case VestaSource::immutableFile: {
00346     ShortId sid;
00347     cl->err = ReplicateImmFile(svs, cl->swho, &sid);
00348     if (cl->err != VestaSource::ok) {
00349       goto done;
00350     }
00351     // Need to relookup mpar here since lock was not held
00352     VestaSource* nmpar = cl->mpar->longid.lookup(LongId::writeLock, &lock);
00353     RWLOCK_LOCKED_REASON(lock,
00354                          "ReplicateImmDirCallback (relookup mpar for immutableFile)");
00355     if (nmpar == NULL) {
00356       Repos::dprintf(DBG_REPLICATION,
00357                      "ReplicateImmDirCallback: mutable parent relookup error\n");
00358       cl->err = VestaSource::notFound;
00359       goto done;
00360     }
00361     cl->err = nmpar->insertFile(arc, sid, false, NULL,
00362                                 VestaSource::replaceDiff,
00363                                 NULL, svs->timestamp(), &svs->fptag);
00364     delete nmpar;
00365     if (cl->err != VestaSource::ok) {
00366       Repos::dprintf(DBG_REPLICATION,
00367                      "ReplicateImmDirCallback: insert file error %s\n",
00368                      VestaSource::errorCodeString(cl->err));
00369       goto done;
00370     }
00371     break; }
00372 
00373   case VestaSource::immutableDirectory: {
00374     cl->err = ReplicateImmDir(svs, cl->swho, cl->mpar, arc, &mvs);
00375     if (cl->err != VestaSource::ok) {
00376       goto done;
00377     }
00378 
00379     // Make the replica immutable and set its fingerprint
00380     // Need to relookup both cl->mpar and mvs
00381     VestaSource* nmpar = cl->mpar->longid.lookup(LongId::writeLock, &lock);
00382     RWLOCK_LOCKED_REASON(lock,
00383                          "ReplicateImmDirCallback (make immutable, set fingerprint)");
00384     if (nmpar == NULL) {
00385       cl->err = VestaSource::notFound;
00386       Repos::dprintf(DBG_REPLICATION,
00387                      "ReplicateImmDirCallback: mutable parent relookup error: %s\n", 
00388                      VestaSource::errorCodeString(cl->err));
00389       goto done;
00390     }
00391     VestaSource* nmvs = mvs->longid.lookup(LongId::checkLock, &lock);
00392     if (nmvs == NULL) {
00393       cl->err = VestaSource::notFound;
00394       Repos::dprintf(DBG_REPLICATION,
00395                      "ReplicateImmDir: mutable child relookup error: %s\n", 
00396                      VestaSource::errorCodeString(cl->err));
00397       delete nmpar;
00398       goto done;
00399     }
00400     cl->err = nmpar->insertImmutableDirectory(arc, nmvs, false, NULL,
00401                                               VestaSource::replaceDiff, NULL,
00402                                               svs->timestamp(), &svs->fptag);
00403     delete nmpar;
00404     delete nmvs;
00405     if (cl->err != VestaSource::ok) {
00406       Repos::dprintf(DBG_REPLICATION,
00407                      "ReplicateImmDirCallback: reinsert directory error: %s\n",
00408                      VestaSource::errorCodeString(cl->err));
00409       goto done;
00410     }
00411     break; }
00412 
00413   case VestaSource::deleted: {
00414     // Need to relookup mpar here since lock was not held
00415     VestaSource* nmpar = cl->mpar->longid.lookup(LongId::writeLock, &lock);
00416     RWLOCK_LOCKED_REASON(lock, "ReplicateImmDirCallback (relookup mpar for deleted)");
00417     if (nmpar == NULL) {
00418       Repos::dprintf(DBG_REPLICATION,
00419                      "ReplicateImmDirCallback: mutable parent relookup error\n");
00420       cl->err = VestaSource::notFound;
00421       goto done;
00422     }
00423     cl->err = nmpar->reallyDelete(arc, NULL, false);
00424     delete nmpar;
00425     if (cl->err != VestaSource::ok) {
00426       Repos::dprintf(DBG_REPLICATION,
00427                      "ReplicateImmDirCallback: deletion error %s\n",
00428                      VestaSource::errorCodeString(cl->err));
00429       goto done;
00430     }
00431     break;}
00432 
00433   default:
00434     assert(false);
00435   }
00436   ret = true;
00437  done:
00438   if (svs) delete svs;
00439   if (mvs) delete mvs;
00440   if (lock) lock->release();
00441   return ret;
00442 }
00443 
00444 //
00445 // Insert a copy of immutable directory svs into mpar under the name
00446 // arc, and return a VestaSource object for it in mvs.
00447 // Lock is not held on entry; it is acquired and released as needed.
00448 //
00449 static VestaSource::errorCode
00450 ReplicateImmDir(VestaSource* svs, AccessControl::Identity swho,
00451                 VestaSource* mpar, Arc arc, VestaSource** mvsret) throw ()
00452 {
00453   VestaSource::errorCode err = VestaSource::ok;
00454   VestaSource* evs = NULL;
00455   VestaSource* mvs = NULL;
00456   ReadersWritersLock* lock = &StableLock;
00457   VestaSource* dbase = NULL;
00458 
00459   lock->acquireWrite();
00460   RWLOCK_LOCKED_REASON(lock, "ReplicateImmDir (use existing)");
00461   // Look for existing copy of directory
00462   ShortId sid = GetFPShortId(svs->fptag);
00463   if (sid != NullShortId) {
00464     // Insert existing copy of directory
00465     evs = LongId::fromShortId(sid).lookup(LongId::checkLock, &lock);
00466     assert(evs != NULL);
00467     // Need to relookup mpar here since lock was not held
00468     VestaSource* nmpar = mpar->longid.lookup(LongId::checkLock, &lock);
00469     if (nmpar == NULL) {
00470       err = VestaSource::notFound;
00471       Repos::dprintf(DBG_REPLICATION,
00472                      "ReplicateImmDir: mutable parent relookup error: %s\n", 
00473                      VestaSource::errorCodeString(err));
00474       goto done;
00475     }
00476     err = nmpar->insertImmutableDirectory(arc, evs, false, NULL,
00477                                           VestaSource::replaceDiff,
00478                                           &mvs, svs->timestamp(), &svs->fptag);
00479     delete nmpar;
00480   } else {
00481     // Copy the directory 
00482 
00483     // Does source have a base?
00484     VestaSource* sbase;
00485     // Release lock while doing remote operation
00486     lock->release();
00487     lock = NULL;
00488     try {
00489       err = svs->getBase(sbase, swho);
00490     } catch (SRPC::failure f) {
00491       Repos::dprintf(DBG_REPLICATION,
00492                      "ReplicateImmDir: getBase SRPC failure: %s\n", f.msg.cchars());
00493       err = VestaSource::rpcFailure;
00494       goto done;
00495     }
00496     lock = &StableLock;
00497     lock->acquireWrite();
00498     RWLOCK_LOCKED_REASON(lock, "ReplicateImmDir (copy)");
00499     if (err == VestaSource::ok) {
00500       // Yes, check to see if we have a copy of the base already
00501       sid = GetFPShortId(sbase->fptag);
00502       delete sbase;
00503       if (sid != NullShortId) {
00504         // Yes, set dbase to it
00505         dbase = LongId::fromShortId(sid).lookup(LongId::checkLock, &lock);
00506         assert(dbase != NULL);
00507       }
00508     }
00509     // Need to relookup mpar here since lock was not held
00510     VestaSource* nmpar = mpar->longid.lookup(LongId::checkLock, &lock);
00511     if (nmpar == NULL) {
00512       err = VestaSource::notFound;
00513       Repos::dprintf(DBG_REPLICATION,
00514                      "ReplicateImmDir: mutable parent relookup error: %s\n", 
00515                      VestaSource::errorCodeString(err));
00516       goto done;
00517     }
00518     err = nmpar->insertMutableDirectory(arc, dbase, false, NULL,
00519                                         VestaSource::replaceDiff, &mvs,
00520                                         svs->timestamp());
00521     delete nmpar;
00522     if (err != VestaSource::ok) {
00523       Repos::dprintf(DBG_REPLICATION, "ReplicateImmDir: insert directory error: %s\n",
00524                      VestaSource::errorCodeString(err));
00525       goto done;
00526     }
00527 
00528     // Release lock while doing remote operation
00529     lock->release();
00530     lock = NULL;
00531 
00532     // Copy the children
00533     ReplicateImmDirClosure cl;
00534     cl.spar = svs;
00535     cl.swho = swho;
00536     cl.mpar = mvs;
00537     cl.err = VestaSource::ok;
00538     try {
00539       err = svs->list(0, ReplicateImmDirCallback, &cl, swho, (dbase != NULL));
00540     } catch (SRPC::failure f) {
00541       Repos::dprintf(DBG_REPLICATION,
00542                      "ReplicateImmDir: list directory SRPC failure: %s\n",
00543                      f.msg.cchars());
00544       err = VestaSource::rpcFailure;
00545       goto done;
00546     }
00547     if (err != VestaSource::ok) {
00548       Repos::dprintf(DBG_REPLICATION, "ReplicateImmDir: list directory error: %s\n",
00549                      VestaSource::errorCodeString(err));
00550       goto done;
00551     }
00552     if (cl.err != VestaSource::ok) {
00553       err = cl.err;
00554       goto done;
00555     }
00556   }
00557  done:
00558   if (lock) lock->release();
00559   if (evs) delete evs;
00560   if (dbase) delete dbase;
00561   if (err == VestaSource::ok) {
00562     *mvsret = mvs;
00563   } else {
00564     if (mvs) delete mvs;
00565   }
00566   return err;
00567 }
00568 
00569 static bool
00570 ReplicationCleanupCallback(void* closure, VestaSource::typeTag type,
00571                            Arc arc, unsigned int index, Bit32 pseudoInode,
00572                            ShortId filesid, bool master) throw ()
00573 {
00574   VestaSource* mpar = (VestaSource*) closure;
00575   mpar->reallyDelete(arc);
00576   return true;
00577 }
00578 
00579 // Called during repository initialization, after recovery but before
00580 // VestaSourceRPC interface is exported.  Deletes any mutable
00581 // directories left over from unfinished Replicate calls.
00582 void
00583 ReplicationCleanup() throw ()
00584 {
00585   VestaSource* mroot;
00586   VestaSource* mpar;
00587   ReadersWritersLock* lock;
00588   VestaSource::errorCode err;
00589   
00590   mroot = VestaSource::mutableRoot(LongId::writeLock, &lock);
00591   RWLOCK_LOCKED_REASON(lock, "ReplicationCleanup");
00592   err = mroot->lookup(REPLICATOR_DIR, mpar, NULL);
00593   if (err == VestaSource::ok) {
00594     mpar->list(0, ReplicationCleanupCallback, mpar);
00595   }
00596   lock->release();
00597   delete mpar;
00598 }
00599 
00600 VestaSource::errorCode
00601 Replicate(const char* pathname, bool asStub, bool asGhost,
00602           const char* srcHost, const char* srcPort, char pathnameSep,
00603           AccessControl::Identity dwho, AccessControl::Identity swho) throw ()
00604 {
00605   if (asStub && asGhost) return VestaSource::invalidArgs;
00606   VestaSource::errorCode err = VestaSource::ok;
00607   VestaSource* droot = NULL;
00608   VestaSource* dpar = NULL;
00609   VestaSource* dvs = NULL;
00610   VestaSource* sroot = NULL;
00611   VestaSource* svs = NULL;
00612   VestaSource* mroot = NULL;
00613   VestaSource* mpar = NULL;
00614   VestaSource* mvs = NULL;
00615   char mname[FP::ByteCnt*2+1];
00616   ShortId sid = NullShortId;
00617   ReadersWritersLock* lock = NULL;  
00618 
00619   //
00620   // Split pathname into head (name of parent) and tail (new arc)
00621   //
00622   char* head = strdup(pathname);
00623   const char* tail;
00624   char* p = strrchr(head, pathnameSep);
00625   if (p) {
00626     // Head is nonempty
00627     *p = '\000';
00628     tail = p + 1;
00629   } else {
00630     // Head is empty
00631     *head = '\000';
00632     tail = pathname;
00633   }
00634 
00635   //
00636   // Initial lookup of dpar and dvs under read lock to check for
00637   // permissions and simple caller errors.
00638   //
00639   droot = VestaSource::repositoryRoot(LongId::readLock, &lock);
00640   RWLOCK_LOCKED_REASON(lock, "Replicate (initial lookup, permissions check)");
00641   if (*head == '\000') {
00642     dpar = droot;
00643   } else {
00644     err = droot->lookupPathname(head, dpar, dwho, pathnameSep);
00645     if (err != VestaSource::ok) {
00646       Repos::dprintf(DBG_REPLICATION,
00647                      "Replicate: \"%s\" destination parent lookup error: %s\n", 
00648                      pathname, VestaSource::errorCodeString(err));
00649       goto done;
00650     }
00651   }
00652   err = dpar->lookup(tail, dvs, dwho);
00653   if (err != VestaSource::ok && err != VestaSource::notFound) {
00654     Repos::dprintf(DBG_REPLICATION,
00655                    "Replicate: \"%s\" old object lookup error: %s\n", 
00656                    pathname, VestaSource::errorCodeString(err));
00657     goto done;
00658   }    
00659 
00660   //
00661   // Check for caller errors
00662   //
00663   if (dpar->type == VestaSource::immutableDirectory) {
00664     err = VestaSource::inappropriateOp;
00665     Repos::dprintf(DBG_REPLICATION,
00666                    "Replicate: \"%s\" destination parent is immutable\n", pathname);
00667     goto done;
00668   }
00669   if (dvs) {
00670     if (asStub || asGhost ||
00671         (dvs->type != VestaSource::stub && dvs->type != VestaSource::ghost) ||
00672         (dvs->type == VestaSource::stub && dvs->master)) {
00673       err = VestaSource::nameInUse;
00674       Repos::dprintf(DBG_REPLICATION,
00675                      "Replicate: \"%s\" destination object exists\n", pathname);
00676       goto done;
00677     }
00678   }
00679 
00680   //
00681   // Check permission
00682   //
00683   if (!ReplicationAccessCheck((dvs ? dvs : dpar),
00684                               "#replicate-from", srcHost, srcPort) &&
00685       !ReplicationAccessCheck((dvs ? dvs : dpar),
00686                               "#replicate-from-noac", srcHost, srcPort)) {
00687     Repos::dprintf(DBG_REPLICATION,
00688                    "Replicate: \"%s\" no permission to replicate from %s:%s\n",
00689                    pathname, srcHost, srcPort);
00690     err = VestaSource::noPermission;
00691     goto done;
00692   }
00693 
00694   //
00695   // Release lock during remote operations
00696   //
00697   if (dpar != droot) delete dpar;
00698   dpar = NULL;
00699   if (dvs) delete dvs;
00700   dvs = NULL;
00701   lock->release();
00702   lock = NULL;
00703 
00704   //
00705   // Look up the object in src, yielding svs.
00706   //
00707   try {
00708     sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00709     err = sroot->lookupPathname(pathname, svs, swho, pathnameSep);
00710 
00711   } catch (SRPC::failure f) {
00712     Repos::dprintf(DBG_REPLICATION,
00713                    "Replicate: \"%s\" initial RPC to %s:%s failed: %s\n", 
00714                    pathname, srcHost, srcPort, f.msg.cchars());
00715     err = VestaSource::rpcFailure;
00716     goto done;
00717   }
00718   if (err != VestaSource::ok) {
00719     Repos::dprintf(DBG_REPLICATION, "Replicate: \"%s\" remote lookup failed: %s\n", 
00720                    pathname, VestaSource::errorCodeString(err));
00721     goto done;
00722   }
00723   
00724   //
00725   // Determine type of new object in dst
00726   //
00727   VestaSource::typeTag ntype;
00728   if (asStub) {
00729     ntype = VestaSource::stub;
00730   } else if (asGhost) {
00731     ntype = VestaSource::ghost;
00732   } else {
00733     ntype = svs->type;
00734   }
00735 
00736   //
00737   // Copy svs if needed
00738   //
00739   if (ntype == VestaSource::immutableFile) {
00740     // Look up the fingerprint to check for an existing local
00741     // replica; if none, copy the remote file.
00742 
00743     err = ReplicateImmFile(svs, swho, &sid);
00744     if (err != VestaSource::ok) goto done;
00745 
00746   } else if (ntype == VestaSource::immutableDirectory) {
00747     // 
00748     // Walk over the tree below svs and make a local copy as
00749     // a mutable directory tree.  For each node, look up the
00750     // fingerprint to check for an existing local replica before
00751     // copying.  For immutable directories that must be copied, look
00752     // up the fingerprint of the base to decide whether to do a full
00753     // copy or delta copy.  We need to avoid holding the local lock
00754     // during remote operations, especially file copies, so we do an
00755     // acquire/lookup/insert/release cycle for each change to the
00756     // local tree.
00757     //
00758 
00759     //
00760     // Determine where to put the copy
00761     //
00762     mroot = VestaSource::mutableRoot(LongId::writeLock, &lock);
00763     RWLOCK_LOCKED_REASON(lock, "Replicate (determine where to put the copy)");
00764     err = mroot->lookup(REPLICATOR_DIR, mpar, NULL);
00765     if (err == VestaSource::notFound) {
00766       err = mroot->insertMutableDirectory(REPLICATOR_DIR, NULL, true, NULL,
00767                                           VestaSource::dontReplace, &mpar);
00768       if (err == VestaSource::ok) {
00769         err = mpar->setAttrib("#mode", "000");
00770       }
00771     }
00772     if (err != VestaSource::ok) {
00773       Repos::dprintf(DBG_REPLICATION,
00774                      "Replicate: \"%s\" lookup or creation error: %s\n", 
00775                      REPLICATOR_DIR, VestaSource::errorCodeString(err));
00776       goto done;
00777     }
00778     lock->release();
00779     lock = NULL;
00780     
00781     FP::Tag fp = UniqueId();
00782     sprintf(mname,
00783             "%016" FORMAT_LENGTH_INT_64 "x"
00784             "%016" FORMAT_LENGTH_INT_64 "x",
00785             fp.Word0(), fp.Word1());
00786 
00787     err = ReplicateImmDir(svs, swho, mpar, mname, &mvs);
00788     if (err != VestaSource::ok) goto done;
00789   }
00790 
00791   // Take write lock, lookup destination parent dpar
00792   droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00793   RWLOCK_LOCKED_REASON(lock, "Replicate (destination parent checks)");
00794   if (*head == '\000') {
00795     dpar = droot;
00796   } else {
00797     err = droot->lookupPathname(head, dpar, dwho, pathnameSep);
00798     if (err != VestaSource::ok) {
00799       Repos::dprintf(DBG_REPLICATION,
00800                      "Replicate: \"%s\" destination parent lookup error: %s\n", 
00801                      pathname, VestaSource::errorCodeString(err));
00802       goto done;
00803     }
00804   }
00805 
00806   // Before inserting the new dvs, look up the old dvs under the lock
00807   // to do final checking.
00808   err = dpar->lookup(tail, dvs, dwho);
00809   if (err != VestaSource::ok && err != VestaSource::notFound) {
00810     Repos::dprintf(DBG_REPLICATION,
00811                    "Replicate: \"%s\" old object relookup error: %s\n", 
00812                    pathname, VestaSource::errorCodeString(err));
00813     goto done;
00814   }    
00815 
00816   //
00817   // Check for svs vs. dvs agreement violations -- not done earlier
00818   //
00819   if (dvs &&
00820       ((svs->master && dvs->master) ||
00821        (svs->type == VestaSource::stub && svs->master &&
00822         dvs->type != VestaSource::stub && dvs->type != VestaSource::ghost) ||
00823        (dvs->type == VestaSource::stub && dvs->master &&
00824         svs->type != VestaSource::stub && svs->type != VestaSource::ghost) ||
00825        (svs->type != VestaSource::stub && svs->type != VestaSource::ghost &&
00826         dvs->type != VestaSource::stub && dvs->type != VestaSource::ghost &&
00827         svs->type != dvs->type))) {
00828     err = VestaSource::inappropriateOp;
00829     Repos::dprintf(DBG_REPLICATION,
00830                    "Replicate: \"%s\" agreement violation, source %s %s vs. dest %s %s\n",
00831                    pathname, svs->master ? "master" : "nonmaster",
00832                    VestaSource::typeTagString(svs->type),
00833                    dvs->master ? "master" : "nonmaster",
00834                    VestaSource::typeTagString(dvs->type));
00835     goto done;    
00836   }
00837 
00838   //
00839   // Finally, insert the new object in its appendable parent
00840   //
00841   switch (ntype) {
00842   case VestaSource::stub:
00843     err = dpar->insertStub(tail, false, NULL,
00844                            VestaSource::replaceDiff, NULL, svs->timestamp());
00845     break;
00846   case VestaSource::ghost:
00847     err = dpar->insertGhost(tail, (dvs && dvs->master), NULL,
00848                             VestaSource::replaceDiff, NULL, svs->timestamp());
00849     break;
00850   case VestaSource::immutableFile:
00851     err = dpar->insertFile(tail, sid, (dvs && dvs->master), NULL,
00852                            VestaSource::replaceDiff, NULL, svs->timestamp(),
00853                            &svs->fptag);
00854     break;
00855   case VestaSource::immutableDirectory: {
00856     // Need to relookup mvs because we released and reacquired the lock
00857     VestaSource* nmvs = mvs->longid.lookup(LongId::checkLock, &lock);
00858     if (nmvs == NULL) {
00859       err = VestaSource::notFound;
00860       Repos::dprintf(DBG_REPLICATION,
00861                      "Replicate: \"%s\" temporary copy relookup error: %s\n", 
00862                      pathname, VestaSource::errorCodeString(err));
00863       goto done;
00864     }
00865     err = dpar->insertImmutableDirectory(tail, nmvs, (dvs && dvs->master),
00866                                          NULL, VestaSource::replaceDiff,
00867                                          NULL, svs->timestamp(), &svs->fptag);
00868     delete nmvs;
00869     // Need to relookup mpar because we released and reacquired the lock
00870     VestaSource* nmpar = mpar->longid.lookup(LongId::checkLock, &lock);
00871     if (nmpar == NULL) {
00872       err = VestaSource::notFound;
00873       Repos::dprintf(DBG_REPLICATION,
00874                      "Replicate: \"%s\" relookup error: %s\n", 
00875                      REPLICATOR_DIR, VestaSource::errorCodeString(err));
00876       goto done;
00877     }
00878     nmpar->reallyDelete(mname);
00879     delete nmpar;
00880     break; }
00881   case VestaSource::appendableDirectory:
00882     err = dpar->insertAppendableDirectory(tail, false, NULL,
00883                                           VestaSource::replaceDiff,
00884                                           NULL, svs->timestamp());
00885     break;
00886   default:
00887     assert(false);
00888     break;
00889   }      
00890   if (err != VestaSource::ok) {
00891     Repos::dprintf(DBG_REPLICATION,
00892                    "Replicate: \"%s\" destination insert error: %s\n", 
00893                    pathname, VestaSource::errorCodeString(err));
00894     goto done;
00895   }
00896 
00897  done:
00898   if (mvs) delete mvs;
00899   if (mpar) delete mpar;
00900   if (svs) delete svs;
00901   if (sroot) delete sroot;
00902   if (dvs) delete dvs;
00903   if (dpar && dpar != droot) delete dpar;
00904   // do not delete droot
00905   if (lock) lock->release();
00906   free(head);
00907   return err;
00908 }
00909 
00910 // State shared between ReplicateAttribs and ReplicateAttribsCallback.
00911 struct ReplicateAttribsClosure {
00912   VestaSource::errorCode err;  // result of the most recent callback step; read by ReplicateAttribs
00913   VestaSource* dvs;  // local destination object; the callback re-looks it up through its longid
00914   bool includeAccess;  // when false, the callback skips attributes whose name starts with '#'
00915 };
00916 
00917 
00918 static bool
00919 ReplicateAttribsCallback(void* closure, VestaAttribs::attribOp op,
00920                          const char* name, const char* value,
00921                          time_t timestamp)
00922 {
00923   ReplicateAttribsClosure* cl = (ReplicateAttribsClosure*) closure;
00924   VestaSource* ndvs = NULL;
00925   bool ret = false;
00926   ReadersWritersLock* lock = NULL;
00927 
00928   if (cl->includeAccess || name[0] != '#') {
00929     // Need to relookup dvs
00930     ndvs = cl->dvs->longid.lookup(LongId::writeLock, &lock);
00931     RWLOCK_LOCKED_REASON(lock, "ReplicateAttribsCallback");
00932     if (ndvs == NULL) {
00933       Repos::dprintf(DBG_REPLICATION,
00934                      "ReplicateAttribsCallback: object relookup error\n");
00935       cl->err = VestaSource::notFound;
00936       goto done;
00937     }
00938     cl->err = ndvs->writeAttrib(op, name, value, NULL, timestamp);
00939     if (cl->err != VestaSource::ok) {
00940       Repos::dprintf(DBG_REPLICATION,
00941                      "ReplicateAttribsCallback: writeAttrib error: %s\n",
00942                      VestaSource::errorCodeString(cl->err));
00943       goto done;
00944     }
00945   }
00946   ret = true;
00947  done:
00948   if (ndvs) delete ndvs;
00949   if (lock) lock->release();
00950   return ret;
00951 }
00952 
00953 VestaSource::errorCode
00954 ReplicateAttribs(const char* pathname, bool includeAccess,
00955                  const char* srcHost, const char* srcPort, char pathnameSep,
00956                  AccessControl::Identity dwho, AccessControl::Identity swho)
00957   throw ()
00958 {
00959   VestaSource::errorCode err = VestaSource::ok;
00960   VestaSource* droot = NULL;
00961   VestaSource* dvs = NULL;
00962   VestaSource* sroot = NULL;
00963   VestaSource* svs = NULL;
00964   ReadersWritersLock* lock = NULL;  
00965 
00966   //
00967   // Look up the object in src, yielding svs.
00968   //
00969   try {
00970     sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00971     err = sroot->lookupPathname(pathname, svs, swho, pathnameSep);
00972 
00973   } catch (SRPC::failure f) {
00974     Repos::dprintf(DBG_REPLICATION,
00975                    "ReplicateAttribs: \"%s\" initial RPC to %s:%s failed: %s\n", 
00976                    pathname, srcHost, srcPort, f.msg.cchars());
00977     err = VestaSource::rpcFailure;
00978     goto done;
00979   }
00980   if (err != VestaSource::ok) {
00981     Repos::dprintf(DBG_REPLICATION,
00982                    "ReplicateAttribs: \"%s\" source object lookup error: %s\n", 
00983                    pathname, VestaSource::errorCodeString(err));
00984     goto done;
00985   }
00986  
00987   //
00988   // Look up the object locally, yielding dvs
00989   //
00990   droot = VestaSource::repositoryRoot(LongId::readLock, &lock);
00991   RWLOCK_LOCKED_REASON(lock, "ReplicateAttribs");
00992   err = droot->lookupPathname(pathname, dvs, dwho, pathnameSep);
00993   if (err != VestaSource::ok) {
00994     Repos::dprintf(DBG_REPLICATION,
00995                    "ReplicateAttribs: \"%s\" destination object lookup error: %s\n", 
00996                    pathname, VestaSource::errorCodeString(err));
00997     goto done;
00998   }
00999   if (dvs->type != svs->type) {
01000     err = VestaSource::inappropriateOp;
01001     Repos::dprintf(DBG_REPLICATION,
01002                    "ReplicateAttribs: \"%s\" object type mismatch\n", pathname);
01003     goto done;
01004   }
01005 
01006   //
01007   // Check permission
01008   //
01009   if (!ReplicationAccessCheck(dvs, "#replicate-from", srcHost, srcPort) &&
01010       (includeAccess || !ReplicationAccessCheck(dvs, "#replicate-from-noac",
01011                                                 srcHost, srcPort))) {
01012     Repos::dprintf(DBG_REPLICATION,
01013                    "Replicate: \"%s\" no permission to replicate from %s:%s\n",
01014                    pathname, srcHost, srcPort);
01015     err = VestaSource::noPermission;
01016     goto done;
01017   }
01018 
01019   // Release lock during remote operation
01020   lock->release();
01021   lock = NULL;
01022 
01023   //
01024   // Enumerate the remote attrib history and replicate each tuple
01025   //
01026   ReplicateAttribsClosure cl;
01027   cl.err = VestaSource::ok;
01028   cl.dvs = dvs;
01029   cl.includeAccess = includeAccess;
01030   try {
01031     svs->getAttribHistory(ReplicateAttribsCallback, &cl);
01032   } catch (SRPC::failure f) {
01033     Repos::dprintf(DBG_REPLICATION,
01034                    "ReplicateAttribs: \"%s\" getAttribHistory SRPC failure: %s\n",
01035                    pathname, f.msg.cchars());
01036     err = VestaSource::rpcFailure;
01037     goto done;
01038   }
01039   err = cl.err;
01040 
01041  done:
01042   if (lock) lock->release();
01043   if (svs) delete svs;
01044   if (sroot) delete sroot;
01045   if (dvs) delete dvs;
01046   // do not delete droot
01047   return err;
01048 }

Generated on Mon May 8 00:48:46 2006 for Vesta by  doxygen 1.4.2