00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <signal.h>
00026 #include <Basics.H>
00027
00028 #include "SRPC.H"
00029 #include "LimService.H"
00030
00031 using std::cerr;
00032
00033
00034 class FatalError {
00035 public: FatalError() { }
00036 };
00037
00038 static void WriteFatalError(const SRPC::failure& f) throw ()
00039 {
00040 cerr << "Fatal SRPC failure in LimService:\n";
00041 cerr << " " << f.msg << " (code = " << f.r << ")\n";
00042 cerr.flush();
00043
00044
00045
00046
00047 throw FatalError();
00048 }
00049
00050 LimService::LimService(
00051 const Text &intfName, const int intfVersion,
00052 int maxRunning, int maxBlocked, LimService_Callback callback,
00053 LimService_FailureCallback failureCallback, void *arg, long stacksize,
00054 const Text &hostName, int failCode, const Text &failMessage)
00055 throw ()
00056 : intfName(intfName), intfVersion(intfVersion), maxRunning(maxRunning),
00057 maxTotal(maxRunning + maxBlocked), callback(callback),
00058 failureCallback(failureCallback), arg(arg),
00059 hostName(hostName), failCode(failCode), failMessage(failMessage),
00060 sock(NULL)
00061 {
00062 if(stacksize > 0)
00063 worker_attributes.set_stacksize(stacksize);
00064
00065 this->callback2 = NULL;
00066
00067 this->mu.lock();
00068 this->numRunning = this->numBlocked = 0;
00069 this->idleWorkers = (WorkerList *)NULL;
00070
00071
00072 this->mu.unlock();
00073 }
00074
00075 LimService::LimService(
00076 const int intfVersion,
00077 int maxRunning, int maxBlocked, LimService_Callback callback,
00078 LimService_FailureCallback failureCallback, void *arg, long stacksize,
00079 const Text &hostName, int failCode, const Text &failMessage)
00080 throw ()
00081 : intfVersion(intfVersion), maxRunning(maxRunning),
00082 maxTotal(maxRunning + maxBlocked), callback(callback),
00083 failureCallback(failureCallback), arg(arg),
00084 hostName(hostName), failCode(failCode), failMessage(failMessage)
00085 {
00086 if(stacksize > 0)
00087 worker_attributes.set_stacksize(stacksize);
00088
00089 this->callback2 = NULL;
00090
00091 this->sock = NEW(TCP_sock);
00092 sockaddr_in sin;
00093 this->sock->get_local_addr( sin);
00094 char buff[10];
00095 sprintf(buff, "%d", ntohs(sin.sin_port));
00096 this->intfName = Text(buff);
00097
00098
00099 this->mu.lock();
00100 this->numRunning = this->numBlocked = 0;
00101 this->idleWorkers = (WorkerList *)NULL;
00102
00103
00104 this->mu.unlock();
00105 }
00106
00107 LimService::LimService(
00108 const Text &intfName,
00109 int maxRunning, int maxBlocked, LimService_Callback2 callback,
00110 LimService_FailureCallback failureCallback, void *arg, long stacksize,
00111 const Text &hostName, int failCode, const Text &failMessage)
00112 throw ()
00113 : intfName(intfName), maxRunning(maxRunning),
00114 maxTotal(maxRunning + maxBlocked), callback2(callback),
00115 failureCallback(failureCallback), arg(arg),
00116 hostName(hostName), failCode(failCode), failMessage(failMessage),
00117 sock(NULL)
00118 {
00119 if(stacksize > 0)
00120 worker_attributes.set_stacksize(stacksize);
00121
00122 this->intfVersion = SRPC::default_intf_version;
00123 this->callback = NULL;
00124
00125 this->mu.lock();
00126 this->numRunning = this->numBlocked = 0;
00127 this->idleWorkers = (WorkerList *)NULL;
00128
00129
00130 this->mu.unlock();
00131 }
00132
00133 LimService::LimService(
00134 const Text &intfName,
00135 int maxRunning, int maxBlocked, LimService_Callback2 callback,
00136 LimService_FailureCallback failureCallback, void *arg,
00137 const Basics::thread_attr &worker_attr,
00138 const Text &hostName, int failCode, const Text &failMessage)
00139 throw ()
00140 : intfName(intfName), maxRunning(maxRunning),
00141 maxTotal(maxRunning + maxBlocked), callback2(callback),
00142 failureCallback(failureCallback), arg(arg),
00143 hostName(hostName), failCode(failCode), failMessage(failMessage),
00144 sock(NULL), worker_attributes(worker_attr)
00145 {
00146 this->intfVersion = SRPC::default_intf_version;
00147 this->callback = NULL;
00148
00149 this->mu.lock();
00150 this->numRunning = this->numBlocked = 0;
00151 this->idleWorkers = (WorkerList *)NULL;
00152
00153
00154 this->mu.unlock();
00155 }
00156
00157 LimService::LimService(
00158 int maxRunning, int maxBlocked, LimService_Callback2 callback,
00159 LimService_FailureCallback failureCallback, void *arg, long stacksize,
00160 const Text &hostName, int failCode, const Text &failMessage)
00161 throw ()
00162 : maxRunning(maxRunning),
00163 maxTotal(maxRunning + maxBlocked), callback2(callback),
00164 failureCallback(failureCallback), arg(arg),
00165 hostName(hostName), failCode(failCode), failMessage(failMessage)
00166 {
00167 if(stacksize > 0)
00168 worker_attributes.set_stacksize(stacksize);
00169
00170 this->intfVersion = SRPC::default_intf_version;
00171 this->callback = NULL;
00172
00173 this->sock = NEW(TCP_sock);
00174 sockaddr_in sin;
00175 this->sock->get_local_addr( sin);
00176 char buff[10];
00177 sprintf(buff, "%d", ntohs(sin.sin_port));
00178 this->intfName = Text(buff);
00179
00180
00181 this->mu.lock();
00182 this->numRunning = this->numBlocked = 0;
00183 this->idleWorkers = (WorkerList *)NULL;
00184
00185
00186 this->mu.unlock();
00187 }
00188
00189 void LimService::Run() throw (SRPC::failure)
00190 {
00191 while (true) {
00192
00193
00194 SRPC *srpc = (sock == NULL)
00195 ? NEW_CONSTR(SRPC, (SRPC::callee, intfName, hostName))
00196 : NEW_CONSTR(SRPC, (SRPC::callee, sock));
00197
00198
00199 mu.lock();
00200 server_state_change = true;
00201 state_change.signal();
00202 mu.unlock();
00203
00204
00205 try {
00206 int procId = SRPC::default_proc_id;
00207 int intfVer = intfVersion;
00208 srpc->await_call( procId, intfVer);
00209
00210
00211 LSWorker *worker = NewWorker();
00212 worker->Start(srpc, intfVer, procId);
00213 }
00214 catch (SRPC::failure f) {
00215
00216 if (this->failureCallback != NULL) {
00217 (this->failureCallback)(srpc, f, this->arg);
00218 }
00219
00220 delete srpc;
00221
00222 }
00223 }
00224 }
00225
00226 void LimService::HandleFailure(const SRPC::failure& f) throw ()
00227 {
00228 this->mu.lock();
00229 this->server_failure = f;
00230 this->server_state_change = true;
00231 this->state_change.signal();
00232 this->mu.unlock();
00233
00234
00235 }
00236
00237 void* LimService_StartServer(void *arg) throw ()
00238
00239 {
00240 LimService *ls = (LimService *)arg;
00241 try {
00242 ls->Run();
00243 } catch (SRPC::failure f) {
00244 ls->HandleFailure(f);
00245 }
00246 return (void *)NULL;
00247 }
00248
00249 Basics::thread LimService::Forked_Run() throw (SRPC::failure)
00250 {
00251 Basics::thread th;
00252 this->mu.lock();
00253 this->server_failure = SRPC::failure(0, "");
00254 this->server_state_change = false;
00255 this->mu.unlock();
00256 Basics::thread_attr listener_attr(worker_attributes);
00257 if(listener_attr.get_stacksize() < 100000)
00258 listener_attr.set_stacksize(100000);
00259 th.fork(LimService_StartServer, (void *)this, listener_attr);
00260 this->mu.lock();
00261 while (!(this->server_state_change)) {
00262 this->state_change.wait(this->mu);
00263 }
00264 SRPC::failure f = this->server_failure;
00265 this->mu.unlock();
00266 if (f.r != 0) throw(f);
00267 return th;
00268 }
00269
00270 void LimService::DecNumRunning() throw ()
00271 {
00272 this->mu.lock();
00273 this->numRunning--;
00274 assert(this->numRunning < this->maxRunning);
00275 this->mu.unlock();
00276 this->server_avail.signal();
00277 }
00278
00279 const int LSErrorCode = -15;
00280 const Text LSErrorMsg =
00281 Text("LimService error: callback should have thrown SRPC::failure");
00282
00283 void LimService::NewConnection(SRPC *srpc, int intfVersion, int procId)
00284 throw ()
00285 {
00286 try {
00287 while (true) {
00288
00289 this->mu.lock();
00290 if (this->numRunning + this->numBlocked >= this->maxTotal) {
00291
00292 this->mu.unlock();
00293 srpc->send_failure(this->failCode, this->failMessage);
00294 break;
00295 }
00296
00297 if (this->numBlocked > 0) {
00298 (this->numBlocked)++;
00299 this->server_avail.wait(this->mu);
00300 (this->numBlocked)--;
00301 }
00302
00303 while (this->numRunning >= this->maxRunning) {
00304 (this->numBlocked)++;
00305 this->server_avail.wait(this->mu);
00306 (this->numBlocked)--;
00307 }
00308 (this->numRunning)++;
00309 this->mu.unlock();
00310
00311
00312 try {
00313 if (this->callback) {
00314 (this->callback)(srpc, procId, this->arg);
00315 } else {
00316 (this->callback2)(srpc, intfVersion, procId, this->arg);
00317 }
00318 DecNumRunning();
00319 }
00320 catch (...) {
00321 DecNumRunning();
00322 throw;
00323 }
00324
00325
00326 SRPC::failure f;
00327 if (!srpc->previous_failure(&f)) {
00328 procId = SRPC::default_proc_id;
00329 intfVersion = this->intfVersion;
00330 srpc->await_call( procId, intfVersion);
00331 } else
00332 throw(SRPC::failure(LSErrorCode,
00333 LSErrorMsg + '(' + f.msg + ')'));
00334 }
00335 }
00336 catch (SRPC::failure f) {
00337
00338
00339 if (f.r == LSErrorCode) {
00340
00341 WriteFatalError(f);
00342 } else {
00343
00344 if (this->failureCallback != NULL) {
00345 (this->failureCallback)(srpc, f, this->arg);
00346 }
00347 }
00348 }
00349 }
00350
00351 LSWorker *LimService::NewWorker() throw ()
00352 {
00353 LSWorker *res;
00354 this->mu.lock();
00355 if (this->idleWorkers == NULL) {
00356
00357 res = NEW_CONSTR(LSWorker, (this, this->worker_attributes));
00358 } else {
00359
00360 WorkerList *w = this->idleWorkers;
00361 res = w->worker;
00362 this->idleWorkers = w->next;
00363 delete w;
00364 }
00365 this->mu.unlock();
00366 return res;
00367 }
00368
00369 void LimService::RegisterIdleWorker(LSWorker *worker) throw ()
00370 {
00371
00372 worker->Finish();
00373
00374
00375 WorkerList *cons = NEW(WorkerList);
00376 cons->worker = worker;
00377 this->mu.lock();
00378 cons->next = this->idleWorkers;
00379 this->idleWorkers = cons;
00380 this->mu.unlock();
00381 }
00382
00383
00384
00385 LSWorker::LSWorker(LimService *ls, const Basics::thread_attr &attr) throw ()
00386 : ls(ls), argsReady(false)
00387 {
00388
00389 this->th.fork_and_detach(LimService_Worker, (void *)this, attr);
00390 }
00391
00392 void* LimService_Worker(void* arg) throw ()
00393 {
00394 LSWorker *worker = (LSWorker *)arg;
00395 while (true) {
00396
00397 worker->mu.lock();
00398 while (!(worker->argsReady)) {
00399 worker->workToDo.wait(worker->mu);
00400 }
00401 worker->argsReady = false;
00402 worker->mu.unlock();
00403
00404
00405 worker->ls->NewConnection(worker->srpc, worker->intfVersion,
00406 worker->procId);
00407
00408
00409 worker->ls->RegisterIdleWorker(worker);
00410 }
00411
00412 }
00413
00414 void LSWorker::Start(SRPC *srpc, int intfVer, int procId) throw ()
00415 {
00416
00417 this->srpc = srpc;
00418 this->intfVersion = intfVer;
00419 this->procId = procId;
00420
00421
00422 this->mu.lock();
00423 this->argsReady = true;
00424 this->mu.unlock();
00425 this->workToDo.signal();
00426 }
00427
00428 void LSWorker::Finish() throw ()
00429 {
00430
00431 delete this->srpc;
00432 this->srpc = (SRPC *)NULL;
00433 }