multi_blocking_queue.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef GRAPHLAB_MULTI_multi_blocking_queue_HPP
00019 #define GRAPHLAB_MULTI_multi_blocking_queue_HPP
00020 
00021 
00022 #include <vector>
00023 #include <deque>
00024 #include <iostream>
00025 #include <graphlab/util/random.hpp>
00026 #include <graphlab/parallel/pthread_tools.hpp>
00027 
00028 #include <graphlab/macros_def.hpp>
00029 
00030 namespace graphlab {
00031 
00032    /** 
00033     * \ingroup util_internal
00034     * \brief Implements a blocking queue useful for producer/consumer models
00035     */
00036     
00037   template<typename T>
00038   class multi_blocking_queue {
00039   private:
00040     
00041     typedef typename std::deque<T> queue_type;
00042     
00043     struct single_queue {
00044       queue_type m_queue;
00045       mutex m_mutex;
00046       conditional m_conditional;
00047       bool handler_sleeping;
00048       size_t numelem;
00049     };
00050     size_t num_queues;
00051     std::vector<single_queue> allqueues;
00052     bool m_alive;
00053     
00054   public:
00055     
00056     //! creates a blocking queue
00057     multi_blocking_queue(size_t num_queues = 1) : 
00058           num_queues(num_queues), m_alive(true) { 
00059       init(num_queues);
00060     }
00061     
00062     /**
00063       an alternate initialization which can be called after construction.
00064       This is not safe once the queue is being used.
00065     */
00066     void init(size_t nqueues) {
00067       num_queues = nqueues;
00068       allqueues.resize(num_queues);
00069       for (size_t i = 0;i < allqueues.size(); ++i) {
00070         allqueues[i].handler_sleeping = false;
00071         allqueues[i].numelem = 0;
00072       }
00073     }
00074     
00075     //! Add an element to the blocking queue
00076     inline void enqueue(const T& elem) {
00077       // TODO: this can easily be done with some bit operations on the 
00078       const size_t prod = 
00079         random::fast_uniform<size_t>(0, num_queues * num_queues - 1);
00080       size_t r1 = prod / num_queues;
00081       size_t r2 = prod % num_queues;
00082       size_t qidx = 
00083         (allqueues[r1].numelem < allqueues[r2].numelem) ? r1 : r2;
00084       single_queue& queueref = allqueues[qidx];
00085       queueref.m_mutex.lock();
00086       queueref.m_queue.push_back(elem);
00087       queueref.numelem++;
00088       if (queueref.handler_sleeping) queueref.m_conditional.signal();
00089       queueref.m_mutex.unlock();
00090     }
00091     
00092     //! Add an element to the blocking queue
00093     inline void enqueue_specific(const T& elem, size_t qidx_) {
00094       size_t qidx = qidx_ % num_queues;
00095       single_queue& queueref = allqueues[qidx];
00096       queueref.m_mutex.lock();
00097       queueref.m_queue.push_back(elem);
00098       queueref.numelem++;
00099       if (queueref.handler_sleeping) queueref.m_conditional.signal();
00100       queueref.m_mutex.unlock();
00101     }
00102     /**
00103      * Blocks until an element is available in the queue or an
00104      * interrupt is invoked on the queue.
00105      */
00106     inline std::pair<T, bool> dequeue(size_t id) {
00107       std::pair<T, bool> ret;
00108       ret.second = false;
00109       
00110       single_queue& queueref = allqueues[id];
00111       queueref.m_mutex.lock();
00112       while(queueref.m_queue.empty() && m_alive) {
00113         queueref.handler_sleeping = true;
00114         queueref.m_conditional.wait(queueref.m_mutex);
00115         queueref.handler_sleeping = false;
00116       }
00117       
00118       if(!queueref.m_queue.empty()) {
00119         ret.second = true;  // success
00120         ret.first = queueref.m_queue.front();
00121         queueref.m_queue.pop_front();
00122         queueref.numelem--;
00123       } 
00124       queueref.m_mutex.unlock();
00125       return ret;
00126     }
00127 
00128     inline std::pair<T, bool> try_dequeue(size_t id) {
00129       std::pair<T, bool> ret;
00130       ret.second = false;
00131       
00132       for (size_t i = 0; i < allqueues.size(); ++i) {
00133         single_queue& queueref = allqueues[id];
00134         if (queueref.numelem > 0) {
00135           queueref.m_mutex.lock();
00136           
00137           if(!queueref.m_queue.empty()) {
00138             ret.second = true;  // success
00139             ret.first = queueref.m_queue.front();
00140             queueref.m_queue.pop_front();
00141             queueref.numelem--;
00142             queueref.m_mutex.unlock();
00143             break;
00144           } 
00145           queueref.m_mutex.unlock();
00146         }
00147         id++;
00148         if (id == allqueues.size()) id = 0;
00149       }
00150       return ret;
00151 
00152     }
00153 
00154     //! Return true if queue 'id' is empty
00155     inline bool empty(size_t id) { 
00156       single_queue& queueref = allqueues[id];
00157       return queueref.numelem == 0;
00158     }
00159 
00160     /** Wakes up all threads waiting on the queue whether 
00161         or not an element is available. Once this function is called,
00162         the blocking queue is essentially destroyed and can no longer be used.
00163     */
00164     inline void stop_blocking() {
00165 
00166       for (size_t i = 0; i < allqueues.size(); ++i) {
00167         allqueues[i].m_mutex.lock();
00168       }
00169       m_alive = false;
00170       for (size_t i = 0; i < allqueues.size(); ++i) {
00171         allqueues[i].m_conditional.broadcast();
00172         allqueues[i].m_mutex.unlock();
00173       }
00174 
00175     }
00176     
00177     //! get the size of the queue
00178     inline size_t size(size_t id) {
00179       single_queue& queueref = allqueues[id];
00180       return queueref.numelem;
00181     }
00182 
00183 
00184 
00185     ~multi_blocking_queue() {
00186       stop_blocking();
00187     }    
00188   }; // end of multi_blocking_queue class
00189   
00190 
00191 } // end of namespace graphlab
00192 
00193 #include <graphlab/macros_undef.hpp>
00194 
00195 #endif
00196