dc.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef GRAPHLAB_DC_HPP
00019 #define GRAPHLAB_DC_HPP
00020 #include <iostream>
00021 #include <boost/iostreams/stream.hpp>
00022 #include <graphlab/parallel/pthread_tools.hpp>
00023 #include <graphlab/util/resizing_array_sink.hpp>
00024 #include <graphlab/util/blocking_queue.hpp>
00025 #include <graphlab/util/multi_blocking_queue.hpp>
00026 #include <graphlab/util/dense_bitset.hpp>
00027 #include <graphlab/serialization/serialization_includes.hpp>
00028 #include <graphlab/metrics/metrics.hpp>
00029 
00030 #include <graphlab/rpc/dc_types.hpp>
00031 #include <graphlab/rpc/dc_internal_types.hpp>
00032 
00033 #include <graphlab/rpc/dc_receive.hpp>
00034 #include <graphlab/rpc/dc_send.hpp>
00035 #include <graphlab/rpc/dc_comm_base.hpp>
00036 #include <graphlab/rpc/dc_dist_object_base.hpp>
00037 
00038 #include <graphlab/rpc/is_rpc_call.hpp>
00039 #include <graphlab/rpc/portable_dispatch.hpp>
00040 #include <graphlab/rpc/portable_issue.hpp>
00041 #include <graphlab/rpc/function_call_issue.hpp>
00042 #include <graphlab/rpc/request_issue.hpp>
00043 #include <graphlab/rpc/reply_increment_counter.hpp>
00044 #include <graphlab/rpc/function_ret_type.hpp>
00045 
00046 #include <boost/preprocessor.hpp>
00047 #include <graphlab/rpc/function_arg_types_def.hpp>
00048 
00049 namespace graphlab {
00050 
00051 
00052 /**
00053  * \ingroup rpc
00054 Distributed control constructor parameters.
00055 */
00056 struct dc_init_param{
00057   /** A vector containing a list of hostnames/ipaddresses and port numbers
00058   * of all machines participating in this RPC program.
00059   * for instance:
00060   * \code
00061   * machines.push_back("127.0.0.1:10000");
00062   * machines.push_back("127.0.0.1:10001");
00063   * \endcode
00064   */
00065   std::vector<std::string> machines;
00066   
00067   /** Additional construction options of the form 
00068     "key1=value1,key2=value2".
00069     Available options are:
00070     
00071     \li \b compressed=yes Use ZLib compressed communication
00072     \li \b buffered_send=yes Put an circular buffer on outgoing transmission
00073     \li \b buffered_queued_send=yes Put a queue buffer on outgoing transmission
00074     \li \b buffered_queued_send_single=yes Like buffered_queued but use only one sending thread
00075     \li \b buffered_recv=yes Put a buffer on incoming transmissions 
00076                              (not recommended. Tends to decrease performance)
00077                              
00078     Internal options which should not be used
00079     \li \b __socket__=NUMBER Forces TCP comm to use this socket number for its
00080                              listening socket instead of creating a new one.
00081                              The socket must already be bound to the listening port.
00082   */
00083   std::string initstring; 
00084   
00085   /** The index of this machine into the machines vector */
00086   procid_t curmachineid;  
00087   /** Number of background RPC handling threads to create */
00088   size_t numhandlerthreads; 
00089   /** The communication method. Must be TCP_COMM */
00090   dc_comm_type commtype;    
00091 };
00092 
00093 #define DEFAULT_NUMHANDLERTHREADS 8
00094 #define DEFAULT_COMMTYPE TCP_COMM
00095 
00096 // forward declaration for dc services
00097 class dc_services;
00098 class distributed_control;
00099 
00100 
00101 /**
00102  * \ingroup rpc
00103 graphlab::distributed_control is the primary distributed RPC object. This class initializes distributed
00104 communication, as well as provide basic RPC routines and collective operations.
00105 
00106 In addition to the documented functions, the following RPC routines are provided.
00107 
00108 \par void distributed_control::remote_call(procid_t targetmachine, function, ...)
00109  remote_call performs a non-blocking RPC call to the target machine to
00110  run the provided function pointer. All arguments are transmitted by value
00111  and must be serializable.
00112  \li \b targetmachine: The ID of the machine to run the function on
00113  \li \b function: The function to run on the target machine
00114 
00115 \par void distributed_control::fast_remote_call(procid_t targetmachine, function, ...)
00116  fast_remote_call is the same as remote_call, but the receiver completes the function
00117  call using the receiving thread instead of a thread pool. This should only be used if
00118  the function is short and does not block.
00119  \li \b targetmachine: The ID of the machine to run the function on
00120  \li \b function: The function to run on the target machine
00121 
00122 \par void distributed_control::control_call(procid_t targetmachine, function, ...)
00123  Same as remote_call, but calls performed using the control_call do not contribute
00124  to the call counter and has no effect on graphlab::async_consensus.
00125  \li \b targetmachine: The ID of the machine to run the function on
00126  \li \b function: The function to run on the target machine
00127 
00128 
00129 \par RetType distributed_control::remote_request(procid_t targetmachine, function, ...)
00130  remote_request performs a blocking RPC call to the target machine to
00131  run the provided function pointer. All arguments are transmitted by value
00132  and must be serializable. This call only returns when the target machine
00133  completes the function call. The return value of the call is serialized and
00134  returned.
00135  \li \b targetmachine: The ID of the machine to run the function on
00136  \li \b function: The function to run on the target machine
00137 
00138 \par RetType distributed_control::fast_remote_request(procid_t targetmachine, function, ...)
00139  fast_remote_request is the same as remote_request, but the receiver completes the function
00140  call using the receiving thread instead of a thread pool. This should only be used if
00141  the function is short and does not block.
00142  \li \b targetmachine: The ID of the machine to run the function on
00143  \li \b function: The function to run on the target machine
00144 
00145 \par void distributed_control::control_request(procid_t targetmachine, function, ...)
00146  Same as remote_request, but calls performed using the control_request do not contribute
00147  to the call counter and has no effect on graphlab::async_consensus.
00148  \li \b targetmachine: The ID of the machine to run the function on
00149  \li \b function: The function to run on the target machine
00150 
00151 */
00152 class distributed_control{
00153   public:
00154         /**  Each element of the function call queue is a data/len pair */
00155     struct function_call_block{
00156       function_call_block() {}
00157       function_call_block(procid_t source, const dc_impl::packet_hdr& hdr, 
00158                           char* data, size_t len): 
00159                           source(source), hdr(hdr), 
00160                           data(data), len(len) {}
00161       procid_t source;
00162       dc_impl::packet_hdr hdr;
00163       char* data;
00164       size_t len;
00165     };
00166   private:
00167    /// initialize receiver threads. private form of the constructor
00168    void init(const std::vector<std::string> &machines,
00169              const std::string &initstring,
00170              procid_t curmachineid,
00171              size_t numhandlerthreads,
00172              dc_comm_type commtype = DEFAULT_COMMTYPE);
00173    
00174   /// a pointer to the communications subsystem
00175   dc_impl::dc_comm_base* comm; 
00176  
00177   /// senders and receivers to all machines
00178   std::vector<dc_impl::dc_receive*> receivers;
00179   std::vector<dc_impl::dc_send*> senders;
00180   
00181   /// A thread group of function call handlers
00182   thread_group fcallhandlers;
00183   
00184   /// a queue of functions to be executed
00185   multi_blocking_queue<function_call_block> fcallqueue;
00186   
00187   /// A map of function name to dispatch function. Used for "portable" calls
00188   dc_impl::dispatch_map_type portable_dispatch_call_map;
00189   dc_impl::dispatch_map_type portable_dispatch_request_map;
00190 
00191   
00192   /// object registrations;
00193   std::vector<void*> registered_objects;
00194   std::vector<dc_impl::dc_dist_object_base*> registered_rmi_instance;
00195 
00196   /// For convenience, we provide a instance of dc_services 
00197   dc_services* distributed_services;
00198 
00199   /// ID of the local machine
00200   procid_t localprocid;
00201 
00202   
00203   /// Number of machines
00204   procid_t localnumprocs;
00205   
00206   std::vector<atomic<size_t> > global_calls_sent;
00207   std::vector<atomic<size_t> > global_calls_received;
00208   
00209   bool single_sender;
00210   
00211   /// the callback given to the comms class. Called when data is inbound
00212   friend void dc_recv_callback(void* tag, procid_t src, const char* buf, size_t len);
00213   
00214   
00215   /// the callback given to the comms class. Called when data is inbound
00216   template <typename T> friend class dc_dist_object;
00217   
00218   
00219   /// disable the operator= by placing it in private 
00220   distributed_control& operator=(const distributed_control& dc) { return *this; }
00221 
00222 
00223   std::map<std::string, std::string> parse_options(std::string initstring);
00224   
00225   volatile inline size_t num_registered_objects() {
00226     return registered_objects.size();
00227   }
00228   
00229 
00230   void compute_master_ranks();
00231   procid_t masterid;
00232   
00233   metrics rpc_metrics;
00234   
00235  public:
00236    
00237 
00238   
00239   distributed_control(dc_init_param initparam) {
00240     init(initparam.machines, 
00241          initparam.initstring, 
00242          initparam.curmachineid, 
00243          initparam.numhandlerthreads,
00244          initparam.commtype);
00245   }
00246 
00247   distributed_control(const std::vector<std::string> &machines,
00248                       const std::string &initstring, 
00249                       procid_t curmachineid, 
00250                       size_t numhandlerthreads = DEFAULT_NUMHANDLERTHREADS,
00251                       dc_comm_type commtype = DEFAULT_COMMTYPE) {
00252     init(machines, initstring, curmachineid, numhandlerthreads, commtype);
00253   }
00254 
00255   ~distributed_control();
00256 
00257   /// returns the id of the current processor
00258   inline procid_t procid() const {
00259     return localprocid;
00260   }
00261   
00262   /// returns the number of processors in total.
00263   inline procid_t numprocs() const {
00264     return localnumprocs;
00265   }
00266   
00267   /**
00268   Sets the sequentialization key to the new value, returning the previous value.
00269   When the key is set to an arbitrary non-zero value, all 
00270   remote calls/remote requests made by the current thread will be
00271   sequentially processed by the remote machines.
00272   
00273   All RPC calls made using the same key value will sequentialize.
00274   
00275   User should 
00276   oldval = set_sequentialization_key(newval)
00277   ...
00278   ... do stuff
00279   ...
00280   set_sequentialization_key(oldval)
00281   */
00282   static unsigned char set_sequentialization_key(unsigned char newkey);
00283   
00284   /**
00285   Creates a new sequentialization key, returning the old value.
00286   All remote calls/remote requests made by the current thread will be
00287   sequentially processed by the remote machines.
00288   
00289   Essentially all RPC calls made using the same key value will sequentialize.
00290   However, since new_sequentialization_key() uses a very naive key selection system,
00291   we recommend the use of set_sequentialization_key() especially in the case of
00292   multi-threaded code.
00293 
00294   User should 
00295   oldval = new_sequentialization_key()
00296   ...
00297   ... do stuff
00298   ...
00299   set_sequentialization_key(oldval)
00300   All RPC calls in while the key is set will be sequentialized on the receiving
00301   machine.
00302   */
00303   static unsigned char new_sequentialization_key();
00304   
00305   /// gets the current sequentialization key. This function is not generally useful.
00306   static unsigned char get_sequentialization_key();
00307 
00308   
00309   /*
00310   This generates the interface functions for the standard calls, basic calls, and fast calls
00311   The generated code looks like this:
00312   
00313   template<typename F , typename T0> void remote_call (procid_t target, F remote_function , const T0 &i0 )
00314   {
00315     ASSERT_LT(target, senders.size());
00316     dc_impl::remote_call_issue1 <F , T0> ::exec(senders[target], 
00317                                                 STANDARD_CALL, 
00318                                                 target, 
00319                                                 remote_function , 
00320                                                 i0 );                                
00321   }
00322   The arguments passed to the RPC_INTERFACE_GENERATOR ARE: (interface name, issue processor name, flags)
00323 
00324   */
00325   #define GENARGS(Z,N,_)  BOOST_PP_CAT(const T, N) BOOST_PP_CAT(&i, N)
00326   #define GENI(Z,N,_) BOOST_PP_CAT(i, N)
00327   #define GENT(Z,N,_) BOOST_PP_CAT(T, N)
00328   #define GENARC(Z,N,_) arc << BOOST_PP_CAT(i, N);
00329 
00330   #define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
00331   template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
00332   void  BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) {  \
00333     ASSERT_LT(target, senders.size()); \
00334     BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
00335         <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
00336           ::exec(senders[target],  BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
00337   }   \
00338   
00339   /*
00340   Generates the interface functions. 3rd argument is a tuple (interface name, issue name, flags)
00341   */
00342   BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (remote_call, dc_impl::remote_call_issue, STANDARD_CALL) )
00343   BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (fast_remote_call,dc_impl::remote_call_issue, FAST_CALL) )
00344   BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (control_call, dc_impl::remote_call_issue, (FAST_CALL | CONTROL_PACKET)) )
00345  
00346 
00347   #define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
00348   template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
00349     BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) {  \
00350     ASSERT_LT(target, senders.size()); \
00351     return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
00352         <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
00353           ::exec(senders[target],  BOOST_PP_TUPLE_ELEM(3,2,ARGS), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
00354   }   \
00355 
00356   /*
00357   Generates the interface functions. 3rd argument is a tuple (interface name, issue name, flags)
00358   */
00359  // BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type remote_request, dc_impl::remote_request_issue, 0) )
00360    BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type remote_request, dc_impl::remote_request_issue, STANDARD_CALL) )
00361 
00362   BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type fast_remote_request, dc_impl::remote_request_issue, FAST_CALL) )
00363   BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type control_request, dc_impl::remote_request_issue, (FAST_CALL | CONTROL_PACKET)) )
00364  
00365 
00366   
00367   #undef RPC_INTERFACE_GENERATOR
00368   #undef REQUEST_INTERFACE_GENERATOR
00369   #undef GENARC
00370   #undef GENT
00371   #undef GENI
00372   #undef GENARGS
00373   
00374   /**
00375    * \cond DC_INTERNAL
00376   Immediately calls the function described by the data
00377   inside the buffer. This should not be called directly.
00378   */
00379   void exec_function_call(procid_t source, const dc_impl::packet_hdr& hdr, std::istream &istrm);
00380   
00381   
00382   
00383   /**
00384   Performs a deferred function call using the information
00385   inside the buffer. This function will take over ownership of 
00386   the buffer and will free it when done
00387   */
00388   void deferred_function_call(procid_t source, const dc_impl::packet_hdr& hdr, 
00389                               char* buf, size_t len);
00390   
00391 
00392   /**
00393   This is called by the function handler threads
00394   */
00395   void fcallhandler_loop(size_t id);
00396   
00397   inline void inc_calls_sent(procid_t procid) {
00398 //    ASSERT_FALSE(full_barrier_in_effect);
00399     global_calls_sent[procid].inc();
00400   }
00401 
00402   inline void inc_calls_received(procid_t procid) {
00403     
00404     if (!full_barrier_in_effect) {
00405         global_calls_received[procid].inc();
00406     }
00407     else {
00408       //check the proc I just incremented.
00409       // If I just exceeded the required size, I need
00410       // to decrement the full barrier counter
00411       if (global_calls_received[procid].inc() == calls_to_receive[procid]) {
00412         // if it was me who set the bit
00413         if (procs_complete.set_bit(procid) == false) {
00414           // then decrement the incomplete count.
00415           // if it was me to decreased it to 0
00416           // lock and signal
00417           full_barrier_lock.lock();
00418           if (num_proc_recvs_incomplete.dec() == 0) {
00419             full_barrier_cond.signal();
00420           }
00421           full_barrier_lock.unlock();
00422         }
00423       }
00424     }
00425   }
00426   /// \endcond
00427   
00428   inline size_t calls_sent() const {
00429     size_t ctr = 0;
00430     for (size_t i = 0;i < numprocs(); ++i) {
00431       ctr += global_calls_sent[i].value;
00432     }
00433     return ctr;
00434   }
00435 
00436   inline size_t calls_received() const {
00437     size_t ctr = 0;
00438     for (size_t i = 0;i < numprocs(); ++i) {
00439       ctr += global_calls_received[i].value;
00440     }
00441     return ctr;
00442   }
00443 
00444   inline size_t bytes_sent() const {
00445     if (single_sender) {
00446       return senders[0]->bytes_sent();
00447     }
00448     else {
00449       size_t ret = 0;
00450       for (size_t i = 0;i < senders.size(); ++i) ret += senders[i]->bytes_sent();
00451       return ret;
00452     }
00453   }  
00454   
00455   
00456   inline size_t network_bytes_sent() const {
00457     return comm->network_bytes_sent();
00458   }  
00459   
00460   inline size_t bytes_received() const {
00461     size_t ret = 0;
00462     for (size_t i = 0;i < receivers.size(); ++i) ret += receivers[i]->bytes_received();
00463     return ret;
00464   }  
00465 
00466   /**
00467     Returns true if this is the process with the lowest ID
00468     currently running on this machine in this working directory
00469   */
00470   inline bool is_master_rank() const {
00471     return masterid == procid();
00472   }
00473 
00474 
00475   /**
00476     Returns the lowest ID of all the processes
00477     currently running on this machine in this working directory
00478   */  
00479   inline procid_t master_rank() const {
00480     return masterid;
00481   }
00482 
00483   /**
00484     registers a portable RPC call.
00485   */
00486   template <typename F, F f>
00487   void register_rpc(std::string c) {
00488     portable_dispatch_request_map[c] = (dc_impl::dispatch_type)
00489               dc_impl::portable_detail::find_dispatcher<F,        // function type
00490                               __GLRPC_FRESULT,                            // result
00491                               boost::function_traits<               
00492                                     typename boost::remove_pointer<F>::type
00493                                                     >::arity ,   // number of arguments
00494                               f,                                    // function itself
00495                               typename dc_impl::is_rpc_call<F>::type  // whether it is an RPC style call
00496                               >::dispatch_request_fn();
00497                               
00498     portable_dispatch_call_map[c] = (dc_impl::dispatch_type)
00499               dc_impl::portable_detail::find_dispatcher<F,        // function type
00500                               __GLRPC_FRESULT,                            // result
00501                               boost::function_traits<               
00502                                     typename boost::remove_pointer<F>::type
00503                                                     >::arity ,   // number of arguments
00504                               f,                                    // function itself
00505                               typename dc_impl::is_rpc_call<F>::type  // whether it is an RPC style call
00506                               >::dispatch_call_fn();
00507   }
00508 
00509   /// \cond DC_INTERNAL
00510   inline size_t register_object(void* v, dc_impl::dc_dist_object_base *rmiinstance) {
00511     ASSERT_NE(v, (void*)NULL);
00512     registered_objects.push_back(v);
00513     registered_rmi_instance.push_back(rmiinstance);
00514     return registered_objects.size() - 1;
00515   }
00516 
00517 
00518   inline void* get_registered_object(size_t id) {
00519     while(id >= num_registered_objects()) sched_yield();
00520     ASSERT_NE(registered_objects[id], (void*)NULL);
00521     return registered_objects[id];
00522   }
00523 
00524   inline dc_impl::dc_dist_object_base* get_rmi_instance(size_t id) {
00525     while(id >= num_registered_objects()) sched_yield();
00526     ASSERT_NE(registered_rmi_instance[id], (void*)NULL);
00527     return registered_rmi_instance[id];
00528   }  
00529   inline void clear_registered_object(size_t id) {
00530     registered_objects[id] = (void*)NULL;
00531     registered_rmi_instance[id] = NULL;
00532   }
00533   
00534   
00535   /**
00536   This is depreated. Use the public functions. In particular
00537   services().full_barrier() may not work as expected
00538   */
00539   __attribute__((__deprecated__)) dc_services& services();
00540   
00541   /// \endcond
00542   
00543   /**
00544    This comm barrier is not a true "barrier" but is
00545    essentially a sequentialization point. It guarantees that
00546    all calls from this machine to the target machine performed
00547    before the comm_barrier() call are completed before any call
00548    sent after the comm barrier() call.
00549   */
00550   void comm_barrier(procid_t targetmachine);
00551   
00552   /**
00553     This is a convenience function which broadcasts a comm_barrier()
00554     \note having all machines call the comm barrier does not guarantee
00555     that all calls have been processed. Basically 'p' local barriers
00556     do not result in a global barrier.
00557   */
00558   void comm_barrier();
00559 
00560 
00561 
00562 // Temp hack.
00563   long long int total_bytes_sent;
00564   long long int get_total_bytes_sent() {
00565      return total_bytes_sent;
00566   }
00567 
00568   /**
00569   This is a blocking send_to. It send an object T to the target 
00570   machine, but waits for the target machine to call recv_from
00571   before returning. Functionally similar to MPI's matched sending/receiving
00572   */
00573   template <typename U>
00574   inline void send_to(procid_t target, U& t, bool control = false);
00575   
00576   /**
00577   A blocking recv_from. Must be matched with a send_to call from the
00578   target before both source and target resumes.
00579   */
00580   template <typename U>
00581   inline void recv_from(procid_t source, U& t, bool control = false);
00582 
00583   
00584   /**
00585     This function allows one machine to broadcasts a variable to all machines.
00586 
00587     The originator calls broadcast with data provided in 
00588     in 'data' and originator set to true. 
00589     All other callers call with originator set to false.
00590 
00591     The originator will then return 'data'. All other machines
00592     will receive the originator's transmission in the "data" parameter.
00593 
00594     This call is guaranteed to have barrier-like behavior. That is to say,
00595     this call will block until all machines enter the broadcast function.
00596 
00597     \note Behavior is undefined if more than one machine calls broadcast
00598     with originator set to true.
00599 
00600     \note Behavior is undefined if multiple threads on the same machine
00601     call broadcast simultaneously. If multiple-thread broadcast is necessary,
00602     each thread should use its own instance of the services class.
00603   */
00604   template <typename U>
00605   inline void broadcast(U& data, bool originator, bool control = false);
00606 
00607   /**
00608    * Collects information contributed by each machine onto 
00609    * one machine.
00610    * data must be of length data[numprocs].
00611    * My data is stored in data[dc.procid()].
00612    * when function returns, machine sendto will have the complete vector
00613    * where data[i] is the data contributed by machine i.
00614    * All machines must have the same parameter for "sendto"
00615    */
00616   template <typename U>
00617   inline void gather(std::vector<U>& data, procid_t sendto, bool control = false);
00618 
00619   
00620   /**
00621    * Each machine creates a vector 'data' with size equivalent to the number of machines.
00622    * Each machine then fills the entry data[procid()] with information that it 
00623    * wishes to communicate.
00624    * After calling all_gather(), all machines will return with identical
00625    * vectors 'data', where data[i] contains the information machine i stored.
00626    */
00627   template <typename U>
00628   inline void all_gather(std::vector<U>& data, bool control = false);
00629 
00630   
00631   /**
00632    * This function is takes a vector of local elements T which must
00633    * be comparable and constructs a vector of length numprocs where
00634    * each element is a subset of the local contribution from that
00635    * machine and the union of all elements in the union of all local
00636    * contributions and all entries are unique:
00637    *
00638    * Usage: Each process reads the files that are stored locally and
00639    * wants to know which subset of local files to read even when
00640    * multiple processes see the same files.
00641    */
00642   template <typename U>
00643   inline void gather_partition(const std::vector<U>& local_contribution,
00644                         std::vector< std::vector<U> >& ret_partition,
00645                         bool control = false);
00646   
00647 /**
00648     A regular barrier equivalent to MPI_Barrier.
00649     A machine entering this barrier will wait until every machine 
00650     reaches this barrier before continuing. Only one thread from each machine
00651     should call the barrier.
00652     
00653     \see full_barrier
00654     */
00655   void barrier();
00656   
00657 
00658 
00659 
00660  /*****************************************************************************
00661                       Implementation of Full Barrier
00662 *****************************************************************************/
00663   /**
00664   Similar to the barrier(), but provides additional guarantees that 
00665   all calls issued prior to this barrier are completed before
00666   returning. 
00667   
00668   \note This function could return prematurely if
00669   other threads are still issuing function calls since we
00670   cannot differentiate between calls issued before the barrier
00671   and calls issued while the barrier is being evaluated.
00672   Therefore, when used in a multithreaded scenario, the user must ensure
00673   that all other threads which may perform operations using this object
00674   are stopped before the full barrier is initated.
00675   
00676   \see barrier
00677   */
00678   void full_barrier();
00679  private:
00680   mutex full_barrier_lock;
00681   conditional full_barrier_cond;
00682   std::vector<size_t> calls_to_receive;
00683   // used to inform the counter that the full barrier
00684   // is in effect and all modifications to the calls_recv
00685   // counter will need to lock and signal
00686   bool full_barrier_in_effect;
00687   
00688   /** number of 'source' processor counts which have
00689   not achieved the right recv count */
00690   atomic<size_t> num_proc_recvs_incomplete; 
00691                                       
00692   /// Marked as 1 if the proc is complete
00693   dense_bitset procs_complete;
00694   
00695  /*****************************************************************************
00696                       Collection of Statistics
00697 *****************************************************************************/
00698  
00699  private:
00700   struct collected_statistics {
00701     size_t callssent;
00702     size_t bytessent;
00703     size_t network_bytessent;
00704     collected_statistics(): callssent(0), bytessent(0), network_bytessent(0) { }
00705     void save(oarchive &oarc) const {
00706       oarc << callssent << bytessent << network_bytessent;
00707     }
00708     void load(iarchive &iarc) {
00709       iarc >> callssent >> bytessent >> network_bytessent;
00710     }
00711   };
00712  public:
00713   /** Gather RPC statistics. All machines must call 
00714    this function at the same time. However, only proc 0 will
00715    return values */
00716   std::map<std::string, size_t> gather_statistics();
00717 
00718   /** Fills metrics information. All machines must call this
00719    * function simultaneously. Only proc 0 will have metrics
00720    */
00721   void fill_metrics();
00722 
00723   /** returns metrics information collected by fill_metrics
00724    *  Only proc 0 will have metrics
00725    */
00726   inline metrics get_metrics() {
00727     return rpc_metrics;
00728   }
00729 
00730   inline void reset_metrics() {
00731     logstream(LOG_WARNING) << "Metrics cannot be reset on distributed control" << std::endl;
00732   }
00733   
00734   /** Dumps the metric information to a reporter
00735    * Only proc 0 will have metrics
00736    */
00737   inline void report_metrics(imetrics_reporter &reporter) {
00738     rpc_metrics.report(reporter);
00739   }
00740 
00741 };
00742 
00743 
00744 
00745 
00746 } // namespace graphlab
00747 
/// Registers function f with the distributed_control dc as a portable RPC,
/// keyed by the textual spelling of f. dc is parenthesized so that
/// expressions such as `*dcptr` expand correctly. Relies on the GCC
/// `typeof` extension to obtain f's function type.
#define REGISTER_RPC(dc, f) (dc).register_rpc<typeof(f)*, f>(std::string(BOOST_PP_STRINGIZE(f)))
00749 
00750 #include <graphlab/rpc/function_arg_types_undef.hpp>
00751 #include <graphlab/rpc/function_call_dispatch.hpp>
00752 #include <graphlab/rpc/request_dispatch.hpp>
00753 #include <graphlab/rpc/dc_dist_object.hpp>
00754 #include <graphlab/rpc/dc_services.hpp>
00755 
00756 namespace graphlab {
00757 
00758 template <typename U>
00759 inline void distributed_control::send_to(procid_t target, U& t, bool control) {
00760   distributed_services->send_to(target, t, control);
00761 }
00762 
00763 template <typename U>
00764 inline void distributed_control::recv_from(procid_t source, U& t, bool control) {
00765   distributed_services->recv_from(source, t, control);
00766 }
00767 
00768 template <typename U>
00769 inline void distributed_control::broadcast(U& data, bool originator, bool control) { 
00770   distributed_services->broadcast(data, originator, control);
00771 }
00772 
00773 template <typename U>
00774 inline void distributed_control::gather(std::vector<U>& data, procid_t sendto, bool control) {
00775   distributed_services->gather(data, sendto, control);
00776 }
00777 
00778 template <typename U>
00779 inline void distributed_control::all_gather(std::vector<U>& data, bool control) {
00780   distributed_services->all_gather(data, control);
00781 }
00782 
00783 template <typename U>
00784 inline void distributed_control::gather_partition(const std::vector<U>& local_contribution,
00785                       std::vector< std::vector<U> >& ret_partition,
00786                       bool control) {
00787   distributed_services->gather_partition(local_contribution, ret_partition, control);
00788 }
00789 
00790 
00791 
00792 }
00793 #endif
00794