reply_increment_counter.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef REPLY_INCREMENT_COUNTER_HPP
00019 #define REPLY_INCREMENT_COUNTER_HPP
#include <cstdlib>
#include <new>
#include <string>
#include <graphlab/parallel/atomic.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
00023 
00024 namespace graphlab {
00025 
00026 class distributed_control;
00027 
00028 namespace dc_impl {
00029 /**
00030 \ingroup rpc
00031 A wrapper around a char array. This structure 
00032 is incapable of freeing itself and must be managed externally
00033 */
00034 struct blob {
00035   /// Constructs a blob containing a pointer to a character array with length len
00036   blob(char* c, size_t len):c(c),len(len) { };
00037   blob():c(NULL), len(0){ };
00038   
00039   char *c;  ///< stored pointer 
00040   size_t len; ///< stored length
00041   
00042   
00043   /// serialize the char array
00044   void save(oarchive& oarc) const {
00045     oarc << len;
00046     if (len > 0) serialize(oarc, c, len);
00047   }
00048   
00049   /// deserializes a char array. If there is already a char array here, it will be freed
00050  void load(iarchive& iarc) {
00051     if (c) ::free(c);
00052     c = NULL;
00053     iarc >> len;
00054     if (len > 0) {
00055       c = (char*) malloc(len);
00056       deserialize(iarc, c, len);
00057     }
00058   }
00059   
00060   /// Free the stored char array.
00061   void free() {
00062     if (c) {
00063       ::free(c);
00064       c = NULL;
00065       len = 0;
00066     }
00067   }
00068 };
00069 
00070 /**
00071 Defines a really useful function that performs an atomic
00072 increment of a flag when called. This is useful for waiting
00073 for a reply to a request
00074 \note: usemutex = false probably does not work and should be deprecated.
00075 \see reply_increment_counter
00076 */
00077 struct reply_ret_type{
00078   atomic<size_t> flag;
00079   blob val;
00080   bool usemutex;
00081   mutex mut;
00082   conditional cond;
00083   /**
00084    * Constructs a reply object which waits for 'retcount' replies.
00085    * usemutex should always be true
00086    */
00087   reply_ret_type(bool usemutex, size_t retcount = 1):flag(retcount), 
00088                                                      usemutex(true) { 
00089   }
00090   
00091   ~reply_ret_type() { }
00092 
00093   /**
00094    * Waits for all replies to complete. It is up to the 
00095    * reply implementation to decrement the counter.
00096    */
00097   inline void wait() {
00098     if (usemutex) {
00099       mut.lock();
00100       while(flag.value != 0) cond.wait(mut);
00101       mut.unlock();
00102     }
00103     else {
00104       while(flag.value != 0) sched_yield();
00105     }
00106   }
00107 };
00108 
00109 }
00110 
00111 
/**
 * \ingroup rpc
 * A simple RPC call which converts ptr to a pointer to a reply_ret_type,
 * stores the blob in it, and decrements its reply counter.
 *
 * Only the declaration is provided in this header.
 *
 * \param dc  the distributed_control object the call is issued through
 * \param src a process id; presumably the process the reply originates
 *            from -- TODO confirm against the definition
 * \param ptr integer that is converted to a pointer to a
 *            dc_impl::reply_ret_type
 * \param ret the blob to store in that reply_ret_type
 *
 * \note Despite "increment" in the name, the description above says the
 *       counter is decremented; verify against the definition.
 * \see reply_ret_type
 */
void reply_increment_counter(distributed_control &dc, procid_t src, 
                             size_t ptr, dc_impl::blob ret);
00120 
00121 }
00122 
00123 #endif
00124