00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <stdlib.h>
00025
00026 extern "C" int _Prand_r(unsigned int *seedptr);
00027
00028 #include <Basics.H>
00029 #include "FlushQueue.H"
00030
00031 using std::cout;
00032 using std::cerr;
00033 using std::endl;
00034
00035 static Basics::mutex mu;
00036 static FlushQueue *q;
00037
00038 static void Syntax(char *msg, char *arg = (char *)NULL) throw ()
00039 {
00040 cerr << "Error: " << msg;
00041 if (arg != (char *)NULL) cerr << ": `" << arg << "'";
00042 cerr << endl;
00043 cerr << "Syntax: TestFlushQueue [ -threads num ] [ -pause msecs ]" << endl;
00044 exit(1);
00045 }
00046
00047 class ThreadArgs {
00048 public:
00049 ThreadArgs(int threadId, int msecPause) throw ()
00050 : threadId(threadId), msecPause(msecPause) { }
00051 Basics::thread th;
00052 int threadId;
00053 int msecPause;
00054 };
00055
00056 static void Pause(int total_msecs) throw ()
00057 {
00058 int secs = total_msecs / 1000;
00059 int msecs = total_msecs % 1000;
00060 Basics::thread::pause(secs, msecs);
00061 }
00062
00063 static void* ThreadBody(void *voidarg) throw ()
00064
00065 {
00066 ThreadArgs *args = (ThreadArgs *)voidarg;
00067 unsigned int seed = args->threadId;
00068 while (true) {
00069
00070 int r = rand_r(&seed);
00071 unsigned long pause = 5L * (long)(args->msecPause) * (long)r;
00072 Pause((int)(pause / RAND_MAX));
00073
00074
00075 mu.lock();
00076 cout << "Thread " << args->threadId << ": enqueued" << endl;
00077 q->Enqueue();
00078 cout << "Thread " << args->threadId << ": working" << endl;
00079 mu.unlock();
00080
00081
00082 Pause(args->msecPause);
00083
00084
00085 mu.lock();
00086 cout << "Thread " << args->threadId << ": dequeued" << endl;
00087 q->Dequeue();
00088 mu.unlock();
00089
00090 }
00091
00092
00093 }
00094
00095 int main(int argc, char *argv[])
00096 {
00097
00098 int numThreads = 5;
00099 int msecPause = 300;
00100
00101
00102 int arg = 1;
00103 while (arg < argc) {
00104 char *curr = argv[arg];
00105 if (*curr == '-') {
00106 if (!strcmp(curr, "-threads")) {
00107 arg++;
00108 if (arg < argc) {
00109 if (sscanf(argv[arg], "%d", &numThreads) != 1) {
00110 Syntax("illegal argument to -threads", argv[arg]);
00111 }
00112 arg++;
00113 } else {
00114 Syntax("no argument supplied to switch", curr);
00115 }
00116 } else if (!strcmp(curr, "-pause")) {
00117 arg++;
00118 if (arg < argc) {
00119 if (sscanf(argv[arg], "%d", &msecPause) != 1) {
00120 Syntax("illegal argument to -pause", argv[arg]);
00121 }
00122 arg++;
00123 } else {
00124 Syntax("no argument supplied to switch", curr);
00125 }
00126 } else {
00127 Syntax("unrecognized switch", curr);
00128 }
00129 } else {
00130 Syntax("unrecognized argument", curr);
00131 }
00132 }
00133 assert(arg == argc);
00134
00135
00136 q = NEW_CONSTR(FlushQueue, (&mu));
00137
00138
00139 for (int i = 0; i < numThreads; i++) {
00140 ThreadArgs *args = NEW_PTRFREE_CONSTR(ThreadArgs, (i + 1, msecPause));
00141 args->th.fork_and_detach(ThreadBody, (void *)args);
00142 }
00143
00144
00145 Basics::cond c;
00146 mu.lock();
00147 while (true) c.wait(mu);
00148
00149 }