00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 #include <stdlib.h>
00051 #if defined(__digital__)
00052 #include <standards.h>
00053 #endif
00054 #include <time.h>
00055 #include <sys/time.h>
00056 #include <signal.h>
00057 #include <stdio.h>
00058 #include <Basics.H>
00059 #include <TCP_sock.H>
00060 #include <SRPC.H>
00061 #include <LimService.H>
00062 #include <MultiSRPC.H>
00063 #include "Debug.H"
00064
00065 using std::cout;
00066 using std::cerr;
00067
00068 const Text ClientSwitch = "-client";
00069 const Text ServerSwitch = "-server";
00070 char* const IntfName = "9874";
00071 int const IntfVersion = 1;
00072
00073 int const FirstProcId = 1;
00074 int const LastProcId = 2;
00075
00076
00077 static const int FactorMicroSecs = 1000;
00078 static const int USecsPerSec = 1000000;
00079
00080 static void Pause(int num) throw ()
00081 {
00082 int total = num * FactorMicroSecs;
00083 int secs = total / USecsPerSec;
00084 int usecs = total % USecsPerSec;
00085 Basics::thread::pause(secs, usecs);
00086 }
00087
00088 void WriteFailure(const SRPC::failure &f, int threadId = -1) throw ()
00089 {
00090 Debug::Lock();
00091 cout << Debug::Timestamp() << "SRPC Failure";
00092 if (threadId != -1) cout << ", thread " << threadId;
00093 cout << "\n " << f.msg << " (code = " << f.r << ")\n\n";
00094 cout.flush();
00095 Debug::Unlock();
00096 }
00097
00098 void WriteIdInfo(SRPC *srpc) throw (SRPC::failure)
00099 {
00100 Text local(srpc->local_socket());
00101 Text remote(srpc->remote_socket());
00102 cout << " Local socket = " << local << "\n";
00103 cout << " Remote socket = " << remote << "\n";
00104 }
00105
00106
00107 bool sWait = false;
00108
00109 void ServerCallback(SRPC *srpc, int procId, void *unused)
00110 throw (SRPC::failure)
00111 {
00112 int tid, arg, res, waitTime;
00113
00114
00115 try {
00116
00117 tid = srpc->recv_int();
00118 arg = srpc->recv_int();
00119 srpc->recv_end();
00120
00121
00122 switch (procId) {
00123 case 1: res = arg + 2;
00124 case 2: res = arg * 2;
00125 }
00126 if (sWait) { waitTime = Debug::MyRand(arg, res); }
00127
00128
00129 Debug::Lock();
00130 cout << Debug::Timestamp() << "Server CALLED -- Client thread "
00131 << tid << ":\n";
00132 cout << " Procedure ID = " << procId << "\n";
00133 cout << " Received value = " << arg << "\n";
00134 WriteIdInfo(srpc);
00135 if (sWait) { cout << " Waiting " << waitTime << " seconds\n"; }
00136 cout << "\n";
00137 cout.flush();
00138 Debug::Unlock();
00139
00140
00141 if (sWait) {
00142 Pause(waitTime);
00143
00144 }
00145
00146
00147 Debug::Lock();
00148 cout << Debug::Timestamp() << "Server RESULT -- Client thread " <<
00149 tid << ":\n";
00150 cout << " Result value = " << res << "\n";
00151 WriteIdInfo(srpc); cout << "\n";
00152 cout.flush();
00153 Debug::Unlock();
00154 srpc->send_int(res);
00155 srpc->send_end();
00156 } catch (SRPC::failure f) {
00157 WriteFailure(f);
00158 throw;
00159 }
00160 Debug::Unlock();
00161 }
00162
00163 void ExitServer() throw ()
00164 {
00165 cerr << "SYNTAX: server [-wait] [maxRunning [maxBlocked]]\n";
00166 cerr.flush();
00167 exit(1);
00168 }
00169
00170 void Server(int argc, char *argv[]) throw ()
00171 {
00172 int maxRunning, maxBlocked;
00173 int arg = 2;
00174
00175
00176 if (argc > 5) {
00177 cerr << "Error: Too many arguments\n";
00178 ExitServer();
00179 }
00180 maxRunning = maxBlocked = 10000;
00181 if (arg < argc && !strcmp(argv[arg], "-wait")) { sWait = true; arg++; }
00182 if (arg < argc) { sscanf(argv[arg], "%d", &maxRunning); arg++; }
00183 if (arg < argc) { sscanf(argv[2], "%d", &maxBlocked); }
00184
00185
00186 Debug::Lock();
00187 cout << "Starting server:\n";
00188 cout << " Exporting '" << IntfName << "', version " << IntfVersion <<"\n";
00189 cout << " Maximum number of running threads = " << maxRunning << "\n";
00190 cout << " Maximum number of blocked threads = " << maxBlocked << "\n\n";
00191 cout.flush();
00192 Debug::Unlock();
00193 LimService ls(IntfName, IntfVersion, maxRunning, maxBlocked,
00194 ServerCallback);
00195 try {
00196 ls.Run();
00197 } catch (SRPC::failure f) {
00198 WriteFailure(f);
00199 }
00200 }
00201
00202 void ExitClient() throw ()
00203 {
00204 cerr << "SYNTAX: client ";
00205 cerr << "[-wait] [-threads n] [-base baseThreadNum] [-host hostname]\n";
00206 cerr.flush();
00207 exit(1);
00208 }
00209
00210 struct ClientArgs {
00211 char *hostname;
00212 MultiSRPC *conns;
00213 int id;
00214 unsigned int waitTime;
00215 bool cWait;
00216 };
00217
00218 void *MainClientProc(void *ptr) throw ()
00219 {
00220 ClientArgs *args = (ClientArgs *)ptr;
00221 SRPC *srpc;
00222 MultiSRPC::ConnId id;
00223 int proc, arg, res, waitTime;
00224
00225
00226 struct sigaction dummy;
00227 struct sigaction action;
00228 action.sa_handler = SIG_IGN;
00229 sigemptyset(&action.sa_mask);
00230 action.sa_flags = 0;
00231 (void)sigaction(SIGPIPE, &action, &dummy);
00232
00233 Debug::Lock();
00234 cout << Debug::Timestamp() << "Starting client thread "
00235 << args->id << "\n";
00236 if (args->waitTime > 0) {
00237 cout << " Waiting " << args->waitTime << " second(s)\n";
00238 } else {
00239 cout << " No waiting\n";
00240 }
00241 (cout << "\n").flush();
00242 Debug::Unlock();
00243 if (args->waitTime > 0) {
00244 Pause(args->waitTime);
00245
00246 }
00247
00248 try {
00249 while (true) {
00250 id = args->conns->Start(args->hostname, srpc);
00251 proc = Debug::MyRand(FirstProcId, LastProcId);
00252 arg = Debug::MyRand(1, 4);
00253 Debug::Lock();
00254 cout << Debug::Timestamp() << "Client thread " << args->id << "\n";
00255 cout << " Calling procedure " << proc << "\n";
00256 cout << " Argument = " << arg << "\n";
00257 WriteIdInfo(srpc); cout << "\n";
00258 cout.flush();
00259 Debug::Unlock();
00260
00261 srpc->start_call(proc, IntfVersion);
00262 srpc->send_int(args->id);
00263 srpc->send_int(arg);
00264 srpc->send_end();
00265
00266 res = srpc->recv_int();
00267 srpc->recv_end();
00268 if (args->cWait) { waitTime = Debug::MyRand(arg, res); }
00269 Debug::Lock();
00270 cout << Debug::Timestamp() << "Client thread " << args->id << "\n";
00271 cout << " Received result = " << res << "\n";
00272 WriteIdInfo(srpc);
00273 if (args->cWait) {
00274 cout << " Waiting " << waitTime << " seconds\n";
00275 }
00276 cout << "\n";
00277 cout.flush();
00278 Debug::Unlock();
00279 args->conns->End(id);
00280 if (args->cWait) {
00281 Pause(waitTime);
00282
00283 }
00284 }
00285 } catch (SRPC::failure f) {
00286 WriteFailure(f, args->id);
00287 }
00288
00289 Debug::Lock();
00290 cout << Debug::Timestamp() << "Exiting client thread "
00291 << args->id << "\n\n";
00292 cout.flush();
00293 Debug::Unlock();
00294 return (void *)NULL;
00295 }
00296
00297 void *ClientProc(void *ptr) throw ()
00298
00299
00300 {
00301 Basics::thread th;
00302 ClientArgs *args = (ClientArgs *)ptr;
00303
00304 while (true) {
00305 th.fork(MainClientProc, ptr);
00306 (void)th.join();
00307 if (args->cWait) {
00308 args->waitTime = (unsigned int)Debug::MyRand(5, 10);
00309 }
00310 }
00311
00312 }
00313
00314 void Client(int argc, char *argv[]) throw ()
00315 {
00316 int numThreads = 1;
00317 char *hostname = (char *)NULL;
00318 MultiSRPC *conns;
00319 bool cWait = false;
00320
00321
00322 if (argc < 2 || argc > 7) {
00323 cerr << "Error: Incorrect number of arguments\n";
00324 ExitClient();
00325 }
00326 for (int arg = 2; arg < argc; arg ++) {
00327 if (strcmp(argv[arg], "-wait") == 0) {
00328 cWait = true;
00329 } else if (strcmp(argv[arg], "-threads") == 0) {
00330 if (sscanf(argv[arg+1], "%d", &numThreads) < 1 || numThreads < 1) {
00331 cerr << "Error: -threads argument not a positive integer\n";
00332 ExitClient();
00333 }
00334 arg++;
00335 } else if (strcmp(argv[arg], "-host") == 0) {
00336 hostname = argv[arg+1];
00337 arg++;
00338 } else {
00339 cerr << "Error: argument " << arg << " is bad\n";
00340 ExitClient();
00341 }
00342 }
00343 if (hostname == (char *)NULL) {
00344 char buff[MAXHOSTNAMELEN];
00345 if (gethostname(buff, MAXHOSTNAMELEN) < 0) {
00346 cerr << "Error: gethostname() failed\n";
00347 exit(2);
00348 }
00349 hostname = NEW_PTRFREE_ARRAY(char, strlen(buff) + 1);
00350 strcpy(hostname, buff);
00351 }
00352
00353
00354 Debug::Lock();
00355 cout << "Starting client:\n";
00356 cout << " Server host '" << hostname << "'\n";
00357 cout << " Interface '" << IntfName << "', version "<<IntfVersion<<"\n\n";
00358 cout.flush();
00359 Debug::Unlock();
00360
00361
00362 conns = NEW_CONSTR(MultiSRPC, (IntfName));
00363
00364
00365 ClientArgs *args;
00366 Basics::thread th;
00367 Debug::Lock();
00368 cout << Debug::Timestamp() << "Forking " << numThreads << " thread(s)\n\n";
00369 cout.flush();
00370 Debug::Unlock();
00371 for (int i = 0; i < numThreads; i++) {
00372 args = NEW(ClientArgs);
00373 args->hostname = hostname;
00374 args->conns = conns;
00375 args->id = i + 1;
00376 args->waitTime = 0;
00377 args->cWait = cWait;
00378 th.fork_and_detach(ClientProc, (void *)args);
00379 }
00380
00381
00382 Debug::Lock();
00383 cout << Debug::Timestamp()
00384 << "Main client thread now blocking forever...\n\n";
00385 cout.flush();
00386 Debug::Unlock();
00387 Debug::BlockForever();
00388 }
00389
00390 int main(int argc, char *argv[])
00391 {
00392
00393 if (argc > 1 && argv[1] == ServerSwitch) {
00394 Server(argc, argv);
00395 } else if (argc > 1 && argv[1] == ClientSwitch) {
00396 Client(argc, argv);
00397 } else {
00398 cerr << "You must specify " << ServerSwitch << " or "
00399 << ClientSwitch << " as the first argument.\n";
00400 cerr.flush();
00401 exit(3);
00402 }
00403 return(0);
00404 }