Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

VDirSurrogate.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 //
00020 // VDirSurrogate.C
00021 //
00022 // Remote surrogate for a VestaSource.  Used outside the repository
00023 // address space (including in a different repository).
00024 //
00025 
00026 #include <pthread.h>
00027 #include <assert.h>
00028 #include <pwd.h>
00029 #include <zlib.h> // Used for readWhole, which gets compressed data from the server
00030 #include "VestaSource.H"
00031 #include "VestaSourceSRPC.H"
00032 #include "VDirSurrogate.H"
00033 #include "MultiSRPC.H"
00034 #include "VestaConfig.H"
00035 #include "ListResults.H"
00036 
00037 using std::cerr;
00038 using std::endl;
00039 
00040 // Module globals
00041 static int listChunkSize = 8300;
00042 static int listPerEntryOverhead = 40;
00043 
00044 // Kludge, should never be used.  Forces readOnlyExisting argument
00045 // to VDirSurrogate::createVolatileDirectory to be false.
00046 static int forceWritableExisting = 0;
00047 
00048 // These control buffer sizes in VDirSurrogate::readWhole below
00049 static int readWhole_recv_bufsiz = (64*1024);
00050 static int readWhole_inflate_bufsiz = (128*1024);
00051 
00052 extern "C"
00053 {
00054   void 
00055   VDirSurrogate_init()
00056   {
00057     try {
00058         Text val;
00059         if (VestaConfig::get("Repository",
00060                              "VestaSourceSRPC_listChunkSize", val)) {
00061             listChunkSize = atoi(val.cchars());
00062         }
00063         if (VestaConfig::get("Repository",
00064                              "VestaSourceSRPC_listPerEntryOverhead", val)) {
00065             listPerEntryOverhead = atoi(val.cchars());
00066         }
00067         if (VestaConfig::get("Repository", "forceWritableExisting", val)) {
00068             forceWritableExisting = atoi(val.cchars());
00069         }
00070         if (VestaConfig::get("Repository",
00071                              "VestaSourceSRPC_readWhole_recv_bufsiz",
00072                              val)) {
00073           readWhole_recv_bufsiz = atoi(val.cchars());
00074         }
00075         if (VestaConfig::get("Repository",
00076                              "VestaSourceSRPC_readWhole_inflate_bufsiz",
00077                              val)) {
00078           readWhole_inflate_bufsiz = atoi(val.cchars());
00079         }
00080         AccessControl::commonInit();
00081     } catch (VestaConfig::failure f) {
00082         cerr << "VestaConfig::failure " << f.msg << endl;
00083         throw;  // likely to be uncaught and fatal
00084     }
00085   }
00086 }
00087 
00088 void
00089 VDirSurrogateInit()
00090 {
00091   static pthread_once_t vdsonce = PTHREAD_ONCE_INIT;
00092   pthread_once(&vdsonce, VDirSurrogate_init);
00093 }
00094 
00095 Text 
00096 VDirSurrogate::defaultHost() throw()
00097 {
00098   return VestaSourceSRPC::defaultHost();
00099 }
00100 
00101 Text 
00102 VDirSurrogate::defaultPort() throw()
00103 {
00104   return VestaSourceSRPC::defaultInterface();
00105 }
00106 
00107 VDirSurrogate::VDirSurrogate(time_t timestamp, ShortId sid,
00108                              const Text host__, const Text port__,
00109                              int executable, Basics::uint64 size)
00110   throw(SRPC::failure)
00111 {
00112     VDirSurrogateInit();
00113     timestampCache = timestamp;
00114     sidCache = sid;
00115     if (host__.Empty()) {
00116         host_ = defaultHost();
00117         port_ = defaultPort();
00118     } else {
00119         host_ = host__;
00120         port_ = port__;
00121     }
00122     executableCache = executable;
00123     sizeCache = size;
00124 }
00125 
00126 // Common code for Lookup and CreateVolatileDirectory
00127 static VestaSource::errorCode
00128 GetLookupResult(SRPC* srpc, VestaSource::typeTag& type, LongId& olongid,
00129                 bool& master, Bit32& pseudoInode, ShortId& sid,
00130                 time_t& timestamp, Bit8*& attribs, FP::Tag& fptag)
00131 {
00132     VestaSource::errorCode err =
00133       (VestaSource::errorCode) srpc->recv_int();
00134     if (err == VestaSource::ok) {
00135         type = (VestaSource::typeTag) srpc->recv_int();
00136         int len = sizeof(olongid.value);
00137         srpc->recv_bytes_here((char *) &olongid.value, len);
00138         master = (bool) srpc->recv_int();
00139         pseudoInode = (Bit32) srpc->recv_int();
00140         sid = srpc->recv_int();
00141         timestamp = (time_t) srpc->recv_int();
00142         attribs = (Bit8*) (bool) srpc->recv_int();
00143         fptag.Recv(*srpc);
00144     }
00145     srpc->recv_end();
00146     return err;
00147 }
00148 
00149 // Common code for LongId::lookup, VestaSource::lookup,
00150 // and VestaSource::resync
00151 static VestaSource::errorCode
00152 Lookup(SRPC* srpc, const LongId& longid, const char* arc,
00153        AccessControl::Identity who,
00154        VestaSource::typeTag& type, LongId& olongid, bool& master,
00155        Bit32& pseudoInode, ShortId& sid, time_t& timestamp, Bit8*& attribs,
00156        FP::Tag& fptag)
00157 {
00158     VestaSource::errorCode err;
00159 
00160     srpc->start_call(VestaSourceSRPC::Lookup,
00161                      VestaSourceSRPC::version);
00162     srpc->send_bytes((const char*) &longid.value,
00163                      sizeof(longid.value));
00164     srpc->send_chars(arc);
00165     VestaSourceSRPC::send_identity(srpc, who);
00166     srpc->send_end();
00167 
00168     err = GetLookupResult(srpc, type, olongid, master, pseudoInode, sid,
00169                           timestamp, attribs, fptag);
00170 
00171     return err;
00172 }
00173 
00174 VestaSource*
00175 VDirSurrogate::LongIdLookup(LongId longid, Text host, Text port,
00176                             AccessControl::Identity who)
00177   throw(SRPC::failure)
00178 {
00179     ShortId sid;
00180     VestaSource::errorCode err;
00181     VestaSource::typeTag otype;
00182     LongId olongid;
00183     bool omaster;
00184     Bit32 opseudoInode;
00185     ShortId osid;
00186     time_t otimestamp;
00187     Bit8* oattribs;
00188     FP::Tag ofptag;
00189     VestaSource* result;
00190     VDirSurrogate* sresult;    
00191     SRPC* srpc;
00192     
00193     VDirSurrogateInit();
00194     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host, port);
00195     err = Lookup(srpc, longid, "", who, otype, olongid, omaster, opseudoInode,
00196                  osid, otimestamp, oattribs, ofptag);
00197     VestaSourceSRPC::End(id);
00198 
00199     if (err != VestaSource::ok) return NULL;
00200     
00201     switch (otype) {
00202       case VestaSource::immutableFile:
00203       case VestaSource::mutableFile:
00204       case VestaSource::ghost:
00205       case VestaSource::stub:
00206       case VestaSource::device:
00207       case VestaSource::immutableDirectory:
00208       case VestaSource::appendableDirectory:
00209       case VestaSource::mutableDirectory:
00210       case VestaSource::evaluatorDirectory:
00211       case VestaSource::evaluatorROEDirectory:
00212       case VestaSource::volatileDirectory:
00213       case VestaSource::volatileROEDirectory:
00214         sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host, port));
00215         result = sresult;
00216         break;
00217         
00218       default:
00219         assert(false);
00220     };
00221     
00222     result->rep = NULL;
00223     result->type = otype;
00224     result->longid = olongid;
00225     result->master = omaster;
00226     result->pseudoInode = opseudoInode;
00227     result->attribs = oattribs;
00228     result->fptag = ofptag;
00229 
00230     return result;
00231 }
00232 
00233 bool
00234 VDirSurrogate::LongIdValid(LongId longid, Text host, Text port,
00235                            AccessControl::Identity who)
00236   throw(SRPC::failure)
00237 {
00238     ShortId sid;
00239     VestaSource::errorCode err;
00240     VestaSource::typeTag otype;
00241     LongId olongid;
00242     bool omaster;
00243     Bit32 opseudoInode;
00244     ShortId osid;
00245     time_t otimestamp;
00246     Bit8* oattribs;
00247     FP::Tag ofptag;
00248     VestaSource* result;
00249     VDirSurrogate* sresult;    
00250     SRPC* srpc;
00251     
00252     VDirSurrogateInit();
00253     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host, port);
00254     err = Lookup(srpc, longid, "", who, otype, olongid, omaster, opseudoInode,
00255                  osid, otimestamp, oattribs, ofptag);
00256     VestaSourceSRPC::End(id);
00257 
00258     return (err == VestaSource::ok);
00259 }
00260 
00261 VestaSource*
00262 VDirSurrogate::repositoryRoot(Text host, Text port, AccessControl::Identity who
00263                               ) throw (SRPC::failure)
00264 {
00265     VestaSource *ret = NEW_CONSTR(VDirSurrogate, (2, NullShortId, host, port));
00266     ret->longid = RootLongId;
00267     ret->resync(who);
00268     return ret;
00269 }
00270 
00271 VestaSource*
00272 VDirSurrogate::mutableRoot(Text host, Text port, AccessControl::Identity who
00273                            ) throw (SRPC::failure)
00274 {
00275     VestaSource *ret = NEW_CONSTR(VDirSurrogate, (2, NullShortId, host, port));
00276     ret->longid = MutableRootLongId;
00277     ret->resync(who);
00278     return ret;
00279 }
00280 
00281 VestaSource*
00282 VDirSurrogate::volatileRoot(Text host, Text port, AccessControl::Identity who
00283                             ) throw (SRPC::failure)
00284 {
00285     VestaSource *ret = NEW_CONSTR(VDirSurrogate, (2, NullShortId, host, port));
00286     ret->longid = VolatileRootLongId;
00287     ret->resync(who);
00288     return ret;
00289 }
00290 
00291 VestaSource::errorCode
00292 VDirSurrogate::createVolatileDirectory(const char* hostname, Bit64 handle,
00293                                        VestaSource*& result,
00294                                        time_t timestamp,
00295                                        bool readOnlyExisting,
00296                                        Text reposHost, Text reposPort)
00297   throw (SRPC::failure)
00298 {
00299     static Text port;
00300     if (port.Empty()) {
00301         try {
00302             port = VestaConfig::get_Text("Evaluator", "EvaluatorDirSRPC_port");
00303         } catch (VestaConfig::failure f) {
00304             cerr << "createVolatileDirectory got VestaConfig::failure: "
00305               << f.msg << endl;
00306             throw;
00307         }
00308     }
00309 
00310     return VDirSurrogate::createVolatileDirectory(hostname, port.cchars(),
00311                                                   handle, result, timestamp,
00312                                                   readOnlyExisting,
00313                                                   reposHost, reposPort);
00314 }
00315 
00316 
00317 VestaSource::errorCode
00318 VDirSurrogate::createVolatileDirectory(const char* hostname, const char* port,
00319                                        Bit64 handle,
00320                                        VestaSource*& result, time_t timestamp,
00321                                        bool readOnlyExisting,
00322                                        Text reposHost, Text reposPort)
00323      throw (SRPC::failure)
00324 {
00325     VestaSource::errorCode err;
00326     VestaSource::typeTag otype;
00327     LongId olongid;
00328     bool omaster;
00329     Bit32 opseudoInode;
00330     ShortId osid;
00331     time_t otimestamp;
00332     Bit8* oattribs;
00333     FP::Tag ofptag;
00334     SRPC* srpc;
00335     
00336     VDirSurrogateInit();
00337     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, reposHost, reposPort);
00338     
00339     // Kludge, please ignore
00340     if (forceWritableExisting) readOnlyExisting = false;
00341 
00342     srpc->start_call(VestaSourceSRPC::CreateVolatileDirectory,
00343                      VestaSourceSRPC::version);
00344     srpc->send_chars(hostname);
00345     srpc->send_chars(port);
00346     srpc->send_bytes((char*) &handle, sizeof(handle));
00347     srpc->send_int((int) timestamp);
00348     srpc->send_int((int) readOnlyExisting);
00349     srpc->send_end();
00350     
00351     err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
00352                           otimestamp, oattribs, ofptag);
00353     VestaSourceSRPC::End(id);
00354     if (err != VestaSource::ok) return err;
00355     
00356     assert(otype == (readOnlyExisting ? VestaSource::volatileROEDirectory :
00357                      VestaSource::volatileDirectory));
00358     
00359     result =
00360       NEW_CONSTR(VDirSurrogate, (otimestamp, osid, reposHost, reposPort));
00361     result->rep = NULL;
00362     result->type = otype;
00363     result->longid = olongid;
00364     result->master = omaster;
00365     result->pseudoInode = opseudoInode;
00366     result->attribs = oattribs;
00367     result->fptag = ofptag;
00368     
00369     return VestaSource::ok;
00370 }
00371 
00372 VestaSource::errorCode
00373 VDirSurrogate::deleteVolatileDirectory(VestaSource* dir)
00374      throw (SRPC::failure)
00375 {
00376     SRPC* srpc;
00377     VDirSurrogate *vds = (VDirSurrogate *)dir; // ugh
00378     assert(dir->type == VestaSource::volatileDirectory ||
00379            dir->type == VestaSource::volatileROEDirectory);
00380     VDirSurrogateInit();
00381     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc,
00382                                                   vds->host_, vds->port_);
00383     srpc->start_call(VestaSourceSRPC::DeleteVolatileDirectory,
00384                      VestaSourceSRPC::version);
00385     srpc->send_bytes((const char*) &dir->longid.value,
00386                      sizeof(dir->longid.value));
00387     srpc->send_end();
00388     VestaSource::errorCode err =
00389       (VestaSource::errorCode) srpc->recv_int();
00390     srpc->recv_end();
00391     VestaSourceSRPC::End(id);
00392     return err;
00393 }
00394 
00395 void
00396 VDirSurrogate::getNFSInfo(char*& socket, LongId& root, LongId& muRoot)
00397      throw (SRPC::failure)
00398 {
00399     int len;
00400     SRPC* srpc;
00401     VDirSurrogateInit();
00402     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc);
00403     srpc->start_call(VestaSourceSRPC::GetNFSInfo,
00404                      VestaSourceSRPC::version);
00405     srpc->send_end();
00406     socket = srpc->recv_chars();
00407     len = sizeof(LongId);
00408     srpc->recv_bytes_here((char *) &root.value, len);
00409     len = sizeof(LongId);
00410     srpc->recv_bytes_here((char *) &muRoot.value, len);
00411     srpc->recv_end();
00412     VestaSourceSRPC::End(id);
00413 }
00414 
00415 ShortId VDirSurrogate::fpToShortId(const FP::Tag& fptag, Text host, Text port)
00416      throw (SRPC::failure)
00417 {
00418     ShortId sid;
00419     SRPC* srpc;
00420     VDirSurrogateInit();
00421     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host, port);
00422     srpc->start_call(VestaSourceSRPC::FPToShortId,
00423                      VestaSourceSRPC::version);
00424     fptag.Send(*srpc);
00425     srpc->send_end();
00426 
00427     sid = (ShortId)srpc->recv_int();
00428     srpc->recv_end();
00429     VestaSourceSRPC::End(id);
00430 
00431     return sid; 
00432 }
00433 
00434 void
00435 VDirSurrogate::getUserInfo(AccessControl::IdInfo &result /*OUT*/,
00436                            AccessControl::Identity who,
00437                            AccessControl::Identity subject,
00438                            Text reposHost,
00439                            Text reposPort)
00440   throw (SRPC::failure)
00441 {
00442   // Make sure that the default repository is initialized.
00443   VDirSurrogateInit();
00444 
00445   // If no user was explicitly requested, then we get information
00446   // about the calling user.
00447   if(subject == NULL)
00448     {
00449       subject = who;
00450     }
00451 
00452   // Contact the repository.
00453   SRPC* srpc;
00454   MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, reposHost, reposPort);
00455 
00456   // Make the GetUserInfo call, passing two identities: the requestor
00457   // and the subject.
00458   srpc->start_call(VestaSourceSRPC::GetUserInfo,
00459                    VestaSourceSRPC::version);
00460   VestaSourceSRPC::send_identity(srpc, who);
00461   VestaSourceSRPC::send_identity(srpc, subject);
00462   srpc->send_end();
00463 
00464   // Receive the global names (including all aliases) of this user.
00465   srpc->recv_chars_seq(result.names);
00466 
00467   // Receive the global groups of which this user is a member.
00468   srpc->recv_chars_seq(result.groups);
00469 
00470   // Receive the user's UNIX user and group IDs as used by the
00471   // repository's NFS interface.
00472   result.unix_uid = srpc->recv_int32();
00473   result.unix_gid = srpc->recv_int32();
00474 
00475   // Receive the special permissions of the user (encoded as bits in a
00476   // 16-bit integer).
00477   Basics::uint16 specials = srpc->recv_int16();
00478   result.is_root = specials & 1;
00479   result.is_admin = specials & (1 << 1);
00480   result.is_wizard = specials & (1 << 2);
00481   result.is_runtool = specials & (1 << 3);
00482 
00483   // End the call.
00484   srpc->recv_end();
00485   VestaSourceSRPC::End(id);
00486 }
00487 
00488 void
00489 VDirSurrogate::refreshAccessTables(AccessControl::Identity who,
00490                                    Text reposHost,
00491                                    Text reposPort)
00492   throw (SRPC::failure)
00493 {
00494   VDirSurrogateInit();
00495 
00496   SRPC* srpc;
00497   MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, reposHost, reposPort);
00498 
00499   srpc->start_call(VestaSourceSRPC::RefreshAccessTables,
00500                    VestaSourceSRPC::version);
00501   VestaSourceSRPC::send_identity(srpc, who);
00502   srpc->send_end();
00503 
00504   srpc->recv_end();
00505   VestaSourceSRPC::End(id);
00506 }
00507 
00508 VestaSource::errorCode
00509 VDirSurrogate::acquireMastership(const char* pathname,
00510                                  Text dstHost, Text dstPort,
00511                                  Text srcHost, Text srcPort,
00512                                  char pathnameSep,
00513                                  AccessControl::Identity dwho,
00514                                  AccessControl::Identity swho)
00515      throw (SRPC::failure)
00516 {
00517     SRPC* srpc;
00518     VDirSurrogateInit();
00519 
00520     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, dstHost, dstPort);
00521     srpc->start_call(VestaSourceSRPC::AcquireMastership,
00522                      VestaSourceSRPC::version);
00523     srpc->send_chars(pathname);
00524     srpc->send_chars(srcHost.cchars());
00525     srpc->send_chars(srcPort.cchars());
00526     srpc->send_int((int) pathnameSep);
00527     VestaSourceSRPC::send_identity(srpc, dwho);
00528     VestaSourceSRPC::send_identity(srpc, swho ? swho : dwho);
00529     srpc->send_end();
00530     VestaSource::errorCode err =
00531       (VestaSource::errorCode) srpc->recv_int();
00532     srpc->recv_end();
00533     VestaSourceSRPC::End(id);
00534     return err;
00535 }
00536 
00537 
00538 VestaSource::errorCode
00539 VDirSurrogate::replicate(const char* pathname,
00540                          bool asStub, bool asGhost,
00541                          Text dstHost, Text dstPort,
00542                          Text srcHost, Text srcPort,
00543                          char pathnameSep,
00544                          AccessControl::Identity dwho,
00545                          AccessControl::Identity swho)
00546      throw (SRPC::failure)
00547 {
00548     SRPC* srpc;
00549     VDirSurrogateInit();
00550 
00551     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, dstHost, dstPort);
00552     srpc->start_call(VestaSourceSRPC::Replicate,
00553                      VestaSourceSRPC::version);
00554     srpc->send_chars(pathname);
00555     srpc->send_int((int) asStub);
00556     srpc->send_int((int) asGhost);
00557     srpc->send_chars(srcHost.cchars());
00558     srpc->send_chars(srcPort.cchars());
00559     srpc->send_int((int) pathnameSep);
00560     VestaSourceSRPC::send_identity(srpc, dwho);
00561     VestaSourceSRPC::send_identity(srpc, swho ? swho : dwho);
00562     srpc->send_end();
00563     VestaSource::errorCode err =
00564       (VestaSource::errorCode) srpc->recv_int();
00565     srpc->recv_end();
00566     VestaSourceSRPC::End(id);
00567     return err;
00568 }
00569 
00570 VestaSource::errorCode
00571 VDirSurrogate::replicateAttribs(const char* pathname,
00572                                 bool includeAccess,
00573                                 Text dstHost, Text dstPort,
00574                                 Text srcHost, Text srcPort,
00575                                 char pathnameSep,
00576                                 AccessControl::Identity dwho,
00577                                 AccessControl::Identity swho)
00578      throw (SRPC::failure)
00579 {
00580     SRPC* srpc;
00581     VDirSurrogateInit();
00582 
00583     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, dstHost, dstPort);
00584     srpc->start_call(VestaSourceSRPC::ReplicateAttribs,
00585                      VestaSourceSRPC::version);
00586     srpc->send_chars(pathname);
00587     srpc->send_int((int) includeAccess);
00588     srpc->send_chars(srcHost.cchars());
00589     srpc->send_chars(srcPort.cchars());
00590     srpc->send_int((int) pathnameSep);
00591     VestaSourceSRPC::send_identity(srpc, dwho);
00592     VestaSourceSRPC::send_identity(srpc, swho ? swho : dwho);
00593     srpc->send_end();
00594     VestaSource::errorCode err =
00595       (VestaSource::errorCode) srpc->recv_int();
00596     srpc->recv_end();
00597     VestaSourceSRPC::End(id);
00598     return err;
00599 }
00600 
00601 
00602 void
00603 VDirSurrogate::resync(AccessControl::Identity who) throw (SRPC::failure)
00604 {
00605     // Redo lookup of our own longid
00606     VestaSource::errorCode err;
00607     VDirSurrogateInit();
00608     SRPC* srpc;
00609     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00610     err = Lookup(srpc, longid, "", who, type, longid, master,
00611                  pseudoInode, sidCache, timestampCache, attribs, fptag);
00612     VestaSourceSRPC::End(id);
00613     assert(err == VestaSource::ok);
00614     // Clear stat cache
00615     executableCache = -1;
00616     sizeCache = (Basics::uint64) -1;
00617 }
00618 
00619 VestaSource::errorCode
00620 VDirSurrogate::lookup(Arc arc, VestaSource*& result,
00621                       AccessControl::Identity who,
00622                       unsigned int indexOffset) throw (SRPC::failure)
00623 {
00624   // Start by initializing the result, in case we fail before setting
00625   // it to something useful.
00626   result = 0;
00627 
00628     assert(indexOffset == 0);
00629     ShortId sid;
00630     VestaSource::errorCode err;
00631     VestaSource::typeTag otype;
00632     LongId olongid;
00633     bool omaster;
00634     Bit32 opseudoInode;
00635     ShortId osid;
00636     time_t otimestamp;
00637     VDirSurrogate* sresult;    
00638     Bit8* oattribs;
00639     FP::Tag ofptag;
00640     
00641     VDirSurrogateInit();
00642     SRPC* srpc;
00643     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00644 
00645     err = Lookup(srpc, longid, arc, who, otype, olongid, omaster,
00646                  opseudoInode, osid, otimestamp, oattribs, ofptag);
00647     VestaSourceSRPC::End(id);
00648     if (err != VestaSource::ok) return err;
00649     
00650     switch (otype) {
00651       case VestaSource::immutableFile:
00652       case VestaSource::mutableFile:
00653       case VestaSource::ghost:
00654       case VestaSource::stub:
00655       case VestaSource::device:
00656       case VestaSource::immutableDirectory:
00657       case VestaSource::appendableDirectory:
00658       case VestaSource::mutableDirectory:
00659       case VestaSource::evaluatorDirectory:
00660       case VestaSource::evaluatorROEDirectory:
00661       case VestaSource::volatileDirectory:
00662       case VestaSource::volatileROEDirectory:
00663         sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00664         result = sresult;
00665         break;
00666         
00667       default:
00668         assert(false);
00669     };
00670     
00671     result->rep = NULL;
00672     result->type = otype;
00673     result->longid = olongid;
00674     result->master = omaster;
00675     result->pseudoInode = opseudoInode;
00676     result->attribs = oattribs;
00677     result->fptag = ofptag;
00678     
00679     return VestaSource::ok;
00680 }
00681 
00682 VestaSource::errorCode
00683 VDirSurrogate::lookupIndex(unsigned int index, VestaSource*& result,
00684                            char *arcbuf) throw (SRPC::failure)
00685 {
00686   // Start by initializing the result, in case we fail before setting
00687   // it to something useful.
00688   result = 0;
00689 
00690     ShortId sid;
00691     VestaSource::errorCode err;
00692     VestaSource::typeTag otype;
00693     LongId olongid;
00694     bool omaster;
00695     Bit32 opseudoInode;
00696     ShortId osid;
00697     time_t otimestamp;
00698     VDirSurrogate* sresult;    
00699     Bit8* oattribs;
00700     FP::Tag ofptag;
00701     
00702     VDirSurrogateInit();
00703     SRPC* srpc;
00704     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00705 
00706     srpc->start_call(VestaSourceSRPC::LookupIndex,
00707                      VestaSourceSRPC::version);
00708     srpc->send_bytes((const char*) &longid.value,
00709                      sizeof(longid.value));
00710     srpc->send_int((int) index);
00711     srpc->send_int((int) (arcbuf != NULL));
00712     srpc->send_end();
00713 
00714     err = (VestaSource::errorCode) srpc->recv_int();
00715     if (err == VestaSource::ok) {
00716         otype = (VestaSource::typeTag) srpc->recv_int();
00717         int len = sizeof(olongid.value);
00718         srpc->recv_bytes_here((char *) &olongid.value, len);
00719         omaster = (bool) srpc->recv_int();
00720         opseudoInode = (Bit32) srpc->recv_int();
00721         osid = srpc->recv_int();
00722         otimestamp = (time_t) srpc->recv_int();
00723         oattribs = (Bit8*) (bool) srpc->recv_int();
00724         ofptag.Recv(*srpc);
00725         if (arcbuf != NULL) {
00726             int arclen = MAX_ARC_LEN + 1;
00727             srpc->recv_chars_here(arcbuf, arclen);
00728         }
00729     }
00730     srpc->recv_end();
00731 
00732     VestaSourceSRPC::End(id);
00733     if (err != VestaSource::ok) return err;
00734     
00735     switch (otype) {
00736       case VestaSource::immutableFile:
00737       case VestaSource::mutableFile:
00738       case VestaSource::ghost:
00739       case VestaSource::stub:
00740       case VestaSource::device:
00741       case VestaSource::immutableDirectory:
00742       case VestaSource::appendableDirectory:
00743       case VestaSource::mutableDirectory:
00744       case VestaSource::evaluatorDirectory:
00745       case VestaSource::evaluatorROEDirectory:
00746       case VestaSource::volatileDirectory:
00747       case VestaSource::volatileROEDirectory:
00748         sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00749         result = sresult;
00750         break;
00751         
00752       default:
00753         assert(false);
00754     };
00755     
00756     result->rep = NULL;
00757     result->type = otype;
00758     result->longid = olongid;
00759     result->master = omaster;
00760     result->pseudoInode = opseudoInode;
00761     result->attribs = oattribs;
00762     result->fptag = ofptag;
00763     
00764     return VestaSource::ok;
00765 }
00766 
00767 VestaSource::errorCode
00768 VDirSurrogate::lookupPathname(const char* pathname, VestaSource*& result,
00769                               AccessControl::Identity who, char pathnameSep)
00770      throw (SRPC::failure)
00771 {
00772     ShortId sid;
00773     VestaSource::errorCode err;
00774     VestaSource::typeTag otype;
00775     LongId olongid;
00776     bool omaster;
00777     Bit32 opseudoInode;
00778     ShortId osid;
00779     time_t otimestamp;
00780     VDirSurrogate* sresult;    
00781     Bit8* oattribs;
00782     FP::Tag ofptag;
00783     
00784     VDirSurrogateInit();
00785     SRPC* srpc;
00786     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00787 
00788     srpc->start_call(VestaSourceSRPC::LookupPathname,
00789                      VestaSourceSRPC::version);
00790     srpc->send_bytes((const char*) &longid.value,
00791                      sizeof(longid.value));
00792     srpc->send_chars(pathname);
00793     VestaSourceSRPC::send_identity(srpc, who);
00794     srpc->send_int((int) pathnameSep);
00795     srpc->send_end();
00796 
00797     err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
00798                           otimestamp, oattribs, ofptag);
00799     VestaSourceSRPC::End(id);
00800     if (err != VestaSource::ok) return err;
00801     
00802     switch (otype) {
00803       case VestaSource::immutableFile:
00804       case VestaSource::mutableFile:
00805       case VestaSource::ghost:
00806       case VestaSource::stub:
00807       case VestaSource::device:
00808       case VestaSource::immutableDirectory:
00809       case VestaSource::appendableDirectory:
00810       case VestaSource::mutableDirectory:
00811       case VestaSource::evaluatorDirectory:
00812       case VestaSource::evaluatorROEDirectory:
00813       case VestaSource::volatileDirectory:
00814       case VestaSource::volatileROEDirectory:
00815         sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00816         result = sresult;
00817         break;
00818         
00819       default:
00820         assert(false);
00821     };
00822     
00823     result->rep = NULL;
00824     result->type = otype;
00825     result->longid = olongid;
00826     result->master = omaster;
00827     result->pseudoInode = opseudoInode;
00828     result->attribs = oattribs;
00829     result->fptag = ofptag;
00830     
00831     return VestaSource::ok;
00832 }
00833 
00834 VestaSource::errorCode
00835 VDirSurrogate::getBase(VestaSource*& result, AccessControl::Identity who)
00836      throw (SRPC::failure)
00837 {
00838     VestaSource::errorCode err;
00839     VDirSurrogateInit();
00840     SRPC* srpc;   
00841     VestaSource::typeTag otype;
00842     LongId olongid;
00843     bool omaster;
00844     Bit32 opseudoInode;
00845     ShortId osid;
00846     time_t otimestamp;
00847     Bit8* oattribs;
00848     FP::Tag ofptag;
00849 
00850     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00851     srpc->start_call(VestaSourceSRPC::GetBase, VestaSourceSRPC::version);
00852     srpc->send_bytes((const char*) &this->longid.value,
00853                      sizeof(this->longid.value));
00854     VestaSourceSRPC::send_identity(srpc, who);
00855     srpc->send_end();
00856 
00857     err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
00858                           otimestamp, oattribs, ofptag);
00859     
00860     VestaSourceSRPC::End(id);
00861     if (err != VestaSource::ok) return err;
00862     
00863     VDirSurrogate* sresult;    
00864     switch (otype) {
00865       case VestaSource::immutableFile:
00866       case VestaSource::mutableFile:
00867       case VestaSource::ghost:
00868       case VestaSource::stub:
00869       case VestaSource::device:
00870       case VestaSource::immutableDirectory:
00871       case VestaSource::appendableDirectory:
00872       case VestaSource::mutableDirectory:
00873       case VestaSource::evaluatorDirectory:
00874       case VestaSource::evaluatorROEDirectory:
00875       case VestaSource::volatileDirectory:
00876       case VestaSource::volatileROEDirectory:
00877         sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
00878         result = sresult;
00879         break;
00880         
00881       default:
00882         assert(false);
00883     };
00884     
00885     result->rep = NULL;
00886     result->type = otype;
00887     result->longid = olongid;
00888     result->master = omaster;
00889     result->pseudoInode = opseudoInode;
00890     result->attribs = oattribs;
00891     result->fptag = ofptag;
00892     
00893     return VestaSource::ok;
00894 }
00895 
00896 
00897 VestaSource::errorCode
00898 VDirSurrogate::list(unsigned int firstIndex,
00899                     VestaSource::listCallback callback, void* closure,
00900                     AccessControl::Identity who,
00901                     bool deltaOnly, unsigned int indexOffset)
00902 {
00903     assert(indexOffset == 0);
00904     int curChunkSize = 0;
00905     VestaSource::errorCode err;
00906     unsigned int index = firstIndex;
00907 
00908     VDirSurrogateInit();
00909     SRPC* srpc;
00910     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00911 
00912     // Do multiple calls, each fetching about chunkSize bytes worth of
00913     // results.  The idea is that our client will probably return
00914     // false ("stop") after it has received that many bytes, so we
00915     // don't want to get more than that in a stream from the server.
00916     bool stop_req = false, dir_end = false;
00917     // Note that we buffer the results of each call *before* calling
00918     // the callback, as we don't want to hold up the server during the
00919     // callback's execution.
00920     ListResultSeq list_result;
00921     do {
00922         srpc->start_call(VestaSourceSRPC::List,
00923                          VestaSourceSRPC::version);
00924         srpc->send_bytes((const char*) &longid.value,
00925                          sizeof(longid.value));
00926         srpc->send_int(index);
00927         VestaSourceSRPC::send_identity(srpc, who);
00928         srpc->send_int(deltaOnly);
00929         srpc->send_int(listChunkSize);
00930         srpc->send_int(listPerEntryOverhead);
00931         srpc->send_end();
00932 
00933         err = VestaSource::ok;
00934         srpc->recv_seq_start();
00935         for (;;) {
00936             ListResultItem list_item;
00937             int arclen = sizeof(list_item.arc);
00938             bool got_end;
00939             srpc->recv_chars_here(list_item.arc, arclen, &got_end);
00940             if (got_end) {
00941                 // Sequence ended due to chunkSize limit
00942                 err = VestaSource::ok;
00943                 break;
00944             }
00945             list_item.type = (VestaSource::typeTag) srpc->recv_int();
00946             index = (unsigned int) srpc->recv_int();
00947             list_item.index = index;
00948             if (list_item.type == VestaSource::unused) {
00949                 // End of directory reached, or error
00950                 dir_end = true;
00951                 err = (VestaSource::errorCode) index;
00952                 break;
00953             }
00954             list_item.pseudoInode = (Bit32) srpc->recv_int();
00955             list_item.filesid = (ShortId) srpc->recv_int();
00956             list_item.master = (bool) srpc->recv_int();
00957 
00958             // Add this item to the result sequence.
00959             list_result.addhi(list_item);
00960         }
00961         srpc->recv_seq_end();
00962         srpc->recv_end();
00963         index += 2; // don't read the same one again
00964 
00965         // Call the callback once per entry returned by the server
00966         // until the callback tells us to stop or we run out.
00967         while((list_result.size() > 0) && !stop_req)
00968           {
00969             ListResultItem list_item = list_result.remlo();
00970 
00971             curChunkSize += strlen(list_item.arc) + listPerEntryOverhead;
00972 
00973             stop_req = !callback(closure, list_item.type, list_item.arc,
00974                                  list_item.index, list_item.pseudoInode,
00975                                  list_item.filesid, list_item.master);
00976           }
00977 
00978         if (stop_req && listChunkSize > curChunkSize + 100)
00979           {
00980             // Seems we guessed wrong; use smaller size now.
00981             listChunkSize = curChunkSize + 100;
00982           }
00983     } while (!stop_req && !dir_end);
00984     
00985     VestaSourceSRPC::End(id);
00986     return err;
00987 }
00988 
00989 VestaSource::errorCode
00990 VDirSurrogate::reallyDelete(Arc arc, AccessControl::Identity who,
00991                             bool existCheck, time_t timestamp)
00992      throw (SRPC::failure)
00993 {
00994     VDirSurrogateInit();
00995     SRPC* srpc;
00996     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
00997 
00998     srpc->start_call(VestaSourceSRPC::ReallyDelete,
00999                      VestaSourceSRPC::version);
01000     srpc->send_bytes((const char*) &this->longid.value,
01001                      sizeof(this->longid.value));
01002     srpc->send_chars(arc);
01003     VestaSourceSRPC::send_identity(srpc, who);
01004     srpc->send_int((int) existCheck);
01005     srpc->send_int((int) timestamp);
01006     srpc->send_end();
01007     VestaSource::errorCode err =
01008       (VestaSource::errorCode) srpc->recv_int();
01009     srpc->recv_end();
01010     VestaSourceSRPC::End(id);
01011     return err;
01012 }
01013 
01014 
01015 VestaSource::errorCode
01016 VDirSurrogate::insertFile(Arc arc, ShortId sid, bool master,
01017                           AccessControl::Identity who,
01018                           VestaSource::dupeCheck chk,
01019                           VestaSource** newvs, time_t timestamp,
01020                           FP::Tag* fptag) throw (SRPC::failure)
01021 {
01022     if (newvs) *newvs = NULL; // in case of error
01023     VDirSurrogateInit();
01024     SRPC* srpc;
01025     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01026 
01027     srpc->start_call(VestaSourceSRPC::InsertFile,
01028                      VestaSourceSRPC::version);
01029     srpc->send_bytes((const char*) &this->longid.value,
01030                      sizeof(this->longid.value));
01031     srpc->send_chars(arc);
01032     srpc->send_int((int) sid);
01033     srpc->send_int((int) master);
01034     VestaSourceSRPC::send_identity(srpc, who);
01035     srpc->send_int((int) chk);
01036     srpc->send_int((int) timestamp);
01037     if (fptag == NULL) {
01038         srpc->send_bytes(NULL, 0);
01039     } else {
01040         fptag->Send(*srpc);
01041     }
01042     srpc->send_end();
01043     VestaSource::errorCode err =
01044       (VestaSource::errorCode) srpc->recv_int();
01045     if (err == VestaSource::ok) {
01046         int len = sizeof(LongId);
01047         VDirSurrogate* vs =
01048           NEW_CONSTR(VDirSurrogate, (timestamp, sid, host_, port_));
01049         vs->type = VestaSource::immutableFile;
01050         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01051         vs->master = master;
01052         vs->pseudoInode = (Bit32) srpc->recv_int();
01053         vs->fptag.Recv(*srpc);
01054         vs->sidCache = (ShortId) srpc->recv_int();
01055         if (newvs == NULL)
01056           delete vs;
01057         else
01058           *newvs = vs;
01059     }
01060     srpc->recv_end();
01061     VestaSourceSRPC::End(id);
01062     return err;
01063 }
01064 
01065 
01066 VestaSource::errorCode
01067 VDirSurrogate::insertMutableFile(Arc arc, ShortId sid, bool master,
01068                                  AccessControl::Identity who,
01069                                  VestaSource::dupeCheck chk,
01070                                  VestaSource** newvs, time_t timestamp)
01071      throw (SRPC::failure)
01072 {
01073     if (newvs) *newvs = NULL; // in case of error
01074     VDirSurrogateInit();
01075     SRPC* srpc;
01076     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01077 
01078     srpc->start_call(VestaSourceSRPC::InsertMutableFile,
01079                      VestaSourceSRPC::version);
01080     srpc->send_bytes((const char*) &this->longid.value,
01081                      sizeof(this->longid.value));
01082     srpc->send_chars(arc);
01083     srpc->send_int((int) sid);
01084     srpc->send_int((int) master);
01085     VestaSourceSRPC::send_identity(srpc, who);
01086     srpc->send_int((int) chk);
01087     srpc->send_int((int) timestamp);
01088     srpc->send_end();
01089     VestaSource::errorCode err =
01090       (VestaSource::errorCode) srpc->recv_int();
01091     if (err == VestaSource::ok) {
01092         int len = sizeof(LongId);
01093         VDirSurrogate* vs =
01094           NEW_CONSTR(VDirSurrogate, (timestamp, sid, host_, port_));
01095         vs->type = VestaSource::mutableFile;
01096         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01097         vs->master = master;
01098         vs->pseudoInode = (Bit32) srpc->recv_int();
01099         vs->fptag.Recv(*srpc);
01100         vs->sidCache = (ShortId) srpc->recv_int();
01101         if (newvs == NULL)
01102           delete vs;
01103         else
01104           *newvs = vs;
01105     }
01106     srpc->recv_end();
01107     VestaSourceSRPC::End(id);
01108     return err;
01109 }
01110 
01111 
01112 VestaSource::errorCode
01113 VDirSurrogate::insertImmutableDirectory(Arc arc, VestaSource* dir,
01114                                         bool master,
01115                                         AccessControl::Identity who,
01116                                         VestaSource::dupeCheck chk,
01117                                         VestaSource** newvs, time_t timestamp,
01118                                         FP::Tag* fptag)
01119      throw (SRPC::failure)
01120 {
01121     if (newvs) *newvs = NULL; // in case of error
01122     VDirSurrogateInit();
01123     SRPC* srpc;
01124     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01125 
01126     srpc->start_call(VestaSourceSRPC::InsertImmutableDirectory,
01127                      VestaSourceSRPC::version);
01128     srpc->send_bytes((const char*) &this->longid.value,
01129                      sizeof(this->longid.value));
01130     srpc->send_chars(arc);
01131     if (dir != NULL) {
01132         srpc->send_bytes((const char*) &dir->longid.value,
01133                          sizeof(dir->longid.value));
01134     } else {
01135         srpc->send_bytes((const char*) &NullLongId.value,
01136                          sizeof(NullLongId.value));
01137     }
01138     srpc->send_int((int) master);
01139     VestaSourceSRPC::send_identity(srpc, who);
01140     srpc->send_int((int) chk);
01141     srpc->send_int((int) timestamp);
01142     if (fptag == NULL) {
01143         srpc->send_bytes(NULL, 0);
01144     } else {
01145         fptag->Send(*srpc);
01146     }
01147     srpc->send_end();
01148     VestaSource::errorCode err =
01149       (VestaSource::errorCode) srpc->recv_int();
01150     if (err == VestaSource::ok) {
01151         int len = sizeof(LongId);
01152         VDirSurrogate* vs =
01153           NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01154         vs->type = VestaSource::immutableDirectory;
01155         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01156         vs->master = master;
01157         vs->pseudoInode = srpc->recv_int();
01158         vs->fptag.Recv(*srpc);
01159         vs->sidCache = srpc->recv_int();
01160         if (newvs == NULL)
01161           delete vs;
01162         else
01163           *newvs = vs;
01164     }
01165     srpc->recv_end();
01166     VestaSourceSRPC::End(id);
01167     return err;
01168 }
01169 
01170 VestaSource::errorCode
01171 VDirSurrogate::insertAppendableDirectory(Arc arc, bool master,
01172                                          AccessControl::Identity who,
01173                                          VestaSource::dupeCheck chk,
01174                                          VestaSource** newvs, time_t timestamp)
01175      throw (SRPC::failure)
01176 {
01177     if (newvs) *newvs = NULL; // in case of error
01178     VDirSurrogateInit();
01179     SRPC* srpc;
01180     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01181 
01182     srpc->start_call(VestaSourceSRPC::InsertAppendableDirectory,
01183                      VestaSourceSRPC::version);
01184     srpc->send_bytes((const char*) &this->longid.value,
01185                      sizeof(this->longid.value));
01186     srpc->send_chars(arc);
01187     srpc->send_int((int) master);
01188     VestaSourceSRPC::send_identity(srpc, who);
01189     srpc->send_int((int) chk);
01190     srpc->send_int((int) timestamp);
01191     srpc->send_end();
01192     VestaSource::errorCode err =
01193       (VestaSource::errorCode) srpc->recv_int();
01194     if (err == VestaSource::ok) {
01195         int len = sizeof(LongId);
01196         VDirSurrogate* vs =
01197           NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01198         vs->type = VestaSource::appendableDirectory;
01199         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01200         vs->master = master;
01201         vs->pseudoInode = srpc->recv_int();
01202         vs->fptag.Recv(*srpc);
01203         if (newvs == NULL)
01204           delete vs;
01205         else
01206           *newvs = vs;
01207     }
01208     srpc->recv_end();
01209     VestaSourceSRPC::End(id);
01210     return err;
01211 }
01212 
01213 
01214 VestaSource::errorCode
01215 VDirSurrogate::insertMutableDirectory(Arc arc, VestaSource* dir,
01216                                       bool master,
01217                                       AccessControl::Identity who,
01218                                       VestaSource::dupeCheck chk,
01219                                       VestaSource** newvs, time_t timestamp)
01220      throw (SRPC::failure)
01221 {
01222     if (newvs) *newvs = NULL; // in case of error
01223     VDirSurrogateInit();
01224     SRPC* srpc;
01225     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01226 
01227     srpc->start_call(VestaSourceSRPC::InsertMutableDirectory,
01228                      VestaSourceSRPC::version);
01229     srpc->send_bytes((const char*) &this->longid.value,
01230                      sizeof(this->longid.value));
01231     srpc->send_chars(arc);
01232     if (dir != NULL) {
01233         srpc->send_bytes((const char*) &dir->longid.value,
01234                          sizeof(dir->longid.value));
01235     } else {
01236         srpc->send_bytes((const char*) &NullLongId.value,
01237                          sizeof(NullLongId.value));
01238     }
01239     srpc->send_int((int) master);
01240     VestaSourceSRPC::send_identity(srpc, who);
01241     srpc->send_int((int) chk);
01242     srpc->send_int((int) timestamp);
01243     srpc->send_end();
01244     VestaSource::errorCode err =
01245       (VestaSource::errorCode) srpc->recv_int();
01246     if (err == VestaSource::ok) {
01247         int len = sizeof(LongId);
01248         VDirSurrogate* vs =
01249           NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01250         vs->type = VestaSource::mutableDirectory;
01251         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01252         vs->master = master;
01253         vs->pseudoInode = srpc->recv_int();
01254         vs->fptag.Recv(*srpc);
01255         if (newvs == NULL)
01256           delete vs;
01257         else
01258           *newvs = vs;
01259     }
01260     srpc->recv_end();
01261     VestaSourceSRPC::End(id);
01262     return err;
01263 }
01264 
01265 VestaSource::errorCode
01266 VDirSurrogate::insertGhost(Arc arc, bool master, AccessControl::Identity who,
01267                            VestaSource::dupeCheck chk, VestaSource** newvs,
01268                            time_t timestamp)
01269      throw (SRPC::failure)
01270 {
01271     if (newvs) *newvs = NULL; // in case of error
01272     VDirSurrogateInit();
01273     SRPC* srpc;
01274     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01275 
01276     srpc->start_call(VestaSourceSRPC::InsertGhost,
01277                      VestaSourceSRPC::version);
01278     srpc->send_bytes((const char*) &this->longid.value,
01279                      sizeof(this->longid.value));
01280     srpc->send_chars(arc);
01281     srpc->send_int((int) master);
01282     VestaSourceSRPC::send_identity(srpc, who);
01283     srpc->send_int((int) chk);
01284     srpc->send_int((int) timestamp);
01285     srpc->send_end();
01286     VestaSource::errorCode err =
01287       (VestaSource::errorCode) srpc->recv_int();
01288     if (err == VestaSource::ok) {
01289         int len = sizeof(LongId);
01290         VDirSurrogate* vs =
01291           NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01292         vs->type = VestaSource::ghost;
01293         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01294         vs->master = master;
01295         vs->pseudoInode = srpc->recv_int();
01296         vs->fptag.Recv(*srpc);
01297         if (newvs == NULL)
01298           delete vs;
01299         else
01300           *newvs = vs;
01301     }
01302     srpc->recv_end();
01303     VestaSourceSRPC::End(id);
01304     return err;
01305 }
01306 
01307 VestaSource::errorCode
01308 VDirSurrogate::insertStub(Arc arc, bool master, AccessControl::Identity who,
01309                           VestaSource::dupeCheck chk,
01310                           VestaSource** newvs, time_t timestamp)
01311      throw (SRPC::failure)
01312 {
01313     if (newvs) *newvs = NULL; // in case of error
01314     VDirSurrogateInit();
01315     SRPC* srpc;
01316     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01317 
01318     srpc->start_call(VestaSourceSRPC::InsertStub,
01319                      VestaSourceSRPC::version);
01320     srpc->send_bytes((const char*) &this->longid.value,
01321                      sizeof(this->longid.value));
01322     srpc->send_chars(arc);
01323     srpc->send_int((int) master);
01324     VestaSourceSRPC::send_identity(srpc, who);
01325     srpc->send_int((int) chk);
01326     srpc->send_int((int) timestamp);
01327     srpc->send_end();
01328     VestaSource::errorCode err =
01329       (VestaSource::errorCode) srpc->recv_int();
01330     if (err == VestaSource::ok) {
01331         int len = sizeof(LongId);
01332         VDirSurrogate* vs =
01333           NEW_CONSTR(VDirSurrogate, (timestamp, NullShortId, host_, port_));
01334         vs->type = VestaSource::stub;
01335         srpc->recv_bytes_here((char *) &vs->longid.value, len);
01336         vs->master = master;
01337         vs->pseudoInode = srpc->recv_int();
01338         vs->fptag.Recv(*srpc);
01339         if (newvs == NULL)
01340           delete vs;
01341         else
01342           *newvs = vs;
01343     }
01344     srpc->recv_end();
01345     VestaSourceSRPC::End(id);
01346     return err;
01347 }
01348 
01349 
01350 VestaSource::errorCode
01351 VDirSurrogate::renameTo(Arc arc, VestaSource* fromDir, Arc fromArc,
01352                         AccessControl::Identity who,
01353                         VestaSource::dupeCheck chk, time_t timestamp)
01354      throw (SRPC::failure)
01355 {
01356     VDirSurrogateInit();
01357     SRPC* srpc;
01358     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01359 
01360     srpc->start_call(VestaSourceSRPC::RenameTo,
01361                      VestaSourceSRPC::version);
01362     srpc->send_bytes((const char*) &this->longid.value,
01363                      sizeof(this->longid.value));
01364     srpc->send_chars(arc);
01365     srpc->send_bytes((const char*) &fromDir->longid.value,
01366                      sizeof(fromDir->longid.value));
01367     srpc->send_chars(fromArc);
01368     VestaSourceSRPC::send_identity(srpc, who);
01369     srpc->send_int((int) chk);
01370     srpc->send_int((int) timestamp);
01371     srpc->send_end();
01372     VestaSource::errorCode err =
01373       (VestaSource::errorCode) srpc->recv_int();
01374     srpc->recv_end();
01375     VestaSourceSRPC::End(id);
01376     return err;
01377 }
01378 
01379 
01380 VestaSource::errorCode
01381 VDirSurrogate::makeMutable(VestaSource*& result, ShortId sid,
01382                            Basics::uint64 copyMax, AccessControl::Identity who)
01383      throw (SRPC::failure)
01384 {
01385     VestaSource::errorCode err;
01386     VestaSource::typeTag otype;
01387     LongId olongid;
01388     bool omaster;
01389     Bit32 opseudoInode;
01390     ShortId osid;
01391     time_t otimestamp;
01392     Bit8* oattribs;
01393     FP::Tag ofptag;
01394     
01395     VDirSurrogateInit();
01396     SRPC* srpc;
01397     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01398 
01399     
01400     srpc->start_call(VestaSourceSRPC::MakeMutable,
01401                      VestaSourceSRPC::version);
01402     srpc->send_bytes((const char*) &this->longid.value,
01403                      sizeof(this->longid.value));
01404     srpc->send_int((int) sid);
01405     srpc->send_int((int) (copyMax & 0xffffffff));
01406     srpc->send_int((int) (copyMax >> 32));
01407     VestaSourceSRPC::send_identity(srpc, who);
01408     srpc->send_end();
01409     
01410     err = GetLookupResult(srpc, otype, olongid, omaster, opseudoInode, osid,
01411                           otimestamp, oattribs, ofptag);
01412     
01413     VestaSourceSRPC::End(id);
01414     if (err != VestaSource::ok) return err;
01415     
01416     VDirSurrogate* sresult;    
01417     switch (otype) {
01418       case VestaSource::immutableFile:
01419       case VestaSource::mutableFile:
01420       case VestaSource::ghost:
01421       case VestaSource::stub:
01422       case VestaSource::device:
01423       case VestaSource::immutableDirectory:
01424       case VestaSource::appendableDirectory:
01425       case VestaSource::mutableDirectory:
01426       case VestaSource::evaluatorDirectory:
01427       case VestaSource::evaluatorROEDirectory:
01428       case VestaSource::volatileDirectory:
01429       case VestaSource::volatileROEDirectory:
01430         sresult = NEW_CONSTR(VDirSurrogate, (otimestamp, osid, host_, port_));
01431         result = sresult;
01432         break;
01433         
01434       default:
01435         assert(false);
01436     };
01437     
01438     result->rep = NULL;
01439     result->type = otype;
01440     result->longid = olongid;
01441     result->master = omaster;
01442     result->pseudoInode = opseudoInode;
01443     result->attribs = oattribs;
01444     result->fptag = ofptag;
01445     
01446     return VestaSource::ok;
01447 }
01448 
01449 
01450 bool
01451 VDirSurrogate::inAttribs(const char* name, const char* value)
01452      throw (SRPC::failure)
01453 {
01454     bool retval;
01455 
01456     VDirSurrogateInit();
01457     SRPC* srpc;
01458     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01459 
01460     srpc->start_call(VestaSourceSRPC::InAttribs,
01461                      VestaSourceSRPC::version);
01462     srpc->send_bytes((const char*) &longid.value,
01463                      sizeof(longid.value));
01464     srpc->send_chars(name);
01465     srpc->send_chars(value);
01466     srpc->send_end();
01467 
01468     retval = (bool) srpc->recv_int();
01469     srpc->recv_end();
01470     VestaSourceSRPC::End(id);
01471 
01472     return retval;
01473 }
01474 
01475 char*
01476 VDirSurrogate::getAttrib(const char* name)
01477      throw (SRPC::failure)
01478 {
01479     char* value;
01480     bool null;
01481 
01482     VDirSurrogateInit();
01483     SRPC* srpc;
01484     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01485 
01486     srpc->start_call(VestaSourceSRPC::GetAttrib,
01487                      VestaSourceSRPC::version);
01488     srpc->send_bytes((const char*) &longid.value,
01489                      sizeof(longid.value));
01490     srpc->send_chars(name);
01491     srpc->send_end();
01492 
01493     null = (bool) srpc->recv_int();
01494     value = srpc->recv_chars();
01495     srpc->recv_end();
01496     VestaSourceSRPC::End(id);
01497 
01498     if (null) {
01499         return NULL;
01500     } else {
01501         return value;
01502     }
01503 }
01504 
01505 void
01506 VDirSurrogate::getAttrib(const char* name,
01507                          VestaSource::valueCallback cb, void* cl)
01508 {
01509     char* value;
01510 
01511     VDirSurrogateInit();
01512     SRPC* srpc;
01513     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01514 
01515     srpc->start_call(VestaSourceSRPC::GetAttrib2,
01516                      VestaSourceSRPC::version);
01517     srpc->send_bytes((const char*) &longid.value,
01518                      sizeof(longid.value));
01519     srpc->send_chars(name);
01520     srpc->send_end();
01521 
01522     srpc->recv_seq_start();
01523     AttribResultSeq l_result;
01524     for (;;) {
01525         bool got_end;
01526         value = srpc->recv_chars(&got_end);
01527         if (got_end) break;
01528         l_result.addhi(value);
01529     }
01530     srpc->recv_seq_end();
01531     srpc->recv_end();
01532     VestaSourceSRPC::End(id);
01533 
01534     bool cont = true;
01535     while(l_result.size() > 0)
01536       {
01537         char* value = l_result.remlo();
01538         if (cont) cont = cb(cl, value);
01539         delete [] value;
01540       }
01541 }
01542 
01543 void
01544 VDirSurrogate::listAttribs(VestaSource::valueCallback cb, void* cl)
01545 {
01546     VDirSurrogateInit();
01547     SRPC* srpc;
01548     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01549 
01550     srpc->start_call(VestaSourceSRPC::ListAttribs,
01551                      VestaSourceSRPC::version);
01552     srpc->send_bytes((const char*) &longid.value,
01553                      sizeof(longid.value));
01554     srpc->send_end();
01555 
01556     srpc->recv_seq_start();
01557     AttribResultSeq l_result;
01558     for (;;) {
01559         bool got_end;
01560         char* value = srpc->recv_chars(&got_end);
01561         if (got_end) break;
01562         l_result.addhi(value);
01563     }
01564     srpc->recv_seq_end();
01565     srpc->recv_end();
01566     VestaSourceSRPC::End(id);
01567 
01568     bool cont = true;
01569     while(l_result.size() > 0)
01570       {
01571         char* value = l_result.remlo();
01572         if (cont) cont = cb(cl, value);
01573         delete [] value;
01574       }
01575 }
01576 
01577 void
01578 VDirSurrogate::getAttribHistory(VestaSource::historyCallback cb, void* cl)
01579 {
01580     VDirSurrogateInit();
01581     SRPC* srpc;
01582     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01583 
01584     srpc->start_call(VestaSourceSRPC::GetAttribHistory,
01585                      VestaSourceSRPC::version);
01586     srpc->send_bytes((const char*) &longid.value,
01587                      sizeof(longid.value));
01588     srpc->send_end();
01589 
01590     srpc->recv_seq_start();
01591     AttribHistoryResultSeq l_result;
01592     for (;;) {
01593         bool got_end;
01594         AttribHistoryResultItem l_item;
01595         l_item.op = (VestaSource::attribOp) srpc->recv_int(&got_end);
01596         if (got_end) break;
01597         l_item.name = srpc->recv_chars();
01598         l_item.value = srpc->recv_chars();
01599         l_item.timestamp = (time_t) srpc->recv_int();
01600         l_result.addhi(l_item);
01601     }
01602     srpc->recv_seq_end();
01603     srpc->recv_end();
01604     VestaSourceSRPC::End(id);
01605 
01606     bool cont = true;
01607     while(l_result.size() > 0)
01608       {
01609         AttribHistoryResultItem l_item = l_result.remlo();
01610         if (cont) cont = cb(cl,
01611                             l_item.op, l_item.name, l_item.value,
01612                             l_item.timestamp);
01613         delete [] l_item.name;
01614         delete [] l_item.value;
01615       }
01616 }
01617 
01618 VestaSource::errorCode
01619 VDirSurrogate::writeAttrib(VestaSource::attribOp op, const char* name,
01620                            const char* value, AccessControl::Identity who,
01621                            time_t timestamp)
01622      throw (SRPC::failure)
01623 {
01624     VDirSurrogateInit();
01625     SRPC* srpc;
01626     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01627 
01628     srpc->start_call(VestaSourceSRPC::WriteAttrib,
01629                      VestaSourceSRPC::version);
01630     srpc->send_bytes((const char*) &longid.value,
01631                      sizeof(longid.value));
01632     srpc->send_int((int) op);
01633     srpc->send_chars(name);
01634     srpc->send_chars(value);
01635     VestaSourceSRPC::send_identity(srpc, who);
01636     srpc->send_int((int) timestamp);
01637     srpc->send_end();
01638     VestaSource::errorCode err =
01639       (VestaSource::errorCode) srpc->recv_int();
01640     srpc->recv_end();
01641     VestaSourceSRPC::End(id);
01642     return err;
01643 }
01644 
01645 
01646 VestaSource::errorCode
01647 VDirSurrogate::makeFilesImmutable(unsigned int threshold,
01648                                   AccessControl::Identity who)
01649      throw (SRPC::failure)
01650 {
01651     VestaSource::errorCode err;
01652     VDirSurrogateInit();
01653     SRPC* srpc;
01654     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01655 
01656     srpc->start_call(VestaSourceSRPC::MakeFilesImmutable,
01657                      VestaSourceSRPC::version);
01658     srpc->send_bytes((const char*) &longid.value,
01659                      sizeof(longid.value));
01660     srpc->send_int((int) threshold);
01661     VestaSourceSRPC::send_identity(srpc, who);
01662     srpc->send_end();
01663     err = (VestaSource::errorCode) srpc->recv_int();
01664     srpc->recv_end();
01665     VestaSourceSRPC::End(id);
01666     return err;
01667 }    
01668 
01669 VestaSource::errorCode
01670 VDirSurrogate::setIndexMaster(unsigned int index, bool state,
01671                               AccessControl::Identity who)
01672      throw (SRPC::failure)
01673 {
01674     VestaSource::errorCode err;
01675     VDirSurrogateInit();
01676     SRPC* srpc;
01677     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01678 
01679     srpc->start_call(VestaSourceSRPC::SetIndexMaster,
01680                      VestaSourceSRPC::version);
01681     srpc->send_bytes((const char*) &longid.value,
01682                      sizeof(longid.value));
01683     srpc->send_int((int) index);
01684     srpc->send_int((int) state);
01685     VestaSourceSRPC::send_identity(srpc, who);
01686     srpc->send_end();
01687     err = (VestaSource::errorCode) srpc->recv_int();
01688     srpc->recv_end();
01689     VestaSourceSRPC::End(id);
01690     return err;
01691 }
01692 
01693 VestaSource::errorCode
01694 VDirSurrogate::setMaster(bool state, AccessControl::Identity who)
01695      throw (SRPC::failure)
01696 {
01697     unsigned int index;
01698     LongId plongid = longid.getParent(&index);
01699 
01700     VestaSource::errorCode err;
01701     VDirSurrogateInit();
01702     SRPC* srpc;
01703     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01704 
01705     srpc->start_call(VestaSourceSRPC::SetIndexMaster,
01706                      VestaSourceSRPC::version);
01707     srpc->send_bytes((const char*) &plongid.value,
01708                      sizeof(plongid.value));
01709     srpc->send_int((int) index);
01710     srpc->send_int((int) state);
01711     VestaSourceSRPC::send_identity(srpc, who);
01712     srpc->send_end();
01713     err = (VestaSource::errorCode) srpc->recv_int();
01714     srpc->recv_end();
01715     VestaSourceSRPC::End(id);
01716     if (err == VestaSource::ok) {
01717         master = state;
01718     }
01719     return err;
01720 }
01721 
01722 VestaSource::errorCode
01723 VDirSurrogate::cedeMastership(const char* requestid, const char** grantidOut,
01724                               AccessControl::Identity who)
01725      throw (SRPC::failure)
01726 {
01727   *grantidOut = NULL;
01728   VestaSource::errorCode err;
01729   VDirSurrogateInit();
01730   SRPC* srpc;
01731   MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01732 
01733   srpc->start_call(VestaSourceSRPC::CedeMastership,
01734                    VestaSourceSRPC::version);
01735   srpc->send_bytes((const char*) &longid.value, sizeof(longid.value));
01736   srpc->send_chars(requestid);
01737   VestaSourceSRPC::send_identity(srpc, who);
01738   srpc->send_end();
01739   err = (VestaSource::errorCode) srpc->recv_int();
01740   if (err == VestaSource::ok) {
01741     *grantidOut = srpc->recv_chars();
01742   }
01743   srpc->recv_end();
01744   VestaSourceSRPC::End(id);
01745   return err;
01746 }
01747 
01748 VestaSource::errorCode
01749 VDirSurrogate::read(void* buffer, /*INOUT*/int* nbytes,
01750                     Basics::uint64 offset, AccessControl::Identity who)
01751      throw (SRPC::failure)
01752 {
01753     VestaSource::errorCode err;
01754     VDirSurrogateInit();
01755     SRPC* srpc;
01756     MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01757 
01758     srpc->start_call(VestaSourceSRPC::Read,
01759                      VestaSourceSRPC::version);
01760     srpc->send_bytes((const char*) &longid.value,
01761                      sizeof(longid.value));
01762     srpc->send_int(*nbytes);
01763     srpc->send_int(offset & 0xffffffff);
01764     srpc->send_int(offset >> 32ul);
01765     VestaSourceSRPC::send_identity(srpc, who);
01766     srpc->send_end();
01767     err = (VestaSource::errorCode) srpc->recv_int();
01768     if (err == VestaSource::ok) {
01769         srpc->recv_bytes_here((char *)buffer, *nbytes);
01770     }
01771     srpc->recv_end();
01772         VestaSourceSRPC::End(id);
01773     return err;
01774 }
01775 
01776 static void
01777 DoStat(VDirSurrogate* thiz, time_t& timestampCache, int& executableCache, 
01778        Basics::uint64& sizeCache)
01779      throw (SRPC::failure)
01780 {
01781     VestaSource::errorCode err;
01782     VDirSurrogateInit();
01783     SRPC* srpc;
01784     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc,
01785                                                   thiz->host(), thiz->port());
01786 
01787     srpc->start_call(VestaSourceSRPC::Stat,
01788                      VestaSourceSRPC::version);
01789     srpc->send_bytes((const char*) &thiz->longid.value,
01790                      sizeof(thiz->longid.value));
01791     srpc->send_end();
01792     err = (VestaSource::errorCode) srpc->recv_int();
01793     timestampCache = (time_t) srpc->recv_int();
01794     executableCache = srpc->recv_int();
01795     sizeCache = (unsigned int) srpc->recv_int();
01796     sizeCache += ((Basics::uint64) srpc->recv_int()) << 32;
01797     srpc->recv_end();
01798    VestaSourceSRPC::End(id);
01799 }
01800 
01801 bool
01802 VDirSurrogate::executable()
01803      throw (SRPC::failure)
01804 {
01805     if (executableCache == -1) {
01806         DoStat(this, timestampCache, executableCache, sizeCache);
01807     }
01808     return executableCache;
01809 }
01810 
01811 Basics::uint64
01812 VDirSurrogate::size()
01813      throw (SRPC::failure)
01814 {
01815     if (sizeCache == (Basics::uint64) -1) {
01816         DoStat(this, timestampCache, executableCache, sizeCache);
01817     }
01818     return sizeCache;
01819 }
01820 
01821 time_t
01822 VDirSurrogate::timestamp()
01823      throw (SRPC::failure)
01824 {
01825     if (timestampCache == (time_t) -1) {
01826         DoStat(this, timestampCache, executableCache, sizeCache);
01827     }
01828     return timestampCache;
01829 }
01830 
01831 VestaSource::errorCode
01832 VDirSurrogate::write(const void* buffer, /*INOUT*/int* nbytes,
01833                      Basics::uint64 offset, AccessControl::Identity who)
01834      throw (SRPC::failure)
01835 {
01836     VestaSource::errorCode err;
01837     VDirSurrogateInit();
01838     SRPC* srpc;
01839     MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01840 
01841     srpc->start_call(VestaSourceSRPC::Write,
01842                      VestaSourceSRPC::version);
01843     srpc->send_bytes((const char*) &longid.value,
01844                      sizeof(longid.value));
01845     srpc->send_int(offset & 0xffffffff);
01846     srpc->send_int(offset >> 32ul);
01847     srpc->send_bytes((const char*)buffer, *nbytes);
01848     VestaSourceSRPC::send_identity(srpc, who);
01849     srpc->send_end();
01850     err = (VestaSource::errorCode) srpc->recv_int();
01851     if (err == VestaSource::ok) {
01852         *nbytes = srpc->recv_int();
01853         timestampCache = -1;
01854         sizeCache = (Basics::uint64) -1;
01855     }
01856     srpc->recv_end();
01857    VestaSourceSRPC::End(id);
01858     return err;
01859 }
01860 
01861 VestaSource::errorCode 
01862 VDirSurrogate::setExecutable(bool x, AccessControl::Identity who)
01863      throw (SRPC::failure)
01864 {
01865     VestaSource::errorCode err;
01866     VDirSurrogateInit();
01867     SRPC* srpc;
01868     MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01869 
01870     srpc->start_call(VestaSourceSRPC::SetExecutable,
01871                      VestaSourceSRPC::version);
01872     srpc->send_bytes((const char*) &longid.value,
01873                      sizeof(longid.value));
01874     srpc->send_int((int) x);
01875     VestaSourceSRPC::send_identity(srpc, who);
01876     srpc->send_end();
01877     err = (VestaSource::errorCode) srpc->recv_int();
01878     if (err == VestaSource::ok) {
01879         executableCache = x;
01880     }
01881     srpc->recv_end();
01882    VestaSourceSRPC::End(id);
01883     return err;
01884 }
01885 
01886 VestaSource::errorCode 
01887 VDirSurrogate::setSize(Basics::uint64 s, AccessControl::Identity who)
01888      throw (SRPC::failure)
01889 {
01890     VestaSource::errorCode err;
01891     VDirSurrogateInit();
01892     SRPC* srpc;
01893     MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01894 
01895     srpc->start_call(VestaSourceSRPC::SetSize,
01896                      VestaSourceSRPC::version);
01897     srpc->send_bytes((const char*) &longid.value,
01898                      sizeof(longid.value));
01899     srpc->send_int((int)(s & 0xffffffff));
01900     srpc->send_int((int)(s >> 32));
01901     VestaSourceSRPC::send_identity(srpc, who);
01902     srpc->send_end();
01903     err = (VestaSource::errorCode) srpc->recv_int();
01904     if (err == VestaSource::ok) {
01905         sizeCache = s;
01906         timestampCache = (time_t) -1;
01907     }
01908     srpc->recv_end();
01909    VestaSourceSRPC::End(id);
01910     return err;
01911 }
01912 
01913 VestaSource::errorCode
01914 VDirSurrogate::setTimestamp(time_t ts, AccessControl::Identity who)
01915      throw (SRPC::failure)
01916 {
01917     VestaSource::errorCode err;
01918     VDirSurrogateInit();
01919     SRPC* srpc;
01920     MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01921 
01922     srpc->start_call(VestaSourceSRPC::SetTimestamp,
01923                      VestaSourceSRPC::version);
01924     srpc->send_bytes((const char*) &longid.value,
01925                      sizeof(longid.value));
01926     srpc->send_int((int) ts);
01927     VestaSourceSRPC::send_identity(srpc, who);
01928     srpc->send_end();
01929     err = (VestaSource::errorCode) srpc->recv_int();
01930     srpc->recv_end();
01931     VestaSourceSRPC::End(id);
01932     if (err == VestaSource::ok) {
01933         timestampCache = ts;
01934     }
01935     return err;
01936 }
01937 
01938 VestaSource::errorCode
01939 VDirSurrogate::measureDirectory(/*OUT*/VestaSource::directoryStats &result,
01940                                 AccessControl::Identity who)
01941   throw (SRPC::failure)
01942 {
01943     VestaSource::errorCode err;
01944     VDirSurrogateInit();
01945     SRPC* srpc;
01946     MultiSRPC::ConnId id =VestaSourceSRPC::Start(srpc, host_, port_);
01947 
01948     srpc->start_call(VestaSourceSRPC::MeasureDirectory,
01949                      VestaSourceSRPC::version);
01950     srpc->send_bytes((const char*) &longid.value,
01951                      sizeof(longid.value));
01952     VestaSourceSRPC::send_identity(srpc, who); 
01953     srpc->send_end(); 
01954     err = (VestaSource::errorCode) srpc->recv_int();
01955     if (err == VestaSource::ok)
01956       {
01957         result.baseChainLength = srpc->recv_int32();
01958         result.usedEntryCount = srpc->recv_int32();
01959         result.usedEntrySize = srpc->recv_int32();
01960         result.totalEntryCount = srpc->recv_int32();
01961         result.totalEntrySize = srpc->recv_int32();
01962       }
01963     srpc->recv_end();
01964     VestaSourceSRPC::End(id);
01965     return err;
01966 }
01967 
01968 VestaSource::errorCode
01969 VDirSurrogate::collapseBase(AccessControl::Identity who)
01970   throw (SRPC::failure)
01971 {
01972     VestaSource::errorCode err;
01973     VDirSurrogateInit();
01974     SRPC* srpc;
01975     MultiSRPC::ConnId id = VestaSourceSRPC::Start(srpc, host_, port_);
01976 
01977     srpc->start_call(VestaSourceSRPC::CollapseBase,
01978                      VestaSourceSRPC::version);
01979     srpc->send_bytes((const char*) &longid.value,
01980                      sizeof(longid.value));
01981     VestaSourceSRPC::send_identity(srpc, who); 
01982     srpc->send_end(); 
01983     err = (VestaSource::errorCode) srpc->recv_int();
01984     srpc->recv_end();
01985     VestaSourceSRPC::End(id);
01986     return err;
01987 }
01988 
01989 VestaSource::errorCode
01990 VDirSurrogate::readWhole(std::ostream &out, AccessControl::Identity who)
01991   throw (SRPC::failure)
01992 {
01993   // Compression methods supported by the client (just zlib for now).
01994   static Basics::int16 compression_methods[] = {
01995     VestaSourceSRPC::compress_zlib_deflate
01996   };
01997   static unsigned int compression_method_count =
01998     (sizeof(compression_methods) / sizeof(compression_methods[0]));
01999 
02000   VestaSource::errorCode err;
02001   VDirSurrogateInit();
02002   SRPC* srpc;
02003   MultiSRPC::ConnId id;
02004 
02005   id = VestaSourceSRPC::Start(srpc, host_, port_);
02006 
02007   srpc->start_call(VestaSourceSRPC::ReadWholeCompressed,
02008                    VestaSourceSRPC::version);
02009   srpc->send_bytes((const char*) &longid.value,
02010                    sizeof(longid.value));
02011   VestaSourceSRPC::send_identity(srpc, who); 
02012   srpc->send_int16_array(compression_methods, compression_method_count);
02013   srpc->send_int32(readWhole_recv_bufsiz);
02014   srpc->send_end(); 
02015   err = (VestaSource::errorCode) srpc->recv_int();
02016   if (err == VestaSource::ok)
02017     {
02018       Basics::int16 method_used = srpc->recv_int16();
02019       if(method_used != VestaSourceSRPC::compress_zlib_deflate)
02020         srpc->send_failure(SRPC::not_implemented,
02021                            "unsupported compression method");
02022         
02023       // Buffers used to hold the compressed bytes we receive and the
02024       // uncompressed data given to us by zlib.
02025       char *recv_buf = 0, *inflate_buf = 0;
02026 
02027       // Set up to use zlib inflation
02028       z_stream zstrm;
02029       zstrm.zalloc = Z_NULL;
02030       zstrm.zfree = Z_NULL;
02031       zstrm.opaque = Z_NULL;
02032       zstrm.avail_in = 0;
02033       zstrm.next_in = Z_NULL;
02034       if (inflateInit(&zstrm) != Z_OK)
02035         srpc->send_failure(SRPC::internal_trouble,
02036                            "zlib inflateInit failed");
02037 
02038       try
02039         {
02040           srpc->recv_seq_start();
02041           recv_buf = NEW_PTRFREE_ARRAY(char, readWhole_recv_bufsiz);
02042           inflate_buf = NEW_PTRFREE_ARRAY(char, readWhole_inflate_bufsiz);
02043           while(1)
02044             {
02045               bool got_end;
02046               int count = readWhole_recv_bufsiz;
02047               srpc->recv_bytes_here(recv_buf, count, &got_end);
02048               // If this is the end of the sequence, we're done.
02049               if(got_end) break;
02050 
02051               // Feed the compressed bytes we received to zlib for
02052               // inflation.
02053               zstrm.next_in = (Bytef *) recv_buf;
02054               zstrm.avail_in = count;
02055               do
02056                 {
02057                   // zlib will inflate into this buffer.
02058                   zstrm.avail_out = readWhole_inflate_bufsiz;
02059                   zstrm.next_out = (Bytef *) inflate_buf;
02060 
02061                   // Uncompress some of the bytes we received.
02062                   int inf_status = inflate(&zstrm, Z_NO_FLUSH);
02063 
02064                   // Check for an error from inflate
02065                   switch (inf_status)
02066                     {
02067                       // If inflate fails in any way, we throw SRPC
02068                       // failure with a message including the zlib
02069                       // error code
02070 #define INFLATE_ECASE(val) case val:\
02071                   srpc->send_failure(SRPC::internal_trouble,\
02072                                      "zlib inflate error: " #val)
02073                       INFLATE_ECASE(Z_STREAM_ERROR);
02074                       INFLATE_ECASE(Z_NEED_DICT);
02075                       INFLATE_ECASE(Z_DATA_ERROR);
02076                       INFLATE_ECASE(Z_MEM_ERROR);
02077                     }
02078 
02079                   // Count the number of uncompressed bytes produced
02080                   // in the output buffer.
02081                   int bytes = readWhole_inflate_bufsiz - zstrm.avail_out;
02082                   if(bytes > 0)
02083                     // Write the bytes to the output stream.
02084                     out.write(inflate_buf, bytes);
02085                 }
02086               // Repeat until inflate can't fill the output buffer (which
02087               // means it doesn't have any more uncompressed data).
02088               while(zstrm.avail_out == 0);
02089             }
02090 
02091           // We've received all chunks of compressed bytes.
02092           srpc->recv_seq_end();
02093         }
02094       catch(...)
02095         {
02096           // Clean up in the event of an exception
02097           (void)inflateEnd(&zstrm);
02098           if(recv_buf) delete [] recv_buf;
02099           if(inflate_buf) delete [] inflate_buf;
02100           throw;
02101         }
02102 
02103       // Clean up in the event of an exception now that we're done
02104       // receiving compressed data.
02105       (void)inflateEnd(&zstrm);
02106       if(recv_buf) delete [] recv_buf;
02107       if(inflate_buf) delete [] inflate_buf;
02108     }
02109 
02110   srpc->recv_end();
02111   VestaSourceSRPC::End(id);
02112   return err;
02113 }
02114 
02115 VestaSource *VDirSurrogate::copy() throw()
02116 {
02117   VestaSource *result = NEW_CONSTR(VDirSurrogate, (*this));
02118   return result;
02119 }

Generated on Mon May 8 00:48:49 2006 for Vesta by  doxygen 1.4.2