00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <time.h>
00026 #include <assert.h>
00027 #include <zlib.h>
00028
00029 #include <SRPC.H>
00030 #include <Thread.H>
00031 #include <LimService.H>
00032 #include <OS.H>
00033 #include "VestaSourceSRPC.H"
00034 #include "VestaSource.H"
00035 #include "VDirVolatileRoot.H"
00036 #include "VRConcurrency.H"
00037 #include "VestaConfig.H"
00038 #include "VestaSourceServer.H"
00039 #include "FPShortId.H"
00040 #include "logging.H"
00041 #include "VestaSourceImpl.H"
00042 #include "Mastership.H"
00043 #include "Replication.H"
00044 #include "ReposStats.H"
00045 #include "FdCache.H"
00046 #include "dupe.H"
00047
00048 #include "lock_timing.H"
00049
00050 #include "nfsStats.H"
00051
00052 #if defined(REPOS_PERF_DEBUG)
00053 #include "timing.H"
00054 #include "perfDebugControl.H"
00055 #endif
00056
00057 #if defined(__linux__) && defined(__GNUC__) && (__GNUC__ < 3)
00058
00059 extern "C" ssize_t pread(int fd, void *buf, size_t count, off_t offset);
00060 #endif
00061
00062 #define DEF_MAX_RUNNING 32
00063 #define DEF_MAX_BLOCKED 8
00064 #define STACK_SIZE 256000L
00065
00066
00067
00068
00069
00070 extern "C"
00071 {
00072 void SRPC_Call_Stats_Helper_stats_init() throw();
00073 }
00074
00075
00076
00077
00078 class SRPC_Call_Stats
00079 {
00080 private:
00081
00082 static Basics::mutex head_mu;
00083 static SRPC_Call_Stats *head;
00084 SRPC_Call_Stats *next;
00085
00086
00087 Basics::mutex mu;
00088
00089
00090 Basics::uint64 call_count;
00091 Basics::uint64 elapsed_secs;
00092 Basics::uint32 elapsed_usecs;
00093
00094
00095 void accumulateStats( Basics::uint64 &calls,
00096 Basics::uint64 &secs,
00097 Basics::uint32 &usecs);
00098
00099
00100 void recordCall(Basics::uint32 secs, Basics::uint32 &usecs);
00101
00102 public:
00103
00104 SRPC_Call_Stats();
00105 ~SRPC_Call_Stats();
00106
00107 static void getStats( Basics::uint64 &calls,
00108 Basics::uint64 &secs,
00109 Basics::uint32 &usecs);
00110
00111
00112
00113 class Helper
00114 {
00115 private:
00116 static pthread_key_t stats_key;
00117 static pthread_once_t stats_once;
00118
00119 friend void SRPC_Call_Stats_Helper_stats_init() throw();
00120
00121 struct timeval call_start;
00122 public:
00123 Helper();
00124 ~Helper();
00125 };
00126
00127 friend class Helper;
00128 };
00129
00130 Basics::mutex SRPC_Call_Stats::head_mu;
00131 SRPC_Call_Stats *SRPC_Call_Stats::head = 0;
00132
00133 pthread_key_t SRPC_Call_Stats::Helper::stats_key;
00134 pthread_once_t SRPC_Call_Stats::Helper::stats_once = PTHREAD_ONCE_INIT;
00135
00136 void SRPC_Call_Stats::accumulateStats( Basics::uint64 &calls,
00137 Basics::uint64 &secs,
00138 Basics::uint32 &usecs)
00139 {
00140 this->mu.lock();
00141 calls += this->call_count;
00142 secs += this->elapsed_secs;
00143 usecs += this->elapsed_usecs;
00144 this->mu.unlock();
00145 }
00146
00147 void SRPC_Call_Stats::recordCall(Basics::uint32 secs, Basics::uint32 &usecs)
00148 {
00149 this->mu.lock();
00150
00151 this->call_count++;
00152
00153
00154 this->elapsed_secs += secs;
00155 this->elapsed_usecs += usecs;
00156 if(this->elapsed_usecs > USECS_PER_SEC)
00157 {
00158 this->elapsed_secs += this->elapsed_usecs/USECS_PER_SEC;
00159 this->elapsed_usecs %= USECS_PER_SEC;
00160 }
00161 this->mu.unlock();
00162 }
00163
00164 void SRPC_Call_Stats::getStats( Basics::uint64 &calls,
00165 Basics::uint64 &secs,
00166 Basics::uint32 &usecs)
00167 {
00168
00169 calls = 0;
00170 secs = 0;
00171 usecs = 0;
00172
00173
00174 SRPC_Call_Stats *list;
00175 SRPC_Call_Stats::head_mu.lock();
00176 list = SRPC_Call_Stats::head;
00177 SRPC_Call_Stats::head_mu.unlock();
00178
00179 while(list != 0)
00180 {
00181
00182 list->accumulateStats(calls, secs, usecs);
00183
00184
00185 if(usecs > USECS_PER_SEC)
00186 {
00187 secs += usecs/USECS_PER_SEC;
00188 usecs %= USECS_PER_SEC;
00189 }
00190
00191
00192
00193
00194 list = list->next;
00195 }
00196 }
00197
00198 SRPC_Call_Stats::SRPC_Call_Stats()
00199 : next(0), call_count(0), elapsed_secs(0), elapsed_usecs(0)
00200 {
00201
00202 SRPC_Call_Stats::head_mu.lock();
00203 this->next = SRPC_Call_Stats::head;
00204 SRPC_Call_Stats::head = this;
00205 SRPC_Call_Stats::head_mu.unlock();
00206 }
00207
00208 SRPC_Call_Stats::~SRPC_Call_Stats()
00209 {
00210
00211 assert(false);
00212 }
00213
00214 extern "C"
00215 {
00216 void SRPC_Call_Stats_Helper_stats_init() throw()
00217 {
00218 int err = pthread_key_create(&SRPC_Call_Stats::Helper::stats_key, NULL);
00219 assert(err == 0);
00220 }
00221 }
00222
00223 SRPC_Call_Stats::Helper::Helper()
00224 {
00225
00226 struct timezone unused_tz;
00227 int err = gettimeofday(&this->call_start, &unused_tz);
00228 assert(err == 0);
00229 }
00230
00231 SRPC_Call_Stats::Helper::~Helper()
00232 {
00233
00234 struct timezone unused_tz;
00235 struct timeval call_end;
00236 int err = gettimeofday(&call_end, &unused_tz);
00237 assert(err == 0);
00238
00239
00240 pthread_once(&SRPC_Call_Stats::Helper::stats_once,
00241 SRPC_Call_Stats_Helper_stats_init);
00242
00243
00244
00245 SRPC_Call_Stats *thread_stats =
00246 ((SRPC_Call_Stats *)
00247 pthread_getspecific(SRPC_Call_Stats::Helper::stats_key));
00248 if(thread_stats == 0)
00249 {
00250 thread_stats = NEW(SRPC_Call_Stats);
00251
00252 err = pthread_setspecific(SRPC_Call_Stats::Helper::stats_key,
00253 (void *) thread_stats);
00254 assert(err == 0);
00255 }
00256
00257
00258 Basics::uint32 call_secs, call_usecs;
00259 if(call_end.tv_sec == this->call_start.tv_sec)
00260 {
00261 call_secs = 0;
00262 if(call_end.tv_usec >= this->call_start.tv_usec)
00263 call_usecs = call_end.tv_usec - this->call_start.tv_usec;
00264 else
00265
00266
00267 call_usecs = 0;
00268 }
00269 else if(call_end.tv_sec > this->call_start.tv_sec)
00270 {
00271 call_secs = (call_end.tv_sec - this->call_start.tv_sec) - 1;
00272 call_usecs = ((call_end.tv_usec + USECS_PER_SEC) -
00273 this->call_start.tv_usec);
00274 }
00275 else
00276 {
00277
00278
00279 call_secs = 0;
00280 call_usecs = 0;
00281 }
00282
00283
00284 thread_stats->recordCall(call_secs, call_usecs);
00285 }
00286
00287
00288
00289
00290
00291
00292
00293
// Read timeout; its use is outside this part of the file.
// NOTE(review): presumably in seconds -- confirm where it is consumed.
static unsigned int readTimeout = 300;
00295
00296
00297 AccessControl::Identity
00298 srpc_recv_identity(SRPC* srpc, int intf_ver, bool access_needed)
00299 {
00300 AccessControl::Identity ret = 0;
00301 sockaddr_in addr;
00302 srpc->socket()->get_remote_addr(addr);
00303
00304 AccessControl::IdentityRep::Flavor flavor;
00305 if (intf_ver <= 10) {
00306 flavor = AccessControl::IdentityRep::unix_flavor;
00307 } else {
00308 flavor = (AccessControl::IdentityRep::Flavor) srpc->recv_int();
00309 }
00310 switch (flavor) {
00311 case AccessControl::IdentityRep::unix_flavor: {
00312 authunix_parms* aup = NEW(authunix_parms);
00313 aup->aup_time = (u_int) srpc->recv_int();
00314 aup->aup_machname = NEW_PTRFREE_ARRAY(char, MAX_MACHINE_NAME);
00315 int len = MAX_MACHINE_NAME;
00316 srpc->recv_chars_here(aup->aup_machname, len);
00317 aup->aup_uid = srpc->recv_int();
00318 aup->aup_gid = srpc->recv_int();
00319 srpc->recv_seq_start((int*) &aup->aup_len);
00320 if (aup->aup_len > NGRPS)
00321 srpc->send_failure(SRPC::buffer_too_small, "too many GIDs");
00322 #if __linux__
00323 aup->aup_gids = NEW_PTRFREE_ARRAY(__gid_t, NGRPS);
00324 #else
00325 aup->aup_gids = NEW_PTRFREE_ARRAY(int, NGRPS);
00326 #endif
00327 unsigned int i;
00328 for (i=0; i<aup->aup_len; i++) {
00329 aup->aup_gids[i] = srpc->recv_int();
00330 }
00331 srpc->recv_seq_end();
00332 ret = NEW_CONSTR(AccessControl::UnixIdentityRep, (aup, &addr));
00333 break; }
00334
00335 case AccessControl::IdentityRep::global: {
00336 char* user = srpc->recv_chars();
00337 ret = NEW_CONSTR(AccessControl::GlobalIdentityRep, (user, &addr));
00338 break; }
00339
00340 case AccessControl::IdentityRep::gssapi: {
00341 srpc->send_failure(SRPC::protocol_violation,
00342 "gssapi identity flavor not implemented yet");
00343 break; }
00344
00345 default:
00346 srpc->send_failure(SRPC::protocol_violation, "unknown identity flavor");
00347 }
00348 if (access_needed && !AccessControl::admit(ret)) {
00349
00350 delete ret;
00351 srpc->send_failure(SRPC::invalid_parameter, "unauthorized user");
00352 }
00353 return ret;
00354 }
00355
00356
00357 static void
00358 VestaSourceLookup(SRPC* srpc, int intf_ver)
00359 {
00360 LongId longid;
00361 char arc[MAX_ARC_LEN+1];
00362 int len;
00363 VestaSource::errorCode err;
00364
00365
00366 len = sizeof(longid.value);
00367 srpc->recv_bytes_here((char *) &longid.value, len);
00368 len = MAX_ARC_LEN+1;
00369 srpc->recv_chars_here(arc, len);
00370 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00371 srpc->recv_end();
00372
00373
00374 ReadersWritersLock* lock;
00375 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00376 try {
00377 if (vs == NULL) {
00378 err = VestaSource::invalidArgs;
00379 } else {
00380 RWLOCK_LOCKED_REASON(lock, "SRPC:lookup");
00381 err = VestaSource::ok;
00382
00383 if (len > 0) {
00384 VestaSource* childvs;
00385 err = vs->lookup(arc, childvs, who);
00386 delete vs;
00387 vs = childvs;
00388 }
00389 }
00390
00391
00392 srpc->send_int((int) err);
00393 if (err == VestaSource::ok) {
00394 srpc->send_int((int) vs->type);
00395 srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00396 srpc->send_int((int) vs->master);
00397 srpc->send_int((int) vs->pseudoInode);
00398 srpc->send_int((int) vs->shortId());
00399 srpc->send_int((int) vs->timestamp());
00400 srpc->send_int((int) (vs->hasAttribs() ||
00401 MutableRootLongId.isAncestorOf(vs->longid)));
00402 vs->fptag.Send(*srpc);
00403 delete vs;
00404 }
00405 } catch (SRPC::failure f) {
00406 if (lock != NULL) lock->releaseRead();
00407 delete who;
00408 throw;
00409 }
00410 if (lock != NULL) lock->releaseRead();
00411 delete who;
00412 srpc->send_end();
00413 }
00414
00415
00416
00417 static void
00418 VestaSourceLookupPathname(SRPC* srpc, int intf_ver)
00419 {
00420 LongId longid;
00421 char* pathname;
00422 char pathnameSep;
00423 VestaSource::errorCode err;
00424 int len;
00425
00426
00427 len = sizeof(longid.value);
00428 srpc->recv_bytes_here((char *) &longid.value, len);
00429 pathname = srpc->recv_chars();
00430 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00431 pathnameSep = (char) srpc->recv_int();
00432 srpc->recv_end();
00433
00434
00435 ReadersWritersLock* lock;
00436 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00437 try {
00438 if (vs == NULL) {
00439 err = VestaSource::invalidArgs;
00440 } else {
00441 RWLOCK_LOCKED_REASON(lock, "SRPC:lookupPathname");
00442 VestaSource* childvs;
00443 err = vs->lookupPathname(pathname, childvs, who, pathnameSep);
00444 delete vs;
00445 vs = childvs;
00446 }
00447
00448
00449 srpc->send_int((int) err);
00450 if (err == VestaSource::ok) {
00451 srpc->send_int((int) vs->type);
00452 srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00453 srpc->send_int((int) vs->master);
00454 srpc->send_int((int) vs->pseudoInode);
00455 srpc->send_int((int) vs->shortId());
00456 srpc->send_int((int) vs->timestamp());
00457 srpc->send_int((int) (vs->hasAttribs() ||
00458 MutableRootLongId.isAncestorOf(vs->longid)));
00459 vs->fptag.Send(*srpc);
00460 delete vs;
00461 }
00462 } catch (SRPC::failure f) {
00463 if (lock != NULL) lock->releaseRead();
00464 delete [] pathname;
00465 delete who;
00466 throw;
00467 }
00468 if (lock != NULL) lock->releaseRead();
00469 delete [] pathname;
00470 delete who;
00471 srpc->send_end();
00472 }
00473
00474
00475
00476 static void
00477 VestaSourceLookupIndex(SRPC* srpc, int intf_ver)
00478 {
00479 LongId longid;
00480 unsigned int index;
00481 VestaSource::errorCode err;
00482 int len;
00483 char arcbuf[MAX_ARC_LEN+1];
00484 bool sendarc;
00485
00486
00487 len = sizeof(longid.value);
00488 srpc->recv_bytes_here((char *) &longid.value, len);
00489 index = (unsigned int) srpc->recv_int();
00490 sendarc = (bool) srpc->recv_int();
00491 srpc->recv_end();
00492
00493
00494 ReadersWritersLock* lock;
00495 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00496 try {
00497 if (vs == NULL) {
00498 err = VestaSource::invalidArgs;
00499 } else {
00500 RWLOCK_LOCKED_REASON(lock, "SRPC:lookupIndex");
00501 VestaSource* childvs;
00502 err = vs->lookupIndex(index, childvs, sendarc ? arcbuf : NULL);
00503 delete vs;
00504 vs = childvs;
00505 }
00506
00507
00508 srpc->send_int((int) err);
00509 if (err == VestaSource::ok) {
00510 srpc->send_int((int) vs->type);
00511 srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00512 srpc->send_int((int) vs->master);
00513 srpc->send_int((int) vs->pseudoInode);
00514 srpc->send_int((int) vs->shortId());
00515 srpc->send_int((int) vs->timestamp());
00516 srpc->send_int((int) (vs->hasAttribs() ||
00517 MutableRootLongId.isAncestorOf(vs->longid)));
00518 vs->fptag.Send(*srpc);
00519 delete vs;
00520 if (sendarc) srpc->send_chars(arcbuf);
00521 }
00522 } catch (SRPC::failure f) {
00523 if (lock != NULL) lock->releaseRead();
00524 throw;
00525 }
00526 if (lock != NULL) lock->releaseRead();
00527 srpc->send_end();
00528 }
00529
00530
00531
00532 static void
00533 VestaSourceCreateVolatileDirectory(SRPC* srpc, int intf_ver)
00534 {
00535 char host[MAX_ARC_LEN+1];
00536 char port[MAX_ARC_LEN+1];
00537 int len;
00538 VestaSource::errorCode err;
00539 Bit64 handle;
00540 time_t timestamp;
00541 bool readOnlyExisting;
00542
00543
00544 len = MAX_ARC_LEN+1;
00545 srpc->recv_chars_here(host, len);
00546 len = MAX_ARC_LEN+1;
00547 srpc->recv_chars_here(port, len);
00548 len = sizeof(handle);
00549 srpc->recv_bytes_here((char*) &handle, len);
00550 timestamp = (time_t) srpc->recv_int();
00551 readOnlyExisting = (bool) srpc->recv_int();
00552 srpc->recv_end();
00553
00554
00555 VestaSource* vs;
00556 ReadersWritersLock* lock;
00557 err = ((VDirVolatileRoot*) VestaSource::volatileRoot())->
00558 createVolatileDirectory(host, port, handle, vs, timestamp,
00559 LongId::readLock, &lock, readOnlyExisting);
00560 try {
00561
00562 srpc->send_int((int) err);
00563 if (err == VestaSource::ok) {
00564 srpc->send_int((int) vs->type);
00565 srpc->send_bytes((const char *) &vs->longid, sizeof(vs->longid));
00566 srpc->send_int((int) vs->master);
00567 srpc->send_int((int) vs->pseudoInode);
00568 srpc->send_int((int) vs->shortId());
00569 srpc->send_int((int) vs->timestamp());
00570 srpc->send_int((int) vs->hasAttribs());
00571 vs->fptag.Send(*srpc);
00572 delete vs;
00573 }
00574 } catch (SRPC::failure f) {
00575 if (lock != NULL) lock->releaseRead();
00576 throw;
00577 }
00578 if (lock != NULL) lock->releaseRead();
00579 srpc->send_end();
00580 }
00581
00582
00583 static void
00584 VestaSourceDeleteVolatileDirectory(SRPC* srpc, int intf_ver)
00585 {
00586 LongId longid, plongid;
00587 int len;
00588 VestaSource::errorCode err;
00589
00590
00591 len = sizeof(longid.value);
00592 srpc->recv_bytes_here((char *) &longid.value, len);
00593 srpc->recv_end();
00594
00595
00596 unsigned int index;
00597 plongid = longid.getParent(&index);
00598 if (memcmp(&plongid, &VolatileRootLongId, sizeof(LongId)) != 0) {
00599 err = VestaSource::inappropriateOp;
00600 } else {
00601 ReadersWritersLock *lock;
00602 err = ((VDirVolatileRoot*)
00603 VestaSource::volatileRoot(LongId::writeLock, &lock))->
00604 deleteIndex(index);
00605 RWLOCK_LOCKED_REASON(lock, "SRPC:DeleteVolatileDirectory");
00606 lock->releaseWrite();
00607 }
00608
00609
00610 srpc->send_int((int) err);
00611 srpc->send_end();
00612 }
00613
00614
00615 static void
00616 VestaSourceGetBase(SRPC* srpc, int intf_ver)
00617 {
00618 LongId longid;
00619 int len;
00620 VestaSource::errorCode err;
00621
00622
00623 len = sizeof(longid.value);
00624 srpc->recv_bytes_here((char *) &longid.value, len);
00625 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00626 srpc->recv_end();
00627
00628
00629 ReadersWritersLock* lock;
00630 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00631 try {
00632 VestaSource* mvs;
00633 if (vs == NULL) {
00634 err = VestaSource::invalidArgs;
00635 } else {
00636 RWLOCK_LOCKED_REASON(lock, "SRPC:GetBase");
00637 err = vs->getBase(mvs, who);
00638 delete vs;
00639 }
00640
00641 srpc->send_int((int) err);
00642 if (err == VestaSource::ok) {
00643 srpc->send_int((int) mvs->type);
00644 srpc->send_bytes((const char *) &mvs->longid, sizeof(mvs->longid));
00645 srpc->send_int((int) mvs->master);
00646 srpc->send_int((int) mvs->pseudoInode);
00647 srpc->send_int((int) mvs->shortId());
00648 srpc->send_int((int) mvs->timestamp());
00649 srpc->send_int((int) mvs->hasAttribs());
00650 mvs->fptag.Send(*srpc);
00651 delete mvs;
00652 }
00653 } catch (SRPC::failure f) {
00654 if (lock != NULL) lock->releaseRead();
00655 delete who;
00656 throw;
00657 }
00658 if (lock != NULL) lock->releaseRead();
00659 delete who;
00660 srpc->send_end();
00661 }
00662
00663
00664 struct VSLClosure {
00665 SRPC* srpc;
00666 int cost, limit, overhead;
00667 int intf_ver;
00668 };
00669
00670
00671 static bool
00672 VSLCallback(void* closure, VestaSource::typeTag type, Arc arc,
00673 unsigned int index, Bit32 pseudoInode, ShortId filsid, bool master)
00674 {
00675 VSLClosure* cl = (VSLClosure*) closure;
00676 cl->cost += cl->overhead + strlen(arc);
00677 if (cl->cost > cl->limit) return false;
00678 cl->srpc->send_chars(arc);
00679 cl->srpc->send_int((int) type);
00680 cl->srpc->send_int((int) index);
00681 if (cl->intf_ver >= 10) {
00682 cl->srpc->send_int((int) pseudoInode);
00683 cl->srpc->send_int((int) filsid);
00684 } else {
00685
00686 cl->srpc->send_int(((int)filsid) ? ((int)filsid) : ((int)pseudoInode));
00687 }
00688 cl->srpc->send_int((int) master);
00689 return true;
00690 }
00691
00692
00693
00694 static void
00695 VestaSourceList(SRPC* srpc, int intf_ver)
00696 {
00697 LongId longid;
00698 unsigned int sindex;
00699 bool delta;
00700 int limit, overhead;
00701 VestaSource::errorCode err;
00702 VSLClosure cl;
00703 int len;
00704
00705
00706 len = sizeof(longid.value);
00707 srpc->recv_bytes_here((char *) &longid.value, len);
00708 sindex = (unsigned int) srpc->recv_int();
00709 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00710 delta = (bool) srpc->recv_int();
00711 limit = srpc->recv_int();
00712 overhead = srpc->recv_int();
00713 srpc->recv_end();
00714
00715
00716 srpc->send_seq_start();
00717 ReadersWritersLock* lock;
00718 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
00719 try {
00720 if (vs == NULL) {
00721 err = VestaSource::notFound;
00722 } else {
00723 RWLOCK_LOCKED_REASON(lock, "SRPC:List");
00724 cl.srpc = srpc;
00725 cl.cost = 0;
00726 cl.limit = limit;
00727 cl.overhead = overhead;
00728 cl.intf_ver = intf_ver;
00729 err = vs->list(sindex, VSLCallback, &cl, who, delta, 0);
00730 delete vs;
00731 }
00732 if (err != VestaSource::ok || cl.cost <= cl.limit) {
00733
00734
00735 srpc->send_chars("");
00736 srpc->send_int((int) VestaSource::unused);
00737 srpc->send_int((int) err);
00738 }
00739 } catch (SRPC::failure f) {
00740 if (vs != NULL) delete vs;
00741 if (lock != NULL) lock->releaseRead();
00742 delete who;
00743 throw;
00744 }
00745 if (lock != NULL) lock->releaseRead();
00746 delete who;
00747 srpc->send_seq_end();
00748 srpc->send_end();
00749 }
00750
00751
00752
00753 static void
00754 VestaSourceGetNFSInfo(SRPC* srpc, int intf_ver)
00755 {
00756 VestaSource::errorCode err;
00757 extern char* MyNFSSocket;
00758
00759
00760 srpc->recv_end();
00761
00762
00763 srpc->send_chars(MyNFSSocket);
00764 srpc->send_bytes((const char *) &RootLongId, sizeof(LongId));
00765 srpc->send_bytes((const char *) &MutableRootLongId, sizeof(LongId));
00766 srpc->send_end();
00767 }
00768
00769
00770
00771 static void
00772 VestaSourceFPToShortId(SRPC* srpc, int intf_ver)
00773 {
00774 VestaSource::errorCode err;
00775 FP::Tag fptag;
00776 ShortId sid;
00777
00778 fptag.Recv(*srpc);
00779 srpc->recv_end();
00780
00781
00782 sid = GetFPShortId(fptag);
00783
00784 srpc->send_int((int)sid);
00785 srpc->send_end();
00786 }
00787
00788 static void
00789 VestaSourceReallyDelete(SRPC* srpc, int intf_ver)
00790 {
00791 int len;
00792 LongId longid;
00793 char arc[MAX_ARC_LEN+1];
00794 bool existCheck;
00795 time_t timestamp;
00796 VestaSource::errorCode err;
00797
00798
00799 len = sizeof(longid.value);
00800 srpc->recv_bytes_here((char *) &longid.value, len);
00801 len = MAX_ARC_LEN+1;
00802 srpc->recv_chars_here(arc, len);
00803 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00804 existCheck = (bool) srpc->recv_int();
00805 timestamp = (time_t) srpc->recv_int();
00806 srpc->recv_end();
00807
00808
00809 ReadersWritersLock* lock;
00810 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
00811 if (vs == NULL) {
00812 err = VestaSource::invalidArgs;
00813 } else {
00814 RWLOCK_LOCKED_REASON(lock, "SRPC:reallyDelete");
00815 err = vs->reallyDelete(arc, who, existCheck, timestamp);
00816 delete vs;
00817 }
00818 if (lock != NULL) lock->releaseWrite();
00819 delete who;
00820
00821
00822 srpc->send_int((int) err);
00823 srpc->send_end();
00824 }
00825
00826 static void
00827 VestaSourceInsertFile(SRPC* srpc, int intf_ver)
00828 {
00829 int len;
00830 LongId longid, rlongid;
00831 char arc[MAX_ARC_LEN+1];
00832 ShortId sid;
00833 bool master;
00834 VestaSource::dupeCheck chk;
00835 time_t timestamp;
00836 VestaSource::errorCode err;
00837 FP::Tag fptag;
00838
00839
00840 len = sizeof(longid.value);
00841 srpc->recv_bytes_here((char *) &longid.value, len);
00842 len = MAX_ARC_LEN+1;
00843 srpc->recv_chars_here(arc, len);
00844 sid = (ShortId) srpc->recv_int();
00845 master = (bool) srpc->recv_int();
00846 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00847 chk = (VestaSource::dupeCheck) srpc->recv_int();
00848 timestamp = (time_t) srpc->recv_int();
00849
00850 len = FP::ByteCnt;
00851 unsigned char fpbytes[FP::ByteCnt];
00852 srpc->recv_bytes_here((char*)fpbytes, len);
00853 bool have_fp = (len == FP::ByteCnt);
00854 if(have_fp)
00855 {
00856 fptag.FromBytes(fpbytes);
00857 }
00858
00859 srpc->recv_end();
00860
00861
00862 ReadersWritersLock* lock;
00863 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
00864 VestaSource* newvs;
00865 if (vs == NULL) {
00866 err = VestaSource::invalidArgs;
00867 } else {
00868 RWLOCK_LOCKED_REASON(lock, "SRPC:insertFile");
00869 err = vs->insertFile(arc, sid, master, who, chk, &newvs, timestamp,
00870 have_fp ? &fptag : NULL);
00871 delete vs;
00872 }
00873 if (lock != NULL) lock->releaseWrite();
00874 delete who;
00875
00876
00877 srpc->send_int((int) err);
00878 if (err == VestaSource::ok) {
00879 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
00880 srpc->send_int((int) newvs->pseudoInode);
00881 newvs->fptag.Send(*srpc);
00882 srpc->send_int((int) newvs->shortId());
00883 delete newvs;
00884 }
00885 srpc->send_end();
00886 }
00887
00888 static void
00889 VestaSourceInsertMutableFile(SRPC* srpc, int intf_ver)
00890 {
00891 int len;
00892 LongId longid, rlongid;
00893 char arc[MAX_ARC_LEN+1];
00894 ShortId sid;
00895 bool master;
00896 VestaSource::dupeCheck chk;
00897 time_t timestamp;
00898 VestaSource::errorCode err;
00899
00900
00901 len = sizeof(longid.value);
00902 srpc->recv_bytes_here((char *) &longid.value, len);
00903 len = MAX_ARC_LEN+1;
00904 srpc->recv_chars_here(arc, len);
00905 sid = (ShortId) srpc->recv_int();
00906 master = (bool) srpc->recv_int();
00907 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00908 chk = (VestaSource::dupeCheck) srpc->recv_int();
00909 timestamp = (time_t) srpc->recv_int();
00910 srpc->recv_end();
00911
00912
00913 ReadersWritersLock* lock;
00914 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
00915 VestaSource* newvs;
00916 if (vs == NULL) {
00917 err = VestaSource::invalidArgs;
00918 } else {
00919 RWLOCK_LOCKED_REASON(lock, "SRPC:insertMutableFile");
00920 err = vs->insertMutableFile(arc, sid, master, who, chk, &newvs,
00921 timestamp);
00922 delete vs;
00923 }
00924 if (lock != NULL) lock->releaseWrite();
00925 delete who;
00926
00927
00928 srpc->send_int((int) err);
00929 if (err == VestaSource::ok) {
00930 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
00931 srpc->send_int((int) newvs->pseudoInode);
00932 newvs->fptag.Send(*srpc);
00933 srpc->send_int((int) newvs->shortId());
00934 delete newvs;
00935 }
00936 srpc->send_end();
00937 }
00938
00939 static void
00940 VestaSourceInsertImmutableDirectory(SRPC* srpc, int intf_ver)
00941 {
00942 int len;
00943 LongId longid, dlongid, rlongid;
00944 char arc[MAX_ARC_LEN+1];
00945 bool master;
00946 VestaSource::dupeCheck chk;
00947 time_t timestamp;
00948 VestaSource::errorCode err;
00949 FP::Tag fptag;
00950
00951
00952 len = sizeof(longid.value);
00953 srpc->recv_bytes_here((char *) &longid.value, len);
00954 len = MAX_ARC_LEN+1;
00955 srpc->recv_chars_here(arc, len);
00956 len = sizeof(dlongid.value);
00957 srpc->recv_bytes_here((char *) &dlongid.value, len);
00958 master = (bool) srpc->recv_int();
00959 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
00960 chk = (VestaSource::dupeCheck) srpc->recv_int();
00961 timestamp = (time_t) srpc->recv_int();
00962
00963 len = FP::ByteCnt;
00964 unsigned char fpbytes[FP::ByteCnt];
00965 srpc->recv_bytes_here((char*)fpbytes, len);
00966 bool have_fp = (len == FP::ByteCnt);
00967 if(have_fp)
00968 {
00969 fptag.FromBytes(fpbytes);
00970 }
00971
00972 srpc->recv_end();
00973
00974
00975 ReadersWritersLock* vlock = NULL;
00976 VestaSource* vs;
00977 ReadersWritersLock* lock;
00978 if (VolatileRootLongId.isAncestorOf(longid)) {
00979
00980
00981 vlock = &VolatileRootLock;
00982 vlock->acquireRead();
00983 RWLOCK_LOCKED_REASON(vlock,
00984 "SRPC:insertImmutableDirectory (in volatile)");
00985 vs = longid.lookup(LongId::writeLockV, &lock);
00986 } else {
00987 vs = longid.lookup(LongId::writeLock, &lock);
00988 }
00989 VestaSource* newvs;
00990 if (vs == NULL) {
00991 err = VestaSource::invalidArgs;
00992 if (vlock != NULL) vlock->releaseRead();
00993 } else {
00994 RWLOCK_LOCKED_REASON(lock, "SRPC:insertImmutableDirectory");
00995 VestaSource* dirvs = dlongid.lookup(LongId::checkLock, &lock);
00996 if (vlock != NULL) vlock->releaseRead();
00997 err = vs->insertImmutableDirectory(arc, dirvs, master, who, chk,
00998 &newvs, timestamp,
00999 have_fp ? &fptag : NULL);
01000 delete vs;
01001 if (dirvs != NULL) delete dirvs;
01002 }
01003 if (lock != NULL) lock->releaseWrite();
01004 delete who;
01005
01006
01007 srpc->send_int((int) err);
01008 if (err == VestaSource::ok) {
01009 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01010 srpc->send_int((int) newvs->pseudoInode);
01011 newvs->fptag.Send(*srpc);
01012 srpc->send_int(newvs->shortId());
01013 delete newvs;
01014 }
01015 srpc->send_end();
01016 }
01017
01018 static void
01019 VestaSourceInsertAppendableDirectory(SRPC* srpc, int intf_ver)
01020 {
01021 int len;
01022 LongId longid, rlongid;
01023 char arc[MAX_ARC_LEN+1];
01024 bool master;
01025 VestaSource::dupeCheck chk;
01026 time_t timestamp;
01027 VestaSource::errorCode err;
01028
01029
01030 len = sizeof(longid.value);
01031 srpc->recv_bytes_here((char *) &longid.value, len);
01032 len = MAX_ARC_LEN+1;
01033 srpc->recv_chars_here(arc, len);
01034 master = (bool) srpc->recv_int();
01035 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01036 chk = (VestaSource::dupeCheck) srpc->recv_int();
01037 timestamp = (time_t) srpc->recv_int();
01038 srpc->recv_end();
01039
01040
01041 ReadersWritersLock* lock;
01042 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01043 VestaSource* newvs;
01044 if (vs == NULL) {
01045 err = VestaSource::invalidArgs;
01046 } else {
01047 RWLOCK_LOCKED_REASON(lock, "SRPC:insertAppendableDirectory");
01048 err = vs->insertAppendableDirectory(arc, master, who, chk,
01049 &newvs, timestamp);
01050 if((err == VestaSource::ok) && master && !vs->master &&
01051 !myMasterHint.Empty())
01052 {
01053
01054
01055
01056
01057 newvs->setAttrib("master-repository", myMasterHint.cchars(), NULL);
01058 }
01059 delete vs;
01060 }
01061 if (lock != NULL) lock->releaseWrite();
01062 delete who;
01063
01064
01065 srpc->send_int((int) err);
01066 if (err == VestaSource::ok) {
01067 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01068 srpc->send_int((int) newvs->pseudoInode);
01069 newvs->fptag.Send(*srpc);
01070 delete newvs;
01071 }
01072 srpc->send_end();
01073 }
01074
01075 static void
01076 VestaSourceInsertMutableDirectory(SRPC* srpc, int intf_ver)
01077 {
01078 int len;
01079 LongId longid, dlongid, rlongid;
01080 char arc[MAX_ARC_LEN+1];
01081 bool master;
01082 VestaSource::dupeCheck chk;
01083 time_t timestamp;
01084 VestaSource::errorCode err;
01085
01086
01087 len = sizeof(longid.value);
01088 srpc->recv_bytes_here((char *) &longid.value, len);
01089 len = MAX_ARC_LEN+1;
01090 srpc->recv_chars_here(arc, len);
01091 len = sizeof(dlongid.value);
01092 srpc->recv_bytes_here((char *) &dlongid.value, len);
01093 master = (bool) srpc->recv_int();
01094 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01095 chk = (VestaSource::dupeCheck) srpc->recv_int();
01096 timestamp = (time_t) srpc->recv_int();
01097 srpc->recv_end();
01098
01099
01100 ReadersWritersLock* vlock = NULL;
01101 VestaSource* vs;
01102 ReadersWritersLock* lock;
01103 if (VolatileRootLongId.isAncestorOf(longid)) {
01104
01105
01106 vlock = &VolatileRootLock;
01107 vlock->acquireRead();
01108 RWLOCK_LOCKED_REASON(vlock, "SRPC:insertMutableDirectory (in volatile)");
01109 vs = longid.lookup(LongId::writeLockV, &lock);
01110 } else {
01111 vs = longid.lookup(LongId::writeLock, &lock);
01112 }
01113 VestaSource* newvs;
01114 if (vs == NULL) {
01115 err = VestaSource::invalidArgs;
01116 if (vlock != NULL) vlock->releaseRead();
01117 } else {
01118 RWLOCK_LOCKED_REASON(lock, "SRPC:insertMutableDirectory");
01119 VestaSource* dirvs = dlongid.lookup(LongId::checkLock, &lock);
01120 if (vlock != NULL) vlock->releaseRead();
01121 err = vs->insertMutableDirectory(arc, dirvs, master, who, chk,
01122 &newvs, timestamp);
01123 delete vs;
01124 if (dirvs != NULL) delete dirvs;
01125 }
01126 if (lock != NULL) lock->releaseWrite();
01127 delete who;
01128
01129
01130 srpc->send_int((int) err);
01131 if (err == VestaSource::ok) {
01132 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01133 srpc->send_int((int) newvs->pseudoInode);
01134 newvs->fptag.Send(*srpc);
01135 delete newvs;
01136 }
01137 srpc->send_end();
01138 }
01139
01140 static void
01141 VestaSourceInsertGhost(SRPC* srpc, int intf_ver)
01142 {
01143 int len;
01144 LongId longid, rlongid;
01145 char arc[MAX_ARC_LEN+1];
01146 bool master;
01147 VestaSource::dupeCheck chk;
01148 time_t timestamp;
01149 VestaSource::errorCode err;
01150
01151
01152 len = sizeof(longid.value);
01153 srpc->recv_bytes_here((char *) &longid.value, len);
01154 len = MAX_ARC_LEN+1;
01155 srpc->recv_chars_here(arc, len);
01156 master = (bool) srpc->recv_int();
01157 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01158 chk = (VestaSource::dupeCheck) srpc->recv_int();
01159 timestamp = (time_t) srpc->recv_int();
01160 srpc->recv_end();
01161
01162
01163 ReadersWritersLock* lock;
01164 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01165 VestaSource* newvs;
01166 if (vs == NULL) {
01167 err = VestaSource::invalidArgs;
01168 } else {
01169 RWLOCK_LOCKED_REASON(lock, "SRPC:insertGhost");
01170 err = vs->insertGhost(arc, master, who, chk, &newvs, timestamp);
01171 delete vs;
01172 }
01173 if (lock != NULL) lock->releaseWrite();
01174 delete who;
01175
01176
01177 srpc->send_int((int) err);
01178 if (err == VestaSource::ok) {
01179 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01180 srpc->send_int((int) newvs->pseudoInode);
01181 newvs->fptag.Send(*srpc);
01182 delete newvs;
01183 }
01184 srpc->send_end();
01185 }
01186
01187 static void
01188 VestaSourceInsertStub(SRPC* srpc, int intf_ver)
01189 {
01190 int len;
01191 LongId longid, rlongid;
01192 char arc[MAX_ARC_LEN+1];
01193 bool master;
01194 VestaSource::dupeCheck chk;
01195 time_t timestamp;
01196 VestaSource::errorCode err;
01197
01198
01199 len = sizeof(longid.value);
01200 srpc->recv_bytes_here((char *) &longid.value, len);
01201 len = MAX_ARC_LEN+1;
01202 srpc->recv_chars_here(arc, len);
01203 master = (bool) srpc->recv_int();
01204 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01205 chk = (VestaSource::dupeCheck) srpc->recv_int();
01206 timestamp = (time_t) srpc->recv_int();
01207 srpc->recv_end();
01208
01209
01210 ReadersWritersLock* lock;
01211 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01212 VestaSource* newvs;
01213 if (vs == NULL) {
01214 err = VestaSource::invalidArgs;
01215 } else {
01216 RWLOCK_LOCKED_REASON(lock, "SRPCinsertStub");
01217 err = vs->insertStub(arc, master, who, chk, &newvs, timestamp);
01218 delete vs;
01219 }
01220 if (lock != NULL) lock->releaseWrite();
01221 delete who;
01222
01223
01224 srpc->send_int((int) err);
01225 if (err == VestaSource::ok) {
01226 srpc->send_bytes((const char*) &newvs->longid.value, sizeof(LongId));
01227 srpc->send_int((int) newvs->pseudoInode);
01228 newvs->fptag.Send(*srpc);
01229 delete newvs;
01230 }
01231 srpc->send_end();
01232 }
01233
01234 static void
01235 VestaSourceRenameTo(SRPC* srpc, int intf_ver)
01236 {
01237 int len;
01238 LongId longid, flongid, rlongid;
01239 char arc[MAX_ARC_LEN+1], fromArc[MAX_ARC_LEN+1];
01240 VestaSource::dupeCheck chk;
01241 time_t timestamp;
01242 VestaSource::errorCode err;
01243
01244
01245 len = sizeof(longid.value);
01246 srpc->recv_bytes_here((char *) &longid.value, len);
01247 len = MAX_ARC_LEN+1;
01248 srpc->recv_chars_here(arc, len);
01249 len = sizeof(flongid.value);
01250 srpc->recv_bytes_here((char *) &flongid.value, len);
01251 len = MAX_ARC_LEN+1;
01252 srpc->recv_chars_here(fromArc, len);
01253 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01254 chk = (VestaSource::dupeCheck) srpc->recv_int();
01255 timestamp = (time_t) srpc->recv_int();
01256 srpc->recv_end();
01257
01258
01259 ReadersWritersLock* vlock = NULL;
01260 ReadersWritersLock* lock;
01261 VestaSource* vs;
01262 if (VolatileRootLongId.isAncestorOf(longid)) {
01263
01264
01265 vlock = &VolatileRootLock;
01266 vlock->acquireRead();
01267 RWLOCK_LOCKED_REASON(vlock, "SRPC:renameTo (in volatile)");
01268 vs = longid.lookup(LongId::writeLockV, &lock);
01269 } else {
01270 vs = longid.lookup(LongId::writeLock, &lock);
01271 }
01272 if(lock != 0)
01273 {
01274 RWLOCK_LOCKED_REASON(lock, "SRPC:renameTo");
01275 }
01276 VestaSource* fvs = flongid.lookup(LongId::checkLock, &lock);
01277 if (vlock != NULL) vlock->releaseRead();
01278 if (vs == NULL || fvs == NULL) {
01279 err = VestaSource::invalidArgs;
01280 } else {
01281 err = vs->renameTo(arc, fvs, fromArc, who, chk, timestamp);
01282 }
01283 delete who;
01284 if (vs != NULL) delete vs;
01285 if (fvs != NULL) delete fvs;
01286 if (lock != NULL) lock->releaseWrite();
01287
01288
01289 srpc->send_int((int) err);
01290 srpc->send_end();
01291 }
01292
01293
01294 static void
01295 VestaSourceMakeMutable(SRPC* srpc, int intf_ver)
01296 {
01297 int len;
01298 LongId longid;
01299 ShortId sid;
01300 VestaSource::errorCode err;
01301 AccessControl::Identity who;
01302 Basics::uint64 copyMax;
01303
01304
01305 len = sizeof(longid.value);
01306 srpc->recv_bytes_here((char *) &longid.value, len);
01307 sid = (ShortId) srpc->recv_int();
01308 if (intf_ver <= 8) {
01309 copyMax = (Basics::uint64) -1;
01310 who = NULL;
01311 } else {
01312 copyMax = (unsigned int) srpc->recv_int();
01313 copyMax += ((Basics::uint64) srpc->recv_int()) << 32;
01314 who = srpc_recv_identity(srpc, intf_ver);
01315 }
01316 srpc->recv_end();
01317
01318
01319 ReadersWritersLock* lock;
01320 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01321 try {
01322 VestaSource* mvs;
01323 if (vs == NULL) {
01324 err = VestaSource::invalidArgs;
01325 } else {
01326 RWLOCK_LOCKED_REASON(lock, "SRPC:makeMutable");
01327 err = vs->makeMutable(mvs, sid, copyMax, who);
01328 delete vs;
01329 }
01330
01331 srpc->send_int((int) err);
01332 if (err == VestaSource::ok) {
01333 srpc->send_int((int) mvs->type);
01334 srpc->send_bytes((const char *) &mvs->longid, sizeof(mvs->longid));
01335 srpc->send_int((int) mvs->master);
01336 srpc->send_int((int) mvs->pseudoInode);
01337 srpc->send_int((int) mvs->shortId());
01338 srpc->send_int((int) mvs->timestamp());
01339 srpc->send_int((int) mvs->hasAttribs());
01340 vs->fptag.Send(*srpc);
01341 delete mvs;
01342 }
01343 } catch (SRPC::failure f) {
01344 if (lock != NULL) lock->releaseWrite();
01345 delete who;
01346 throw;
01347 }
01348 if (lock != NULL) lock->releaseWrite();
01349 delete who;
01350 srpc->send_end();
01351 }
01352
01353
01354 static void
01355 VestaSourceInAttribs(SRPC* srpc, int intf_ver)
01356 {
01357 LongId longid;
01358 int len;
01359 char* name;
01360 char* value;
01361 bool retval;
01362
01363
01364 len = sizeof(longid.value);
01365 srpc->recv_bytes_here((char *) &longid.value, len);
01366 name = srpc->recv_chars();
01367 value = srpc->recv_chars();
01368 srpc->recv_end();
01369
01370
01371 ReadersWritersLock* lock;
01372 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01373 if (vs == NULL) {
01374 retval = false;
01375 } else {
01376 RWLOCK_LOCKED_REASON(lock, "SRPC:inAttribs");
01377 retval = vs->inAttribs(name, value);
01378 delete vs;
01379 }
01380 if (lock != NULL) lock->releaseRead();
01381
01382
01383 delete [] name;
01384 delete [] value;
01385 srpc->send_int((int) retval);
01386 srpc->send_end();
01387 }
01388
01389 static void
01390 VestaSourceGetAttrib(SRPC* srpc, int intf_ver)
01391 {
01392 LongId longid;
01393 int len;
01394 char* name;
01395 const char* value;
01396 bool retval;
01397
01398
01399 len = sizeof(longid.value);
01400 srpc->recv_bytes_here((char *) &longid.value, len);
01401 name = srpc->recv_chars();
01402 srpc->recv_end();
01403
01404
01405 ReadersWritersLock* lock;
01406 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01407 if (vs == NULL) {
01408 value = NULL;
01409 } else {
01410 RWLOCK_LOCKED_REASON(lock, "SRPC:getAttribConst");
01411 value = vs->getAttribConst(name);
01412 delete vs;
01413 }
01414
01415
01416
01417 try {
01418 if (value == NULL) {
01419 srpc->send_int((int) true);
01420 srpc->send_chars("");
01421 } else {
01422 srpc->send_int((int) false);
01423 srpc->send_chars(value);
01424 }
01425 } catch (SRPC::failure f) {
01426 if (lock != NULL) lock->releaseRead();
01427 delete [] name;
01428 throw;
01429 }
01430 if (lock != NULL) lock->releaseRead();
01431 delete [] name;
01432 srpc->send_end();
01433 }
01434
01435 static bool
01436 valueCallback(void* cl, const char* value)
01437 {
01438 SRPC* srpc = (SRPC*) cl;
01439 srpc->send_chars(value);
01440 return true;
01441 }
01442
01443 static void
01444 VestaSourceGetAttrib2(SRPC* srpc, int intf_ver)
01445 {
01446 LongId longid;
01447 int len;
01448 char* name;
01449 bool retval;
01450
01451
01452 len = sizeof(longid.value);
01453 srpc->recv_bytes_here((char *) &longid.value, len);
01454 name = srpc->recv_chars();
01455 srpc->recv_end();
01456
01457
01458 srpc->send_seq_start();
01459 ReadersWritersLock* lock;
01460 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01461 if (vs != NULL) {
01462 RWLOCK_LOCKED_REASON(lock, "SRPC:getAttrib");
01463 try {
01464 vs->getAttrib(name, valueCallback, (void*) srpc);
01465 } catch (SRPC::failure f) {
01466 delete vs;
01467 if (lock != NULL) lock->releaseRead();
01468 delete [] name;
01469 throw;
01470 }
01471 delete vs;
01472 }
01473 if (lock != NULL) lock->releaseRead();
01474 delete [] name;
01475 srpc->send_seq_end();
01476 srpc->send_end();
01477 }
01478
01479 static void
01480 VestaSourceListAttribs(SRPC* srpc, int intf_ver)
01481 {
01482 LongId longid;
01483 int len;
01484 bool retval;
01485
01486
01487 len = sizeof(longid.value);
01488 srpc->recv_bytes_here((char *) &longid.value, len);
01489 srpc->recv_end();
01490
01491
01492 srpc->send_seq_start();
01493 ReadersWritersLock* lock;
01494 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01495 if (vs != NULL) {
01496 RWLOCK_LOCKED_REASON(lock, "SRPC:listAttribs");
01497 try {
01498 vs->listAttribs(valueCallback, (void*) srpc);
01499 } catch (SRPC::failure f) {
01500 delete vs;
01501 if (lock != NULL) lock->releaseRead();
01502 throw;
01503 }
01504 delete vs;
01505 }
01506 if (lock != NULL) lock->releaseRead();
01507 srpc->send_seq_end();
01508 srpc->send_end();
01509 }
01510
01511 static bool
01512 historyCallback(void* cl, VestaSource::attribOp op, const char* name,
01513 const char* value, time_t timestamp)
01514 {
01515 SRPC* srpc = (SRPC*) cl;
01516 srpc->send_int((int) op);
01517 srpc->send_chars(name);
01518 srpc->send_chars(value);
01519 srpc->send_int((int) timestamp);
01520 return true;
01521 }
01522
01523 static void
01524 VestaSourceGetAttribHistory(SRPC* srpc, int intf_ver)
01525 {
01526 LongId longid;
01527 int len;
01528 bool retval;
01529
01530
01531 len = sizeof(longid.value);
01532 srpc->recv_bytes_here((char *) &longid.value, len);
01533 srpc->recv_end();
01534
01535
01536 srpc->send_seq_start();
01537 ReadersWritersLock* lock;
01538 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01539 if (vs != NULL) {
01540 RWLOCK_LOCKED_REASON(lock, "SRPC:getAttribHistory");
01541 try {
01542 vs->getAttribHistory(historyCallback, (void*) srpc);
01543 } catch (SRPC::failure f) {
01544 delete vs;
01545 if (lock != NULL) lock->releaseRead();
01546 throw;
01547 }
01548 delete vs;
01549 }
01550 if (lock != NULL) lock->releaseRead();
01551 srpc->send_seq_end();
01552 srpc->send_end();
01553 }
01554
01555 static void
01556 VestaSourceWriteAttrib(SRPC* srpc, int intf_ver)
01557 {
01558 LongId longid;
01559 int len;
01560 VestaSource::attribOp op;
01561 char* name;
01562 char* value;
01563 time_t timestamp;
01564 VestaSource::errorCode err = VestaSource::ok;
01565
01566
01567 len = sizeof(longid.value);
01568 srpc->recv_bytes_here((char *) &longid.value, len);
01569 op = (VestaSource::attribOp) srpc->recv_int();
01570 name = srpc->recv_chars();
01571 value = srpc->recv_chars();
01572 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
01573 timestamp = (time_t) srpc->recv_int();
01574 srpc->recv_end();
01575
01576
01577 ReadersWritersLock* lock;
01578 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01579 if (vs != NULL) {
01580 RWLOCK_LOCKED_REASON(lock, "SRPC:writeAttrib");
01581 if(!vs->hasAttribs() && MutableRootLongId.isAncestorOf(vs->longid))
01582 {
01583 VestaSource *new_vs = 0;
01584 err = vs->copyToMutable(new_vs);
01585 delete vs;
01586 vs = new_vs;
01587 }
01588 if(err == VestaSource::ok)
01589 {
01590 err = vs->writeAttrib(op, name, value, who, timestamp);
01591 delete vs;
01592 }
01593 } else {
01594 err = VestaSource::invalidArgs;
01595 }
01596 if (lock != NULL) lock->releaseWrite();
01597 delete who;
01598
01599
01600 delete [] name;
01601 delete [] value;
01602 srpc->send_int((int) err);
01603 srpc->send_end();
01604 }
01605
01606 void
01607 MakeFilesImmutable(SRPC* srpc, int intf_ver)
01608 {
01609 LongId longid;
01610 int len;
01611 unsigned int threshold;
01612 VestaSource::errorCode err;
01613 AccessControl::Identity who = NULL;
01614
01615 try {
01616
01617 len = sizeof(longid.value);
01618 srpc->recv_bytes_here((char *) &longid.value, len);
01619 threshold = srpc->recv_int();
01620 if (intf_ver > 8) {
01621 who = srpc_recv_identity(srpc, intf_ver);
01622 }
01623 srpc->recv_end();
01624
01625
01626 ReadersWritersLock* lock = NULL;
01627 VestaSource *vs = longid.lookup(LongId::writeLock, &lock);
01628 if (vs == NULL) {
01629 err = VestaSource::invalidArgs;
01630 } else {
01631 RWLOCK_LOCKED_REASON(lock, "SRPC:makeFilesImmutable");
01632 err = vs->makeFilesImmutable(threshold, who);
01633 delete vs;
01634 }
01635 if (lock != NULL) lock->releaseWrite();
01636
01637
01638 srpc->send_int((int) err);
01639 srpc->send_end();
01640 } catch (...) {
01641 if (who) delete who;
01642 throw;
01643 }
01644 if (who) delete who;
01645 }
01646
01647 void
01648 SetIndexMaster(SRPC* srpc, int intf_ver)
01649 {
01650 LongId longid;
01651 int len;
01652 unsigned int index;
01653 bool state;
01654 VestaSource::errorCode err;
01655 AccessControl::Identity who = NULL;
01656
01657 try {
01658
01659 len = sizeof(longid.value);
01660 srpc->recv_bytes_here((char *) &longid.value, len);
01661 index = (unsigned int) srpc->recv_int();
01662 state = (bool) srpc->recv_int();
01663 if (intf_ver <= 8) {
01664 who = NULL;
01665 } else {
01666 who = srpc_recv_identity(srpc, intf_ver);
01667 }
01668 srpc->recv_end();
01669
01670
01671 ReadersWritersLock* lock = NULL;
01672 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01673 if (vs == NULL) {
01674 err = VestaSource::invalidArgs;
01675 } else {
01676 RWLOCK_LOCKED_REASON(lock, "SRPC:setIndexMaster");
01677 err = vs->setIndexMaster(index, state, who);
01678 delete vs;
01679 }
01680 if (lock != NULL) lock->releaseWrite();
01681
01682
01683 srpc->send_int((int) err);
01684 srpc->send_end();
01685 } catch (...) {
01686 if (who) delete who;
01687 throw;
01688 }
01689 if (who) delete who;
01690 }
01691
01692
01693 static void
01694 VestaSourceAcquireMastership(SRPC* srpc, int intf_ver)
01695 {
01696 const char* pathname = NULL;
01697 char srcHost[MAX_ARC_LEN+1];
01698 char srcPort[MAX_ARC_LEN+1];
01699 VestaSource::errorCode err;
01700 char pathnameSep;
01701 AccessControl::Identity dwho = NULL;
01702 AccessControl::Identity swho = NULL;
01703
01704
01705 try {
01706 int len;
01707 pathname = srpc->recv_chars();
01708 len = MAX_ARC_LEN+1;
01709 srpc->recv_chars_here(srcHost, len);
01710 len = MAX_ARC_LEN+1;
01711 srpc->recv_chars_here(srcPort, len);
01712 pathnameSep = (char) srpc->recv_int();
01713 dwho = srpc_recv_identity(srpc, intf_ver);
01714 swho = srpc_recv_identity(srpc, intf_ver);
01715 srpc->recv_end();
01716
01717
01718 err = AcquireMastership(pathname, srcHost, srcPort, pathnameSep,
01719 dwho, swho);
01720
01721
01722 srpc->send_int((int) err);
01723 srpc->send_end();
01724
01725 } catch (SRPC::failure f) {
01726 if (dwho) delete dwho;
01727 if (swho) delete swho;
01728 if (pathname) delete[] pathname;
01729 throw;
01730 }
01731 delete dwho;
01732 delete swho;
01733 delete[] pathname;
01734 }
01735
01736
01737 static void
01738 VestaSourceCedeMastership(SRPC* srpc, int intf_ver)
01739 {
01740 LongId longid;
01741 char requestid[MAX_ARC_LEN+1];
01742 const char* grantid = NULL;
01743 VestaSource::errorCode err;
01744 VestaSource *vs = NULL;
01745 AccessControl::Identity who = NULL;
01746
01747
01748 try {
01749 int len;
01750 len = sizeof(longid.value);
01751 srpc->recv_bytes_here((char *) &longid.value, len);
01752 len = MAX_ARC_LEN+1;
01753 srpc->recv_chars_here(requestid, len);
01754 who = srpc_recv_identity(srpc, intf_ver);
01755 srpc->recv_end();
01756
01757
01758 ReadersWritersLock* lock = NULL;
01759 vs = longid.lookup(LongId::writeLock, &lock);
01760 if (vs == NULL) {
01761 err = VestaSource::invalidArgs;
01762 } else {
01763 RWLOCK_LOCKED_REASON(lock, "SRPC:cedeMastership");
01764 err = vs->cedeMastership(requestid, &grantid, who);
01765 }
01766 if (lock != NULL) lock->releaseWrite();
01767
01768
01769 srpc->send_int((int) err);
01770 if (err == VestaSource::ok) {
01771 srpc->send_chars(grantid);
01772 }
01773 srpc->send_end();
01774
01775 } catch (SRPC::failure f) {
01776 if (who) delete who;
01777 if (vs) delete vs;
01778 if (grantid) delete[] grantid;
01779 throw;
01780 }
01781 if (who) delete who;
01782 if (vs) delete vs;
01783 if (grantid) delete[] grantid;
01784 }
01785
01786
01787 static void
01788 VestaSourceReplicate(SRPC* srpc, int intf_ver)
01789 {
01790 char* pathname = NULL;
01791 bool asStub, asGhost;
01792 char srcHost[MAX_ARC_LEN+1];
01793 char srcPort[MAX_ARC_LEN+1];
01794 VestaSource::errorCode err;
01795 char pathnameSep;
01796 AccessControl::Identity dwho = NULL;
01797 AccessControl::Identity swho = NULL;
01798
01799
01800 try {
01801 int len;
01802 pathname = srpc->recv_chars();
01803 asStub = (bool) srpc->recv_int();
01804 asGhost = (bool) srpc->recv_int();
01805 len = MAX_ARC_LEN+1;
01806 srpc->recv_chars_here(srcHost, len);
01807 len = MAX_ARC_LEN+1;
01808 srpc->recv_chars_here(srcPort, len);
01809 pathnameSep = (char) srpc->recv_int();
01810 dwho = srpc_recv_identity(srpc, intf_ver);
01811 swho = srpc_recv_identity(srpc, intf_ver);
01812 srpc->recv_end();
01813
01814
01815 err = Replicate(pathname, asStub, asGhost, srcHost, srcPort, pathnameSep,
01816 dwho, swho);
01817
01818
01819 srpc->send_int((int) err);
01820 srpc->send_end();
01821
01822 } catch (SRPC::failure f) {
01823 if (dwho) delete dwho;
01824 if (swho) delete swho;
01825 if (pathname) delete[] pathname;
01826 throw;
01827 }
01828 delete dwho;
01829 delete swho;
01830 delete[] pathname;
01831 }
01832
01833
01834 static void
01835 VestaSourceReplicateAttribs(SRPC* srpc, int intf_ver)
01836 {
01837 char* pathname = NULL;
01838 bool includeAccess;
01839 char srcHost[MAX_ARC_LEN+1];
01840 char srcPort[MAX_ARC_LEN+1];
01841 VestaSource::errorCode err;
01842 char pathnameSep;
01843 AccessControl::Identity dwho = NULL;
01844 AccessControl::Identity swho = NULL;
01845
01846
01847 try {
01848 int len;
01849 pathname = srpc->recv_chars();
01850 includeAccess = (bool) srpc->recv_int();
01851 len = MAX_ARC_LEN+1;
01852 srpc->recv_chars_here(srcHost, len);
01853 len = MAX_ARC_LEN+1;
01854 srpc->recv_chars_here(srcPort, len);
01855 pathnameSep = (char) srpc->recv_int();
01856 dwho = srpc_recv_identity(srpc, intf_ver);
01857 swho = srpc_recv_identity(srpc, intf_ver);
01858 srpc->recv_end();
01859
01860
01861 err = ReplicateAttribs(pathname, includeAccess, srcHost, srcPort,
01862 pathnameSep, dwho, swho);
01863
01864
01865 srpc->send_int((int) err);
01866 srpc->send_end();
01867
01868 } catch (SRPC::failure f) {
01869 if (dwho) delete dwho;
01870 if (swho) delete swho;
01871 if (pathname) delete[] pathname;
01872 throw;
01873 }
01874 delete dwho;
01875 delete swho;
01876 delete[] pathname;
01877 }
01878
01879
01880 void
01881 VSStat(SRPC* srpc, int intf_ver)
01882 {
01883 LongId longid;
01884 int len;
01885 time_t ts = -1;
01886 bool x = false;
01887 Basics::uint64 s = 0;
01888 VestaSource::errorCode err = VestaSource::invalidArgs;
01889
01890
01891 len = sizeof(longid.value);
01892 srpc->recv_bytes_here((char *) &longid.value, len);
01893 srpc->recv_end();
01894
01895
01896 ReadersWritersLock* lock = NULL;
01897 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01898 if (vs != NULL) {
01899 RWLOCK_LOCKED_REASON(lock, "SRPC:stat");
01900 err = VestaSource::ok;
01901 ts = vs->timestamp();
01902 x = vs->executable();
01903 s = vs->size();
01904 }
01905
01906 if(lock != NULL) lock->releaseRead();
01907
01908
01909
01910 if(vs != NULL) delete vs;
01911
01912
01913 srpc->send_int((int) err);
01914 srpc->send_int((int) ts);
01915 srpc->send_int((int) x);
01916 srpc->send_int((int) (s & 0xffffffff));
01917 srpc->send_int((int) (s >> 32ul));
01918 srpc->send_end();
01919 }
01920
01921 void
01922 VSRead(SRPC* srpc, int intf_ver)
01923 {
01924 LongId longid;
01925 int len;
01926 int nbytes;
01927 Basics::uint64 offset;
01928 void* buffer = NULL;
01929 VestaSource::errorCode err;
01930 AccessControl::Identity who = NULL;
01931
01932 try {
01933
01934 len = sizeof(longid.value);
01935 srpc->recv_bytes_here((char *) &longid.value, len);
01936 nbytes = srpc->recv_int();
01937 offset = (Basics::uint64) srpc->recv_int();
01938 offset += ((Basics::uint64) srpc->recv_int()) << 32ul;
01939 who = srpc_recv_identity(srpc, intf_ver);
01940 srpc->recv_end();
01941
01942
01943 ReadersWritersLock* lock = NULL;
01944 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
01945 if (vs == NULL) {
01946 err = VestaSource::invalidArgs;
01947 } else {
01948 RWLOCK_LOCKED_REASON(lock, "SRPC:read");
01949 buffer = (void*) malloc(nbytes);
01950 err = vs->read(buffer, &nbytes, offset, who);
01951 }
01952
01953 if (lock != NULL) lock->releaseRead();
01954
01955 if(vs) delete vs;
01956
01957
01958 srpc->send_int((int) err);
01959 if (err == VestaSource::ok) {
01960 srpc->send_bytes((char*)buffer, nbytes);
01961 }
01962 srpc->send_end();
01963 } catch (SRPC::failure f) {
01964 if (who) delete who;
01965 if (buffer) free(buffer);
01966 throw;
01967 }
01968 if (who) delete who;
01969 if (buffer) free(buffer);
01970 }
01971
01972 void
01973 VSWrite(SRPC* srpc, int intf_ver)
01974 {
01975 LongId longid;
01976 int len;
01977 int nbytes;
01978 Basics::uint64 offset;
01979 char* buffer = NULL;
01980 VestaSource::errorCode err;
01981 AccessControl::Identity who = NULL;
01982
01983 try {
01984
01985 len = sizeof(longid.value);
01986 srpc->recv_bytes_here((char *) &longid.value, len);
01987 offset = (Basics::uint64) srpc->recv_int();
01988 offset += ((Basics::uint64) srpc->recv_int()) << 32ul;
01989 buffer = srpc->recv_bytes(nbytes);
01990 who = srpc_recv_identity(srpc, intf_ver);
01991 srpc->recv_end();
01992
01993
01994 ReadersWritersLock* lock = NULL;
01995 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
01996 if (vs == NULL) {
01997 err = VestaSource::invalidArgs;
01998 } else {
01999 RWLOCK_LOCKED_REASON(lock, "SRPC:write");
02000 err = vs->write((void*)buffer, &nbytes, offset, who);
02001 }
02002
02003 if (lock != NULL) lock->releaseWrite();
02004
02005 delete[] buffer;
02006 if(vs) delete vs;
02007
02008
02009 srpc->send_int((int) err);
02010 if (err == VestaSource::ok) {
02011 srpc->send_int((int) nbytes);
02012 }
02013 srpc->send_end();
02014 } catch (...) {
02015 if (who) delete who;
02016 throw;
02017 }
02018 if (who) delete who;
02019 }
02020
02021 void
02022 VSSetExecutable(SRPC* srpc, int intf_ver)
02023 {
02024 LongId longid;
02025 int len;
02026 bool x;
02027 VestaSource::errorCode err;
02028 AccessControl::Identity who = NULL;
02029
02030 try {
02031
02032 len = sizeof(longid.value);
02033 srpc->recv_bytes_here((char *) &longid.value, len);
02034 x = (bool) srpc->recv_int();
02035 who = srpc_recv_identity(srpc, intf_ver);
02036 srpc->recv_end();
02037
02038
02039 ReadersWritersLock* lock = NULL;
02040 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02041 if (vs == NULL) {
02042 err = VestaSource::invalidArgs;
02043 } else {
02044 RWLOCK_LOCKED_REASON(lock, "SRPC:setExecutable");
02045 err = vs->setExecutable(x, who);
02046 }
02047
02048 if (lock != NULL) lock->releaseWrite();
02049 if(vs) delete vs;
02050
02051
02052 srpc->send_int((int) err);
02053 srpc->send_end();
02054 } catch (...) {
02055 if (who) delete who;
02056 throw;
02057 }
02058 if (who) delete who;
02059 }
02060
02061 void
02062 VSSetSize(SRPC* srpc, int intf_ver)
02063 {
02064 LongId longid;
02065 int len;
02066 Basics::uint64 s;
02067 VestaSource::errorCode err;
02068 AccessControl::Identity who = NULL;
02069
02070 try {
02071
02072 len = sizeof(longid.value);
02073 srpc->recv_bytes_here((char *) &longid.value, len);
02074 s = (Basics::uint64) srpc->recv_int();
02075 s += ((Basics::uint64) srpc->recv_int()) << 32ul;
02076 who = srpc_recv_identity(srpc, intf_ver);
02077
02078 srpc->recv_end();
02079
02080
02081 ReadersWritersLock* lock = NULL;
02082 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02083 if (vs == NULL) {
02084 err = VestaSource::invalidArgs;
02085 } else {
02086 RWLOCK_LOCKED_REASON(lock, "SRPC:setSize");
02087 err = vs->setSize(s, who);
02088 }
02089
02090 if (lock != NULL) lock->releaseWrite();
02091 if(vs) delete vs;
02092
02093
02094 srpc->send_int((int) err);
02095 srpc->send_end();
02096 } catch (...) {
02097 if (who) delete who;
02098 throw;
02099 }
02100 if (who) delete who;
02101 }
02102
02103 void
02104 VSSetTimestamp(SRPC* srpc, int intf_ver)
02105 {
02106 LongId longid;
02107 int len;
02108 time_t ts;
02109 VestaSource::errorCode err;
02110 AccessControl::Identity who = NULL;
02111
02112 try {
02113
02114 len = sizeof(longid.value);
02115 srpc->recv_bytes_here((char *) &longid.value, len);
02116 ts = (time_t) srpc->recv_int();
02117 who = srpc_recv_identity(srpc, intf_ver);
02118 srpc->recv_end();
02119
02120
02121 ReadersWritersLock* lock = NULL;
02122 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02123 if (vs == NULL) {
02124 err = VestaSource::invalidArgs;
02125 } else {
02126 RWLOCK_LOCKED_REASON(lock, "SRPC:setTimestamp");
02127 err = vs->setTimestamp(ts, who);
02128 }
02129
02130 if (lock != NULL) lock->releaseWrite();
02131 if(vs) delete vs;
02132
02133
02134 srpc->send_int((int) err);
02135 srpc->send_end();
02136 } catch (...) {
02137 if (who) delete who;
02138 throw;
02139 }
02140 if (who) delete who;
02141 }
02142
02143 static void
02144 VestaSourceGetUserInfo(SRPC* srpc, int intf_ver)
02145 {
02146
02147
02148 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
02149 AccessControl::Identity subject = srpc_recv_identity(srpc, intf_ver, false);
02150 srpc->recv_end();
02151
02152 AccessControl::IdInfo result;
02153
02154 unsigned int i = 0;
02155 const char *val = 0;
02156
02157
02158
02159 if((*who != *subject) &&
02160 !VestaSource::repositoryRoot()->ac.check(who,
02161 AccessControl::administrative))
02162 {
02163
02164 srpc->send_failure(SRPC::invalid_parameter,
02165 "administrator access required to get information about other users");
02166 }
02167
02168
02169 while((val = subject->user(i)) != 0)
02170 {
02171 result.names.append(val);
02172 i++;
02173 }
02174
02175
02176 i = 0;
02177 while((val = subject->group(i)) != 0)
02178 {
02179 result.groups.append(val);
02180 i++;
02181 }
02182
02183
02184
02185 result.unix_uid = subject->toUnixUser();
02186 result.unix_gid = subject->toUnixGroup();
02187
02188
02189 result.is_admin =
02190 VestaSource::repositoryRoot()->ac.check(subject,
02191 AccessControl::administrative);
02192
02193 result.is_wizard =
02194 VestaSource::repositoryRoot()->ac.check(subject,
02195 AccessControl::agreement);
02196
02197
02198 result.is_root = subject->userMatch(AccessControl::rootUser);
02199
02200 result.is_runtool = subject->userMatch(AccessControl::runtoolUser);
02201
02202
02203 srpc->send_chars_seq(result.names);
02204 srpc->send_chars_seq(result.groups);
02205
02206
02207 srpc->send_int32(result.unix_uid);
02208 srpc->send_int32(result.unix_gid);
02209
02210
02211 srpc->send_int16((result.is_root ? 1 : 0) |
02212 ((result.is_admin ? 1 : 0) << 1) |
02213 ((result.is_wizard ? 1 : 0) << 2) |
02214 ((result.is_runtool ? 1 : 0) << 3));
02215
02216 srpc->send_end();
02217 }
02218
02219 static void
02220 VestaSourceRefreshAccessTables(SRPC* srpc, int intf_ver)
02221 {
02222
02223
02224 AccessControl::Identity who = srpc_recv_identity(srpc, intf_ver);
02225 srpc->recv_end();
02226
02227
02228 if(!VestaSource::repositoryRoot()->ac.check(who,
02229 AccessControl::administrative))
02230 {
02231
02232 srpc->send_failure(SRPC::invalid_parameter,
02233 "administrator access required");
02234 }
02235
02236 try
02237 {
02238 AccessControl::refreshAccessTables();
02239 }
02240 catch(AccessControl::ParseError f)
02241 {
02242 Text message = ("error parsing " + f.fkind + " " + f.fname +
02243 ": " + f.message);
02244
02245
02246
02247 srpc->send_failure(SRPC::invalid_parameter,
02248 message.cchars());
02249 }
02250
02251 srpc->send_end();
02252 }
02253
02254 static void
02255 VestaSourceGetStats(SRPC* srpc, int intf_ver)
02256 {
02257
02258
02259 AccessControl::Identity who = 0;
02260 Basics::int16 *requested_stats = 0;
02261
02262 try
02263 {
02264 who = srpc_recv_identity(srpc, intf_ver);
02265
02266 int requested_stats_count = 0;
02267 requested_stats = srpc->recv_int16_array(requested_stats_count);
02268
02269 srpc->recv_end();
02270
02271 srpc->send_seq_start();
02272 for(int i = 0; i < requested_stats_count; i++)
02273 {
02274 switch((ReposStats::StatKind) requested_stats[i])
02275 {
02276 case ReposStats::fdCache:
02277 srpc->send_int16(requested_stats[i]);
02278 {
02279 ReposStats::FdCacheStats fdCacheStats;
02280 FdCache::getStats(fdCacheStats.n_in_cache,
02281 fdCacheStats.hits,
02282 fdCacheStats.open_misses,
02283 fdCacheStats.try_misses,
02284 fdCacheStats.evictions,
02285 fdCacheStats.expirations);
02286
02287
02288 fdCacheStats.send(srpc);
02289 }
02290 break;
02291 case ReposStats::dupeTotal:
02292 srpc->send_int16(requested_stats[i]);
02293 {
02294
02295 ReposStats::DupeStats dupeStats;
02296 dupeStats = get_dupe_stats();
02297
02298
02299 dupeStats.send(srpc);
02300 }
02301 break;
02302 case ReposStats::srpcTotal:
02303 srpc->send_int16(requested_stats[i]);
02304 {
02305
02306 ReposStats::TimedCalls srpcStats;
02307 SRPC_Call_Stats::getStats(srpcStats.call_count,
02308 srpcStats.elapsed_secs,
02309 srpcStats.elapsed_usecs);
02310
02311
02312 srpcStats.send(srpc);
02313 }
02314 break;
02315 case ReposStats::nfsTotal:
02316 srpc->send_int16(requested_stats[i]);
02317 {
02318
02319 ReposStats::TimedCalls nfsStats;
02320 NFS_Call_Stats::getStats(nfsStats.call_count,
02321 nfsStats.elapsed_secs,
02322 nfsStats.elapsed_usecs);
02323
02324
02325 nfsStats.send(srpc);
02326 }
02327 break;
02328 case ReposStats::memUsage:
02329 srpc->send_int16(requested_stats[i]);
02330 {
02331
02332 unsigned long total, resident;
02333 OS::GetProcessSize(total, resident);
02334
02335
02336 ReposStats::MemStats memStats(total, resident);
02337 memStats.send(srpc);
02338 }
02339 break;
02340
02341
02342
02343 }
02344 }
02345 srpc->send_seq_end();
02346
02347 srpc->send_end();
02348 }
02349 catch (...)
02350 {
02351 if(who) delete who;
02352 if(requested_stats) delete [] requested_stats;
02353 throw;
02354 }
02355 if (who) delete who;
02356 if(requested_stats) delete [] requested_stats;
02357 }
02358
02359 static void
02360 VestaSourceMeasureDirectory(SRPC* srpc, int intf_ver)
02361 {
02362 LongId longid;
02363 int len;
02364 VestaSource::errorCode err;
02365 AccessControl::Identity who = NULL;
02366
02367 try {
02368
02369 len = sizeof(longid.value);
02370 srpc->recv_bytes_here((char *) &longid.value, len);
02371 who = srpc_recv_identity(srpc, intf_ver);
02372 srpc->recv_end();
02373
02374
02375 VestaSource::directoryStats stats;
02376 ReadersWritersLock* lock = NULL;
02377 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
02378 if (vs == NULL) {
02379 err = VestaSource::invalidArgs;
02380 } else {
02381 RWLOCK_LOCKED_REASON(lock, "SRPC:MeasureDirectory");
02382 err = vs->measureDirectory(stats, who);
02383 }
02384
02385 if (lock != NULL) lock->releaseRead();
02386
02387 if(vs) delete vs;
02388
02389
02390 srpc->send_int((int) err);
02391 if(err == VestaSource::ok)
02392 {
02393 srpc->send_int32(stats.baseChainLength);
02394 srpc->send_int32(stats.usedEntryCount);
02395 srpc->send_int32(stats.usedEntrySize);
02396 srpc->send_int32(stats.totalEntryCount);
02397 srpc->send_int32(stats.totalEntrySize);
02398 }
02399 srpc->send_end();
02400 } catch (...) {
02401 if (who) delete who;
02402 throw;
02403 }
02404 if (who) delete who;
02405 }
02406
02407 static void
02408 VestaSourceCollapseBase(SRPC* srpc, int intf_ver)
02409 {
02410 LongId longid;
02411 int len;
02412 VestaSource::errorCode err;
02413 AccessControl::Identity who = NULL;
02414
02415 try {
02416
02417 len = sizeof(longid.value);
02418 srpc->recv_bytes_here((char *) &longid.value, len);
02419 who = srpc_recv_identity(srpc, intf_ver);
02420 srpc->recv_end();
02421
02422
02423 ReadersWritersLock* lock = NULL;
02424 VestaSource* vs = longid.lookup(LongId::writeLock, &lock);
02425 if (vs == NULL) {
02426 err = VestaSource::invalidArgs;
02427 } else {
02428 RWLOCK_LOCKED_REASON(lock, "SRPC:CollapseBase");
02429 err = vs->collapseBase(who);
02430 }
02431
02432 if (lock != NULL) lock->releaseWrite();
02433
02434 if(vs) delete vs;
02435
02436
02437 srpc->send_int((int) err);
02438 srpc->send_end();
02439 } catch (...) {
02440 if (who) delete who;
02441 throw;
02442 }
02443 if (who) delete who;
02444 }
02445
02446 static void
02447 SetPerfDebug(SRPC* srpc, int intf_ver)
02448 {
02449 AccessControl::Identity who = 0;
02450 try
02451 {
02452
02453 who = srpc_recv_identity(srpc, intf_ver);
02454 Basics::uint64 settings = srpc->recv_int64();
02455 srpc->recv_end();
02456
02457 if(!VestaSource::repositoryRoot()->ac.check(who,
02458 AccessControl::administrative))
02459 {
02460
02461 srpc->send_failure(SRPC::invalid_parameter,
02462 "administrator access required");
02463 }
02464
02465 Basics::uint64 result = 0;
02466 #if defined(REPOS_PERF_DEBUG)
02467 timing_control(settings & PerfDebug::nfsCallTiming);
02468 result |= (settings & PerfDebug::nfsCallTiming);
02469
02470 rwlock_timing_control(settings & PerfDebug::centralLockTiming);
02471 result |= (settings & PerfDebug::centralLockTiming);
02472 #endif
02473 srpc->send_int64(result);
02474 srpc->send_end();
02475 }
02476 catch (...)
02477 {
02478 if (who) delete who;
02479 throw;
02480 }
02481 if (who) delete who;
02482 }
02483
02484
02485 extern const char *Version;
02486
02487
02488 extern time_t serverStartTime;
02489
02490 static void
02491 GetServerInfo(SRPC* srpc, int intf_ver)
02492 {
02493 AccessControl::Identity who = 0;
02494 try
02495 {
02496
02497 who = srpc_recv_identity(srpc, intf_ver);
02498 srpc->recv_end();
02499
02500 srpc->send_chars(Version);
02501 srpc->send_int64(serverStartTime);
02502 srpc->send_int32(time((time_t *) 0) - serverStartTime);
02503
02504 srpc->send_end();
02505 }
02506 catch (...)
02507 {
02508 if (who) delete who;
02509 throw;
02510 }
02511 if (who) delete who;
02512 }
02513
02514
02515
02516 static int readWhole_raw_bufsiz = (128*1024);
02517 static int readWhole_deflate_bufsiz = (64*1024);
02518 static int readWhole_deflate_level = -1;
02519
02520 static void
02521 VSReadWholeCompressed(SRPC* srpc, int intf_ver)
02522 {
02523
02524 LongId longid;
02525 int len;
02526 AccessControl::Identity who = 0;
02527 int maxbytes;
02528
02529
02530 VestaSource::errorCode err;
02531
02532
02533 ShortId filesid;
02534 int fd = -1;
02535 Basics::uint64 bytes_left;
02536 off_t bytes_read = 0;
02537
02538
02539 Basics::int16 *client_methods = 0;
02540 int client_method_count = 0;
02541
02542
02543 z_stream zstrm;
02544 bool zstrm_initialized = false;
02545
02546
02547
02548 char *raw_buf = 0, *send_buf = 0;
02549
02550 try
02551 {
02552
02553 len = sizeof(longid.value);
02554 srpc->recv_bytes_here((char *) &longid.value, len);
02555 who = srpc_recv_identity(srpc, intf_ver);
02556 client_methods = srpc->recv_int16_array(client_method_count);
02557 maxbytes = srpc->recv_int32();
02558 srpc->recv_end();
02559
02560 bool method_ok = false;
02561 for(unsigned int i = 0; i < client_method_count; i++)
02562 if(client_methods[i] == VestaSourceSRPC::compress_zlib_deflate)
02563 method_ok = true;
02564 if(!method_ok)
02565 {
02566 err = VestaSource::invalidArgs;
02567 }
02568 else
02569 {
02570
02571 ReadersWritersLock* lock = NULL;
02572 VestaSource* vs = longid.lookup(LongId::readLock, &lock);
02573 if (vs == NULL) {
02574 err = VestaSource::invalidArgs;
02575 } else {
02576 RWLOCK_LOCKED_REASON(lock, "SRPC:readWholeCompressed");
02577 if(vs->type != VestaSource::immutableFile) {
02578 err = VestaSource::inappropriateOp;
02579 } else {
02580 filesid = vs->shortId();
02581 bytes_left = vs->size();
02582 fd = FdCache::open(filesid, FdCache::ro);
02583 err = ((fd == -1)
02584 ? Repos::errno_to_errorCode(errno)
02585 : VestaSource::ok);
02586 }
02587 }
02588
02589
02590
02591 if (lock != NULL) lock->releaseRead();
02592
02593 if (vs) delete vs;
02594 }
02595
02596
02597 srpc->send_int((int) err);
02598 if (err == VestaSource::ok) {
02599 assert(fd != -1);
02600
02601
02602 srpc->send_int16(VestaSourceSRPC::compress_zlib_deflate);
02603
02604
02605
02606 zstrm.zalloc = Z_NULL;
02607 zstrm.zfree = Z_NULL;
02608 zstrm.opaque = Z_NULL;
02609 if (deflateInit(&zstrm, readWhole_deflate_level) != Z_OK)
02610 srpc->send_failure(SRPC::internal_trouble,
02611 "zlib deflateInit failed");
02612 zstrm_initialized = true;
02613
02614 raw_buf = NEW_PTRFREE_ARRAY(char, readWhole_raw_bufsiz);
02615 unsigned int send_size = ((maxbytes < readWhole_deflate_bufsiz)
02616 ? maxbytes
02617 : readWhole_deflate_bufsiz);
02618 send_buf = NEW_PTRFREE_ARRAY(char, send_size);
02619
02620 srpc->send_seq_start();
02621 while(bytes_left > 0)
02622 {
02623 zstrm.next_in = (Bytef *) raw_buf;
02624 int read_res;
02625 do
02626 read_res = ::pread(fd, raw_buf, readWhole_raw_bufsiz,
02627 bytes_read);
02628 while((read_res == -1) && (errno == EINTR));
02629 zstrm.avail_in = read_res;
02630
02631 if(zstrm.avail_in > 0)
02632 {
02633 assert(bytes_left >= zstrm.avail_in);
02634 bytes_left -= zstrm.avail_in;
02635 bytes_read += zstrm.avail_in;
02636 do
02637 {
02638 zstrm.avail_out = send_size;
02639 zstrm.next_out = (Bytef *) send_buf;
02640 int deflate_status = deflate(&zstrm,
02641 (bytes_left > 0
02642 ? Z_NO_FLUSH
02643 : Z_FINISH));
02644 if(deflate_status == Z_STREAM_ERROR)
02645 srpc->send_failure(SRPC::internal_trouble,
02646 "zlib deflate returned Z_STREAM_ERROR");
02647 srpc->send_bytes(send_buf,
02648 send_size - zstrm.avail_out);
02649 }
02650 while(zstrm.avail_out == 0);
02651 if(zstrm.avail_in != 0)
02652 srpc->send_failure(SRPC::internal_trouble,
02653 "zlib deflate left some input unconsumed");
02654 }
02655 else
02656 {
02657 int errno_save = errno;
02658 Text msg("error reading file for compression: ");
02659 msg += Basics::errno_Text(errno_save);
02660 srpc->send_failure(SRPC::internal_trouble, msg);
02661 }
02662 }
02663 srpc->send_seq_end();
02664
02665 srpc->send_end();
02666 }
02667 }
02668 catch (...)
02669 {
02670 if(zstrm_initialized) (void)deflateEnd(&zstrm);
02671 if(fd != -1) FdCache::close(filesid, fd, FdCache::ro);
02672 if(raw_buf) delete [] raw_buf;
02673 if(send_buf) delete [] send_buf;
02674 if(who) delete who;
02675 throw;
02676 }
02677 if(zstrm_initialized) (void)deflateEnd(&zstrm);
02678 if(fd != -1) FdCache::close(filesid, fd, FdCache::ro);
02679 if(raw_buf) delete [] raw_buf;
02680 if(send_buf) delete [] send_buf;
02681 if (who) delete who;
02682 }
02683
02684
02685 static const int g_min_intf_ver = 8;
02686
02687
02688 void
02689 VestaSourceReceptionist(SRPC *srpc, int intf_ver, int proc_id, void *arg)
02690 {
02691
02692
02693
02694 SRPC_Call_Stats::Helper stat_recorder;
02695
02696
02697 signal(SIGPIPE, SIG_IGN);
02698 signal(SIGQUIT, SIG_DFL);
02699 signal(SIGSEGV, SIG_DFL);
02700 signal(SIGABRT, SIG_DFL);
02701 signal(SIGILL, SIG_DFL);
02702 signal(SIGBUS, SIG_DFL);
02703
02704
02705 if (intf_ver < g_min_intf_ver || intf_ver > VestaSourceSRPC::version) {
02706 Text client = srpc->remote_socket();
02707 Repos::dprintf(DBG_ALWAYS,
02708 "VestaSourceReceptionist got unsupported interface "
02709 "version %d; client %s\n", intf_ver, client.cchars());
02710 srpc->send_failure(SRPC::version_skew,
02711 "VestaSourceSRPC: Unsupported interface version");
02712 return;
02713 }
02714
02715
02716 srpc->enable_read_timeout(readTimeout);
02717
02718 switch (proc_id)
02719 {
02720 case VestaSourceSRPC::Lookup:
02721 VestaSourceLookup(srpc, intf_ver);
02722 break;
02723 case VestaSourceSRPC::CreateVolatileDirectory:
02724 VestaSourceCreateVolatileDirectory(srpc, intf_ver);
02725 break;
02726 case VestaSourceSRPC::DeleteVolatileDirectory:
02727 VestaSourceDeleteVolatileDirectory(srpc, intf_ver);
02728 break;
02729 case VestaSourceSRPC::GetBase:
02730 VestaSourceGetBase(srpc, intf_ver);
02731 break;
02732 case VestaSourceSRPC::List:
02733 VestaSourceList(srpc, intf_ver);
02734 break;
02735 case VestaSourceSRPC::GetNFSInfo:
02736 VestaSourceGetNFSInfo(srpc, intf_ver);
02737 break;
02738 case VestaSourceSRPC::FPToShortId:
02739 VestaSourceFPToShortId(srpc, intf_ver);
02740 break;
02741 case VestaSourceSRPC::ReallyDelete:
02742 VestaSourceReallyDelete(srpc, intf_ver);
02743 break;
02744 case VestaSourceSRPC::InsertFile:
02745 VestaSourceInsertFile(srpc, intf_ver);
02746 break;
02747 case VestaSourceSRPC::InsertMutableFile:
02748 VestaSourceInsertMutableFile(srpc, intf_ver);
02749 break;
02750 case VestaSourceSRPC::InsertImmutableDirectory:
02751 VestaSourceInsertImmutableDirectory(srpc, intf_ver);
02752 break;
02753 case VestaSourceSRPC::InsertAppendableDirectory:
02754 VestaSourceInsertAppendableDirectory(srpc, intf_ver);
02755 break;
02756 case VestaSourceSRPC::InsertMutableDirectory:
02757 VestaSourceInsertMutableDirectory(srpc, intf_ver);
02758 break;
02759 case VestaSourceSRPC::InsertGhost:
02760 VestaSourceInsertGhost(srpc, intf_ver);
02761 break;
02762 case VestaSourceSRPC::InsertStub:
02763 VestaSourceInsertStub(srpc, intf_ver);
02764 break;
02765 case VestaSourceSRPC::RenameTo:
02766 VestaSourceRenameTo(srpc, intf_ver);
02767 break;
02768 case VestaSourceSRPC::MakeMutable:
02769 VestaSourceMakeMutable(srpc, intf_ver);
02770 break;
02771 case VestaSourceSRPC::InAttribs:
02772 VestaSourceInAttribs(srpc, intf_ver);
02773 break;
02774 case VestaSourceSRPC::GetAttrib:
02775 VestaSourceGetAttrib(srpc, intf_ver);
02776 break;
02777 case VestaSourceSRPC::GetAttrib2:
02778 VestaSourceGetAttrib2(srpc, intf_ver);
02779 break;
02780 case VestaSourceSRPC::ListAttribs:
02781 VestaSourceListAttribs(srpc, intf_ver);
02782 break;
02783 case VestaSourceSRPC::GetAttribHistory:
02784 VestaSourceGetAttribHistory(srpc, intf_ver);
02785 break;
02786 case VestaSourceSRPC::WriteAttrib:
02787 VestaSourceWriteAttrib(srpc, intf_ver);
02788 break;
02789 case VestaSourceSRPC::LookupPathname:
02790 VestaSourceLookupPathname(srpc, intf_ver);
02791 break;
02792 case VestaSourceSRPC::LookupIndex:
02793 VestaSourceLookupIndex(srpc, intf_ver);
02794 break;
02795 case VestaSourceSRPC::MakeFilesImmutable:
02796 MakeFilesImmutable(srpc, intf_ver);
02797 break;
02798 case VestaSourceSRPC::SetIndexMaster:
02799 SetIndexMaster(srpc, intf_ver);
02800 break;
02801 case VestaSourceSRPC::Stat:
02802 VSStat(srpc, intf_ver);
02803 break;
02804 case VestaSourceSRPC::Read:
02805 VSRead(srpc, intf_ver);
02806 break;
02807 case VestaSourceSRPC::Write:
02808 VSWrite(srpc, intf_ver);
02809 break;
02810 case VestaSourceSRPC::SetExecutable:
02811 VSSetExecutable(srpc, intf_ver);
02812 break;
02813 case VestaSourceSRPC::SetSize:
02814 VSSetSize(srpc, intf_ver);
02815 break;
02816 case VestaSourceSRPC::SetTimestamp:
02817 VSSetTimestamp(srpc, intf_ver);
02818 break;
02819 case VestaSourceSRPC::Atomic:
02820 VSAtomic(srpc, intf_ver);
02821 break;
02822 case VestaSourceSRPC::AtomicTarget:
02823 case VestaSourceSRPC::AtomicDeclare:
02824 case VestaSourceSRPC::AtomicResync:
02825 case VestaSourceSRPC::AtomicTestMaster:
02826 case VestaSourceSRPC::AtomicRun:
02827 case VestaSourceSRPC::AtomicCancel:
02828 {
02829 Text client = srpc->remote_socket();
02830 Repos::dprintf(DBG_ALWAYS,
02831 "VestaSourceReceptionist got misplaced Atomic "
02832 "proc_id %d; client %s\n", proc_id, client.cchars());
02833 srpc->send_failure(SRPC::version_skew,
02834 "VestaSourceSRPC: Misplaced Atomic proc_id");
02835 }
02836 break;
02837 case VestaSourceSRPC::AcquireMastership:
02838 VestaSourceAcquireMastership(srpc, intf_ver);
02839 break;
02840 case VestaSourceSRPC::CedeMastership:
02841 VestaSourceCedeMastership(srpc, intf_ver);
02842 break;
02843 case VestaSourceSRPC::Replicate:
02844 VestaSourceReplicate(srpc, intf_ver);
02845 break;
02846 case VestaSourceSRPC::ReplicateAttribs:
02847 VestaSourceReplicateAttribs(srpc, intf_ver);
02848 break;
02849 case VestaSourceSRPC::GetUserInfo:
02850 VestaSourceGetUserInfo(srpc, intf_ver);
02851 break;
02852 case VestaSourceSRPC::RefreshAccessTables:
02853 VestaSourceRefreshAccessTables(srpc, intf_ver);
02854 break;
02855 case VestaSourceSRPC::GetStats:
02856 VestaSourceGetStats(srpc, intf_ver);
02857 break;
02858 case VestaSourceSRPC::MeasureDirectory:
02859 VestaSourceMeasureDirectory(srpc, intf_ver);
02860 break;
02861 case VestaSourceSRPC::CollapseBase:
02862 VestaSourceCollapseBase(srpc, intf_ver);
02863 break;
02864 case VestaSourceSRPC::SetPerfDebug:
02865 SetPerfDebug(srpc, intf_ver);
02866 break;
02867 case VestaSourceSRPC::GetServerInfo:
02868 GetServerInfo(srpc, intf_ver);
02869 break;
02870 case VestaSourceSRPC::ReadWholeCompressed:
02871 VSReadWholeCompressed(srpc, intf_ver);
02872 break;
02873 default:
02874 {
02875 Text client = srpc->remote_socket();
02876 Repos::dprintf(DBG_ALWAYS, "VestaSourceReceptionist got unknown "
02877 "proc_id %d; client %s\n", proc_id, client.cchars());
02878 srpc->send_failure(SRPC::version_skew,
02879 "VestaSourceSRPC: Unknown proc_id");
02880 }
02881 break;
02882 }
02883 }
02884
02885 void
02886 VestaSourceFailure(SRPC* srpc, const SRPC::failure &f, void* arg)
02887 {
02888 if (f.r == SRPC::partner_went_away && !Repos::isDebugLevel(DBG_SRPC))
02889 return;
02890 Text client;
02891 try {
02892 client = srpc->remote_socket();
02893 } catch (SRPC::failure f) {
02894 client = "unknown";
02895 }
02896 Repos::dprintf(DBG_ALWAYS,
02897 "VestaSourceReceptionist got SRPC::failure: %s (%d); "
02898 "client %s\n", f.msg.cchars(), f.r, client.cchars());
02899 }
02900
02901 void
02902 VestaSourceServerExport()
02903 {
02904 Text port;
02905 int max_running=DEF_MAX_RUNNING, max_blocked=DEF_MAX_BLOCKED;
02906 Text unused;
02907
02908 try {
02909
02910
02911
02912 port = VestaConfig::get_Text("Repository", "VestaSourceSRPC_port");
02913 if(VestaConfig::get("Repository",
02914 "VestaSourceSRPC_max_running", unused))
02915 max_running = VestaConfig::get_int("Repository",
02916 "VestaSourceSRPC_max_running");
02917 if(VestaConfig::get("Repository",
02918 "VestaSourceSRPC_max_blocked", unused))
02919 max_blocked = VestaConfig::get_int("Repository",
02920 "VestaSourceSRPC_max_blocked");
02921 if(VestaConfig::get("Repository",
02922 "VestaSourceSRPC_read_timeout", unused))
02923 readTimeout = VestaConfig::get_int("Repository",
02924 "VestaSourceSRPC_read_timeout");
02925 if(VestaConfig::get("Repository",
02926 "VestaSourceSRPC_readWhole_raw_bufsiz", unused))
02927 readWhole_raw_bufsiz =
02928 VestaConfig::get_int("Repository",
02929 "VestaSourceSRPC_readWhole_raw_bufsiz");
02930 if(VestaConfig::get("Repository",
02931 "VestaSourceSRPC_readWhole_deflate_bufsiz",
02932 unused))
02933 readWhole_deflate_bufsiz =
02934 VestaConfig::get_int("Repository",
02935 "VestaSourceSRPC_readWhole_deflate_bufsiz");
02936 if(VestaConfig::get("Repository",
02937 "VestaSourceSRPC_readWhole_deflate_level",
02938 unused))
02939 readWhole_deflate_level =
02940 VestaConfig::get_int("Repository",
02941 "VestaSourceSRPC_readWhole_deflate_level");
02942 } catch (VestaConfig::failure f) {
02943 Repos::dprintf(DBG_ALWAYS,
02944 "VestaConfig::failure in VestaSourceServerExport: %s\n",
02945 f.msg.cchars());
02946 abort();
02947 }
02948
02949 static Basics::thread_attr ls_thread_attr(STACK_SIZE);
02950
02951 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined(__linux__)
02952
02953 ls_thread_attr.set_schedpolicy(SCHED_RR);
02954 ls_thread_attr.set_inheritsched(PTHREAD_EXPLICIT_SCHED);
02955 ls_thread_attr.set_sched_priority(sched_get_priority_min(SCHED_RR));
02956 #endif
02957
02958 static LimService receptionist(port, max_running,
02959 max_blocked, VestaSourceReceptionist,
02960 VestaSourceFailure, NULL, ls_thread_attr);
02961 receptionist.Forked_Run();
02962 }