Go to the documentation of this file.00001
00022 #ifndef HYPERTABLE_REACTOR_H
00023 #define HYPERTABLE_REACTOR_H
00024
00025 #include <queue>
00026 #include <set>
00027 #include <vector>
00028
00029 #include <boost/thread/thread.hpp>
00030
00031 extern "C" {
00032 #include <poll.h>
00033 }
00034
00035 #include "Common/Mutex.h"
00036 #include "Common/ReferenceCount.h"
00037
00038 #include "PollTimeout.h"
00039 #include "RequestCache.h"
00040 #include "ExpireTimer.h"
00041
00042 namespace Hypertable {
00043
00044 typedef struct {
00045 struct pollfd pollfd;
00046 IOHandler *handler;
00047 } PollDescriptorT;
00048
00049 class Reactor : public ReferenceCount {
00050
00051 friend class ReactorFactory;
00052
00053 public:
00054
00055 static const int READ_READY;
00056 static const int WRITE_READY;
00057
00058 Reactor();
00059 ~Reactor() {
00060 poll_loop_interrupt();
00061 }
00062
00063 void operator()();
00064
00065 void add_request(uint32_t id, IOHandler *handler, DispatchHandler *dh,
00066 boost::xtime &expire) {
00067 ScopedLock lock(m_mutex);
00068 m_request_cache.insert(id, handler, dh, expire);
00069 if (m_next_wakeup.sec == 0 || xtime_cmp(expire, m_next_wakeup) < 0)
00070 poll_loop_interrupt();
00071 }
00072
00073 DispatchHandler *remove_request(uint32_t id) {
00074 ScopedLock lock(m_mutex);
00075 return m_request_cache.remove(id);
00076 }
00077
00078 void cancel_requests(IOHandler *handler, int32_t error=Error::COMM_BROKEN_CONNECTION) {
00079 ScopedLock lock(m_mutex);
00080 m_request_cache.purge_requests(handler, error);
00081 }
00082
00083 void add_timer(ExpireTimer &timer) {
00084 ScopedLock lock(m_mutex);
00085 m_timer_heap.push(timer);
00086 poll_loop_interrupt();
00087 }
00088
00089 void schedule_removal(IOHandler *handler) {
00090 ScopedLock lock(m_mutex);
00091 m_removed_handlers.insert(handler);
00092 }
00093
00094 void get_removed_handlers(std::set<IOHandler *> &dst) {
00095 ScopedLock lock(m_mutex);
00096 dst = m_removed_handlers;
00097 m_removed_handlers.clear();
00098 }
00099
00100 void handle_timeouts(PollTimeout &next_timeout);
00101
00102 #if defined(__linux__) || defined (__sun__)
00103 int poll_fd;
00104 #elif defined (__APPLE__) || defined(__FreeBSD__)
00105 int kqd;
00106 #endif
00107
00108 int add_poll_interest(int sd, short events, IOHandler *handler);
00109 int remove_poll_interest(int sd);
00110 int modify_poll_interest(int sd, short events);
00111 void fetch_poll_array(std::vector<struct pollfd> &fdarray,
00112 std::vector<IOHandler *> &handlers);
00113
00114 Mutex m_poll_array_mutex;
00115 std::vector<PollDescriptorT> polldata;
00116
00117 int poll_loop_interrupt();
00118 int poll_loop_continue();
00119
00120 int interrupt_sd() { return m_interrupt_sd; }
00121
00122 protected:
00123 typedef std::priority_queue<ExpireTimer, std::vector<ExpireTimer>, LtTimer>
00124 TimerHeap;
00125
00126 Mutex m_mutex;
00127 RequestCache m_request_cache;
00128 TimerHeap m_timer_heap;
00129 int m_interrupt_sd;
00130 bool m_interrupt_in_progress;
00131 boost::xtime m_next_wakeup;
00132 std::set<IOHandler *> m_removed_handlers;
00133 };
00134
00135 typedef intrusive_ptr<Reactor> ReactorPtr;
00136
00137 }
00138
00139 #endif // HYPERTABLE_REACTOR_H