Reactor.cc

Go to the documentation of this file.
00001 
00022 #include "Common/Compat.h"
00023 
00024 #include <cassert>
00025 #include <cstdio>
00026 #include <iostream>
00027 #include <set>
00028 
00029 extern "C" {
00030 #include <arpa/inet.h>
00031 #include <errno.h>
00032 #include <netinet/in.h>
00033 #include <stdio.h>
00034 #include <stdlib.h>
00035 #include <sys/socket.h>
00036 #include <sys/time.h>
00037 #include <sys/types.h>
00038 #if defined(__APPLE__) || defined(__FreeBSD__)
00039 #include <sys/event.h>
00040 #endif
00041 }
00042 
00043 #include "Common/Error.h"
00044 #include "Common/FileUtils.h"
00045 #include "Common/Logger.h"
00046 
00047 #include "IOHandlerData.h"
00048 #include "Reactor.h"
00049 #include "ReactorFactory.h"
00050 
00051 using namespace Hypertable;
00052 using namespace std;
00053 
00054 const int Reactor::READ_READY   = 0x01;
00055 const int Reactor::WRITE_READY  = 0x02;
00056 
00057 
00061 Reactor::Reactor() : m_mutex(), m_interrupt_in_progress(false) {
00062   struct sockaddr_in addr;
00063 
00064   if (!ReactorFactory::use_poll) {
00065 #if defined(__linux__)
00066     if ((poll_fd = epoll_create(256)) < 0) {
00067       perror("epoll_create");
00068       exit(1);
00069     }
00070 #elif defined(__sun__)
00071     if ((poll_fd = port_create()) < 0) {
00072       perror("creation of event port failed");
00073       exit(1);
00074     }
00075 #elif defined(__APPLE__) || defined(__FreeBSD__)
00076     kqd = kqueue();
00077 #endif
00078   }
00079 
00085   if ((m_interrupt_sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
00086     HT_ERRORF("socket() failure: %s", strerror(errno));
00087     exit(1);
00088   }
00089 
00090   // Set to non-blocking (are we sure we should do this?)
00091   FileUtils::set_flags(m_interrupt_sd, O_NONBLOCK);
00092 
00093   // create address structure to bind to - any available port - any address
00094   memset(&addr, 0 , sizeof(sockaddr_in));
00095   addr.sin_family = AF_INET;
00096   addr.sin_addr.s_addr = inet_addr("127.0.0.1");
00097   addr.sin_port = 0;
00098 
00099   // bind socket
00100   if ((bind(m_interrupt_sd, (sockaddr *)&addr, sizeof(sockaddr_in))) < 0) {
00101     HT_ERRORF("bind() failure: %s", strerror(errno));
00102     exit(1);
00103   }
00104 
00105   // get the assigned address
00106   socklen_t namelen = sizeof(addr);
00107   getsockname(m_interrupt_sd, (sockaddr *)&addr, &namelen);
00108 
00109   // connect to ourself
00110   if (connect(m_interrupt_sd, (sockaddr *)&addr, sizeof(addr)) < 0) {
00111     HT_ERRORF("connect(interrupt_sd) failed - %s", strerror(errno));
00112     exit(1);
00113   }
00114 
00115   if (ReactorFactory::use_poll) {
00116     ScopedLock lock(m_poll_array_mutex);
00117     if ((size_t)m_interrupt_sd >= polldata.size()) {
00118       size_t i = polldata.size();
00119       polldata.resize(m_interrupt_sd+1);
00120       for (; i<polldata.size(); i++) {
00121         polldata[i].pollfd.fd = -1;
00122         polldata[i].pollfd.events = 0;
00123         polldata[i].pollfd.revents = 0;
00124         polldata[i].handler = 0;
00125       }
00126     }
00127     polldata[m_interrupt_sd].pollfd.fd = m_interrupt_sd;
00128     polldata[m_interrupt_sd].pollfd.events = POLLIN;
00129     poll_loop_interrupt();
00130   }
00131   else {
00132 #if defined(__linux__)
00133     if (ReactorFactory::ms_epollet) {
00134 
00135       struct epoll_event event;
00136       memset(&event, 0, sizeof(struct epoll_event));
00137       event.events = EPOLLIN | EPOLLOUT | POLLRDHUP | EPOLLET;
00138       if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
00139         HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, EPOLLIN|EPOLLOUT|POLLRDHUP|"
00140                   "EPOLLET) failed : %s", poll_fd, m_interrupt_sd,
00141                   strerror(errno));
00142         exit(1);
00143       }
00144     }
00145     else {
00146       struct epoll_event event;
00147       memset(&event, 0, sizeof(struct epoll_event));
00148       if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
00149         HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, 0) failed : %s",
00150                   poll_fd, m_interrupt_sd, strerror(errno));
00151         exit(1);
00152       }
00153     }
00154 #endif
00155   }
00156 
00157   memset(&m_next_wakeup, 0, sizeof(m_next_wakeup));
00158 }
00159 
00160 
00161 void Reactor::handle_timeouts(PollTimeout &next_timeout) {
00162   vector<ExpireTimer> expired_timers;
00163   EventPtr event_ptr;
00164   boost::xtime     now, next_req_timeout;
00165   ExpireTimer timer;
00166 
00167   while(true) {
00168     {
00169       ScopedLock lock(m_mutex);
00170       IOHandler       *handler;
00171       DispatchHandler *dh;
00172 
00173       boost::xtime_get(&now, boost::TIME_UTC);
00174 
00175       while ((dh = m_request_cache.get_next_timeout(now, handler,
00176                                                     &next_req_timeout)) != 0) {
00177         handler->deliver_event(new Event(Event::ERROR, ((IOHandlerData *)
00178             handler)->get_address(), Error::REQUEST_TIMEOUT), dh);
00179       }
00180 
00181       if (next_req_timeout.sec != 0) {
00182         next_timeout.set(now, next_req_timeout);
00183         memcpy(&m_next_wakeup, &next_req_timeout, sizeof(m_next_wakeup));
00184       }
00185       else {
00186         next_timeout.set_indefinite();
00187         memset(&m_next_wakeup, 0, sizeof(m_next_wakeup));
00188       }
00189 
00190       if (!m_timer_heap.empty()) {
00191         ExpireTimer timer;
00192 
00193         while (!m_timer_heap.empty()) {
00194           timer = m_timer_heap.top();
00195           if (xtime_cmp(timer.expire_time, now) > 0) {
00196             if (next_req_timeout.sec == 0
00197                 || xtime_cmp(timer.expire_time, next_req_timeout) < 0) {
00198               next_timeout.set(now, timer.expire_time);
00199               memcpy(&m_next_wakeup, &timer.expire_time, sizeof(m_next_wakeup));
00200             }
00201             break;
00202           }
00203           expired_timers.push_back(timer);
00204           m_timer_heap.pop();
00205         }
00206 
00207       }
00208     }
00209 
00213     for (size_t i=0; i<expired_timers.size(); i++) {
00214       event_ptr = new Event(Event::TIMER, Error::OK);
00215       if (expired_timers[i].handler)
00216         expired_timers[i].handler->handle(event_ptr);
00217     }
00218 
00219     {
00220       ScopedLock lock(m_mutex);
00221 
00222       if (!m_timer_heap.empty()) {
00223         timer = m_timer_heap.top();
00224 
00225         if (xtime_cmp(now, timer.expire_time) > 0)
00226           continue;
00227 
00228         if (next_req_timeout.sec == 0
00229             || xtime_cmp(timer.expire_time, next_req_timeout) < 0) {
00230           next_timeout.set(now, timer.expire_time);
00231           memcpy(&m_next_wakeup, &timer.expire_time, sizeof(m_next_wakeup));
00232         }
00233       }
00234 
00235       poll_loop_continue();
00236     }
00237 
00238     break;
00239   }
00240 
00241 }
00242 
00243 
00244 
00248 int Reactor::poll_loop_interrupt() {
00249 
00250   m_interrupt_in_progress = true;
00251 
00252   if (ReactorFactory::use_poll) {
00253     ssize_t n;
00254 
00255     // Send 1 byte to ourselves to cause epoll_wait to return
00256     if ((n = FileUtils::send(m_interrupt_sd, "1", 1)) < 0) {
00257       HT_ERRORF("send(interrupt_sd) failed - %s", strerror(errno));
00258       return Error::COMM_SEND_ERROR;
00259     }
00260     return Error::OK;
00261   }
00262 
00263 #if defined(__linux__)
00264 
00265   if (ReactorFactory::ms_epollet) {
00266 
00267     char buf[8];
00268     ssize_t n;
00269 
00274     if ((n = FileUtils::send(m_interrupt_sd, "1", 1)) < 0) {
00275       HT_ERRORF("send(interrupt_sd) failed - %s", strerror(errno));
00276       return Error::COMM_SEND_ERROR;
00277     }
00278 
00279     if ((n = FileUtils::recv(m_interrupt_sd, buf, 8)) == -1) {
00280       HT_ERRORF("recv(interrupt_sd) failed - %s", strerror(errno));
00281       return Error::COMM_RECEIVE_ERROR;
00282     }
00283   }
00284   else {
00285 
00286     struct epoll_event event;
00287     memset(&event, 0, sizeof(struct epoll_event));
00288     event.events = EPOLLOUT;
00289     if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
00295       return Error::COMM_POLL_ERROR;
00296     }
00297   }
00298 
00299 #elif defined(__sun__)
00300 
00301   if (port_alert(poll_fd, PORT_ALERT_SET, 1, NULL) < 0) {
00302     HT_ERRORF("port_alert(%d, PORT_ALERT_SET, 1, 0) failed - %s",
00303               poll_fd, strerror(errno));
00304     return Error::COMM_POLL_ERROR;
00305   }
00306 
00307 #elif defined(__APPLE__) || defined(__FreeBSD__)
00308   struct kevent event;
00309 
00310   EV_SET(&event, m_interrupt_sd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, 0);
00311 
00312   if (kevent(kqd, &event, 1, 0, 0, 0) == -1) {
00313     HT_ERRORF("kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
00314     return Error::COMM_POLL_ERROR;
00315   }
00316 #endif
00317   return Error::OK;
00318 }
00319 
00320 
00321 
00325 int Reactor::poll_loop_continue() {
00326 
00327   if (!m_interrupt_in_progress || ReactorFactory::use_poll) {
00328     m_interrupt_in_progress = false;
00329     return Error::OK;
00330   }
00331 
00332 #if defined(__linux__)
00333 
00334   if (!ReactorFactory::ms_epollet) {
00335     struct epoll_event event;
00336 
00337     memset(&event, 0, sizeof(struct epoll_event));
00338     event.events = EPOLLERR | EPOLLHUP;
00339 
00340     if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
00341       HT_ERRORF("epoll_ctl(EPOLL_CTL_MOD, sd=%d) : %s", m_interrupt_sd,
00342                 strerror(errno));
00343       return Error::COMM_POLL_ERROR;
00344     }
00345   }
00346 
00347 #elif defined(__sun__)
00348 
00349   if (port_alert(poll_fd, PORT_ALERT_SET, 0, NULL) < 0) {
00350     HT_ERRORF("port_alert(%d, PORT_ALERT_SET, 0, 0) failed - %s",
00351               poll_fd, strerror(errno));
00352     return Error::COMM_POLL_ERROR;
00353   }
00354 
00355 #elif defined(__APPLE__) || defined(__FreeBSD__)
00356   struct kevent devent;
00357 
00358   EV_SET(&devent, m_interrupt_sd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00359 
00360   if (kevent(kqd, &devent, 1, 0, 0, 0) == -1 && errno != ENOENT) {
00361     HT_ERRORF("kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
00362     return Error::COMM_POLL_ERROR;
00363   }
00364 #else
00365   ImplementMe();
00366 #endif
00367   m_interrupt_in_progress = false;
00368   return Error::OK;
00369 }
00370 
00371 
00372 int Reactor::add_poll_interest(int sd, short events, IOHandler *handler) {
00373   ScopedLock lock(m_poll_array_mutex);
00374 
00375   if (polldata.size() <= (size_t)sd) {
00376     size_t i = polldata.size();
00377     polldata.resize(sd+1);
00378     for (; i<polldata.size(); i++) {
00379       memset(&polldata[i], 0, sizeof(PollDescriptorT));
00380       polldata[i].pollfd.fd = -1;
00381     }
00382   }
00383 
00384   polldata[sd].pollfd.fd = sd;
00385   polldata[sd].pollfd.events = events;
00386   polldata[sd].handler = handler;
00387   return poll_loop_interrupt();
00388 }
00389 
00390 int Reactor::remove_poll_interest(int sd) {
00391   ScopedLock lock(m_poll_array_mutex);
00392 
00393   HT_ASSERT(polldata.size() > (size_t)sd);
00394 
00395   if ((size_t)sd == polldata.size()-1) {
00396     int last_entry = sd;
00397     do {
00398       last_entry--;
00399     } while (last_entry > 0 && polldata[last_entry].pollfd.fd == -1);
00400     polldata.resize(last_entry+1);
00401   }
00402   else {
00403     polldata[sd].pollfd.fd = -1;
00404     polldata[sd].handler = 0;
00405   }
00406   return poll_loop_interrupt();
00407 }
00408 
00409 int Reactor::modify_poll_interest(int sd, short events) {
00410   ScopedLock lock(m_poll_array_mutex);
00411   HT_ASSERT(polldata.size() > (size_t)sd);
00412   polldata[sd].pollfd.events = events;
00413   return poll_loop_interrupt();
00414 }
00415 
00416 
00417 void Reactor::fetch_poll_array(std::vector<struct pollfd> &fdarray,
00418                                std::vector<IOHandler *> &handlers) {
00419   ScopedLock lock(m_poll_array_mutex);
00420 
00421   fdarray.clear();
00422   handlers.clear();
00423 
00424   for (size_t i=0; i<polldata.size(); i++) {
00425     if (polldata[i].pollfd.fd != -1 && polldata[i].pollfd.events) {
00426       fdarray.push_back(polldata[i].pollfd);
00427       handlers.push_back(polldata[i].handler);
00428     }
00429   }
00430 }