Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

VDirEvaluator.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // 
00020 // VDirEvaluator.C
00021 //
00022 // Directory of type evaluatorDirectory.  The contents of such a
00023 // directory are a binding held by the evaluator.  The methods of this
00024 // object type make remote calls to the evaluator.
00025 //
00026 
00027 #include <assert.h>
00028 #include "Basics.H"
00029 #include "VDirEvaluator.H"
00030 #include "Evaluator_Dir_SRPC.H"
00031 #include "MultiSRPC.H"
00032 #include "VLeaf.H"
00033 #include "VMemPool.H"
00034 #include "VestaConfig.H"
00035 #include "IndexKey.H"
00036 #include "ArcTable.H"
00037 #include "logging.H"
00038 #include "ListResults.H"
00039 
00040 #include "timing.H"
00041 
00042 // Types
00043 struct EntryInfo {
00044     // One entry from an evaluator[ROE]Directory
00045     VestaSource::typeTag type;
00046     char* name;
00047     unsigned int index;
00048     struct EvalDirInfo* edi; // if type= evaluator[ROE]Directory
00049     ShortId sid;             // if type= immutableFile
00050     FP::Tag fptag;           // if type= immutableFile
00051 };
00052 typedef Table<IndexKey, EntryInfo*>::Default IndexToEntryTable;
00053 typedef Table<IndexKey, EntryInfo*>::Iterator IndexToEntryIter;
00054 typedef Table<ArcKey, EntryInfo*>::Default ArcToEntryTable;
00055 typedef Table<ArcKey, EntryInfo*>::Iterator ArcToEntryIter;
00056 
00057 struct EvalDirInfo {
00058     // Local representation and entry cache for evaluator[ROE]Directory
00059     // Locking: the entry caches may need to be modified by threads holding
00060     //   only a read lock on the current volatile tree, so the locking
00061     //   rule is that you must hold either a write lock on the current
00062     //   volatile tree, or a read lock *and* cache_mu (below).
00063     char* hostname;
00064     char* port;
00065     Bit64 dirhandle;
00066     Bit32 timestamp;
00067     IndexToEntryTable itab;
00068     ArcToEntryTable atab;
00069     Bit8* rep; // pointer to rep in VMemPool
00070     bool* alive; // pointer to a member of evaluator top directory
00071                  // (VolatileRootEntry)
00072     Basics::mutex cache_mu;
00073 };
00074 
00075 // Module globals
00076 static MultiSRPC* multi; // has internal mutex
00077 static pthread_once_t once = PTHREAD_ONCE_INIT;
00078 static int listChunkSize = 8300;
00079 static int listPerEntryOverhead = 40;
00080 
00081 // By default we give the evaluator a maximum of five minutes to respond
00082 // to a call to the ToolDirectoryServer.  Of course normally it should
00083 // be much less than this.  The limit is there just to prevent a
00084 // suspended evaluator causing a lock to be held and a thread to be
00085 // waiting indefinitely.
00086 static unsigned int readTimeout = 300;
00087 
00088 static FP::Tag nullFPTag("");
00089 
00090 extern "C"
00091 {
00092   static void
00093   VDirEvaluator_init()
00094   {
00095     try {
00096         Text val;
00097         multi = NEW(MultiSRPC);
00098         assert(DIR_HANDLE_BYTES == sizeof(Bit64));
00099         if (VestaConfig::get("Repository",
00100                              "EvaluatorDirSRPC_listChunkSize", val)) {
00101             listChunkSize = atoi(val.cchars());
00102         }
00103         if (VestaConfig::get("Repository",
00104                              "EvaluatorDirSRPC_listPerEntryOverhead", val)) {
00105             listPerEntryOverhead = atoi(val.cchars());
00106         }
00107         if (VestaConfig::get("Repository",
00108                              "EvaluatorDirSRPC_read_timeout", val)) {
00109             readTimeout = atoi(val.cchars());
00110         }
00111     } catch (VestaConfig::failure f) {
00112         Repos::dprintf(DBG_ALWAYS, 
00113                        "VDirEvaluator_init got VestaConfig::failure: %s\n",
00114                        f.msg.cchars());
00115         abort();
00116     }
00117   }
00118 }
00119 
00120 VDirEvaluator::VDirEvaluator(VestaSource::typeTag type, 
00121                              const char* hostname, const char* port,
00122                              Bit64 dirhandle, bool* alive, time_t timestamp)
00123   throw ()
00124 {
00125     pthread_once(&once, VDirEvaluator_init);
00126     this->type = type;
00127 
00128     rep = (Bit8*) VMemPool::allocate(VMemPool::vDirEvaluator,
00129                                      VDIREV_SIZE);
00130     edi = NEW(EvalDirInfo);
00131     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00132     setRepEDI(edi);
00133     edi->hostname = strdup(hostname);
00134     edi->port = strdup(port);
00135     edi->dirhandle = dirhandle;
00136     edi->timestamp = timestamp;
00137     edi->rep = rep;
00138     edi->alive = alive;
00139     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00140 }
00141 
00142 VDirEvaluator::VDirEvaluator(VestaSource::typeTag type, Bit8* rep) throw ()
00143 {
00144     this->type = type;
00145     this->rep = rep;
00146     this->edi = repEDI();
00147     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00148 }
00149 
00150 VDirEvaluator::VDirEvaluator(VestaSource::typeTag type, EvalDirInfo* edi)
00151   throw ()
00152 {
00153     this->type = type;
00154     this->rep = edi->rep;
00155     this->edi = edi;
00156     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00157 }
00158 
00159 VestaSource::errorCode
00160 VDirEvaluator::lookup(Arc arc, VestaSource*& result,
00161                       AccessControl::Identity who,
00162                       unsigned int indexOffset) throw ()
00163 {
00164   // Start by initializing the result, in case we fail before setting
00165   // it to something useful.
00166   result = 0;
00167 
00168     bool hit = false;
00169     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00170 
00171     // Optimize out repeated SRPCs to dead evaluator
00172     if (!(*edi->alive)) return VestaSource::rpcFailure;
00173 
00174     // Look in cache first
00175     ArcKey ak(arc, strlen(arc));
00176     EntryInfo* ei;
00177     edi->cache_mu.lock();
00178     if (edi->atab.Get(ak, ei)) {
00179         // Hit
00180         hit = true;
00181         LongId result_longid;
00182         switch (ei->type) {
00183           case VestaSource::evaluatorDirectory:
00184           case VestaSource::evaluatorROEDirectory:
00185             Repos::dprintf(DBG_VDIREVAL, 
00186                            "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00187                            " -> dir 0x%" FORMAT_LENGTH_INT_64 "x (cache)\n",
00188                            arc, edi->dirhandle, ei->edi->dirhandle);
00189             
00190             // Check for LongId overflow.
00191             result_longid = longid.append(ei->index);
00192             if(result_longid == NullLongId)
00193               {
00194                 edi->cache_mu.unlock();
00195                 return VestaSource::longIdOverflow;
00196               }
00197 
00198             result = NEW_CONSTR(VDirEvaluator, (ei->type, ei->edi));
00199             result->ac = this->ac;
00200             result->longid = result_longid;
00201             result->pseudoInode = indexToPseudoInode(ei->index);
00202             break;
00203           case VestaSource::device:
00204             Repos::dprintf(DBG_VDIREVAL, 
00205                            "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00206                            " -> device 0x%x (cache)\n",
00207                            arc, edi->dirhandle, ei->sid);
00208 
00209             // Check for LongId overflow.
00210             result_longid = longid.append(ei->index);
00211             if(result_longid == NullLongId)
00212               {
00213                 edi->cache_mu.unlock();
00214                 return VestaSource::longIdOverflow;
00215               }
00216 
00217             result = NEW_CONSTR(VLeaf, (ei->type, ei->sid));
00218             result->ac = this->ac;
00219             result->longid = result_longid;
00220             result->pseudoInode = indexToPseudoInode(ei->index);
00221             break;
00222           case VestaSource::immutableFile:
00223             Repos::dprintf(DBG_VDIREVAL, 
00224                            "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00225                            " -> sid 0x%08x (cache)\n",
00226                            arc, edi->dirhandle, ei->sid);
00227 
00228             // Check for LongId overflow.  (We skip this if our type
00229             // is evaluatorROEDirectory, as then the object will get a
00230             // ShortId derived LongId.)
00231             if((VestaSource::type != VestaSource::evaluatorROEDirectory) &&
00232                ((result_longid = longid.append(ei->index))  == NullLongId))
00233               {
00234                 edi->cache_mu.unlock();
00235                 return VestaSource::longIdOverflow;
00236               }
00237 
00238             result = NEW_CONSTR(VLeaf, (ei->type, ei->sid));
00239             result->fptag = ei->fptag;
00240             result->ac = this->ac;
00241             if (VestaSource::type == VestaSource::evaluatorROEDirectory) {
00242                 result->longid = LongId::fromShortId(ei->sid, &ei->fptag);
00243                 result->pseudoInode = ei->sid;
00244                 result->ac.mode = 0444;
00245             } else {
00246                 result->longid = result_longid;
00247                 result->pseudoInode = indexToPseudoInode(ei->index);
00248             }
00249             break;
00250           case VestaSource::deleted:
00251             // Failed lookup
00252             Repos::dprintf(DBG_VDIREVAL, 
00253                            "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00254                            " -> notFound (cache)\n",
00255                            arc, edi->dirhandle);
00256             result = NULL;
00257             edi->cache_mu.unlock();
00258             return VestaSource::notFound;
00259           default:
00260             assert(false);
00261             break;
00262         }
00263         result->master = true;
00264         result->attribs = NULL;
00265         edi->cache_mu.unlock();
00266         return VestaSource::ok;
00267     }
00268     edi->cache_mu.unlock();
00269     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00270 
00271     // Do SRPC
00272     RECORD_TIME_POINT;
00273     MultiSRPC::ConnId id = -1;
00274     VestaSource::errorCode err;
00275     try {
00276         SRPC *srpc;
00277         id = multi->Start((Text) edi->hostname, (Text) edi->port, srpc);
00278         srpc->enable_read_timeout(readTimeout);
00279         
00280         srpc->start_call(ed_lookup, EVALUATOR_DIR_SRPC_VERSION);
00281         srpc->send_bytes((const char*) &edi->dirhandle, sizeof(Bit64));
00282         srpc->send_chars(arc);
00283         srpc->send_end();
00284         int edtype = srpc->recv_int();
00285 
00286         unsigned int rawIndex, index;
00287         Bit64 subdirHandle = 0;
00288         ShortId sid = NullShortId;
00289         FP::Tag fptag = nullFPTag;
00290         int len;
00291         EvalDirInfo* resEDI = NULL;
00292         LongId result_longid;
00293         
00294         switch (edtype) {
00295           case ed_none:
00296             srpc->recv_end();
00297             Repos::dprintf(DBG_VDIREVAL,
00298                            "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00299                            " -> notFound\n",
00300                            arc, edi->dirhandle);
00301             err = VestaSource::notFound;
00302             break;
00303 
00304           case ed_directory:
00305             rawIndex = (unsigned int) srpc->recv_int();
00306             index = indexOffset + 2*(rawIndex + 1);
00307             len = sizeof(subdirHandle);
00308             srpc->recv_bytes_here((char*) &subdirHandle, len);
00309             srpc->recv_end();
00310             // Check for LongId overflow.
00311             result_longid = longid.append(index);
00312             if(result_longid == NullLongId)
00313               {
00314                 err = VestaSource::longIdOverflow;
00315               }
00316             else
00317               {
00318                 result = NEW_CONSTR(VDirEvaluator,
00319                                     (this->type,
00320                                      edi->hostname, edi->port,
00321                                      subdirHandle, edi->alive, edi->timestamp));
00322                 resEDI = ((VDirEvaluator*)result)->edi; // save for entry cache
00323                 result->fptag = fptag;
00324                 result->longid = result_longid;
00325                 result->master = true;
00326                 result->ac = this->ac;
00327                 result->pseudoInode = indexToPseudoInode(index);
00328                 result->attribs = NULL;
00329                 Repos::dprintf(DBG_VDIREVAL,
00330                                "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00331                                " -> dir 0x%" FORMAT_LENGTH_INT_64 "x\n",
00332                                arc, edi->dirhandle, subdirHandle);
00333                 err = VestaSource::ok;
00334               }
00335             break;
00336 
00337           case ed_file:
00338             rawIndex = (unsigned int) srpc->recv_int();
00339             index = indexOffset + 2*(rawIndex + 1);
00340             sid = (ShortId) srpc->recv_int();
00341             fptag.Recv(*srpc);
00342             srpc->recv_end();
00343             // Check for LongId overflow.  (We skip this if our type
00344             // is evaluatorROEDirectory, as then the object will get a
00345             // ShortId derived LongId.)
00346             if((VestaSource::type != VestaSource::evaluatorROEDirectory) &&
00347                ((result_longid = longid.append(index)) == NullLongId))
00348               {
00349                 err = VestaSource::longIdOverflow;
00350               }
00351             else
00352               {
00353                 result = NEW_CONSTR(VLeaf, (VestaSource::immutableFile, sid));
00354                 result->fptag = fptag;
00355                 result->ac = this->ac;
00356                 if (VestaSource::type == VestaSource::evaluatorROEDirectory) {
00357                   result->longid = LongId::fromShortId(sid, &fptag);
00358                   result->pseudoInode = sid;
00359                   result->ac.mode = 0444;
00360                 } else {
00361                   result->longid = result_longid;
00362                   result->pseudoInode = indexToPseudoInode(index);
00363                 }
00364                 result->master = true;
00365                 result->attribs = NULL;
00366                 Repos::dprintf(DBG_VDIREVAL, 
00367                                "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00368                                " -> sid 0x%08x\n",
00369                                arc, edi->dirhandle, sid);
00370                 err = VestaSource::ok;
00371               }
00372             break;
00373 
00374           case ed_device:
00375             rawIndex = (unsigned int) srpc->recv_int();
00376             index = indexOffset + 2*(rawIndex + 1);
00377             sid = (ShortId) srpc->recv_int(); // really the device number
00378             srpc->recv_end();
00379             // Check for LongId overflow.
00380             result_longid = longid.append(index);
00381             if(result_longid == NullLongId)
00382               {
00383                 err = VestaSource::longIdOverflow;
00384               }
00385             else
00386               {
00387                 result = NEW_CONSTR(VLeaf, (VestaSource::device, sid));
00388                 result->fptag = fptag;
00389                 result->longid = result_longid;
00390                 result->master = true;
00391                 result->ac = this->ac;
00392                 result->pseudoInode = indexToPseudoInode(index);
00393                 result->attribs = NULL;
00394                 Repos::dprintf(DBG_VDIREVAL, 
00395                                "eval dir lookup %s in 0x%" FORMAT_LENGTH_INT_64 "x"
00396                                " -> device 0x%x\n",
00397                                arc, edi->dirhandle, sid);
00398                 err = VestaSource::ok;
00399               }
00400             break;
00401 
00402           default:
00403             assert(false);
00404             break;
00405         }
00406         assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00407         if (err == VestaSource::ok) {
00408             // Enter into cache
00409             edi->cache_mu.lock();
00410             EntryInfo* ei;
00411             if (edi->itab.Delete(index, ei)) {
00412                 // Already an entry?!  This should happen only if
00413                 // another thread just added it since we took our
00414                 // cache miss above.
00415                 assert(result->type == ei->type);
00416                 assert(strcmp(ei->name, arc) == 0);
00417                 /*assert(ei->handle == subdirHandle);*/ /* not guaranteed */
00418                 assert(ei->sid == sid);
00419                 assert(ei->fptag == fptag);
00420                 free(ei->name);
00421                 delete ei;
00422             }
00423             ei = NEW(EntryInfo);
00424             ei->type = result->type;
00425             ei->name = strdup(arc);
00426             ei->index = index;
00427             ei->edi = resEDI;
00428             ei->sid = sid;
00429             ei->fptag = fptag;
00430             edi->itab.Put(index, ei);
00431             ArcKey ak(ei->name, strlen(ei->name));
00432             edi->atab.Put(ak, ei);
00433             if (resEDI) {
00434               assert(VMemPool::type(resEDI->rep) == VMemPool::vDirEvaluator);
00435             }
00436             edi->cache_mu.unlock();
00437         } else if (err == VestaSource::notFound) {
00438             // Enter failed lookup into cache
00439             edi->cache_mu.lock();
00440             EntryInfo* ei;
00441             char* duparc = strdup(arc);
00442             ArcKey ak(duparc, strlen(duparc));
00443             if (edi->atab.Delete(ak, ei)) {
00444                 // Already an entry?!  This should happen only if
00445                 // another thread just added it since we took our
00446                 // cache miss above.
00447                 assert(ei->type == VestaSource::deleted);
00448                 assert(strcmp(ei->name, arc) == 0);
00449                 free(ei->name);
00450                 delete ei;
00451             }
00452             ei = NEW(EntryInfo);
00453             ei->type = VestaSource::deleted;
00454             ei->name = duparc;
00455             ei->index = 0;
00456             ei->edi = NULL;
00457             ei->sid = NullShortId;
00458             ei->fptag = nullFPTag;
00459             edi->atab.Put(ak, ei);
00460             edi->cache_mu.unlock();
00461         }
00462         assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00463 
00464     } catch (SRPC::failure f) {
00465         char buf[100];
00466         Repos::pr_nfs_fh(buf, (nfs_fh*) &longid.value);
00467         Repos::dprintf(DBG_ALWAYS,
00468                        "VDirEvaluator::lookup of %s in %s (%s:%s "
00469                        "0x%" FORMAT_LENGTH_INT_64 "x)"
00470                        " returns VestaSource::rpcFailure: %s (%d)\n",
00471                        arc, buf, edi->hostname, edi->port, edi->dirhandle, 
00472                        f.msg.cchars(), f.r);
00473         err = VestaSource::rpcFailure;
00474         *edi->alive = false;
00475     }
00476     multi->End(id);
00477     RECORD_TIME_POINT;
00478     return err;
00479 }
00480 
00481 VestaSource::errorCode
00482 VDirEvaluator::lookupIndex(unsigned int index, VestaSource*& result,
00483                            char* arcbuf) throw ()
00484 {
00485   // Start by initializing the result, in case we fail before setting
00486   // it to something useful.
00487   result = 0;
00488 
00489     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00490     // An evaluatorDirectory has only even indices (from our point of
00491     // view).  Also, 0 is not a valid index; it terminates a LongId.
00492     if (index & 1) {
00493         return VestaSource::notFound;
00494     }
00495 
00496     // Optimize out repeated SRPCs to dead evaluator
00497     if (!(*edi->alive)) return VestaSource::rpcFailure;
00498 
00499     // Look in cache first
00500     EntryInfo* ei;
00501     edi->cache_mu.lock();
00502     if (edi->itab.Get(index, ei)) {
00503         // Hit
00504       LongId result_longid;
00505         switch (ei->type) {
00506           case VestaSource::evaluatorDirectory:
00507           case VestaSource::evaluatorROEDirectory:
00508             Repos::dprintf(DBG_VDIREVAL, 
00509                            "eval dir lookupIndex %u in 0x%" FORMAT_LENGTH_INT_64 "x"
00510                            " -> dir 0x%" FORMAT_LENGTH_INT_64 "x "
00511                            "(cache)\n", index, edi->dirhandle, ei->edi->dirhandle);
00512             
00513             // Check for LongId overflow.
00514             result_longid = longid.append(index);
00515             if(result_longid == NullLongId)
00516               {
00517                 edi->cache_mu.unlock();
00518                 return VestaSource::longIdOverflow;
00519               }
00520 
00521             result = NEW_CONSTR(VDirEvaluator, (ei->type, ei->edi));
00522             result->ac = this->ac;
00523             result->longid = result_longid;
00524             result->pseudoInode = indexToPseudoInode(index);
00525             break;
00526           case VestaSource::device:
00527             Repos::dprintf(DBG_VDIREVAL, 
00528                            "eval dir lookupIndex %u in 0x%" FORMAT_LENGTH_INT_64 "x"
00529                            " -> device 0x%x "
00530                            "(cache)\n", index, edi->dirhandle, ei->sid);
00531             
00532             // Check for LongId overflow.
00533             result_longid = longid.append(index);
00534             if(result_longid == NullLongId)
00535               {
00536                 edi->cache_mu.unlock();
00537                 return VestaSource::longIdOverflow;
00538               }
00539 
00540             result = NEW_CONSTR(VLeaf, (ei->type, ei->sid));
00541             result->ac = this->ac;
00542             result->longid = result_longid;
00543             result->pseudoInode = indexToPseudoInode(index);
00544             break;
00545           case VestaSource::immutableFile:
00546             Repos::dprintf(DBG_VDIREVAL, 
00547                            "eval dir lookupIndex %u in 0x%" FORMAT_LENGTH_INT_64 "x"
00548                            " -> sid 0x%08x "
00549                            "(cache)\n", index, edi->dirhandle, ei->sid);
00550 
00551             // Check for LongId overflow.  (We skip this if our type
00552             // is evaluatorROEDirectory, as then the object will get a
00553             // ShortId derived LongId.)
00554             if((VestaSource::type != VestaSource::evaluatorROEDirectory) &&
00555                ((result_longid = longid.append(ei->index))  == NullLongId))
00556               {
00557                 edi->cache_mu.unlock();
00558                 return VestaSource::longIdOverflow;
00559               }
00560 
00561             result = NEW_CONSTR(VLeaf, (ei->type, ei->sid));
00562             result->ac = this->ac;
00563             if (VestaSource::type == VestaSource::evaluatorROEDirectory) {
00564                 result->longid = LongId::fromShortId(ei->sid, &ei->fptag);
00565                 result->pseudoInode = ei->sid;
00566                 result->ac.mode = 0444;
00567             } else {
00568                 result->longid = result_longid;
00569                 result->pseudoInode = indexToPseudoInode(index);
00570             }
00571             break;
00572           default:
00573             assert(false);
00574             break;
00575         }
00576         result->fptag = ei->fptag;
00577         result->master = true;
00578         result->attribs = NULL;
00579         if (arcbuf != NULL) strcpy(arcbuf, ei->name);
00580         edi->cache_mu.unlock();
00581         assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00582         return VestaSource::ok;
00583     }
00584     edi->cache_mu.unlock();
00585 
00586     // Miss; do SRPC
00587     // Should never happen: we never discard anything from our cache
00588     assert(false);
00589     return VestaSource::notFound;
00590 }
00591 
00592 
00593 VestaSource::errorCode
00594 VDirEvaluator::list(unsigned int firstIndex,
00595                     VestaSource::listCallback callback, void* closure,
00596                     AccessControl::Identity who,
00597                     bool deltaOnly, unsigned int indexOffset) throw ()
00598 {
00599     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00600     int curChunkSize = 0;
00601     VestaSource::errorCode err = VestaSource::ok;
00602     // Convert 0 or even index > 0 to raw form
00603     assert(indexOffset == 0);
00604     assert((firstIndex & 1) == 0);
00605     int rawIndex = (firstIndex == 0) ? 0 : (firstIndex >> 1) - 1; 
00606 
00607     if (deltaOnly) return VestaSource::ok;
00608 
00609     // Optimize out repeated SRPCs to dead evaluator
00610     if (!(*edi->alive)) return VestaSource::rpcFailure;
00611 
00612     RECORD_TIME_POINT;
00613     MultiSRPC::ConnId id = -1;
00614     try {
00615         SRPC *srpc;
00616         id = multi->Start((Text) edi->hostname, (Text) edi->port, srpc);
00617         srpc->enable_read_timeout(readTimeout);
00618 
00619         // Do multiple calls, each fetching about chunkSize bytes worth of
00620         // results.  The idea is that our client will probably return
00621         // false ("stop") after it has received that many bytes, so we
00622         // don't want to get more than that in a stream from the server.
00623         bool stop_req = false, dir_end = false;
00624         // Note that we buffer the results of each call *before* calling
00625         // the callback, as we don't want to hold up the server during the
00626         // callback's execution.
00627         ListResultSeq list_result;
00628         do {
00629             srpc->start_call(ed_list, EVALUATOR_DIR_SRPC_VERSION);
00630             srpc->send_bytes((const char*) &edi->dirhandle, sizeof(Bit64));
00631             srpc->send_int(rawIndex); 
00632             srpc->send_int(listChunkSize);
00633             srpc->send_int(listPerEntryOverhead);
00634             srpc->send_end();
00635             
00636             srpc->recv_seq_start();
00637             while(!dir_end) {
00638                 ListResultItem list_item;
00639                 int arclen = sizeof(list_item.arc);
00640                 bool got_end;
00641                 srpc->recv_chars_here(list_item.arc, arclen, &got_end);
00642                 if (got_end) break;
00643                 ed_entry_type edtype = (ed_entry_type) srpc->recv_int();
00644                 list_item.filesid = (ShortId) srpc->recv_int();
00645                 list_item.index = (rawIndex + 1) * 2;
00646                 rawIndex++;
00647                 list_item.pseudoInode = indexToPseudoInode(list_item.index);
00648                 switch (edtype) {
00649                   case ed_none:
00650                     dir_end = true;
00651                   case ed_directory:
00652                     list_item.type = this->type;
00653                     break;
00654                   case ed_file:
00655                     list_item.type = VestaSource::immutableFile;
00656                     if (this->type == VestaSource::evaluatorROEDirectory) {
00657                         list_item.pseudoInode = list_item.filesid;
00658                     }
00659                     break;
00660                   case ed_device:
00661                     list_item.type = VestaSource::device;
00662                     break;
00663                 }
00664                 // Evaluator directory entries are always master.
00665                 list_item.master = true;
00666 
00667                 // Add this item to the result sequence unless this
00668                 // was the terminating entry.
00669                 if(!dir_end)
00670                   list_result.addhi(list_item);
00671             }
00672             srpc->recv_seq_end();
00673             srpc->recv_end();
00674 
00675             // Call the callback once per entry returned by the server
00676             // until the callback tells us to stop or we run out.
00677             while((list_result.size() > 0) && !stop_req)
00678               {
00679                 ListResultItem list_item = list_result.remlo();
00680             
00681                 curChunkSize += strlen(list_item.arc) + listPerEntryOverhead;
00682 
00683                 stop_req = !callback(closure, list_item.type, list_item.arc,
00684                                      list_item.index, list_item.pseudoInode,
00685                                      list_item.filesid, list_item.master);
00686               }
00687 
00688             if(stop_req && listChunkSize > curChunkSize + 100)
00689               {
00690                 // Seems we guessed wrong; use smaller size now.
00691                 listChunkSize = curChunkSize + 100;
00692               }
00693 
00694         } while (!stop_req && !dir_end);
00695 
00696     } catch (SRPC::failure f) {
00697         char buf[100];
00698         Repos::pr_nfs_fh(buf, (nfs_fh*) &longid.value);
00699         Repos::dprintf(DBG_ALWAYS,
00700                        "VDirEvaluator::list of %s (%s:%s "
00701                        "0x%" FORMAT_LENGTH_INT_64 "x)"
00702                        " returns VestaSource::rpcFailure: %s (%d)\n", 
00703                        buf, edi->hostname, edi->port, edi->dirhandle, 
00704                        f.msg.cchars(), f.r);
00705         err = VestaSource::rpcFailure;
00706         *edi->alive = false;
00707     }
00708     multi->End(id);
00709     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00710     RECORD_TIME_POINT;
00711     return err;
00712 }
00713 
00714 void
00715 VDirEvaluator::mark(bool byName, ArcTable* hidden) throw ()
00716 {
00717   assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00718   setVisited(true);
00719   // Mark cached children
00720   IndexToEntryIter iter(&edi->itab);
00721   IndexKey ik;
00722   EntryInfo* ei;
00723   while (iter.Next(ik, ei)) {
00724     if (ei->type == VestaSource::evaluatorDirectory ||
00725         ei->type == VestaSource::evaluatorROEDirectory) {
00726       VDirEvaluator child(VestaSource::evaluatorDirectory, ei->edi);
00727       child.mark();
00728     }
00729   }
00730 }
00731 
00732 void
00733 VDirEvaluator::markCallback(void* closure, VMemPool::typeCode type) throw ()
00734 {
00735     // Nothing to do
00736 }
00737 
00738 bool
00739 VDirEvaluator::sweepCallback(void* closure, VMemPool::typeCode type,
00740                              void* addr, Bit32& size) throw ()
00741 {
00742     VDirEvaluator vs(VestaSource::evaluatorDirectory, (Bit8*) addr);
00743     bool retain = vs.visited();
00744     vs.setHasName(false);
00745     vs.setVisited(false);
00746     size = VDIREV_SIZE;
00747     if (!retain) {
00748       ArcToEntryIter iter(&vs.edi->atab);
00749       ArcKey ak;
00750       EntryInfo* ei;
00751       while (iter.Next(ak, ei)) {
00752         free(ei->name);
00753         delete ei;
00754       }
00755       free(vs.edi->hostname);
00756       free(vs.edi->port);
00757       delete vs.edi;
00758     }
00759     return retain;
00760 }
00761 
00762 void
00763 VDirEvaluator::rebuildCallback(void* closure, VMemPool::typeCode type,
00764                                void* addr, Bit32& size) throw ()
00765 {
00766   // Compute size
00767   VDirEvaluator vs(VestaSource::evaluatorDirectory, (Bit8*) addr);
00768   size = VDIREV_SIZE;
00769 
00770   // Update cached pointer to this rep.  This is really needed only
00771   // after reading a checkpoint, but also (harmlessly) gets executed
00772   // after a mark and sweep.
00773   vs.edi->rep = vs.rep;
00774 }
00775 
00776 Bit32
00777 VDirEvaluator::checkpoint(Bit32& nextSP, std::fstream& ckpt) throw ()
00778 {
00779   if (visited()) return (Bit32) (PointerInt) repEDI();  // reused field
00780   assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00781 
00782   // Checkpoint cached children
00783   IndexToEntryIter iter(&edi->itab);
00784   IndexKey ik;
00785   EntryInfo* ei;
00786   while (iter.Next(ik, ei)) {
00787     if (ei->type == VestaSource::evaluatorDirectory ||
00788         ei->type == VestaSource::evaluatorROEDirectory) {
00789       VDirEvaluator child(VestaSource::evaluatorDirectory, ei->edi);
00790       child.checkpoint(nextSP, ckpt);
00791     }
00792   }
00793 
00794   // Checkpoint this
00795   Bit32 newSP = nextSP;
00796   ckpt.write((char *) rep, VDIREV_SIZE);
00797   int pad = (-VDIREV_SIZE) & VMemPool::alignmentMask;
00798   nextSP += VDIREV_SIZE + pad;
00799   while (pad--) {
00800     ckpt.put(VMemPool::freeByte);
00801   }
00802   // Crash if writing to the checkpoint file is failing
00803   if (!ckpt.good()) {
00804     Repos::dprintf(DBG_ALWAYS, "write to checkpoint file failed: errno %d\n", errno);
00805     assert(ckpt.good());  // crash
00806   }
00807   setVisited(true);
00808   setRepEDI(reinterpret_cast<EvalDirInfo*>(newSP));  //reuse (smash) this field
00809 
00810   return newSP;
00811 }
00812 
00813 // Requires a write lock on the volatile tree this directory is in.
00814 void
00815 VDirEvaluator::freeTree() throw ()
00816 {
00817     assert(VMemPool::type(rep) == VMemPool::vDirEvaluator);
00818     ArcToEntryIter iter(&edi->atab);
00819     ArcKey ak;
00820     EntryInfo* ei;
00821     while (iter.Next(ak, ei)) {
00822         if (ei->type == VestaSource::evaluatorDirectory ||
00823             ei->type == VestaSource::evaluatorROEDirectory) {
00824           VDirEvaluator vde(VestaSource::evaluatorDirectory, ei->edi);
00825           vde.freeTree();
00826         }
00827         free(ei->name);
00828         delete ei;
00829     }
00830     VMemPool::free(rep, VDIREV_SIZE, VMemPool::vDirEvaluator);
00831     free(edi->hostname);
00832     free(edi->port);
00833     delete edi;
00834 }
00835 
00836 bool
00837 VDirEvaluator::alive()
00838 {
00839   if (!(*edi->alive)) return false;
00840   MultiSRPC::ConnId id = -1;
00841   try {
00842     SRPC *srpc;
00843     id = multi->Start((Text) edi->hostname, (Text) edi->port, srpc);
00844     if (!srpc->alive()) {
00845       multi->Discard(id);
00846       multi->Purge(edi->hostname, edi->port);
00847       *edi->alive = false;      
00848     }
00849   } catch (SRPC::failure f) {
00850     *edi->alive = false;
00851   }
00852   multi->End(id);
00853   return *edi->alive;
00854 }
00855 
00856 // Purge cached connections for talking to this evaluator
00857 void
00858 VDirEvaluator::purge()
00859 {
00860   try
00861     {
00862       multi->Purge(edi->hostname, edi->port);
00863     }
00864   catch(const SRPC::failure &f)
00865     {
00866       Repos::dprintf(DBG_ALWAYS,
00867                      "VDirEvaluator::purge (client %s:%s) "
00868                      "caught SRPC::failure: %s (%d)\n", 
00869                      edi->hostname, edi->port, 
00870                      f.msg.cchars(), f.r);
00871     }
00872 }
00873 
00874 time_t
00875 VDirEvaluator::timestamp() throw ()
00876 {
00877   return edi->timestamp;
00878 }

Generated on Mon May 8 00:48:49 2006 for Vesta by  doxygen 1.4.2