00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef GRAPHLAB_SYNCHRONIZED_MULTIQUEUE_HPP
00019 #define GRAPHLAB_SYNCHRONIZED_MULTIQUEUE_HPP
00020 #include <queue>
00021
00022 #include <graphlab/parallel/atomic.hpp>
00023 #include <graphlab/parallel/pthread_tools.hpp>
00024 #include <graphlab/util/random.hpp>
00025
00026
00027 namespace graphlab {
00028
00029 template <typename T>
00030 class synchronized_multiqueue {
00031
00032 public:
00033 synchronized_multiqueue(size_t numqueues_ = 8) {
00034 numqueues = numqueues_;
00035 queue.resize(numqueues);
00036 locks.resize(numqueues);
00037 num_queues_full.value = 0;
00038 };
00039 ~synchronized_multiqueue() { };
00040
00041 inline void push(const T &item, size_t queuehint) {
00042 locks[queuehint].lock();
00043 if (queue[queuehint].empty()) num_queues_full.inc();
00044 queue[queuehint].push(item);
00045 locks[queuehint].unlock();
00046 }
00047
00048 inline bool safepop(T* ret, size_t queuehint) {
00049 locks[queuehint].lock();
00050
00051 if (queue[queuehint].size() == 0) {
00052 locks[queuehint].unlock();
00053 return false;
00054 }
00055 *ret = queue[queuehint].front();
00056 queue[queuehint].pop();
00057 if (queue[queuehint].empty()) num_queues_full.dec();
00058 locks[queuehint].unlock();
00059 return true;
00060 }
00061
00062
00063
00064 void push(const T &item) {
00065
00066
00067
00068
00069
00070 if (numqueues == 1) {
00071 push(item, 0);
00072 return;
00073 }
00074 size_t r1 = random::uniform<size_t>(0, numqueues-1);
00075 size_t r2 = random::uniform<size_t>(0, numqueues-2);
00076 if (r2 >= r1) ++r2;
00077 size_t queuehint = (queue[r1].size() < queue[r2].size()) ? r1 : r2;
00078 push(item, queuehint);
00079 }
00080
00081 size_t size() const{
00082 size_t count = 0;
00083 for (size_t i = 0; i < queue.size(); ++i) {
00084 count += queue[i].size();
00085 }
00086 return count;
00087 }
00088
00089 bool empty() const {
00090 return (num_queues_full.value == 0);
00091 }
00092 private:
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102 std::vector<std::queue<T> > queue;
00103 std::vector<spinlock> locks;
00104 size_t numqueues;
00105 atomic<size_t> num_queues_full;
00106 };
00107
00108 }
00109 #endif
00110