00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <Basics.H>
00031 #include "SRPC.H"
00032 #include "SRPC_impl.H"
00033
#ifdef __linux__
// Linux builds define the in_port_t / in_addr_t names locally.
// NOTE(review): presumably a workaround for system headers that lacked
// these typedefs -- confirm it is still needed.
typedef unsigned short in_port_t;
typedef unsigned int   in_addr_t;
#endif
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
// Version string sent (including its terminating NUL) after the hello item
// code by send_item_code(), and compared against the peer's string in
// read_item_code().
const char *SRPC_VERSION = "SRPC V1.4";

// Value passed to TCP_sock::set_waiters() for listener sockets that
// SRPC_listener::create() constructs itself.
const int INITIAL_WAITERS_KLUDGE = 5;
00054
00055
00056
00057
00058
00059 SRPC_impl::SRPC_impl(bool i_am_caller)
00060 : is_caller(i_am_caller), is_sender(i_am_caller),
00061 version_sent(false), version_checked(false),
00062 state(SRPC_impl::initial),
00063 buffered_ic(SRPC_impl::ic_null), s((TCP_sock *)NULL),
00064 listener((SRPC_listener *)NULL),
00065 use_read_timeout(false), use_read_timeout_between_calls(false),
00066 read_timeout_secs(0)
00067 {
00068 }
00069
00070 SRPC_impl::~SRPC_impl() throw (SRPC::failure)
00071 {
00072 if (this->listener != (SRPC_listener *)NULL) {
00073 if (!SRPC_listener::destroy(this->listener))
00074 throw_failure(SRPC::internal_trouble, "listeners list scrambled.");
00075 }
00076 if (this->s != (TCP_sock *)NULL) delete this->s;
00077 }
00078
00079
00080
00081
00082
00083 void SRPC_impl::throw_failure(int reason, const Text &msg, bool local_only)
00084 throw(SRPC::failure)
00085 {
00086 if (!(local_only || this->s == (TCP_sock *)NULL)) {
00087 try {
00088 Text rmsg = "[from other end] " + msg;
00089 send_item_code(SRPC_impl::ic_failure);
00090 send_int32(reason);
00091 send_bytes(rmsg.chars(), rmsg.Length(), true);
00092 } catch (TCP_sock::failure) {
00093
00094 }
00095 }
00096 this->state = SRPC_impl::failed;
00097 this->f = SRPC::failure(reason, msg);
00098 throw(this->f);
00099 }
00100
00101 void SRPC_impl::report_TCP_failure(TCP_sock::failure &tf)
00102 throw (SRPC::failure)
00103 {
00104 SRPC::failure f = SRPC::convert_TCP_failure(tf);
00105 throw_failure(f.r, f.msg, true);
00106 }
00107
00108
00109
00110
00111
00112 chars op_names[] = {
00113 "start_call", "await_call",
00114 "send_int32", "recv_int32",
00115 "send_chars", "recv_chars", "recv_chars_here",
00116 "send_bytes", "recv_bytes", "recv_bytes_here",
00117 "send_text", "recv_text",
00118 "send_socket", "recv_socket",
00119 "send_int32_seq", "recv_int32_seq",
00120 "send_chars_seq", "recv_chars_seq",
00121 "send_bytes_seq", "recv_bytes_seq",
00122 "send_seq_start", "recv_seq_start", "send_seq_end", "recv_seq_end",
00123 "send_end", "recv_end",
00124 "send_failure",
00125
00126
00127 "send_int16", "recv_int16",
00128 "send_int16_array", "recv_int16_array",
00129 "send_int64", "recv_int64",
00130
00131 "send_int32_array", "recv_int32_array",
00132 "send_int64_array", "recv_int64_array",
00133
00134 "send_bool", "recv_bool"
00135 };
00136
00137 chars state_names[] = {
00138 "initial", "ready", "data_out", "seq_out", "data_in", "seq_in", "failed"
00139 };
00140
// Bit positions used to build the masks in legal_states[] and
// SRPC_impl::check_state().
enum {
  // Receiver/sender pair: check_state() sets bit (rs_pos + is_sender).
  recver_pos = 0,
  sender_pos = 1,
  rs_pos     = 0,

  // Callee/caller pair: check_state() sets bit (cc_pos + is_caller).
  callee_pos = 2,
  caller_pos = 3,
  cc_pos     = 2,

  // The bit for a state is (state0_pos + state value).
  state0_pos = 4
};
00148
00149 #define M(x) (1<<(state0_pos+SRPC_impl::x))
00150 #define ANY_STATE (-1)
00151 #define IS_SENDER (1<<sender_pos)
00152 #define IS_RECVER (1<<recver_pos)
00153 #define IS_CALLER (1<<caller_pos)
00154 #define IS_CALLEE (1<<callee_pos)
00155 #define EITHER_END (IS_CALLER | IS_CALLEE)
00156 #define FRESH_SENDER EITHER_END | IS_SENDER | M(initial) | M(ready)
00157 #define FRESH_RECVER EITHER_END | IS_RECVER | M(initial) | M(ready)
00158
00159 int legal_states[] = {
00160 IS_CALLER | IS_SENDER | M(initial),
00161 IS_CALLEE | IS_RECVER | M(initial),
00162 FRESH_SENDER | M(data_out) | M(seq_out),
00163 FRESH_RECVER | M(data_in) | M(seq_in),
00164 FRESH_SENDER | M(data_out) | M(seq_out),
00165 FRESH_RECVER | M(data_in) | M(seq_in),
00166 FRESH_RECVER | M(data_in) | M(seq_in),
00167 FRESH_SENDER | M(data_out) | M(seq_out),
00168 FRESH_RECVER | M(data_in) | M(seq_in),
00169 FRESH_RECVER | M(data_in) | M(seq_in),
00170 FRESH_SENDER | M(data_out) | M(seq_out),
00171 FRESH_RECVER | M(data_in) | M(seq_in),
00172 FRESH_SENDER | M(data_out) | M(seq_out),
00173 FRESH_RECVER | M(data_in) | M(seq_in),
00174 FRESH_SENDER | M(data_out) | M(seq_out),
00175 FRESH_RECVER | M(data_in) | M(seq_in),
00176 FRESH_SENDER | M(data_out) | M(seq_out),
00177 FRESH_RECVER | M(data_in) | M(seq_in),
00178 FRESH_SENDER | M(data_out) | M(seq_out),
00179 FRESH_RECVER | M(data_in) | M(seq_in),
00180 FRESH_SENDER | M(data_out),
00181 FRESH_RECVER | M(data_in),
00182 EITHER_END | IS_SENDER | M(seq_out),
00183 EITHER_END | IS_RECVER | M(seq_in),
00184 FRESH_SENDER | M(data_out),
00185 FRESH_RECVER | M(data_in),
00186 ANY_STATE,
00187
00188
00189 FRESH_SENDER | M(data_out) | M(seq_out),
00190 FRESH_RECVER | M(data_in) | M(seq_in),
00191 FRESH_SENDER | M(data_out) | M(seq_out),
00192 FRESH_RECVER | M(data_in) | M(seq_in),
00193 FRESH_SENDER | M(data_out) | M(seq_out),
00194 FRESH_RECVER | M(data_in) | M(seq_in),
00195 FRESH_SENDER | M(data_out) | M(seq_out),
00196 FRESH_RECVER | M(data_in) | M(seq_in),
00197 FRESH_SENDER | M(data_out) | M(seq_out),
00198 FRESH_RECVER | M(data_in) | M(seq_in),
00199
00200 FRESH_SENDER | M(data_out) | M(seq_out),
00201 FRESH_RECVER | M(data_in) | M(seq_in)
00202 };
00203
00204 #undef M
00205 #undef ANY_STATE
00206 #undef IS_SENDER
00207 #undef IS_RECVER
00208 #undef IS_CALLER
00209 #undef IS_CALLEE
00210 #undef EITHER_END
00211 #undef FRESH_SENDER
00212 #undef FRESH_RECVER
00213
00214 void SRPC_impl::check_state(int op) throw (SRPC::failure)
00215 {
00216 int m =
00217 (1<<(this->is_sender+rs_pos)) | (1<<(this->is_caller+cc_pos)) |
00218 (1<<(this->state+state0_pos));
00219 if ((m & legal_states[op]) != m) {
00220 throw_failure(SRPC::protocol_violation,
00221 Text(op_names[op]) + " isn't legal now (in state '" +
00222 state_names[this->state] + "')");
00223 }
00224 }
00225
00226 void SRPC_impl::ensure_sending_state() throw ()
00227 {
00228 switch (this->state) {
00229 case SRPC_impl::data_out:
00230 case SRPC_impl::seq_out:
00231 break;
00232 default:
00233 this->state = SRPC_impl::data_out;
00234 }
00235 }
00236
00237 void SRPC_impl::ensure_recving_state() throw ()
00238 {
00239 switch (this->state) {
00240 case SRPC_impl::data_in:
00241 case SRPC_impl::seq_in:
00242 break;
00243 default:
00244 this->state = SRPC_impl::data_in;
00245 }
00246 }
00247
00248
00249
00250
00251
00252 void SRPC_impl::send_bytes(const char *bs, int len, bool flush)
00253 throw(TCP_sock::failure)
00254 {
00255 send_int32(len);
00256 s->send_data(bs, len, flush);
00257 }
00258
00259 void SRPC_impl::recv_bytes(char *bs, int len)
00260 throw(TCP_sock::failure, SRPC::failure)
00261 {
00262
00263
00264
00265
00266 if(this->use_read_timeout &&
00267 ((!this->state == SRPC_impl::initial) ||
00268 this->use_read_timeout_between_calls))
00269 {
00270 this->s->enable_read_timeout(this->read_timeout_secs);
00271 }
00272 else
00273 {
00274 this->s->disable_read_timeout();
00275 }
00276
00277 int curr = 0;
00278 while (len > 0) {
00279 int bytes_read;
00280 bytes_read = this->s->recv_data(&bs[curr], len);
00281 curr += bytes_read;
00282 len -= bytes_read;
00283 }
00284 }
00285
00286
00287
00288
00289
00290 void SRPC_impl::send_int16(Basics::int16 i, bool flush)
00291 throw (TCP_sock::failure)
00292 {
00293 i = Basics::hton16(i);
00294 this->s->send_data((const char *) &i, sizeof(i), flush);
00295 }
00296
00297 Basics::int16 SRPC_impl::recv_int16()
00298 throw (TCP_sock::failure)
00299 {
00300 Basics::int16 x;
00301 recv_bytes((char *) &x, sizeof(x));
00302 return Basics::ntoh16(x);
00303 }
00304
00305 void SRPC_impl::send_int32(Basics::int32 i, bool flush)
00306 throw (TCP_sock::failure)
00307 {
00308 i = Basics::hton32(i);
00309 this->s->send_data((const char *) &i, sizeof(i), flush);
00310 }
00311
00312 Basics::int32 SRPC_impl::recv_int32()
00313 throw (TCP_sock::failure)
00314 {
00315 Basics::int32 x;
00316 recv_bytes((char *) &x, sizeof(x));
00317 return Basics::ntoh32(x);
00318 }
00319
00320 void SRPC_impl::send_int64(Basics::int64 i, bool flush)
00321 throw (TCP_sock::failure)
00322 {
00323 i = Basics::hton64(i);
00324 this->s->send_data((const char *) &i, sizeof(i), flush);
00325 }
00326
00327 Basics::int64 SRPC_impl::recv_int64()
00328 throw (TCP_sock::failure)
00329 {
00330 Basics::int64 x;
00331 recv_bytes((char *) &x, sizeof(x));
00332 return Basics::ntoh64(x);
00333 }
00334
00335
00336
00337
00338
00339 void SRPC_impl::send_item_code(item_code ic, bool flush)
00340 throw (TCP_sock::failure)
00341 {
00342 char c;
00343 if (!(this->version_sent)) {
00344 c = SRPC_impl::ic_hello;
00345 this->s->send_data(&c, 1);
00346 send_bytes(SRPC_VERSION, strlen(SRPC_VERSION)+1);
00347 this->version_sent = true;
00348 }
00349 c = ic;
00350 this->s->send_data(&c, 1, flush);
00351 }
00352
00353 item_code SRPC_impl::read_item_code()
00354 throw (TCP_sock::failure, SRPC::failure)
00355 {
00356 char c;
00357 item_code ic;
00358
00359
00360
00361 if (this->s == (TCP_sock *)NULL && !(this->is_caller)) {
00362 this->s = this->listener->sock->wait_for_connect();
00363 }
00364
00365
00366
00367 if (this->buffered_ic == SRPC_impl::ic_null) {
00368 this->recv_bytes(&c, 1);
00369 ic = (item_code) (unsigned char) c;
00370 } else {
00371 ic = this->buffered_ic;
00372 this->buffered_ic = SRPC_impl::ic_null;
00373 }
00374
00375 if (!(this->version_checked)) {
00376 if (ic != SRPC_impl::ic_hello) {
00377 this->throw_failure(SRPC::protocol_violation,
00378 "Other end didn't say hello.", true);
00379 }
00380
00381 int len = this->recv_int32();
00382 char *ver = NEW_PTRFREE_ARRAY(char, len+1);
00383 this->recv_bytes(ver, len);
00384 ver[len] = '\0';
00385
00386 if (strcmp(ver, SRPC_VERSION) != 0) {
00387 delete[] ver;
00388 this->throw_failure(SRPC::protocol_violation,
00389 "Client/server SRPC version mismatch", true);
00390 }
00391 delete[] ver;
00392
00393
00394
00395 this->version_checked = true;
00396
00397
00398 this->recv_bytes(&c, 1);
00399 ic = (item_code) (unsigned char) c;
00400 }
00401
00402 if (ic == SRPC_impl::ic_failure) {
00403
00404 int code = this->recv_int32();
00405 int len = this->recv_int32();
00406 char *str = NEW_PTRFREE_ARRAY(char, len);
00407 this->recv_bytes(str, len);
00408 Text msg(str, len);
00409 delete[] str;
00410 this->throw_failure(code, msg, true);
00411 }
00412
00413 return ic;
00414 }
00415
00416 void SRPC_impl::recv_item_code(item_code expected, bool *got_end)
00417 throw (TCP_sock::failure, SRPC::failure)
00418 {
00419 item_code ic = this->read_item_code();
00420
00421 if (got_end != (bool *)NULL) *got_end = false;
00422 if (expected == ic) return;
00423 if (this->state == SRPC_impl::seq_in && ic == SRPC_impl::ic_seq_end
00424 && got_end != (bool *)NULL) {
00425
00426 this->buffered_ic = ic;
00427 *got_end = true;
00428 return;
00429 }
00430
00431 char msg[50];
00432 sprintf(msg, "Mix-up: expected item code %d, got %d", expected, ic);
00433 this->throw_failure(SRPC::protocol_violation, msg);
00434 }
00435
00436 item_code SRPC_impl::recv_item_code(item_code expected1, item_code expected2,
00437 bool *got_end)
00438 throw (TCP_sock::failure, SRPC::failure)
00439 {
00440 item_code ic = this->read_item_code();
00441
00442 if (got_end != (bool *)NULL) *got_end = false;
00443 if ((expected1 == ic) || (expected2 == ic)) return ic;
00444 if (this->state == SRPC_impl::seq_in && ic == SRPC_impl::ic_seq_end
00445 && got_end != (bool *)NULL) {
00446
00447 this->buffered_ic = ic;
00448 *got_end = true;
00449 return SRPC_impl::ic_seq_end;
00450 }
00451
00452 char msg[60];
00453 sprintf(msg, "Mix-up: expected item code %d or %d, got %d",
00454 expected1, expected2, ic);
00455 this->throw_failure(SRPC::protocol_violation, msg);
00456
00457
00458 return SRPC_impl::ic_null;
00459 }
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471 Basics::mutex SRPC_listener::m;
00472 SRPC_listener *SRPC_listener::listeners = NULL;
00473
00474
00475
00476
00477
00478
00479 SRPC_listener *SRPC_listener::create(u_short port, TCP_sock *ls)
00480 throw (TCP_sock::failure)
00481 {
00482 SRPC_listener *s;
00483 SRPC_listener::m.lock();
00484 try {
00485 s = SRPC_listener::listeners;
00486 while (s != (SRPC_listener *)NULL) {
00487 if (s->port == port) break;
00488 s = s->next;
00489 }
00490 if (s == (SRPC_listener *)NULL) {
00491 s = NEW(SRPC_listener);
00492 s->next = SRPC_listener::listeners;
00493 if (s->my_sock = (ls == (TCP_sock *)NULL)) {
00494 s->sock = NEW_CONSTR(TCP_sock, (port));
00495 s->sock->set_waiters(INITIAL_WAITERS_KLUDGE);
00496
00497
00498 s->sock->enable_keepalive();
00499 }
00500 else s->sock = ls;
00501 s->port = port;
00502 s->n = 0;
00503 SRPC_listener::listeners = s;
00504 }
00505 s->n++;
00506 } catch (...) { SRPC_listener::m.unlock(); throw; }
00507 SRPC_listener::m.unlock();
00508 return s;
00509 }
00510
00511
00512 bool SRPC_listener::destroy(SRPC_listener *listener)
00513 throw (TCP_sock::failure)
00514 {
00515 bool result = true;
00516 SRPC_listener::m.lock();
00517 try {
00518 if (--(listener->n) == 0) {
00519
00520 SRPC_listener *l = SRPC_listener::listeners;
00521 SRPC_listener *prev = (SRPC_listener *)NULL;
00522 while (result = (l != (SRPC_listener *)NULL)) {
00523 if (l->sock == listener->sock) {
00524 if (prev == (SRPC_listener *)NULL) {
00525 SRPC_listener::listeners = l->next;
00526 } else {
00527 prev->next = l->next;
00528 }
00529 if (listener->my_sock) {
00530
00531 delete listener->sock;
00532 }
00533 delete listener;
00534 break;
00535 }
00536 prev = l;
00537 l = l->next;
00538 }
00539 }
00540 } catch (...) { SRPC_listener::m.unlock(); throw; }
00541 SRPC_listener::m.unlock();
00542 return result;
00543 }
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586