00001
00022
00023
00024 #include "Common/Compat.h"
00025
00026 #include <cassert>
00027 #include <iostream>
00028
00029 extern "C" {
00030 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00031 #include <arpa/inet.h>
00032 #include <netinet/ip.h>
00033 #endif
00034 #include <errno.h>
00035 #include <fcntl.h>
00036 #include <netdb.h>
00037 #include <netinet/in.h>
00038 #include <netinet/tcp.h>
00039 #include <poll.h>
00040 #include <sys/socket.h>
00041 #include <sys/types.h>
00042 #include <arpa/inet.h>
00043 }
00044
00045 #include "Common/Config.h"
00046 #include "Common/Error.h"
00047 #include "Common/InetAddr.h"
00048 #include "Common/FileUtils.h"
00049 #include "Common/SystemInfo.h"
00050 #include "Common/Time.h"
00051
00052 #include "ReactorFactory.h"
00053 #include "ReactorRunner.h"
00054 #include "Comm.h"
00055 #include "IOHandlerAccept.h"
00056 #include "IOHandlerData.h"
00057
00058 using namespace Hypertable;
00059 using namespace std;
00060
00061 atomic_t Comm::ms_next_request_id = ATOMIC_INIT(1);
00062
00063 Comm *Comm::ms_instance = NULL;
00064 Mutex Comm::ms_mutex;
00065
00066 Comm::Comm() {
00067 if (ReactorFactory::ms_reactors.size() == 0) {
00068 HT_ERROR("ReactorFactory::initialize must be called before creating "
00069 "AsyncComm::comm object");
00070 HT_ABORT;
00071 }
00072
00073 ReactorFactory::get_reactor(m_timer_reactor);
00074 m_handler_map = ReactorRunner::handler_map;
00075 }
00076
00077
00078 Comm::~Comm() {
00079 set<IOHandler *> handlers;
00080 m_handler_map->decomission_all(handlers);
00081
00082 foreach(IOHandler *handler, handlers)
00083 handler->shutdown();
00084
00085
00086 m_handler_map->wait_for_empty();
00087
00088
00089 ReactorFactory::destroy();
00090 }
00091
00092
00093 void Comm::destroy() {
00094 if (ms_instance) {
00095 delete ms_instance;
00096 ms_instance = 0;
00097 }
00098 }
00099
00100
00101 int
00102 Comm::connect(const CommAddress &addr, DispatchHandlerPtr &default_handler) {
00103 int sd;
00104 int error = m_handler_map->contains_data_handler(addr);
00105
00106 if (error == Error::OK)
00107 return Error::COMM_ALREADY_CONNECTED;
00108 else if (error != Error::COMM_NOT_CONNECTED)
00109 return error;
00110
00111 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00112 HT_ERRORF("socket: %s", strerror(errno));
00113 return Error::COMM_SOCKET_ERROR;
00114 }
00115
00116 return connect_socket(sd, addr, default_handler);
00117 }
00118
00119
00120
00121 int
00122 Comm::connect(const CommAddress &addr, const CommAddress &local_addr,
00123 DispatchHandlerPtr &default_handler) {
00124 int sd;
00125 int error = m_handler_map->contains_data_handler(addr);
00126
00127 HT_ASSERT(local_addr.is_inet());
00128
00129 if (error == Error::OK)
00130 return Error::COMM_ALREADY_CONNECTED;
00131 else if (error != Error::COMM_NOT_CONNECTED)
00132 return error;
00133
00134 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00135 HT_ERRORF("socket: %s", strerror(errno));
00136 return Error::COMM_SOCKET_ERROR;
00137 }
00138
00139
00140 if ((bind(sd, (const sockaddr *)&local_addr.inet, sizeof(sockaddr_in))) < 0) {
00141 HT_ERRORF( "bind: %s: %s", local_addr.to_str().c_str(), strerror(errno));
00142 return Error::COMM_BIND_ERROR;
00143 }
00144
00145 return connect_socket(sd, addr, default_handler);
00146 }
00147
00148
00149 int Comm::set_alias(const InetAddr &addr, const InetAddr &alias) {
00150 ScopedLock lock(ms_mutex);
00151 return m_handler_map->set_alias(addr, alias);
00152 }
00153
00154
00155 void Comm::listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf) {
00156 DispatchHandlerPtr null_handler(0);
00157 listen(addr, chf, null_handler);
00158 }
00159
00160
00161 int Comm::add_proxy(const String &proxy, const InetAddr &addr) {
00162 HT_ASSERT(ReactorFactory::proxy_master);
00163 return m_handler_map->add_proxy(proxy, addr);
00164 }
00165
00166 void Comm::get_proxy_map(ProxyMapT &proxy_map) {
00167 m_handler_map->get_proxy_map(proxy_map);
00168 }
00169
00170 bool Comm::wait_for_proxy_load(Timer &timer) {
00171 return m_handler_map->wait_for_proxy_load(timer);
00172 }
00173
00174
00175 void
00176 Comm::listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf,
00177 DispatchHandlerPtr &default_handler) {
00178 IOHandlerPtr handler;
00179 IOHandlerAccept *accept_handler;
00180 int one = 1;
00181 int sd;
00182
00183 HT_ASSERT(addr.is_inet());
00184
00185 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
00186 HT_THROW(Error::COMM_SOCKET_ERROR, strerror(errno));
00187
00188
00189 FileUtils::set_flags(sd, O_NONBLOCK);
00190
00191 #if defined(__linux__)
00192 if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
00193 HT_ERRORF("setting TCP_NODELAY: %s", strerror(errno));
00194 #elif defined(__sun__)
00195 if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)) < 0)
00196 HT_ERRORF("setting TCP_NODELAY: %s", strerror(errno));
00197 #elif defined(__APPLE__) || defined(__FreeBSD__)
00198 if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) < 0)
00199 HT_WARNF("setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
00200 #endif
00201
00202 if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
00203 HT_ERRORF("setting SO_REUSEADDR: %s", strerror(errno));
00204
00205 if ((bind(sd, (const sockaddr *)&addr.inet, sizeof(sockaddr_in))) < 0)
00206 HT_THROWF(Error::COMM_BIND_ERROR, "binding to %s: %s",
00207 addr.to_str().c_str(), strerror(errno));
00208
00209 if (::listen(sd, 1000) < 0)
00210 HT_THROWF(Error::COMM_LISTEN_ERROR, "listening: %s", strerror(errno));
00211
00212 handler = accept_handler = new IOHandlerAccept(sd, addr.inet, default_handler,
00213 m_handler_map, chf);
00214 int32_t error = m_handler_map->insert_handler(accept_handler);
00215 if (error != Error::OK)
00216 HT_THROWF(error, "Error inserting accept handler for %s into handler map",
00217 addr.to_str().c_str());
00218 accept_handler->start_polling();
00219 }
00220
00221
00222
00223 int
00224 Comm::send_request(const CommAddress &addr, uint32_t timeout_ms,
00225 CommBufPtr &cbuf, DispatchHandler *resp_handler) {
00226 ScopedLock lock(ms_mutex);
00227 IOHandlerDataPtr data_handler;
00228 int error;
00229
00230 if ((error = m_handler_map->lookup_data_handler(addr, data_handler)) != Error::OK) {
00231 HT_WARNF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
00232 return error;
00233 }
00234
00235 return send_request(data_handler, timeout_ms, cbuf, resp_handler);
00236 }
00237
00238
00239
00240 int Comm::send_request(IOHandlerDataPtr &data_handler, uint32_t timeout_ms,
00241 CommBufPtr &cbuf, DispatchHandler *resp_handler) {
00242 int error;
00243
00244 cbuf->header.flags |= CommHeader::FLAGS_BIT_REQUEST;
00245 if (resp_handler == 0) {
00246 cbuf->header.flags |= CommHeader::FLAGS_BIT_IGNORE_RESPONSE;
00247 cbuf->header.id = 0;
00248 }
00249 else {
00250 cbuf->header.id = atomic_inc_return(&ms_next_request_id);
00251 if (cbuf->header.id == 0)
00252 cbuf->header.id = atomic_inc_return(&ms_next_request_id);
00253 }
00254
00255 cbuf->header.timeout_ms = timeout_ms;
00256 cbuf->write_header_and_reset();
00257
00258 if ((error = data_handler->send_message(cbuf, timeout_ms, resp_handler))
00259 != Error::OK)
00260 data_handler->shutdown();
00261
00262 return error;
00263 }
00264
00265
00266
00267 int Comm::send_response(const CommAddress &addr, CommBufPtr &cbuf) {
00268 ScopedLock lock(ms_mutex);
00269 IOHandlerDataPtr data_handler;
00270 int error;
00271
00272 if ((error = m_handler_map->lookup_data_handler(addr, data_handler)) != Error::OK) {
00273 HT_ERRORF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
00274 return error;
00275 }
00276
00277 cbuf->header.flags &= CommHeader::FLAGS_MASK_REQUEST;
00278
00279 cbuf->write_header_and_reset();
00280
00281 if ((error = data_handler->send_message(cbuf)) != Error::OK)
00282 data_handler->shutdown();
00283
00284 return error;
00285 }
00286
00287
00288 void
00289 Comm::create_datagram_receive_socket(CommAddress &addr, int tos,
00290 DispatchHandlerPtr &dhp) {
00291 IOHandlerPtr handler;
00292 IOHandlerDatagram *dg_handler;
00293 int one = 1;
00294 int sd;
00295
00296 HT_ASSERT(addr.is_inet());
00297
00298 if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
00299 HT_THROWF(Error::COMM_SOCKET_ERROR, "%s", strerror(errno));
00300
00301
00302 FileUtils::set_flags(sd, O_NONBLOCK);
00303
00304 int bufsize = 4*32768;
00305
00306 if (setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&bufsize, sizeof(bufsize))
00307 < 0) {
00308 HT_ERRORF("setsockopt(SO_SNDBUF) failed - %s", strerror(errno));
00309 }
00310 if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&bufsize, sizeof(bufsize))
00311 < 0) {
00312 HT_ERRORF("setsockopt(SO_RCVBUF) failed - %s", strerror(errno));
00313 }
00314
00315 if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
00316 HT_WARNF("setsockopt(SO_REUSEADDR) failure: %s", strerror(errno));
00317 }
00318
00319 if (tos) {
00320 int opt;
00321 #if defined(__linux__)
00322 opt = tos;
00323 setsockopt(sd, SOL_IP, IP_TOS, &opt, sizeof(opt));
00324 opt = tos;
00325 setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &opt, sizeof(opt));
00326 #elif defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00327 opt = IPTOS_LOWDELAY;
00328 setsockopt(sd, IPPROTO_IP, IP_TOS, &opt, sizeof(opt));
00329 #endif
00330 }
00331
00332
00333 if ((bind(sd, (const sockaddr *)&addr.inet, sizeof(sockaddr_in))) < 0)
00334 HT_THROWF(Error::COMM_BIND_ERROR, "binding to %s: %s",
00335 addr.to_str().c_str(), strerror(errno));
00336
00337 handler = dg_handler = new IOHandlerDatagram(sd, addr.inet, dhp);
00338
00339 addr.set_inet( handler->get_local_address() );
00340
00341 int32_t error = m_handler_map->insert_datagram_handler(dg_handler);
00342 if (error != Error::OK)
00343 HT_THROWF(error, "Error inserting datagram handler for %s into handler map",
00344 addr.to_str().c_str());
00345 dg_handler->start_polling();
00346 }
00347
00348
00349 int
00350 Comm::send_datagram(const CommAddress &addr, const CommAddress &send_addr,
00351 CommBufPtr &cbuf) {
00352 ScopedLock lock(ms_mutex);
00353 IOHandlerDatagramPtr dg_handler;
00354 int error;
00355
00356 HT_ASSERT(addr.is_inet());
00357
00358 if ((error = m_handler_map->lookup_datagram_handler(send_addr, dg_handler)) != Error::OK) {
00359 HT_ERRORF("Datagram send/local address %s not registered",
00360 send_addr.to_str().c_str());
00361 return error;
00362 }
00363
00364 cbuf->header.flags |= (CommHeader::FLAGS_BIT_REQUEST |
00365 CommHeader::FLAGS_BIT_IGNORE_RESPONSE);
00366
00367 cbuf->write_header_and_reset();
00368
00369 if ((error = dg_handler->send_message(addr.inet, cbuf)) != Error::OK)
00370 dg_handler->shutdown();
00371
00372 return error;
00373 }
00374
00375
00376 int Comm::set_timer(uint32_t duration_millis, DispatchHandler *handler) {
00377 ExpireTimer timer;
00378 boost::xtime_get(&timer.expire_time, boost::TIME_UTC);
00379 xtime_add_millis(timer.expire_time, duration_millis);
00380 timer.handler = handler;
00381 m_timer_reactor->add_timer(timer);
00382 return Error::OK;
00383 }
00384
00385
00386 int
00387 Comm::set_timer_absolute(boost::xtime expire_time, DispatchHandler *handler) {
00388 ExpireTimer timer;
00389 memcpy(&timer.expire_time, &expire_time, sizeof(boost::xtime));
00390 timer.handler = handler;
00391 m_timer_reactor->add_timer(timer);
00392 return Error::OK;
00393 }
00394
00395
00396 int
00397 Comm::get_local_address(const CommAddress &addr,
00398 CommAddress &local_addr) {
00399 ScopedLock lock(ms_mutex);
00400 IOHandlerDataPtr data_handler;
00401 int error;
00402
00403 if ((error = m_handler_map->lookup_data_handler(addr, data_handler)) != Error::OK) {
00404 HT_ERRORF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
00405 return error;
00406 }
00407
00408 local_addr.set_inet( data_handler->get_local_address() );
00409
00410 return Error::OK;
00411 }
00412
00413
00414 int Comm::close_socket(const CommAddress &addr) {
00415 IOHandlerPtr handler;
00416
00417 if (!m_handler_map->decomission_handler(addr, handler))
00418 return Error::COMM_NOT_CONNECTED;
00419
00420 handler->shutdown();
00421
00422 return Error::OK;
00423 }
00424
00425
00430 int
00431 Comm::connect_socket(int sd, const CommAddress &addr,
00432 DispatchHandlerPtr &default_handler) {
00433 IOHandlerPtr handler;
00434 IOHandlerData *data_handler;
00435 int32_t error;
00436 int one = 1;
00437 CommAddress connectable_addr;
00438
00439 if (addr.is_proxy()) {
00440 if (!m_handler_map->translate_proxy_address(addr, connectable_addr))
00441 return Error::COMM_INVALID_PROXY;
00442 }
00443 else
00444 connectable_addr = addr;
00445
00446
00447 FileUtils::set_flags(sd, O_NONBLOCK);
00448
00449 #if defined(__linux__)
00450 if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
00451 HT_ERRORF("setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
00452 #elif defined(__sun__)
00453 if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
00454 HT_ERRORF("setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
00455 #elif defined(__APPLE__) || defined(__FreeBSD__)
00456 if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) < 0)
00457 HT_WARNF("setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
00458 #endif
00459
00460 handler = data_handler = new IOHandlerData(sd, connectable_addr.inet, default_handler);
00461 if (addr.is_proxy())
00462 handler->set_proxy(addr.proxy);
00463 if ((error = m_handler_map->insert_handler(data_handler)) != Error::OK)
00464 return error;
00465
00466 while (::connect(sd, (struct sockaddr *)&connectable_addr.inet, sizeof(struct sockaddr_in))
00467 < 0) {
00468 if (errno == EINTR) {
00469 poll(0, 0, 1000);
00470 continue;
00471 }
00472 else if (errno == EINPROGRESS) {
00473
00474 return data_handler->start_polling(Reactor::READ_READY|Reactor::WRITE_READY);
00475 }
00476 m_handler_map->remove_handler(connectable_addr, handler);
00477 HT_ERRORF("connecting to %s: %s", connectable_addr.to_str().c_str(),
00478 strerror(errno));
00479 return Error::COMM_CONNECT_ERROR;
00480 }
00481
00482 return data_handler->start_polling(Reactor::READ_READY|Reactor::WRITE_READY);
00483 }