00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "VestaSource.H"
00028 #include "VDirSurrogate.H"
00029 #include "VRConcurrency.H"
00030 #include "Mastership.H"
00031 #include "Replication.H"
00032 #include "UniqueId.H"
00033 #include "logging.H"
00034 #include "FPShortId.H"
00035
00036 #include "lock_timing.H"
00037
00038 #include <FdStream.H>
00039
00040 using FS::OFdStream;
00041
00042
// Size in bytes of the buffer ReplicateImmFile uses when it copies file
// contents one chunk at a time.
#define COPY_SIZE (128 * 1024)

// Name of the directory, looked up under the mutable root, that holds
// the temporary directory copies built while replicating.
#define REPLICATOR_DIR ".replicator"
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 bool
00056 ReplicationAccessCheck(VestaSource* vs, const char* which,
00057 const char* srcHost, const char* srcPort) throw ()
00058 {
00059 bool free_vs = false;
00060 for (;;) {
00061 if (vs == NULL) return false;
00062 if (vs->getAttribConst(which)) break;
00063 VestaSource *vs_parent = vs->longid.getParent().lookup();
00064 if(free_vs)
00065 delete vs;
00066 vs = vs_parent;
00067
00068 free_vs = true;
00069 }
00070 char hostport[2*MAX_ARC_LEN+2];
00071 sprintf(hostport, "%s:%s", srcHost, srcPort);
00072 bool result = vs->inAttribs(which, hostport) || vs->inAttribs(which, "*");
00073 if(free_vs)
00074 delete vs;
00075 return result;
00076 }
00077
00078
00079
00080 static Table<Text, time_t>::Default bad_readWhole_peers;
00081
00082
00083 #define BAD_READWHOLE_TTL (60*60)
00084
00085 static bool Should_readWhole(const Text &host, const Text &port)
00086 {
00087 Text hostport = host;
00088 hostport += ":";
00089 hostport += port;
00090
00091 time_t failed;
00092
00093
00094 if(bad_readWhole_peers.Get(hostport, failed))
00095 {
00096 time_t now = time(0);
00097
00098 if((now - failed) < BAD_READWHOLE_TTL)
00099 return false;
00100 }
00101
00102 return true;
00103 }
00104
00105 static void Failed_readWhole(const Text &host, const Text &port)
00106 {
00107 Text hostport = host;
00108 hostport += ":";
00109 hostport += port;
00110
00111 time_t now = time(0);
00112
00113
00114 (void) bad_readWhole_peers.Put(hostport, now);
00115 }
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128 static VestaSource::errorCode
00129 ReplicateImmFile(VestaSource* svs, AccessControl::Identity swho,
00130 ShortId* sid) throw ()
00131 {
00132 ShortId newsid;
00133 VestaSource::errorCode err;
00134
00135
00136 newsid = GetFPShortId(svs->fptag);
00137 if (newsid != NullShortId) {
00138 *sid = newsid;
00139 return VestaSource::ok;
00140 }
00141
00142
00143 int newfd;
00144 try
00145 {
00146 newfd = SourceOrDerived::fdcreate(newsid);
00147 }
00148 catch (SourceOrDerived::Fatal f)
00149 {
00150 Repos::dprintf(DBG_ALWAYS,
00151 "ReplicateImmFile: file creation error: %s "
00152 "(maybe the repository server is running as "
00153 "the wrong user?)\n",
00154 f.msg.cchars());
00155
00156
00157
00158
00159
00160
00161 return VestaSource::noPermission;
00162 }
00163 if (newfd < 0) {
00164 int saved_errno = errno;
00165 Text etxt = Basics::errno_Text(saved_errno);
00166 Repos::dprintf(DBG_REPLICATION,
00167 "ReplicateImmFile: file creation error: %s\n", etxt.cchars());
00168 *sid = NullShortId;
00169 return Repos::errno_to_errorCode(saved_errno);
00170 }
00171
00172 int res;
00173 bool did_readWhole = false;
00174
00175 if(Should_readWhole(svs->host(), svs->port()))
00176 try
00177 {
00178
00179
00180 OFdStream new_stream(newfd);
00181 err = svs->readWhole(new_stream, swho);
00182 did_readWhole = true;
00183 }
00184 catch(SRPC::failure f)
00185 {
00186 if((f.r == SRPC::version_skew) &&
00187 (f.msg == "VestaSourceSRPC: Unknown proc_id"))
00188 {
00189
00190
00191
00192
00193 Repos::dprintf(DBG_REPLICATION,
00194 "ReplicateImmFile: %s:%s doesn't seem to support "
00195 "readWhole, falling back to old method\n",
00196 svs->host().cchars(), svs->port().cchars());
00197 Failed_readWhole(svs->host(), svs->port());
00198
00199 }
00200 else
00201 {
00202
00203
00204
00205 Repos::dprintf(DBG_REPLICATION,
00206 "ReplicateImmFile: readWhole failed at %s:%s: %s\n",
00207 svs->host().cchars(), svs->port().cchars(),
00208 f.msg.cchars());
00209 do
00210 res = close(newfd);
00211 while ((res == -1) && (errno == EINTR));
00212 *sid = NullShortId;
00213 return VestaSource::rpcFailure;
00214 }
00215 }
00216
00217
00218 if(!did_readWhole)
00219 {
00220 try
00221 {
00222 Basics::uint64 n = svs->size();
00223 Basics::uint64 offset = 0;
00224 while (n > 0) {
00225 char buf[COPY_SIZE];
00226 int len = COPY_SIZE;
00227 if (len > n) len = n;
00228 err = svs->read(buf, &len, offset, swho);
00229 if (err != VestaSource::ok) {
00230 Repos::dprintf(DBG_REPLICATION,
00231 "ReplicateImmFile: file read error: %s\n",
00232 VestaSource::errorCodeString(err));
00233 do
00234 res = close(newfd);
00235 while ((res == -1) && (errno == EINTR));
00236 *sid = NullShortId;
00237 return err;
00238 }
00239 if (len == 0) break;
00240 do
00241 len = write(newfd, buf, len);
00242 while((len == -1) && (errno == EINTR));
00243 if (len < 0) {
00244 int saved_errno = errno;
00245 Text etxt = Basics::errno_Text(saved_errno);
00246 Repos::dprintf(DBG_REPLICATION,
00247 "ReplicateImmFile: file write error: %s\n",
00248 etxt.cchars());
00249 do
00250 res = close(newfd);
00251 while ((res == -1) && (errno == EINTR));
00252 *sid = NullShortId;
00253 return Repos::errno_to_errorCode(saved_errno);
00254 }
00255 n -= len;
00256 offset += len;
00257 }
00258 }
00259 catch (SRPC::failure f)
00260 {
00261 Repos::dprintf(DBG_REPLICATION,
00262 "ReplicateImmFile: file read SRPC failure: %s\n",
00263 f.msg.cchars());
00264 do
00265 res = close(newfd);
00266 while ((res == -1) && (errno == EINTR));
00267 *sid = NullShortId;
00268 return VestaSource::rpcFailure;
00269 }
00270 }
00271
00272
00273
00274 struct stat st;
00275 res = fstat(newfd, &st);
00276 assert(res != -1);
00277 st.st_mode &= ~0222;
00278 if (svs->executable()) {
00279 st.st_mode |= 0111;
00280 }
00281 res = fchmod(newfd, st.st_mode);
00282 assert(res != -1);
00283
00284 res = fsync(newfd);
00285 assert(res != -1);
00286 do
00287 res = close(newfd);
00288 while ((res == -1) && (errno == EINTR));
00289 assert(res != -1);
00290 time_t ts = svs->timestamp();
00291 char *path = ShortIdBlock::shortIdToName(newsid);
00292 struct timeval tvp[2];
00293 tvp[0].tv_sec = ts;
00294 tvp[0].tv_usec = 0;
00295 tvp[1].tv_sec = ts;
00296 tvp[1].tv_usec = 0;
00297 res = utimes(path, tvp);
00298 assert(res != -1);
00299 *sid = newsid;
00300 delete [] path;
00301 return VestaSource::ok;
00302 }
00303
00304 struct ReplicateImmDirClosure {
00305 VestaSource* spar;
00306 AccessControl::Identity swho;
00307 VestaSource* mpar;
00308 VestaSource::errorCode err;
00309 };
00310
00311 static VestaSource::errorCode
00312 ReplicateImmDir(VestaSource* svs, AccessControl::Identity swho,
00313 VestaSource* mpar, Arc arc, VestaSource** mvsret) throw ();
00314
00315 static bool
00316 ReplicateImmDirCallback(void* closure, VestaSource::typeTag type,
00317 Arc arc, unsigned int index, Bit32 pseudoInode,
00318 ShortId filesid, bool master) throw ()
00319 {
00320 ReplicateImmDirClosure* cl = (ReplicateImmDirClosure*) closure;
00321 VestaSource* svs = NULL;
00322 VestaSource* mvs = NULL;
00323 bool ret = false;
00324 ReadersWritersLock* lock = NULL;
00325
00326 if (type != VestaSource::deleted) {
00327 try {
00328 cl->err = cl->spar->lookupIndex(index, svs);
00329 } catch (SRPC::failure f) {
00330 Repos::dprintf(DBG_REPLICATION,
00331 "ReplicateImmDir: lookupIndex SRPC failure: %s\n",
00332 f.msg.cchars());
00333 cl->err = VestaSource::rpcFailure;
00334 goto done;
00335 }
00336 if (cl->err != VestaSource::ok) {
00337 Repos::dprintf(DBG_REPLICATION,
00338 "ReplicateImmDirCallback: lookupIndex error: %s\n",
00339 VestaSource::errorCodeString(cl->err));
00340 goto done;
00341 }
00342 }
00343
00344 switch (type) {
00345 case VestaSource::immutableFile: {
00346 ShortId sid;
00347 cl->err = ReplicateImmFile(svs, cl->swho, &sid);
00348 if (cl->err != VestaSource::ok) {
00349 goto done;
00350 }
00351
00352 VestaSource* nmpar = cl->mpar->longid.lookup(LongId::writeLock, &lock);
00353 RWLOCK_LOCKED_REASON(lock,
00354 "ReplicateImmDirCallback (relookup mpar for immutableFile)");
00355 if (nmpar == NULL) {
00356 Repos::dprintf(DBG_REPLICATION,
00357 "ReplicateImmDirCallback: mutable parent relookup error\n");
00358 cl->err = VestaSource::notFound;
00359 goto done;
00360 }
00361 cl->err = nmpar->insertFile(arc, sid, false, NULL,
00362 VestaSource::replaceDiff,
00363 NULL, svs->timestamp(), &svs->fptag);
00364 delete nmpar;
00365 if (cl->err != VestaSource::ok) {
00366 Repos::dprintf(DBG_REPLICATION,
00367 "ReplicateImmDirCallback: insert file error %s\n",
00368 VestaSource::errorCodeString(cl->err));
00369 goto done;
00370 }
00371 break; }
00372
00373 case VestaSource::immutableDirectory: {
00374 cl->err = ReplicateImmDir(svs, cl->swho, cl->mpar, arc, &mvs);
00375 if (cl->err != VestaSource::ok) {
00376 goto done;
00377 }
00378
00379
00380
00381 VestaSource* nmpar = cl->mpar->longid.lookup(LongId::writeLock, &lock);
00382 RWLOCK_LOCKED_REASON(lock,
00383 "ReplicateImmDirCallback (make immutable, set fingerprint)");
00384 if (nmpar == NULL) {
00385 cl->err = VestaSource::notFound;
00386 Repos::dprintf(DBG_REPLICATION,
00387 "ReplicateImmDirCallback: mutable parent relookup error: %s\n",
00388 VestaSource::errorCodeString(cl->err));
00389 goto done;
00390 }
00391 VestaSource* nmvs = mvs->longid.lookup(LongId::checkLock, &lock);
00392 if (nmvs == NULL) {
00393 cl->err = VestaSource::notFound;
00394 Repos::dprintf(DBG_REPLICATION,
00395 "ReplicateImmDir: mutable child relookup error: %s\n",
00396 VestaSource::errorCodeString(cl->err));
00397 delete nmpar;
00398 goto done;
00399 }
00400 cl->err = nmpar->insertImmutableDirectory(arc, nmvs, false, NULL,
00401 VestaSource::replaceDiff, NULL,
00402 svs->timestamp(), &svs->fptag);
00403 delete nmpar;
00404 delete nmvs;
00405 if (cl->err != VestaSource::ok) {
00406 Repos::dprintf(DBG_REPLICATION,
00407 "ReplicateImmDirCallback: reinsert directory error: %s\n",
00408 VestaSource::errorCodeString(cl->err));
00409 goto done;
00410 }
00411 break; }
00412
00413 case VestaSource::deleted: {
00414
00415 VestaSource* nmpar = cl->mpar->longid.lookup(LongId::writeLock, &lock);
00416 RWLOCK_LOCKED_REASON(lock, "ReplicateImmDirCallback (relookup mpar for deleted)");
00417 if (nmpar == NULL) {
00418 Repos::dprintf(DBG_REPLICATION,
00419 "ReplicateImmDirCallback: mutable parent relookup error\n");
00420 cl->err = VestaSource::notFound;
00421 goto done;
00422 }
00423 cl->err = nmpar->reallyDelete(arc, NULL, false);
00424 delete nmpar;
00425 if (cl->err != VestaSource::ok) {
00426 Repos::dprintf(DBG_REPLICATION,
00427 "ReplicateImmDirCallback: deletion error %s\n",
00428 VestaSource::errorCodeString(cl->err));
00429 goto done;
00430 }
00431 break;}
00432
00433 default:
00434 assert(false);
00435 }
00436 ret = true;
00437 done:
00438 if (svs) delete svs;
00439 if (mvs) delete mvs;
00440 if (lock) lock->release();
00441 return ret;
00442 }
00443
00444
00445
00446
00447
00448
00449 static VestaSource::errorCode
00450 ReplicateImmDir(VestaSource* svs, AccessControl::Identity swho,
00451 VestaSource* mpar, Arc arc, VestaSource** mvsret) throw ()
00452 {
00453 VestaSource::errorCode err = VestaSource::ok;
00454 VestaSource* evs = NULL;
00455 VestaSource* mvs = NULL;
00456 ReadersWritersLock* lock = &StableLock;
00457 VestaSource* dbase = NULL;
00458
00459 lock->acquireWrite();
00460 RWLOCK_LOCKED_REASON(lock, "ReplicateImmDir (use existing)");
00461
00462 ShortId sid = GetFPShortId(svs->fptag);
00463 if (sid != NullShortId) {
00464
00465 evs = LongId::fromShortId(sid).lookup(LongId::checkLock, &lock);
00466 assert(evs != NULL);
00467
00468 VestaSource* nmpar = mpar->longid.lookup(LongId::checkLock, &lock);
00469 if (nmpar == NULL) {
00470 err = VestaSource::notFound;
00471 Repos::dprintf(DBG_REPLICATION,
00472 "ReplicateImmDir: mutable parent relookup error: %s\n",
00473 VestaSource::errorCodeString(err));
00474 goto done;
00475 }
00476 err = nmpar->insertImmutableDirectory(arc, evs, false, NULL,
00477 VestaSource::replaceDiff,
00478 &mvs, svs->timestamp(), &svs->fptag);
00479 delete nmpar;
00480 } else {
00481
00482
00483
00484 VestaSource* sbase;
00485
00486 lock->release();
00487 lock = NULL;
00488 try {
00489 err = svs->getBase(sbase, swho);
00490 } catch (SRPC::failure f) {
00491 Repos::dprintf(DBG_REPLICATION,
00492 "ReplicateImmDir: getBase SRPC failure: %s\n", f.msg.cchars());
00493 err = VestaSource::rpcFailure;
00494 goto done;
00495 }
00496 lock = &StableLock;
00497 lock->acquireWrite();
00498 RWLOCK_LOCKED_REASON(lock, "ReplicateImmDir (copy)");
00499 if (err == VestaSource::ok) {
00500
00501 sid = GetFPShortId(sbase->fptag);
00502 delete sbase;
00503 if (sid != NullShortId) {
00504
00505 dbase = LongId::fromShortId(sid).lookup(LongId::checkLock, &lock);
00506 assert(dbase != NULL);
00507 }
00508 }
00509
00510 VestaSource* nmpar = mpar->longid.lookup(LongId::checkLock, &lock);
00511 if (nmpar == NULL) {
00512 err = VestaSource::notFound;
00513 Repos::dprintf(DBG_REPLICATION,
00514 "ReplicateImmDir: mutable parent relookup error: %s\n",
00515 VestaSource::errorCodeString(err));
00516 goto done;
00517 }
00518 err = nmpar->insertMutableDirectory(arc, dbase, false, NULL,
00519 VestaSource::replaceDiff, &mvs,
00520 svs->timestamp());
00521 delete nmpar;
00522 if (err != VestaSource::ok) {
00523 Repos::dprintf(DBG_REPLICATION, "ReplicateImmDir: insert directory error: %s\n",
00524 VestaSource::errorCodeString(err));
00525 goto done;
00526 }
00527
00528
00529 lock->release();
00530 lock = NULL;
00531
00532
00533 ReplicateImmDirClosure cl;
00534 cl.spar = svs;
00535 cl.swho = swho;
00536 cl.mpar = mvs;
00537 cl.err = VestaSource::ok;
00538 try {
00539 err = svs->list(0, ReplicateImmDirCallback, &cl, swho, (dbase != NULL));
00540 } catch (SRPC::failure f) {
00541 Repos::dprintf(DBG_REPLICATION,
00542 "ReplicateImmDir: list directory SRPC failure: %s\n",
00543 f.msg.cchars());
00544 err = VestaSource::rpcFailure;
00545 goto done;
00546 }
00547 if (err != VestaSource::ok) {
00548 Repos::dprintf(DBG_REPLICATION, "ReplicateImmDir: list directory error: %s\n",
00549 VestaSource::errorCodeString(err));
00550 goto done;
00551 }
00552 if (cl.err != VestaSource::ok) {
00553 err = cl.err;
00554 goto done;
00555 }
00556 }
00557 done:
00558 if (lock) lock->release();
00559 if (evs) delete evs;
00560 if (dbase) delete dbase;
00561 if (err == VestaSource::ok) {
00562 *mvsret = mvs;
00563 } else {
00564 if (mvs) delete mvs;
00565 }
00566 return err;
00567 }
00568
00569 static bool
00570 ReplicationCleanupCallback(void* closure, VestaSource::typeTag type,
00571 Arc arc, unsigned int index, Bit32 pseudoInode,
00572 ShortId filesid, bool master) throw ()
00573 {
00574 VestaSource* mpar = (VestaSource*) closure;
00575 mpar->reallyDelete(arc);
00576 return true;
00577 }
00578
00579
00580
00581
00582 void
00583 ReplicationCleanup() throw ()
00584 {
00585 VestaSource* mroot;
00586 VestaSource* mpar;
00587 ReadersWritersLock* lock;
00588 VestaSource::errorCode err;
00589
00590 mroot = VestaSource::mutableRoot(LongId::writeLock, &lock);
00591 RWLOCK_LOCKED_REASON(lock, "ReplicationCleanup");
00592 err = mroot->lookup(REPLICATOR_DIR, mpar, NULL);
00593 if (err == VestaSource::ok) {
00594 mpar->list(0, ReplicationCleanupCallback, mpar);
00595 }
00596 lock->release();
00597 delete mpar;
00598 }
00599
00600 VestaSource::errorCode
00601 Replicate(const char* pathname, bool asStub, bool asGhost,
00602 const char* srcHost, const char* srcPort, char pathnameSep,
00603 AccessControl::Identity dwho, AccessControl::Identity swho) throw ()
00604 {
00605 if (asStub && asGhost) return VestaSource::invalidArgs;
00606 VestaSource::errorCode err = VestaSource::ok;
00607 VestaSource* droot = NULL;
00608 VestaSource* dpar = NULL;
00609 VestaSource* dvs = NULL;
00610 VestaSource* sroot = NULL;
00611 VestaSource* svs = NULL;
00612 VestaSource* mroot = NULL;
00613 VestaSource* mpar = NULL;
00614 VestaSource* mvs = NULL;
00615 char mname[FP::ByteCnt*2+1];
00616 ShortId sid = NullShortId;
00617 ReadersWritersLock* lock = NULL;
00618
00619
00620
00621
00622 char* head = strdup(pathname);
00623 const char* tail;
00624 char* p = strrchr(head, pathnameSep);
00625 if (p) {
00626
00627 *p = '\000';
00628 tail = p + 1;
00629 } else {
00630
00631 *head = '\000';
00632 tail = pathname;
00633 }
00634
00635
00636
00637
00638
00639 droot = VestaSource::repositoryRoot(LongId::readLock, &lock);
00640 RWLOCK_LOCKED_REASON(lock, "Replicate (initial lookup, permissions check)");
00641 if (*head == '\000') {
00642 dpar = droot;
00643 } else {
00644 err = droot->lookupPathname(head, dpar, dwho, pathnameSep);
00645 if (err != VestaSource::ok) {
00646 Repos::dprintf(DBG_REPLICATION,
00647 "Replicate: \"%s\" destination parent lookup error: %s\n",
00648 pathname, VestaSource::errorCodeString(err));
00649 goto done;
00650 }
00651 }
00652 err = dpar->lookup(tail, dvs, dwho);
00653 if (err != VestaSource::ok && err != VestaSource::notFound) {
00654 Repos::dprintf(DBG_REPLICATION,
00655 "Replicate: \"%s\" old object lookup error: %s\n",
00656 pathname, VestaSource::errorCodeString(err));
00657 goto done;
00658 }
00659
00660
00661
00662
00663 if (dpar->type == VestaSource::immutableDirectory) {
00664 err = VestaSource::inappropriateOp;
00665 Repos::dprintf(DBG_REPLICATION,
00666 "Replicate: \"%s\" destination parent is immutable\n", pathname);
00667 goto done;
00668 }
00669 if (dvs) {
00670 if (asStub || asGhost ||
00671 (dvs->type != VestaSource::stub && dvs->type != VestaSource::ghost) ||
00672 (dvs->type == VestaSource::stub && dvs->master)) {
00673 err = VestaSource::nameInUse;
00674 Repos::dprintf(DBG_REPLICATION,
00675 "Replicate: \"%s\" destination object exists\n", pathname);
00676 goto done;
00677 }
00678 }
00679
00680
00681
00682
00683 if (!ReplicationAccessCheck((dvs ? dvs : dpar),
00684 "#replicate-from", srcHost, srcPort) &&
00685 !ReplicationAccessCheck((dvs ? dvs : dpar),
00686 "#replicate-from-noac", srcHost, srcPort)) {
00687 Repos::dprintf(DBG_REPLICATION,
00688 "Replicate: \"%s\" no permission to replicate from %s:%s\n",
00689 pathname, srcHost, srcPort);
00690 err = VestaSource::noPermission;
00691 goto done;
00692 }
00693
00694
00695
00696
00697 if (dpar != droot) delete dpar;
00698 dpar = NULL;
00699 if (dvs) delete dvs;
00700 dvs = NULL;
00701 lock->release();
00702 lock = NULL;
00703
00704
00705
00706
00707 try {
00708 sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00709 err = sroot->lookupPathname(pathname, svs, swho, pathnameSep);
00710
00711 } catch (SRPC::failure f) {
00712 Repos::dprintf(DBG_REPLICATION,
00713 "Replicate: \"%s\" initial RPC to %s:%s failed: %s\n",
00714 pathname, srcHost, srcPort, f.msg.cchars());
00715 err = VestaSource::rpcFailure;
00716 goto done;
00717 }
00718 if (err != VestaSource::ok) {
00719 Repos::dprintf(DBG_REPLICATION, "Replicate: \"%s\" remote lookup failed: %s\n",
00720 pathname, VestaSource::errorCodeString(err));
00721 goto done;
00722 }
00723
00724
00725
00726
00727 VestaSource::typeTag ntype;
00728 if (asStub) {
00729 ntype = VestaSource::stub;
00730 } else if (asGhost) {
00731 ntype = VestaSource::ghost;
00732 } else {
00733 ntype = svs->type;
00734 }
00735
00736
00737
00738
00739 if (ntype == VestaSource::immutableFile) {
00740
00741
00742
00743 err = ReplicateImmFile(svs, swho, &sid);
00744 if (err != VestaSource::ok) goto done;
00745
00746 } else if (ntype == VestaSource::immutableDirectory) {
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762 mroot = VestaSource::mutableRoot(LongId::writeLock, &lock);
00763 RWLOCK_LOCKED_REASON(lock, "Replicate (determine where to put the copy)");
00764 err = mroot->lookup(REPLICATOR_DIR, mpar, NULL);
00765 if (err == VestaSource::notFound) {
00766 err = mroot->insertMutableDirectory(REPLICATOR_DIR, NULL, true, NULL,
00767 VestaSource::dontReplace, &mpar);
00768 if (err == VestaSource::ok) {
00769 err = mpar->setAttrib("#mode", "000");
00770 }
00771 }
00772 if (err != VestaSource::ok) {
00773 Repos::dprintf(DBG_REPLICATION,
00774 "Replicate: \"%s\" lookup or creation error: %s\n",
00775 REPLICATOR_DIR, VestaSource::errorCodeString(err));
00776 goto done;
00777 }
00778 lock->release();
00779 lock = NULL;
00780
00781 FP::Tag fp = UniqueId();
00782 sprintf(mname,
00783 "%016" FORMAT_LENGTH_INT_64 "x"
00784 "%016" FORMAT_LENGTH_INT_64 "x",
00785 fp.Word0(), fp.Word1());
00786
00787 err = ReplicateImmDir(svs, swho, mpar, mname, &mvs);
00788 if (err != VestaSource::ok) goto done;
00789 }
00790
00791
00792 droot = VestaSource::repositoryRoot(LongId::writeLock, &lock);
00793 RWLOCK_LOCKED_REASON(lock, "Replicate (destination parent checks)");
00794 if (*head == '\000') {
00795 dpar = droot;
00796 } else {
00797 err = droot->lookupPathname(head, dpar, dwho, pathnameSep);
00798 if (err != VestaSource::ok) {
00799 Repos::dprintf(DBG_REPLICATION,
00800 "Replicate: \"%s\" destination parent lookup error: %s\n",
00801 pathname, VestaSource::errorCodeString(err));
00802 goto done;
00803 }
00804 }
00805
00806
00807
00808 err = dpar->lookup(tail, dvs, dwho);
00809 if (err != VestaSource::ok && err != VestaSource::notFound) {
00810 Repos::dprintf(DBG_REPLICATION,
00811 "Replicate: \"%s\" old object relookup error: %s\n",
00812 pathname, VestaSource::errorCodeString(err));
00813 goto done;
00814 }
00815
00816
00817
00818
00819 if (dvs &&
00820 ((svs->master && dvs->master) ||
00821 (svs->type == VestaSource::stub && svs->master &&
00822 dvs->type != VestaSource::stub && dvs->type != VestaSource::ghost) ||
00823 (dvs->type == VestaSource::stub && dvs->master &&
00824 svs->type != VestaSource::stub && svs->type != VestaSource::ghost) ||
00825 (svs->type != VestaSource::stub && svs->type != VestaSource::ghost &&
00826 dvs->type != VestaSource::stub && dvs->type != VestaSource::ghost &&
00827 svs->type != dvs->type))) {
00828 err = VestaSource::inappropriateOp;
00829 Repos::dprintf(DBG_REPLICATION,
00830 "Replicate: \"%s\" agreement violation, source %s %s vs. dest %s %s\n",
00831 pathname, svs->master ? "master" : "nonmaster",
00832 VestaSource::typeTagString(svs->type),
00833 dvs->master ? "master" : "nonmaster",
00834 VestaSource::typeTagString(dvs->type));
00835 goto done;
00836 }
00837
00838
00839
00840
00841 switch (ntype) {
00842 case VestaSource::stub:
00843 err = dpar->insertStub(tail, false, NULL,
00844 VestaSource::replaceDiff, NULL, svs->timestamp());
00845 break;
00846 case VestaSource::ghost:
00847 err = dpar->insertGhost(tail, (dvs && dvs->master), NULL,
00848 VestaSource::replaceDiff, NULL, svs->timestamp());
00849 break;
00850 case VestaSource::immutableFile:
00851 err = dpar->insertFile(tail, sid, (dvs && dvs->master), NULL,
00852 VestaSource::replaceDiff, NULL, svs->timestamp(),
00853 &svs->fptag);
00854 break;
00855 case VestaSource::immutableDirectory: {
00856
00857 VestaSource* nmvs = mvs->longid.lookup(LongId::checkLock, &lock);
00858 if (nmvs == NULL) {
00859 err = VestaSource::notFound;
00860 Repos::dprintf(DBG_REPLICATION,
00861 "Replicate: \"%s\" temporary copy relookup error: %s\n",
00862 pathname, VestaSource::errorCodeString(err));
00863 goto done;
00864 }
00865 err = dpar->insertImmutableDirectory(tail, nmvs, (dvs && dvs->master),
00866 NULL, VestaSource::replaceDiff,
00867 NULL, svs->timestamp(), &svs->fptag);
00868 delete nmvs;
00869
00870 VestaSource* nmpar = mpar->longid.lookup(LongId::checkLock, &lock);
00871 if (nmpar == NULL) {
00872 err = VestaSource::notFound;
00873 Repos::dprintf(DBG_REPLICATION,
00874 "Replicate: \"%s\" relookup error: %s\n",
00875 REPLICATOR_DIR, VestaSource::errorCodeString(err));
00876 goto done;
00877 }
00878 nmpar->reallyDelete(mname);
00879 delete nmpar;
00880 break; }
00881 case VestaSource::appendableDirectory:
00882 err = dpar->insertAppendableDirectory(tail, false, NULL,
00883 VestaSource::replaceDiff,
00884 NULL, svs->timestamp());
00885 break;
00886 default:
00887 assert(false);
00888 break;
00889 }
00890 if (err != VestaSource::ok) {
00891 Repos::dprintf(DBG_REPLICATION,
00892 "Replicate: \"%s\" destination insert error: %s\n",
00893 pathname, VestaSource::errorCodeString(err));
00894 goto done;
00895 }
00896
00897 done:
00898 if (mvs) delete mvs;
00899 if (mpar) delete mpar;
00900 if (svs) delete svs;
00901 if (sroot) delete sroot;
00902 if (dvs) delete dvs;
00903 if (dpar && dpar != droot) delete dpar;
00904
00905 if (lock) lock->release();
00906 free(head);
00907 return err;
00908 }
00909
00910
00911 struct ReplicateAttribsClosure {
00912 VestaSource::errorCode err;
00913 VestaSource* dvs;
00914 bool includeAccess;
00915 };
00916
00917
00918 static bool
00919 ReplicateAttribsCallback(void* closure, VestaAttribs::attribOp op,
00920 const char* name, const char* value,
00921 time_t timestamp)
00922 {
00923 ReplicateAttribsClosure* cl = (ReplicateAttribsClosure*) closure;
00924 VestaSource* ndvs = NULL;
00925 bool ret = false;
00926 ReadersWritersLock* lock = NULL;
00927
00928 if (cl->includeAccess || name[0] != '#') {
00929
00930 ndvs = cl->dvs->longid.lookup(LongId::writeLock, &lock);
00931 RWLOCK_LOCKED_REASON(lock, "ReplicateAttribsCallback");
00932 if (ndvs == NULL) {
00933 Repos::dprintf(DBG_REPLICATION,
00934 "ReplicateAttribsCallback: object relookup error\n");
00935 cl->err = VestaSource::notFound;
00936 goto done;
00937 }
00938 cl->err = ndvs->writeAttrib(op, name, value, NULL, timestamp);
00939 if (cl->err != VestaSource::ok) {
00940 Repos::dprintf(DBG_REPLICATION,
00941 "ReplicateAttribsCallback: writeAttrib error: %s\n",
00942 VestaSource::errorCodeString(cl->err));
00943 goto done;
00944 }
00945 }
00946 ret = true;
00947 done:
00948 if (ndvs) delete ndvs;
00949 if (lock) lock->release();
00950 return ret;
00951 }
00952
00953 VestaSource::errorCode
00954 ReplicateAttribs(const char* pathname, bool includeAccess,
00955 const char* srcHost, const char* srcPort, char pathnameSep,
00956 AccessControl::Identity dwho, AccessControl::Identity swho)
00957 throw ()
00958 {
00959 VestaSource::errorCode err = VestaSource::ok;
00960 VestaSource* droot = NULL;
00961 VestaSource* dvs = NULL;
00962 VestaSource* sroot = NULL;
00963 VestaSource* svs = NULL;
00964 ReadersWritersLock* lock = NULL;
00965
00966
00967
00968
00969 try {
00970 sroot = VDirSurrogate::repositoryRoot(srcHost, srcPort);
00971 err = sroot->lookupPathname(pathname, svs, swho, pathnameSep);
00972
00973 } catch (SRPC::failure f) {
00974 Repos::dprintf(DBG_REPLICATION,
00975 "ReplicateAttribs: \"%s\" initial RPC to %s:%s failed: %s\n",
00976 pathname, srcHost, srcPort, f.msg.cchars());
00977 err = VestaSource::rpcFailure;
00978 goto done;
00979 }
00980 if (err != VestaSource::ok) {
00981 Repos::dprintf(DBG_REPLICATION,
00982 "ReplicateAttribs: \"%s\" source object lookup error: %s\n",
00983 pathname, VestaSource::errorCodeString(err));
00984 goto done;
00985 }
00986
00987
00988
00989
00990 droot = VestaSource::repositoryRoot(LongId::readLock, &lock);
00991 RWLOCK_LOCKED_REASON(lock, "ReplicateAttribs");
00992 err = droot->lookupPathname(pathname, dvs, dwho, pathnameSep);
00993 if (err != VestaSource::ok) {
00994 Repos::dprintf(DBG_REPLICATION,
00995 "ReplicateAttribs: \"%s\" destination object lookup error: %s\n",
00996 pathname, VestaSource::errorCodeString(err));
00997 goto done;
00998 }
00999 if (dvs->type != svs->type) {
01000 err = VestaSource::inappropriateOp;
01001 Repos::dprintf(DBG_REPLICATION,
01002 "ReplicateAttribs: \"%s\" object type mismatch\n", pathname);
01003 goto done;
01004 }
01005
01006
01007
01008
01009 if (!ReplicationAccessCheck(dvs, "#replicate-from", srcHost, srcPort) &&
01010 (includeAccess || !ReplicationAccessCheck(dvs, "#replicate-from-noac",
01011 srcHost, srcPort))) {
01012 Repos::dprintf(DBG_REPLICATION,
01013 "Replicate: \"%s\" no permission to replicate from %s:%s\n",
01014 pathname, srcHost, srcPort);
01015 err = VestaSource::noPermission;
01016 goto done;
01017 }
01018
01019
01020 lock->release();
01021 lock = NULL;
01022
01023
01024
01025
01026 ReplicateAttribsClosure cl;
01027 cl.err = VestaSource::ok;
01028 cl.dvs = dvs;
01029 cl.includeAccess = includeAccess;
01030 try {
01031 svs->getAttribHistory(ReplicateAttribsCallback, &cl);
01032 } catch (SRPC::failure f) {
01033 Repos::dprintf(DBG_REPLICATION,
01034 "ReplicateAttribs: \"%s\" getAttribHistory SRPC failure: %s\n",
01035 pathname, f.msg.cchars());
01036 err = VestaSource::rpcFailure;
01037 goto done;
01038 }
01039 err = cl.err;
01040
01041 done:
01042 if (lock) lock->release();
01043 if (svs) delete svs;
01044 if (sroot) delete sroot;
01045 if (dvs) delete dvs;
01046
01047 return err;
01048 }