Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

dupe.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 //
00020 // dupe.C
00021 // Last modified on Mon May 23 21:46:28 EDT 2005 by ken@xorian.net  
00022 //      modified on Tue Aug  7 19:41:11 PDT 2001 by mann  
00023 //      modified on Mon Apr 28 17:30:26 PDT 1997 by heydon
00024 //
00025 // ONC RPC duplicate suppression:
00026 // (1) Detects when an RPC we are still processing is retransmitted
00027 //     and drops the retransmission on the floor.
00028 // (2) Detects when an RPC we recently replied to is retransmitted
00029 //     and retransmits the reply from its cache.
00030 //
00031 
00032 #if __linux__
00033 #include <stdint.h>
00034 #endif
00035 #include <pthread.h>
00036 #include <errno.h>
00037 #include <sys/socket.h>
00038 
00039 #include <Basics.H>
00040 #include <VestaConfig.H>
00041 
00042 #include "dupe.H"
00043 extern "C" {
00044 #include "svc_udp.h"
00045 }
00046 #include "logging.H"
00047 
00048 //
00049 // Module types
00050 //
00051 class RPCKey {
00052   public:
00053     unsigned int xid;
00054     struct sockaddr_in caller;
00055     Word Hash() const throw() {
00056         return ((Word) xid) ^ ((Word) caller.sin_port)
00057           ^ ((Word) caller.sin_addr.s_addr);
00058     };
00059     inline RPCKey() { };
00060     inline RPCKey(unsigned int xid_, struct sockaddr_in& caller_) {
00061         xid = xid_; caller = caller_;
00062     };
00063     friend int operator==(RPCKey k1, RPCKey k2) {
00064         return k1.xid == k2.xid && k1.caller.sin_port == k2.caller.sin_port &&
00065             k1.caller.sin_addr.s_addr == k2.caller.sin_addr.s_addr;
00066     };
00067 };
00068  
00069 class RPCValue {
00070   public:
00071     RPCKey key;
00072     char* reply; // if NULL, this request is still being processed
00073     int replylen;
00074     RPCValue* older;
00075     RPCValue* newer;
00076     inline RPCValue() { };
00077     inline RPCValue(unsigned int xid_, struct sockaddr_in& caller_,
00078                     char* reply_, int replylen_)
00079       : key(xid_, caller_)
00080     {
00081         reply = reply_; replylen = replylen_;
00082         older = NULL; newer = NULL;
00083     };
00084 };
00085 
00086 typedef Table<RPCKey, RPCValue*>::Default RPCTable;
00087 
00088 class DupeTable
00089 {
00090 private:
00091   Basics::mutex mu;
00092   RPCTable dupeTable;
00093   int dupeTableCount;
00094   RPCValue* oldestDupe;
00095   RPCValue* newestDupe;
00096   ReposStats::DupeStats dupeStats;
00097 
00098 public:
00099   DupeTable()
00100     : dupeTableCount(0), oldestDupe(NULL), newestDupe(NULL)
00101   {
00102   }
00103 
00104   bool new_rpc(SVCXPRT* xprt, struct rpc_msg* msg);
00105 
00106   void completed_rpc(SVCXPRT* xprt, struct rpc_msg* msg, int replylen);
00107 
00108   inline ReposStats::DupeStats get_stats()
00109   {
00110     ReposStats::DupeStats result;
00111     this->mu.lock();
00112     result = this->dupeStats;
00113     this->mu.unlock();
00114     return result;
00115   }
00116 
00117 private:
00118   // Hide copy constructor, which should never be used.
00119   DupeTable(const DupeTable &);
00120       
00121 };
00122 
00123 //
00124 // Module globals
00125 //
00126 static pthread_once_t once = PTHREAD_ONCE_INIT;
00127 
00128 // Maximum number of past responses to keep.
00129 static unsigned int dupeTableMax = 20;
00130 
00131 // Number of bits from the client IP address and RPC XID so use in
00132 // chosing the DupeTable to use.  There will be
00133 // 2^(dupe_hash_ip_bits+dupe_hash_xid_bits) tables.
00134 static unsigned int dupe_hash_ip_bits = 0, dupe_hash_xid_bits = 2;
00135 static unsigned char dupe_hash_ip_mask, dupe_hash_xid_mask;
00136 
00137 static unsigned int dupeTableCount;
00138 DupeTable *dupeTables = NULL;
00139 
00140 //
00141 // One-time initialization
00142 //
00143 extern "C"
00144 {
00145   static void
00146   dupe_init()
00147   {
00148     Text value;
00149     if (VestaConfig::get("Repository", "dupe_table_max", value)) {
00150         dupeTableMax = atoi(value.cchars());
00151     }
00152     if (VestaConfig::get("Repository", "dupe_hash_ip_bits", value)) {
00153         dupe_hash_ip_bits = atoi(value.cchars());
00154         assert(dupe_hash_ip_bits <= 8);
00155     }
00156     if (VestaConfig::get("Repository", "dupe_hash_xid_bits", value)) {
00157         dupe_hash_xid_bits = atoi(value.cchars());
00158         assert(dupe_hash_xid_bits <= 8);
00159     }
00160 
00161     // Generate masks for the requested number of bits of IP and XID.
00162     dupe_hash_ip_mask = 0;
00163     for(unsigned int i = 0; i < dupe_hash_ip_bits; i++)
00164       {
00165         dupe_hash_ip_mask |= (1 << i);
00166       }
00167     dupe_hash_xid_mask = 0;
00168     for(unsigned int i = 0; i < dupe_hash_xid_bits; i++)
00169       {
00170         dupe_hash_xid_mask |= (1 << i);
00171       }
00172 
00173     // Compute the number of duplicate suppression tables we will
00174     // have.
00175     dupeTableCount = (1 << (dupe_hash_ip_bits+dupe_hash_xid_bits));
00176     assert(dupeTableCount > 0);
00177     assert((dupe_hash_ip_mask | (dupe_hash_xid_mask << dupe_hash_ip_bits)) <
00178            dupeTableCount);
00179 
00180     dupeTables = NEW_ARRAY(DupeTable, dupeTableCount);
00181   }
00182 }
00183 
00184 static unsigned int dupeTableIndex(unsigned int xid,
00185                                    const struct sockaddr_in &caller)
00186 {
00187   // Which bits of the XID change more rapidly may depend on the byte
00188   // order of the sender.  To try to get a small number of
00189   // rapidly-changing bits from the XID, XOR the byte of the XID
00190   // together.
00191   unsigned char xid_byte = xid;
00192   xid_byte ^= (xid >> 8);
00193   xid_byte ^= (xid >> 16);
00194   xid_byte ^= (xid >> 24);
00195   assert(sizeof(xid) == 4);
00196 
00197   // We use the low byte of the IP address, as those bits are more
00198   // likely to be different between clients.  (Note: assumes IPv4!)
00199   unsigned char ip_byte = ntohl(caller.sin_addr.s_addr);
00200 
00201   // Mask off unwanted bits.
00202   xid_byte &= dupe_hash_xid_mask;
00203   ip_byte &= dupe_hash_ip_mask;
00204 
00205   // Concatenate together the bits from the XID and the IP address.
00206   unsigned int result = ((((unsigned int) xid_byte) << dupe_hash_ip_bits) |
00207                          ((unsigned int) ip_byte));
00208 
00209   assert(result < dupeTableCount);
00210   return result;
00211 }
00212 
00213 //
00214 // If the RPC is a duplicate, reply to it and return false.  If not,
00215 // note that the RPC is in progress and return true.  In the latter
00216 // case the caller must invoke completed_rpc after replying to the
00217 // call.
00218 //
00219 extern "C" int
00220 new_rpc(SVCXPRT* xprt, struct rpc_msg* msg)
00221 {
00222   pthread_once(&once, dupe_init);
00223 
00224   unsigned int table_index = dupeTableIndex(msg->rm_xid, xprt->xp_raddr);
00225   assert(dupeTables != NULL);
00226   return dupeTables[table_index].new_rpc(xprt, msg);
00227 }
00228 
00229 bool DupeTable::new_rpc(SVCXPRT* xprt, struct rpc_msg* msg)
00230 {
00231   RPCValue* oldv;
00232   RPCValue* newv = NEW_CONSTR(RPCValue,
00233                               (msg->rm_xid, xprt->xp_raddr, NULL, 0));
00234   this->mu.lock();
00235   bool isDupe = this->dupeTable.Get(newv->key, oldv);
00236   if (isDupe) {
00237     delete newv;
00238     if (oldv->reply == NULL) {
00239       // Currently in process; drop request on the floor
00240       this->dupeStats.inProcess++;
00241     } else {
00242       // Already done; retransmit reply
00243       this->dupeStats.completed++;
00244       (void) sendto(xprt->xp_sock, oldv->reply, oldv->replylen,
00245                     0, (struct sockaddr*) &xprt->xp_raddr,
00246                     xprt->xp_addrlen);
00247       // Remove from LRU list
00248       if (oldv->older == NULL) {
00249         assert(this->oldestDupe == oldv);
00250         this->oldestDupe = oldv->newer;
00251       } else {
00252         oldv->older->newer = oldv->newer;
00253       }
00254       if (oldv->newer == NULL) {
00255         assert(this->newestDupe == oldv);
00256         this->newestDupe = oldv->older;
00257       } else {
00258         oldv->newer->older = oldv->older;
00259       }
00260       // Add to "newest" end
00261       oldv->newer = NULL;
00262       oldv->older = this->newestDupe;
00263       this->newestDupe = oldv;
00264       if (oldv->older == NULL) {
00265         this->oldestDupe = oldv;
00266       } else {
00267         assert(oldv->older->newer == NULL);
00268         oldv->older->newer = oldv;
00269       }
00270     }
00271   } else {
00272     // New request; remember it's in process
00273     this->dupeStats.non++;
00274     (void) this->dupeTable.Put(newv->key, newv);
00275   }
00276   this->mu.unlock();
00277   return !isDupe;
00278 }
00279 
00280 //
00281 // Note that this RPC is completed and cache its result, so that
00282 // new_rpc will retransmit the result if we receive a retransmitted
00283 // call.
00284 //
00285 extern "C" void
00286 completed_rpc(SVCXPRT* xprt, struct rpc_msg* msg, int replylen)
00287 {
00288   pthread_once(&once, dupe_init);
00289 
00290   unsigned int table_index = dupeTableIndex(msg->rm_xid, xprt->xp_raddr);
00291   assert(dupeTables != NULL);
00292 
00293   dupeTables[table_index].completed_rpc(xprt, msg, replylen);
00294 }
00295 
00296 void DupeTable::completed_rpc(SVCXPRT* xprt, struct rpc_msg* msg, int replylen)
00297 {
00298   RPCKey k(msg->rm_xid, xprt->xp_raddr);
00299   RPCValue* v;
00300   this->mu.lock();
00301 
00302   bool inProcess = this->dupeTable.Get(k, v);
00303   assert(inProcess);
00304   assert(v->reply == NULL);
00305 
00306   // If list is full, delete oldest element, but keep reply buffer
00307   char* savedbuf = NULL;
00308   if (this->dupeTableCount >= dupeTableMax) {
00309     assert(this->oldestDupe != NULL);  // we just added something!
00310     RPCValue* dying = this->oldestDupe;
00311     this->oldestDupe = this->oldestDupe->newer;
00312     if (this->oldestDupe == NULL) {
00313       this->newestDupe = NULL;
00314     } else {
00315       this->oldestDupe->older = NULL;
00316     }
00317     savedbuf = dying->reply;
00318     RPCValue* junk;
00319     this->dupeTable.Delete(dying->key, junk, false);
00320     assert(junk == dying);
00321     delete dying;
00322   } else {
00323     this->dupeTableCount++;
00324   }
00325 
00326   // Save this reply
00327   v->reply = rpc_buffer(xprt);
00328   v->replylen = replylen;
00329   if (savedbuf == NULL) {
00330     rpc_buffer(xprt) = (char*) malloc(su_data(xprt)->su_iosz);
00331   } else {
00332     rpc_buffer(xprt) = savedbuf;
00333   }
00334   xdrmem_create(&(su_data(xprt)->su_xdrs), rpc_buffer(xprt),
00335                 su_data(xprt)->su_iosz, XDR_ENCODE);
00336 
00337   // Add to "newest" end of LRU list
00338   v->newer = NULL;
00339   v->older = this->newestDupe;
00340   this->newestDupe = v;
00341   if (v->older == NULL) {
00342     this->oldestDupe = v;
00343   } else {
00344     assert(v->older->newer == NULL);
00345     v->older->newer = v;
00346   }
00347 
00348   this->mu.unlock();
00349 }
00350 
00351 ReposStats::DupeStats get_dupe_stats() throw()
00352 {
00353   ReposStats::DupeStats result;
00354   for(unsigned int index = 0; index < dupeTableCount; index++)
00355     {
00356       result += dupeTables[index].get_stats();
00357     }
00358   return result;
00359 }

Generated on Mon May 8 00:48:44 2006 for Vesta by  doxygen 1.4.2