00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #if __linux__
00033 #include <stdint.h>
00034 #endif
00035 #include <pthread.h>
00036 #include <errno.h>
00037 #include <sys/socket.h>
00038
00039 #include <Basics.H>
00040 #include <VestaConfig.H>
00041
00042 #include "dupe.H"
00043 extern "C" {
00044 #include "svc_udp.h"
00045 }
00046 #include "logging.H"
00047
00048
00049
00050
00051 class RPCKey {
00052 public:
00053 unsigned int xid;
00054 struct sockaddr_in caller;
00055 Word Hash() const throw() {
00056 return ((Word) xid) ^ ((Word) caller.sin_port)
00057 ^ ((Word) caller.sin_addr.s_addr);
00058 };
00059 inline RPCKey() { };
00060 inline RPCKey(unsigned int xid_, struct sockaddr_in& caller_) {
00061 xid = xid_; caller = caller_;
00062 };
00063 friend int operator==(RPCKey k1, RPCKey k2) {
00064 return k1.xid == k2.xid && k1.caller.sin_port == k2.caller.sin_port &&
00065 k1.caller.sin_addr.s_addr == k2.caller.sin_addr.s_addr;
00066 };
00067 };
00068
00069 class RPCValue {
00070 public:
00071 RPCKey key;
00072 char* reply;
00073 int replylen;
00074 RPCValue* older;
00075 RPCValue* newer;
00076 inline RPCValue() { };
00077 inline RPCValue(unsigned int xid_, struct sockaddr_in& caller_,
00078 char* reply_, int replylen_)
00079 : key(xid_, caller_)
00080 {
00081 reply = reply_; replylen = replylen_;
00082 older = NULL; newer = NULL;
00083 };
00084 };
00085
// Hash table used by each DupeTable shard to map a request's identity
// (RPCKey) to its cache entry.  The table stores pointers; entries are
// allocated and deleted by DupeTable::new_rpc / completed_rpc.
// NOTE(review): `::Default` selects the Table template's default
// instantiation -- confirm its sizing/hash policy in the Table header.
typedef Table<RPCKey, RPCValue*>::Default RPCTable;
00087
00088 class DupeTable
00089 {
00090 private:
00091 Basics::mutex mu;
00092 RPCTable dupeTable;
00093 int dupeTableCount;
00094 RPCValue* oldestDupe;
00095 RPCValue* newestDupe;
00096 ReposStats::DupeStats dupeStats;
00097
00098 public:
00099 DupeTable()
00100 : dupeTableCount(0), oldestDupe(NULL), newestDupe(NULL)
00101 {
00102 }
00103
00104 bool new_rpc(SVCXPRT* xprt, struct rpc_msg* msg);
00105
00106 void completed_rpc(SVCXPRT* xprt, struct rpc_msg* msg, int replylen);
00107
00108 inline ReposStats::DupeStats get_stats()
00109 {
00110 ReposStats::DupeStats result;
00111 this->mu.lock();
00112 result = this->dupeStats;
00113 this->mu.unlock();
00114 return result;
00115 }
00116
00117 private:
00118
00119 DupeTable(const DupeTable &);
00120
00121 };
00122
00123
00124
00125
00126 static pthread_once_t once = PTHREAD_ONCE_INIT;
00127
00128
00129 static unsigned int dupeTableMax = 20;
00130
00131
00132
00133
00134 static unsigned int dupe_hash_ip_bits = 0, dupe_hash_xid_bits = 2;
00135 static unsigned char dupe_hash_ip_mask, dupe_hash_xid_mask;
00136
00137 static unsigned int dupeTableCount;
00138 DupeTable *dupeTables = NULL;
00139
00140
00141
00142
00143 extern "C"
00144 {
00145 static void
00146 dupe_init()
00147 {
00148 Text value;
00149 if (VestaConfig::get("Repository", "dupe_table_max", value)) {
00150 dupeTableMax = atoi(value.cchars());
00151 }
00152 if (VestaConfig::get("Repository", "dupe_hash_ip_bits", value)) {
00153 dupe_hash_ip_bits = atoi(value.cchars());
00154 assert(dupe_hash_ip_bits <= 8);
00155 }
00156 if (VestaConfig::get("Repository", "dupe_hash_xid_bits", value)) {
00157 dupe_hash_xid_bits = atoi(value.cchars());
00158 assert(dupe_hash_xid_bits <= 8);
00159 }
00160
00161
00162 dupe_hash_ip_mask = 0;
00163 for(unsigned int i = 0; i < dupe_hash_ip_bits; i++)
00164 {
00165 dupe_hash_ip_mask |= (1 << i);
00166 }
00167 dupe_hash_xid_mask = 0;
00168 for(unsigned int i = 0; i < dupe_hash_xid_bits; i++)
00169 {
00170 dupe_hash_xid_mask |= (1 << i);
00171 }
00172
00173
00174
00175 dupeTableCount = (1 << (dupe_hash_ip_bits+dupe_hash_xid_bits));
00176 assert(dupeTableCount > 0);
00177 assert((dupe_hash_ip_mask | (dupe_hash_xid_mask << dupe_hash_ip_bits)) <
00178 dupeTableCount);
00179
00180 dupeTables = NEW_ARRAY(DupeTable, dupeTableCount);
00181 }
00182 }
00183
00184 static unsigned int dupeTableIndex(unsigned int xid,
00185 const struct sockaddr_in &caller)
00186 {
00187
00188
00189
00190
00191 unsigned char xid_byte = xid;
00192 xid_byte ^= (xid >> 8);
00193 xid_byte ^= (xid >> 16);
00194 xid_byte ^= (xid >> 24);
00195 assert(sizeof(xid) == 4);
00196
00197
00198
00199 unsigned char ip_byte = ntohl(caller.sin_addr.s_addr);
00200
00201
00202 xid_byte &= dupe_hash_xid_mask;
00203 ip_byte &= dupe_hash_ip_mask;
00204
00205
00206 unsigned int result = ((((unsigned int) xid_byte) << dupe_hash_ip_bits) |
00207 ((unsigned int) ip_byte));
00208
00209 assert(result < dupeTableCount);
00210 return result;
00211 }
00212
00213
00214
00215
00216
00217
00218
00219 extern "C" int
00220 new_rpc(SVCXPRT* xprt, struct rpc_msg* msg)
00221 {
00222 pthread_once(&once, dupe_init);
00223
00224 unsigned int table_index = dupeTableIndex(msg->rm_xid, xprt->xp_raddr);
00225 assert(dupeTables != NULL);
00226 return dupeTables[table_index].new_rpc(xprt, msg);
00227 }
00228
00229 bool DupeTable::new_rpc(SVCXPRT* xprt, struct rpc_msg* msg)
00230 {
00231 RPCValue* oldv;
00232 RPCValue* newv = NEW_CONSTR(RPCValue,
00233 (msg->rm_xid, xprt->xp_raddr, NULL, 0));
00234 this->mu.lock();
00235 bool isDupe = this->dupeTable.Get(newv->key, oldv);
00236 if (isDupe) {
00237 delete newv;
00238 if (oldv->reply == NULL) {
00239
00240 this->dupeStats.inProcess++;
00241 } else {
00242
00243 this->dupeStats.completed++;
00244 (void) sendto(xprt->xp_sock, oldv->reply, oldv->replylen,
00245 0, (struct sockaddr*) &xprt->xp_raddr,
00246 xprt->xp_addrlen);
00247
00248 if (oldv->older == NULL) {
00249 assert(this->oldestDupe == oldv);
00250 this->oldestDupe = oldv->newer;
00251 } else {
00252 oldv->older->newer = oldv->newer;
00253 }
00254 if (oldv->newer == NULL) {
00255 assert(this->newestDupe == oldv);
00256 this->newestDupe = oldv->older;
00257 } else {
00258 oldv->newer->older = oldv->older;
00259 }
00260
00261 oldv->newer = NULL;
00262 oldv->older = this->newestDupe;
00263 this->newestDupe = oldv;
00264 if (oldv->older == NULL) {
00265 this->oldestDupe = oldv;
00266 } else {
00267 assert(oldv->older->newer == NULL);
00268 oldv->older->newer = oldv;
00269 }
00270 }
00271 } else {
00272
00273 this->dupeStats.non++;
00274 (void) this->dupeTable.Put(newv->key, newv);
00275 }
00276 this->mu.unlock();
00277 return !isDupe;
00278 }
00279
00280
00281
00282
00283
00284
00285 extern "C" void
00286 completed_rpc(SVCXPRT* xprt, struct rpc_msg* msg, int replylen)
00287 {
00288 pthread_once(&once, dupe_init);
00289
00290 unsigned int table_index = dupeTableIndex(msg->rm_xid, xprt->xp_raddr);
00291 assert(dupeTables != NULL);
00292
00293 dupeTables[table_index].completed_rpc(xprt, msg, replylen);
00294 }
00295
00296 void DupeTable::completed_rpc(SVCXPRT* xprt, struct rpc_msg* msg, int replylen)
00297 {
00298 RPCKey k(msg->rm_xid, xprt->xp_raddr);
00299 RPCValue* v;
00300 this->mu.lock();
00301
00302 bool inProcess = this->dupeTable.Get(k, v);
00303 assert(inProcess);
00304 assert(v->reply == NULL);
00305
00306
00307 char* savedbuf = NULL;
00308 if (this->dupeTableCount >= dupeTableMax) {
00309 assert(this->oldestDupe != NULL);
00310 RPCValue* dying = this->oldestDupe;
00311 this->oldestDupe = this->oldestDupe->newer;
00312 if (this->oldestDupe == NULL) {
00313 this->newestDupe = NULL;
00314 } else {
00315 this->oldestDupe->older = NULL;
00316 }
00317 savedbuf = dying->reply;
00318 RPCValue* junk;
00319 this->dupeTable.Delete(dying->key, junk, false);
00320 assert(junk == dying);
00321 delete dying;
00322 } else {
00323 this->dupeTableCount++;
00324 }
00325
00326
00327 v->reply = rpc_buffer(xprt);
00328 v->replylen = replylen;
00329 if (savedbuf == NULL) {
00330 rpc_buffer(xprt) = (char*) malloc(su_data(xprt)->su_iosz);
00331 } else {
00332 rpc_buffer(xprt) = savedbuf;
00333 }
00334 xdrmem_create(&(su_data(xprt)->su_xdrs), rpc_buffer(xprt),
00335 su_data(xprt)->su_iosz, XDR_ENCODE);
00336
00337
00338 v->newer = NULL;
00339 v->older = this->newestDupe;
00340 this->newestDupe = v;
00341 if (v->older == NULL) {
00342 this->oldestDupe = v;
00343 } else {
00344 assert(v->older->newer == NULL);
00345 v->older->newer = v;
00346 }
00347
00348 this->mu.unlock();
00349 }
00350
00351 ReposStats::DupeStats get_dupe_stats() throw()
00352 {
00353 ReposStats::DupeStats result;
00354 for(unsigned int index = 0; index < dupeTableCount; index++)
00355 {
00356 result += dupeTables[index].get_stats();
00357 }
00358 return result;
00359 }