00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef lazy_dht_HPP
00026 #define lazy_dht_HPP
00027 #include <boost/unordered_map.hpp>
00028 #include <boost/intrusive/list.hpp>
00029
00030 #include <graphlab/rpc/dc.hpp>
00031 #include <graphlab/parallel/pthread_tools.hpp>
00032 #include <graphlab/util/synchronized_unordered_map.hpp>
00033 #include <graphlab/util/dense_bitset.hpp>
00034 #include <graphlab/rpc/lazy_dht.hpp>
00035
00036 namespace graphlab {
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 template<typename KeyType, typename ValueType>
00050 class lazy_dht{
00051 public:
00052
00053 typedef dc_impl::lru_list<KeyType, ValueType> lru_entry_type;
00054
00055 typedef boost::unordered_map<KeyType, ValueType> map_type;
00056
00057 typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;
00058
00059 struct wait_struct {
00060 mutex mut;
00061 conditional cond;
00062 ValueType val;
00063 size_t numreplies;
00064 bool hasvalue;
00065 };
00066
00067 typedef boost::intrusive::member_hook<lru_entry_type,
00068 typename lru_entry_type::lru_member_hook_type,
00069 &lru_entry_type::member_hook_> MemberOption;
00070
00071 typedef boost::intrusive::list<lru_entry_type,
00072 MemberOption,
00073 boost::intrusive::constant_time_size<false> > lru_list_type;
00074
00075
00076 lazy_dht(distributed_control &dc,
00077 size_t max_cache_size = 65536):rmi(dc, this),data(11) {
00078 cache.rehash(max_cache_size);
00079 maxcache = max_cache_size;
00080 logger(LOG_INFO, "%d Creating distributed_hash_table. Cache Limit = %d",
00081 dc.procid(), maxcache);
00082 reqs = 0;
00083 misses = 0;
00084 dc.barrier();
00085 }
00086
00087
00088 ~lazy_dht() {
00089 data.clear();
00090 typename cache_type::iterator i = cache.begin();
00091 while (i != cache.end()) {
00092 delete i->second;
00093 ++i;
00094 }
00095 cache.clear();
00096 }
00097
00098
00099
00100 void set(const KeyType& key, const ValueType &newval) {
00101 datalock.lock();
00102 data[key] = newval;
00103 datalock.unlock();
00104 }
00105
00106
00107 std::pair<bool, ValueType> get_owned(const KeyType &key) const {
00108 std::pair<bool, ValueType> ret;
00109 datalock.lock();
00110 typename map_type::const_iterator iter = data.find(key);
00111 if (iter == data.end()) {
00112 ret.first = false;
00113 }
00114 else {
00115 ret.first = true;
00116 ret.second = iter->second;
00117 }
00118 datalock.unlock();
00119 return ret;
00120 }
00121
00122 void remote_get_owned(const KeyType &key, procid_t source, size_t ptr) const {
00123 std::pair<bool, ValueType> ret;
00124 datalock.lock();
00125 typename map_type::const_iterator iter = data.find(key);
00126 if (iter == data.end()) {
00127 ret.first = false;
00128 }
00129 else {
00130 ret.first = true;
00131 ret.second = iter->second;
00132 }
00133 datalock.unlock();
00134 rmi.remote_call(source, &lazy_dht<KeyType,ValueType>::get_reply, ptr, ret.second, ret.first);
00135 }
00136
00137 void get_reply(size_t ptr, ValueType& val, bool hasvalue) {
00138 wait_struct* w = reinterpret_cast<wait_struct*>(ptr);
00139 w->mut.lock();
00140 if (hasvalue) {
00141 w->val = val;
00142 w->hasvalue = true;
00143 }
00144 w->numreplies--;
00145 if (w->numreplies == 0) w->cond.signal();
00146 w->mut.unlock();
00147
00148 }
00149
00150
00151 std::pair<bool, ValueType> get(const KeyType &key) const {
00152 std::pair<bool, ValueType> ret = get_owned(key);
00153 if (ret.first) return ret;
00154
00155 wait_struct w;
00156 w.numreplies = rmi.numprocs() - 1;
00157 size_t ptr = reinterpret_cast<size_t>(&w);
00158
00159 for (size_t i = 0;i < rmi.numprocs(); ++i) {
00160 if (i != rmi.procid()) {
00161 rmi.remote_call(i, &lazy_dht<KeyType,ValueType>::remote_get_owned, key, rmi.procid(), ptr);
00162 }
00163 }
00164 w.mut.lock();
00165 while (w.numreplies > 0) w.cond.wait(w.mut);
00166 w.mut.unlock();
00167 ret.first = w.hasvalue;
00168 ret.second = w.val;
00169 if (ret.first) update_cache(key, ret.second);
00170 return ret;
00171 }
00172
00173
00174
00175
00176 std::pair<bool, ValueType> get_cached(const KeyType &key) const {
00177 std::pair<bool, ValueType> ret = get_owned(key);
00178 if (ret.first) return ret;
00179
00180 reqs++;
00181 cachelock.lock();
00182
00183 typename cache_type::iterator i = cache.find(key);
00184 if (i == cache.end()) {
00185
00186 cachelock.unlock();
00187 misses++;
00188 return get(key);
00189 }
00190 else {
00191
00192 ret.first = true;
00193 ret.second = i->second->value;
00194
00195 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
00196 lruage.push_front(*(i->second));
00197 cachelock.unlock();
00198 return ret;
00199 }
00200 }
00201
00202
00203 void invalidate(const KeyType &key) const{
00204 cachelock.lock();
00205
00206 typename cache_type::iterator i = cache.find(key);
00207 if (i != cache.end()) {
00208
00209 delete i->second;
00210 cache.erase(i);
00211 }
00212 cachelock.unlock();
00213 }
00214
00215
00216 double cache_miss_rate() {
00217 return double(misses) / double(reqs);
00218 }
00219
00220 size_t num_gets() const {
00221 return reqs;
00222 }
00223 size_t num_misses() const {
00224 return misses;
00225 }
00226
00227 size_t cache_size() const {
00228 return cache.size();
00229 }
00230
00231 private:
00232
00233 mutable dc_dist_object<lazy_dht<KeyType, ValueType> > rmi;
00234
00235 mutex datalock;
00236 map_type data;
00237
00238
00239 mutex cachelock;
00240 mutable cache_type cache;
00241 mutable lru_list_type lruage;
00242
00243
00244 procid_t numprocs;
00245 size_t maxcache;
00246
00247 mutable size_t reqs;
00248 mutable size_t misses;
00249
00250
00251
00252
00253
00254
00255 void update_cache(const KeyType &key, const ValueType &val) const{
00256 cachelock.lock();
00257 typename cache_type::iterator i = cache.find(key);
00258
00259 if (i == cache.end()) {
00260 cachelock.unlock();
00261
00262 if (cache.size() >= maxcache) remove_lru();
00263 cachelock.lock();
00264
00265
00266 std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key, new lru_entry_type(key, val)));
00267 if (ret.second) lruage.push_front(*(ret.first->second));
00268 }
00269 else {
00270
00271 i->second->value = val;
00272
00273
00274 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
00275 lruage.push_front(*(i->second));
00276 }
00277 cachelock.unlock();
00278 }
00279
00280
00281 void remove_lru() const{
00282 cachelock.lock();
00283 KeyType keytoerase = lruage.back().key;
00284
00285 typename cache_type::iterator i = cache.find(keytoerase);
00286 if (i != cache.end()) {
00287
00288 delete i->second;
00289 cache.erase(i);
00290 }
00291 cachelock.unlock();
00292 }
00293
00294 };
00295
00296 }
00297 #endif
00298