00001
00022 #include "Common/Compat.h"
00023
00024 #include <cassert>
00025 #include <cstdio>
00026 #include <iostream>
00027 #include <set>
00028
00029 extern "C" {
00030 #include <arpa/inet.h>
00031 #include <errno.h>
00032 #include <netinet/in.h>
00033 #include <stdio.h>
00034 #include <stdlib.h>
00035 #include <sys/socket.h>
00036 #include <sys/time.h>
00037 #include <sys/types.h>
00038 #if defined(__APPLE__) || defined(__FreeBSD__)
00039 #include <sys/event.h>
00040 #endif
00041 }
00042
00043 #include "Common/Error.h"
00044 #include "Common/FileUtils.h"
00045 #include "Common/Logger.h"
00046
00047 #include "IOHandlerData.h"
00048 #include "Reactor.h"
00049 #include "ReactorFactory.h"
00050
00051 using namespace Hypertable;
00052 using namespace std;
00053
00054 const int Reactor::READ_READY = 0x01;
00055 const int Reactor::WRITE_READY = 0x02;
00056
00057
00061 Reactor::Reactor() : m_mutex(), m_interrupt_in_progress(false) {
00062 struct sockaddr_in addr;
00063
00064 if (!ReactorFactory::use_poll) {
00065 #if defined(__linux__)
00066 if ((poll_fd = epoll_create(256)) < 0) {
00067 perror("epoll_create");
00068 exit(1);
00069 }
00070 #elif defined(__sun__)
00071 if ((poll_fd = port_create()) < 0) {
00072 perror("creation of event port failed");
00073 exit(1);
00074 }
00075 #elif defined(__APPLE__) || defined(__FreeBSD__)
00076 kqd = kqueue();
00077 #endif
00078 }
00079
00085 if ((m_interrupt_sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
00086 HT_ERRORF("socket() failure: %s", strerror(errno));
00087 exit(1);
00088 }
00089
00090
00091 FileUtils::set_flags(m_interrupt_sd, O_NONBLOCK);
00092
00093
00094 memset(&addr, 0 , sizeof(sockaddr_in));
00095 addr.sin_family = AF_INET;
00096 addr.sin_addr.s_addr = inet_addr("127.0.0.1");
00097 addr.sin_port = 0;
00098
00099
00100 if ((bind(m_interrupt_sd, (sockaddr *)&addr, sizeof(sockaddr_in))) < 0) {
00101 HT_ERRORF("bind() failure: %s", strerror(errno));
00102 exit(1);
00103 }
00104
00105
00106 socklen_t namelen = sizeof(addr);
00107 getsockname(m_interrupt_sd, (sockaddr *)&addr, &namelen);
00108
00109
00110 if (connect(m_interrupt_sd, (sockaddr *)&addr, sizeof(addr)) < 0) {
00111 HT_ERRORF("connect(interrupt_sd) failed - %s", strerror(errno));
00112 exit(1);
00113 }
00114
00115 if (ReactorFactory::use_poll) {
00116 ScopedLock lock(m_poll_array_mutex);
00117 if ((size_t)m_interrupt_sd >= polldata.size()) {
00118 size_t i = polldata.size();
00119 polldata.resize(m_interrupt_sd+1);
00120 for (; i<polldata.size(); i++) {
00121 polldata[i].pollfd.fd = -1;
00122 polldata[i].pollfd.events = 0;
00123 polldata[i].pollfd.revents = 0;
00124 polldata[i].handler = 0;
00125 }
00126 }
00127 polldata[m_interrupt_sd].pollfd.fd = m_interrupt_sd;
00128 polldata[m_interrupt_sd].pollfd.events = POLLIN;
00129 poll_loop_interrupt();
00130 }
00131 else {
00132 #if defined(__linux__)
00133 if (ReactorFactory::ms_epollet) {
00134
00135 struct epoll_event event;
00136 memset(&event, 0, sizeof(struct epoll_event));
00137 event.events = EPOLLIN | EPOLLOUT | POLLRDHUP | EPOLLET;
00138 if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
00139 HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, EPOLLIN|EPOLLOUT|POLLRDHUP|"
00140 "EPOLLET) failed : %s", poll_fd, m_interrupt_sd,
00141 strerror(errno));
00142 exit(1);
00143 }
00144 }
00145 else {
00146 struct epoll_event event;
00147 memset(&event, 0, sizeof(struct epoll_event));
00148 if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
00149 HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, 0) failed : %s",
00150 poll_fd, m_interrupt_sd, strerror(errno));
00151 exit(1);
00152 }
00153 }
00154 #endif
00155 }
00156
00157 memset(&m_next_wakeup, 0, sizeof(m_next_wakeup));
00158 }
00159
00160
00161 void Reactor::handle_timeouts(PollTimeout &next_timeout) {
00162 vector<ExpireTimer> expired_timers;
00163 EventPtr event_ptr;
00164 boost::xtime now, next_req_timeout;
00165 ExpireTimer timer;
00166
00167 while(true) {
00168 {
00169 ScopedLock lock(m_mutex);
00170 IOHandler *handler;
00171 DispatchHandler *dh;
00172
00173 boost::xtime_get(&now, boost::TIME_UTC);
00174
00175 while ((dh = m_request_cache.get_next_timeout(now, handler,
00176 &next_req_timeout)) != 0) {
00177 handler->deliver_event(new Event(Event::ERROR, ((IOHandlerData *)
00178 handler)->get_address(), Error::REQUEST_TIMEOUT), dh);
00179 }
00180
00181 if (next_req_timeout.sec != 0) {
00182 next_timeout.set(now, next_req_timeout);
00183 memcpy(&m_next_wakeup, &next_req_timeout, sizeof(m_next_wakeup));
00184 }
00185 else {
00186 next_timeout.set_indefinite();
00187 memset(&m_next_wakeup, 0, sizeof(m_next_wakeup));
00188 }
00189
00190 if (!m_timer_heap.empty()) {
00191 ExpireTimer timer;
00192
00193 while (!m_timer_heap.empty()) {
00194 timer = m_timer_heap.top();
00195 if (xtime_cmp(timer.expire_time, now) > 0) {
00196 if (next_req_timeout.sec == 0
00197 || xtime_cmp(timer.expire_time, next_req_timeout) < 0) {
00198 next_timeout.set(now, timer.expire_time);
00199 memcpy(&m_next_wakeup, &timer.expire_time, sizeof(m_next_wakeup));
00200 }
00201 break;
00202 }
00203 expired_timers.push_back(timer);
00204 m_timer_heap.pop();
00205 }
00206
00207 }
00208 }
00209
00213 for (size_t i=0; i<expired_timers.size(); i++) {
00214 event_ptr = new Event(Event::TIMER, Error::OK);
00215 if (expired_timers[i].handler)
00216 expired_timers[i].handler->handle(event_ptr);
00217 }
00218
00219 {
00220 ScopedLock lock(m_mutex);
00221
00222 if (!m_timer_heap.empty()) {
00223 timer = m_timer_heap.top();
00224
00225 if (xtime_cmp(now, timer.expire_time) > 0)
00226 continue;
00227
00228 if (next_req_timeout.sec == 0
00229 || xtime_cmp(timer.expire_time, next_req_timeout) < 0) {
00230 next_timeout.set(now, timer.expire_time);
00231 memcpy(&m_next_wakeup, &timer.expire_time, sizeof(m_next_wakeup));
00232 }
00233 }
00234
00235 poll_loop_continue();
00236 }
00237
00238 break;
00239 }
00240
00241 }
00242
00243
00244
00248 int Reactor::poll_loop_interrupt() {
00249
00250 m_interrupt_in_progress = true;
00251
00252 if (ReactorFactory::use_poll) {
00253 ssize_t n;
00254
00255
00256 if ((n = FileUtils::send(m_interrupt_sd, "1", 1)) < 0) {
00257 HT_ERRORF("send(interrupt_sd) failed - %s", strerror(errno));
00258 return Error::COMM_SEND_ERROR;
00259 }
00260 return Error::OK;
00261 }
00262
00263 #if defined(__linux__)
00264
00265 if (ReactorFactory::ms_epollet) {
00266
00267 char buf[8];
00268 ssize_t n;
00269
00274 if ((n = FileUtils::send(m_interrupt_sd, "1", 1)) < 0) {
00275 HT_ERRORF("send(interrupt_sd) failed - %s", strerror(errno));
00276 return Error::COMM_SEND_ERROR;
00277 }
00278
00279 if ((n = FileUtils::recv(m_interrupt_sd, buf, 8)) == -1) {
00280 HT_ERRORF("recv(interrupt_sd) failed - %s", strerror(errno));
00281 return Error::COMM_RECEIVE_ERROR;
00282 }
00283 }
00284 else {
00285
00286 struct epoll_event event;
00287 memset(&event, 0, sizeof(struct epoll_event));
00288 event.events = EPOLLOUT;
00289 if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
00295 return Error::COMM_POLL_ERROR;
00296 }
00297 }
00298
00299 #elif defined(__sun__)
00300
00301 if (port_alert(poll_fd, PORT_ALERT_SET, 1, NULL) < 0) {
00302 HT_ERRORF("port_alert(%d, PORT_ALERT_SET, 1, 0) failed - %s",
00303 poll_fd, strerror(errno));
00304 return Error::COMM_POLL_ERROR;
00305 }
00306
00307 #elif defined(__APPLE__) || defined(__FreeBSD__)
00308 struct kevent event;
00309
00310 EV_SET(&event, m_interrupt_sd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, 0);
00311
00312 if (kevent(kqd, &event, 1, 0, 0, 0) == -1) {
00313 HT_ERRORF("kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
00314 return Error::COMM_POLL_ERROR;
00315 }
00316 #endif
00317 return Error::OK;
00318 }
00319
00320
00321
00325 int Reactor::poll_loop_continue() {
00326
00327 if (!m_interrupt_in_progress || ReactorFactory::use_poll) {
00328 m_interrupt_in_progress = false;
00329 return Error::OK;
00330 }
00331
00332 #if defined(__linux__)
00333
00334 if (!ReactorFactory::ms_epollet) {
00335 struct epoll_event event;
00336
00337 memset(&event, 0, sizeof(struct epoll_event));
00338 event.events = EPOLLERR | EPOLLHUP;
00339
00340 if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
00341 HT_ERRORF("epoll_ctl(EPOLL_CTL_MOD, sd=%d) : %s", m_interrupt_sd,
00342 strerror(errno));
00343 return Error::COMM_POLL_ERROR;
00344 }
00345 }
00346
00347 #elif defined(__sun__)
00348
00349 if (port_alert(poll_fd, PORT_ALERT_SET, 0, NULL) < 0) {
00350 HT_ERRORF("port_alert(%d, PORT_ALERT_SET, 0, 0) failed - %s",
00351 poll_fd, strerror(errno));
00352 return Error::COMM_POLL_ERROR;
00353 }
00354
00355 #elif defined(__APPLE__) || defined(__FreeBSD__)
00356 struct kevent devent;
00357
00358 EV_SET(&devent, m_interrupt_sd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00359
00360 if (kevent(kqd, &devent, 1, 0, 0, 0) == -1 && errno != ENOENT) {
00361 HT_ERRORF("kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
00362 return Error::COMM_POLL_ERROR;
00363 }
00364 #else
00365 ImplementMe();
00366 #endif
00367 m_interrupt_in_progress = false;
00368 return Error::OK;
00369 }
00370
00371
00372 int Reactor::add_poll_interest(int sd, short events, IOHandler *handler) {
00373 ScopedLock lock(m_poll_array_mutex);
00374
00375 if (polldata.size() <= (size_t)sd) {
00376 size_t i = polldata.size();
00377 polldata.resize(sd+1);
00378 for (; i<polldata.size(); i++) {
00379 memset(&polldata[i], 0, sizeof(PollDescriptorT));
00380 polldata[i].pollfd.fd = -1;
00381 }
00382 }
00383
00384 polldata[sd].pollfd.fd = sd;
00385 polldata[sd].pollfd.events = events;
00386 polldata[sd].handler = handler;
00387 return poll_loop_interrupt();
00388 }
00389
00390 int Reactor::remove_poll_interest(int sd) {
00391 ScopedLock lock(m_poll_array_mutex);
00392
00393 HT_ASSERT(polldata.size() > (size_t)sd);
00394
00395 if ((size_t)sd == polldata.size()-1) {
00396 int last_entry = sd;
00397 do {
00398 last_entry--;
00399 } while (last_entry > 0 && polldata[last_entry].pollfd.fd == -1);
00400 polldata.resize(last_entry+1);
00401 }
00402 else {
00403 polldata[sd].pollfd.fd = -1;
00404 polldata[sd].handler = 0;
00405 }
00406 return poll_loop_interrupt();
00407 }
00408
00409 int Reactor::modify_poll_interest(int sd, short events) {
00410 ScopedLock lock(m_poll_array_mutex);
00411 HT_ASSERT(polldata.size() > (size_t)sd);
00412 polldata[sd].pollfd.events = events;
00413 return poll_loop_interrupt();
00414 }
00415
00416
00417 void Reactor::fetch_poll_array(std::vector<struct pollfd> &fdarray,
00418 std::vector<IOHandler *> &handlers) {
00419 ScopedLock lock(m_poll_array_mutex);
00420
00421 fdarray.clear();
00422 handlers.clear();
00423
00424 for (size_t i=0; i<polldata.size(); i++) {
00425 if (polldata[i].pollfd.fd != -1 && polldata[i].pollfd.events) {
00426 fdarray.push_back(polldata[i].pollfd);
00427 handlers.push_back(polldata[i].handler);
00428 }
00429 }
00430 }