Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

VestaSourceServer.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 //
00020 // VestaSourceServer.C
00021 //
00022 // Server stubs for VestaSourceSRPC interface
00023 // 
00024 
00025 #include <time.h>
00026 #include <assert.h>
00027 #include <zlib.h> // Used for ReadWholeCompressed, which sends
00028                   // compressed data to the client
00029 #include <SRPC.H>
00030 #include <Thread.H>
00031 #include <LimService.H>
00032 #include <OS.H>
00033 #include "VestaSourceSRPC.H"
00034 #include "VestaSource.H"
00035 #include "VDirVolatileRoot.H"
00036 #include "VRConcurrency.H"
00037 #include "VestaConfig.H"
00038 #include "VestaSourceServer.H"
00039 #include "FPShortId.H"
00040 #include "logging.H"
00041 #include "VestaSourceImpl.H"
00042 #include "Mastership.H"
00043 #include "Replication.H"
00044 #include "ReposStats.H"
00045 #include "FdCache.H"
00046 #include "dupe.H"
00047 
00048 #include "lock_timing.H"
00049 
00050 #include "nfsStats.H"
00051 
00052 #if defined(REPOS_PERF_DEBUG)
00053 #include "timing.H"
00054 #include "perfDebugControl.H"
00055 #endif
00056 
00057 #if defined(__linux__) && defined(__GNUC__) && (__GNUC__ < 3)
00058 // For some reason pread doesn't get declared on older systems
00059 extern "C" ssize_t pread(int fd, void *buf, size_t count, off_t offset);
00060 #endif
00061 
00062 #define DEF_MAX_RUNNING 32
00063 #define DEF_MAX_BLOCKED 8
00064 #define STACK_SIZE 256000L
00065 
00066 // ----------------------------------------------------------------------
00067 // SRPC call statistic helper code
00068 // ----------------------------------------------------------------------
00069 
00070 extern "C"
00071 {
00072   void SRPC_Call_Stats_Helper_stats_init() throw();
00073 }
00074 
00075 // An instance of this class gets created per SRPC servicing thread.
00076 // It records the number of calls and total time spent servicing all
00077 // calls over the lifetime of the thread.
00078 class SRPC_Call_Stats
00079 {
00080 private:
00081   // Single linked list of all instances of this class.
00082   static Basics::mutex head_mu;
00083   static SRPC_Call_Stats *head;
00084   SRPC_Call_Stats *next;
00085 
00086   // Protext local fields.
00087   Basics::mutex mu;
00088 
00089   // Statistics for one SRPC-servicing thread.
00090   Basics::uint64 call_count;
00091   Basics::uint64 elapsed_secs;
00092   Basics::uint32 elapsed_usecs;
00093 
00094   // Add the statistics from this instance into a running total.
00095   void accumulateStats(/*OUT*/ Basics::uint64 &calls,
00096                        /*OUT*/ Basics::uint64 &secs,
00097                        /*OUT*/ Basics::uint32 &usecs);
00098 
00099   // Record an SRPC call.
00100   void recordCall(Basics::uint32 secs, Basics::uint32 &usecs);
00101 
00102 public:
00103 
00104   SRPC_Call_Stats();
00105   ~SRPC_Call_Stats();
00106 
00107   static void getStats(/*OUT*/ Basics::uint64 &calls,
00108                        /*OUT*/ Basics::uint64 &secs,
00109                        /*OUT*/ Basics::uint32 &usecs);
00110 
00111   // Class whose creation and destruction mark the beginning and end
00112   // of a call.
00113   class Helper
00114   {
00115   private:
00116     static pthread_key_t stats_key;
00117     static pthread_once_t stats_once;
00118 
00119     friend void SRPC_Call_Stats_Helper_stats_init() throw();
00120 
00121     struct timeval call_start;
00122   public:
00123     Helper();
00124     ~Helper();
00125   };
00126 
00127   friend class Helper;
00128 };
00129 
00130 Basics::mutex SRPC_Call_Stats::head_mu;
00131 SRPC_Call_Stats *SRPC_Call_Stats::head = 0;
00132 
00133 pthread_key_t SRPC_Call_Stats::Helper::stats_key;
00134 pthread_once_t SRPC_Call_Stats::Helper::stats_once = PTHREAD_ONCE_INIT;
00135 
00136 void SRPC_Call_Stats::accumulateStats(/*OUT*/ Basics::uint64 &calls,
00137                                       /*OUT*/ Basics::uint64 &secs,
00138                                       /*OUT*/ Basics::uint32 &usecs)
00139 {
00140   this->mu.lock();
00141   calls += this->call_count;
00142   secs += this->elapsed_secs;
00143   usecs += this->elapsed_usecs;
00144   this->mu.unlock();
00145 }
00146 
00147 void SRPC_Call_Stats::recordCall(Basics::uint32 secs, Basics::uint32 &usecs)
00148 {
00149   this->mu.lock();
00150   // Record that another call has been completed.
00151   this->call_count++;
00152 
00153   // Add the time for this call to our running total.
00154   this->elapsed_secs += secs;
00155   this->elapsed_usecs += usecs;
00156   if(this->elapsed_usecs > USECS_PER_SEC)
00157     {
00158       this->elapsed_secs += this->elapsed_usecs/USECS_PER_SEC;
00159       this->elapsed_usecs %= USECS_PER_SEC;
00160     }
00161   this->mu.unlock();
00162 }
00163 
00164 void SRPC_Call_Stats::getStats(/*OUT*/ Basics::uint64 &calls,
00165                                /*OUT*/ Basics::uint64 &secs,
00166                                /*OUT*/ Basics::uint32 &usecs)
00167 {
00168   // Reset our parameters.
00169   calls = 0;
00170   secs = 0;
00171   usecs = 0;
00172 
00173   // Get the current list of instances
00174   SRPC_Call_Stats *list;
00175   SRPC_Call_Stats::head_mu.lock();
00176   list = SRPC_Call_Stats::head;
00177   SRPC_Call_Stats::head_mu.unlock();
00178 
00179   while(list != 0)
00180     {
00181       // Accumulate into our parameters
00182       list->accumulateStats(calls, secs, usecs);
00183 
00184       // Handle usecs overflow
00185       if(usecs > USECS_PER_SEC)
00186         {
00187           secs += usecs/USECS_PER_SEC;
00188           usecs %= USECS_PER_SEC;
00189         }
00190 
00191       // Advance to the next instance.  (Note: no lock required to
00192       // access this member variable as it's set at instantiation and
00193       // never changed.)
00194       list = list->next;
00195     }
00196 }
00197 
00198 SRPC_Call_Stats::SRPC_Call_Stats()
00199   : next(0), call_count(0), elapsed_secs(0), elapsed_usecs(0)
00200 {
00201   // Insert ourselves into the global list
00202   SRPC_Call_Stats::head_mu.lock();
00203   this->next = SRPC_Call_Stats::head;
00204   SRPC_Call_Stats::head = this;
00205   SRPC_Call_Stats::head_mu.unlock();
00206 }
00207 
00208 SRPC_Call_Stats::~SRPC_Call_Stats()
00209 {
00210   // We don't ever expect to be destroyed.
00211   assert(false);
00212 }
00213 
00214 extern "C"
00215 {
00216   void SRPC_Call_Stats_Helper_stats_init() throw()
00217   {
00218     int err = pthread_key_create(&SRPC_Call_Stats::Helper::stats_key, NULL);
00219     assert(err == 0);
00220   }
00221 }
00222 
00223 SRPC_Call_Stats::Helper::Helper()
00224 {
00225   // Save the time that this call started
00226   struct timezone unused_tz;
00227   int err = gettimeofday(&this->call_start, &unused_tz);
00228   assert(err == 0);
00229 }
00230 
00231 SRPC_Call_Stats::Helper::~Helper()
00232 {
00233   // Get the time that this call ended
00234   struct timezone unused_tz;
00235   struct timeval call_end;
00236   int err = gettimeofday(&call_end, &unused_tz);
00237   assert(err == 0);
00238 
00239   // Make sure the pthread_key is initialized
00240   pthread_once(&SRPC_Call_Stats::Helper::stats_once,
00241                SRPC_Call_Stats_Helper_stats_init);
00242 
00243   // Get or create the object that holds statistics for this SRPC
00244   // thread.
00245   SRPC_Call_Stats *thread_stats =
00246     ((SRPC_Call_Stats *)
00247      pthread_getspecific(SRPC_Call_Stats::Helper::stats_key));
00248   if(thread_stats == 0)
00249     {
00250       thread_stats = NEW(SRPC_Call_Stats);
00251 
00252       err = pthread_setspecific(SRPC_Call_Stats::Helper::stats_key,
00253                                 (void *) thread_stats);
00254       assert(err == 0);
00255     }
00256 
00257   // Compute the time taken by this call.
00258   Basics::uint32 call_secs, call_usecs;
00259   if(call_end.tv_sec == this->call_start.tv_sec)
00260     {
00261       call_secs = 0;
00262       if(call_end.tv_usec >= this->call_start.tv_usec)
00263         call_usecs = call_end.tv_usec - this->call_start.tv_usec;
00264       else
00265         // Time went backwards.  This can happen if the system time
00266         // gets adjusted.  Count this call as having taken 0 time.
00267         call_usecs = 0;
00268     }
00269   else if(call_end.tv_sec > this->call_start.tv_sec)
00270     {
00271       call_secs = (call_end.tv_sec - this->call_start.tv_sec) - 1;
00272       call_usecs = ((call_end.tv_usec + USECS_PER_SEC) -
00273                     this->call_start.tv_usec);
00274     }
00275   else
00276     {
00277       // Time went backwards.  This can happen if the system time gets
00278       // adjusted.  Count this call as having taken 0 time.
00279       call_secs = 0;
00280       call_usecs = 0;
00281     }
00282 
00283   // Record the time for this call in the stats recorder.
00284   thread_stats->recordCall(call_secs, call_usecs);
00285 }
00286 
00287 // ----------------------------------------------------------------------
00288 
00289 // By default we give a client a maximum of five minutes to complete a
00290 // call they make.  Of course normally it should be much less than
00291 // this.  The limit is there just to prevent a suspended or
00292 // misbehaving client from causing a server thread to be waiting
00293 // indefinitely.
00294 static unsigned int readTimeout = 300;
00295 
00296 // Unmarshal AccessControl::Identity
00297 AccessControl::Identity
00298 srpc_recv_identity(SRPC* srpc, int intf_ver, bool access_needed)
00299 {
00300   AccessControl::Identity ret = 0;
00301   sockaddr_in addr;
00302   srpc->socket()->get_remote_addr(addr);
00303 
00304   AccessControl::IdentityRep::Flavor flavor;
00305   if (intf_ver <= 10) {
00306     flavor = AccessControl::IdentityRep::unix_flavor;
00307   } else {
00308     flavor = (AccessControl::IdentityRep::Flavor) srpc->recv_int();
00309   }
00310   switch (flavor) {
00311   case AccessControl::IdentityRep::unix_flavor: {
00312     authunix_parms* aup = NEW(authunix_parms);
00313     aup->aup_time = (u_int) srpc->recv_int();
00314     aup->aup_machname = NEW_PTRFREE_ARRAY(char, MAX_MACHINE_NAME);
00315     int len = MAX_MACHINE_NAME;
00316     srpc->recv_chars_here(aup->aup_machname, len);
00317     aup->aup_uid = srpc->recv_int();
00318     aup->aup_gid = srpc->recv_int();
00319     srpc->recv_seq_start((int*) &aup->aup_len);
00320     if (aup->aup_len > NGRPS)
00321       srpc->send_failure(SRPC::buffer_too_small, "too many GIDs");
00322 #if __linux__
00323     aup->aup_gids = NEW_PTRFREE_ARRAY(__gid_t, NGRPS);
00324 #else
00325     aup->aup_gids = NEW_PTRFREE_ARRAY(int, NGRPS);
00326 #endif
00327     unsigned int i;
00328     for (i=0; i<aup->aup_len; i++) {
00329         aup->aup_gids[i] = srpc->recv_int();
00330     }
00331     srpc->recv_seq_end();
00332     ret = NEW_CONSTR(AccessControl::UnixIdentityRep, (aup, &addr));
00333     break; }
00334 
00335   case AccessControl::IdentityRep::global: {
00336     char* user = srpc->recv_chars();
00337     ret = NEW_CONSTR(AccessControl::GlobalIdentityRep, (user, &addr));
00338     break; }
00339 
00340   case AccessControl::IdentityRep::gssapi: {
00341     srpc->send_failure(SRPC::protocol_violation,
00342                        "gssapi identity flavor not implemented yet");
00343     break; }
00344 
00345   default:
00346     srpc->send_failure(SRPC::protocol_violation, "unknown identity flavor");
00347   }
00348   if (access_needed && !AccessControl::admit(ret)) {
00349     // Caller is not allowed access to this repository
00350     delete ret;
00351     srpc->send_failure(SRPC::invalid_parameter, "unauthorized user");
00352   }
00353   return ret;
00354 }
00355 
00356 // Server stub for Lookup
00357 static void
00358 VestaSourceLookup(SRPC* srpc, int intf_ver)
00359 {
00360     LongId longid;
00361     char arc[MAX_ARC_LEN+1];
00362     int len;
00363     VestaSource::errorCode err;
00364 
00365     // Receive the arguments
00366     len = sizeof(longid.value);
00367     srpc->recv_bytes_here((char *) &longid.value, len);
00368     len = MAX_ARC_LEN+1;
00369     srpc->recv_chars_here(arc, len);
00370     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00371     srpc->recv_end();
00372     
00373     // Do the work
00374     ReadersWritersLock* lock;
00375     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00376     try {
00377         if (vs == NULL) {
00378             err = VestaSource::invalidArgs;
00379         } else {
00380           RWLOCK_LOCKED_REASON(lock, "SRPC:lookup");
00381             err = VestaSource::ok;
00382         
00383             if (len > 0) {
00384                 VestaSource* childvs;
00385                 err = vs->lookup(arc, childvs, who);
00386                 delete vs;
00387                 vs = childvs;
00388             }
00389         }
00390     
00391         // Send the results
00392         srpc->send_int((int) err);
00393         if (err == VestaSource::ok) {
00394             srpc->send_int((int) vs->type);
00395             srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00396             srpc->send_int((int) vs->master);
00397             srpc->send_int((int) vs->pseudoInode);
00398             srpc->send_int((int) vs->shortId());
00399             srpc->send_int((int) vs->timestamp());
00400             srpc->send_int((int) (vs->hasAttribs() ||
00401                                   MutableRootLongId.isAncestorOf(vs->longid)));
00402             vs->fptag.Send(*srpc);
00403             delete vs;
00404         }
00405     } catch (SRPC::failure f) {
00406         if (lock != NULL) lock->releaseRead();
00407         delete who;
00408         throw;
00409     }
00410     if (lock != NULL) lock->releaseRead();
00411     delete who;
00412     srpc->send_end();
00413 }
00414 
00415 
00416 // Server stub for LookupPathname
00417 static void
00418 VestaSourceLookupPathname(SRPC* srpc, int intf_ver)
00419 {
00420   LongId longid;
00421   char* pathname;
00422   char pathnameSep;
00423   VestaSource::errorCode err;
00424   int len;
00425     
00426   // Receive the arguments
00427   len = sizeof(longid.value);
00428   srpc->recv_bytes_here((char *) &longid.value, len);
00429   pathname = srpc->recv_chars();
00430   AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00431   pathnameSep = (char) srpc->recv_int();
00432   srpc->recv_end();
00433     
00434   // Do the work
00435   ReadersWritersLock* lock;
00436   VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00437   try {
00438     if (vs == NULL) {
00439       err = VestaSource::invalidArgs;
00440     } else {
00441       RWLOCK_LOCKED_REASON(lock, "SRPC:lookupPathname");
00442       VestaSource* childvs;
00443       err = vs->lookupPathname(pathname, childvs, who, pathnameSep);
00444       delete vs;
00445       vs = childvs;
00446     }
00447     
00448     // Send the results
00449     srpc->send_int((int) err);
00450     if (err == VestaSource::ok) {
00451       srpc->send_int((int) vs->type);
00452       srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00453       srpc->send_int((int) vs->master);
00454       srpc->send_int((int) vs->pseudoInode);
00455       srpc->send_int((int) vs->shortId());
00456       srpc->send_int((int) vs->timestamp());
00457       srpc->send_int((int) (vs->hasAttribs() ||
00458                             MutableRootLongId.isAncestorOf(vs->longid)));
00459       vs->fptag.Send(*srpc);
00460       delete vs;
00461     }
00462   } catch (SRPC::failure f) {
00463     if (lock != NULL) lock->releaseRead();
00464     delete [] pathname;
00465     delete who;
00466     throw;    
00467   }
00468   if (lock != NULL) lock->releaseRead();
00469   delete [] pathname;
00470   delete who;
00471   srpc->send_end();
00472 }
00473 
00474 
00475 // Server stub for LookupIndex
00476 static void
00477 VestaSourceLookupIndex(SRPC* srpc, int intf_ver)
00478 {
00479     LongId longid;
00480     unsigned int index;
00481     VestaSource::errorCode err;
00482     int len;
00483     char arcbuf[MAX_ARC_LEN+1];
00484     bool sendarc;
00485     
00486     // Receive the arguments
00487     len = sizeof(longid.value);
00488     srpc->recv_bytes_here((char *) &longid.value, len);
00489     index = (unsigned int) srpc->recv_int();
00490     sendarc = (bool) srpc->recv_int();
00491     srpc->recv_end();
00492     
00493     // Do the work
00494     ReadersWritersLock* lock;
00495     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00496     try {
00497         if (vs == NULL) {
00498             err = VestaSource::invalidArgs;
00499         } else {
00500           RWLOCK_LOCKED_REASON(lock, "SRPC:lookupIndex");
00501             VestaSource* childvs;
00502             err = vs->lookupIndex(index, childvs, sendarc ? arcbuf : NULL);
00503             delete vs;
00504             vs = childvs;
00505         }
00506     
00507         // Send the results
00508         srpc->send_int((int) err);
00509         if (err == VestaSource::ok) {
00510             srpc->send_int((int) vs->type);
00511             srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00512             srpc->send_int((int) vs->master);
00513             srpc->send_int((int) vs->pseudoInode);
00514             srpc->send_int((int) vs->shortId());
00515             srpc->send_int((int) vs->timestamp());
00516             srpc->send_int((int) (vs->hasAttribs() ||
00517                                   MutableRootLongId.isAncestorOf(vs->longid)));
00518             vs->fptag.Send(*srpc);
00519             delete vs;
00520             if (sendarc) srpc->send_chars(arcbuf);
00521         }
00522     } catch (SRPC::failure f) {
00523         if (lock != NULL) lock->releaseRead();
00524         throw;
00525     } 
00526     if (lock != NULL) lock->releaseRead();
00527     srpc->send_end();
00528 }
00529 
00530 
00531 // Server stub for CreateVolatileDirectory
00532 static void
00533 VestaSourceCreateVolatileDirectory(SRPC* srpc, int intf_ver)
00534 {
00535     char host[MAX_ARC_LEN+1];
00536     char port[MAX_ARC_LEN+1];
00537     int len;
00538     VestaSource::errorCode err;
00539     Bit64 handle;
00540     time_t timestamp;
00541     bool readOnlyExisting;
00542 
00543     // Receive the arguments
00544     len = MAX_ARC_LEN+1;
00545     srpc->recv_chars_here(host, len);
00546     len = MAX_ARC_LEN+1;
00547     srpc->recv_chars_here(port, len);
00548     len = sizeof(handle);
00549     srpc->recv_bytes_here((char*) &handle, len);
00550     timestamp = (time_t) srpc->recv_int();
00551     readOnlyExisting = (bool) srpc->recv_int();
00552     srpc->recv_end();
00553     
00554     // Do the work
00555     VestaSource* vs;
00556     ReadersWritersLock* lock;
00557     err = ((VDirVolatileRoot*) VestaSource::volatileRoot())->
00558       createVolatileDirectory(host, port, handle, vs, timestamp,
00559                               LongId::readLock, &lock, readOnlyExisting);
00560     try {
00561         // Send the results
00562         srpc->send_int((int) err);
00563         if (err == VestaSource::ok) {
00564             srpc->send_int((int) vs->type);
00565             srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00566             srpc->send_int((int) vs->master);
00567             srpc->send_int((int) vs->pseudoInode);
00568             srpc->send_int((int) vs->shortId());
00569             srpc->send_int((int) vs->timestamp());
00570             srpc->send_int((int) vs->hasAttribs());
00571             vs->fptag.Send(*srpc);
00572             delete vs;
00573         }
00574     } catch (SRPC::failure f) {
00575         if (lock != NULL) lock->releaseRead();
00576         throw;
00577     } 
00578     if (lock != NULL) lock->releaseRead();
00579     srpc->send_end();
00580 }
00581 
00582 // Server stub for DeleteVolatileDirectory
00583 static void
00584 VestaSourceDeleteVolatileDirectory(SRPC* srpc, int intf_ver)
00585 {
00586     LongId longid, plongid;
00587     int len;
00588     VestaSource::errorCode err;
00589     
00590     // Receive the arguments
00591     len = sizeof(longid.value);
00592     srpc->recv_bytes_here((char *) &longid.value, len);
00593     srpc->recv_end();
00594     
00595     // Do the work
00596     unsigned int index;
00597     plongid = longid.getParent(&index);
00598     if (memcmp(&plongid, &VolatileRootLongId, sizeof(LongId)) != 0) {
00599         err = VestaSource::inappropriateOp; // client coding error!
00600     } else {
00601         ReadersWritersLock *lock;
00602         err = ((VDirVolatileRoot*)
00603                VestaSource::volatileRoot(LongId::writeLock, &lock))->
00604                  deleteIndex(index);
00605         RWLOCK_LOCKED_REASON(lock, "SRPC:DeleteVolatileDirectory");
00606         lock->releaseWrite();
00607     }
00608     
00609     // Send the results
00610     srpc->send_int((int) err);
00611     srpc->send_end();
00612 }
00613 
00614 // Server stub for getBase
00615 static void
00616 VestaSourceGetBase(SRPC* srpc, int intf_ver)
00617 {
00618   LongId longid;        
00619   int len;
00620   VestaSource::errorCode err;
00621 
00622   // Receive the arguments
00623   len = sizeof(longid.value);
00624   srpc->recv_bytes_here((char *) &longid.value, len);
00625   AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00626   srpc->recv_end();
00627     
00628   // Do the work
00629   ReadersWritersLock* lock;
00630   VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00631   try {
00632     VestaSource* mvs;
00633     if (vs == NULL) {
00634       err = VestaSource::invalidArgs;
00635     } else {
00636       RWLOCK_LOCKED_REASON(lock, "SRPC:GetBase");
00637       err = vs->getBase(mvs, who);
00638       delete vs;
00639     }
00640     // Send the results
00641     srpc->send_int((int) err);
00642     if (err == VestaSource::ok) {
00643       srpc->send_int((int) mvs->type);
00644       srpc->send_bytes((const char *) &mvs->longid, sizeof(mvs->longid));
00645       srpc->send_int((int) mvs->master);
00646       srpc->send_int((int) mvs->pseudoInode);
00647       srpc->send_int((int) mvs->shortId());
00648       srpc->send_int((int) mvs->timestamp());
00649       srpc->send_int((int) mvs->hasAttribs());
00650       mvs->fptag.Send(*srpc);
00651       delete mvs;
00652     }
00653   } catch (SRPC::failure f) {
00654     if (lock != NULL) lock->releaseRead();
00655     delete who;
00656     throw;
00657   }
00658   if (lock != NULL) lock->releaseRead();
00659   delete who;
00660   srpc->send_end();
00661 }
00662 
00663 
00664 struct VSLClosure {
00665     SRPC* srpc;
00666     int cost, limit, overhead;
00667     int intf_ver;
00668 };
00669 
00670 // Internal callback for List stub below
00671 static bool
00672 VSLCallback(void* closure, VestaSource::typeTag type, Arc arc,
00673             unsigned int index, Bit32 pseudoInode, ShortId filsid, bool master)
00674 {
00675     VSLClosure* cl = (VSLClosure*) closure;
00676     cl->cost += cl->overhead + strlen(arc);
00677     if (cl->cost > cl->limit) return false; // no room for this entry
00678     cl->srpc->send_chars(arc);
00679     cl->srpc->send_int((int) type);
00680     cl->srpc->send_int((int) index);
00681     if (cl->intf_ver >= 10) {
00682       cl->srpc->send_int((int) pseudoInode);
00683       cl->srpc->send_int((int) filsid);
00684     } else {
00685       // Rough backward compatibility
00686       cl->srpc->send_int(((int)filsid) ? ((int)filsid) : ((int)pseudoInode));
00687     }
00688     cl->srpc->send_int((int) master);
00689     return true;
00690 }
00691 
00692 
00693 // Server stub for List
00694 static void
00695 VestaSourceList(SRPC* srpc, int intf_ver)
00696 {
00697     LongId longid;
00698     unsigned int sindex;
00699     bool delta;
00700     int limit, overhead;
00701     VestaSource::errorCode err;
00702     VSLClosure cl;
00703     int len;
00704     
00705     // Receive the arguments
00706     len = sizeof(longid.value);
00707     srpc->recv_bytes_here((char *) &longid.value, len);
00708     sindex = (unsigned int) srpc->recv_int();
00709     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00710     delta = (bool) srpc->recv_int();
00711     limit = srpc->recv_int();
00712     overhead = srpc->recv_int();
00713     srpc->recv_end();
00714     
00715     // Do the work and send the results
00716     srpc->send_seq_start();
00717     ReadersWritersLock* lock;
00718     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00719     try {
00720         if (vs == NULL) {
00721             err = VestaSource::notFound;
00722         } else {
00723           RWLOCK_LOCKED_REASON(lock, "SRPC:List");
00724             cl.srpc = srpc;
00725             cl.cost = 0;
00726             cl.limit = limit;
00727             cl.overhead = overhead;
00728             cl.intf_ver = intf_ver;
00729             err = vs->list(sindex, VSLCallback, &cl, who, delta, 0);
00730             delete vs;
00731         }
00732         if (err != VestaSource::ok || cl.cost <= cl.limit) {
00733             // Error or the real end of the directory; 
00734             // send terminating entry, including error code.
00735             srpc->send_chars("");
00736             srpc->send_int((int) VestaSource::unused);
00737             srpc->send_int((int) err);
00738         }
00739     } catch (SRPC::failure f) {
00740         if (vs != NULL) delete vs;
00741         if (lock != NULL) lock->releaseRead();
00742         delete who;
00743         throw;
00744     }
00745     if (lock != NULL) lock->releaseRead();
00746     delete who;
00747     srpc->send_seq_end();
00748     srpc->send_end();
00749 }
00750 
00751 
00752 // Server stub for GetNFSInfo
00753 static void
00754 VestaSourceGetNFSInfo(SRPC* srpc, int intf_ver)
00755 {
00756     VestaSource::errorCode err;
00757     extern char* MyNFSSocket; // set by NFS server initialization
00758     
00759     // No arguments to receive
00760     srpc->recv_end();
00761     
00762     // Send the results
00763     srpc->send_chars(MyNFSSocket);
00764     srpc->send_bytes((const char *) &RootLongId, sizeof(LongId));
00765     srpc->send_bytes((const char *) &MutableRootLongId, sizeof(LongId));
00766     srpc->send_end();
00767 }
00768 
00769 
00770 // Server stub for GetNFSInfo
00771 static void
00772 VestaSourceFPToShortId(SRPC* srpc, int intf_ver)
00773 {
00774     VestaSource::errorCode err;
00775     FP::Tag fptag;
00776     ShortId sid;
00777 
00778     fptag.Recv(*srpc);
00779     srpc->recv_end();
00780 
00781     // Do the work
00782     sid = GetFPShortId(fptag);
00783 
00784     srpc->send_int((int)sid);
00785     srpc->send_end();
00786 }
00787 
00788 static void
00789 VestaSourceReallyDelete(SRPC* srpc, int intf_ver)
00790 {
00791     int len;
00792     LongId longid;
00793     char arc[MAX_ARC_LEN+1];
00794     bool existCheck;
00795     time_t timestamp;
00796     VestaSource::errorCode err;
00797     
00798     // Receive the arguments
00799     len = sizeof(longid.value);
00800     srpc->recv_bytes_here((char *) &longid.value, len);
00801     len = MAX_ARC_LEN+1;
00802     srpc->recv_chars_here(arc, len);
00803     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00804     existCheck = (bool) srpc->recv_int();
00805     timestamp = (time_t) srpc->recv_int();
00806     srpc->recv_end();
00807     
00808     // Do the work
00809     ReadersWritersLock* lock;
00810     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
00811     if (vs == NULL) {
00812       err = VestaSource::invalidArgs;
00813     } else {
00814       RWLOCK_LOCKED_REASON(lock, "SRPC:reallyDelete");
00815       err = vs->reallyDelete(arc, who, existCheck, timestamp);
00816       delete vs;
00817     }
00818     if (lock != NULL) lock->releaseWrite();
00819     delete who;
00820     
00821     // Send the results
00822     srpc->send_int((int) err);
00823     srpc->send_end();
00824 }
00825 
00826 static void
00827 VestaSourceInsertFile(SRPC* srpc, int intf_ver)
00828 {
00829     int len;
00830     LongId longid, rlongid;
00831     char arc[MAX_ARC_LEN+1];
00832     ShortId sid;
00833     bool master;
00834     VestaSource::dupeCheck chk;
00835     time_t timestamp;
00836     VestaSource::errorCode err;
00837     FP::Tag fptag;
00838     
00839     // Receive the arguments
00840     len = sizeof(longid.value);
00841     srpc->recv_bytes_here((char *) &longid.value, len);
00842     len = MAX_ARC_LEN+1;
00843     srpc->recv_chars_here(arc, len);
00844     sid = (ShortId) srpc->recv_int();
00845     master = (bool) srpc->recv_int();
00846     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00847     chk = (VestaSource::dupeCheck) srpc->recv_int();
00848     timestamp = (time_t) srpc->recv_int();
00849 
00850     len = FP::ByteCnt;
00851     unsigned char fpbytes[FP::ByteCnt];
00852     srpc->recv_bytes_here((char*)fpbytes, len);
00853     bool have_fp = (len == FP::ByteCnt);
00854     if(have_fp)
00855       {
00856         fptag.FromBytes(fpbytes);
00857       }
00858 
00859     srpc->recv_end();
00860     
00861     // Do the work
00862     ReadersWritersLock* lock;
00863     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
00864     VestaSource* newvs;
00865     if (vs == NULL) {
00866         err = VestaSource::invalidArgs;
00867     } else {
00868       RWLOCK_LOCKED_REASON(lock, "SRPC:insertFile");
00869         err = vs->insertFile(arc, sid, master, who, chk, &newvs, timestamp,
00870                              have_fp ? &fptag : NULL);
00871         delete vs;
00872     }
00873     if (lock != NULL) lock->releaseWrite();
00874     delete who;
00875     
00876     // Send the results
00877     srpc->send_int((int) err);
00878     if (err == VestaSource::ok) {
00879         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
00880         srpc->send_int((int) newvs->pseudoInode);
00881         newvs->fptag.Send(*srpc);
00882         srpc->send_int((int) newvs->shortId());
00883         delete newvs;
00884     }
00885     srpc->send_end();
00886 }
00887 
00888 static void
00889 VestaSourceInsertMutableFile(SRPC* srpc, int intf_ver)
00890 {
00891     int len;
00892     LongId longid, rlongid;
00893     char arc[MAX_ARC_LEN+1];
00894     ShortId sid;
00895     bool master;
00896     VestaSource::dupeCheck chk;
00897     time_t timestamp;
00898     VestaSource::errorCode err;
00899     
00900     // Receive the arguments
00901     len = sizeof(longid.value);
00902     srpc->recv_bytes_here((char *) &longid.value, len);
00903     len = MAX_ARC_LEN+1;
00904     srpc->recv_chars_here(arc, len);
00905     sid = (ShortId) srpc->recv_int();
00906     master = (bool) srpc->recv_int();
00907     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00908     chk = (VestaSource::dupeCheck) srpc->recv_int();
00909     timestamp = (time_t) srpc->recv_int();
00910     srpc->recv_end();
00911     
00912     // Do the work
00913     ReadersWritersLock* lock;
00914     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
00915     VestaSource* newvs;
00916     if (vs == NULL) {
00917         err = VestaSource::invalidArgs;
00918     } else {
00919       RWLOCK_LOCKED_REASON(lock, "SRPC:insertMutableFile");
00920         err = vs->insertMutableFile(arc, sid, master, who, chk, &newvs,
00921                                     timestamp);
00922         delete vs;
00923     }
00924     if (lock != NULL) lock->releaseWrite();
00925     delete who;
00926     
00927     // Send the results
00928     srpc->send_int((int) err);
00929     if (err == VestaSource::ok) {
00930         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
00931         srpc->send_int((int) newvs->pseudoInode);
00932         newvs->fptag.Send(*srpc);
00933         srpc->send_int((int) newvs->shortId());
00934         delete newvs;
00935     }
00936     srpc->send_end();
00937 }
00938 
00939 static void
00940 VestaSourceInsertImmutableDirectory(SRPC* srpc, int intf_ver)
00941 {
00942     int len;
00943     LongId longid, dlongid, rlongid;
00944     char arc[MAX_ARC_LEN+1];
00945     bool master;
00946     VestaSource::dupeCheck chk;
00947     time_t timestamp;
00948     VestaSource::errorCode err;
00949     FP::Tag fptag;
00950     
00951     // Receive the arguments
00952     len = sizeof(longid.value);
00953     srpc->recv_bytes_here((char *) &longid.value, len);
00954     len = MAX_ARC_LEN+1;
00955     srpc->recv_chars_here(arc, len);
00956     len = sizeof(dlongid.value);
00957     srpc->recv_bytes_here((char *) &dlongid.value, len);
00958     master = (bool) srpc->recv_int();
00959     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00960     chk = (VestaSource::dupeCheck) srpc->recv_int();
00961     timestamp = (time_t) srpc->recv_int();
00962 
00963     len = FP::ByteCnt;
00964     unsigned char fpbytes[FP::ByteCnt];
00965     srpc->recv_bytes_here((char*)fpbytes, len);
00966     bool have_fp = (len == FP::ByteCnt);
00967     if(have_fp)
00968       {
00969         fptag.FromBytes(fpbytes);
00970       }
00971 
00972     srpc->recv_end();
00973     
00974     // Do the work
00975     ReadersWritersLock* vlock = NULL;
00976     VestaSource* vs;
00977     ReadersWritersLock* lock;
00978     if (VolatileRootLongId.isAncestorOf(longid)) {
00979       // Must retain VolatileRootLock.read across both longid lookups,
00980       // not acquire and release it within the first.
00981       vlock = &VolatileRootLock;
00982       vlock->acquireRead();
00983       RWLOCK_LOCKED_REASON(vlock,
00984                            "SRPC:insertImmutableDirectory (in volatile)");
00985       vs = longid.lookup(LongId::writeLockV, &lock);
00986     } else {
00987       vs = longid.lookup(LongId::writeLock, &lock);
00988     }      
00989     VestaSource* newvs;
00990     if (vs == NULL) {
00991         err = VestaSource::invalidArgs;
00992         if (vlock != NULL) vlock->releaseRead();
00993     } else {
00994       RWLOCK_LOCKED_REASON(lock, "SRPC:insertImmutableDirectory");
00995         VestaSource* dirvs = dlongid.lookup(LongId::checkLock, &lock);
00996         if (vlock != NULL) vlock->releaseRead();
00997         err = vs->insertImmutableDirectory(arc, dirvs, master, who, chk,
00998                                            &newvs, timestamp,
00999                                            have_fp ? &fptag : NULL);
01000         delete vs;
01001         if (dirvs != NULL) delete dirvs;
01002     }
01003     if (lock != NULL) lock->releaseWrite();
01004     delete who;
01005     
01006     // Send the results
01007     srpc->send_int((int) err);
01008     if (err == VestaSource::ok) {
01009         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01010         srpc->send_int((int) newvs->pseudoInode);
01011         newvs->fptag.Send(*srpc);
01012         srpc->send_int(newvs->shortId());
01013         delete newvs;
01014     }
01015     srpc->send_end();
01016 }
01017 
01018 static void
01019 VestaSourceInsertAppendableDirectory(SRPC* srpc, int intf_ver)
01020 {
01021     int len;
01022     LongId longid, rlongid;
01023     char arc[MAX_ARC_LEN+1];
01024     bool master;
01025     VestaSource::dupeCheck chk;
01026     time_t timestamp;
01027     VestaSource::errorCode err;
01028     
01029     // Receive the arguments
01030     len = sizeof(longid.value);
01031     srpc->recv_bytes_here((char *) &longid.value, len);
01032     len = MAX_ARC_LEN+1;
01033     srpc->recv_chars_here(arc, len);
01034     master = (bool) srpc->recv_int();
01035     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01036     chk = (VestaSource::dupeCheck) srpc->recv_int();
01037     timestamp = (time_t) srpc->recv_int();
01038     srpc->recv_end();
01039     
01040     // Do the work
01041     ReadersWritersLock* lock;
01042     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01043     VestaSource* newvs;
01044     if (vs == NULL) {
01045         err = VestaSource::invalidArgs;
01046     } else {
01047       RWLOCK_LOCKED_REASON(lock, "SRPC:insertAppendableDirectory");
01048         err = vs->insertAppendableDirectory(arc, master, who, chk,
01049                                             &newvs, timestamp);
01050         if((err == VestaSource::ok) && master && !vs->master &&
01051            !myMasterHint.Empty())
01052           {
01053             // Inserting a master appendable directory into a
01054             // non-master directory (probably a top-leve directory
01055             // under /vesta): add a master-repository attribute to the
01056             // new directory.
01057             newvs->setAttrib("master-repository", myMasterHint.cchars(), NULL);
01058           }
01059         delete vs;
01060     }
01061     if (lock != NULL) lock->releaseWrite();
01062     delete who;
01063     
01064     // Send the results
01065     srpc->send_int((int) err);
01066     if (err == VestaSource::ok) {
01067         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01068         srpc->send_int((int) newvs->pseudoInode);
01069         newvs->fptag.Send(*srpc);
01070         delete newvs;
01071     }
01072     srpc->send_end();
01073 }
01074 
01075 static void
01076 VestaSourceInsertMutableDirectory(SRPC* srpc, int intf_ver)
01077 {
01078     int len;
01079     LongId longid, dlongid, rlongid;
01080     char arc[MAX_ARC_LEN+1];
01081     bool master;
01082     VestaSource::dupeCheck chk;
01083     time_t timestamp;
01084     VestaSource::errorCode err;
01085     
01086     // Receive the arguments
01087     len = sizeof(longid.value);
01088     srpc->recv_bytes_here((char *) &longid.value, len);
01089     len = MAX_ARC_LEN+1;
01090     srpc->recv_chars_here(arc, len);
01091     len = sizeof(dlongid.value);
01092     srpc->recv_bytes_here((char *) &dlongid.value, len);
01093     master = (bool) srpc->recv_int();
01094     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01095     chk = (VestaSource::dupeCheck) srpc->recv_int();
01096     timestamp = (time_t) srpc->recv_int();
01097     srpc->recv_end();
01098     
01099     // Do the work
01100     ReadersWritersLock* vlock = NULL;
01101     VestaSource* vs;
01102     ReadersWritersLock* lock;
01103     if (VolatileRootLongId.isAncestorOf(longid)) {
01104       // Must retain VolatileRootLock.read across both longid lookups,
01105       // not acquire and release it within the first.
01106       vlock = &VolatileRootLock;
01107       vlock->acquireRead();
01108       RWLOCK_LOCKED_REASON(vlock, "SRPC:insertMutableDirectory (in volatile)");
01109       vs = longid.lookup(LongId::writeLockV, &lock);
01110     } else {
01111       vs = longid.lookup(LongId::writeLock, &lock);
01112     }      
01113     VestaSource* newvs;
01114     if (vs == NULL) {
01115         err = VestaSource::invalidArgs;
01116         if (vlock != NULL) vlock->releaseRead();
01117     } else {
01118       RWLOCK_LOCKED_REASON(lock, "SRPC:insertMutableDirectory");
01119         VestaSource* dirvs = dlongid.lookup(LongId::checkLock, &lock);
01120         if (vlock != NULL) vlock->releaseRead();
01121         err = vs->insertMutableDirectory(arc, dirvs, master, who, chk,
01122                                          &newvs, timestamp);
01123         delete vs;
01124         if (dirvs != NULL) delete dirvs;
01125     }
01126     if (lock != NULL) lock->releaseWrite();
01127     delete who;
01128     
01129     // Send the results
01130     srpc->send_int((int) err);
01131     if (err == VestaSource::ok) {
01132         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01133         srpc->send_int((int) newvs->pseudoInode);
01134         newvs->fptag.Send(*srpc);
01135         delete newvs;
01136     }
01137     srpc->send_end();
01138 }
01139 
01140 static void
01141 VestaSourceInsertGhost(SRPC* srpc, int intf_ver)
01142 {
01143     int len;
01144     LongId longid, rlongid;
01145     char arc[MAX_ARC_LEN+1];
01146     bool master;
01147     VestaSource::dupeCheck chk;
01148     time_t timestamp;
01149     VestaSource::errorCode err;
01150     
01151     // Receive the arguments
01152     len = sizeof(longid.value);
01153     srpc->recv_bytes_here((char *) &longid.value, len);
01154     len = MAX_ARC_LEN+1;
01155     srpc->recv_chars_here(arc, len);
01156     master = (bool) srpc->recv_int();
01157     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01158     chk = (VestaSource::dupeCheck) srpc->recv_int();
01159     timestamp = (time_t) srpc->recv_int();
01160     srpc->recv_end();
01161     
01162     // Do the work
01163     ReadersWritersLock* lock;
01164     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01165     VestaSource* newvs;
01166     if (vs == NULL) {
01167         err = VestaSource::invalidArgs;
01168     } else {
01169       RWLOCK_LOCKED_REASON(lock, "SRPC:insertGhost");
01170         err = vs->insertGhost(arc, master, who, chk, &newvs, timestamp);
01171         delete vs;
01172     }
01173     if (lock != NULL) lock->releaseWrite();
01174     delete who;
01175     
01176     // Send the results
01177     srpc->send_int((int) err);
01178     if (err == VestaSource::ok) {
01179         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01180         srpc->send_int((int) newvs->pseudoInode);
01181         newvs->fptag.Send(*srpc);
01182         delete newvs;
01183     }
01184     srpc->send_end();
01185 }
01186 
01187 static void
01188 VestaSourceInsertStub(SRPC* srpc, int intf_ver)
01189 {
01190     int len;
01191     LongId longid, rlongid;
01192     char arc[MAX_ARC_LEN+1];
01193     bool master;
01194     VestaSource::dupeCheck chk;
01195     time_t timestamp;
01196     VestaSource::errorCode err;
01197     
01198     // Receive the arguments
01199     len = sizeof(longid.value);
01200     srpc->recv_bytes_here((char *) &longid.value, len);
01201     len = MAX_ARC_LEN+1;
01202     srpc->recv_chars_here(arc, len);
01203     master = (bool) srpc->recv_int();
01204     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01205     chk = (VestaSource::dupeCheck) srpc->recv_int();
01206     timestamp = (time_t) srpc->recv_int();
01207     srpc->recv_end();
01208     
01209     // Do the work
01210     ReadersWritersLock* lock;
01211     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01212     VestaSource* newvs;
01213     if (vs == NULL) {
01214         err = VestaSource::invalidArgs;
01215     } else {
01216       RWLOCK_LOCKED_REASON(lock, "SRPCinsertStub");
01217         err = vs->insertStub(arc, master, who, chk, &newvs, timestamp);
01218         delete vs;
01219     }
01220     if (lock != NULL) lock->releaseWrite();
01221     delete who;
01222     
01223     // Send the results
01224     srpc->send_int((int) err);
01225     if (err == VestaSource::ok) {
01226         srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01227         srpc->send_int((int) newvs->pseudoInode);
01228         newvs->fptag.Send(*srpc);
01229         delete newvs;
01230     }
01231     srpc->send_end();
01232 }
01233 
01234 static void
01235 VestaSourceRenameTo(SRPC* srpc, int intf_ver)
01236 {
01237     int len;
01238     LongId longid, flongid, rlongid;
01239     char arc[MAX_ARC_LEN+1], fromArc[MAX_ARC_LEN+1];
01240     VestaSource::dupeCheck chk;
01241     time_t timestamp;
01242     VestaSource::errorCode err;
01243     
01244     // Receive the arguments
01245     len = sizeof(longid.value);
01246     srpc->recv_bytes_here((char *) &longid.value, len);
01247     len = MAX_ARC_LEN+1;
01248     srpc->recv_chars_here(arc, len);
01249     len = sizeof(flongid.value);
01250     srpc->recv_bytes_here((char *) &flongid.value, len);
01251     len = MAX_ARC_LEN+1;
01252     srpc->recv_chars_here(fromArc, len);
01253     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01254     chk = (VestaSource::dupeCheck) srpc->recv_int();
01255     timestamp = (time_t) srpc->recv_int();
01256     srpc->recv_end();
01257     
01258     // Do the work
01259     ReadersWritersLock* vlock = NULL;
01260     ReadersWritersLock* lock;
01261     VestaSource* vs;
01262     if (VolatileRootLongId.isAncestorOf(longid)) {
01263       // Must retain VolatileRootLock.read across both longid lookups,
01264       // not acquire and release it within the first.
01265       vlock = &VolatileRootLock;
01266       vlock->acquireRead();
01267       RWLOCK_LOCKED_REASON(vlock, "SRPC:renameTo (in volatile)");
01268       vs = longid.lookup(LongId::writeLockV, &lock);
01269     } else {
01270       vs = longid.lookup(LongId::writeLock, &lock);
01271     }      
01272     if(lock != 0)
01273       {
01274         RWLOCK_LOCKED_REASON(lock, "SRPC:renameTo");
01275       }
01276     VestaSource* fvs = flongid.lookup(LongId::checkLock, &lock);
01277     if (vlock != NULL) vlock->releaseRead();
01278     if (vs == NULL || fvs == NULL) {
01279         err = VestaSource::invalidArgs;
01280     } else {
01281         err = vs->renameTo(arc, fvs, fromArc, who, chk, timestamp);
01282     }
01283     delete who;
01284     if (vs != NULL) delete vs;
01285     if (fvs != NULL) delete fvs;
01286     if (lock != NULL) lock->releaseWrite();
01287     
01288     // Send the results
01289     srpc->send_int((int) err);
01290     srpc->send_end();
01291 }
01292 
01293 // Server stub for MakeMutable
01294 static void
01295 VestaSourceMakeMutable(SRPC* srpc, int intf_ver)
01296 {
01297     int len;
01298     LongId longid;
01299     ShortId sid;
01300     VestaSource::errorCode err;
01301     AccessControl::Identity who;
01302     Basics::uint64 copyMax;
01303 
01304     // Receive the arguments
01305     len = sizeof(longid.value);
01306     srpc->recv_bytes_here((char *) &longid.value, len);
01307     sid = (ShortId) srpc->recv_int();
01308     if (intf_ver <= 8) {
01309       copyMax = (Basics::uint64) -1;
01310       who = NULL;
01311     } else {
01312       copyMax = (unsigned int) srpc->recv_int();
01313       copyMax += ((Basics::uint64) srpc->recv_int()) << 32;
01314       who = srpc_recv_identity(srpc, intf_ver);
01315     }
01316     srpc->recv_end();
01317     
01318     // Do the work
01319     ReadersWritersLock* lock;
01320     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01321     try {
01322         VestaSource* mvs;
01323         if (vs == NULL) {
01324             err = VestaSource::invalidArgs;
01325         } else {
01326           RWLOCK_LOCKED_REASON(lock, "SRPC:makeMutable");
01327             err = vs->makeMutable(mvs, sid, copyMax, who);
01328             delete vs;
01329         }
01330         // Send the results
01331         srpc->send_int((int) err);
01332         if (err == VestaSource::ok) {
01333             srpc->send_int((int) mvs->type);
01334             srpc->send_bytes((const char *) &mvs->longid, sizeof(mvs->longid));
01335             srpc->send_int((int) mvs->master);
01336             srpc->send_int((int) mvs->pseudoInode);
01337             srpc->send_int((int) mvs->shortId());
01338             srpc->send_int((int) mvs->timestamp());
01339             srpc->send_int((int) mvs->hasAttribs());
01340             vs->fptag.Send(*srpc);
01341             delete mvs;
01342         }
01343     } catch (SRPC::failure f) {
01344         if (lock != NULL) lock->releaseWrite();
01345         delete who;
01346         throw;
01347     }
01348     if (lock != NULL) lock->releaseWrite();
01349     delete who;
01350     srpc->send_end();
01351 }
01352 
01353 
01354 static void
01355 VestaSourceInAttribs(SRPC* srpc, int intf_ver)
01356 {
01357     LongId longid;
01358     int len;
01359     char* name;
01360     char* value;
01361     bool retval;
01362     
01363     // Receive the arguments
01364     len = sizeof(longid.value);
01365     srpc->recv_bytes_here((char *) &longid.value, len);
01366     name = srpc->recv_chars();
01367     value = srpc->recv_chars();
01368     srpc->recv_end();
01369     
01370     // Do the work
01371     ReadersWritersLock* lock;
01372     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01373     if (vs == NULL) {
01374         retval = false;
01375     } else {
01376       RWLOCK_LOCKED_REASON(lock, "SRPC:inAttribs");
01377         retval = vs->inAttribs(name, value);
01378         delete vs;
01379     }
01380     if (lock != NULL) lock->releaseRead();
01381     
01382     // Send the results
01383     delete [] name;
01384     delete [] value;
01385     srpc->send_int((int) retval);
01386     srpc->send_end();
01387 }
01388 
01389 static void
01390 VestaSourceGetAttrib(SRPC* srpc, int intf_ver)
01391 {
01392     LongId longid;
01393     int len;
01394     char* name;
01395     const char* value;
01396     bool retval;
01397     
01398     // Receive the arguments
01399     len = sizeof(longid.value);
01400     srpc->recv_bytes_here((char *) &longid.value, len);
01401     name = srpc->recv_chars();
01402     srpc->recv_end();
01403     
01404     // Do the work
01405     ReadersWritersLock* lock;
01406     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01407     if (vs == NULL) {
01408         value = NULL;
01409     } else {
01410       RWLOCK_LOCKED_REASON(lock, "SRPC:getAttribConst");
01411         value = vs->getAttribConst(name);
01412         delete vs;
01413     }
01414     
01415     // Send the results
01416     // Hold lock until done to keep value valid
01417     try {
01418         if (value == NULL) {
01419             srpc->send_int((int) true);
01420             srpc->send_chars("");
01421         } else {
01422             srpc->send_int((int) false);
01423             srpc->send_chars(value);
01424         }
01425     } catch (SRPC::failure f) {
01426         if (lock != NULL) lock->releaseRead(); 
01427         delete [] name;
01428         throw;
01429     }
01430     if (lock != NULL) lock->releaseRead(); 
01431     delete [] name;
01432     srpc->send_end();
01433 }
01434 
01435 static bool
01436 valueCallback(void* cl, const char* value)
01437 {
01438     SRPC* srpc = (SRPC*) cl;
01439     srpc->send_chars(value);
01440     return true;
01441 }
01442 
01443 static void
01444 VestaSourceGetAttrib2(SRPC* srpc, int intf_ver)
01445 {
01446     LongId longid;
01447     int len;
01448     char* name;
01449     bool retval;
01450     
01451     // Receive the arguments
01452     len = sizeof(longid.value);
01453     srpc->recv_bytes_here((char *) &longid.value, len);
01454     name = srpc->recv_chars();
01455     srpc->recv_end();
01456     
01457     // Do the work and send the results
01458     srpc->send_seq_start();
01459     ReadersWritersLock* lock;
01460     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01461     if (vs != NULL) {
01462       RWLOCK_LOCKED_REASON(lock, "SRPC:getAttrib");
01463         try {
01464             vs->getAttrib(name, valueCallback, (void*) srpc);
01465         } catch (SRPC::failure f) {
01466             delete vs;
01467             if (lock != NULL) lock->releaseRead();
01468             delete [] name;
01469             throw;
01470         }
01471         delete vs;
01472     }
01473     if (lock != NULL) lock->releaseRead();
01474     delete [] name;
01475     srpc->send_seq_end();
01476     srpc->send_end();
01477 }
01478 
01479 static void
01480 VestaSourceListAttribs(SRPC* srpc, int intf_ver)
01481 {
01482     LongId longid;
01483     int len;
01484     bool retval;
01485     
01486     // Receive the arguments
01487     len = sizeof(longid.value);
01488     srpc->recv_bytes_here((char *) &longid.value, len);
01489     srpc->recv_end();
01490     
01491     // Do the work and send the results
01492     srpc->send_seq_start();
01493     ReadersWritersLock* lock;
01494     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01495     if (vs != NULL) {
01496       RWLOCK_LOCKED_REASON(lock, "SRPC:listAttribs");
01497         try {
01498             vs->listAttribs(valueCallback, (void*) srpc);
01499         } catch (SRPC::failure f) {
01500             delete vs;
01501             if (lock != NULL) lock->releaseRead();
01502             throw;
01503         }
01504         delete vs;
01505     }
01506     if (lock != NULL) lock->releaseRead();
01507     srpc->send_seq_end();
01508     srpc->send_end();
01509 }
01510 
01511 static bool
01512 historyCallback(void* cl, VestaSource::attribOp op, const char* name,
01513                 const char* value, time_t timestamp)
01514 {
01515     SRPC* srpc = (SRPC*) cl;
01516     srpc->send_int((int) op);
01517     srpc->send_chars(name);
01518     srpc->send_chars(value);
01519     srpc->send_int((int) timestamp);
01520     return true;
01521 }
01522 
01523 static void
01524 VestaSourceGetAttribHistory(SRPC* srpc, int intf_ver)
01525 {
01526     LongId longid;
01527     int len;
01528     bool retval;
01529     
01530     // Receive the arguments
01531     len = sizeof(longid.value);
01532     srpc->recv_bytes_here((char *) &longid.value, len);
01533     srpc->recv_end();
01534     
01535     // Do the work and send the results
01536     srpc->send_seq_start();
01537     ReadersWritersLock* lock;
01538     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01539     if (vs != NULL) {
01540       RWLOCK_LOCKED_REASON(lock, "SRPC:getAttribHistory");
01541         try {
01542             vs->getAttribHistory(historyCallback, (void*) srpc);
01543         } catch (SRPC::failure f) {
01544             delete vs;
01545             if (lock != NULL) lock->releaseRead();
01546             throw;
01547         }
01548         delete vs;
01549     }
01550     if (lock != NULL) lock->releaseRead();
01551     srpc->send_seq_end();
01552     srpc->send_end();
01553 }
01554 
01555 static void
01556 VestaSourceWriteAttrib(SRPC* srpc, int intf_ver)
01557 {
01558     LongId longid;
01559     int len;
01560     VestaSource::attribOp op;
01561     char* name;
01562     char* value;
01563     time_t timestamp;
01564     VestaSource::errorCode err = VestaSource::ok;
01565     
01566     // Receive the arguments
01567     len = sizeof(longid.value);
01568     srpc->recv_bytes_here((char *) &longid.value, len);
01569     op = (VestaSource::attribOp) srpc->recv_int();
01570     name = srpc->recv_chars();
01571     value = srpc->recv_chars();
01572     AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01573     timestamp = (time_t) srpc->recv_int();
01574     srpc->recv_end();
01575     
01576     // Do the work
01577     ReadersWritersLock* lock;
01578     VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01579     if (vs != NULL) {
01580       RWLOCK_LOCKED_REASON(lock, "SRPC:writeAttrib");
01581       if(!vs->hasAttribs() && MutableRootLongId.isAncestorOf(vs->longid))
01582         {
01583           VestaSource *new_vs = 0;
01584           err = vs->copyToMutable(new_vs);
01585           delete vs;
01586           vs = new_vs;
01587         }
01588       if(err == VestaSource::ok)
01589         {
01590           err = vs->writeAttrib(op, name, value, who, timestamp);
01591           delete vs;
01592         }
01593     } else {
01594         err = VestaSource::invalidArgs;
01595     }
01596     if (lock != NULL) lock->releaseWrite();
01597     delete who;
01598     
01599     // Send the results
01600     delete [] name;
01601     delete [] value;
01602     srpc->send_int((int) err);
01603     srpc->send_end();
01604 }
01605 
01606 void
01607 MakeFilesImmutable(SRPC* srpc, int intf_ver)
01608 {
01609     LongId longid;
01610     int len;
01611     unsigned int threshold;
01612     VestaSource::errorCode err;
01613     AccessControl::Identity who = NULL;
01614 
01615     try {
01616       // Receive the arguments
01617       len = sizeof(longid.value);
01618       srpc->recv_bytes_here((char *) &longid.value, len);
01619       threshold = srpc->recv_int();
01620       if (intf_ver > 8) {
01621         who = srpc_recv_identity(srpc, intf_ver);
01622       }
01623       srpc->recv_end();
01624 
01625       // Do the work
01626       ReadersWritersLock* lock = NULL;
01627       VestaSource *vs = longid.lookup(LongId::writeLock, &lock);
01628       if (vs == NULL) {
01629         err = VestaSource::invalidArgs;
01630       } else {
01631         RWLOCK_LOCKED_REASON(lock, "SRPC:makeFilesImmutable");
01632         err = vs->makeFilesImmutable(threshold, who);
01633         delete vs;
01634       }
01635       if (lock != NULL) lock->releaseWrite();
01636 
01637       // Send the results
01638       srpc->send_int((int) err);
01639       srpc->send_end();
01640     } catch (...) {
01641       if (who) delete who;
01642       throw;
01643     }
01644     if (who) delete who;
01645 }    
01646 
01647 void
01648 SetIndexMaster(SRPC* srpc, int intf_ver)
01649 {
01650     LongId longid;
01651     int len;
01652     unsigned int index;
01653     bool state;
01654     VestaSource::errorCode err;
01655     AccessControl::Identity who = NULL;
01656 
01657     try {
01658       // Receive the arguments
01659       len = sizeof(longid.value);
01660       srpc->recv_bytes_here((char *) &longid.value, len);
01661       index = (unsigned int) srpc->recv_int();
01662       state = (bool) srpc->recv_int();
01663       if (intf_ver <= 8) {
01664         who = NULL;
01665       } else {
01666         who = srpc_recv_identity(srpc, intf_ver);
01667       }
01668       srpc->recv_end();
01669     
01670       // Do the work
01671       ReadersWritersLock* lock = NULL;
01672       VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01673       if (vs == NULL) {
01674         err = VestaSource::invalidArgs;
01675       } else {
01676         RWLOCK_LOCKED_REASON(lock, "SRPC:setIndexMaster");
01677         err = vs->setIndexMaster(index, state, who);
01678         delete vs;
01679       }
01680       if (lock != NULL) lock->releaseWrite();
01681 
01682       // Send the results
01683       srpc->send_int((int) err);
01684       srpc->send_end();
01685     } catch (...) {
01686       if (who) delete who;
01687       throw;
01688     }
01689     if (who) delete who;
01690 }    
01691 
01692 // Server stub for VDirSurrogate::acquireMastership
01693 static void
01694 VestaSourceAcquireMastership(SRPC* srpc, int intf_ver)
01695 {
01696   const char* pathname = NULL;
01697   char srcHost[MAX_ARC_LEN+1];
01698   char srcPort[MAX_ARC_LEN+1];
01699   VestaSource::errorCode err;
01700   char pathnameSep;
01701   AccessControl::Identity dwho = NULL;
01702   AccessControl::Identity swho = NULL;
01703 
01704   // Receive the arguments
01705   try {
01706     int len;
01707     pathname = srpc->recv_chars();
01708     len = MAX_ARC_LEN+1;
01709     srpc->recv_chars_here(srcHost, len);
01710     len = MAX_ARC_LEN+1;
01711     srpc->recv_chars_here(srcPort, len);
01712     pathnameSep = (char) srpc->recv_int();
01713     dwho = srpc_recv_identity(srpc, intf_ver);
01714     swho = srpc_recv_identity(srpc, intf_ver);
01715     srpc->recv_end();
01716     
01717     // Do the work
01718     err = AcquireMastership(pathname, srcHost, srcPort, pathnameSep,
01719                             dwho, swho);
01720 
01721     // Send the results
01722     srpc->send_int((int) err);
01723     srpc->send_end();
01724 
01725   } catch (SRPC::failure f) {
01726     if (dwho) delete dwho;
01727     if (swho) delete swho;
01728     if (pathname) delete[] pathname;
01729     throw;
01730   } 
01731   delete dwho;
01732   delete swho;
01733   delete[] pathname;
01734 }
01735 
01736 // Server stub for VDirSurrogate::cedeMastership
01737 static void
01738 VestaSourceCedeMastership(SRPC* srpc, int intf_ver)
01739 {
01740   LongId longid;
01741   char requestid[MAX_ARC_LEN+1];
01742   const char* grantid = NULL;
01743   VestaSource::errorCode err;
01744   VestaSource *vs = NULL;
01745   AccessControl::Identity who = NULL;
01746 
01747   // Receive the arguments
01748   try {
01749     int len;
01750     len = sizeof(longid.value);
01751     srpc->recv_bytes_here((char *) &longid.value, len);
01752     len = MAX_ARC_LEN+1;
01753     srpc->recv_chars_here(requestid, len);
01754     who = srpc_recv_identity(srpc, intf_ver);
01755     srpc->recv_end();
01756     
01757     // Do the work
01758     ReadersWritersLock* lock = NULL;
01759     vs = longid.lookup(LongId::writeLock, &lock);
01760     if (vs == NULL) {
01761       err = VestaSource::invalidArgs;
01762     } else {
01763       RWLOCK_LOCKED_REASON(lock, "SRPC:cedeMastership");
01764       err = vs->cedeMastership(requestid, &grantid, who);
01765     }
01766     if (lock != NULL) lock->releaseWrite();
01767 
01768     // Send the results
01769     srpc->send_int((int) err);
01770     if (err == VestaSource::ok) {
01771       srpc->send_chars(grantid);
01772     }
01773     srpc->send_end();
01774 
01775   } catch (SRPC::failure f) {
01776     if (who) delete who;
01777     if (vs) delete vs;
01778     if (grantid) delete[] grantid;
01779     throw;
01780   } 
01781   if (who) delete who;
01782   if (vs) delete vs;
01783   if (grantid) delete[] grantid;
01784 }
01785 
01786 // Server stub for VDirSurrogate::replicate
01787 static void
01788 VestaSourceReplicate(SRPC* srpc, int intf_ver)
01789 {
01790   char* pathname = NULL;
01791   bool asStub, asGhost;
01792   char srcHost[MAX_ARC_LEN+1];
01793   char srcPort[MAX_ARC_LEN+1];
01794   VestaSource::errorCode err;
01795   char pathnameSep;
01796   AccessControl::Identity dwho = NULL;
01797   AccessControl::Identity swho = NULL;
01798 
01799   // Receive the arguments
01800   try {
01801     int len;
01802     pathname = srpc->recv_chars();
01803     asStub = (bool) srpc->recv_int();
01804     asGhost = (bool) srpc->recv_int();
01805     len = MAX_ARC_LEN+1;
01806     srpc->recv_chars_here(srcHost, len);
01807     len = MAX_ARC_LEN+1;
01808     srpc->recv_chars_here(srcPort, len);
01809     pathnameSep = (char) srpc->recv_int();
01810     dwho = srpc_recv_identity(srpc, intf_ver);
01811     swho = srpc_recv_identity(srpc, intf_ver);
01812     srpc->recv_end();
01813     
01814     // Do the work
01815     err = Replicate(pathname, asStub, asGhost, srcHost, srcPort, pathnameSep,
01816                     dwho, swho);
01817 
01818     // Send the results
01819     srpc->send_int((int) err);
01820     srpc->send_end();
01821 
01822   } catch (SRPC::failure f) {
01823     if (dwho) delete dwho;
01824     if (swho) delete swho;
01825     if (pathname) delete[] pathname;
01826     throw;
01827   } 
01828   delete dwho;
01829   delete swho;
01830   delete[] pathname;
01831 }
01832 
01833 // Server stub for VDirSurrogate::replicateAttribs
01834 static void
01835 VestaSourceReplicateAttribs(SRPC* srpc, int intf_ver)
01836 {
01837   char* pathname = NULL;
01838   bool includeAccess;
01839   char srcHost[MAX_ARC_LEN+1];
01840   char srcPort[MAX_ARC_LEN+1];
01841   VestaSource::errorCode err;
01842   char pathnameSep;
01843   AccessControl::Identity dwho = NULL;
01844   AccessControl::Identity swho = NULL;
01845 
01846   // Receive the arguments
01847   try {
01848     int len;
01849     pathname = srpc->recv_chars();
01850     includeAccess = (bool) srpc->recv_int();
01851     len = MAX_ARC_LEN+1;
01852     srpc->recv_chars_here(srcHost, len);
01853     len = MAX_ARC_LEN+1;
01854     srpc->recv_chars_here(srcPort, len);
01855     pathnameSep = (char) srpc->recv_int();
01856     dwho = srpc_recv_identity(srpc, intf_ver);
01857     swho = srpc_recv_identity(srpc, intf_ver);
01858     srpc->recv_end();
01859     
01860     // Do the work
01861     err = ReplicateAttribs(pathname, includeAccess, srcHost, srcPort,
01862                            pathnameSep, dwho, swho);
01863 
01864     // Send the results
01865     srpc->send_int((int) err);
01866     srpc->send_end();
01867 
01868   } catch (SRPC::failure f) {
01869     if (dwho) delete dwho;
01870     if (swho) delete swho;
01871     if (pathname) delete[] pathname;
01872     throw;
01873   } 
01874   delete dwho;
01875   delete swho;
01876   delete[] pathname;
01877 }
01878 
01879 
01880 void
01881 VSStat(SRPC* srpc, int intf_ver)
01882 {
01883     LongId longid;
01884     int len;
01885     time_t ts = -1;
01886     bool x = false;
01887     Basics::uint64 s = 0;
01888     VestaSource::errorCode err = VestaSource::invalidArgs;
01889 
01890     // Receive the arguments
01891     len = sizeof(longid.value);
01892     srpc->recv_bytes_here((char *) &longid.value, len);
01893     srpc->recv_end();
01894     
01895     // Do the work
01896     ReadersWritersLock* lock = NULL;
01897     VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01898     if (vs != NULL) {
01899       RWLOCK_LOCKED_REASON(lock, "SRPC:stat");
01900       err = VestaSource::ok;
01901       ts = vs->timestamp();
01902       x = vs->executable();
01903       s = vs->size();
01904     }
01905     // Release lock
01906     if(lock != NULL) lock->releaseRead();
01907 
01908     // Free vs.  (We do this after releaseing the lock as delete may
01909     // block in the memory allocation system.)
01910     if(vs != NULL) delete vs;
01911 
01912     // Send the results
01913     srpc->send_int((int) err);
01914     srpc->send_int((int) ts);
01915     srpc->send_int((int) x);
01916     srpc->send_int((int) (s & 0xffffffff));
01917     srpc->send_int((int) (s >> 32ul));
01918     srpc->send_end();
01919 }
01920 
01921 void
01922 VSRead(SRPC* srpc, int intf_ver)
01923 {
01924     LongId longid;
01925     int len;
01926     int nbytes;
01927     Basics::uint64 offset;
01928     void* buffer = NULL;
01929     VestaSource::errorCode err;
01930     AccessControl::Identity who = NULL;
01931 
01932     try {
01933         // Receive the arguments
01934         len = sizeof(longid.value);
01935         srpc->recv_bytes_here((char *) &longid.value, len);
01936         nbytes = srpc->recv_int();
01937         offset = (Basics::uint64) srpc->recv_int();
01938         offset += ((Basics::uint64) srpc->recv_int()) << 32ul;
01939         who = srpc_recv_identity(srpc, intf_ver);
01940         srpc->recv_end();
01941 
01942         // Do the work
01943         ReadersWritersLock* lock = NULL;
01944         VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01945         if (vs == NULL) {
01946             err = VestaSource::invalidArgs;
01947         } else {
01948           RWLOCK_LOCKED_REASON(lock, "SRPC:read");
01949             buffer = (void*) malloc(nbytes);
01950             err = vs->read(buffer, &nbytes, offset, who);
01951         }
01952         // Release lock
01953         if (lock != NULL) lock->releaseRead();
01954 
01955         if(vs) delete vs;
01956 
01957         // Send the results
01958         srpc->send_int((int) err);
01959         if (err == VestaSource::ok) {
01960             srpc->send_bytes((char*)buffer, nbytes);
01961         }
01962         srpc->send_end();
01963     } catch (SRPC::failure f) {
01964         if (who) delete who;
01965         if (buffer) free(buffer);
01966         throw;
01967     } 
01968     if (who) delete who;
01969     if (buffer) free(buffer);
01970 }
01971 
01972 void
01973 VSWrite(SRPC* srpc, int intf_ver)
01974 {
01975     LongId longid;
01976     int len;
01977     int nbytes;
01978     Basics::uint64 offset;
01979     char* buffer = NULL;
01980     VestaSource::errorCode err;
01981     AccessControl::Identity who = NULL;
01982 
01983     try {
01984       // Receive the arguments
01985       len = sizeof(longid.value);
01986       srpc->recv_bytes_here((char *) &longid.value, len);
01987       offset = (Basics::uint64) srpc->recv_int();
01988       offset += ((Basics::uint64) srpc->recv_int()) << 32ul;
01989       buffer = srpc->recv_bytes(nbytes);
01990       who = srpc_recv_identity(srpc, intf_ver);
01991       srpc->recv_end();
01992     
01993       // Do the work
01994       ReadersWritersLock* lock = NULL;
01995       VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01996       if (vs == NULL) {
01997         err = VestaSource::invalidArgs;
01998       } else {
01999         RWLOCK_LOCKED_REASON(lock, "SRPC:write");
02000         err = vs->write((void*)buffer, &nbytes, offset, who);
02001       }
02002       // Release lock
02003       if (lock != NULL) lock->releaseWrite();
02004 
02005       delete[] buffer;
02006       if(vs) delete vs;
02007 
02008       // Send the results
02009       srpc->send_int((int) err);
02010       if (err == VestaSource::ok) {
02011         srpc->send_int((int) nbytes);
02012       }
02013       srpc->send_end();
02014     } catch (...) {
02015       if (who) delete who;
02016       throw;
02017     }
02018     if (who) delete who;
02019 }
02020 
02021 void
02022 VSSetExecutable(SRPC* srpc, int intf_ver)
02023 {
02024     LongId longid;
02025     int len;
02026     bool x;
02027     VestaSource::errorCode err;
02028     AccessControl::Identity who = NULL;
02029 
02030     try {
02031       // Receive the arguments
02032       len = sizeof(longid.value);
02033       srpc->recv_bytes_here((char *) &longid.value, len);
02034       x = (bool) srpc->recv_int();
02035       who = srpc_recv_identity(srpc, intf_ver);
02036       srpc->recv_end();
02037     
02038       // Do the work
02039       ReadersWritersLock* lock = NULL;
02040       VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02041       if (vs == NULL) {
02042         err = VestaSource::invalidArgs;
02043       } else {
02044         RWLOCK_LOCKED_REASON(lock, "SRPC:setExecutable");
02045         err = vs->setExecutable(x, who);
02046       }
02047       // Release lock
02048       if (lock != NULL) lock->releaseWrite();
02049       if(vs) delete vs;
02050 
02051       // Send the results
02052       srpc->send_int((int) err);
02053       srpc->send_end();
02054     } catch (...) {
02055       if (who) delete who;
02056       throw;
02057     }
02058     if (who) delete who;
02059 }
02060 
02061 void
02062 VSSetSize(SRPC* srpc, int intf_ver)
02063 {
02064     LongId longid;
02065     int len;
02066     Basics::uint64 s;
02067     VestaSource::errorCode err;
02068     AccessControl::Identity who = NULL;
02069 
02070     try {
02071       // Receive the arguments
02072       len = sizeof(longid.value);
02073       srpc->recv_bytes_here((char *) &longid.value, len);
02074       s = (Basics::uint64) srpc->recv_int();
02075       s += ((Basics::uint64) srpc->recv_int()) << 32ul;
02076       who = srpc_recv_identity(srpc, intf_ver);
02077 
02078       srpc->recv_end();
02079     
02080       // Do the work
02081       ReadersWritersLock* lock = NULL;
02082       VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02083       if (vs == NULL) {
02084         err = VestaSource::invalidArgs;
02085       } else {
02086         RWLOCK_LOCKED_REASON(lock, "SRPC:setSize");
02087         err = vs->setSize(s, who);
02088       }
02089       // Release lock
02090       if (lock != NULL) lock->releaseWrite();
02091       if(vs) delete vs;
02092 
02093       // Send the results
02094       srpc->send_int((int) err);
02095       srpc->send_end();
02096     } catch (...) {
02097       if (who) delete who;
02098       throw;
02099     }
02100     if (who) delete who;
02101 }
02102 
02103 void
02104 VSSetTimestamp(SRPC* srpc, int intf_ver)
02105 {
02106     LongId longid;
02107     int len;
02108     time_t ts;
02109     VestaSource::errorCode err;
02110     AccessControl::Identity who = NULL;
02111 
02112     try {
02113       // Receive the arguments
02114       len = sizeof(longid.value);
02115       srpc->recv_bytes_here((char *) &longid.value, len);
02116       ts = (time_t) srpc->recv_int();
02117       who = srpc_recv_identity(srpc, intf_ver);
02118       srpc->recv_end();
02119     
02120       // Do the work
02121       ReadersWritersLock* lock = NULL;
02122       VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02123       if (vs == NULL) {
02124         err = VestaSource::invalidArgs;
02125       } else {
02126         RWLOCK_LOCKED_REASON(lock, "SRPC:setTimestamp");
02127         err = vs->setTimestamp(ts, who);
02128       }
02129       // Release lock
02130       if (lock != NULL) lock->releaseWrite();
02131       if(vs) delete vs;
02132 
02133       // Send the results
02134       srpc->send_int((int) err);
02135       srpc->send_end();
02136     } catch (...) {
02137       if (who) delete who;
02138       throw;
02139     }
02140     if (who) delete who;
02141 }
02142 
02143 static void
02144 VestaSourceGetUserInfo(SRPC* srpc, int intf_ver)
02145 {
02146   // Receive the arguments.  The first is really just a formality to
02147   // ensure that only authorized users can inquire about other users.
02148   AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
02149   AccessControl::Identity subject = srpc_recv_identity(srpc, intf_ver, false);
02150   srpc->recv_end();
02151 
02152   AccessControl::IdInfo result;
02153 
02154   unsigned int i = 0;
02155   const char *val = 0;
02156 
02157   // If the requestor is different from the subject and the requestor
02158   // is not an administrator...
02159   if((*who != *subject) &&
02160      !VestaSource::repositoryRoot()->ac.check(who,
02161                                               AccessControl::administrative))
02162     {
02163       // Deny the request.
02164       srpc->send_failure(SRPC::invalid_parameter,
02165                          "administrator access required to get information about other users");
02166     }
02167 
02168   // Get the list of the user's names and aliases.
02169   while((val = subject->user(i)) != 0)
02170     {
02171       result.names.append(val);
02172       i++;
02173     }
02174 
02175   // Get the list of the user's group memberships.
02176   i = 0;
02177   while((val = subject->group(i)) != 0)
02178     {
02179       result.groups.append(val);
02180       i++;
02181     }
02182 
02183   // Get the user ID and primary group ID of the user.  (These are
02184   // significant for NFS access.)
02185   result.unix_uid = subject->toUnixUser();
02186   result.unix_gid = subject->toUnixGroup();
02187 
02188   // Determine if the user has administrative access.
02189   result.is_admin = 
02190     VestaSource::repositoryRoot()->ac.check(subject,
02191                                             AccessControl::administrative);
02192   // Determine if the user has agreement access.
02193   result.is_wizard =
02194     VestaSource::repositoryRoot()->ac.check(subject,
02195                                             AccessControl::agreement);
02196   // Root has additional special powers (can make anything
02197   // setuid/setgid).
02198   result.is_root = subject->userMatch(AccessControl::rootUser);
02199   // The runtool user is the special user that owns the volatile tree.
02200   result.is_runtool = subject->userMatch(AccessControl::runtoolUser);
02201 
02202   // Send the list of global names and groups
02203   srpc->send_chars_seq(result.names);
02204   srpc->send_chars_seq(result.groups);
02205 
02206   // Send the NFS uid and primary gid
02207   srpc->send_int32(result.unix_uid);
02208   srpc->send_int32(result.unix_gid);
02209 
02210   // Send all the special bits in one 16-bit integer
02211   srpc->send_int16((result.is_root ? 1 : 0) |
02212                    ((result.is_admin ? 1 : 0) << 1) |
02213                    ((result.is_wizard ? 1 : 0) << 2) |
02214                    ((result.is_runtool ? 1 : 0) << 3));
02215 
02216   srpc->send_end();
02217 }
02218 
02219 static void
02220 VestaSourceRefreshAccessTables(SRPC* srpc, int intf_ver)
02221 {
02222   // Receive the arguments.  The first is really just a formality to
02223   // ensure that only authorized users can inquire about other users.
02224   AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
02225   srpc->recv_end();
02226 
02227   // Does the requestor have administrative access?
02228   if(!VestaSource::repositoryRoot()->ac.check(who,
02229                                               AccessControl::administrative))
02230     {
02231       // If not, deny the request.
02232       srpc->send_failure(SRPC::invalid_parameter,
02233                          "administrator access required");
02234     }
02235 
02236   try
02237     {
02238       AccessControl::refreshAccessTables();
02239     }
02240   catch(AccessControl::ParseError f)
02241     {
02242       Text message = ("error parsing " + f.fkind + " " + f.fname +
02243                       ": " + f.message);
02244 
02245       // Not really an invalid parameter, but one of the access
02246       // control files was unparseable.
02247       srpc->send_failure(SRPC::invalid_parameter,
02248                          message.cchars());
02249     }
02250 
02251   srpc->send_end();
02252 }
02253 
02254 static void
02255 VestaSourceGetStats(SRPC* srpc, int intf_ver)
02256 {
02257   // This is just a formailty to ensure that only users with read
02258   // access to the repository can make this call.
02259   AccessControl::Identity who = 0;
02260   Basics::int16 *requested_stats = 0;
02261 
02262   try
02263     {
02264       who = srpc_recv_identity(srpc, intf_ver);
02265 
02266       int requested_stats_count = 0;
02267       requested_stats = srpc->recv_int16_array(requested_stats_count);
02268 
02269       srpc->recv_end();
02270 
02271       srpc->send_seq_start();
02272       for(int i = 0; i < requested_stats_count; i++)
02273         {
02274           switch((ReposStats::StatKind) requested_stats[i])
02275             {
02276             case ReposStats::fdCache:
02277               srpc->send_int16(requested_stats[i]);
02278               {
02279                 ReposStats::FdCacheStats fdCacheStats;
02280                 FdCache::getStats(fdCacheStats.n_in_cache,
02281                                   fdCacheStats.hits,
02282                                   fdCacheStats.open_misses,
02283                                   fdCacheStats.try_misses,
02284                                   fdCacheStats.evictions,
02285                                   fdCacheStats.expirations);
02286 
02287                 // Send them to the client.
02288                 fdCacheStats.send(srpc);
02289               }
02290               break;
02291             case ReposStats::dupeTotal:
02292               srpc->send_int16(requested_stats[i]);
02293               {
02294                 // Get the duplicate suppression statistics.
02295                 ReposStats::DupeStats dupeStats;
02296                 dupeStats = get_dupe_stats();
02297 
02298                 // Send them to the client.
02299                 dupeStats.send(srpc);
02300               }
02301               break;
02302             case ReposStats::srpcTotal:
02303               srpc->send_int16(requested_stats[i]);
02304               {
02305                 // Get statistics on SRPC calls
02306                 ReposStats::TimedCalls srpcStats;
02307                 SRPC_Call_Stats::getStats(srpcStats.call_count,
02308                                           srpcStats.elapsed_secs,
02309                                           srpcStats.elapsed_usecs);
02310 
02311                 // Send them to the client
02312                 srpcStats.send(srpc);
02313               }
02314               break;
02315             case ReposStats::nfsTotal:
02316               srpc->send_int16(requested_stats[i]);
02317               {
02318                 // Get statistics on NFS calls
02319                 ReposStats::TimedCalls nfsStats;
02320                 NFS_Call_Stats::getStats(nfsStats.call_count,
02321                                          nfsStats.elapsed_secs,
02322                                          nfsStats.elapsed_usecs);
02323 
02324                 // Send them to the client
02325                 nfsStats.send(srpc);
02326               }
02327               break;
02328             case ReposStats::memUsage:
02329               srpc->send_int16(requested_stats[i]);
02330               {
02331                 // Get memory usage
02332                 unsigned long total, resident;
02333                 OS::GetProcessSize(total, resident);
02334 
02335                 // Send them to the client
02336                 ReposStats::MemStats memStats(total, resident);
02337                 memStats.send(srpc);
02338               }
02339               break;
02340               // Note: no default switch label.  In the event that the
02341               // client asks for a statistic we don't know how to gather,
02342               // we just don't send it back to them.
02343             }
02344         }
02345       srpc->send_seq_end();
02346 
02347       srpc->send_end();
02348     }
02349   catch (...)
02350     {
02351       if(who) delete who;
02352       if(requested_stats) delete [] requested_stats;
02353       throw;
02354     }
02355   if (who) delete who;
02356   if(requested_stats) delete [] requested_stats;
02357 }
02358 
02359 static void
02360 VestaSourceMeasureDirectory(SRPC* srpc, int intf_ver)
02361 {
02362     LongId longid;
02363     int len;
02364     VestaSource::errorCode err;
02365     AccessControl::Identity who = NULL;
02366 
02367     try {
02368       // Receive the arguments
02369       len = sizeof(longid.value);
02370       srpc->recv_bytes_here((char *) &longid.value, len);
02371       who = srpc_recv_identity(srpc, intf_ver);
02372       srpc->recv_end();
02373     
02374       // Do the work
02375       VestaSource::directoryStats stats;
02376       ReadersWritersLock* lock = NULL;
02377       VestaSource* vs = longid.lookup(LongId::readLock, &lock);
02378       if (vs == NULL) {
02379         err = VestaSource::invalidArgs;
02380       } else {
02381         RWLOCK_LOCKED_REASON(lock, "SRPC:MeasureDirectory");
02382         err = vs->measureDirectory(stats, who);
02383       }
02384       // Release lock
02385       if (lock != NULL) lock->releaseRead();
02386 
02387       if(vs) delete vs;
02388 
02389       // Send the results
02390       srpc->send_int((int) err);
02391       if(err == VestaSource::ok)
02392       {
02393         srpc->send_int32(stats.baseChainLength);
02394         srpc->send_int32(stats.usedEntryCount);
02395         srpc->send_int32(stats.usedEntrySize);
02396         srpc->send_int32(stats.totalEntryCount);
02397         srpc->send_int32(stats.totalEntrySize);
02398       }
02399       srpc->send_end();
02400     } catch (...) {
02401       if (who) delete who;
02402       throw;
02403     }
02404     if (who) delete who;
02405 }
02406 
02407 static void
02408 VestaSourceCollapseBase(SRPC* srpc, int intf_ver)
02409 {
02410     LongId longid;
02411     int len;
02412     VestaSource::errorCode err;
02413     AccessControl::Identity who = NULL;
02414 
02415     try {
02416       // Receive the arguments
02417       len = sizeof(longid.value);
02418       srpc->recv_bytes_here((char *) &longid.value, len);
02419       who = srpc_recv_identity(srpc, intf_ver);
02420       srpc->recv_end();
02421     
02422       // Do the work
02423       ReadersWritersLock* lock = NULL;
02424       VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02425       if (vs == NULL) {
02426         err = VestaSource::invalidArgs;
02427       } else {
02428         RWLOCK_LOCKED_REASON(lock, "SRPC:CollapseBase");
02429         err = vs->collapseBase(who);
02430       }
02431       // Release lock
02432       if (lock != NULL) lock->releaseWrite();
02433 
02434       if(vs) delete vs;
02435 
02436       // Send the results
02437       srpc->send_int((int) err);
02438       srpc->send_end();
02439     } catch (...) {
02440       if (who) delete who;
02441       throw;
02442     }
02443     if (who) delete who;
02444 }
02445 
02446 static void
02447 SetPerfDebug(SRPC* srpc, int intf_ver)
02448 {
02449   AccessControl::Identity who = 0;
02450   try
02451     {
02452       // Receive the arguments
02453       who = srpc_recv_identity(srpc, intf_ver);
02454       Basics::uint64 settings = srpc->recv_int64();
02455       srpc->recv_end();
02456 
02457       if(!VestaSource::repositoryRoot()->ac.check(who,
02458                                                   AccessControl::administrative))
02459         {
02460           // If not, deny the request.
02461           srpc->send_failure(SRPC::invalid_parameter,
02462                              "administrator access required");
02463         }
02464 
02465       Basics::uint64 result = 0;
02466 #if defined(REPOS_PERF_DEBUG)
02467       timing_control(settings & PerfDebug::nfsCallTiming);
02468       result |= (settings & PerfDebug::nfsCallTiming);
02469 
02470       rwlock_timing_control(settings & PerfDebug::centralLockTiming);
02471       result |= (settings & PerfDebug::centralLockTiming);
02472 #endif
02473       srpc->send_int64(result);
02474       srpc->send_end();
02475     }
02476   catch (...)
02477     {
02478       if (who) delete who;
02479       throw;
02480     }
02481   if (who) delete who;
02482 }
02483 
02484 // Server version.  This is defined in a file written in progs.ves.
02485 extern const char *Version;
02486 
02487 // Server start time, set in main.
02488 extern time_t serverStartTime;
02489 
02490 static void
02491 GetServerInfo(SRPC* srpc, int intf_ver)
02492 {
02493   AccessControl::Identity who = 0;
02494   try
02495     {
02496       // Receive the arguments
02497       who = srpc_recv_identity(srpc, intf_ver);
02498       srpc->recv_end();
02499 
02500       srpc->send_chars(Version);
02501       srpc->send_int64(serverStartTime);
02502       srpc->send_int32(time((time_t *) 0) - serverStartTime);
02503 
02504       srpc->send_end();
02505     }
02506   catch (...)
02507     {
02508       if (who) delete who;
02509       throw;
02510     }
02511   if (who) delete who;
02512 }
02513 
02514 // These control VSReadWholeCompressed.  They're read from config
02515 // settings in VestaSourceServerExport below.
02516 static int readWhole_raw_bufsiz = (128*1024);
02517 static int readWhole_deflate_bufsiz = (64*1024);
02518 static int readWhole_deflate_level = -1;
02519 
02520 static void
02521 VSReadWholeCompressed(SRPC* srpc, int intf_ver)
02522 {
02523   // Arguments from the client
02524   LongId longid;
02525   int len;
02526   AccessControl::Identity who = 0;
02527   int maxbytes;
02528 
02529   // Result status sent to the client
02530   VestaSource::errorCode err;
02531 
02532   // Information on the file we're compressing and sending to the client.
02533   ShortId filesid;
02534   int fd = -1;
02535   Basics::uint64 bytes_left;
02536   off_t bytes_read = 0;
02537 
02538   // Client's list of supported compression methods
02539   Basics::int16 *client_methods = 0;
02540   int client_method_count = 0;
02541 
02542   // zlib state
02543   z_stream zstrm;
02544   bool zstrm_initialized = false;
02545 
02546   // Buffers used to hold bytes read from the file and compressed
02547   // bytes
02548   char *raw_buf = 0, *send_buf = 0;
02549 
02550   try
02551     {
02552       // Receive the arguments
02553       len = sizeof(longid.value);
02554       srpc->recv_bytes_here((char *) &longid.value, len);
02555       who = srpc_recv_identity(srpc, intf_ver);
02556       client_methods = srpc->recv_int16_array(client_method_count);
02557       maxbytes = srpc->recv_int32();
02558       srpc->recv_end();
02559 
02560       bool method_ok = false;
02561       for(unsigned int i = 0; i < client_method_count; i++)
02562         if(client_methods[i] == VestaSourceSRPC::compress_zlib_deflate)
02563           method_ok = true;
02564       if(!method_ok)
02565         {
02566           err = VestaSource::invalidArgs;
02567         }
02568       else
02569         {
02570           // Do the work
02571           ReadersWritersLock* lock = NULL;
02572           VestaSource* vs = longid.lookup(LongId::readLock, &lock);
02573           if (vs == NULL) {
02574             err = VestaSource::invalidArgs;
02575           } else {
02576             RWLOCK_LOCKED_REASON(lock, "SRPC:readWholeCompressed");
02577             if(vs->type != VestaSource::immutableFile) {
02578               err = VestaSource::inappropriateOp;
02579             } else {
02580               filesid = vs->shortId();
02581               bytes_left = vs->size();
02582               fd = FdCache::open(filesid, FdCache::ro);
02583               err = ((fd == -1)
02584                      ? Repos::errno_to_errorCode(errno)
02585                      : VestaSource::ok);
02586             }
02587           }
02588       
02589           // Release lock.  (We don't want to hold it while compressing
02590           // and sending data to the client.)
02591           if (lock != NULL) lock->releaseRead();
02592           // Free memory
02593           if (vs) delete vs;
02594         }
02595 
02596       // Send the results
02597       srpc->send_int((int) err);
02598       if (err == VestaSource::ok) {
02599         assert(fd != -1);
02600 
02601         // For now, we always use zlib's deflate for compression.
02602         srpc->send_int16(VestaSourceSRPC::compress_zlib_deflate);
02603 
02604         // Now that we've release the lock, we'll actually start up
02605         // zlib and compress the data.
02606         zstrm.zalloc = Z_NULL;
02607         zstrm.zfree = Z_NULL;
02608         zstrm.opaque = Z_NULL;
02609         if (deflateInit(&zstrm, readWhole_deflate_level) != Z_OK)
02610           srpc->send_failure(SRPC::internal_trouble,
02611                              "zlib deflateInit failed");
02612         zstrm_initialized = true;
02613 
02614         raw_buf = NEW_PTRFREE_ARRAY(char, readWhole_raw_bufsiz);
02615         unsigned int send_size = ((maxbytes < readWhole_deflate_bufsiz)
02616                                   ? maxbytes
02617                                   : readWhole_deflate_bufsiz);
02618         send_buf = NEW_PTRFREE_ARRAY(char, send_size);
02619 
02620         srpc->send_seq_start();
02621         while(bytes_left > 0)
02622           {
02623             zstrm.next_in = (Bytef *) raw_buf;
02624             int read_res;
02625             do
02626               read_res = ::pread(fd, raw_buf, readWhole_raw_bufsiz,
02627                                  bytes_read);
02628             while((read_res == -1) && (errno == EINTR));
02629             zstrm.avail_in = read_res;
02630 
02631             if(zstrm.avail_in > 0)
02632               {
02633                 assert(bytes_left >= zstrm.avail_in);
02634                 bytes_left -= zstrm.avail_in;
02635                 bytes_read += zstrm.avail_in;
02636                 do
02637                   {
02638                     zstrm.avail_out = send_size;
02639                     zstrm.next_out = (Bytef *) send_buf;
02640                     int deflate_status = deflate(&zstrm,
02641                                                  (bytes_left > 0
02642                                                   ? Z_NO_FLUSH
02643                                                   : Z_FINISH));
02644                     if(deflate_status == Z_STREAM_ERROR)
02645                       srpc->send_failure(SRPC::internal_trouble,
02646                                          "zlib deflate returned Z_STREAM_ERROR");
02647                     srpc->send_bytes(send_buf,
02648                                      send_size - zstrm.avail_out);
02649                   }
02650                 while(zstrm.avail_out == 0);
02651                 if(zstrm.avail_in != 0)
02652                   srpc->send_failure(SRPC::internal_trouble,
02653                                      "zlib deflate left some input unconsumed");
02654               }
02655             else
02656               {
02657                 int errno_save = errno;
02658                 Text msg("error reading file for compression: ");
02659                 msg += Basics::errno_Text(errno_save);
02660                 srpc->send_failure(SRPC::internal_trouble, msg);
02661               }
02662           }
02663         srpc->send_seq_end();
02664 
02665         srpc->send_end();
02666       }
02667     }
02668   catch (...)
02669     {
02670       if(zstrm_initialized) (void)deflateEnd(&zstrm);
02671       if(fd != -1) FdCache::close(filesid, fd, FdCache::ro);
02672       if(raw_buf) delete [] raw_buf;
02673       if(send_buf) delete [] send_buf;
02674       if(who) delete who;
02675       throw;
02676     }
02677   if(zstrm_initialized) (void)deflateEnd(&zstrm);
02678   if(fd != -1) FdCache::close(filesid, fd, FdCache::ro);
02679   if(raw_buf) delete [] raw_buf;
02680   if(send_buf) delete [] send_buf;
02681   if (who) delete who;
02682 }
02683 
02684 // What's the minimum interface version we can support?
02685 static const int g_min_intf_ver = 8;
02686 
02687 // Function to accept RPC calls
02688 void
02689 VestaSourceReceptionist(SRPC *srpc, int intf_ver, int proc_id, void *arg)
02690 {
02691   // This objects destruction when we leave this function (either
02692   // normally or by an exception) will record statistics about this
02693   // SRPC call.
02694   SRPC_Call_Stats::Helper stat_recorder;
02695 
02696     // Needed?
02697     signal(SIGPIPE, SIG_IGN);
02698     signal(SIGQUIT, SIG_DFL);
02699     signal(SIGSEGV, SIG_DFL);
02700     signal(SIGABRT, SIG_DFL);
02701     signal(SIGILL, SIG_DFL);
02702     signal(SIGBUS, SIG_DFL);
02703     // end Needed?
02704 
02705     if (intf_ver < g_min_intf_ver || intf_ver > VestaSourceSRPC::version) {
02706       Text client = srpc->remote_socket();
02707       Repos::dprintf(DBG_ALWAYS,
02708                      "VestaSourceReceptionist got unsupported interface "
02709                      "version %d; client %s\n", intf_ver, client.cchars());
02710       srpc->send_failure(SRPC::version_skew,
02711                          "VestaSourceSRPC: Unsupported interface version");
02712       return;
02713     }
02714 
02715     // Don't allow us to block forever waiting for a client.
02716     srpc->enable_read_timeout(readTimeout);
02717 
02718     switch (proc_id)
02719       {
02720       case VestaSourceSRPC::Lookup:
02721         VestaSourceLookup(srpc, intf_ver);
02722         break;
02723       case VestaSourceSRPC::CreateVolatileDirectory:
02724         VestaSourceCreateVolatileDirectory(srpc, intf_ver);
02725         break;
02726       case VestaSourceSRPC::DeleteVolatileDirectory:
02727         VestaSourceDeleteVolatileDirectory(srpc, intf_ver);
02728         break;
02729       case VestaSourceSRPC::GetBase:
02730         VestaSourceGetBase(srpc, intf_ver);
02731         break;
02732       case VestaSourceSRPC::List:
02733         VestaSourceList(srpc, intf_ver);
02734         break;
02735       case VestaSourceSRPC::GetNFSInfo:
02736         VestaSourceGetNFSInfo(srpc, intf_ver);
02737         break;
02738       case VestaSourceSRPC::FPToShortId:
02739         VestaSourceFPToShortId(srpc, intf_ver);
02740         break;
02741       case VestaSourceSRPC::ReallyDelete:
02742         VestaSourceReallyDelete(srpc, intf_ver);
02743         break;
02744       case VestaSourceSRPC::InsertFile:
02745         VestaSourceInsertFile(srpc, intf_ver);
02746         break;
02747       case VestaSourceSRPC::InsertMutableFile:
02748         VestaSourceInsertMutableFile(srpc, intf_ver);
02749         break;
02750       case VestaSourceSRPC::InsertImmutableDirectory:
02751         VestaSourceInsertImmutableDirectory(srpc, intf_ver);
02752         break;
02753       case VestaSourceSRPC::InsertAppendableDirectory:
02754         VestaSourceInsertAppendableDirectory(srpc, intf_ver);
02755         break;
02756       case VestaSourceSRPC::InsertMutableDirectory:
02757         VestaSourceInsertMutableDirectory(srpc, intf_ver);
02758         break;
02759       case VestaSourceSRPC::InsertGhost:
02760         VestaSourceInsertGhost(srpc, intf_ver);
02761         break;
02762       case VestaSourceSRPC::InsertStub:
02763         VestaSourceInsertStub(srpc, intf_ver);
02764         break;
02765       case VestaSourceSRPC::RenameTo:
02766         VestaSourceRenameTo(srpc, intf_ver);
02767         break;
02768       case VestaSourceSRPC::MakeMutable:
02769         VestaSourceMakeMutable(srpc, intf_ver);
02770         break;
02771       case VestaSourceSRPC::InAttribs:
02772         VestaSourceInAttribs(srpc, intf_ver);
02773         break;
02774       case VestaSourceSRPC::GetAttrib:
02775         VestaSourceGetAttrib(srpc, intf_ver);
02776         break;
02777       case VestaSourceSRPC::GetAttrib2:
02778         VestaSourceGetAttrib2(srpc, intf_ver);
02779         break;
02780       case VestaSourceSRPC::ListAttribs:
02781         VestaSourceListAttribs(srpc, intf_ver);
02782         break;
02783       case VestaSourceSRPC::GetAttribHistory:
02784         VestaSourceGetAttribHistory(srpc, intf_ver);
02785         break;
02786       case VestaSourceSRPC::WriteAttrib:
02787         VestaSourceWriteAttrib(srpc, intf_ver);
02788         break;
02789       case VestaSourceSRPC::LookupPathname:
02790         VestaSourceLookupPathname(srpc, intf_ver);
02791         break;
02792       case VestaSourceSRPC::LookupIndex:
02793         VestaSourceLookupIndex(srpc, intf_ver);
02794         break;
02795       case VestaSourceSRPC::MakeFilesImmutable:
02796         MakeFilesImmutable(srpc, intf_ver);
02797         break;
02798       case VestaSourceSRPC::SetIndexMaster:
02799         SetIndexMaster(srpc, intf_ver);
02800         break;
02801       case VestaSourceSRPC::Stat:
02802         VSStat(srpc, intf_ver);
02803         break;
02804       case VestaSourceSRPC::Read:
02805         VSRead(srpc, intf_ver);
02806         break;
02807       case VestaSourceSRPC::Write:
02808         VSWrite(srpc, intf_ver);
02809         break;
02810       case VestaSourceSRPC::SetExecutable:
02811         VSSetExecutable(srpc, intf_ver);
02812         break;
02813       case VestaSourceSRPC::SetSize:
02814         VSSetSize(srpc, intf_ver);
02815         break;
02816       case VestaSourceSRPC::SetTimestamp:
02817         VSSetTimestamp(srpc, intf_ver);
02818         break;
02819       case VestaSourceSRPC::Atomic:
02820         VSAtomic(srpc, intf_ver);
02821         break;
02822       case VestaSourceSRPC::AtomicTarget:
02823       case VestaSourceSRPC::AtomicDeclare:
02824       case VestaSourceSRPC::AtomicResync:
02825       case VestaSourceSRPC::AtomicTestMaster:
02826       case VestaSourceSRPC::AtomicRun:
02827       case VestaSourceSRPC::AtomicCancel:
02828         {
02829           Text client = srpc->remote_socket();
02830           Repos::dprintf(DBG_ALWAYS,
02831                          "VestaSourceReceptionist got misplaced Atomic "
02832                          "proc_id %d; client %s\n", proc_id, client.cchars());
02833           srpc->send_failure(SRPC::version_skew,
02834                              "VestaSourceSRPC: Misplaced Atomic proc_id");
02835         }
02836         break;
02837       case VestaSourceSRPC::AcquireMastership:
02838         VestaSourceAcquireMastership(srpc, intf_ver);
02839         break;
02840       case VestaSourceSRPC::CedeMastership:
02841         VestaSourceCedeMastership(srpc, intf_ver);
02842         break;
02843       case VestaSourceSRPC::Replicate:
02844         VestaSourceReplicate(srpc, intf_ver);
02845         break;
02846       case VestaSourceSRPC::ReplicateAttribs:
02847         VestaSourceReplicateAttribs(srpc, intf_ver);
02848         break;
02849       case VestaSourceSRPC::GetUserInfo:
02850         VestaSourceGetUserInfo(srpc, intf_ver);
02851         break;
02852       case VestaSourceSRPC::RefreshAccessTables:
02853         VestaSourceRefreshAccessTables(srpc, intf_ver);
02854         break;
02855       case VestaSourceSRPC::GetStats:
02856         VestaSourceGetStats(srpc, intf_ver);
02857         break;
02858       case VestaSourceSRPC::MeasureDirectory:
02859         VestaSourceMeasureDirectory(srpc, intf_ver);
02860         break;
02861       case VestaSourceSRPC::CollapseBase:
02862         VestaSourceCollapseBase(srpc, intf_ver);
02863         break;
02864       case VestaSourceSRPC::SetPerfDebug:
02865         SetPerfDebug(srpc, intf_ver);
02866         break;
02867       case VestaSourceSRPC::GetServerInfo:
02868         GetServerInfo(srpc, intf_ver);
02869         break;
02870       case VestaSourceSRPC::ReadWholeCompressed:
02871         VSReadWholeCompressed(srpc, intf_ver);
02872         break;
02873       default:
02874         {
02875           Text client = srpc->remote_socket();
02876           Repos::dprintf(DBG_ALWAYS, "VestaSourceReceptionist got unknown "
02877                          "proc_id %d; client %s\n", proc_id, client.cchars());
02878           srpc->send_failure(SRPC::version_skew,
02879                              "VestaSourceSRPC: Unknown proc_id");
02880         }
02881         break;
02882       }
02883 }
02884 
02885 void
02886 VestaSourceFailure(SRPC* srpc, const SRPC::failure &f, void* arg)
02887 {
02888     if (f.r == SRPC::partner_went_away && !Repos::isDebugLevel(DBG_SRPC))
02889       return;
02890     Text client;
02891     try {
02892         client = srpc->remote_socket();
02893     } catch (SRPC::failure f) {
02894         client = "unknown";
02895     }
02896     Repos::dprintf(DBG_ALWAYS,
02897                    "VestaSourceReceptionist got SRPC::failure: %s (%d); "
02898                    "client %s\n", f.msg.cchars(), f.r, client.cchars());
02899 }
02900 
02901 void
02902 VestaSourceServerExport()
02903 {
02904     Text port;
02905     int max_running=DEF_MAX_RUNNING, max_blocked=DEF_MAX_BLOCKED;
02906     Text unused;
02907 
02908     try {
02909         //VestaSourceSRPC_port is required.
02910         //SRPC_max_* are allowed to be non-existant or ints.
02911         //anything else is fatal.
02912         port = VestaConfig::get_Text("Repository", "VestaSourceSRPC_port");
02913         if(VestaConfig::get("Repository",
02914                             "VestaSourceSRPC_max_running", unused))
02915             max_running = VestaConfig::get_int("Repository",
02916                                                "VestaSourceSRPC_max_running");
02917         if(VestaConfig::get("Repository",
02918                             "VestaSourceSRPC_max_blocked", unused))
02919             max_blocked = VestaConfig::get_int("Repository",
02920                                                "VestaSourceSRPC_max_blocked");
02921         if(VestaConfig::get("Repository",
02922                             "VestaSourceSRPC_read_timeout", unused))
02923             readTimeout = VestaConfig::get_int("Repository",
02924                                                "VestaSourceSRPC_read_timeout");
02925         if(VestaConfig::get("Repository",
02926                             "VestaSourceSRPC_readWhole_raw_bufsiz", unused))
02927           readWhole_raw_bufsiz =
02928               VestaConfig::get_int("Repository",
02929                                    "VestaSourceSRPC_readWhole_raw_bufsiz");
02930         if(VestaConfig::get("Repository",
02931                             "VestaSourceSRPC_readWhole_deflate_bufsiz",
02932                             unused))
02933           readWhole_deflate_bufsiz =
02934               VestaConfig::get_int("Repository",
02935                                    "VestaSourceSRPC_readWhole_deflate_bufsiz");
02936         if(VestaConfig::get("Repository",
02937                             "VestaSourceSRPC_readWhole_deflate_level",
02938                             unused))
02939           readWhole_deflate_level =
02940               VestaConfig::get_int("Repository",
02941                                    "VestaSourceSRPC_readWhole_deflate_level");
02942     } catch (VestaConfig::failure f) {
02943         Repos::dprintf(DBG_ALWAYS,
02944                        "VestaConfig::failure in VestaSourceServerExport: %s\n",
02945                        f.msg.cchars());
02946         abort();
02947     }
02948 
02949     static Basics::thread_attr ls_thread_attr(STACK_SIZE);
02950 
02951 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined(__linux__)
02952     // Linux only allows the superuser to use SCHED_RR
02953     ls_thread_attr.set_schedpolicy(SCHED_RR);
02954     ls_thread_attr.set_inheritsched(PTHREAD_EXPLICIT_SCHED);
02955     ls_thread_attr.set_sched_priority(sched_get_priority_min(SCHED_RR));
02956 #endif
02957 
02958     static LimService receptionist(port, max_running,
02959                                    max_blocked, VestaSourceReceptionist,
02960                                    VestaSourceFailure, NULL, ls_thread_attr);
02961     receptionist.Forked_Run();
02962 }

Generated on Mon May 8 00:48:53 2006 for Vesta by  doxygen 1.4.2