00001
00023 #ifndef HYPERTABLE_IOHANDLER_H
00024 #define HYPERTABLE_IOHANDLER_H
00025
00026 extern "C" {
00027 #include <errno.h>
00028 #include <time.h>
00029 #include <sys/types.h>
00030 #include <sys/socket.h>
00031 #include <sys/time.h>
00032 #include <poll.h>
00033 #if defined(__APPLE__) || defined(__FreeBSD__)
00034 #include <sys/event.h>
00035 #elif defined(__linux__)
00036 #include <sys/epoll.h>
00037 #if !defined(POLLRDHUP)
00038 #define POLLRDHUP 0x2000
00039 #endif
00040 #elif defined(__sun__)
00041 #include <port.h>
00042 #include <sys/port_impl.h>
00043 #endif
00044 }
00045
00046 #include "Common/Logger.h"
00047 #include "Common/Mutex.h"
00048 #include "Common/ReferenceCount.h"
00049
00050 #include "DispatchHandler.h"
00051 #include "ReactorFactory.h"
00052 #include "ExpireTimer.h"
00053
00054 namespace Hypertable {
00055
00059 class IOHandler : public ReferenceCount {
00060
00061 public:
00062
00063 IOHandler(int sd, const InetAddr &addr, DispatchHandlerPtr &dhp)
00064 : m_free_flag(0), m_addr(addr), m_sd(sd), m_dispatch_handler_ptr(dhp) {
00065 ReactorFactory::get_reactor(m_reactor_ptr);
00066 m_poll_interest = 0;
00067 socklen_t namelen = sizeof(m_local_addr);
00068 getsockname(m_sd, (sockaddr *)&m_local_addr, &namelen);
00069 memset(&m_alias, 0, sizeof(m_alias));
00070 }
00071
00072
00073 virtual bool handle_event(struct pollfd *event, clock_t arrival_clocks,
00074 time_t arival_time=0) = 0;
00075
00076 #if defined(__APPLE__) || defined(__FreeBSD__)
00077 virtual bool handle_event(struct kevent *event, clock_t arrival_clocks,
00078 time_t arival_time=0) = 0;
00079 #elif defined(__linux__)
00080 virtual bool handle_event(struct epoll_event *event, clock_t arrival_clocks,
00081 time_t arival_time=0) = 0;
00082 #elif defined(__sun__)
00083 virtual bool handle_event(port_event_t *event, clock_t arrival_clocks,
00084 time_t arival_time=0) = 0;
00085 #else
00086 ImplementMe;
00087 #endif
00088
00089 virtual ~IOHandler() {
00090 HT_EXPECT(m_free_flag != 0xdeadbeef, Error::FAILED_EXPECTATION);
00091 m_free_flag = 0xdeadbeef;
00092 return;
00093 }
00094
00095 void deliver_event(Event *event) {
00096 memcpy(&event->local_addr, &m_local_addr, sizeof(m_local_addr));
00097 if (!m_dispatch_handler_ptr) {
00098 HT_INFOF("%s", event->to_str().c_str());
00099 delete event;
00100 }
00101 else {
00102 EventPtr event_ptr(event);
00103 m_dispatch_handler_ptr->handle(event_ptr);
00104 }
00105 }
00106
00107 void deliver_event(Event *event, DispatchHandler *dh) {
00108 memcpy(&event->local_addr, &m_local_addr, sizeof(m_local_addr));
00109 if (!dh) {
00110 if (!m_dispatch_handler_ptr) {
00111 HT_INFOF("%s", event->to_str().c_str());
00112 delete event;
00113 }
00114 else {
00115 EventPtr event_ptr(event);
00116 m_dispatch_handler_ptr->handle(event_ptr);
00117 }
00118 }
00119 else {
00120 EventPtr event_ptr(event);
00121 dh->handle(event_ptr);
00122 }
00123 }
00124
00125 int start_polling(int mode=Reactor::READ_READY) {
00126 if (ReactorFactory::use_poll) {
00127 m_poll_interest = mode;
00128 return m_reactor_ptr->add_poll_interest(m_sd, poll_events(mode), this);
00129 }
00130 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00131 return add_poll_interest(mode);
00132 #elif defined(__linux__)
00133 struct epoll_event event;
00134 memset(&event, 0, sizeof(struct epoll_event));
00135 event.data.ptr = this;
00136 if (mode & Reactor::READ_READY)
00137 event.events |= EPOLLIN;
00138 if (mode & Reactor::WRITE_READY)
00139 event.events |= EPOLLOUT;
00140 if (ReactorFactory::ms_epollet)
00141 event.events |= POLLRDHUP | EPOLLET;
00142 m_poll_interest = mode;
00143 if (epoll_ctl(m_reactor_ptr->poll_fd, EPOLL_CTL_ADD, m_sd, &event) < 0) {
00144 HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, %x) failed : %s",
00145 m_reactor_ptr->poll_fd, m_sd, event.events, strerror(errno));
00146 return Error::COMM_POLL_ERROR;
00147 }
00148 #endif
00149 return Error::OK;
00150 }
00151
00152 int add_poll_interest(int mode);
00153
00154 int remove_poll_interest(int mode);
00155
00156 int reset_poll_interest() {
00157 return add_poll_interest(m_poll_interest);
00158 }
00159
00160 InetAddr &get_address() { return m_addr; }
00161
00162 InetAddr &get_local_address() { return m_local_addr; }
00163
00164 void get_local_address(InetAddr *addrp) {
00165 *addrp = m_local_addr;
00166 }
00167
00168 void set_alias(const InetAddr &alias) {
00169 m_alias = alias;
00170 }
00171
00172 void get_alias(InetAddr *aliasp) {
00173 *aliasp = m_alias;
00174 }
00175
00176 void set_proxy(const String &proxy) {
00177 ScopedLock lock(m_mutex);
00178 m_proxy = proxy;
00179 }
00180
00181 int get_sd() { return m_sd; }
00182
00183 void get_reactor(ReactorPtr &reactor_ptr) { reactor_ptr = m_reactor_ptr; }
00184
00185 void shutdown() {
00186 ExpireTimer timer;
00187 m_reactor_ptr->schedule_removal(this);
00188 boost::xtime_get(&timer.expire_time, boost::TIME_UTC);
00189 timer.expire_time.nsec += 200000000LL;
00190 timer.handler = 0;
00191 m_reactor_ptr->add_timer(timer);
00192 }
00193
00194 void display_event(struct pollfd *event);
00195
00196 #if defined(__APPLE__) || defined(__FreeBSD__)
00197 void display_event(struct kevent *event);
00198 #elif defined(__linux__)
00199 void display_event(struct epoll_event *event);
00200 #elif defined(__sun__)
00201 void display_event(port_event_t *event);
00202 #endif
00203
00204 protected:
00205
00206 short poll_events(int mode) {
00207 short events = 0;
00208 if (mode & Reactor::READ_READY)
00209 events |= POLLIN;
00210 if (mode & Reactor::WRITE_READY)
00211 events |= POLLOUT;
00212 return events;
00213 }
00214
00215 void stop_polling() {
00216 if (ReactorFactory::use_poll) {
00217 m_poll_interest = 0;
00218 m_reactor_ptr->modify_poll_interest(m_sd, 0);
00219 return;
00220 }
00221 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00222 remove_poll_interest(Reactor::READ_READY|Reactor::WRITE_READY);
00223 #elif defined(__linux__)
00224 struct epoll_event event;
00225 if (epoll_ctl(m_reactor_ptr->poll_fd, EPOLL_CTL_DEL, m_sd, &event) < 0) {
00226 HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_DEL, %d) failed : %s",
00227 m_reactor_ptr->poll_fd, m_sd, strerror(errno));
00228 exit(1);
00229 }
00230 m_poll_interest = 0;
00231 #endif
00232 }
00233
00234 Mutex m_mutex;
00235 uint32_t m_free_flag;
00236 String m_proxy;
00237 InetAddr m_addr;
00238 InetAddr m_local_addr;
00239 InetAddr m_alias;
00240 int m_sd;
00241 DispatchHandlerPtr m_dispatch_handler_ptr;
00242 ReactorPtr m_reactor_ptr;
00243 int m_poll_interest;
00244 };
00245 typedef boost::intrusive_ptr<IOHandler> IOHandlerPtr;
00246
00247 struct ltiohp {
00248 bool operator()(const IOHandlerPtr &p1, const IOHandlerPtr &p2) const {
00249 return p1.get() < p2.get();
00250 }
00251 };
00252
00253 }
00254
00255
00256 #endif // HYPERTABLE_IOHANDLER_H