00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef GRAPHLAB_MULTI_multi_blocking_queue_HPP
00019 #define GRAPHLAB_MULTI_multi_blocking_queue_HPP
00020
00021
00022 #include <vector>
00023 #include <deque>
00024 #include <iostream>
00025 #include <graphlab/util/random.hpp>
00026 #include <graphlab/parallel/pthread_tools.hpp>
00027
00028 #include <graphlab/macros_def.hpp>
00029
00030 namespace graphlab {
00031
00032
00033
00034
00035
00036
00037 template<typename T>
00038 class multi_blocking_queue {
00039 private:
00040
00041 typedef typename std::deque<T> queue_type;
00042
00043 struct single_queue {
00044 queue_type m_queue;
00045 mutex m_mutex;
00046 conditional m_conditional;
00047 bool handler_sleeping;
00048 size_t numelem;
00049 };
00050 size_t num_queues;
00051 std::vector<single_queue> allqueues;
00052 bool m_alive;
00053
00054 public:
00055
00056
00057 multi_blocking_queue(size_t num_queues = 1) :
00058 num_queues(num_queues), m_alive(true) {
00059 init(num_queues);
00060 }
00061
00062
00063
00064
00065
00066 void init(size_t nqueues) {
00067 num_queues = nqueues;
00068 allqueues.resize(num_queues);
00069 for (size_t i = 0;i < allqueues.size(); ++i) {
00070 allqueues[i].handler_sleeping = false;
00071 allqueues[i].numelem = 0;
00072 }
00073 }
00074
00075
00076 inline void enqueue(const T& elem) {
00077
00078 const size_t prod =
00079 random::fast_uniform<size_t>(0, num_queues * num_queues - 1);
00080 size_t r1 = prod / num_queues;
00081 size_t r2 = prod % num_queues;
00082 size_t qidx =
00083 (allqueues[r1].numelem < allqueues[r2].numelem) ? r1 : r2;
00084 single_queue& queueref = allqueues[qidx];
00085 queueref.m_mutex.lock();
00086 queueref.m_queue.push_back(elem);
00087 queueref.numelem++;
00088 if (queueref.handler_sleeping) queueref.m_conditional.signal();
00089 queueref.m_mutex.unlock();
00090 }
00091
00092
00093 inline void enqueue_specific(const T& elem, size_t qidx_) {
00094 size_t qidx = qidx_ % num_queues;
00095 single_queue& queueref = allqueues[qidx];
00096 queueref.m_mutex.lock();
00097 queueref.m_queue.push_back(elem);
00098 queueref.numelem++;
00099 if (queueref.handler_sleeping) queueref.m_conditional.signal();
00100 queueref.m_mutex.unlock();
00101 }
00102
00103
00104
00105
00106 inline std::pair<T, bool> dequeue(size_t id) {
00107 std::pair<T, bool> ret;
00108 ret.second = false;
00109
00110 single_queue& queueref = allqueues[id];
00111 queueref.m_mutex.lock();
00112 while(queueref.m_queue.empty() && m_alive) {
00113 queueref.handler_sleeping = true;
00114 queueref.m_conditional.wait(queueref.m_mutex);
00115 queueref.handler_sleeping = false;
00116 }
00117
00118 if(!queueref.m_queue.empty()) {
00119 ret.second = true;
00120 ret.first = queueref.m_queue.front();
00121 queueref.m_queue.pop_front();
00122 queueref.numelem--;
00123 }
00124 queueref.m_mutex.unlock();
00125 return ret;
00126 }
00127
00128 inline std::pair<T, bool> try_dequeue(size_t id) {
00129 std::pair<T, bool> ret;
00130 ret.second = false;
00131
00132 for (size_t i = 0; i < allqueues.size(); ++i) {
00133 single_queue& queueref = allqueues[id];
00134 if (queueref.numelem > 0) {
00135 queueref.m_mutex.lock();
00136
00137 if(!queueref.m_queue.empty()) {
00138 ret.second = true;
00139 ret.first = queueref.m_queue.front();
00140 queueref.m_queue.pop_front();
00141 queueref.numelem--;
00142 queueref.m_mutex.unlock();
00143 break;
00144 }
00145 queueref.m_mutex.unlock();
00146 }
00147 id++;
00148 if (id == allqueues.size()) id = 0;
00149 }
00150 return ret;
00151
00152 }
00153
00154
00155 inline bool empty(size_t id) {
00156 single_queue& queueref = allqueues[id];
00157 return queueref.numelem == 0;
00158 }
00159
00160
00161
00162
00163
00164 inline void stop_blocking() {
00165
00166 for (size_t i = 0; i < allqueues.size(); ++i) {
00167 allqueues[i].m_mutex.lock();
00168 }
00169 m_alive = false;
00170 for (size_t i = 0; i < allqueues.size(); ++i) {
00171 allqueues[i].m_conditional.broadcast();
00172 allqueues[i].m_mutex.unlock();
00173 }
00174
00175 }
00176
00177
00178 inline size_t size(size_t id) {
00179 single_queue& queueref = allqueues[id];
00180 return queueref.numelem;
00181 }
00182
00183
00184
00185 ~multi_blocking_queue() {
00186 stop_blocking();
00187 }
00188 };
00189
00190
00191 }
00192
00193 #include <graphlab/macros_undef.hpp>
00194
00195 #endif
00196