Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

LimService.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 // Last modified on Sun May 22 22:47:50 EDT 2005 by ken@xorian.net         
00020 //      modified on Fri Apr 22 15:01:07 EDT 2005 by irina.furman@intel.com 
00021 //      modified on Fri Feb 11 19:52:54 PST 2000 by mann   
00022 //      modified on Mon Jun 16 11:07:43 PDT 1997 by heydon 
00023 //      modified on Thu Aug  1 14:45:35 PDT 1996 by levin  
00024 
00025 #include <signal.h>
00026 #include <Basics.H>
00027 
00028 #include "SRPC.H"
00029 #include "LimService.H"
00030 
00031 using std::cerr;
00032 
00033 // fatal programmer error
00034 class FatalError {
00035   public: FatalError() { /*EMPTY*/ }
00036 };
00037 
00038 static void WriteFatalError(const SRPC::failure& f) throw ()
00039 {
00040     cerr << "Fatal SRPC failure in LimService:\n";
00041     cerr << "  " << f.msg << " (code = " << f.r << ")\n";
00042     cerr.flush();
00043     // It seems that this function is lying about what it can throw in
00044     // order to force program termination (under the assumption the
00045     // the C++ runtime will handle unexpected exceptions by aborting
00046     // execution).  Questionable form at best.
00047     throw FatalError();
00048 }
00049 
00050 LimService::LimService(
00051   const Text &intfName, const int intfVersion,
00052   int maxRunning, int maxBlocked, LimService_Callback callback,
00053   LimService_FailureCallback failureCallback, void *arg, long stacksize,
00054   const Text &hostName, int failCode, const Text &failMessage)
00055   throw ()
00056 : intfName(intfName), intfVersion(intfVersion), maxRunning(maxRunning),
00057   maxTotal(maxRunning + maxBlocked), callback(callback),
00058   failureCallback(failureCallback), arg(arg),
00059   hostName(hostName), failCode(failCode), failMessage(failMessage),
00060   sock(NULL)
00061 {
00062   if(stacksize > 0)
00063     worker_attributes.set_stacksize(stacksize);
00064 
00065     this->callback2 = NULL;
00066     // initialize synchronization data
00067     this->mu.lock();
00068     this->numRunning = this->numBlocked = 0;
00069     this->idleWorkers = (WorkerList *)NULL;
00070     /* Note: "server_state_change" and "server_failure" are initialized
00071        by the "Forked_Run" method. */
00072     this->mu.unlock();
00073 }
00074 
00075 LimService::LimService(
00076   const int intfVersion,
00077   int maxRunning, int maxBlocked, LimService_Callback callback,
00078   LimService_FailureCallback failureCallback, void *arg, long stacksize,
00079   const Text &hostName, int failCode, const Text &failMessage)
00080   throw ()
00081 : intfVersion(intfVersion), maxRunning(maxRunning),
00082   maxTotal(maxRunning + maxBlocked), callback(callback),
00083   failureCallback(failureCallback), arg(arg),
00084   hostName(hostName), failCode(failCode), failMessage(failMessage)
00085 {
00086   if(stacksize > 0)
00087     worker_attributes.set_stacksize(stacksize);
00088 
00089     this->callback2 = NULL;
00090     // initialize "intfName" and "sock"
00091     this->sock = NEW(TCP_sock);   // choose a port number
00092     sockaddr_in sin;
00093     this->sock->get_local_addr(/*OUT*/ sin);
00094     char buff[10];
00095     sprintf(buff, "%d", ntohs(sin.sin_port));
00096     this->intfName = Text(buff);
00097 
00098     // initialize synchronization data
00099     this->mu.lock();
00100     this->numRunning = this->numBlocked = 0;
00101     this->idleWorkers = (WorkerList *)NULL;
00102     /* Note: "server_state_change" and "server_failure" are initialized
00103        by the "Forked_Run" method. */
00104     this->mu.unlock();
00105 }
00106 
00107 LimService::LimService(
00108   const Text &intfName,
00109   int maxRunning, int maxBlocked, LimService_Callback2 callback,
00110   LimService_FailureCallback failureCallback, void *arg, long stacksize,
00111   const Text &hostName, int failCode, const Text &failMessage)
00112   throw ()
00113 : intfName(intfName), maxRunning(maxRunning),
00114   maxTotal(maxRunning + maxBlocked), callback2(callback),
00115   failureCallback(failureCallback), arg(arg),
00116   hostName(hostName), failCode(failCode), failMessage(failMessage),
00117   sock(NULL)
00118 {
00119   if(stacksize > 0)
00120     worker_attributes.set_stacksize(stacksize);
00121 
00122     this->intfVersion = SRPC::default_intf_version;
00123     this->callback = NULL;
00124     // initialize synchronization data
00125     this->mu.lock();
00126     this->numRunning = this->numBlocked = 0;
00127     this->idleWorkers = (WorkerList *)NULL;
00128     /* Note: "server_state_change" and "server_failure" are initialized
00129        by the "Forked_Run" method. */
00130     this->mu.unlock();
00131 }
00132 
00133 LimService::LimService(
00134   const Text &intfName,
00135   int maxRunning, int maxBlocked, LimService_Callback2 callback,
00136   LimService_FailureCallback failureCallback, void *arg,
00137   const Basics::thread_attr &worker_attr,
00138   const Text &hostName, int failCode, const Text &failMessage)
00139   throw ()
00140 : intfName(intfName), maxRunning(maxRunning),
00141   maxTotal(maxRunning + maxBlocked), callback2(callback),
00142   failureCallback(failureCallback), arg(arg),
00143   hostName(hostName), failCode(failCode), failMessage(failMessage),
00144   sock(NULL), worker_attributes(worker_attr)
00145 {
00146     this->intfVersion = SRPC::default_intf_version;
00147     this->callback = NULL;
00148     // initialize synchronization data
00149     this->mu.lock();
00150     this->numRunning = this->numBlocked = 0;
00151     this->idleWorkers = (WorkerList *)NULL;
00152     /* Note: "server_state_change" and "server_failure" are initialized
00153        by the "Forked_Run" method. */
00154     this->mu.unlock();
00155 }
00156 
00157 LimService::LimService(
00158   int maxRunning, int maxBlocked, LimService_Callback2 callback,
00159   LimService_FailureCallback failureCallback, void *arg, long stacksize,
00160   const Text &hostName, int failCode, const Text &failMessage)
00161   throw ()
00162 : maxRunning(maxRunning),
00163   maxTotal(maxRunning + maxBlocked), callback2(callback),
00164   failureCallback(failureCallback), arg(arg),
00165   hostName(hostName), failCode(failCode), failMessage(failMessage)
00166 {
00167   if(stacksize > 0)
00168     worker_attributes.set_stacksize(stacksize);
00169 
00170     this->intfVersion = SRPC::default_intf_version;
00171     this->callback = NULL;
00172     // initialize "intfName" and "sock"
00173     this->sock = NEW(TCP_sock);   // choose a port number
00174     sockaddr_in sin;
00175     this->sock->get_local_addr(/*OUT*/ sin);
00176     char buff[10];
00177     sprintf(buff, "%d", ntohs(sin.sin_port));
00178     this->intfName = Text(buff);
00179 
00180     // initialize synchronization data
00181     this->mu.lock();
00182     this->numRunning = this->numBlocked = 0;
00183     this->idleWorkers = (WorkerList *)NULL;
00184     /* Note: "server_state_change" and "server_failure" are initialized
00185        by the "Forked_Run" method. */
00186     this->mu.unlock();
00187 }
00188 
00189 void LimService::Run() throw (SRPC::failure)
00190 {
00191     while (true) {
00192         // create a new SRPC to listen on (this is non-blocking)
00193         // This will throw an exception to the caller in the event of failure.
00194         SRPC *srpc = (sock == NULL)
00195           ? NEW_CONSTR(SRPC, (SRPC::callee, intfName, hostName))
00196           : NEW_CONSTR(SRPC, (SRPC::callee, sock));
00197 
00198         // signal "Forked_Run" that listener is active
00199         mu.lock();
00200         server_state_change = true;
00201         state_change.signal();
00202         mu.unlock();
00203 
00204         // wait for a connection
00205         try {
00206             int procId = SRPC::default_proc_id;
00207             int intfVer = intfVersion;
00208             srpc->await_call(/*INOUT*/ procId, /*INOUT*/ intfVer);
00209 
00210             // invoke a new worker thread to handle the call
00211             LSWorker *worker = NewWorker();
00212             worker->Start(srpc, intfVer, procId);
00213         }
00214         catch (SRPC::failure f) {
00215             // Invoke the user-supplied failure callback
00216             if (this->failureCallback != NULL) {
00217                 (this->failureCallback)(srpc, f, this->arg);
00218             }
00219             // clean up the SRPC connection
00220             delete srpc;
00221             // go around the loop, creating a new listener
00222         }
00223     }  // while
00224 }
00225 
00226 void LimService::HandleFailure(const SRPC::failure& f) throw ()
00227 {
00228     this->mu.lock();
00229     this->server_failure = f;
00230     this->server_state_change = true;
00231     this->state_change.signal();
00232     this->mu.unlock();
00233     // Should probably call client's failure proc, but only
00234     // if failure occurred after Forked_Run has gone away.
00235 }
00236 
00237 void* LimService_StartServer(void *arg) throw ()
00238 // "Forked_Run" thread closure
00239 {
00240     LimService *ls = (LimService *)arg;
00241     try {
00242         ls->Run();
00243     } catch (SRPC::failure f) {
00244         ls->HandleFailure(f);
00245     }
00246     return (void *)NULL;
00247 }
00248 
00249 Basics::thread LimService::Forked_Run() throw (SRPC::failure)
00250 {
00251     Basics::thread th;
00252     this->mu.lock();
00253     this->server_failure = SRPC::failure(0, "");
00254     this->server_state_change = false;
00255     this->mu.unlock();
00256     Basics::thread_attr listener_attr(worker_attributes);
00257     if(listener_attr.get_stacksize() < 100000)
00258       listener_attr.set_stacksize(100000);
00259     th.fork(LimService_StartServer, (void *)this, listener_attr);
00260     this->mu.lock();
00261     while (!(this->server_state_change)) {
00262         this->state_change.wait(this->mu);
00263     }
00264     SRPC::failure f = this->server_failure; // save failure with "mu" held
00265     this->mu.unlock();
00266     if (f.r != 0) throw(f);
00267     return th;
00268 }
00269 
00270 void LimService::DecNumRunning() throw ()
00271 {
00272     this->mu.lock();
00273     this->numRunning--;
00274     assert(this->numRunning < this->maxRunning);
00275     this->mu.unlock();
00276     this->server_avail.signal();
00277 }
00278 
00279 const int LSErrorCode = -15;
00280 const Text LSErrorMsg =
00281   Text("LimService error: callback should have thrown SRPC::failure");
00282 
00283 void LimService::NewConnection(SRPC *srpc, int intfVersion, int procId)
00284      throw ()
00285 {
00286     try {
00287         while (true) {
00288             // check that we have enough resources to run; if not, send failure
00289             this->mu.lock();
00290             if (this->numRunning + this->numBlocked >= this->maxTotal) {
00291                 // signal failure and exit loop
00292                 this->mu.unlock();
00293                 srpc->send_failure(this->failCode, this->failMessage);
00294                 break;
00295             }
00296             // on entry: if any threads are blocked, get in line
00297             if (this->numBlocked > 0) {
00298                 (this->numBlocked)++;
00299                 this->server_avail.wait(this->mu);
00300                 (this->numBlocked)--;
00301             }
00302             // block on condition variable if at max # of running threads
00303             while (this->numRunning >= this->maxRunning) {
00304                 (this->numBlocked)++;
00305                 this->server_avail.wait(this->mu);
00306                 (this->numBlocked)--;
00307             }
00308             (this->numRunning)++;
00309             this->mu.unlock();
00310 
00311             // call callback function
00312             try {
00313               if (this->callback) {
00314                 (this->callback)(srpc, procId, this->arg);
00315               } else {
00316                 (this->callback2)(srpc, intfVersion, procId, this->arg);
00317               }
00318               DecNumRunning();
00319             }
00320             catch (...) {
00321                 DecNumRunning();
00322                 throw;
00323             }
00324 
00325             // wait for another connection on "srpc"
00326             SRPC::failure f;
00327             if (!srpc->previous_failure(&f)) {
00328                 procId = SRPC::default_proc_id;
00329                 intfVersion = this->intfVersion;
00330                 srpc->await_call(/*INOUT*/ procId, /*INOUT*/ intfVersion);
00331             } else
00332                 throw(SRPC::failure(LSErrorCode,
00333                                     LSErrorMsg + '(' + f.msg + ')'));
00334         } // while
00335     } // try
00336     catch (SRPC::failure f) {
00337         // If the client caught the "SRPC::failure" exception and failed to
00338         // re-raise it, report a programmer error and crash.
00339         if (f.r == LSErrorCode) {
00340             // A programmer error occurred -- crash!
00341             WriteFatalError(f);
00342         } else {
00343             // otherwise, invoke the user-supplied failure callback
00344             if (this->failureCallback != NULL) {
00345                 (this->failureCallback)(srpc, f, this->arg);
00346             }
00347         }
00348     }
00349 }
00350 
00351 LSWorker *LimService::NewWorker() throw ()
00352 {
00353     LSWorker *res;
00354     this->mu.lock();
00355     if (this->idleWorkers == NULL) {
00356         // create new LSWorker object
00357         res = NEW_CONSTR(LSWorker, (this, this->worker_attributes));
00358     } else {
00359         // get LSWorker off avail list
00360         WorkerList *w = this->idleWorkers;
00361         res = w->worker;
00362         this->idleWorkers = w->next;
00363         delete w;
00364     }
00365     this->mu.unlock();
00366     return res;
00367 }
00368 
00369 void LimService::RegisterIdleWorker(LSWorker *worker) throw ()
00370 {
00371     // clean up thread
00372     worker->Finish();
00373 
00374     // add it to front of list
00375     WorkerList *cons = NEW(WorkerList);
00376     cons->worker = worker;
00377     this->mu.lock();
00378     cons->next = this->idleWorkers;
00379     this->idleWorkers = cons;
00380     this->mu.unlock();
00381 }
00382 
00383 // -------------------------------------------------------------------------
00384 
00385 LSWorker::LSWorker(LimService *ls, const Basics::thread_attr &attr) throw ()
00386   : ls(ls), argsReady(false)
00387 {
00388     // fork the thread that does the actual work
00389     this->th.fork_and_detach(LimService_Worker, (void *)this, attr);
00390 }
00391 
00392 void* LimService_Worker(void* arg) throw ()
00393 {
00394     LSWorker *worker = (LSWorker *)arg;
00395     while (true) {
00396         // wait for work to do
00397         worker->mu.lock();
00398         while (!(worker->argsReady)) {
00399             worker->workToDo.wait(worker->mu);
00400         }
00401         worker->argsReady = false;
00402         worker->mu.unlock();
00403 
00404         // do the work
00405         worker->ls->NewConnection(worker->srpc, worker->intfVersion,
00406                                   worker->procId);
00407 
00408         // restore thread to avail list
00409         worker->ls->RegisterIdleWorker(worker);
00410     }
00411     //return (void *)NULL; // not reached
00412 }
00413 
00414 void LSWorker::Start(SRPC *srpc, int intfVer, int procId) throw ()
00415 {
00416     // save arguments
00417     this->srpc = srpc;
00418     this->intfVersion = intfVer;
00419     this->procId = procId;
00420 
00421     // signal the thread to go
00422     this->mu.lock();
00423     this->argsReady = true;
00424     this->mu.unlock();
00425     this->workToDo.signal();
00426 }
00427 
00428 void LSWorker::Finish() throw ()
00429 {
00430     // clean up the SRPC connection
00431     delete this->srpc;
00432     this->srpc = (SRPC *)NULL;
00433 }

Generated on Mon May 8 00:48:57 2006 for Vesta by  doxygen 1.4.2