Comm.cc

Go to the documentation of this file.
00001 
00022 //#define HT_DISABLE_LOG_DEBUG
00023 
00024 #include "Common/Compat.h"
00025 
00026 #include <cassert>
00027 #include <iostream>
00028 
extern "C" {
#if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
#include <arpa/inet.h>
#include <netinet/ip.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <unistd.h>
}
00044 
00045 #include "Common/Config.h"
00046 #include "Common/Error.h"
00047 #include "Common/InetAddr.h"
00048 #include "Common/FileUtils.h"
00049 #include "Common/SystemInfo.h"
00050 #include "Common/Time.h"
00051 
00052 #include "ReactorFactory.h"
00053 #include "ReactorRunner.h"
00054 #include "Comm.h"
00055 #include "IOHandlerAccept.h"
00056 #include "IOHandlerData.h"
00057 
00058 using namespace Hypertable;
00059 using namespace std;
00060 
00061 atomic_t Comm::ms_next_request_id = ATOMIC_INIT(1);
00062 
00063 Comm *Comm::ms_instance = NULL;
00064 Mutex Comm::ms_mutex;
00065 
00066 Comm::Comm() {
00067   if (ReactorFactory::ms_reactors.size() == 0) {
00068     HT_ERROR("ReactorFactory::initialize must be called before creating "
00069              "AsyncComm::comm object");
00070     HT_ABORT;
00071   }
00072 
00073   ReactorFactory::get_reactor(m_timer_reactor);
00074   m_handler_map = ReactorRunner::handler_map;
00075 }
00076 
00077 
00078 Comm::~Comm() {
00079   set<IOHandler *> handlers;
00080   m_handler_map->decomission_all(handlers);
00081 
00082   foreach(IOHandler *handler, handlers)
00083     handler->shutdown();
00084 
00085   // wait for all decomissioned handlers to get purged by Reactor
00086   m_handler_map->wait_for_empty();
00087 
00088   // Since Comm is a singleton, this is OK
00089   ReactorFactory::destroy();
00090 }
00091 
00092 
00093 void Comm::destroy() {
00094   if (ms_instance) {
00095     delete ms_instance;
00096     ms_instance = 0;
00097   }
00098 }
00099 
00100 
00101 int
00102 Comm::connect(const CommAddress &addr, DispatchHandlerPtr &default_handler) {
00103   int sd;
00104   int error = m_handler_map->contains_data_handler(addr);
00105 
00106   if (error == Error::OK)
00107     return Error::COMM_ALREADY_CONNECTED;
00108   else if (error != Error::COMM_NOT_CONNECTED)
00109     return error;
00110 
00111   if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00112     HT_ERRORF("socket: %s", strerror(errno));
00113     return Error::COMM_SOCKET_ERROR;
00114   }
00115 
00116   return connect_socket(sd, addr, default_handler);
00117 }
00118 
00119 
00120 
00121 int
00122 Comm::connect(const CommAddress &addr, const CommAddress &local_addr,
00123               DispatchHandlerPtr &default_handler) {
00124   int sd;
00125   int error = m_handler_map->contains_data_handler(addr);
00126 
00127   HT_ASSERT(local_addr.is_inet());
00128 
00129   if (error == Error::OK)
00130     return Error::COMM_ALREADY_CONNECTED;
00131   else if (error != Error::COMM_NOT_CONNECTED)
00132     return error;
00133 
00134   if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00135     HT_ERRORF("socket: %s", strerror(errno));
00136     return Error::COMM_SOCKET_ERROR;
00137   }
00138 
00139   // bind socket to local address
00140   if ((bind(sd, (const sockaddr *)&local_addr.inet, sizeof(sockaddr_in))) < 0) {
00141     HT_ERRORF( "bind: %s: %s", local_addr.to_str().c_str(), strerror(errno));
00142     return Error::COMM_BIND_ERROR;
00143   }
00144 
00145   return connect_socket(sd, addr, default_handler);
00146 }
00147 
00148 
00149 int Comm::set_alias(const InetAddr &addr, const InetAddr &alias) {
00150   ScopedLock lock(ms_mutex);
00151   return m_handler_map->set_alias(addr, alias);
00152 }
00153 
00154 
00155 void Comm::listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf) {
00156   DispatchHandlerPtr null_handler(0);
00157   listen(addr, chf, null_handler);
00158 }
00159 
00160 
00161 int Comm::add_proxy(const String &proxy, const InetAddr &addr) {
00162   HT_ASSERT(ReactorFactory::proxy_master);
00163   return m_handler_map->add_proxy(proxy, addr);
00164 }
00165 
00166 void Comm::get_proxy_map(ProxyMapT &proxy_map) {
00167   m_handler_map->get_proxy_map(proxy_map);
00168 }
00169 
00170 bool Comm::wait_for_proxy_load(Timer &timer) {
00171   return m_handler_map->wait_for_proxy_load(timer);
00172 }
00173 
00174 
00175 void
00176 Comm::listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf,
00177              DispatchHandlerPtr &default_handler) {
00178   IOHandlerPtr handler;
00179   IOHandlerAccept *accept_handler;
00180   int one = 1;
00181   int sd;
00182 
00183   HT_ASSERT(addr.is_inet());
00184 
00185   if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
00186     HT_THROW(Error::COMM_SOCKET_ERROR, strerror(errno));
00187 
00188   // Set to non-blocking
00189   FileUtils::set_flags(sd, O_NONBLOCK);
00190 
00191 #if defined(__linux__)
00192   if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
00193     HT_ERRORF("setting TCP_NODELAY: %s", strerror(errno));
00194 #elif defined(__sun__)
00195   if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)) < 0)
00196     HT_ERRORF("setting TCP_NODELAY: %s", strerror(errno));
00197 #elif defined(__APPLE__) || defined(__FreeBSD__)
00198   if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) < 0)
00199     HT_WARNF("setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
00200 #endif
00201 
00202   if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
00203     HT_ERRORF("setting SO_REUSEADDR: %s", strerror(errno));
00204 
00205   if ((bind(sd, (const sockaddr *)&addr.inet, sizeof(sockaddr_in))) < 0)
00206     HT_THROWF(Error::COMM_BIND_ERROR, "binding to %s: %s",
00207               addr.to_str().c_str(), strerror(errno));
00208 
00209   if (::listen(sd, 1000) < 0)
00210     HT_THROWF(Error::COMM_LISTEN_ERROR, "listening: %s", strerror(errno));
00211 
00212   handler = accept_handler = new IOHandlerAccept(sd, addr.inet, default_handler,
00213                                                  m_handler_map, chf);
00214   int32_t error = m_handler_map->insert_handler(accept_handler);
00215   if (error != Error::OK)
00216     HT_THROWF(error, "Error inserting accept handler for %s into handler map",
00217               addr.to_str().c_str());
00218   accept_handler->start_polling();
00219 }
00220 
00221 
00222 
00223 int
00224 Comm::send_request(const CommAddress &addr, uint32_t timeout_ms,
00225                    CommBufPtr &cbuf, DispatchHandler *resp_handler) {
00226   ScopedLock lock(ms_mutex);
00227   IOHandlerDataPtr data_handler;
00228   int error;
00229 
00230   if ((error = m_handler_map->lookup_data_handler(addr, data_handler)) != Error::OK) {
00231     HT_WARNF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
00232     return error;
00233   }
00234 
00235   return send_request(data_handler, timeout_ms, cbuf, resp_handler);
00236 }
00237 
00238 
00239 
00240 int Comm::send_request(IOHandlerDataPtr &data_handler, uint32_t timeout_ms,
00241                        CommBufPtr &cbuf, DispatchHandler *resp_handler) {
00242   int error;
00243 
00244   cbuf->header.flags |= CommHeader::FLAGS_BIT_REQUEST;
00245   if (resp_handler == 0) {
00246     cbuf->header.flags |= CommHeader::FLAGS_BIT_IGNORE_RESPONSE;
00247     cbuf->header.id = 0;
00248   }
00249   else {
00250     cbuf->header.id = atomic_inc_return(&ms_next_request_id);
00251     if (cbuf->header.id == 0)
00252       cbuf->header.id = atomic_inc_return(&ms_next_request_id);
00253   }
00254 
00255   cbuf->header.timeout_ms = timeout_ms;
00256   cbuf->write_header_and_reset();
00257 
00258   if ((error = data_handler->send_message(cbuf, timeout_ms, resp_handler))
00259       != Error::OK)
00260     data_handler->shutdown();
00261 
00262   return error;
00263 }
00264 
00265 
00266 
00267 int Comm::send_response(const CommAddress &addr, CommBufPtr &cbuf) {
00268   ScopedLock lock(ms_mutex);
00269   IOHandlerDataPtr data_handler;
00270   int error;
00271 
00272   if ((error = m_handler_map->lookup_data_handler(addr, data_handler)) != Error::OK) {
00273     HT_ERRORF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
00274     return error;
00275   }
00276 
00277   cbuf->header.flags &= CommHeader::FLAGS_MASK_REQUEST;
00278 
00279   cbuf->write_header_and_reset();
00280 
00281   if ((error = data_handler->send_message(cbuf)) != Error::OK)
00282     data_handler->shutdown();
00283 
00284   return error;
00285 }
00286 
00287 
00288 void
00289 Comm::create_datagram_receive_socket(CommAddress &addr, int tos,
00290                                      DispatchHandlerPtr &dhp) {
00291   IOHandlerPtr handler;
00292   IOHandlerDatagram *dg_handler;
00293   int one = 1;
00294   int sd;
00295 
00296   HT_ASSERT(addr.is_inet());
00297 
00298   if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
00299     HT_THROWF(Error::COMM_SOCKET_ERROR, "%s", strerror(errno));
00300 
00301   // Set to non-blocking
00302   FileUtils::set_flags(sd, O_NONBLOCK);
00303 
00304   int bufsize = 4*32768;
00305 
00306   if (setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&bufsize, sizeof(bufsize))
00307       < 0) {
00308     HT_ERRORF("setsockopt(SO_SNDBUF) failed - %s", strerror(errno));
00309   }
00310   if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&bufsize, sizeof(bufsize))
00311       < 0) {
00312     HT_ERRORF("setsockopt(SO_RCVBUF) failed - %s", strerror(errno));
00313   }
00314 
00315   if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
00316     HT_WARNF("setsockopt(SO_REUSEADDR) failure: %s", strerror(errno));
00317   }
00318 
00319   if (tos) {
00320     int opt;
00321 #if defined(__linux__)
00322     opt = tos;
00323     setsockopt(sd, SOL_IP, IP_TOS, &opt, sizeof(opt));
00324     opt = tos;
00325     setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &opt, sizeof(opt));
00326 #elif defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00327     opt = IPTOS_LOWDELAY;       /* see <netinet/in.h> */
00328     setsockopt(sd, IPPROTO_IP, IP_TOS, &opt, sizeof(opt));
00329 #endif
00330   }
00331 
00332   // bind socket
00333   if ((bind(sd, (const sockaddr *)&addr.inet, sizeof(sockaddr_in))) < 0)
00334     HT_THROWF(Error::COMM_BIND_ERROR, "binding to %s: %s",
00335               addr.to_str().c_str(), strerror(errno));
00336 
00337   handler = dg_handler = new IOHandlerDatagram(sd, addr.inet, dhp);
00338 
00339   addr.set_inet( handler->get_local_address() );
00340 
00341   int32_t error = m_handler_map->insert_datagram_handler(dg_handler);
00342   if (error != Error::OK)
00343     HT_THROWF(error, "Error inserting datagram handler for %s into handler map",
00344               addr.to_str().c_str());
00345   dg_handler->start_polling();
00346 }
00347 
00348 
00349 int
00350 Comm::send_datagram(const CommAddress &addr, const CommAddress &send_addr,
00351                     CommBufPtr &cbuf) {
00352   ScopedLock lock(ms_mutex);
00353   IOHandlerDatagramPtr dg_handler;
00354   int error;
00355 
00356   HT_ASSERT(addr.is_inet());
00357 
00358   if ((error = m_handler_map->lookup_datagram_handler(send_addr, dg_handler)) != Error::OK) {
00359     HT_ERRORF("Datagram send/local address %s not registered",
00360               send_addr.to_str().c_str());
00361     return error;
00362   }
00363 
00364   cbuf->header.flags |= (CommHeader::FLAGS_BIT_REQUEST |
00365                          CommHeader::FLAGS_BIT_IGNORE_RESPONSE);
00366 
00367   cbuf->write_header_and_reset();
00368 
00369   if ((error = dg_handler->send_message(addr.inet, cbuf)) != Error::OK)
00370     dg_handler->shutdown();
00371 
00372   return error;
00373 }
00374 
00375 
00376 int Comm::set_timer(uint32_t duration_millis, DispatchHandler *handler) {
00377   ExpireTimer timer;
00378   boost::xtime_get(&timer.expire_time, boost::TIME_UTC);
00379   xtime_add_millis(timer.expire_time, duration_millis);
00380   timer.handler = handler;
00381   m_timer_reactor->add_timer(timer);
00382   return Error::OK;
00383 }
00384 
00385 
00386 int
00387 Comm::set_timer_absolute(boost::xtime expire_time, DispatchHandler *handler) {
00388   ExpireTimer timer;
00389   memcpy(&timer.expire_time, &expire_time, sizeof(boost::xtime));
00390   timer.handler = handler;
00391   m_timer_reactor->add_timer(timer);
00392   return Error::OK;
00393 }
00394 
00395 
00396 int
00397 Comm::get_local_address(const CommAddress &addr,
00398                         CommAddress &local_addr) {
00399   ScopedLock lock(ms_mutex);
00400   IOHandlerDataPtr data_handler;
00401   int error;
00402 
00403   if ((error = m_handler_map->lookup_data_handler(addr, data_handler)) != Error::OK) {
00404     HT_ERRORF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
00405     return error;
00406   }
00407 
00408   local_addr.set_inet( data_handler->get_local_address() );
00409 
00410   return Error::OK;
00411 }
00412 
00413 
00414 int Comm::close_socket(const CommAddress &addr) {
00415   IOHandlerPtr handler;
00416 
00417   if (!m_handler_map->decomission_handler(addr, handler))
00418     return Error::COMM_NOT_CONNECTED;
00419 
00420   handler->shutdown();
00421 
00422   return Error::OK;
00423 }
00424 
00425 
00430 int
00431 Comm::connect_socket(int sd, const CommAddress &addr,
00432                      DispatchHandlerPtr &default_handler) {
00433   IOHandlerPtr handler;
00434   IOHandlerData *data_handler;
00435   int32_t error;
00436   int one = 1;
00437   CommAddress connectable_addr;
00438 
00439   if (addr.is_proxy()) {
00440     if (!m_handler_map->translate_proxy_address(addr, connectable_addr))
00441       return Error::COMM_INVALID_PROXY;
00442   }
00443   else
00444     connectable_addr = addr;
00445 
00446   // Set to non-blocking
00447   FileUtils::set_flags(sd, O_NONBLOCK);
00448 
00449 #if defined(__linux__)
00450   if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
00451     HT_ERRORF("setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
00452 #elif defined(__sun__)
00453   if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0)
00454     HT_ERRORF("setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
00455 #elif defined(__APPLE__) || defined(__FreeBSD__)
00456   if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) < 0)
00457     HT_WARNF("setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
00458 #endif
00459 
00460   handler = data_handler = new IOHandlerData(sd, connectable_addr.inet, default_handler);
00461   if (addr.is_proxy())
00462     handler->set_proxy(addr.proxy);
00463   if ((error = m_handler_map->insert_handler(data_handler)) != Error::OK)
00464     return error;
00465 
00466   while (::connect(sd, (struct sockaddr *)&connectable_addr.inet, sizeof(struct sockaddr_in))
00467           < 0) {
00468     if (errno == EINTR) {
00469       poll(0, 0, 1000);
00470       continue;
00471     }
00472     else if (errno == EINPROGRESS) {
00473       //HT_INFO("connect() in progress starting to poll");
00474       return data_handler->start_polling(Reactor::READ_READY|Reactor::WRITE_READY);
00475     }
00476     m_handler_map->remove_handler(connectable_addr, handler);
00477     HT_ERRORF("connecting to %s: %s", connectable_addr.to_str().c_str(),
00478               strerror(errno));
00479     return Error::COMM_CONNECT_ERROR;
00480   }
00481 
00482   return data_handler->start_polling(Reactor::READ_READY|Reactor::WRITE_READY);
00483 }