IOHandler.cc

Go to the documentation of this file.
00001 
00022 #include "Common/Compat.h"
00023 
00024 #include <cstdio>
00025 #include <iostream>
00026 using namespace std;
00027 
00028 extern "C" {
00029 #include <errno.h>
00030 #if defined(__APPLE__) || defined(__FreeBSD__)
00031 #include <sys/event.h>
00032 #elif defined(__sun__)
00033 #include <sys/types.h>
00034 #include <sys/socket.h>
00035 #endif
00036 }
00037 
00038 #include "Common/Logger.h"
00039 
00040 #include "IOHandler.h"
00041 #include "Reactor.h"
00042 using namespace Hypertable;
00043 
00044 #define HANDLE_POLL_INTERFACE_MODIFY \
00045   if (ReactorFactory::use_poll) \
00046     return m_reactor_ptr->modify_poll_interest(m_sd, poll_events(m_poll_interest));
00047 
00048 #define HANDLE_POLL_INTERFACE_ADD \
00049   if (ReactorFactory::use_poll) \
00050     return m_reactor_ptr->add_poll_interest(m_sd, poll_events(m_poll_interest), this);
00051 
00052 void IOHandler::display_event(struct pollfd *event) {
00053   char buf[128];
00054 
00055   buf[0] = 0;
00056   if (event->revents & POLLIN)
00057     strcat(buf, "POLLIN ");
00058   if (event->revents & POLLRDNORM)
00059     strcat(buf, "POLLRDNORM ");
00060   if (event->revents & POLLRDBAND)
00061     strcat(buf, "POLLRDBAND ");
00062   if (event->revents & POLLPRI)
00063     strcat(buf, "POLLPRI ");
00064   if (event->revents & POLLOUT)
00065     strcat(buf, "POLLOUT ");
00066   if (event->revents & POLLWRNORM)
00067     strcat(buf, "POLLWRNORM ");
00068   if (event->revents & POLLWRBAND)
00069     strcat(buf, "POLLWRBAND ");
00070   if (event->revents & POLLERR)
00071     strcat(buf, "POLLERR ");
00072   if (event->revents & POLLHUP)
00073     strcat(buf, "POLLHUP ");
00074   if (event->revents & POLLNVAL)
00075     strcat(buf, "POLLNVAL ");
00076 
00077   if (buf[0] == 0)
00078     sprintf(buf, "0x%x ", event->revents);
00079 
00080   clog << "poll events = " << buf << endl;
00081 }
00082 
00083 #if defined(__linux__)
00084 
00085 int IOHandler::add_poll_interest(int mode) {
00086 
00087   m_poll_interest |= mode;
00088 
00089   HANDLE_POLL_INTERFACE_ADD;
00090 
00091   if (!ReactorFactory::ms_epollet) {
00092     struct epoll_event event;
00093 
00094     memset(&event, 0, sizeof(struct epoll_event));
00095     event.data.ptr = this;
00096 
00097     if (m_poll_interest & Reactor::READ_READY)
00098       event.events |= EPOLLIN;
00099     if (m_poll_interest & Reactor::WRITE_READY)
00100       event.events |= EPOLLOUT;
00101 
00102     if (epoll_ctl(m_reactor_ptr->poll_fd, EPOLL_CTL_MOD, m_sd, &event) < 0) {
00108       return Error::COMM_POLL_ERROR;      
00109     }
00110   }
00111   return Error::OK;
00112 }
00113 
00114 
00115 
00116 int IOHandler::remove_poll_interest(int mode) {
00117   m_poll_interest &= ~mode;
00118 
00119   HANDLE_POLL_INTERFACE_MODIFY;
00120 
00121   if (!ReactorFactory::ms_epollet) {
00122     struct epoll_event event;
00123 
00124     memset(&event, 0, sizeof(struct epoll_event));
00125     event.data.ptr = this;
00126 
00127     if (m_poll_interest & Reactor::READ_READY)
00128       event.events |= EPOLLIN;
00129     if (m_poll_interest & Reactor::WRITE_READY)
00130       event.events |= EPOLLOUT;
00131 
00132     if (epoll_ctl(m_reactor_ptr->poll_fd, EPOLL_CTL_MOD, m_sd, &event) < 0) {
00133       HT_ERRORF("epoll_ctl(EPOLL_CTL_MOD, sd=%d) (mode=%x) : %s",
00134                 m_sd, mode, strerror(errno));
00135       return Error::COMM_POLL_ERROR;
00136     }
00137   }
00138   return Error::OK;
00139 }
00140 
00141 
00142 
00143 void IOHandler::display_event(struct epoll_event *event) {
00144   char buf[128];
00145 
00146   buf[0] = 0;
00147   if (event->events & EPOLLIN)
00148     strcat(buf, "EPOLLIN ");
00149   else if (event->events & EPOLLOUT)
00150     strcat(buf, "EPOLLOUT ");
00151   else if (event->events & EPOLLPRI)
00152     strcat(buf, "EPOLLPRI ");
00153   else if (event->events & EPOLLERR)
00154     strcat(buf, "EPOLLERR ");
00155   else if (event->events & EPOLLHUP)
00156     strcat(buf, "EPOLLHUP ");
00157   else if (ReactorFactory::ms_epollet && event->events & POLLRDHUP)
00158     strcat(buf, "POLLRDHUP ");
00159   else if (event->events & EPOLLET)
00160     strcat(buf, "EPOLLET ");
00161 #if defined(EPOLLONESHOT)
00162   else if (event->events & EPOLLONESHOT)
00163     strcat(buf, "EPOLLONESHOT ");
00164 #endif 
00165 
00166   if (buf[0] == 0)
00167     sprintf(buf, "0x%x ", event->events);
00168 
00169   clog << "epoll events = " << buf << endl;
00170 
00171   return;
00172 }
00173 
00174 #elif defined(__sun__)
00175 
00176 int IOHandler::add_poll_interest(int mode) {
00177   int events = 0;
00178 
00179   m_poll_interest |= mode;
00180 
00181   HANDLE_POLL_INTERFACE_ADD;
00182 
00183   if (m_poll_interest & Reactor::WRITE_READY)
00184     events |= POLLOUT;
00185 
00186   if (m_poll_interest & Reactor::READ_READY)
00187     events |= POLLIN;
00188 
00189   if (events) {
00190     if (port_associate(m_reactor_ptr->poll_fd, PORT_SOURCE_FD,
00191                        m_sd, events, this) < 0)
00192       HT_ERRORF("port_associate(%d, POLLIN, %d) - %s", m_reactor_ptr->poll_fd, m_sd,
00193                 strerror(errno));
00194     return Error::COMM_POLL_ERROR;
00195   }
00196   return Error::OK;
00197 }
00198 
00199 int IOHandler::remove_poll_interest(int mode) {
00200 
00201   if ((m_poll_interest & mode) == 0)
00202     return Error::OK;
00203 
00204   m_poll_interest &= ~mode;
00205 
00206   HANDLE_POLL_INTERFACE_MODIFY;
00207 
00208   if (m_poll_interest)
00209     reset_poll_interest();
00210   else {
00211     if (port_dissociate(m_reactor_ptr->poll_fd, PORT_SOURCE_FD, m_sd) < 0) {
00212       HT_ERRORF("port_dissociate(%d, PORT_SOURCE_FD, %d) - %s",
00213                 m_reactor_ptr->poll_fd, m_sd, strerror(errno));
00214       return Error::COMM_POLL_ERROR;
00215     }
00216   }
00217   return Error::OK;
00218 }
00219 
00220 void IOHandler::display_event(port_event_t *event) {
00221   char buf[128];
00222 
00223   buf[0] = 0;
00224 
00225   if (event->portev_events & POLLIN)
00226     strcat(buf, "POLLIN ");
00227   if (event->portev_events & POLLPRI)
00228     strcat(buf, "POLLPRI ");
00229   if (event->portev_events & POLLOUT)
00230     strcat(buf, "POLLOUT ");
00231   if (event->portev_events & POLLRDNORM)
00232     strcat(buf, "POLLRDNORM ");
00233   if (event->portev_events & POLLRDBAND)
00234     strcat(buf, "POLLRDBAND ");
00235   if (event->portev_events & POLLWRBAND)
00236     strcat(buf, "POLLWRBAND ");
00237   if (event->portev_events & POLLERR)
00238     strcat(buf, "POLLERR ");
00239   if (event->portev_events & POLLHUP)
00240     strcat(buf, "POLLHUP ");
00241   if (event->portev_events & POLLNVAL)
00242     strcat(buf, "POLLNVAL ");
00243   if (event->portev_events & POLLREMOVE)
00244     strcat(buf, "POLLREMOVE ");
00245 
00246   if (buf[0] == 0)
00247     sprintf(buf, "0x%x ", event->portev_events);
00248 
00249   clog << "port events = " << buf << endl;
00250 
00251 }
00252 
00253 #elif defined(__APPLE__) || defined(__FreeBSD__)
00254 
00255 int IOHandler::add_poll_interest(int mode) {
00256   struct kevent events[2];
00257   int count=0;
00258 
00259   m_poll_interest |= mode;
00260 
00261   HANDLE_POLL_INTERFACE_ADD;
00262 
00263   if (mode & Reactor::READ_READY) {
00264     EV_SET(&events[count], m_sd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, this);
00265     count++;
00266   }
00267   if (mode & Reactor::WRITE_READY) {
00268     EV_SET(&events[count], m_sd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, this);
00269     count++;
00270   }
00271   assert(count > 0);
00272 
00273   if (kevent(m_reactor_ptr->kqd, events, count, 0, 0, 0) == -1) {
00274     HT_ERRORF("kevent(sd=%d) (mode=%x) : %s", m_sd, mode, strerror(errno));
00275     return Error::COMM_POLL_ERROR;
00276   }
00277   return Error::OK;
00278 }
00279 
00280 
00281 int IOHandler::remove_poll_interest(int mode) {
00282   struct kevent devents[2];
00283   int count = 0;
00284 
00285   m_poll_interest &= ~mode;
00286 
00287   HANDLE_POLL_INTERFACE_MODIFY;
00288 
00289   if (mode & Reactor::READ_READY) {
00290     EV_SET(&devents[count], m_sd, EVFILT_READ, EV_DELETE, 0, 0, 0);
00291     count++;
00292   }
00293 
00294   if (mode & Reactor::WRITE_READY) {
00295     EV_SET(&devents[count], m_sd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00296     count++;
00297   }
00298 
00299   if (kevent(m_reactor_ptr->kqd, devents, count, 0, 0, 0) == -1
00300       && errno != ENOENT) {
00301     HT_ERRORF("kevent(sd=%d) (mode=%x) : %s", m_sd, mode, strerror(errno));
00302     return Error::COMM_POLL_ERROR;
00303   }
00304   return Error::OK;
00305 }
00306 
00307 
00311 void IOHandler::display_event(struct kevent *event) {
00312 
00313   clog << "kevent: ident=" << hex << (long)event->ident;
00314 
00315   switch (event->filter) {
00316   case EVFILT_READ:
00317     clog << ", EVFILT_READ, fflags=";
00318     if (event->fflags & NOTE_LOWAT)
00319       clog << "NOTE_LOWAT";
00320     else
00321       clog << event->fflags;
00322     break;
00323   case EVFILT_WRITE:
00324     clog << ", EVFILT_WRITE, fflags=";
00325     if (event->fflags & NOTE_LOWAT)
00326       clog << "NOTE_LOWAT";
00327     else
00328       clog << event->fflags;
00329     break;
00330   case EVFILT_AIO:
00331     clog << ", EVFILT_AIO, fflags=" << event->fflags;
00332     break;
00333   case EVFILT_VNODE:
00334     clog << ", EVFILT_VNODE, fflags={";
00335     if (event->fflags & NOTE_DELETE)
00336       clog << " NOTE_DELETE";
00337     if (event->fflags & NOTE_WRITE)
00338       clog << " NOTE_WRITE";
00339     if (event->fflags & NOTE_EXTEND)
00340       clog << " NOTE_EXTEND";
00341     if (event->fflags & NOTE_ATTRIB)
00342       clog << " NOTE_ATTRIB";
00343     if (event->fflags & NOTE_LINK)
00344       clog << " NOTE_LINK";
00345     if (event->fflags & NOTE_RENAME)
00346       clog << " NOTE_RENAME";
00347     if (event->fflags & NOTE_REVOKE)
00348       clog << " NOTE_REVOKE";
00349     clog << " }";
00350     break;
00351   case EVFILT_PROC:
00352     clog << ", EVFILT_VNODE, fflags={";
00353     if (event->fflags & NOTE_EXIT)
00354       clog << " NOTE_EXIT";
00355     if (event->fflags & NOTE_FORK)
00356       clog << " NOTE_FORK";
00357     if (event->fflags & NOTE_EXEC)
00358       clog << " NOTE_EXEC";
00359     if (event->fflags & NOTE_TRACK)
00360       clog << " NOTE_TRACK";
00361     if (event->fflags & NOTE_TRACKERR)
00362       clog << " NOTE_TRACKERR";
00363     if (event->fflags & NOTE_CHILD)
00364       clog << " NOTE_CHILD";
00365     clog << " pid=" << (event->flags & NOTE_PDATAMASK);
00366     break;
00367   case EVFILT_SIGNAL:
00368     clog << ", EVFILT_SIGNAL, fflags=" << event->fflags;
00369     break;
00370   case EVFILT_TIMER:
00371 #ifdef __FreeBSD__
00372     clog << ", EVFILT_TIMER, fflags=" << event->fflags;
00373 #else
00374     clog << ", EVFILT_TIMER, fflags={";
00375     if (event->fflags & NOTE_SECONDS)
00376       clog << " NOTE_SECONDS";
00377     if (event->fflags & NOTE_USECONDS)
00378       clog << " NOTE_USECONDS";
00379     if (event->fflags & NOTE_NSECONDS)
00380       clog << " NOTE_NSECONDS";
00381     if (event->fflags & NOTE_ABSOLUTE)
00382       clog << " NOTE_ABSOLUTE";
00383     clog << " }";
00384 #endif
00385     break;
00386   }
00387 
00388   if(event->flags != 0) {
00389     clog << ", flags=";
00390     if ((event->flags & EV_EOF) || (event->flags & EV_ERROR)) {
00391       clog << "{";
00392       if (event->flags & EV_EOF)
00393         clog << " EV_EOF";
00394       if (event->flags & EV_ERROR)
00395         clog << " EV_ERROR";
00396       clog << "}";
00397     }
00398     else
00399       clog << hex << event->flags;
00400   }
00401   clog << ", data=" << dec << (long)event->data << endl;
00402 }
00403 
00404 #else
00405   ImplementMe;
00406 #endif