lazy_dht.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 /*
00019   \author Yucheng Low (ylow)
  An implementation of a distributed key -> value map with caching
  capabilities.
00022 
00023 */
00024 
00025 #ifndef lazy_dht_HPP
00026 #define lazy_dht_HPP
00027 #include <boost/unordered_map.hpp>
00028 #include <boost/intrusive/list.hpp>
00029 
00030 #include <graphlab/rpc/dc.hpp>
00031 #include <graphlab/parallel/pthread_tools.hpp>
00032 #include <graphlab/util/synchronized_unordered_map.hpp>
00033 #include <graphlab/util/dense_bitset.hpp>
00034 #include <graphlab/rpc/lazy_dht.hpp>
00035 
00036 namespace graphlab {
00037 
00038 /**
00039 \ingroup rpc
00040 This implements a distributed key -> value map with caching capabilities.
00041 It is up to the user to determine cache invalidation policies. User explicitly
00042 calls the invalidate() function to clear local cache entries.
00043 This is an extremely lazy DHT in that it is up to the user to guarantee that
00044 the keys are unique. Any machine can call set on any key, and the result of the 
00045 key will be stored locally. Reads on any unknown keys will be resolved using a broadcast
00046 operation. 
00047 */
00048 
00049 template<typename KeyType, typename ValueType>
00050 class lazy_dht{
00051  public:
00052 
00053   typedef dc_impl::lru_list<KeyType, ValueType> lru_entry_type;
00054   /// datatype of the data map
00055   typedef boost::unordered_map<KeyType, ValueType> map_type;
00056   /// datatype of the local cache map
00057   typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;
00058 
00059   struct wait_struct {
00060     mutex mut;
00061     conditional cond;
00062     ValueType val;
00063     size_t numreplies;
00064     bool hasvalue;
00065   };
00066 
00067   typedef boost::intrusive::member_hook<lru_entry_type,
00068                                         typename lru_entry_type::lru_member_hook_type,
00069                                         &lru_entry_type::member_hook_> MemberOption;
00070   /// datatype of the intrusive LRU list embedded in the cache map
00071   typedef boost::intrusive::list<lru_entry_type, 
00072                                  MemberOption, 
00073                                  boost::intrusive::constant_time_size<false> > lru_list_type;
00074 
00075   /// Constructor. Creates the integer map.
00076   lazy_dht(distributed_control &dc, 
00077                          size_t max_cache_size = 65536):rmi(dc, this),data(11) {
00078     cache.rehash(max_cache_size);
00079     maxcache = max_cache_size;
00080     logger(LOG_INFO, "%d Creating distributed_hash_table. Cache Limit = %d", 
00081            dc.procid(), maxcache);
00082     reqs = 0;
00083     misses = 0;
00084     dc.barrier();
00085   }
00086 
00087 
00088   ~lazy_dht() {
00089     data.clear();
00090     typename cache_type::iterator i = cache.begin();
00091     while (i != cache.end()) {
00092       delete i->second;
00093       ++i;
00094     }
00095     cache.clear();
00096   }
00097   
00098   
00099   /// Sets the key to the value
00100   void set(const KeyType& key, const ValueType &newval)  {
00101     datalock.lock();
00102     data[key] = newval;
00103     datalock.unlock();
00104   }
00105   
00106 
00107   std::pair<bool, ValueType> get_owned(const KeyType &key) const {
00108     std::pair<bool, ValueType> ret;
00109     datalock.lock();
00110     typename map_type::const_iterator iter = data.find(key);    
00111     if (iter == data.end()) {
00112       ret.first = false;
00113     }
00114     else {
00115       ret.first = true;
00116       ret.second = iter->second;
00117     }
00118     datalock.unlock();
00119     return ret;
00120   }
00121   
00122  void remote_get_owned(const KeyType &key, procid_t source, size_t ptr) const {
00123     std::pair<bool, ValueType> ret;
00124     datalock.lock();
00125     typename map_type::const_iterator iter = data.find(key);    
00126     if (iter == data.end()) {
00127       ret.first = false;
00128     }
00129     else {
00130       ret.first = true;
00131       ret.second = iter->second;
00132     }
00133     datalock.unlock();
00134     rmi.remote_call(source, &lazy_dht<KeyType,ValueType>::get_reply, ptr, ret.second, ret.first);
00135   }
00136 
00137   void get_reply(size_t ptr, ValueType& val, bool hasvalue) {
00138     wait_struct* w = reinterpret_cast<wait_struct*>(ptr);      
00139     w->mut.lock();
00140     if (hasvalue) {
00141       w->val = val;
00142       w->hasvalue = true;
00143     }
00144     w->numreplies--;
00145     if (w->numreplies == 0) w->cond.signal();
00146     w->mut.unlock();
00147     
00148   }
00149 
00150   /** Gets the value associated with the key. returns true on success.. */
00151   std::pair<bool, ValueType> get(const KeyType &key) const {
00152     std::pair<bool, ValueType> ret = get_owned(key);
00153     if (ret.first) return ret;
00154     
00155     wait_struct w;
00156     w.numreplies = rmi.numprocs() - 1;
00157     size_t ptr = reinterpret_cast<size_t>(&w);
00158     // otherwise I need to find someone with the key
00159     for (size_t i = 0;i < rmi.numprocs(); ++i) {
00160       if (i != rmi.procid()) {
00161         rmi.remote_call(i, &lazy_dht<KeyType,ValueType>::remote_get_owned, key, rmi.procid(), ptr);
00162       }
00163     }
00164     w.mut.lock();
00165     while (w.numreplies > 0) w.cond.wait(w.mut);
00166     w.mut.unlock();
00167     ret.first = w.hasvalue;
00168     ret.second = w.val;
00169     if (ret.first) update_cache(key, ret.second);
00170     return ret;
00171   }
00172 
00173 
00174   /** Gets the value associated with the key, reading from cache if available
00175       Note that the cache may be out of date. */
00176   std::pair<bool, ValueType> get_cached(const KeyType &key) const {
00177     std::pair<bool, ValueType> ret = get_owned(key);
00178     if (ret.first) return ret;
00179     
00180     reqs++;
00181     cachelock.lock();
00182     // check if it is in the cache
00183     typename cache_type::iterator i = cache.find(key);
00184     if (i == cache.end()) {
00185       // nope. not in cache. Call the regular get
00186       cachelock.unlock();
00187       misses++;
00188       return get(key);
00189     }
00190     else {
00191       // yup. in cache. return the value
00192       ret.first = true;
00193       ret.second = i->second->value;
00194       // shift the cache entry to the head of the LRU list
00195       lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
00196       lruage.push_front(*(i->second));
00197       cachelock.unlock();
00198       return ret;
00199     }
00200   }
00201 
00202   /// Invalidates the cache entry associated with this key
00203   void invalidate(const KeyType &key) const{
00204     cachelock.lock();
00205     // is the key I am invalidating in the cache?
00206     typename cache_type::iterator i = cache.find(key);
00207     if (i != cache.end()) {
00208       // drop it from the lru list
00209       delete i->second;
00210       cache.erase(i);
00211     }
00212     cachelock.unlock();
00213   }
00214 
00215 
00216   double cache_miss_rate() {
00217     return double(misses) / double(reqs);
00218   }
00219 
00220   size_t num_gets() const {
00221     return reqs;
00222   }
00223   size_t num_misses() const {
00224     return misses;
00225   }
00226 
00227   size_t cache_size() const {
00228     return cache.size();
00229   }
00230 
00231  private:
00232 
00233   mutable dc_dist_object<lazy_dht<KeyType, ValueType> > rmi;
00234   
00235   mutex datalock;
00236   map_type data;  /// The actual table data that is distributed
00237 
00238   
00239   mutex cachelock; /// lock for the cache datastructures
00240   mutable cache_type cache;   /// The cache table
00241   mutable lru_list_type lruage; /// THe LRU linked list associated with the cache
00242 
00243 
00244   procid_t numprocs;   /// NUmber of processors
00245   size_t maxcache;     /// Maximum cache size allowed
00246 
00247   mutable size_t reqs;
00248   mutable size_t misses;
00249   
00250 
00251 
00252   
00253 
00254   /// Updates the cache with this new value
00255   void update_cache(const KeyType &key, const ValueType &val) const{
00256     cachelock.lock();
00257     typename cache_type::iterator i = cache.find(key);
00258     // create a new entry
00259     if (i == cache.end()) {
00260       cachelock.unlock();
00261       // if we are out of room, remove the lru entry
00262       if (cache.size() >= maxcache) remove_lru();
00263       cachelock.lock();
00264       // insert the element, remember the iterator so we can push it
00265       // straight to the LRU list
00266       std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key, new lru_entry_type(key, val)));
00267       if (ret.second)  lruage.push_front(*(ret.first->second));
00268     }
00269     else {
00270         // modify entry in place
00271         i->second->value = val;
00272         // swap to front of list
00273         //boost::swap_nodes(lru_list_type::s_iterator_to(i->second), lruage.begin());
00274         lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
00275         lruage.push_front(*(i->second));
00276     }
00277     cachelock.unlock();
00278   }
00279 
00280   /// Removes the least recently used element from the cache
00281   void remove_lru() const{
00282     cachelock.lock();
00283     KeyType keytoerase = lruage.back().key;
00284     // is the key I am invalidating in the cache?
00285     typename cache_type::iterator i = cache.find(keytoerase);
00286     if (i != cache.end()) {
00287       // drop it from the lru list
00288       delete i->second;
00289       cache.erase(i);
00290     }
00291     cachelock.unlock();
00292   }
00293 
00294 };
00295 
00296 }
00297 #endif
00298