IOHandler.h

Go to the documentation of this file.
00001 
00023 #ifndef HYPERTABLE_IOHANDLER_H
00024 #define HYPERTABLE_IOHANDLER_H
00025 
extern "C" {
#include <errno.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <poll.h>
#if defined(__APPLE__) || defined(__FreeBSD__)
#include <sys/event.h>
#elif defined(__linux__)
#include <sys/epoll.h>
#if !defined(POLLRDHUP)
#define POLLRDHUP 0x2000
#endif
#elif defined(__sun__)
#include <port.h>
#include <sys/port_impl.h>
#endif
}

#include <functional>

#include "Common/Logger.h"
#include "Common/Mutex.h"
#include "Common/ReferenceCount.h"

#include "DispatchHandler.h"
#include "ReactorFactory.h"
#include "ExpireTimer.h"
00053 
00054 namespace Hypertable {
00055 
00059   class IOHandler : public ReferenceCount {
00060 
00061   public:
00062 
00063     IOHandler(int sd, const InetAddr &addr, DispatchHandlerPtr &dhp)
00064       : m_free_flag(0), m_addr(addr), m_sd(sd), m_dispatch_handler_ptr(dhp) {
00065       ReactorFactory::get_reactor(m_reactor_ptr);
00066       m_poll_interest = 0;
00067       socklen_t namelen = sizeof(m_local_addr);
00068       getsockname(m_sd, (sockaddr *)&m_local_addr, &namelen);
00069       memset(&m_alias, 0, sizeof(m_alias));
00070     }
00071 
00072     // define default poll() interface for everyone since it is chosen at runtime
00073     virtual bool handle_event(struct pollfd *event, clock_t arrival_clocks,
00074                               time_t arival_time=0) = 0;
00075 
00076 #if defined(__APPLE__) || defined(__FreeBSD__)
00077     virtual bool handle_event(struct kevent *event, clock_t arrival_clocks,
00078                               time_t arival_time=0) = 0;
00079 #elif defined(__linux__)
00080     virtual bool handle_event(struct epoll_event *event, clock_t arrival_clocks,
00081                               time_t arival_time=0) = 0;
00082 #elif defined(__sun__)
00083     virtual bool handle_event(port_event_t *event, clock_t arrival_clocks,
00084                               time_t arival_time=0) = 0;
00085 #else
00086     ImplementMe;
00087 #endif
00088 
00089     virtual ~IOHandler() {
00090       HT_EXPECT(m_free_flag != 0xdeadbeef, Error::FAILED_EXPECTATION);
00091       m_free_flag = 0xdeadbeef;
00092       return;
00093     }
00094 
00095     void deliver_event(Event *event) {
00096       memcpy(&event->local_addr, &m_local_addr, sizeof(m_local_addr));
00097       if (!m_dispatch_handler_ptr) {
00098         HT_INFOF("%s", event->to_str().c_str());
00099         delete event;
00100       }
00101       else {
00102         EventPtr event_ptr(event);
00103         m_dispatch_handler_ptr->handle(event_ptr);
00104       }
00105     }
00106 
00107     void deliver_event(Event *event, DispatchHandler *dh) {
00108       memcpy(&event->local_addr, &m_local_addr, sizeof(m_local_addr));
00109       if (!dh) {
00110         if (!m_dispatch_handler_ptr) {
00111           HT_INFOF("%s", event->to_str().c_str());
00112           delete event;
00113         }
00114         else {
00115           EventPtr event_ptr(event);
00116           m_dispatch_handler_ptr->handle(event_ptr);
00117         }
00118       }
00119       else {
00120         EventPtr event_ptr(event);
00121         dh->handle(event_ptr);
00122       }
00123     }
00124 
00125     int start_polling(int mode=Reactor::READ_READY) {
00126       if (ReactorFactory::use_poll) {
00127         m_poll_interest = mode;
00128         return m_reactor_ptr->add_poll_interest(m_sd, poll_events(mode), this);
00129       }
00130 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00131       return add_poll_interest(mode);
00132 #elif defined(__linux__)
00133       struct epoll_event event;
00134       memset(&event, 0, sizeof(struct epoll_event));
00135       event.data.ptr = this;
00136       if (mode & Reactor::READ_READY)
00137         event.events |= EPOLLIN;
00138       if (mode & Reactor::WRITE_READY)
00139         event.events |= EPOLLOUT;
00140       if (ReactorFactory::ms_epollet)
00141         event.events |= POLLRDHUP | EPOLLET;
00142       m_poll_interest = mode;
00143       if (epoll_ctl(m_reactor_ptr->poll_fd, EPOLL_CTL_ADD, m_sd, &event) < 0) {
00144         HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, %x) failed : %s",
00145                   m_reactor_ptr->poll_fd, m_sd, event.events, strerror(errno));
00146         return Error::COMM_POLL_ERROR;
00147       }
00148 #endif
00149       return Error::OK;
00150     }
00151 
00152     int add_poll_interest(int mode);
00153 
00154     int remove_poll_interest(int mode);
00155 
00156     int reset_poll_interest() {
00157       return add_poll_interest(m_poll_interest);
00158     }
00159 
00160     InetAddr &get_address() { return m_addr; }
00161 
00162     InetAddr &get_local_address() { return m_local_addr; }
00163 
00164     void get_local_address(InetAddr *addrp) {
00165       *addrp = m_local_addr;
00166     }
00167 
00168     void set_alias(const InetAddr &alias) {
00169       m_alias = alias;
00170     }
00171 
00172     void get_alias(InetAddr *aliasp) {
00173       *aliasp = m_alias;
00174     }
00175 
00176     void set_proxy(const String &proxy) {
00177       ScopedLock lock(m_mutex);
00178       m_proxy = proxy;
00179     }
00180 
00181     int get_sd() { return m_sd; }
00182 
00183     void get_reactor(ReactorPtr &reactor_ptr) { reactor_ptr = m_reactor_ptr; }
00184 
00185     void shutdown() {
00186       ExpireTimer timer;
00187       m_reactor_ptr->schedule_removal(this);
00188       boost::xtime_get(&timer.expire_time, boost::TIME_UTC);
00189       timer.expire_time.nsec += 200000000LL;
00190       timer.handler = 0;
00191       m_reactor_ptr->add_timer(timer);
00192     }
00193 
00194     void display_event(struct pollfd *event);
00195 
00196 #if defined(__APPLE__) || defined(__FreeBSD__)
00197     void display_event(struct kevent *event);
00198 #elif defined(__linux__)
00199     void display_event(struct epoll_event *event);
00200 #elif defined(__sun__)
00201     void display_event(port_event_t *event);
00202 #endif
00203 
00204   protected:
00205 
00206     short poll_events(int mode) {
00207       short events = 0;
00208       if (mode & Reactor::READ_READY)
00209         events |= POLLIN;
00210       if (mode & Reactor::WRITE_READY)
00211         events |= POLLOUT;
00212       return events;
00213     }
00214 
00215     void stop_polling() {
00216       if (ReactorFactory::use_poll) {
00217         m_poll_interest = 0;
00218         m_reactor_ptr->modify_poll_interest(m_sd, 0);
00219         return;
00220       }
00221 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
00222       remove_poll_interest(Reactor::READ_READY|Reactor::WRITE_READY);
00223 #elif defined(__linux__)
00224       struct epoll_event event;  // this is necessary for < Linux 2.6.9
00225       if (epoll_ctl(m_reactor_ptr->poll_fd, EPOLL_CTL_DEL, m_sd, &event) < 0) {
00226         HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_DEL, %d) failed : %s",
00227                      m_reactor_ptr->poll_fd, m_sd, strerror(errno));
00228         exit(1);
00229       }
00230       m_poll_interest = 0;
00231 #endif
00232     }
00233 
00234     Mutex               m_mutex;
00235     uint32_t            m_free_flag;
00236     String              m_proxy;
00237     InetAddr            m_addr;
00238     InetAddr            m_local_addr;
00239     InetAddr            m_alias;
00240     int                 m_sd;
00241     DispatchHandlerPtr  m_dispatch_handler_ptr;
00242     ReactorPtr          m_reactor_ptr;
00243     int                 m_poll_interest;
00244   };
00245   typedef boost::intrusive_ptr<IOHandler> IOHandlerPtr;
00246 
00247   struct ltiohp {
00248     bool operator()(const IOHandlerPtr &p1, const IOHandlerPtr &p2) const {
00249       return p1.get() < p2.get();
00250     }
00251   };
00252 
00253 }
00254 
00255 
00256 #endif // HYPERTABLE_IOHANDLER_H