Go to the documentation of this file.00001
00022 #ifndef HYPERTABLE_HANDLERMAP_H
00023 #define HYPERTABLE_HANDLERMAP_H
00024
00025 #include <cassert>
00026
00027
00028
00029 #include <boost/thread/condition.hpp>
00030
00031 #include "Common/Mutex.h"
00032 #include "Common/Error.h"
00033 #include "Common/Logger.h"
00034 #include "Common/ReferenceCount.h"
00035 #include "Common/SockAddrMap.h"
00036 #include "Common/Time.h"
00037 #include "Common/Timer.h"
00038
00039 #include "CommAddress.h"
00040 #include "CommBuf.h"
00041 #include "IOHandlerData.h"
00042 #include "IOHandlerDatagram.h"
00043 #include "ProxyMap.h"
00044
00045 namespace Hypertable {
00046
00047 class HandlerMap : public ReferenceCount {
00048
00049 public:
00050
00051 HandlerMap() : m_proxies_loaded(false) { }
00052
00053 int32_t insert_handler(IOHandler *handler) {
00054 ScopedLock lock(m_mutex);
00055 if (m_handler_map.find(handler->get_address()) != m_handler_map.end())
00056 return Error::COMM_ALREADY_CONNECTED;
00057 m_handler_map[handler->get_address()] = handler;
00058 return Error::OK;
00059 }
00060
00061 int32_t insert_handler(IOHandlerData *handler) {
00062 ScopedLock lock(m_mutex);
00063 if (m_handler_map.find(handler->get_address()) != m_handler_map.end())
00064 return Error::COMM_ALREADY_CONNECTED;
00065 m_handler_map[handler->get_address()] = handler;
00066 if (ReactorFactory::proxy_master) {
00067 CommBufPtr comm_buf = m_proxy_map.create_update_message();
00068 comm_buf->write_header_and_reset();
00069 return handler->send_message(comm_buf);
00070 }
00071 return Error::OK;
00072 }
00073
00074 int set_alias(const InetAddr &addr, const InetAddr &alias) {
00075 ScopedLock lock(m_mutex);
00076 SockAddrMap<IOHandlerPtr>::iterator iter;
00077
00078 if (m_handler_map.find(alias) != m_handler_map.end())
00079 return Error::COMM_CONFLICTING_ADDRESS;
00080
00081 if ((iter = m_handler_map.find(addr)) == m_handler_map.end())
00082 return Error::COMM_NOT_CONNECTED;
00083
00084 (*iter).second->set_alias(alias);
00085 m_handler_map[alias] = (*iter).second;
00086
00087 return Error::OK;
00088 }
00089
00090 int add_proxy(const String &proxy, const InetAddr &addr) {
00091 ScopedLock lock(m_mutex);
00092 ProxyMapT new_map, invalidated_map;
00093
00094 m_proxy_map.update_mapping(proxy, addr, invalidated_map, new_map);
00095
00096 foreach(const ProxyMapT::value_type &v, invalidated_map) {
00097 IOHandler *handler = lookup_handler(v.second);
00098 if (handler)
00099 handler->set_proxy("");
00100 }
00101
00102 foreach(const ProxyMapT::value_type &v, new_map) {
00103 IOHandler *handler = lookup_handler(v.second);
00104 if (handler)
00105 handler->set_proxy(v.first);
00106 }
00107
00108 return propagate_mappings(new_map);
00109 }
00110
00116 void get_proxy_map(ProxyMapT &proxy_map) {
00117 m_proxy_map.get_map(proxy_map);
00118 }
00119
00120
00121 void update_proxies(const char *message, size_t message_len) {
00122 ScopedLock lock(m_mutex);
00123 String mappings(message, message_len);
00124 ProxyMapT new_map, invalidated_map;
00125
00126 HT_ASSERT(!ReactorFactory::proxy_master);
00127
00128 m_proxy_map.update_mappings(mappings, invalidated_map, new_map);
00129
00130 foreach(const ProxyMapT::value_type &v, invalidated_map) {
00131 IOHandler *handler = lookup_handler(v.second);
00132 if (handler)
00133 handler->set_proxy("");
00134 }
00135
00136 foreach(const ProxyMapT::value_type &v, new_map) {
00137 IOHandler *handler = lookup_handler(v.second);
00138 if (handler)
00139 handler->set_proxy(v.first);
00140 }
00141
00142 m_proxies_loaded = true;
00143 m_cond.notify_all();
00144 }
00145
00146 bool wait_for_proxy_load(Timer &timer) {
00147 ScopedLock lock(m_mutex);
00148 boost::xtime drop_time;
00149
00150 timer.start();
00151
00152 while (!m_proxies_loaded) {
00153 boost::xtime_get(&drop_time, boost::TIME_UTC);
00154 xtime_add_millis(drop_time, timer.remaining());
00155 if (!m_cond.timed_wait(lock, drop_time))
00156 return false;
00157 }
00158 return true;
00159 }
00160
00161 int contains_data_handler(const CommAddress &addr) {
00162 IOHandlerDataPtr data_handler;
00163 return lookup_data_handler(addr, data_handler);
00164 }
00165
00166 int lookup_data_handler(const CommAddress &addr,
00167 IOHandlerDataPtr &io_handler_data) {
00168 ScopedLock lock(m_mutex);
00169 InetAddr inet_addr;
00170 int error;
00171
00172 if ((error = translate_address(addr, &inet_addr)) != Error::OK)
00173 return error;
00174
00175 IOHandler *handler = lookup_handler(inet_addr);
00176 if (handler) {
00177 io_handler_data = dynamic_cast<IOHandlerData *>(handler);
00178 if (io_handler_data)
00179 return Error::OK;
00180 }
00181 return Error::COMM_NOT_CONNECTED;
00182 }
00183
00184 int32_t insert_datagram_handler(IOHandler *handler) {
00185 ScopedLock lock(m_mutex);
00186 if (m_datagram_handler_map.find(handler->get_local_address())
00187 != m_datagram_handler_map.end())
00188 return Error::COMM_ALREADY_CONNECTED;
00189 m_datagram_handler_map[handler->get_local_address()] = handler;
00190 return Error::OK;
00191 }
00192
00193 int lookup_datagram_handler(const CommAddress &addr,
00194 IOHandlerDatagramPtr &io_handler_dg) {
00195 ScopedLock lock(m_mutex);
00196 InetAddr inet_addr;
00197 int error;
00198
00199 if ((error = translate_address(addr, &inet_addr)) != Error::OK)
00200 return error;
00201
00202 SockAddrMap<IOHandlerPtr>::iterator iter =
00203 m_datagram_handler_map.find(inet_addr);
00204
00205 if (iter == m_datagram_handler_map.end())
00206 return Error::COMM_NOT_CONNECTED;
00207
00208 io_handler_dg = (IOHandlerDatagram *)(*iter).second.get();
00209
00210 return Error::OK;
00211 }
00212
00213 int remove_handler(const CommAddress &addr, IOHandlerPtr &handler) {
00214 SockAddrMap<IOHandlerPtr>::iterator iter;
00215 InetAddr inet_addr;
00216 int error;
00217
00218 if ((error = translate_address(addr, &inet_addr)) != Error::OK)
00219 return error;
00220
00221 if ((iter = m_handler_map.find(inet_addr)) != m_handler_map.end()) {
00222 handler = (*iter).second;
00223 m_handler_map.erase(iter);
00224 InetAddr other = handler->get_address();
00225
00226 if (inet_addr == other)
00227 handler->get_alias(&other);
00228
00229 if (other.sin_port != 0) {
00230 if ((iter = m_handler_map.find(other)) != m_handler_map.end())
00231 m_handler_map.erase(iter);
00232 else {
00233 HT_ERRORF("Unable to find mapping for %s in HandlerMap",
00234 InetAddr::format(other).c_str());
00235 }
00236 }
00237 }
00238 else if ((iter = m_datagram_handler_map.find(inet_addr))
00239 != m_datagram_handler_map.end()) {
00240 handler = (*iter).second;
00241 m_datagram_handler_map.erase(iter);
00242 }
00243 else
00244 return Error::COMM_NOT_CONNECTED;
00245 return Error::OK;
00246 }
00247
00248 bool decomission_handler(const CommAddress &addr, IOHandlerPtr &handler) {
00249 ScopedLock lock(m_mutex);
00250
00251 if (remove_handler(addr, handler) == Error::OK) {
00252 m_decomissioned_handlers.insert(handler);
00253 return true;
00254 }
00255 return false;
00256 }
00257
00258 bool decomission_handler(const CommAddress &addr) {
00259 IOHandlerPtr handler;
00260 return decomission_handler(addr, handler);
00261 }
00262
00263 bool translate_proxy_address(const CommAddress &proxy_addr, CommAddress &addr) {
00264 InetAddr inet_addr;
00265 HT_ASSERT(proxy_addr.is_proxy());
00266 if (!m_proxy_map.get_mapping(proxy_addr.proxy, inet_addr))
00267 return false;
00268 addr.set_inet(inet_addr);
00269 return true;
00270 }
00271
00272 void purge_handler(IOHandler *handler) {
00273 ScopedLock lock(m_mutex);
00274 m_decomissioned_handlers.erase(handler);
00275 if (m_decomissioned_handlers.empty())
00276 m_cond.notify_all();
00277 }
00278
00279 void decomission_all(std::set<IOHandler *> &handlers) {
00280 ScopedLock lock(m_mutex);
00281 SockAddrMap<IOHandlerPtr>::iterator iter;
00282
00283
00284 for (iter = m_handler_map.begin(); iter != m_handler_map.end(); ++iter) {
00285 m_decomissioned_handlers.insert((*iter).second);
00286 handlers.insert((*iter).second.get());
00287 }
00288 m_handler_map.clear();
00289
00290
00291 for (iter = m_datagram_handler_map.begin();
00292 iter != m_datagram_handler_map.end(); ++iter) {
00293 m_decomissioned_handlers.insert((*iter).second);
00294 handlers.insert((*iter).second.get());
00295 }
00296 m_datagram_handler_map.clear();
00297 }
00298
00299 void wait_for_empty() {
00300 ScopedLock lock(m_mutex);
00301 if (!m_decomissioned_handlers.empty())
00302 m_cond.wait(lock);
00303 }
00304
00305 int propagate_mappings(ProxyMapT &mappings) {
00306 int last_error = Error::OK;
00307
00308 if (mappings.empty())
00309 return Error::OK;
00310
00311 SockAddrMap<IOHandlerPtr>::iterator iter;
00312 String mapping;
00313
00314 foreach(const ProxyMapT::value_type &v, mappings)
00315 mapping += v.first + "\t" + InetAddr::format(v.second) + "\n";
00316
00317 uint8_t *buffer = new uint8_t [ mapping.length() + 1 ];
00318 strcpy((char *)buffer, mapping.c_str());
00319 boost::shared_array<uint8_t> payload(buffer);
00320 CommHeader header;
00321 header.flags |= CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE;
00322 for (iter = m_handler_map.begin(); iter != m_handler_map.end(); ++iter) {
00323 IOHandlerData *io_handler_data = dynamic_cast<IOHandlerData *>((*iter).second.get());
00324 if (io_handler_data) {
00325 CommBufPtr comm_buf = new CommBuf(header, 0, payload, mapping.length()+1);
00326 comm_buf->write_header_and_reset();
00327 int error = io_handler_data->send_message(comm_buf);
00328 if (error != Error::OK) {
00329 HT_ERRORF("Unable to propagate proxy mappings to %s - %s",
00330 InetAddr(io_handler_data->get_address()).format().c_str(),
00331 Error::get_text(error));
00332 last_error = error;
00333 }
00334 }
00335 }
00336 return last_error;
00337 }
00338
00339 private:
00340
00344 int translate_address(const CommAddress &addr, InetAddr *inet_addr) {
00345
00346 HT_ASSERT(addr.is_set());
00347
00348 if (addr.is_proxy()) {
00349 if (!m_proxy_map.get_mapping(addr.proxy, *inet_addr))
00350 return Error::COMM_INVALID_PROXY;
00351 }
00352 else
00353 memcpy(inet_addr, &addr.inet, sizeof(InetAddr));
00354
00355 return Error::OK;
00356 }
00357
00358 IOHandler *lookup_handler(const InetAddr &addr) {
00359 SockAddrMap<IOHandlerPtr>::iterator iter = m_handler_map.find(addr);
00360 if (iter == m_handler_map.end())
00361 return 0;
00362 return (*iter).second.get();
00363 }
00364
00365 Mutex m_mutex;
00366 boost::condition m_cond;
00367 SockAddrMap<IOHandlerPtr> m_handler_map;
00368 SockAddrMap<IOHandlerPtr> m_datagram_handler_map;
00369 std::set<IOHandlerPtr, ltiohp> m_decomissioned_handlers;
00370 ProxyMap m_proxy_map;
00371 bool m_proxies_loaded;
00372 };
/// Smart pointer (boost::intrusive_ptr) to a HandlerMap.
00373 typedef boost::intrusive_ptr<HandlerMap> HandlerMapPtr;
00374
00375 }
00376
00377
00378 #endif // HYPERTABLE_HANDLERMAP_H