dc_services.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #include <graphlab/rpc/dc_dist_object.hpp>
00019 #ifndef GRAPHLAB_DC_SERVICES_HPP
00020 #define GRAPHLAB_DC_SERVICES_HPP
00021 #include <graphlab/parallel/pthread_tools.hpp>
00022 
00023 
00024 
00025 #include <graphlab/macros_def.hpp>
00026 namespace graphlab {
00027 
00028   /**
00029     \ingroup rpc
00030     Creates a new context for MPI-like global global operations.
00031     Where all machines create an instance of dc_services at the same time,
00032     operations performed by the new dc_services instance will not interfere
00033     and will run in parallel with other contexts. 
00034     i.e. If I have two distributed dc_services instances, one instance can 
00035     perform a barrier while another instance performs a broadcast() at the same 
00036     time.
00037     
00038     \note Only simple algorithms for the MPI collective operations (barrier, broadcast, etc)
00039     are implemented. Significant work is necessary to improve the performance of the collectives.
00040   */
00041   class dc_services {
00042   private:
00043     dc_dist_object<dc_services> rmi;
00044   
00045   public:
00046     dc_services(distributed_control &dc):rmi(dc, this) {  }
00047     
00048     /// Returns the underlying dc_dist_object 
00049     dc_dist_object<dc_services>& rmi_instance() {
00050       return rmi;
00051     }
00052 
00053     /// Returns the underlying dc_dist_object 
00054     const dc_dist_object<dc_services>& rmi_instance() const {
00055       return rmi;
00056     }
00057   
00058     /**
00059     This comm barrier is not a true "barrier" but is
00060     essentially a sequentialization point. It guarantees that
00061     all calls from this machine to the target machine performed
00062     before the comm_barrier() call are completed before any call
00063     sent after the comm barrier() call.
00064     
00065       \note This affects the global context
00066     */
00067     inline void comm_barrier(procid_t targetmachine) {
00068       rmi.comm_barrier(targetmachine);
00069     }
00070 
00071   /**
00072     This is a convenience function which broadcasts a comm_barrier()
00073     \note having all machines call the comm barrier does not guarantee
00074     that all calls have been processed. Basically 'p' local barriers
00075     do not result in a global barrier.
00076     
00077     \note This affects the global context
00078   */
00079     inline void comm_barrier() {
00080       rmi.comm_barrier();
00081     }
00082     
00083     /**
00084     This is a blocking send_to. It send an object T to the target 
00085     machine, but waits for the target machine to call recv_from
00086     before returning. Functionally similar to MPI's matched sending/receiving
00087     */
00088     template <typename U>
00089     inline void send_to(procid_t target, U& t, bool control = false) {
00090       rmi.send_to(target, t, control);
00091     }
00092     
00093     /**
00094     A blocking recv_from. Must be matched with a send_to call from the
00095     target before both source and target resumes.
00096     */
00097     template <typename U>
00098     inline void recv_from(procid_t source, U& t, bool control = false) {
00099       rmi.recv_from(source, t, control);
00100     }
00101 
00102   /**
00103      This function allows one machine to broadcasts a variable to all machines.
00104 
00105      The originator calls broadcast with data provided in 
00106      in 'data' and originator set to true. 
00107      All other callers call with originator set to false.
00108 
00109      The originator will then return 'data'. All other machines
00110      will receive the originator's transmission in the "data" parameter.
00111 
00112      This call is guaranteed to have barrier-like behavior. That is to say,
00113      this call will block until all machines enter the broadcast function.
00114 
00115      \note Behavior is undefined if more than one machine calls broadcast
00116      with originator set to true.
00117 
00118      \note Behavior is undefined if multiple threads on the same machine
00119      call broadcast simultaneously. If multiple-thread broadcast is necessary,
00120      each thread should use its own instance of the services class.
00121   */
00122     template <typename U>
00123     inline void broadcast(U& data, bool originator, bool control = false) { 
00124       rmi.broadcast(data, originator, control);
00125     }
00126 
00127   /**
00128    * data must be of length data[numprocs].
00129    * My data is stored in data[dc.procid()].
00130    * when function returns, machine sendto will have the complete vector
00131    * where data[i] is the data contributed by machine i.
00132    * All machines must have the same parameter for "sendto"
00133    */
00134     template <typename U>
00135     inline void gather(std::vector<U>& data, procid_t sendto, bool control = false) {
00136       rmi.gather(data, sendto, control);
00137     }
00138 
00139   /**
00140    * Each machine creates a vector 'data' with size equivalent to the number of machines.
00141    * Each machine then fills the entry data[procid()] with information that it 
00142    * wishes to communicate.
00143    * After calling all_gather(), all machines will return with identical
00144    * vectors 'data', where data[i] contains the information machine i stored.
00145    */
00146     template <typename U>
00147     inline void all_gather(std::vector<U>& data, bool control = false) {
00148       rmi.all_gather(data, control);
00149     }
00150 
00151 
00152   /**
00153    * This function is takes a vector of local elements T which must
00154    * be comparable and constructs a vector of length numprocs where
00155    * each element is a subset of the local contribution from that
00156    * machine and the union of all elements in the union of all local
00157    * contributions and all entries are unique:
00158    *
00159    * Usage: Each process reads the files that are stored locally and
00160    * wants to know which subset of local files to read even when
00161    * multiple processes see the same files.
00162    */
00163     template <typename U>
00164     inline void gather_partition(const std::vector<U>& local_contribution,
00165                           std::vector< std::vector<U> >& ret_partition,
00166                           bool control = false) {
00167       rmi.gather_partition(local_contribution, ret_partition, control);
00168     }
00169     
00170     /**
00171     A regular barrier equivalent to MPI_Barrier.
00172     A thread machine entering this barrier will wait until one thread on each 
00173     machines enter this barrier.
00174     
00175     \see full_barrier
00176     */
00177     inline void barrier() {
00178       rmi.barrier();
00179     }
00180     
00181     
00182     /**
00183   This barrier ensures globally across all machines that
00184   all calls issued prior to this barrier are completed before
00185   returning. This function could return prematurely if
00186   other threads are still issuing function calls since we
00187   cannot differentiate between calls issued before the barrier
00188   and calls issued while the barrier is being evaluated.
00189   
00190   Therefore, when used in a multithreaded scenario, the user must ensure
00191   that all other threads which may perform operations using this object
00192   are stopped before the full barrier is initated.
00193   
00194   \see barrier
00195   */
00196     inline void full_barrier() {
00197       rmi.full_barrier();
00198     }
00199   
00200  
00201 
00202   };
00203 
00204 
00205 } // end of namespace graphlab
00206 
00207 
00208 #include <graphlab/macros_undef.hpp>
00209 #endif
00210