00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include <pthread.h>
00027 #include <assert.h>
00028 #include <pwd.h>
00029 #include <zlib.h>
00030 #include "VestaSource.H"
00031 #include "VestaSourceSRPC.H"
00032 #include "VDirSurrogate.H"
00033 #include "MultiSRPC.H"
00034 #include "VestaConfig.H"
00035 #include "ListResults.H"
00036
00037 using std::cerr;
00038 using std::endl;
00039
00040
00041 static int listChunkSize = 8300;
00042 static int listPerEntryOverhead = 40;
00043
00044
00045
00046 static int forceWritableExisting = 0;
00047
00048
00049 static int readWhole_recv_bufsiz = (64*1024);
00050 static int readWhole_inflate_bufsiz = (128*1024);
00051
00052 extern "C"
00053 {
00054 void
00055 VDirSurrogate_init()
00056 {
00057 try {
00058 Text val;
00059 if (VestaConfig::get("Repository",
00060 "VestaSourceSRPC_listChunkSize", val)) {
00061 listChunkSize = atoi(val.cchars());
00062 }
00063 if (VestaConfig::get("Repository",
00064 "VestaSourceSRPC_listPerEntryOverhead", val)) {
00065 listPerEntryOverhead = atoi(val.cchars());
00066 }
00067 if (VestaConfig::get("Repository", "forceWritableExisting", val)) {
00068 forceWritableExisting = atoi(val.cchars());
00069 }
00070 if (VestaConfig::get("Repository",
00071 "VestaSourceSRPC_readWhole_recv_bufsiz",
00072 val)) {
00073 readWhole_recv_bufsiz = atoi(val.cchars());
00074 }
00075 if (VestaConfig::get("Repository",
00076 "VestaSourceSRPC_readWhole_inflate_bufsiz",
00077 val)) {
00078 readWhole_inflate_bufsiz = atoi(val.cchars());
00079 }
00080 AccessControl::commonInit();
00081 } catch (VestaConfig::failure f) {
00082 cerr << "VestaConfig::failure " << f.msg << endl;
00083 throw;
00084 }
00085 }
00086 }
00087
00088 void
00089 VDirSurrogateInit()
00090 {
00091 static pthread_once_t vdsonce = PTHREAD_ONCE_INIT;
00092 pthread_once(&vdsonce, VDirSurrogate_init);
00093 }
00094
00095 Text
00096 VDirSurrogate::defaultHost() throw()
00097 {
00098 return VestaSourceSRPC::defaultHost();
00099 }
00100
00101 Text
00102 VDirSurrogate::defaultPort() throw()
00103 {
00104 return VestaSourceSRPC::defaultInterface();
00105 }
00106
00107 VDirSurrogate::VDirSurrogate(time_t timestamp, ShortId sid,
00108 const Text host__, const Text port__,
00109 int executable, Basics::uint64 size)
00110 throw(SRPC::failure)
00111 {
00112 VDirSurrogateInit();
00113 timestampCache = timestamp;
00114 sidCache = sid;
00115 if (host__.Empty()) {
00116 host_ = defaultHost();
00117 port_ = defaultPort();
00118 } else {
00119 host_ = host__;
00120 port_ = port__;
00121 }
00122 executableCache = executable;
00123 sizeCache = size;
00124 }
00125
00126
00127 static VestaSource::errorCode
00128 GetLookupResult(SRPC* srpc, VestaSource::typeTag& type, LongId& olongid,
00129 bool& master, Bit32& pseudoInode, ShortId& sid,
00130 time_t& timestamp, Bit8*& attribs, FP::Tag& fptag)
00131 {
00132 VestaSource::errorCode err =
00133 (VestaSource::errorCode) srpc->recv_int();
00134 if (err == VestaSource::ok) {
00135 type = (VestaSource::typeTag) srpc->recv_int();
00136 int len = sizeof(olongid.value);
00137 srpc->recv_bytes_here((char *) &olongid.value, len);
00138 master = (bool) srpc->recv_int();
00139 pseudoInode = (Bit32) srpc->recv_int();
00140 sid = srpc->recv_int();
00141 timestamp = (time_t) srpc->recv_int();
00142 attribs = (Bit8*) (bool) srpc->recv_int();
00143 fptag.Recv(*srpc);
00144 }
00145 srpc->recv_end();
00146 return err;
00147 }
00148
00149
00150
00151 static VestaSource::errorCode
00152 Lookup(SRPC* srpc, const LongId& longid, const char* arc,
00153 AccessControl::Identity who,
00154 VestaSource::typeTag& type, LongId& olongid, bool& master,
00155 Bit32& pseudoInode, ShortId& sid, time_t& timestamp, Bit8*& attribs,
00156 FP::Tag& fptag)
00157 {
00158 VestaSource::errorCode err;
00159
00160 srpc->start_call(VestaSourceSRPC::Lookup,
00161 VestaSourceSRPC::version);
00162 srpc->send_bytes((const char*) &longid.value,
00163 sizeof(longid.value));
00164 srpc->send_chars(arc);
00165 VestaSourceSRPC::send_identity(srpc, who);
00166 srpc->send_end();
00167
00168 err = GetLookupResult(srpc, type, olongid, master, pseudoInode, sid,
00169 timestamp, attribs, fptag);
00170
00171 return err;
00172 }
00173
00174 VestaSource*
00175 VDirSurrogate::LongIdLookup(LongId longid, Text host, Text port,
00176 AccessControl::Identity who)
00177 throw(SRPC::failure)
00178 {
00179 ShortId sid;
00180 VestaSource::errorCode err;
00181 VestaSource::typeTag otype;
00182 LongId olongid;
00183 bool omaster;
00184 Bit32 opseudoInode;
00185 ShortId osid;
00186 time_t otimestamp;
00187 Bit8* oattribs;
00188 FP::Tag ofptag;
00189 VestaSource* result;
00190 VDirSurrogate* sresult;
00191 SRPC* srpc;
00192
00193 VDirSurrogateInit();
00194 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host, port);
00195 err = Lookup(srpc, longid, "", who, otype, olongid, omaster, opseudoInode,
00196 osid, otimestamp, oattribs, ofptag);
00197 VestaSourceSRPC::End(id);
00198
00199 if (err != VestaSource::ok) return NULL;
00200
00201 switch (otype) {
00202 case VestaSource::immutableFile:
00203 case VestaSource::mutableFile:
00204 case VestaSource::ghost:
00205 case VestaSource::stub:
00206 case VestaSource::device:
00207 case VestaSource::immutableDirectory:
00208 case VestaSource::appendableDirectory:
00209 case VestaSource::mutableDirectory:
00210 case VestaSource::evaluatorDirectory:
00211 case VestaSource::evaluatorROEDirectory:
00212 case VestaSource::volatileDirectory:
00213 case VestaSource::volatileROEDirectory:
00214 sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host, port));
00215 result = sresult;
00216 break;
00217
00218 default:
00219 assert(false);
00220 };
00221
00222 result->rep = NULL;
00223 result->type = otype;
00224 result->longid = olongid;
00225 result->master = omaster;
00226 result->pseudoInode = opseudoInode;
00227 result->attribs = oattribs;
00228 result->fptag = ofptag;
00229
00230 return result;
00231 }
00232
00233 bool
00234 VDirSurrogate::LongIdValid(LongId longid, Text host, Text port,
00235 AccessControl::Identity who)
00236 throw(SRPC::failure)
00237 {
00238 ShortId sid;
00239 VestaSource::errorCode err;
00240 VestaSource::typeTag otype;
00241 LongId olongid;
00242 bool omaster;
00243 Bit32 opseudoInode;
00244 ShortId osid;
00245 time_t otimestamp;
00246 Bit8* oattribs;
00247 FP::Tag ofptag;
00248 VestaSource* result;
00249 VDirSurrogate* sresult;
00250 SRPC* srpc;
00251
00252 VDirSurrogateInit();
00253 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host, port);
00254 err = Lookup(srpc, longid, "", who, otype, olongid, omaster, opseudoInode,
00255 osid, otimestamp, oattribs, ofptag);
00256 VestaSourceSRPC::End(id);
00257
00258 return (err == VestaSource::ok);
00259 }
00260
00261 VestaSource*
00262 VDirSurrogate::repositoryRoot(Text host, Text port, AccessControl::Identity who
00263 ) throw (SRPC::failure)
00264 {
00265 VestaSource *ret = NEW_CONSTR(VDirSurrogate, (2, NullShortId, host, port));
00266 ret->longid = RootLongId;
00267 ret->resync(who);
00268 return ret;
00269 }
00270
00271 VestaSource*
00272 VDirSurrogate::mutableRoot(Text host, Text port, AccessControl::Identity who
00273 ) throw (SRPC::failure)
00274 {
00275 VestaSource *ret = NEW_CONSTR(VDirSurrogate, (2, NullShortId, host, port));
00276 ret->longid = MutableRootLongId;
00277 ret->resync(who);
00278 return ret;
00279 }
00280
00281 VestaSource*
00282 VDirSurrogate::volatileRoot(Text host, Text port, AccessControl::Identity who
00283 ) throw (SRPC::failure)
00284 {
00285 VestaSource *ret = NEW_CONSTR(VDirSurrogate, (2, NullShortId, host, port));
00286 ret->longid = VolatileRootLongId;
00287 ret->resync(who);
00288 return ret;
00289 }
00290
00291 VestaSource::errorCode
00292 VDirSurrogate::createVolatileDirectory(const char* hostname, Bit64 handle,
00293 VestaSource*& result,
00294 time_t timestamp,
00295 bool readOnlyExisting,
00296 Text reposHost, Text reposPort)
00297 throw (SRPC::failure)
00298 {
00299 static Text port;
00300 if (port.Empty()) {
00301 try {
00302 port = VestaConfig::get_Text("Evaluator", "EvaluatorDirSRPC_port");
00303 } catch (VestaConfig::failure f) {
00304 cerr << "createVolatileDirectory got VestaConfig::failure: "
00305 << f.msg << endl;
00306 throw;
00307 }
00308 }
00309
00310 return VDirSurrogate::createVolatileDirectory(hostname, port.cchars(),
00311 handle, result, timestamp,
00312 readOnlyExisting,
00313 reposHost, reposPort);
00314 }
00315
00316
00317 VestaSource::errorCode
00318 VDirSurrogate::createVolatileDirectory(const char* hostname, const char* port,
00319 Bit64 handle,
00320 VestaSource*& result, time_t timestamp,
00321 bool readOnlyExisting,
00322 Text reposHost, Text reposPort)
00323 throw (SRPC::failure)
00324 {
00325 VestaSource::errorCode err;
00326 VestaSource::typeTag otype;
00327 LongId olongid;
00328 bool omaster;
00329 Bit32 opseudoInode;
00330 ShortId osid;
00331 time_t otimestamp;
00332 Bit8* oattribs;
00333 FP::Tag ofptag;
00334 SRPC* srpc;
00335
00336 VDirSurrogateInit();
00337 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, reposHost, reposPort);
00338
00339
00340 if (forceWritableExisting) readOnlyExisting = false;
00341
00342 srpc->start_call(VestaSourceSRPC::CreateVolatileDirectory,
00343 VestaSourceSRPC::version);
00344 srpc->send_chars(hostname);
00345 srpc->send_chars(port);
00346 srpc->send_bytes((char*) &handle, sizeof(handle));
00347 srpc->send_int((int) timestamp);
00348 srpc->send_int((int) readOnlyExisting);
00349 srpc->send_end();
00350
00351 err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
00352 otimestamp, oattribs, ofptag);
00353 VestaSourceSRPC::End(id);
00354 if (err != VestaSource::ok) return err;
00355
00356 assert(otype == (readOnlyExisting ? VestaSource::volatileROEDirectory :
00357 VestaSource::volatileDirectory));
00358
00359 result =
00360 NEW_CONSTR(VDirSurrogate, (otimestamp, osid, reposHost, reposPort));
00361 result->rep = NULL;
00362 result->type = otype;
00363 result->longid = olongid;
00364 result->master = omaster;
00365 result->pseudoInode = opseudoInode;
00366 result->attribs = oattribs;
00367 result->fptag = ofptag;
00368
00369 return VestaSource::ok;
00370 }
00371
00372 VestaSource::errorCode
00373 VDirSurrogate::deleteVolatileDirectory(VestaSource* dir)
00374 throw (SRPC::failure)
00375 {
00376 SRPC* srpc;
00377 VDirSurrogate *vds = (VDirSurrogate *)dir;
00378 assert(dir->type == VestaSource::volatileDirectory ||
00379 dir->type == VestaSource::volatileROEDirectory);
00380 VDirSurrogateInit();
00381 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc,
00382 vds->host_, vds->port_);
00383 srpc->start_call(VestaSourceSRPC::DeleteVolatileDirectory,
00384 VestaSourceSRPC::version);
00385 srpc->send_bytes((const char*) &dir->longid.value,
00386 sizeof(dir->longid.value));
00387 srpc->send_end();
00388 VestaSource::errorCode err =
00389 (VestaSource::errorCode) srpc->recv_int();
00390 srpc->recv_end();
00391 VestaSourceSRPC::End(id);
00392 return err;
00393 }
00394
00395 void
00396 VDirSurrogate::getNFSInfo(char*& socket, LongId& root, LongId& muRoot)
00397 throw (SRPC::failure)
00398 {
00399 int len;
00400 SRPC* srpc;
00401 VDirSurrogateInit();
00402 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc);
00403 srpc->start_call(VestaSourceSRPC::GetNFSInfo,
00404 VestaSourceSRPC::version);
00405 srpc->send_end();
00406 socket = srpc->recv_chars();
00407 len = sizeof(LongId);
00408 srpc->recv_bytes_here((char *) &root.value, len);
00409 len = sizeof(LongId);
00410 srpc->recv_bytes_here((char *) &muRoot.value, len);
00411 srpc->recv_end();
00412 VestaSourceSRPC::End(id);
00413 }
00414
00415 ShortId VDirSurrogate::fpToShortId(const FP::Tag& fptag, Text host, Text port)
00416 throw (SRPC::failure)
00417 {
00418 ShortId sid;
00419 SRPC* srpc;
00420 VDirSurrogateInit();
00421 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host, port);
00422 srpc->start_call(VestaSourceSRPC::FPToShortId,
00423 VestaSourceSRPC::version);
00424 fptag.Send(*srpc);
00425 srpc->send_end();
00426
00427 sid = (ShortId)srpc->recv_int();
00428 srpc->recv_end();
00429 VestaSourceSRPC::End(id);
00430
00431 return sid;
00432 }
00433
00434 void
00435 VDirSurrogate::getUserInfo(AccessControl::IdInfo &result ,
00436 AccessControl::Identity who,
00437 AccessControl::Identity subject,
00438 Text reposHost,
00439 Text reposPort)
00440 throw (SRPC::failure)
00441 {
00442
00443 VDirSurrogateInit();
00444
00445
00446
00447 if(subject == NULL)
00448 {
00449 subject = who;
00450 }
00451
00452
00453 SRPC* srpc;
00454 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, reposHost, reposPort);
00455
00456
00457
00458 srpc->start_call(VestaSourceSRPC::GetUserInfo,
00459 VestaSourceSRPC::version);
00460 VestaSourceSRPC::send_identity(srpc, who);
00461 VestaSourceSRPC::send_identity(srpc, subject);
00462 srpc->send_end();
00463
00464
00465 srpc->recv_chars_seq(result.names);
00466
00467
00468 srpc->recv_chars_seq(result.groups);
00469
00470
00471
00472 result.unix_uid = srpc->recv_int32();
00473 result.unix_gid = srpc->recv_int32();
00474
00475
00476
00477 Basics::uint16 specials = srpc->recv_int16();
00478 result.is_root = specials & 1;
00479 result.is_admin = specials & (1 << 1);
00480 result.is_wizard = specials & (1 << 2);
00481 result.is_runtool = specials & (1 << 3);
00482
00483
00484 srpc->recv_end();
00485 VestaSourceSRPC::End(id);
00486 }
00487
00488 void
00489 VDirSurrogate::refreshAccessTables(AccessControl::Identity who,
00490 Text reposHost,
00491 Text reposPort)
00492 throw (SRPC::failure)
00493 {
00494 VDirSurrogateInit();
00495
00496 SRPC* srpc;
00497 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, reposHost, reposPort);
00498
00499 srpc->start_call(VestaSourceSRPC::RefreshAccessTables,
00500 VestaSourceSRPC::version);
00501 VestaSourceSRPC::send_identity(srpc, who);
00502 srpc->send_end();
00503
00504 srpc->recv_end();
00505 VestaSourceSRPC::End(id);
00506 }
00507
00508 VestaSource::errorCode
00509 VDirSurrogate::acquireMastership(const char* pathname,
00510 Text dstHost, Text dstPort,
00511 Text srcHost, Text srcPort,
00512 char pathnameSep,
00513 AccessControl::Identity dwho,
00514 AccessControl::Identity swho)
00515 throw (SRPC::failure)
00516 {
00517 SRPC* srpc;
00518 VDirSurrogateInit();
00519
00520 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, dstHost, dstPort);
00521 srpc->start_call(VestaSourceSRPC::AcquireMastership,
00522 VestaSourceSRPC::version);
00523 srpc->send_chars(pathname);
00524 srpc->send_chars(srcHost.cchars());
00525 srpc->send_chars(srcPort.cchars());
00526 srpc->send_int((int) pathnameSep);
00527 VestaSourceSRPC::send_identity(srpc, dwho);
00528 VestaSourceSRPC::send_identity(srpc, swho ? swho : dwho);
00529 srpc->send_end();
00530 VestaSource::errorCode err =
00531 (VestaSource::errorCode) srpc->recv_int();
00532 srpc->recv_end();
00533 VestaSourceSRPC::End(id);
00534 return err;
00535 }
00536
00537
00538 VestaSource::errorCode
00539 VDirSurrogate::replicate(const char* pathname,
00540 bool asStub, bool asGhost,
00541 Text dstHost, Text dstPort,
00542 Text srcHost, Text srcPort,
00543 char pathnameSep,
00544 AccessControl::Identity dwho,
00545 AccessControl::Identity swho)
00546 throw (SRPC::failure)
00547 {
00548 SRPC* srpc;
00549 VDirSurrogateInit();
00550
00551 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, dstHost, dstPort);
00552 srpc->start_call(VestaSourceSRPC::Replicate,
00553 VestaSourceSRPC::version);
00554 srpc->send_chars(pathname);
00555 srpc->send_int((int) asStub);
00556 srpc->send_int((int) asGhost);
00557 srpc->send_chars(srcHost.cchars());
00558 srpc->send_chars(srcPort.cchars());
00559 srpc->send_int((int) pathnameSep);
00560 VestaSourceSRPC::send_identity(srpc, dwho);
00561 VestaSourceSRPC::send_identity(srpc, swho ? swho : dwho);
00562 srpc->send_end();
00563 VestaSource::errorCode err =
00564 (VestaSource::errorCode) srpc->recv_int();
00565 srpc->recv_end();
00566 VestaSourceSRPC::End(id);
00567 return err;
00568 }
00569
00570 VestaSource::errorCode
00571 VDirSurrogate::replicateAttribs(const char* pathname,
00572 bool includeAccess,
00573 Text dstHost, Text dstPort,
00574 Text srcHost, Text srcPort,
00575 char pathnameSep,
00576 AccessControl::Identity dwho,
00577 AccessControl::Identity swho)
00578 throw (SRPC::failure)
00579 {
00580 SRPC* srpc;
00581 VDirSurrogateInit();
00582
00583 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, dstHost, dstPort);
00584 srpc->start_call(VestaSourceSRPC::ReplicateAttribs,
00585 VestaSourceSRPC::version);
00586 srpc->send_chars(pathname);
00587 srpc->send_int((int) includeAccess);
00588 srpc->send_chars(srcHost.cchars());
00589 srpc->send_chars(srcPort.cchars());
00590 srpc->send_int((int) pathnameSep);
00591 VestaSourceSRPC::send_identity(srpc, dwho);
00592 VestaSourceSRPC::send_identity(srpc, swho ? swho : dwho);
00593 srpc->send_end();
00594 VestaSource::errorCode err =
00595 (VestaSource::errorCode) srpc->recv_int();
00596 srpc->recv_end();
00597 VestaSourceSRPC::End(id);
00598 return err;
00599 }
00600
00601
00602 void
00603 VDirSurrogate::resync(AccessControl::Identity who) throw (SRPC::failure)
00604 {
00605
00606 VestaSource::errorCode err;
00607 VDirSurrogateInit();
00608 SRPC* srpc;
00609 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00610 err = Lookup(srpc, longid, "", who, type, longid, master,
00611 pseudoInode, sidCache, timestampCache, attribs, fptag);
00612 VestaSourceSRPC::End(id);
00613 assert(err == VestaSource::ok);
00614
00615 executableCache = -1;
00616 sizeCache = (Basics::uint64) -1;
00617 }
00618
00619 VestaSource::errorCode
00620 VDirSurrogate::lookup(Arc arc, VestaSource*& result,
00621 AccessControl::Identity who,
00622 unsigned int indexOffset) throw (SRPC::failure)
00623 {
00624
00625
00626 result = 0;
00627
00628 assert(indexOffset == 0);
00629 ShortId sid;
00630 VestaSource::errorCode err;
00631 VestaSource::typeTag otype;
00632 LongId olongid;
00633 bool omaster;
00634 Bit32 opseudoInode;
00635 ShortId osid;
00636 time_t otimestamp;
00637 VDirSurrogate* sresult;
00638 Bit8* oattribs;
00639 FP::Tag ofptag;
00640
00641 VDirSurrogateInit();
00642 SRPC* srpc;
00643 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00644
00645 err = Lookup(srpc, longid, arc, who, otype, olongid, omaster,
00646 opseudoInode, osid, otimestamp, oattribs, ofptag);
00647 VestaSourceSRPC::End(id);
00648 if (err != VestaSource::ok) return err;
00649
00650 switch (otype) {
00651 case VestaSource::immutableFile:
00652 case VestaSource::mutableFile:
00653 case VestaSource::ghost:
00654 case VestaSource::stub:
00655 case VestaSource::device:
00656 case VestaSource::immutableDirectory:
00657 case VestaSource::appendableDirectory:
00658 case VestaSource::mutableDirectory:
00659 case VestaSource::evaluatorDirectory:
00660 case VestaSource::evaluatorROEDirectory:
00661 case VestaSource::volatileDirectory:
00662 case VestaSource::volatileROEDirectory:
00663 sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00664 result = sresult;
00665 break;
00666
00667 default:
00668 assert(false);
00669 };
00670
00671 result->rep = NULL;
00672 result->type = otype;
00673 result->longid = olongid;
00674 result->master = omaster;
00675 result->pseudoInode = opseudoInode;
00676 result->attribs = oattribs;
00677 result->fptag = ofptag;
00678
00679 return VestaSource::ok;
00680 }
00681
00682 VestaSource::errorCode
00683 VDirSurrogate::lookupIndex(unsigned int index, VestaSource*& result,
00684 char *arcbuf) throw (SRPC::failure)
00685 {
00686
00687
00688 result = 0;
00689
00690 ShortId sid;
00691 VestaSource::errorCode err;
00692 VestaSource::typeTag otype;
00693 LongId olongid;
00694 bool omaster;
00695 Bit32 opseudoInode;
00696 ShortId osid;
00697 time_t otimestamp;
00698 VDirSurrogate* sresult;
00699 Bit8* oattribs;
00700 FP::Tag ofptag;
00701
00702 VDirSurrogateInit();
00703 SRPC* srpc;
00704 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00705
00706 srpc->start_call(VestaSourceSRPC::LookupIndex,
00707 VestaSourceSRPC::version);
00708 srpc->send_bytes((const char*) &longid.value,
00709 sizeof(longid.value));
00710 srpc->send_int((int) index);
00711 srpc->send_int((int) (arcbuf != NULL));
00712 srpc->send_end();
00713
00714 err = (VestaSource::errorCode) srpc->recv_int();
00715 if (err == VestaSource::ok) {
00716 otype = (VestaSource::typeTag) srpc->recv_int();
00717 int len = sizeof(olongid.value);
00718 srpc->recv_bytes_here((char *) &olongid.value, len);
00719 omaster = (bool) srpc->recv_int();
00720 opseudoInode = (Bit32) srpc->recv_int();
00721 osid = srpc->recv_int();
00722 otimestamp = (time_t) srpc->recv_int();
00723 oattribs = (Bit8*) (bool) srpc->recv_int();
00724 ofptag.Recv(*srpc);
00725 if (arcbuf != NULL) {
00726 int arclen = MAX_ARC_LEN + 1;
00727 srpc->recv_chars_here(arcbuf, arclen);
00728 }
00729 }
00730 srpc->recv_end();
00731
00732 VestaSourceSRPC::End(id);
00733 if (err != VestaSource::ok) return err;
00734
00735 switch (otype) {
00736 case VestaSource::immutableFile:
00737 case VestaSource::mutableFile:
00738 case VestaSource::ghost:
00739 case VestaSource::stub:
00740 case VestaSource::device:
00741 case VestaSource::immutableDirectory:
00742 case VestaSource::appendableDirectory:
00743 case VestaSource::mutableDirectory:
00744 case VestaSource::evaluatorDirectory:
00745 case VestaSource::evaluatorROEDirectory:
00746 case VestaSource::volatileDirectory:
00747 case VestaSource::volatileROEDirectory:
00748 sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00749 result = sresult;
00750 break;
00751
00752 default:
00753 assert(false);
00754 };
00755
00756 result->rep = NULL;
00757 result->type = otype;
00758 result->longid = olongid;
00759 result->master = omaster;
00760 result->pseudoInode = opseudoInode;
00761 result->attribs = oattribs;
00762 result->fptag = ofptag;
00763
00764 return VestaSource::ok;
00765 }
00766
00767 VestaSource::errorCode
00768 VDirSurrogate::lookupPathname(const char* pathname, VestaSource*& result,
00769 AccessControl::Identity who, char pathnameSep)
00770 throw (SRPC::failure)
00771 {
00772 ShortId sid;
00773 VestaSource::errorCode err;
00774 VestaSource::typeTag otype;
00775 LongId olongid;
00776 bool omaster;
00777 Bit32 opseudoInode;
00778 ShortId osid;
00779 time_t otimestamp;
00780 VDirSurrogate* sresult;
00781 Bit8* oattribs;
00782 FP::Tag ofptag;
00783
00784 VDirSurrogateInit();
00785 SRPC* srpc;
00786 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00787
00788 srpc->start_call(VestaSourceSRPC::LookupPathname,
00789 VestaSourceSRPC::version);
00790 srpc->send_bytes((const char*) &longid.value,
00791 sizeof(longid.value));
00792 srpc->send_chars(pathname);
00793 VestaSourceSRPC::send_identity(srpc, who);
00794 srpc->send_int((int) pathnameSep);
00795 srpc->send_end();
00796
00797 err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
00798 otimestamp, oattribs, ofptag);
00799 VestaSourceSRPC::End(id);
00800 if (err != VestaSource::ok) return err;
00801
00802 switch (otype) {
00803 case VestaSource::immutableFile:
00804 case VestaSource::mutableFile:
00805 case VestaSource::ghost:
00806 case VestaSource::stub:
00807 case VestaSource::device:
00808 case VestaSource::immutableDirectory:
00809 case VestaSource::appendableDirectory:
00810 case VestaSource::mutableDirectory:
00811 case VestaSource::evaluatorDirectory:
00812 case VestaSource::evaluatorROEDirectory:
00813 case VestaSource::volatileDirectory:
00814 case VestaSource::volatileROEDirectory:
00815 sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00816 result = sresult;
00817 break;
00818
00819 default:
00820 assert(false);
00821 };
00822
00823 result->rep = NULL;
00824 result->type = otype;
00825 result->longid = olongid;
00826 result->master = omaster;
00827 result->pseudoInode = opseudoInode;
00828 result->attribs = oattribs;
00829 result->fptag = ofptag;
00830
00831 return VestaSource::ok;
00832 }
00833
00834 VestaSource::errorCode
00835 VDirSurrogate::getBase(VestaSource*& result, AccessControl::Identity who)
00836 throw (SRPC::failure)
00837 {
00838 VestaSource::errorCode err;
00839 VDirSurrogateInit();
00840 SRPC* srpc;
00841 VestaSource::typeTag otype;
00842 LongId olongid;
00843 bool omaster;
00844 Bit32 opseudoInode;
00845 ShortId osid;
00846 time_t otimestamp;
00847 Bit8* oattribs;
00848 FP::Tag ofptag;
00849
00850 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00851 srpc->start_call(VestaSourceSRPC::GetBase, VestaSourceSRPC::version);
00852 srpc->send_bytes((const char*) &this->longid.value,
00853 sizeof(this->longid.value));
00854 VestaSourceSRPC::send_identity(srpc, who);
00855 srpc->send_end();
00856
00857 err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
00858 otimestamp, oattribs, ofptag);
00859
00860 VestaSourceSRPC::End(id);
00861 if (err != VestaSource::ok) return err;
00862
00863 VDirSurrogate* sresult;
00864 switch (otype) {
00865 case VestaSource::immutableFile:
00866 case VestaSource::mutableFile:
00867 case VestaSource::ghost:
00868 case VestaSource::stub:
00869 case VestaSource::device:
00870 case VestaSource::immutableDirectory:
00871 case VestaSource::appendableDirectory:
00872 case VestaSource::mutableDirectory:
00873 case VestaSource::evaluatorDirectory:
00874 case VestaSource::evaluatorROEDirectory:
00875 case VestaSource::volatileDirectory:
00876 case VestaSource::volatileROEDirectory:
00877 sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00878 result = sresult;
00879 break;
00880
00881 default:
00882 assert(false);
00883 };
00884
00885 result->rep = NULL;
00886 result->type = otype;
00887 result->longid = olongid;
00888 result->master = omaster;
00889 result->pseudoInode = opseudoInode;
00890 result->attribs = oattribs;
00891 result->fptag = ofptag;
00892
00893 return VestaSource::ok;
00894 }
00895
00896
00897 VestaSource::errorCode
00898 VDirSurrogate::list(unsigned int firstIndex,
00899 VestaSource::listCallback callback, void* closure,
00900 AccessControl::Identity who,
00901 bool deltaOnly, unsigned int indexOffset)
00902 {
00903 assert(indexOffset == 0);
00904 int curChunkSize = 0;
00905 VestaSource::errorCode err;
00906 unsigned int index = firstIndex;
00907
00908 VDirSurrogateInit();
00909 SRPC* srpc;
00910 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00911
00912
00913
00914
00915
00916 bool stop_req = false, dir_end = false;
00917
00918
00919
00920 ListResultSeq list_result;
00921 do {
00922 srpc->start_call(VestaSourceSRPC::List,
00923 VestaSourceSRPC::version);
00924 srpc->send_bytes((const char*) &longid.value,
00925 sizeof(longid.value));
00926 srpc->send_int(index);
00927 VestaSourceSRPC::send_identity(srpc, who);
00928 srpc->send_int(deltaOnly);
00929 srpc->send_int(listChunkSize);
00930 srpc->send_int(listPerEntryOverhead);
00931 srpc->send_end();
00932
00933 err = VestaSource::ok;
00934 srpc->recv_seq_start();
00935 for (;;) {
00936 ListResultItem list_item;
00937 int arclen = sizeof(list_item.arc);
00938 bool got_end;
00939 srpc->recv_chars_here(list_item.arc, arclen, &got_end);
00940 if (got_end) {
00941
00942 err = VestaSource::ok;
00943 break;
00944 }
00945 list_item.type = (VestaSource::typeTag) srpc->recv_int();
00946 index = (unsigned int) srpc->recv_int();
00947 list_item.index = index;
00948 if (list_item.type == VestaSource::unused) {
00949
00950 dir_end = true;
00951 err = (VestaSource::errorCode) index;
00952 break;
00953 }
00954 list_item.pseudoInode = (Bit32) srpc->recv_int();
00955 list_item.filesid = (ShortId) srpc->recv_int();
00956 list_item.master = (bool) srpc->recv_int();
00957
00958
00959 list_result.addhi(list_item);
00960 }
00961 srpc->recv_seq_end();
00962 srpc->recv_end();
00963 index += 2;
00964
00965
00966
00967 while((list_result.size() > 0) && !stop_req)
00968 {
00969 ListResultItem list_item = list_result.remlo();
00970
00971 curChunkSize += strlen(list_item.arc) + listPerEntryOverhead;
00972
00973 stop_req = !callback(closure, list_item.type, list_item.arc,
00974 list_item.index, list_item.pseudoInode,
00975 list_item.filesid, list_item.master);
00976 }
00977
00978 if (stop_req && listChunkSize > curChunkSize + 100)
00979 {
00980
00981 listChunkSize = curChunkSize + 100;
00982 }
00983 } while (!stop_req && !dir_end);
00984
00985 VestaSourceSRPC::End(id);
00986 return err;
00987 }
00988
00989 VestaSource::errorCode
00990 VDirSurrogate::reallyDelete(Arc arc, AccessControl::Identity who,
00991 bool existCheck, time_t timestamp)
00992 throw (SRPC::failure)
00993 {
00994 VDirSurrogateInit();
00995 SRPC* srpc;
00996 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00997
00998 srpc->start_call(VestaSourceSRPC::ReallyDelete,
00999 VestaSourceSRPC::version);
01000 srpc->send_bytes((const char*) &this->longid.value,
01001 sizeof(this->longid.value));
01002 srpc->send_chars(arc);
01003 VestaSourceSRPC::send_identity(srpc, who);
01004 srpc->send_int((int) existCheck);
01005 srpc->send_int((int) timestamp);
01006 srpc->send_end();
01007 VestaSource::errorCode err =
01008 (VestaSource::errorCode) srpc->recv_int();
01009 srpc->recv_end();
01010 VestaSourceSRPC::End(id);
01011 return err;
01012 }
01013
01014
01015 VestaSource::errorCode
01016 VDirSurrogate::insertFile(Arc arc, ShortId sid, bool master,
01017 AccessControl::Identity who,
01018 VestaSource::dupeCheck chk,
01019 VestaSource** newvs, time_t timestamp,
01020 FP::Tag* fptag) throw (SRPC::failure)
01021 {
01022 if (newvs) *newvs = NULL;
01023 VDirSurrogateInit();
01024 SRPC* srpc;
01025 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01026
01027 srpc->start_call(VestaSourceSRPC::InsertFile,
01028 VestaSourceSRPC::version);
01029 srpc->send_bytes((const char*) &this->longid.value,
01030 sizeof(this->longid.value));
01031 srpc->send_chars(arc);
01032 srpc->send_int((int) sid);
01033 srpc->send_int((int) master);
01034 VestaSourceSRPC::send_identity(srpc, who);
01035 srpc->send_int((int) chk);
01036 srpc->send_int((int) timestamp);
01037 if (fptag == NULL) {
01038 srpc->send_bytes(NULL, 0);
01039 } else {
01040 fptag->Send(*srpc);
01041 }
01042 srpc->send_end();
01043 VestaSource::errorCode err =
01044 (VestaSource::errorCode) srpc->recv_int();
01045 if (err == VestaSource::ok) {
01046 int len = sizeof(LongId);
01047 VDirSurrogate* vs =
01048 NEW_CONSTR(VDirSurrogate, (timestamp, sid, host_, port_));
01049 vs->type = VestaSource::immutableFile;
01050 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01051 vs->master = master;
01052 vs->pseudoInode = (Bit32) srpc->recv_int();
01053 vs->fptag.Recv(*srpc);
01054 vs->sidCache = (ShortId) srpc->recv_int();
01055 if (newvs == NULL)
01056 delete vs;
01057 else
01058 *newvs = vs;
01059 }
01060 srpc->recv_end();
01061 VestaSourceSRPC::End(id);
01062 return err;
01063 }
01064
01065
01066 VestaSource::errorCode
01067 VDirSurrogate::insertMutableFile(Arc arc, ShortId sid, bool master,
01068 AccessControl::Identity who,
01069 VestaSource::dupeCheck chk,
01070 VestaSource** newvs, time_t timestamp)
01071 throw (SRPC::failure)
01072 {
01073 if (newvs) *newvs = NULL;
01074 VDirSurrogateInit();
01075 SRPC* srpc;
01076 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01077
01078 srpc->start_call(VestaSourceSRPC::InsertMutableFile,
01079 VestaSourceSRPC::version);
01080 srpc->send_bytes((const char*) &this->longid.value,
01081 sizeof(this->longid.value));
01082 srpc->send_chars(arc);
01083 srpc->send_int((int) sid);
01084 srpc->send_int((int) master);
01085 VestaSourceSRPC::send_identity(srpc, who);
01086 srpc->send_int((int) chk);
01087 srpc->send_int((int) timestamp);
01088 srpc->send_end();
01089 VestaSource::errorCode err =
01090 (VestaSource::errorCode) srpc->recv_int();
01091 if (err == VestaSource::ok) {
01092 int len = sizeof(LongId);
01093 VDirSurrogate* vs =
01094 NEW_CONSTR(VDirSurrogate, (timestamp, sid, host_, port_));
01095 vs->type = VestaSource::mutableFile;
01096 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01097 vs->master = master;
01098 vs->pseudoInode = (Bit32) srpc->recv_int();
01099 vs->fptag.Recv(*srpc);
01100 vs->sidCache = (ShortId) srpc->recv_int();
01101 if (newvs == NULL)
01102 delete vs;
01103 else
01104 *newvs = vs;
01105 }
01106 srpc->recv_end();
01107 VestaSourceSRPC::End(id);
01108 return err;
01109 }
01110
01111
01112 VestaSource::errorCode
01113 VDirSurrogate::insertImmutableDirectory(Arc arc, VestaSource* dir,
01114 bool master,
01115 AccessControl::Identity who,
01116 VestaSource::dupeCheck chk,
01117 VestaSource** newvs, time_t timestamp,
01118 FP::Tag* fptag)
01119 throw (SRPC::failure)
01120 {
01121 if (newvs) *newvs = NULL;
01122 VDirSurrogateInit();
01123 SRPC* srpc;
01124 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01125
01126 srpc->start_call(VestaSourceSRPC::InsertImmutableDirectory,
01127 VestaSourceSRPC::version);
01128 srpc->send_bytes((const char*) &this->longid.value,
01129 sizeof(this->longid.value));
01130 srpc->send_chars(arc);
01131 if (dir != NULL) {
01132 srpc->send_bytes((const char*) &dir->longid.value,
01133 sizeof(dir->longid.value));
01134 } else {
01135 srpc->send_bytes((const char*) &NullLongId.value,
01136 sizeof(NullLongId.value));
01137 }
01138 srpc->send_int((int) master);
01139 VestaSourceSRPC::send_identity(srpc, who);
01140 srpc->send_int((int) chk);
01141 srpc->send_int((int) timestamp);
01142 if (fptag == NULL) {
01143 srpc->send_bytes(NULL, 0);
01144 } else {
01145 fptag->Send(*srpc);
01146 }
01147 srpc->send_end();
01148 VestaSource::errorCode err =
01149 (VestaSource::errorCode) srpc->recv_int();
01150 if (err == VestaSource::ok) {
01151 int len = sizeof(LongId);
01152 VDirSurrogate* vs =
01153 NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01154 vs->type = VestaSource::immutableDirectory;
01155 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01156 vs->master = master;
01157 vs->pseudoInode = srpc->recv_int();
01158 vs->fptag.Recv(*srpc);
01159 vs->sidCache = srpc->recv_int();
01160 if (newvs == NULL)
01161 delete vs;
01162 else
01163 *newvs = vs;
01164 }
01165 srpc->recv_end();
01166 VestaSourceSRPC::End(id);
01167 return err;
01168 }
01169
01170 VestaSource::errorCode
01171 VDirSurrogate::insertAppendableDirectory(Arc arc, bool master,
01172 AccessControl::Identity who,
01173 VestaSource::dupeCheck chk,
01174 VestaSource** newvs, time_t timestamp)
01175 throw (SRPC::failure)
01176 {
01177 if (newvs) *newvs = NULL;
01178 VDirSurrogateInit();
01179 SRPC* srpc;
01180 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01181
01182 srpc->start_call(VestaSourceSRPC::InsertAppendableDirectory,
01183 VestaSourceSRPC::version);
01184 srpc->send_bytes((const char*) &this->longid.value,
01185 sizeof(this->longid.value));
01186 srpc->send_chars(arc);
01187 srpc->send_int((int) master);
01188 VestaSourceSRPC::send_identity(srpc, who);
01189 srpc->send_int((int) chk);
01190 srpc->send_int((int) timestamp);
01191 srpc->send_end();
01192 VestaSource::errorCode err =
01193 (VestaSource::errorCode) srpc->recv_int();
01194 if (err == VestaSource::ok) {
01195 int len = sizeof(LongId);
01196 VDirSurrogate* vs =
01197 NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01198 vs->type = VestaSource::appendableDirectory;
01199 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01200 vs->master = master;
01201 vs->pseudoInode = srpc->recv_int();
01202 vs->fptag.Recv(*srpc);
01203 if (newvs == NULL)
01204 delete vs;
01205 else
01206 *newvs = vs;
01207 }
01208 srpc->recv_end();
01209 VestaSourceSRPC::End(id);
01210 return err;
01211 }
01212
01213
01214 VestaSource::errorCode
01215 VDirSurrogate::insertMutableDirectory(Arc arc, VestaSource* dir,
01216 bool master,
01217 AccessControl::Identity who,
01218 VestaSource::dupeCheck chk,
01219 VestaSource** newvs, time_t timestamp)
01220 throw (SRPC::failure)
01221 {
01222 if (newvs) *newvs = NULL;
01223 VDirSurrogateInit();
01224 SRPC* srpc;
01225 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01226
01227 srpc->start_call(VestaSourceSRPC::InsertMutableDirectory,
01228 VestaSourceSRPC::version);
01229 srpc->send_bytes((const char*) &this->longid.value,
01230 sizeof(this->longid.value));
01231 srpc->send_chars(arc);
01232 if (dir != NULL) {
01233 srpc->send_bytes((const char*) &dir->longid.value,
01234 sizeof(dir->longid.value));
01235 } else {
01236 srpc->send_bytes((const char*) &NullLongId.value,
01237 sizeof(NullLongId.value));
01238 }
01239 srpc->send_int((int) master);
01240 VestaSourceSRPC::send_identity(srpc, who);
01241 srpc->send_int((int) chk);
01242 srpc->send_int((int) timestamp);
01243 srpc->send_end();
01244 VestaSource::errorCode err =
01245 (VestaSource::errorCode) srpc->recv_int();
01246 if (err == VestaSource::ok) {
01247 int len = sizeof(LongId);
01248 VDirSurrogate* vs =
01249 NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01250 vs->type = VestaSource::mutableDirectory;
01251 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01252 vs->master = master;
01253 vs->pseudoInode = srpc->recv_int();
01254 vs->fptag.Recv(*srpc);
01255 if (newvs == NULL)
01256 delete vs;
01257 else
01258 *newvs = vs;
01259 }
01260 srpc->recv_end();
01261 VestaSourceSRPC::End(id);
01262 return err;
01263 }
01264
01265 VestaSource::errorCode
01266 VDirSurrogate::insertGhost(Arc arc, bool master, AccessControl::Identity who,
01267 VestaSource::dupeCheck chk, VestaSource** newvs,
01268 time_t timestamp)
01269 throw (SRPC::failure)
01270 {
01271 if (newvs) *newvs = NULL;
01272 VDirSurrogateInit();
01273 SRPC* srpc;
01274 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01275
01276 srpc->start_call(VestaSourceSRPC::InsertGhost,
01277 VestaSourceSRPC::version);
01278 srpc->send_bytes((const char*) &this->longid.value,
01279 sizeof(this->longid.value));
01280 srpc->send_chars(arc);
01281 srpc->send_int((int) master);
01282 VestaSourceSRPC::send_identity(srpc, who);
01283 srpc->send_int((int) chk);
01284 srpc->send_int((int) timestamp);
01285 srpc->send_end();
01286 VestaSource::errorCode err =
01287 (VestaSource::errorCode) srpc->recv_int();
01288 if (err == VestaSource::ok) {
01289 int len = sizeof(LongId);
01290 VDirSurrogate* vs =
01291 NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01292 vs->type = VestaSource::ghost;
01293 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01294 vs->master = master;
01295 vs->pseudoInode = srpc->recv_int();
01296 vs->fptag.Recv(*srpc);
01297 if (newvs == NULL)
01298 delete vs;
01299 else
01300 *newvs = vs;
01301 }
01302 srpc->recv_end();
01303 VestaSourceSRPC::End(id);
01304 return err;
01305 }
01306
01307 VestaSource::errorCode
01308 VDirSurrogate::insertStub(Arc arc, bool master, AccessControl::Identity who,
01309 VestaSource::dupeCheck chk,
01310 VestaSource** newvs, time_t timestamp)
01311 throw (SRPC::failure)
01312 {
01313 if (newvs) *newvs = NULL;
01314 VDirSurrogateInit();
01315 SRPC* srpc;
01316 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01317
01318 srpc->start_call(VestaSourceSRPC::InsertStub,
01319 VestaSourceSRPC::version);
01320 srpc->send_bytes((const char*) &this->longid.value,
01321 sizeof(this->longid.value));
01322 srpc->send_chars(arc);
01323 srpc->send_int((int) master);
01324 VestaSourceSRPC::send_identity(srpc, who);
01325 srpc->send_int((int) chk);
01326 srpc->send_int((int) timestamp);
01327 srpc->send_end();
01328 VestaSource::errorCode err =
01329 (VestaSource::errorCode) srpc->recv_int();
01330 if (err == VestaSource::ok) {
01331 int len = sizeof(LongId);
01332 VDirSurrogate* vs =
01333 NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01334 vs->type = VestaSource::stub;
01335 srpc->recv_bytes_here((char *) &vs->longid.value, len);
01336 vs->master = master;
01337 vs->pseudoInode = srpc->recv_int();
01338 vs->fptag.Recv(*srpc);
01339 if (newvs == NULL)
01340 delete vs;
01341 else
01342 *newvs = vs;
01343 }
01344 srpc->recv_end();
01345 VestaSourceSRPC::End(id);
01346 return err;
01347 }
01348
01349
01350 VestaSource::errorCode
01351 VDirSurrogate::renameTo(Arc arc, VestaSource* fromDir, Arc fromArc,
01352 AccessControl::Identity who,
01353 VestaSource::dupeCheck chk, time_t timestamp)
01354 throw (SRPC::failure)
01355 {
01356 VDirSurrogateInit();
01357 SRPC* srpc;
01358 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01359
01360 srpc->start_call(VestaSourceSRPC::RenameTo,
01361 VestaSourceSRPC::version);
01362 srpc->send_bytes((const char*) &this->longid.value,
01363 sizeof(this->longid.value));
01364 srpc->send_chars(arc);
01365 srpc->send_bytes((const char*) &fromDir->longid.value,
01366 sizeof(fromDir->longid.value));
01367 srpc->send_chars(fromArc);
01368 VestaSourceSRPC::send_identity(srpc, who);
01369 srpc->send_int((int) chk);
01370 srpc->send_int((int) timestamp);
01371 srpc->send_end();
01372 VestaSource::errorCode err =
01373 (VestaSource::errorCode) srpc->recv_int();
01374 srpc->recv_end();
01375 VestaSourceSRPC::End(id);
01376 return err;
01377 }
01378
01379
01380 VestaSource::errorCode
01381 VDirSurrogate::makeMutable(VestaSource*& result, ShortId sid,
01382 Basics::uint64 copyMax, AccessControl::Identity who)
01383 throw (SRPC::failure)
01384 {
01385 VestaSource::errorCode err;
01386 VestaSource::typeTag otype;
01387 LongId olongid;
01388 bool omaster;
01389 Bit32 opseudoInode;
01390 ShortId osid;
01391 time_t otimestamp;
01392 Bit8* oattribs;
01393 FP::Tag ofptag;
01394
01395 VDirSurrogateInit();
01396 SRPC* srpc;
01397 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01398
01399
01400 srpc->start_call(VestaSourceSRPC::MakeMutable,
01401 VestaSourceSRPC::version);
01402 srpc->send_bytes((const char*) &this->longid.value,
01403 sizeof(this->longid.value));
01404 srpc->send_int((int) sid);
01405 srpc->send_int((int) (copyMax & 0xffffffff));
01406 srpc->send_int((int) (copyMax >> 32));
01407 VestaSourceSRPC::send_identity(srpc, who);
01408 srpc->send_end();
01409
01410 err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
01411 otimestamp, oattribs, ofptag);
01412
01413 VestaSourceSRPC::End(id);
01414 if (err != VestaSource::ok) return err;
01415
01416 VDirSurrogate* sresult;
01417 switch (otype) {
01418 case VestaSource::immutableFile:
01419 case VestaSource::mutableFile:
01420 case VestaSource::ghost:
01421 case VestaSource::stub:
01422 case VestaSource::device:
01423 case VestaSource::immutableDirectory:
01424 case VestaSource::appendableDirectory:
01425 case VestaSource::mutableDirectory:
01426 case VestaSource::evaluatorDirectory:
01427 case VestaSource::evaluatorROEDirectory:
01428 case VestaSource::volatileDirectory:
01429 case VestaSource::volatileROEDirectory:
01430 sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
01431 result = sresult;
01432 break;
01433
01434 default:
01435 assert(false);
01436 };
01437
01438 result->rep = NULL;
01439 result->type = otype;
01440 result->longid = olongid;
01441 result->master = omaster;
01442 result->pseudoInode = opseudoInode;
01443 result->attribs = oattribs;
01444 result->fptag = ofptag;
01445
01446 return VestaSource::ok;
01447 }
01448
01449
01450 bool
01451 VDirSurrogate::inAttribs(const char* name, const char* value)
01452 throw (SRPC::failure)
01453 {
01454 bool retval;
01455
01456 VDirSurrogateInit();
01457 SRPC* srpc;
01458 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01459
01460 srpc->start_call(VestaSourceSRPC::InAttribs,
01461 VestaSourceSRPC::version);
01462 srpc->send_bytes((const char*) &longid.value,
01463 sizeof(longid.value));
01464 srpc->send_chars(name);
01465 srpc->send_chars(value);
01466 srpc->send_end();
01467
01468 retval = (bool) srpc->recv_int();
01469 srpc->recv_end();
01470 VestaSourceSRPC::End(id);
01471
01472 return retval;
01473 }
01474
01475 char*
01476 VDirSurrogate::getAttrib(const char* name)
01477 throw (SRPC::failure)
01478 {
01479 char* value;
01480 bool null;
01481
01482 VDirSurrogateInit();
01483 SRPC* srpc;
01484 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01485
01486 srpc->start_call(VestaSourceSRPC::GetAttrib,
01487 VestaSourceSRPC::version);
01488 srpc->send_bytes((const char*) &longid.value,
01489 sizeof(longid.value));
01490 srpc->send_chars(name);
01491 srpc->send_end();
01492
01493 null = (bool) srpc->recv_int();
01494 value = srpc->recv_chars();
01495 srpc->recv_end();
01496 VestaSourceSRPC::End(id);
01497
01498 if (null) {
01499 return NULL;
01500 } else {
01501 return value;
01502 }
01503 }
01504
01505 void
01506 VDirSurrogate::getAttrib(const char* name,
01507 VestaSource::valueCallback cb, void* cl)
01508 {
01509 char* value;
01510
01511 VDirSurrogateInit();
01512 SRPC* srpc;
01513 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01514
01515 srpc->start_call(VestaSourceSRPC::GetAttrib2,
01516 VestaSourceSRPC::version);
01517 srpc->send_bytes((const char*) &longid.value,
01518 sizeof(longid.value));
01519 srpc->send_chars(name);
01520 srpc->send_end();
01521
01522 srpc->recv_seq_start();
01523 AttribResultSeq l_result;
01524 for (;;) {
01525 bool got_end;
01526 value = srpc->recv_chars(&got_end);
01527 if (got_end) break;
01528 l_result.addhi(value);
01529 }
01530 srpc->recv_seq_end();
01531 srpc->recv_end();
01532 VestaSourceSRPC::End(id);
01533
01534 bool cont = true;
01535 while(l_result.size() > 0)
01536 {
01537 char* value = l_result.remlo();
01538 if (cont) cont = cb(cl, value);
01539 delete [] value;
01540 }
01541 }
01542
01543 void
01544 VDirSurrogate::listAttribs(VestaSource::valueCallback cb, void* cl)
01545 {
01546 VDirSurrogateInit();
01547 SRPC* srpc;
01548 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01549
01550 srpc->start_call(VestaSourceSRPC::ListAttribs,
01551 VestaSourceSRPC::version);
01552 srpc->send_bytes((const char*) &longid.value,
01553 sizeof(longid.value));
01554 srpc->send_end();
01555
01556 srpc->recv_seq_start();
01557 AttribResultSeq l_result;
01558 for (;;) {
01559 bool got_end;
01560 char* value = srpc->recv_chars(&got_end);
01561 if (got_end) break;
01562 l_result.addhi(value);
01563 }
01564 srpc->recv_seq_end();
01565 srpc->recv_end();
01566 VestaSourceSRPC::End(id);
01567
01568 bool cont = true;
01569 while(l_result.size() > 0)
01570 {
01571 char* value = l_result.remlo();
01572 if (cont) cont = cb(cl, value);
01573 delete [] value;
01574 }
01575 }
01576
01577 void
01578 VDirSurrogate::getAttribHistory(VestaSource::historyCallback cb, void* cl)
01579 {
01580 VDirSurrogateInit();
01581 SRPC* srpc;
01582 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01583
01584 srpc->start_call(VestaSourceSRPC::GetAttribHistory,
01585 VestaSourceSRPC::version);
01586 srpc->send_bytes((const char*) &longid.value,
01587 sizeof(longid.value));
01588 srpc->send_end();
01589
01590 srpc->recv_seq_start();
01591 AttribHistoryResultSeq l_result;
01592 for (;;) {
01593 bool got_end;
01594 AttribHistoryResultItem l_item;
01595 l_item.op = (VestaSource::attribOp) srpc->recv_int(&got_end);
01596 if (got_end) break;
01597 l_item.name = srpc->recv_chars();
01598 l_item.value = srpc->recv_chars();
01599 l_item.timestamp = (time_t) srpc->recv_int();
01600 l_result.addhi(l_item);
01601 }
01602 srpc->recv_seq_end();
01603 srpc->recv_end();
01604 VestaSourceSRPC::End(id);
01605
01606 bool cont = true;
01607 while(l_result.size() > 0)
01608 {
01609 AttribHistoryResultItem l_item = l_result.remlo();
01610 if (cont) cont = cb(cl,
01611 l_item.op, l_item.name, l_item.value,
01612 l_item.timestamp);
01613 delete [] l_item.name;
01614 delete [] l_item.value;
01615 }
01616 }
01617
01618 VestaSource::errorCode
01619 VDirSurrogate::writeAttrib(VestaSource::attribOp op, const char* name,
01620 const char* value, AccessControl::Identity who,
01621 time_t timestamp)
01622 throw (SRPC::failure)
01623 {
01624 VDirSurrogateInit();
01625 SRPC* srpc;
01626 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01627
01628 srpc->start_call(VestaSourceSRPC::WriteAttrib,
01629 VestaSourceSRPC::version);
01630 srpc->send_bytes((const char*) &longid.value,
01631 sizeof(longid.value));
01632 srpc->send_int((int) op);
01633 srpc->send_chars(name);
01634 srpc->send_chars(value);
01635 VestaSourceSRPC::send_identity(srpc, who);
01636 srpc->send_int((int) timestamp);
01637 srpc->send_end();
01638 VestaSource::errorCode err =
01639 (VestaSource::errorCode) srpc->recv_int();
01640 srpc->recv_end();
01641 VestaSourceSRPC::End(id);
01642 return err;
01643 }
01644
01645
01646 VestaSource::errorCode
01647 VDirSurrogate::makeFilesImmutable(unsigned int threshold,
01648 AccessControl::Identity who)
01649 throw (SRPC::failure)
01650 {
01651 VestaSource::errorCode err;
01652 VDirSurrogateInit();
01653 SRPC* srpc;
01654 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01655
01656 srpc->start_call(VestaSourceSRPC::MakeFilesImmutable,
01657 VestaSourceSRPC::version);
01658 srpc->send_bytes((const char*) &longid.value,
01659 sizeof(longid.value));
01660 srpc->send_int((int) threshold);
01661 VestaSourceSRPC::send_identity(srpc, who);
01662 srpc->send_end();
01663 err = (VestaSource::errorCode) srpc->recv_int();
01664 srpc->recv_end();
01665 VestaSourceSRPC::End(id);
01666 return err;
01667 }
01668
01669 VestaSource::errorCode
01670 VDirSurrogate::setIndexMaster(unsigned int index, bool state,
01671 AccessControl::Identity who)
01672 throw (SRPC::failure)
01673 {
01674 VestaSource::errorCode err;
01675 VDirSurrogateInit();
01676 SRPC* srpc;
01677 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01678
01679 srpc->start_call(VestaSourceSRPC::SetIndexMaster,
01680 VestaSourceSRPC::version);
01681 srpc->send_bytes((const char*) &longid.value,
01682 sizeof(longid.value));
01683 srpc->send_int((int) index);
01684 srpc->send_int((int) state);
01685 VestaSourceSRPC::send_identity(srpc, who);
01686 srpc->send_end();
01687 err = (VestaSource::errorCode) srpc->recv_int();
01688 srpc->recv_end();
01689 VestaSourceSRPC::End(id);
01690 return err;
01691 }
01692
01693 VestaSource::errorCode
01694 VDirSurrogate::setMaster(bool state, AccessControl::Identity who)
01695 throw (SRPC::failure)
01696 {
01697 unsigned int index;
01698 LongId plongid = longid.getParent(&index);
01699
01700 VestaSource::errorCode err;
01701 VDirSurrogateInit();
01702 SRPC* srpc;
01703 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01704
01705 srpc->start_call(VestaSourceSRPC::SetIndexMaster,
01706 VestaSourceSRPC::version);
01707 srpc->send_bytes((const char*) &plongid.value,
01708 sizeof(plongid.value));
01709 srpc->send_int((int) index);
01710 srpc->send_int((int) state);
01711 VestaSourceSRPC::send_identity(srpc, who);
01712 srpc->send_end();
01713 err = (VestaSource::errorCode) srpc->recv_int();
01714 srpc->recv_end();
01715 VestaSourceSRPC::End(id);
01716 if (err == VestaSource::ok) {
01717 master = state;
01718 }
01719 return err;
01720 }
01721
01722 VestaSource::errorCode
01723 VDirSurrogate::cedeMastership(const char* requestid, const char** grantidOut,
01724 AccessControl::Identity who)
01725 throw (SRPC::failure)
01726 {
01727 *grantidOut = NULL;
01728 VestaSource::errorCode err;
01729 VDirSurrogateInit();
01730 SRPC* srpc;
01731 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01732
01733 srpc->start_call(VestaSourceSRPC::CedeMastership,
01734 VestaSourceSRPC::version);
01735 srpc->send_bytes((const char*) &longid.value, sizeof(longid.value));
01736 srpc->send_chars(requestid);
01737 VestaSourceSRPC::send_identity(srpc, who);
01738 srpc->send_end();
01739 err = (VestaSource::errorCode) srpc->recv_int();
01740 if (err == VestaSource::ok) {
01741 *grantidOut = srpc->recv_chars();
01742 }
01743 srpc->recv_end();
01744 VestaSourceSRPC::End(id);
01745 return err;
01746 }
01747
01748 VestaSource::errorCode
01749 VDirSurrogate::read(void* buffer, int* nbytes,
01750 Basics::uint64 offset, AccessControl::Identity who)
01751 throw (SRPC::failure)
01752 {
01753 VestaSource::errorCode err;
01754 VDirSurrogateInit();
01755 SRPC* srpc;
01756 MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01757
01758 srpc->start_call(VestaSourceSRPC::Read,
01759 VestaSourceSRPC::version);
01760 srpc->send_bytes((const char*) &longid.value,
01761 sizeof(longid.value));
01762 srpc->send_int(*nbytes);
01763 srpc->send_int(offset & 0xffffffff);
01764 srpc->send_int(offset >> 32ul);
01765 VestaSourceSRPC::send_identity(srpc, who);
01766 srpc->send_end();
01767 err = (VestaSource::errorCode) srpc->recv_int();
01768 if (err == VestaSource::ok) {
01769 srpc->recv_bytes_here((char *)buffer, *nbytes);
01770 }
01771 srpc->recv_end();
01772 VestaSourceSRPC::End(id);
01773 return err;
01774 }
01775
01776 static void
01777 DoStat(VDirSurrogate* thiz, time_t& timestampCache, int& executableCache,
01778 Basics::uint64& sizeCache)
01779 throw (SRPC::failure)
01780 {
01781 VestaSource::errorCode err;
01782 VDirSurrogateInit();
01783 SRPC* srpc;
01784 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc,
01785 thiz->host(), thiz->port());
01786
01787 srpc->start_call(VestaSourceSRPC::Stat,
01788 VestaSourceSRPC::version);
01789 srpc->send_bytes((const char*) &thiz->longid.value,
01790 sizeof(thiz->longid.value));
01791 srpc->send_end();
01792 err = (VestaSource::errorCode) srpc->recv_int();
01793 timestampCache = (time_t) srpc->recv_int();
01794 executableCache = srpc->recv_int();
01795 sizeCache = (unsigned int) srpc->recv_int();
01796 sizeCache += ((Basics::uint64) srpc->recv_int()) << 32;
01797 srpc->recv_end();
01798 VestaSourceSRPC::End(id);
01799 }
01800
01801 bool
01802 VDirSurrogate::executable()
01803 throw (SRPC::failure)
01804 {
01805 if (executableCache == -1) {
01806 DoStat(this, timestampCache, executableCache, sizeCache);
01807 }
01808 return executableCache;
01809 }
01810
01811 Basics::uint64
01812 VDirSurrogate::size()
01813 throw (SRPC::failure)
01814 {
01815 if (sizeCache == (Basics::uint64) -1) {
01816 DoStat(this, timestampCache, executableCache, sizeCache);
01817 }
01818 return sizeCache;
01819 }
01820
01821 time_t
01822 VDirSurrogate::timestamp()
01823 throw (SRPC::failure)
01824 {
01825 if (timestampCache == (time_t) -1) {
01826 DoStat(this, timestampCache, executableCache, sizeCache);
01827 }
01828 return timestampCache;
01829 }
01830
01831 VestaSource::errorCode
01832 VDirSurrogate::write(const void* buffer, int* nbytes,
01833 Basics::uint64 offset, AccessControl::Identity who)
01834 throw (SRPC::failure)
01835 {
01836 VestaSource::errorCode err;
01837 VDirSurrogateInit();
01838 SRPC* srpc;
01839 MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01840
01841 srpc->start_call(VestaSourceSRPC::Write,
01842 VestaSourceSRPC::version);
01843 srpc->send_bytes((const char*) &longid.value,
01844 sizeof(longid.value));
01845 srpc->send_int(offset & 0xffffffff);
01846 srpc->send_int(offset >> 32ul);
01847 srpc->send_bytes((const char*)buffer, *nbytes);
01848 VestaSourceSRPC::send_identity(srpc, who);
01849 srpc->send_end();
01850 err = (VestaSource::errorCode) srpc->recv_int();
01851 if (err == VestaSource::ok) {
01852 *nbytes = srpc->recv_int();
01853 timestampCache = -1;
01854 sizeCache = (Basics::uint64) -1;
01855 }
01856 srpc->recv_end();
01857 VestaSourceSRPC::End(id);
01858 return err;
01859 }
01860
01861 VestaSource::errorCode
01862 VDirSurrogate::setExecutable(bool x, AccessControl::Identity who)
01863 throw (SRPC::failure)
01864 {
01865 VestaSource::errorCode err;
01866 VDirSurrogateInit();
01867 SRPC* srpc;
01868 MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01869
01870 srpc->start_call(VestaSourceSRPC::SetExecutable,
01871 VestaSourceSRPC::version);
01872 srpc->send_bytes((const char*) &longid.value,
01873 sizeof(longid.value));
01874 srpc->send_int((int) x);
01875 VestaSourceSRPC::send_identity(srpc, who);
01876 srpc->send_end();
01877 err = (VestaSource::errorCode) srpc->recv_int();
01878 if (err == VestaSource::ok) {
01879 executableCache = x;
01880 }
01881 srpc->recv_end();
01882 VestaSourceSRPC::End(id);
01883 return err;
01884 }
01885
01886 VestaSource::errorCode
01887 VDirSurrogate::setSize(Basics::uint64 s, AccessControl::Identity who)
01888 throw (SRPC::failure)
01889 {
01890 VestaSource::errorCode err;
01891 VDirSurrogateInit();
01892 SRPC* srpc;
01893 MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01894
01895 srpc->start_call(VestaSourceSRPC::SetSize,
01896 VestaSourceSRPC::version);
01897 srpc->send_bytes((const char*) &longid.value,
01898 sizeof(longid.value));
01899 srpc->send_int((int)(s & 0xffffffff));
01900 srpc->send_int((int)(s >> 32));
01901 VestaSourceSRPC::send_identity(srpc, who);
01902 srpc->send_end();
01903 err = (VestaSource::errorCode) srpc->recv_int();
01904 if (err == VestaSource::ok) {
01905 sizeCache = s;
01906 timestampCache = (time_t) -1;
01907 }
01908 srpc->recv_end();
01909 VestaSourceSRPC::End(id);
01910 return err;
01911 }
01912
01913 VestaSource::errorCode
01914 VDirSurrogate::setTimestamp(time_t ts, AccessControl::Identity who)
01915 throw (SRPC::failure)
01916 {
01917 VestaSource::errorCode err;
01918 VDirSurrogateInit();
01919 SRPC* srpc;
01920 MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01921
01922 srpc->start_call(VestaSourceSRPC::SetTimestamp,
01923 VestaSourceSRPC::version);
01924 srpc->send_bytes((const char*) &longid.value,
01925 sizeof(longid.value));
01926 srpc->send_int((int) ts);
01927 VestaSourceSRPC::send_identity(srpc, who);
01928 srpc->send_end();
01929 err = (VestaSource::errorCode) srpc->recv_int();
01930 srpc->recv_end();
01931 VestaSourceSRPC::End(id);
01932 if (err == VestaSource::ok) {
01933 timestampCache = ts;
01934 }
01935 return err;
01936 }
01937
01938 VestaSource::errorCode
01939 VDirSurrogate::measureDirectory(VestaSource::directoryStats &result,
01940 AccessControl::Identity who)
01941 throw (SRPC::failure)
01942 {
01943 VestaSource::errorCode err;
01944 VDirSurrogateInit();
01945 SRPC* srpc;
01946 MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01947
01948 srpc->start_call(VestaSourceSRPC::MeasureDirectory,
01949 VestaSourceSRPC::version);
01950 srpc->send_bytes((const char*) &longid.value,
01951 sizeof(longid.value));
01952 VestaSourceSRPC::send_identity(srpc, who);
01953 srpc->send_end();
01954 err = (VestaSource::errorCode) srpc->recv_int();
01955 if (err == VestaSource::ok)
01956 {
01957 result.baseChainLength = srpc->recv_int32();
01958 result.usedEntryCount = srpc->recv_int32();
01959 result.usedEntrySize = srpc->recv_int32();
01960 result.totalEntryCount = srpc->recv_int32();
01961 result.totalEntrySize = srpc->recv_int32();
01962 }
01963 srpc->recv_end();
01964 VestaSourceSRPC::End(id);
01965 return err;
01966 }
01967
01968 VestaSource::errorCode
01969 VDirSurrogate::collapseBase(AccessControl::Identity who)
01970 throw (SRPC::failure)
01971 {
01972 VestaSource::errorCode err;
01973 VDirSurrogateInit();
01974 SRPC* srpc;
01975 MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01976
01977 srpc->start_call(VestaSourceSRPC::CollapseBase,
01978 VestaSourceSRPC::version);
01979 srpc->send_bytes((const char*) &longid.value,
01980 sizeof(longid.value));
01981 VestaSourceSRPC::send_identity(srpc, who);
01982 srpc->send_end();
01983 err = (VestaSource::errorCode) srpc->recv_int();
01984 srpc->recv_end();
01985 VestaSourceSRPC::End(id);
01986 return err;
01987 }
01988
01989 VestaSource::errorCode
01990 VDirSurrogate::readWhole(std::ostream &out, AccessControl::Identity who)
01991 throw (SRPC::failure)
01992 {
01993
01994 static Basics::int16 compression_methods[] = {
01995 VestaSourceSRPC::compress_zlib_deflate
01996 };
01997 static unsigned int compression_method_count =
01998 (sizeof(compression_methods) / sizeof(compression_methods[0]));
01999
02000 VestaSource::errorCode err;
02001 VDirSurrogateInit();
02002 SRPC* srpc;
02003 MultiSRPC::ConnId id;
02004
02005 id = VestaSourceSRPC::Start(srpc, host_, port_);
02006
02007 srpc->start_call(VestaSourceSRPC::ReadWholeCompressed,
02008 VestaSourceSRPC::version);
02009 srpc->send_bytes((const char*) &longid.value,
02010 sizeof(longid.value));
02011 VestaSourceSRPC::send_identity(srpc, who);
02012 srpc->send_int16_array(compression_methods, compression_method_count);
02013 srpc->send_int32(readWhole_recv_bufsiz);
02014 srpc->send_end();
02015 err = (VestaSource::errorCode) srpc->recv_int();
02016 if (err == VestaSource::ok)
02017 {
02018 Basics::int16 method_used = srpc->recv_int16();
02019 if(method_used != VestaSourceSRPC::compress_zlib_deflate)
02020 srpc->send_failure(SRPC::not_implemented,
02021 "unsupported compression method");
02022
02023
02024
02025 char *recv_buf = 0, *inflate_buf = 0;
02026
02027
02028 z_stream zstrm;
02029 zstrm.zalloc = Z_NULL;
02030 zstrm.zfree = Z_NULL;
02031 zstrm.opaque = Z_NULL;
02032 zstrm.avail_in = 0;
02033 zstrm.next_in = Z_NULL;
02034 if (inflateInit(&zstrm) != Z_OK)
02035 srpc->send_failure(SRPC::internal_trouble,
02036 "zlib inflateInit failed");
02037
02038 try
02039 {
02040 srpc->recv_seq_start();
02041 recv_buf = NEW_PTRFREE_ARRAY(char, readWhole_recv_bufsiz);
02042 inflate_buf = NEW_PTRFREE_ARRAY(char, readWhole_inflate_bufsiz);
02043 while(1)
02044 {
02045 bool got_end;
02046 int count = readWhole_recv_bufsiz;
02047 srpc->recv_bytes_here(recv_buf, count, &got_end);
02048
02049 if(got_end) break;
02050
02051
02052
02053 zstrm.next_in = (Bytef *) recv_buf;
02054 zstrm.avail_in = count;
02055 do
02056 {
02057
02058 zstrm.avail_out = readWhole_inflate_bufsiz;
02059 zstrm.next_out = (Bytef *) inflate_buf;
02060
02061
02062 int inf_status = inflate(&zstrm, Z_NO_FLUSH);
02063
02064
02065 switch (inf_status)
02066 {
02067
02068
02069
02070 #define INFLATE_ECASE(val) case val:\
02071 srpc->send_failure(SRPC::internal_trouble,\
02072 "zlib inflate error: " #val)
02073 INFLATE_ECASE(Z_STREAM_ERROR);
02074 INFLATE_ECASE(Z_NEED_DICT);
02075 INFLATE_ECASE(Z_DATA_ERROR);
02076 INFLATE_ECASE(Z_MEM_ERROR);
02077 }
02078
02079
02080
02081 int bytes = readWhole_inflate_bufsiz - zstrm.avail_out;
02082 if(bytes > 0)
02083
02084 out.write(inflate_buf, bytes);
02085 }
02086
02087
02088 while(zstrm.avail_out == 0);
02089 }
02090
02091
02092 srpc->recv_seq_end();
02093 }
02094 catch(...)
02095 {
02096
02097 (void)inflateEnd(&zstrm);
02098 if(recv_buf) delete [] recv_buf;
02099 if(inflate_buf) delete [] inflate_buf;
02100 throw;
02101 }
02102
02103
02104
02105 (void)inflateEnd(&zstrm);
02106 if(recv_buf) delete [] recv_buf;
02107 if(inflate_buf) delete [] inflate_buf;
02108 }
02109
02110 srpc->recv_end();
02111 VestaSourceSRPC::End(id);
02112 return err;
02113 }
02114
02115 VestaSource *VDirSurrogate::copy() throw()
02116 {
02117 VestaSource *result = NEW_CONSTR(VDirSurrogate, (*this));
02118 return result;
02119 }