synchronized_multiqueue.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef GRAPHLAB_SYNCHRONIZED_MULTIQUEUE_HPP
00019 #define GRAPHLAB_SYNCHRONIZED_MULTIQUEUE_HPP
00020 #include <queue>
00021 
00022 #include <graphlab/parallel/atomic.hpp>
00023 #include <graphlab/parallel/pthread_tools.hpp>
00024 #include <graphlab/util/random.hpp>
00025 
00026 
00027 namespace graphlab {
00028   /// \ingroup util_internal
00029   template <typename T>
00030   class synchronized_multiqueue {
00031 
00032   public:
00033     synchronized_multiqueue(size_t numqueues_ = 8) {
00034       numqueues = numqueues_;
00035       queue.resize(numqueues);
00036       locks.resize(numqueues);
00037       num_queues_full.value = 0;
00038     };
00039     ~synchronized_multiqueue() { };
00040 
00041     inline void push(const T &item, size_t queuehint) {
00042       locks[queuehint].lock();
00043       if (queue[queuehint].empty()) num_queues_full.inc();
00044       queue[queuehint].push(item);
00045       locks[queuehint].unlock();
00046     }
00047   
00048     inline bool safepop(T* ret, size_t queuehint) {
00049       locks[queuehint].lock();
00050       // if the queue has stuff in it
00051       if (queue[queuehint].size() == 0) {
00052         locks[queuehint].unlock();
00053         return false;
00054       }
00055       *ret = queue[queuehint].front();
00056       queue[queuehint].pop();
00057       if (queue[queuehint].empty()) num_queues_full.dec();
00058       locks[queuehint].unlock();
00059       return true;
00060     }
00061   
00062     /** Pushes an element into the queue without a hint. If you use this, you 
00063         really should lock. */ 
00064     void push(const T &item) {
00065       //  M.D. Mitzenmacher The Power of Two Choices in Randomized
00066       // Load Balancing (1991)
00067       // http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
00068 
00069       // pushing without a hint. push into the smaller of 2 selections
00070       if (numqueues == 1) { 
00071         push(item, 0);
00072         return;
00073       }
00074       size_t r1 = random::uniform<size_t>(0, numqueues-1);
00075       size_t r2 = random::uniform<size_t>(0, numqueues-2);
00076       if (r2 >= r1) ++r2;
00077       size_t queuehint = (queue[r1].size() < queue[r2].size()) ? r1 : r2;
00078       push(item, queuehint);
00079     }
00080   
00081     size_t size() const{
00082       size_t count = 0;
00083       for (size_t i = 0; i < queue.size(); ++i) {
00084         count += queue[i].size();
00085       }
00086       return count;
00087     }
00088 
00089     bool empty() const {
00090       return (num_queues_full.value == 0);
00091     }
00092   private:
00093 
00094 
00095     // static inline size_t rand_int(size_t q) {
00096     //   //     double v = thread::rand01();
00097     //   //     size_t r = *reinterpret_cast<size_t*>(&v); 
00098     //   //     return (r >> 8) % q;
00099     //   return  size_t(std::floor(thread::rand01() * q));
00100     // }
00101 
00102     std::vector<std::queue<T> > queue;
00103     std::vector<spinlock> locks;
00104     size_t numqueues;
00105     atomic<size_t> num_queues_full;
00106   };
00107 
00108 }
00109 #endif
00110