HandlerMap.h

Go to the documentation of this file.
00001 
00022 #ifndef HYPERTABLE_HANDLERMAP_H
00023 #define HYPERTABLE_HANDLERMAP_H
00024 
00025 #include <cassert>
00026 
00027 //#define HT_DISABLE_LOG_DEBUG
00028 
00029 #include <boost/thread/condition.hpp>
00030 
00031 #include "Common/Mutex.h"
00032 #include "Common/Error.h"
00033 #include "Common/Logger.h"
00034 #include "Common/ReferenceCount.h"
00035 #include "Common/SockAddrMap.h"
00036 #include "Common/Time.h"
00037 #include "Common/Timer.h"
00038 
00039 #include "CommAddress.h"
00040 #include "CommBuf.h"
00041 #include "IOHandlerData.h"
00042 #include "IOHandlerDatagram.h"
00043 #include "ProxyMap.h"
00044 
00045 namespace Hypertable {
00046 
00047   class HandlerMap : public ReferenceCount {
00048 
00049   public:
00050 
00051   HandlerMap() : m_proxies_loaded(false) { }
00052 
00053     int32_t insert_handler(IOHandler *handler) {
00054       ScopedLock lock(m_mutex);
00055       if (m_handler_map.find(handler->get_address()) != m_handler_map.end())
00056         return Error::COMM_ALREADY_CONNECTED;
00057       m_handler_map[handler->get_address()] = handler;
00058       return Error::OK;
00059     }
00060 
00061     int32_t insert_handler(IOHandlerData *handler) {
00062       ScopedLock lock(m_mutex);
00063       if (m_handler_map.find(handler->get_address()) != m_handler_map.end())
00064         return Error::COMM_ALREADY_CONNECTED;
00065       m_handler_map[handler->get_address()] = handler;
00066       if (ReactorFactory::proxy_master) {
00067         CommBufPtr comm_buf = m_proxy_map.create_update_message();
00068         comm_buf->write_header_and_reset();
00069         return handler->send_message(comm_buf);
00070       }
00071       return Error::OK;
00072     }
00073 
00074     int set_alias(const InetAddr &addr, const InetAddr &alias) {
00075       ScopedLock lock(m_mutex);
00076       SockAddrMap<IOHandlerPtr>::iterator iter;
00077 
00078       if (m_handler_map.find(alias) != m_handler_map.end())
00079         return Error::COMM_CONFLICTING_ADDRESS;
00080 
00081       if ((iter = m_handler_map.find(addr)) == m_handler_map.end())
00082         return Error::COMM_NOT_CONNECTED;
00083 
00084       (*iter).second->set_alias(alias);
00085       m_handler_map[alias] = (*iter).second;
00086 
00087       return Error::OK;
00088     }
00089 
00090     int add_proxy(const String &proxy, const InetAddr &addr) {
00091       ScopedLock lock(m_mutex);
00092       ProxyMapT new_map, invalidated_map;
00093 
00094       m_proxy_map.update_mapping(proxy, addr, invalidated_map, new_map);
00095 
00096       foreach(const ProxyMapT::value_type &v, invalidated_map) {
00097         IOHandler *handler = lookup_handler(v.second);
00098         if (handler)
00099           handler->set_proxy("");
00100       }
00101 
00102       foreach(const ProxyMapT::value_type &v, new_map) {
00103         IOHandler *handler = lookup_handler(v.second);
00104         if (handler)
00105           handler->set_proxy(v.first);
00106       }
00107 
00108       return propagate_mappings(new_map);
00109     }
00110 
00116     void get_proxy_map(ProxyMapT &proxy_map) {
00117       m_proxy_map.get_map(proxy_map);
00118     }
00119 
00120 
00121     void update_proxies(const char *message, size_t message_len) {
00122       ScopedLock lock(m_mutex);
00123       String mappings(message, message_len);
00124       ProxyMapT new_map, invalidated_map;
00125 
00126       HT_ASSERT(!ReactorFactory::proxy_master);
00127 
00128       m_proxy_map.update_mappings(mappings, invalidated_map, new_map);
00129 
00130       foreach(const ProxyMapT::value_type &v, invalidated_map) {
00131         IOHandler *handler = lookup_handler(v.second);
00132         if (handler)
00133           handler->set_proxy("");
00134       }
00135 
00136       foreach(const ProxyMapT::value_type &v, new_map) {
00137         IOHandler *handler = lookup_handler(v.second);
00138         if (handler)
00139           handler->set_proxy(v.first);
00140       }
00141 
00142       m_proxies_loaded = true;
00143       m_cond.notify_all();
00144     }
00145 
00146     bool wait_for_proxy_load(Timer &timer) {
00147       ScopedLock lock(m_mutex);
00148       boost::xtime drop_time;
00149 
00150       timer.start();
00151 
00152       while (!m_proxies_loaded) {
00153         boost::xtime_get(&drop_time, boost::TIME_UTC);
00154         xtime_add_millis(drop_time, timer.remaining());
00155         if (!m_cond.timed_wait(lock, drop_time))
00156           return false;
00157       }
00158       return true;
00159     }
00160 
00161     int contains_data_handler(const CommAddress &addr) {
00162       IOHandlerDataPtr data_handler;
00163       return lookup_data_handler(addr, data_handler);
00164     }
00165 
00166     int lookup_data_handler(const CommAddress &addr,
00167                             IOHandlerDataPtr &io_handler_data) {
00168       ScopedLock lock(m_mutex);
00169       InetAddr inet_addr;
00170       int error;
00171 
00172       if ((error = translate_address(addr, &inet_addr)) != Error::OK)
00173         return error;
00174 
00175       IOHandler *handler = lookup_handler(inet_addr);
00176       if (handler) {
00177         io_handler_data = dynamic_cast<IOHandlerData *>(handler);
00178         if (io_handler_data)
00179           return Error::OK;
00180       }
00181       return Error::COMM_NOT_CONNECTED;
00182     }
00183 
00184     int32_t insert_datagram_handler(IOHandler *handler) {
00185       ScopedLock lock(m_mutex);
00186       if (m_datagram_handler_map.find(handler->get_local_address())
00187           != m_datagram_handler_map.end())
00188         return Error::COMM_ALREADY_CONNECTED;        
00189       m_datagram_handler_map[handler->get_local_address()] = handler;
00190       return Error::OK;
00191     }
00192 
00193     int lookup_datagram_handler(const CommAddress &addr,
00194                                 IOHandlerDatagramPtr &io_handler_dg) {
00195       ScopedLock lock(m_mutex);
00196       InetAddr inet_addr;
00197       int error;
00198 
00199       if ((error = translate_address(addr, &inet_addr)) != Error::OK)
00200         return error;
00201 
00202       SockAddrMap<IOHandlerPtr>::iterator iter =
00203         m_datagram_handler_map.find(inet_addr);
00204 
00205       if (iter == m_datagram_handler_map.end())
00206         return Error::COMM_NOT_CONNECTED;
00207 
00208       io_handler_dg = (IOHandlerDatagram *)(*iter).second.get();
00209 
00210       return Error::OK;
00211     }
00212 
00213     int remove_handler(const CommAddress &addr, IOHandlerPtr &handler) {
00214       SockAddrMap<IOHandlerPtr>::iterator iter;
00215       InetAddr inet_addr;
00216       int error;
00217 
00218       if ((error = translate_address(addr, &inet_addr)) != Error::OK)
00219         return error;
00220 
00221       if ((iter = m_handler_map.find(inet_addr)) != m_handler_map.end()) {
00222         handler = (*iter).second;
00223         m_handler_map.erase(iter);
00224         InetAddr other = handler->get_address();
00225 
00226         if (inet_addr == other)
00227           handler->get_alias(&other);
00228 
00229         if (other.sin_port != 0) {
00230           if ((iter = m_handler_map.find(other)) != m_handler_map.end())
00231             m_handler_map.erase(iter);
00232           else {
00233             HT_ERRORF("Unable to find mapping for %s in HandlerMap",
00234                       InetAddr::format(other).c_str());
00235           }
00236         }
00237       }
00238       else if ((iter = m_datagram_handler_map.find(inet_addr))
00239                 != m_datagram_handler_map.end()) {
00240         handler = (*iter).second;
00241         m_datagram_handler_map.erase(iter);
00242       }
00243       else
00244         return Error::COMM_NOT_CONNECTED;
00245       return Error::OK;
00246     }
00247 
00248     bool decomission_handler(const CommAddress &addr, IOHandlerPtr &handler) {
00249       ScopedLock lock(m_mutex);
00250 
00251       if (remove_handler(addr, handler) == Error::OK) {
00252         m_decomissioned_handlers.insert(handler);
00253         return true;
00254       }
00255       return false;
00256     }
00257 
00258     bool decomission_handler(const CommAddress &addr) {
00259       IOHandlerPtr handler;
00260       return decomission_handler(addr, handler);
00261     }
00262 
00263     bool translate_proxy_address(const CommAddress &proxy_addr, CommAddress &addr) {
00264       InetAddr inet_addr;
00265       HT_ASSERT(proxy_addr.is_proxy());
00266       if (!m_proxy_map.get_mapping(proxy_addr.proxy, inet_addr))
00267         return false;
00268       addr.set_inet(inet_addr);
00269       return true;
00270     }
00271 
00272     void purge_handler(IOHandler *handler) {
00273       ScopedLock lock(m_mutex);
00274       m_decomissioned_handlers.erase(handler);
00275       if (m_decomissioned_handlers.empty())
00276         m_cond.notify_all();
00277     }
00278 
00279     void decomission_all(std::set<IOHandler *> &handlers) {
00280       ScopedLock lock(m_mutex);
00281       SockAddrMap<IOHandlerPtr>::iterator iter;
00282 
00283       // TCP handlers
00284       for (iter = m_handler_map.begin(); iter != m_handler_map.end(); ++iter) {
00285         m_decomissioned_handlers.insert((*iter).second);
00286         handlers.insert((*iter).second.get());
00287       }
00288       m_handler_map.clear();
00289 
00290       // UDP handlers
00291       for (iter = m_datagram_handler_map.begin();
00292            iter != m_datagram_handler_map.end(); ++iter) {
00293         m_decomissioned_handlers.insert((*iter).second);
00294         handlers.insert((*iter).second.get());
00295       }
00296       m_datagram_handler_map.clear();
00297     }
00298 
00299     void wait_for_empty() {
00300       ScopedLock lock(m_mutex);
00301       if (!m_decomissioned_handlers.empty())
00302         m_cond.wait(lock);
00303     }
00304 
00305     int propagate_mappings(ProxyMapT &mappings) {
00306       int last_error = Error::OK;
00307 
00308       if (mappings.empty())
00309         return Error::OK;
00310 
00311       SockAddrMap<IOHandlerPtr>::iterator iter;
00312       String mapping;
00313 
00314       foreach(const ProxyMapT::value_type &v, mappings)
00315         mapping += v.first + "\t" + InetAddr::format(v.second) + "\n";
00316 
00317       uint8_t *buffer = new uint8_t [ mapping.length() + 1 ];
00318       strcpy((char *)buffer, mapping.c_str());
00319       boost::shared_array<uint8_t> payload(buffer);
00320       CommHeader header;
00321       header.flags |= CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE;
00322       for (iter = m_handler_map.begin(); iter != m_handler_map.end(); ++iter) {
00323         IOHandlerData *io_handler_data = dynamic_cast<IOHandlerData *>((*iter).second.get());
00324         if (io_handler_data) {
00325           CommBufPtr comm_buf = new CommBuf(header, 0, payload, mapping.length()+1);
00326           comm_buf->write_header_and_reset();
00327           int error = io_handler_data->send_message(comm_buf);
00328           if (error != Error::OK) {
00329             HT_ERRORF("Unable to propagate proxy mappings to %s - %s",
00330                       InetAddr(io_handler_data->get_address()).format().c_str(),
00331                       Error::get_text(error));
00332             last_error = error;
00333           }
00334         }
00335       }
00336       return last_error;
00337     }
00338 
00339   private:
00340 
00344     int translate_address(const CommAddress &addr, InetAddr *inet_addr) {
00345 
00346       HT_ASSERT(addr.is_set());
00347 
00348       if (addr.is_proxy()) {
00349         if (!m_proxy_map.get_mapping(addr.proxy, *inet_addr))
00350           return Error::COMM_INVALID_PROXY;
00351       }
00352       else
00353         memcpy(inet_addr, &addr.inet, sizeof(InetAddr));
00354 
00355       return Error::OK;
00356     }
00357 
00358     IOHandler *lookup_handler(const InetAddr &addr) {
00359       SockAddrMap<IOHandlerPtr>::iterator iter = m_handler_map.find(addr);
00360       if (iter == m_handler_map.end())
00361         return 0;
00362       return (*iter).second.get();
00363     }
00364 
00365     Mutex                      m_mutex;
00366     boost::condition           m_cond;
00367     SockAddrMap<IOHandlerPtr>  m_handler_map;
00368     SockAddrMap<IOHandlerPtr>  m_datagram_handler_map;
00369     std::set<IOHandlerPtr, ltiohp>  m_decomissioned_handlers;
00370     ProxyMap                   m_proxy_map;
00371     bool                       m_proxies_loaded;
00372   };
00373   typedef boost::intrusive_ptr<HandlerMap> HandlerMapPtr;
00374 
00375 }
00376 
00377 
00378 #endif // HYPERTABLE_HANDLERMAP_H