00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #include <sys/types.h>
00035 #include <netinet/tcp.h>
00036 #include <poll.h>
00037 #include <Basics.H>
00038 #include <OS.H>
00039 #include <VestaConfig.H>
00040
00041 #if defined(__sun__)
00042 #include <sys/filio.h>
00043 #endif
00044
00045 #include "TCP_sock.H"
00046
00047 #if defined(__digital__)
00048
00049
00050 typedef int socklen_t;
00051
00052 #ifndef NETDB_INTERNAL
00053
00054
00055 #define NETDB_INTERNAL -1
00056 #endif
00057 #endif
00058
00059
00060
00061
00062
00063 const char *TCP_NAME = "tcp";
00064
00065
00066
00067
00068
00069 pthread_once_t TCP_sock::init_block = PTHREAD_ONCE_INIT;
00070
00071
00072 Text TCP_sock::my_hostname;
00073 sockaddr_in TCP_sock::defaultsock;
00074 in_addr TCP_sock::self_addr;
00075 int TCP_sock::tcp_protolevel;
00076 static unsigned int g_resolv_try_again_limit = 100;
00077
00078 #if defined(__linux__)
00079 Basics::mutex TCP_sock::ndb_mutex;
00080 #endif
00081
00082
00083
00084
00085
00086 static void die(const Text &m) throw (TCP_sock::failure)
00087 {
00088 throw TCP_sock::failure(TCP_sock::internal_trouble, m);
00089 }
00090
00091 static void die_with_errno(int reason, const Text &m, int e = 0)
00092 throw (TCP_sock::failure)
00093 {
00094 if (e == 0) e = errno;
00095 Text emsg = Basics::errno_Text(e);
00096 throw TCP_sock::failure(reason, m + " (" + emsg + ")", e);
00097 }
00098
00099 extern "C"
00100 {
00101 void TCP_sock_init()
00102 {
00103 TCP_sock::init();
00104 }
00105 }
00106
00107
00108 void TCP_sock::ensure_init() throw (TCP_sock::failure)
00109 {
00110 if (pthread_once(&init_block, TCP_sock_init) != 0)
00111 throw TCP_sock::failure(internal_trouble, "pthread_once failed!");
00112 }
00113
00114 static bool close_fd(fd_t &fd) throw ()
00115 {
00116 if (fd == NO_FD) return true;
00117 if (close(fd) == SYSERROR) return false;
00118 fd = NO_FD;
00119 return true;
00120 }
00121
00122 static Text TCP_state_str[4] = { "fresh", "listening", "connected", "dead" };
00123
00124 void TCP_sock::ensure_state(sock_state should_be)
00125 throw (TCP_sock::failure)
00126 {
00127 if (this->state != should_be) {
00128 sock_state is = this->state;
00129 this->state = TCP_sock::dead;
00130 throw failure(TCP_sock::wrong_state,
00131 Text("Socket in wrong state; is: ") + TCP_state_str[is] +
00132 ", should be: " + TCP_state_str[should_be]);
00133 }
00134 }
00135
00136 void TCP_sock::ensure_sndbuffer() throw ()
00137 {
00138 if (this->sndbuff != NULL) return;
00139 this->sndbuff = NEW_PTRFREE_ARRAY(char, this->sndbuff_size);
00140 this->sndbuff_len = 0;
00141 }
00142
00143 void TCP_sock::ensure_rcvbuffer() throw ()
00144 {
00145 if (this->rcvbuff != NULL) return;
00146 this->rcvbuff = NEW_PTRFREE_ARRAY(char, this->rcvbuff_size);
00147 this->rcvcurp = this->rcvend = this->rcvbuff;
00148 }
00149
00150 void TCP_sock::configure_blocking() throw (TCP_sock::failure)
00151 {
00152
00153 bool non_blocking = (this->alerts_enabled || this->read_timeout_enabled);
00154 if (this->non_blocking != non_blocking) {
00155
00156 int nb_param = non_blocking ? 1 : 0;
00157 if (ioctl(this->fd, FIONBIO, &nb_param) == SYSERROR)
00158 die_with_errno(TCP_sock::internal_trouble, "ioctl(FIONBIO) failed");
00159 this->non_blocking = non_blocking;
00160 }
00161 }
00162
00163 void TCP_sock::controlled_wait()
00164 throw (TCP_sock::alerted, TCP_sock::failure)
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192 {
00193
00194 int timeout = -1;
00195 time_t wait_start;
00196
00197
00198 if(this->read_timeout_enabled)
00199 {
00200
00201 timeout = this->read_timeout_secs * 1000;
00202
00203
00204
00205
00206
00207 wait_start = time(NULL);
00208 }
00209
00210 while (true) {
00211 struct pollfd poll_data[2];
00212 unsigned int nfds = 1;
00213 poll_data[0].fd = this->fd;
00214 poll_data[0].events = POLLIN;
00215 if (this->alerts_enabled) {
00216 nfds = 2;
00217 poll_data[1].fd = this->pipe_rd;
00218 poll_data[1].events = POLLIN;
00219 }
00220
00221 int nready = poll(poll_data, nfds, timeout);
00222
00223 if (nready == SYSERROR) {
00224
00225 if (errno == EAGAIN)
00226
00227
00228
00229 sleep(1);
00230 else if (errno != EINTR) {
00231 this->state = TCP_sock::dead;
00232 die_with_errno(TCP_sock::internal_trouble, "Error on 'poll'");
00233 }
00234
00235 } else if(poll_data[0].revents & POLLNVAL) {
00236
00237 this->state = TCP_sock::dead;
00238 die("Error on socket input");
00239 } else if(this->alerts_enabled &&
00240 (poll_data[1].revents & (POLLERR | POLLNVAL))) {
00241
00242 this->state = TCP_sock::dead;
00243 die("Error on alert channel pipe");
00244 } else if(poll_data[0].revents & (POLLIN | POLLERR)) {
00245
00246
00247
00248
00249 break;
00250 } else if (this->alerts_enabled && (poll_data[1].revents & POLLIN)) {
00251
00252 this->m.lock();
00253
00254
00255
00256
00257 while (this->bytes_in_pipe > 0) {
00258 char c;
00259 if (read(this->pipe_rd, &c, 1) == SYSERROR) {
00260 this->state = TCP_sock::dead;
00261 this->m.unlock();
00262 die_with_errno(TCP_sock::internal_trouble, "Error reading pipe");
00263 }
00264 this->bytes_in_pipe--;
00265 }
00266 if (this->alert_flag) {
00267 this->alert_flag = false;
00268 this->m.unlock();
00269 throw TCP_sock::alerted();
00270 }
00271 this->m.unlock();
00272 } else if(this->read_timeout_enabled && (nready == 0)) {
00273
00274 throw
00275 TCP_sock::failure(TCP_sock::read_timeout,
00276 "read timeout (no data from peer)");
00277 }
00278
00279
00280 if(this->read_timeout_enabled)
00281 {
00282
00283
00284 time_t now = time(NULL);
00285
00286
00287
00288
00289 if(now > wait_start)
00290 {
00291 if((now - wait_start) < this->read_timeout_secs)
00292 {
00293
00294 timeout = (this->read_timeout_secs - (now - wait_start)) * 1000;
00295 }
00296 else
00297 {
00298
00299
00300
00301 timeout = 1;
00302 }
00303 }
00304 }
00305
00306 }
00307 }
00308
00309
00310
00311
00312
00313
00314 void TCP_sock::init() throw (TCP_sock::failure)
00315 {
00316 {
00317
00318
00319
00320 Text try_again_limit;
00321 if(VestaConfig::get("TCP_sock", "resolv_try_again_limit",
00322 try_again_limit))
00323 {
00324 g_resolv_try_again_limit = atoi(try_again_limit.cchars());
00325 }
00326 }
00327
00328
00329
00330 char temp_hostname[MAXHOSTNAMELEN+1];
00331 if (gethostname(temp_hostname, MAXHOSTNAMELEN) == SYSERROR)
00332 die_with_errno(TCP_sock::internal_trouble,
00333 "initial gethostname failed!");
00334 TCP_sock::my_hostname = Text(temp_hostname);
00335
00336
00337
00338 memset((char *)(&(TCP_sock::defaultsock)), 0,
00339 sizeof(TCP_sock::defaultsock));
00340 TCP_sock::defaultsock.sin_family = AF_INET;
00341 TCP_sock::defaultsock.sin_addr.s_addr = INADDR_ANY;
00342
00343
00344 try
00345 {
00346 TCP_sock::self_addr = TCP_sock::host_to_addr(TCP_sock::my_hostname);
00347 }
00348 catch(TCP_sock::failure f)
00349 {
00350
00351
00352 f.msg += " (resolving our own hostname!)";
00353 throw f;
00354 }
00355
00356
00357
00358
00359
00360
00361
00362
00363 TCP_sock::tcp_protolevel = IPPROTO_TCP;
00364 endprotoent();
00365 }
00366
00367
00368
00369
00370
00371 TCP_sock::TCP_sock(u_short port) throw (TCP_sock::failure)
00372 {
00373 TCP_sock::ensure_init();
00374 this->instance_init();
00375 this->new_socket(port);
00376 }
00377
00378 TCP_sock::TCP_sock(const empty_sock_marker &p_ignored)
00379 throw (TCP_sock::failure)
00380 {
00381 TCP_sock::ensure_init();
00382
00383 this->instance_init();
00384
00385
00386 this->fd = SYSERROR;
00387 }
00388
00389
00390 void TCP_sock::set_no_delay(int fd) throw (TCP_sock::failure)
00391
00392
00393 {
00394 int no_delay = 1;
00395 if (setsockopt(fd, TCP_sock::tcp_protolevel, TCP_NODELAY,
00396 (char *)(&no_delay), sizeof(no_delay)) == SYSERROR) {
00397 die_with_errno(TCP_sock::internal_trouble,
00398 "setsocketopt failed on TCP_NODELAY");
00399 }
00400 }
00401
00402 void TCP_sock::instance_init() throw()
00403 {
00404 memset(&this->me, 0, sizeof(this->me));
00405
00406
00407 this->sndbuff = (char *)NULL;
00408 this->sndbuff_len = 0;
00409
00410 this->rcvbuff = (char *)NULL;
00411 this->rcvcurp = (char *)NULL;
00412 this->rcvend = (char *)NULL;
00413
00414 this->sndbuff_size = TCP_sock::default_sndbuffer_size;
00415 this->rcvbuff_size = TCP_sock::default_rcvbuffer_size;
00416
00417 this->non_blocking = false;
00418
00419 this->alerts_enabled = false;
00420
00421 this->read_timeout_enabled = false;
00422
00423 this->state = TCP_sock::fresh;
00424 }
00425
00426 void TCP_sock::new_socket(u_short port)
00427 throw (TCP_sock::failure)
00428 {
00429
00430
00431 this->me = TCP_sock::defaultsock;
00432
00433
00434 this->me.sin_port = htons(port);
00435
00436
00437 this->fd = socket(AF_INET, SOCK_STREAM, TCP_sock::tcp_protolevel);
00438 if (this->fd == SYSERROR) {
00439 int code = (errno == ENOBUFS || errno == ENOMEM)
00440 ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
00441 die_with_errno(code, "couldn't create TCP socket");
00442 }
00443
00444
00445
00446 int reuseaddr = 1;
00447 if (setsockopt(this->fd, SOL_SOCKET, SO_REUSEADDR,
00448 (char *)(&reuseaddr), sizeof(reuseaddr)) == SYSERROR)
00449 die_with_errno(TCP_sock::internal_trouble, "setsockopt(reuseaddr) failed");
00450
00451
00452 if (bind(this->fd, (sockaddr *)(&me), sizeof(me)) == SYSERROR) {
00453 int code = (errno == EADDRINUSE) ? TCP_sock::environment_problem :
00454 (errno == EADDRNOTAVAIL || errno == EACCES)
00455 ? TCP_sock::invalid_parameter : TCP_sock::internal_trouble;
00456 die_with_errno(code,
00457 "bind socket to " + inet_ntoa_r(this->me.sin_addr) + " failed!");
00458 }
00459
00460
00461 if (this->me.sin_port == 0) {
00462 socklen_t size = sizeof(this->me);
00463 if (getsockname(this->fd, (sockaddr *)(&(this->me)), &size) == SYSERROR) {
00464 int code = (errno == ENOBUFS)
00465 ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
00466 die_with_errno(code,
00467 "getsockname failed on " + inet_ntoa_r(this->me.sin_addr));
00468 }
00469 };
00470
00471
00472 TCP_sock::set_no_delay(this->fd);
00473 }
00474
00475 TCP_sock::~TCP_sock() throw (TCP_sock::failure)
00476 {
00477 if (this->fd != SYSERROR) (void)close_fd(this->fd);
00478 if (this->alerts_enabled) {
00479 if (!close_fd(this->pipe_wr))
00480 die_with_errno(TCP_sock::internal_trouble, "close of pipe_wr failed!");
00481 if (!close_fd(this->pipe_rd))
00482 die_with_errno(TCP_sock::internal_trouble, "close of pipe_rd failed!");
00483 this->alerts_enabled = false;
00484 }
00485 if (this->sndbuff != (char *)NULL) delete[] this->sndbuff;
00486 if (this->rcvbuff != (char *)NULL) delete[] this->rcvbuff;
00487 }
00488
00489
00490
00491
00492
00493 void TCP_sock::set_output_buffer_size(int bytes)
00494 throw (TCP_sock::failure)
00495 {
00496 if (bytes <= 0)
00497 throw(TCP_sock::failure(TCP_sock::invalid_parameter,
00498 "Illegal buffer size"));
00499 this->m.lock();
00500 try {
00501 if (this->sndbuff != (char *)NULL) {
00502 throw(TCP_sock::failure(TCP_sock::wrong_state,
00503 "Can't set output buffer size now"));
00504 }
00505 this->sndbuff_size = bytes;
00506 } catch(TCP_sock::failure &f) { this->m.unlock(); throw; }
00507 this->m.unlock();
00508 }
00509
00510 void TCP_sock::set_input_buffer_size(int bytes)
00511 throw (TCP_sock::failure)
00512 {
00513 if (bytes <= 0)
00514 throw(TCP_sock::failure(TCP_sock::invalid_parameter,
00515 "Illegal buffer size"));
00516 this->m.lock();
00517 try {
00518 if (this->rcvbuff != (char *)NULL) {
00519 throw(TCP_sock::failure(TCP_sock::wrong_state,
00520 "Can't set input buffer size now"));
00521 }
00522 this->rcvbuff_size = bytes;
00523 } catch(failure &f) { this->m.unlock(); throw; }
00524 this->m.unlock();
00525 }
00526
00527 void TCP_sock::enable_alerts() throw (TCP_sock::failure)
00528 {
00529 this->m.lock();
00530 try {
00531 if (!(this->alerts_enabled)) {
00532 fd_t pipe_fds[2];
00533 if (pipe(pipe_fds) == SYSERROR) {
00534 int code = (errno == ENFILE || errno == ENOMEM)
00535 ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
00536 die_with_errno(code, "pipe call failed!");
00537 }
00538 this->pipe_rd = pipe_fds[0];
00539 this->pipe_wr = pipe_fds[1];
00540 this->bytes_in_pipe = 0;
00541 this->alert_flag = false;
00542 this->alerts_enabled = true;
00543 this->configure_blocking();
00544 }
00545 } catch(...) { this->m.unlock(); throw; }
00546 this->m.unlock();
00547 }
00548
00549 void TCP_sock::disable_alerts() throw (TCP_sock::failure)
00550 {
00551 this->m.lock();
00552 try {
00553 if (this->alerts_enabled) {
00554 if (!close_fd(this->pipe_wr))
00555 die_with_errno(TCP_sock::internal_trouble, "close of pipe_wr failed!");
00556 if (!close_fd(this->pipe_rd))
00557 die_with_errno(TCP_sock::internal_trouble, "close of pipe_rd failed!");
00558 this->alerts_enabled = false;
00559 this->configure_blocking();
00560 }
00561 } catch(...) { this->m.unlock(); throw; }
00562 this->m.unlock();
00563 }
00564
00565 void TCP_sock::alert() throw (TCP_sock::failure)
00566 {
00567 this->m.lock();
00568 try {
00569 if (!(this->alerts_enabled)) {
00570 throw(TCP_sock::failure(TCP_sock::wrong_state,
00571 "alerts aren't enabled on this socket"));
00572 }
00573 if (!(this->alert_flag)) {
00574 char c = '!';
00575 if (write(this->pipe_wr, &c, 1) == SYSERROR)
00576 die_with_errno(TCP_sock::internal_trouble, "write to pipe failed!");
00577 this->bytes_in_pipe++;
00578 this->alert_flag = true;
00579 }
00580 } catch(...) { this->m.unlock(); throw; }
00581 this->m.unlock();
00582 }
00583
00584 bool TCP_sock::test_alert() throw(TCP_sock::failure)
00585 {
00586 bool res;
00587 this->m.lock();
00588 try {
00589 if (!(this->alerts_enabled))
00590 throw failure(TCP_sock::wrong_state,
00591 "alerts aren't enabled on this socket.");
00592 res = this->alert_flag;
00593 this->alert_flag = false;
00594 } catch(...) { this->m.unlock(); throw; }
00595 this->m.unlock();
00596 return res;
00597 }
00598
00599 void TCP_sock::set_keepalive(bool state) throw (failure)
00600 {
00601 int keepalive = state ? 1 : 0;
00602
00603 this->m.lock();
00604 int result = setsockopt(this->fd, SOL_SOCKET, SO_KEEPALIVE,
00605 (char *)(&keepalive), sizeof(keepalive));
00606 this->m.unlock();
00607
00608 if(result == SYSERROR)
00609 {
00610 die_with_errno(TCP_sock::internal_trouble,
00611 "setsocketopt failed on SO_KEEPALIVE");
00612 }
00613 }
00614
00615 bool TCP_sock::get_keepalive() throw (failure)
00616 {
00617 int keepalive = 0;
00618 socklen_t keepalive_size = sizeof(keepalive);
00619
00620 this->m.lock();
00621 int result = getsockopt(this->fd, SOL_SOCKET, SO_KEEPALIVE,
00622 (char *)(&keepalive), &keepalive_size);
00623 this->m.unlock();
00624
00625 if(result == SYSERROR)
00626 {
00627 die_with_errno(TCP_sock::internal_trouble,
00628 "getsocketopt failed on SO_KEEPALIVE");
00629 }
00630
00631 return (keepalive != 0);
00632 }
00633
00634 void TCP_sock::enable_read_timeout(unsigned int seconds)
00635 throw (TCP_sock::failure)
00636 {
00637 read_timeout_enabled = true;
00638 read_timeout_secs = seconds;
00639
00640
00641
00642 configure_blocking();
00643 }
00644
00645 void TCP_sock::disable_read_timeout() throw (TCP_sock::failure)
00646 {
00647 read_timeout_enabled = false;
00648
00649
00650
00651 configure_blocking();
00652 }
00653
00654
00655
00656
00657
00658
00659 Text TCP_sock::this_host() throw (TCP_sock::failure)
00660 {
00661 TCP_sock::ensure_init();
00662 return TCP_sock::my_hostname;
00663 }
00664
00665 void TCP_sock::get_local_addr( sockaddr_in &addr)
00666 throw (TCP_sock::failure)
00667 {
00668 addr = this->me;
00669
00670
00671
00672
00673 if(addr.sin_addr.s_addr == INADDR_ANY)
00674 {
00675 addr.sin_addr = TCP_sock::self_addr;
00676 }
00677 }
00678
00679 void TCP_sock::get_remote_addr( sockaddr_in &addr)
00680 throw (TCP_sock::failure)
00681 {
00682 if (this->state != TCP_sock::dead) {
00683
00684
00685 this->ensure_state(TCP_sock::connected);
00686 }
00687 addr = this->him;
00688 }
00689
00690 bool TCP_sock::alive() throw (TCP_sock::failure)
00691 {
00692 bool result = true;
00693 this->m.lock();
00694 try {
00695
00696
00697
00698 while (true) {
00699 struct pollfd poll_data;
00700 poll_data.fd = this->fd;
00701 poll_data.events = POLLIN;
00702 int nready = poll(&poll_data, 1, 0);
00703
00704 if (nready == 0) break;
00705 else if (nready > 0) {
00706 if (poll_data.revents & (POLLERR | POLLNVAL)) {
00707 result = false;
00708 } else if (poll_data.revents & POLLIN) {
00709 int bytes_to_read;
00710 if (ioctl(this->fd, FIONREAD, &bytes_to_read) == SYSERROR)
00711 die_with_errno(TCP_sock::internal_trouble, "Error on 'ioctl'");
00712 result = (bytes_to_read != 0);
00713 } else {
00714 assert(false);
00715 }
00716 break;
00717 } else if (nready == SYSERROR) {
00718
00719 if (errno == EAGAIN) {
00720
00721
00722
00723 sleep(1);
00724
00725 } else if (errno != EINTR) {
00726 this->state = TCP_sock::dead;
00727 die_with_errno(TCP_sock::internal_trouble, "Error on 'poll'");
00728 }
00729
00730 } else {
00731 assert(false);
00732 }
00733 }
00734 } catch (failure &f) { this->m.unlock(); throw; }
00735 this->m.unlock();
00736 return result;
00737 }
00738
00739 u_short TCP_sock::parse_port(const Text &port)
00740 throw (TCP_sock::failure)
00741 {
00742 u_short l_result = 0;
00743
00744
00745 if (isdigit(port[0]))
00746 {
00747 l_result = atoi(port.cchars());
00748 }
00749 else
00750 {
00751 #if defined(__digital__) || defined(__linux__)
00752 #define USE_GETSERVBYNAME_R
00753 #endif
00754
00755 #if defined(USE_GETSERVBYNAME_R)
00756
00757 struct servent sp;
00758 # if defined(__digital__)
00759 struct servent_data sd;
00760 # elif defined(__linux__)
00761 struct servent *sp_result;
00762
00763
00764
00765
00766 char sp_buf[1024];
00767 # endif
00768
00769 # if defined(__digital__)
00770 memset((char *)(&sd), 0, sizeof(sd));
00771 # elif defined(__linux__)
00772 memset(sp_buf, 0, sizeof(sp_buf));
00773 # endif
00774
00775 # if defined(__digital__)
00776 if (getservbyname_r(port.cchars(), TCP_NAME, &sp, &sd) == SYSOK)
00777 # elif defined(__linux__)
00778 ndb_mutex.lock();
00779 if (getservbyname_r(port.cchars(), TCP_NAME,
00780 &sp,
00781 sp_buf, sizeof(sp_buf),
00782 &sp_result) == SYSOK)
00783 # else
00784 # error Do not know how to do getservbyname_r on this platform!
00785 # endif
00786 {
00787 # if defined(__linux__)
00788 ndb_mutex.unlock();
00789 # endif
00790 l_result = ntohs(sp.s_port);
00791 endservent();
00792 } else {
00793 # if defined(__linux__)
00794 ndb_mutex.unlock();
00795 # endif
00796 throw(TCP_sock::failure(TCP_sock::unknown_port,
00797 "unknown port: " + port));
00798 }
00799 #else
00800 struct addrinfo hints, *res = 0;
00801 memset(&hints, 0, sizeof(hints));
00802 hints.ai_family = PF_INET;
00803 int err = getaddrinfo(0, port.cchars(), &hints, &res);
00804 if(err != 0)
00805 {
00806 throw(TCP_sock::failure(TCP_sock::unknown_port,
00807 "unknown port: " + port + ": " +
00808 gai_strerror(err)));
00809 }
00810 assert(res != 0);
00811
00812 bool found = false;
00813 for(struct addrinfo *cur = res;
00814 (cur != 0) && !found;
00815 cur = cur->ai_next)
00816 {
00817 assert(cur->ai_addr != 0);
00818 if(cur->ai_addr->sa_family == AF_INET)
00819 {
00820 l_result = ((sockaddr_in *) cur->ai_addr)->sin_port;
00821 found = true;
00822 }
00823 }
00824
00825 freeaddrinfo(res);
00826
00827 if(!found)
00828 {
00829 throw(TCP_sock::failure(TCP_sock::unknown_port,
00830 "unknown port: " + port +
00831 " (getaddrinfo indicates success, "
00832 "but no usable result found)"));
00833 }
00834 #endif
00835 }
00836
00837
00838 return l_result;
00839 }
00840
00841 in_addr TCP_sock::host_to_addr(const Text &hostname)
00842 throw (TCP_sock::failure)
00843 {
00844 in_addr l_result;
00845
00846
00847 int dummy;
00848 if(isdigit(hostname[0]) &&
00849 (sscanf(hostname.chars(), "%d.%d.%d.%d",
00850 &dummy, &dummy, &dummy, &dummy) == 4))
00851 {
00852 l_result.s_addr = inet_addr(hostname.chars());
00853 }
00854 else
00855 {
00856 #if defined(__digital__) || defined(__linux__)
00857 #define USE_GETHOSTBYNAME_R
00858 #endif
00859
00860 #if defined(USE_GETHOSTBYNAME_R)
00861
00862 struct hostent hp;
00863 # if defined(__digital__)
00864 struct hostent_data hd;
00865 # elif defined(__linux__)
00866 struct hostent *hp_result;
00867
00868
00869
00870
00871 char hp_buf[1024];
00872 # endif
00873 int h_errno_r;
00874
00875 memset((char *)&l_result, 0, sizeof(l_result));
00876 # if defined(__digital__)
00877 memset((char *)(&hd), 0, sizeof(hd));
00878 # elif defined(__linux__)
00879 memset(hp_buf, 0, sizeof(hp_buf));
00880 # endif
00881
00882
00883 unsigned int try_again_count = 0;
00884 while(1)
00885 {
00886
00887
00888 if(try_again_count > g_resolv_try_again_limit)
00889 {
00890 throw
00891 TCP_sock::failure(TCP_sock::unknown_host,
00892 Text("misconfigured name resolution "
00893 "(spinning on non-fatal error) "
00894 "resolving host: ") +
00895 hostname);
00896
00897 }
00898 # if defined(__digital__)
00899 if (gethostbyname_r(hostname.chars(), &hp, &hd) == SYSERROR)
00900 # elif defined(__linux__)
00901 ndb_mutex.lock();
00902 if (gethostbyname_r(hostname.chars(), &hp,
00903 hp_buf, sizeof(hp_buf),
00904 &hp_result,
00905 &h_errno_r) != SYSOK)
00906 # else
00907 # error Do not know how to do gethostbyname_r on this platform!
00908 # endif
00909 {
00910 # if defined(__digital__)
00911 h_errno_r = h_errno;
00912 # elif defined(__linux__)
00913 ndb_mutex.unlock();
00914 # endif
00915 switch (h_errno_r)
00916 {
00917 case TRY_AGAIN:
00918
00919
00920
00921
00922 try_again_count++;
00923 break;
00924 case NETDB_INTERNAL:
00925
00926 die_with_errno(TCP_sock::unknown_host,
00927 Text("error resolving host: ") + hostname);
00928 case HOST_NOT_FOUND:
00929
00930 case NO_DATA:
00931
00932 throw(TCP_sock::failure(TCP_sock::unknown_host,
00933 Text("unknown host: ") + hostname));
00934 case NO_RECOVERY:
00935
00936 default:
00937
00938
00939 throw(TCP_sock::failure(TCP_sock::unknown_host,
00940 Text("error resolving host: ") +
00941 hostname));
00942 }
00943 }
00944 else
00945 {
00946 # if defined(__linux__)
00947 ndb_mutex.unlock();
00948 # endif
00949
00950
00951
00952
00953
00954 if(hp.h_addrtype != AF_INET)
00955 {
00956 throw(TCP_sock::failure(TCP_sock::unknown_host,
00957 Text("non-IPv4 host: ") + hostname));
00958 }
00959
00960
00961
00962
00963 if(hp.h_addr == 0)
00964 {
00965 try_again_count++;
00966 continue;
00967 }
00968
00969 memcpy((char *)(&l_result), hp.h_addr, hp.h_length);
00970 endhostent();
00971 break;
00972 }
00973 }
00974 #else
00975
00976 unsigned int try_again_count = 0;
00977 while(1)
00978 {
00979
00980
00981 if(try_again_count > g_resolv_try_again_limit)
00982 {
00983 throw
00984 TCP_sock::failure(TCP_sock::unknown_host,
00985 Text("misconfigured name resolution "
00986 "(spinning on non-fatal error) "
00987 "resolving host: ") +
00988 hostname);
00989
00990 }
00991
00992 struct addrinfo hints, *res = 0;
00993 memset(&hints, 0, sizeof(hints));
00994 hints.ai_family = PF_INET;
00995 int err = getaddrinfo(hostname.cchars(), 0, &hints, &res);
00996 if(err != 0)
00997 {
00998 switch (err)
00999 {
01000 case EAI_AGAIN:
01001
01002
01003
01004
01005 try_again_count++;
01006 break;
01007 case EAI_SYSTEM:
01008
01009 die_with_errno(TCP_sock::unknown_host,
01010 Text("error resolving host: ") + hostname);
01011 case EAI_NONAME:
01012
01013 #ifdef EAI_NODATA
01014 case EAI_NODATA:
01015
01016 #endif
01017 throw(TCP_sock::failure(TCP_sock::unknown_host,
01018 Text("unknown host: ") + hostname));
01019 case EAI_FAIL:
01020
01021 default:
01022
01023
01024 throw(TCP_sock::failure(TCP_sock::unknown_host,
01025 Text("error resolving host: ") +
01026 hostname + ": " +
01027 gai_strerror(err)));
01028 }
01029 }
01030 else
01031 {
01032 assert(res != 0);
01033
01034 bool found = false;
01035 for(struct addrinfo *cur = res;
01036 (cur != 0) && !found;
01037 cur = cur->ai_next)
01038 {
01039 assert(cur->ai_addr != 0);
01040 if(cur->ai_addr->sa_family == AF_INET)
01041 {
01042 l_result = ((sockaddr_in *) cur->ai_addr)->sin_addr;
01043 found = true;
01044 }
01045 }
01046
01047 freeaddrinfo(res);
01048
01049 if(!found)
01050 {
01051 throw(TCP_sock::failure(TCP_sock::unknown_host,
01052 Text("error resolving host: ") +
01053 hostname +
01054 " (getaddrinfo indicates success, "
01055 "but no usable result found)"));
01056 }
01057
01058 break;
01059 }
01060 }
01061 #endif
01062 }
01063
01064 return l_result;
01065 }
01066
01067
01068 void TCP_sock::name_to_sockaddr(const Text &hostname,
01069 const Text &port, sockaddr_in &s) throw (TCP_sock::failure)
01070 {
01071
01072 TCP_sock::ensure_init();
01073
01074
01075 memset((char *)(&s), 0, sizeof(s));
01076
01077
01078
01079 s.sin_addr = TCP_sock::host_to_addr(hostname.Empty()
01080 ? TCP_sock::my_hostname
01081 : hostname);
01082
01083
01084
01085 s.sin_family = AF_INET;
01086
01087
01088 s.sin_port = htons(TCP_sock::parse_port(port));
01089 }
01090
01091
01092 void TCP_sock::name_to_sockaddr(const Text &host_and_port,
01093 sockaddr_in &s) throw(TCP_sock::failure)
01094 {
01095 int i = host_and_port.FindCharR(':');
01096 if (i < 1) throw(TCP_sock::failure(TCP_sock::invalid_parameter,
01097 "No colon separating hostname and port number"));
01098 name_to_sockaddr(host_and_port.Sub(0, i),
01099 host_and_port.Sub(i+1), s);
01100 }
01101
01102
01103
01104
01105
01106 void TCP_sock::connect_to(const Text &hostname, const Text &port)
01107 throw (TCP_sock::failure)
01108 {
01109 sockaddr_in addr;
01110 try {
01111 TCP_sock::name_to_sockaddr(hostname, port, addr);
01112 } catch (TCP_sock::failure &f) { this->state = TCP_sock::dead; throw; };
01113 this->connect_to(addr);
01114 }
01115
01116 void TCP_sock::connect_to(const Text &host_and_port)
01117 throw (TCP_sock::failure)
01118 {
01119 sockaddr_in addr;
01120 try {
01121 TCP_sock::name_to_sockaddr(host_and_port, addr);
01122 } catch (TCP_sock::failure &f) { this->state = TCP_sock::dead; throw; };
01123 this->connect_to(addr);
01124 }
01125
01126 void TCP_sock::connect_to(sockaddr_in &addr)
01127 throw (TCP_sock::failure)
01128 {
01129 this->ensure_state(TCP_sock::fresh);
01130
01131 while (connect(this->fd, (sockaddr *)(&addr), sizeof(addr)) < 0) {
01132
01133
01134 int l_errno = errno;
01135 if (l_errno == EISCONN) {
01136
01137 break;
01138 } else if (l_errno == EINTR) {
01139
01140
01141 } else if ((l_errno == EWOULDBLOCK) || (l_errno == EINPROGRESS) ||
01142 (l_errno == EALREADY) || (l_errno == EAGAIN)) {
01143
01144 struct pollfd flist[1];
01145 flist[0].fd = this->fd;
01146 flist[0].events = POLLOUT;
01147 flist[0].revents = 0;
01148 poll(flist, 1, -1);
01149 } else if (l_errno == EADDRINUSE) {
01150
01151
01152
01153 char b;
01154 if (read(fd, &b, 0) == 0) {
01155 break;
01156 }
01157
01158 } else {
01159
01160 this->state = TCP_sock::dead;
01161 int code = TCP_sock::internal_trouble;
01162 switch (l_errno) {
01163 case ETIMEDOUT:
01164 case ENETUNREACH:
01165 code = TCP_sock::unknown_host;
01166 break;
01167 case ECONNREFUSED:
01168 code = TCP_sock::unknown_port;
01169 break;
01170 case EADDRNOTAVAIL:
01171 code = TCP_sock::invalid_parameter;
01172 break;
01173
01174 }
01175
01176 Text l_msg = "Can't connect to ";
01177 l_msg += inet_ntoa_r(addr.sin_addr);
01178 l_msg += ":";
01179
01180 char l_port_txt[6];
01181 sprintf(l_port_txt, "%d", ntohs(addr.sin_port));
01182
01183 l_msg += l_port_txt;
01184
01185 die_with_errno(code, l_msg, l_errno);
01186 }
01187 }
01188 this->him = addr;
01189 this->state = TCP_sock::connected;
01190 }
01191
01192
01193
01194
01195
01196 void TCP_sock::set_waiters(int waiters) throw (TCP_sock::failure)
01197 {
01198 this->ensure_state(TCP_sock::fresh);
01199 if (listen(this->fd, waiters) == SYSERROR)
01200 die_with_errno(TCP_sock::internal_trouble,
01201 "can't set_waiters ('listen' failed)");
01202 this->state = TCP_sock::listening;
01203 }
01204
01205 TCP_sock *TCP_sock::wait_for_connect()
01206 throw(TCP_sock::alerted, TCP_sock::failure)
01207 {
01208 socklen_t len;
01209
01210
01211
01212 TCP_sock *ns = NEW_CONSTR(TCP_sock, (empty_sock_marker()));
01213
01214 if (this->state == TCP_sock::fresh) this->set_waiters();
01215
01216 this->ensure_state(TCP_sock::listening);
01217
01218 while (true) {
01219 len = sizeof(ns->him);
01220 ns->fd = accept(this->fd, (sockaddr *)(&(ns->him)), &len);
01221 if (ns->fd != SYSERROR) break;
01222
01223 if (errno == EWOULDBLOCK) {
01224
01225 this->controlled_wait();
01226
01227 } else if (errno == EINTR) {
01228
01229 } else {
01230 this->state = TCP_sock::dead;
01231 int code = (errno == ENOMEM)
01232 ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
01233 die_with_errno(code,
01234 "accept failed on socket " + inet_ntoa_r(ns->me.sin_addr));
01235 }
01236 }
01237
01238
01239 TCP_sock::set_no_delay(ns->fd);
01240
01241
01242 len = sizeof(ns->me);
01243 if (getsockname(ns->fd, (sockaddr *)(&(ns->me)), &len) == SYSERROR) {
01244 this->state = TCP_sock::dead;
01245 int code = (errno == ENOBUFS)
01246 ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
01247 die_with_errno(code,
01248 "getsockname failed on " + inet_ntoa_r(ns->me.sin_addr));
01249 }
01250 ns->state = TCP_sock::connected;
01251 return ns;
01252 }
01253
01254
01255
01256
01257
01258 void TCP_sock::send_data(const char *buffer, int len, bool flush)
01259 throw (TCP_sock::failure)
01260 {
01261 int i = 0;
01262
01263 this->ensure_state(TCP_sock::connected);
01264 this->ensure_sndbuffer();
01265
01266 while (len > 0) {
01267 int n = min(len, this->sndbuff_size - this->sndbuff_len);
01268 memcpy(this->sndbuff + this->sndbuff_len, buffer + i, n);
01269 this->sndbuff_len += n;
01270 i += n; len -= n;
01271 if (len > 0 || flush) {
01272 int unsent = this->sndbuff_len, j = 0, sent;
01273 while (true) {
01274 sent = write(this->fd, sndbuff + j, unsent);
01275 if (sent == unsent) break;
01276 if (sent == SYSERROR) {
01277 if (errno == EINTR || errno == EWOULDBLOCK || errno == ENOBUFS) {
01278
01279
01280
01281
01282
01283 while (true) {
01284 struct pollfd poll_data[2];
01285 unsigned int nfds = 1;
01286
01287 poll_data[0].fd = this->fd;
01288 poll_data[0].events = POLLOUT;
01289 if (this->alerts_enabled) {
01290 nfds = 2;
01291 poll_data[1].fd = this->pipe_rd;
01292
01293
01294 poll_data[1].events = 0;
01295 }
01296
01297 int nready = poll(poll_data, nfds, 1000);
01298
01299 if (nready == 0) {
01300
01301 break;
01302 } else if (nready == SYSERROR) {
01303
01304 if (errno == EAGAIN) {
01305
01306
01307
01308
01309 sleep(1);
01310 break;
01311 } else if (errno != EINTR) {
01312 this->state = TCP_sock::dead;
01313 die_with_errno(TCP_sock::internal_trouble,
01314 "Error during 'poll'");
01315 }
01316
01317
01318
01319
01320 } else if (poll_data[0].revents & POLLNVAL) {
01321
01322 this->state = TCP_sock::dead;
01323 die("Error on socket output");
01324 } else if (this->alerts_enabled &&
01325 (poll_data[1].revents & (POLLERR | POLLNVAL))) {
01326
01327 this->state = TCP_sock::dead;
01328 die("Error on alert channel pipe");
01329 } else if (poll_data[0].revents & (POLLOUT |
01330 POLLERR | POLLHUP)) {
01331
01332
01333
01334
01335
01336 break;
01337 }
01338 }
01339 } else {
01340 this->state = TCP_sock::dead;
01341 int code = (errno == ENOBUFS)
01342 ? TCP_sock::environment_problem : TCP_sock::internal_trouble;
01343 die_with_errno(code, "Unable to send data");
01344 }
01345 } else {
01346 j += sent;
01347 unsent -= sent;
01348 }
01349 }
01350 this->sndbuff_len = 0;
01351 }
01352 }
01353 }
01354
01355 int TCP_sock::fill_rcvbuffer()
01356 throw(TCP_sock::alerted, TCP_sock::failure)
01357 {
01358 int bytes_read;
01359 int debug_errno = 0;
01360
01361 if (this->non_blocking) {
01362
01363 while (true) {
01364
01365
01366
01367
01368
01369
01370 do {
01371 bytes_read = read(this->fd, this->rcvbuff,
01372 TCP_sock::default_rcvbuffer_size);
01373 } while (bytes_read == SYSERROR && errno == EINTR);
01374
01375 debug_errno = errno;
01376
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391
01392
01393
01394
01395
01396
01397
01398
01399
01400
01401
01402
01403
01404
01405
01406
01407 if (bytes_read > 0) {
01408 return bytes_read;
01409 } else if (bytes_read == 0) {
01410 break;
01411 } else if (bytes_read == SYSERROR && errno != EWOULDBLOCK) {
01412 break;
01413 }
01414
01415
01416 assert(bytes_read == SYSERROR && errno == EWOULDBLOCK);
01417 this->controlled_wait();
01418
01419
01420
01421 }
01422 } else {
01423
01424 do {
01425 bytes_read = read(fd, rcvbuff, TCP_sock::default_rcvbuffer_size);
01426 } while (bytes_read == SYSERROR && errno == EINTR);
01427 debug_errno = errno;
01428 }
01429
01430
01431 if (bytes_read > 0) {
01432
01433 return bytes_read;
01434 } else if (bytes_read == 0) {
01435
01436 this->state = TCP_sock::dead;
01437 throw(TCP_sock::failure(TCP_sock::partner_went_away,
01438 "connection closed by peer"));
01439 } else if (bytes_read == SYSERROR && errno == ECONNRESET) {
01440
01441 this->state = TCP_sock::dead;
01442 throw(TCP_sock::failure(TCP_sock::partner_went_away,
01443 "connection reset by peer"));
01444 } else if (bytes_read == SYSERROR && (errno == ETIMEDOUT ||
01445 errno == EHOSTUNREACH)) {
01446
01447 this->state = TCP_sock::dead;
01448 throw(TCP_sock::failure(TCP_sock::partner_went_away,
01449 (errno == ETIMEDOUT)
01450 ? "connection timed out"
01451 : "peer unreachable"));
01452 } else if (bytes_read == SYSERROR) {
01453
01454 this->state = TCP_sock::dead;
01455 die_with_errno(TCP_sock::internal_trouble, "Error receiving data");
01456 }
01457 return -1;
01458 }
01459
01460 int TCP_sock::recv_data(char *buffer, int len)
01461 throw (TCP_sock::alerted, TCP_sock::failure)
01462 {
01463 int nread, bytes_read;
01464 this->ensure_state(TCP_sock::connected);
01465 this->ensure_rcvbuffer();
01466
01467 if (this->rcvcurp >= this->rcvend) {
01468 assert(this->rcvcurp == this->rcvend);
01469
01470 nread = this->fill_rcvbuffer();
01471 this->rcvcurp = this->rcvbuff;
01472 this->rcvend = this->rcvbuff + nread;
01473 }
01474 bytes_read = min(len, this->rcvend - this->rcvcurp);
01475 memcpy(buffer, this->rcvcurp, bytes_read);
01476 this->rcvcurp += bytes_read;
01477 return bytes_read;
01478 }