Reactor.h

Go to the documentation of this file.
00001 
00022 #ifndef HYPERTABLE_REACTOR_H
00023 #define HYPERTABLE_REACTOR_H
00024 
00025 #include <queue>
00026 #include <set>
00027 #include <vector>
00028 
00029 #include <boost/thread/thread.hpp>
00030 
00031 extern "C" {
00032 #include <poll.h>
00033 }
00034 
00035 #include "Common/Mutex.h"
00036 #include "Common/ReferenceCount.h"
00037 
00038 #include "PollTimeout.h"
00039 #include "RequestCache.h"
00040 #include "ExpireTimer.h"
00041 
00042 namespace Hypertable {
00043 
00044   typedef struct {
00045     struct pollfd pollfd;
00046     IOHandler *handler;
00047   } PollDescriptorT;
00048 
00049   class Reactor : public ReferenceCount {
00050 
00051     friend class ReactorFactory;
00052 
00053   public:
00054 
00055     static const int READ_READY;
00056     static const int WRITE_READY;
00057 
00058     Reactor();
00059     ~Reactor() {
00060       poll_loop_interrupt();
00061     }
00062 
00063     void operator()();
00064 
00065     void add_request(uint32_t id, IOHandler *handler, DispatchHandler *dh,
00066                      boost::xtime &expire) {
00067       ScopedLock lock(m_mutex);
00068       m_request_cache.insert(id, handler, dh, expire);
00069       if (m_next_wakeup.sec == 0 || xtime_cmp(expire, m_next_wakeup) < 0)
00070         poll_loop_interrupt();
00071     }
00072 
00073     DispatchHandler *remove_request(uint32_t id) {
00074       ScopedLock lock(m_mutex);
00075       return m_request_cache.remove(id);
00076     }
00077 
00078     void cancel_requests(IOHandler *handler, int32_t error=Error::COMM_BROKEN_CONNECTION) {
00079       ScopedLock lock(m_mutex);
00080       m_request_cache.purge_requests(handler, error);
00081     }
00082 
00083     void add_timer(ExpireTimer &timer) {
00084       ScopedLock lock(m_mutex);
00085       m_timer_heap.push(timer);
00086       poll_loop_interrupt();
00087     }
00088 
00089     void schedule_removal(IOHandler *handler) {
00090       ScopedLock lock(m_mutex);
00091       m_removed_handlers.insert(handler);
00092     }
00093 
00094     void get_removed_handlers(std::set<IOHandler *> &dst) {
00095       ScopedLock lock(m_mutex);
00096       dst = m_removed_handlers;
00097       m_removed_handlers.clear();
00098     }
00099 
00100     void handle_timeouts(PollTimeout &next_timeout);
00101 
00102 #if defined(__linux__) || defined (__sun__)
00103     int poll_fd;
00104 #elif defined (__APPLE__) || defined(__FreeBSD__)
00105     int kqd;
00106 #endif
00107 
00108     int add_poll_interest(int sd, short events, IOHandler *handler);
00109     int remove_poll_interest(int sd);
00110     int modify_poll_interest(int sd, short events);
00111     void fetch_poll_array(std::vector<struct pollfd> &fdarray,
00112                           std::vector<IOHandler *> &handlers);
00113 
00114     Mutex m_poll_array_mutex;
00115     std::vector<PollDescriptorT> polldata;
00116 
00117     int poll_loop_interrupt();
00118     int poll_loop_continue();
00119 
00120     int interrupt_sd() { return m_interrupt_sd; }
00121 
00122   protected:
00123     typedef std::priority_queue<ExpireTimer, std::vector<ExpireTimer>, LtTimer>
00124             TimerHeap;
00125 
00126     Mutex           m_mutex;
00127     RequestCache    m_request_cache;
00128     TimerHeap       m_timer_heap;
00129     int             m_interrupt_sd;
00130     bool            m_interrupt_in_progress;
00131     boost::xtime    m_next_wakeup;
00132     std::set<IOHandler *> m_removed_handlers;
00133   };
00134 
  // Intrusive smart-pointer alias for Reactor.
  typedef intrusive_ptr<Reactor> ReactorPtr;
00136 
00137 } // namespace Hypertable
00138 
00139 #endif // HYPERTABLE_REACTOR_H