00001 /* 00002 This file is part of GraphLab. 00003 00004 GraphLab is free software: you can redistribute it and/or modify 00005 it under the terms of the GNU Lesser General Public License as 00006 published by the Free Software Foundation, either version 3 of 00007 the License, or (at your option) any later version. 00008 00009 GraphLab is distributed in the hope that it will be useful, 00010 but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00012 GNU Lesser General Public License for more details. 00013 00014 You should have received a copy of the GNU Lesser General Public 00015 License along with GraphLab. If not, see <http://www.gnu.org/licenses/>. 00016 */ 00017 00018 #include <graphlab/rpc/dc_dist_object.hpp> 00019 #ifndef GRAPHLAB_DC_SERVICES_HPP 00020 #define GRAPHLAB_DC_SERVICES_HPP 00021 #include <graphlab/parallel/pthread_tools.hpp> 00022 00023 00024 00025 #include <graphlab/macros_def.hpp> 00026 namespace graphlab { 00027 00028 /** 00029 \ingroup rpc 00030 Creates a new context for MPI-like global global operations. 00031 Where all machines create an instance of dc_services at the same time, 00032 operations performed by the new dc_services instance will not interfere 00033 and will run in parallel with other contexts. 00034 i.e. If I have two distributed dc_services instances, one instance can 00035 perform a barrier while another instance performs a broadcast() at the same 00036 time. 00037 00038 \note Only simple algorithms for the MPI collective operations (barrier, broadcast, etc) 00039 are implemented. Significant work is necessary to improve the performance of the collectives. 
00040 */ 00041 class dc_services { 00042 private: 00043 dc_dist_object<dc_services> rmi; 00044 00045 public: 00046 dc_services(distributed_control &dc):rmi(dc, this) { } 00047 00048 /// Returns the underlying dc_dist_object 00049 dc_dist_object<dc_services>& rmi_instance() { 00050 return rmi; 00051 } 00052 00053 /// Returns the underlying dc_dist_object 00054 const dc_dist_object<dc_services>& rmi_instance() const { 00055 return rmi; 00056 } 00057 00058 /** 00059 This comm barrier is not a true "barrier" but is 00060 essentially a sequentialization point. It guarantees that 00061 all calls from this machine to the target machine performed 00062 before the comm_barrier() call are completed before any call 00063 sent after the comm barrier() call. 00064 00065 \note This affects the global context 00066 */ 00067 inline void comm_barrier(procid_t targetmachine) { 00068 rmi.comm_barrier(targetmachine); 00069 } 00070 00071 /** 00072 This is a convenience function which broadcasts a comm_barrier() 00073 \note having all machines call the comm barrier does not guarantee 00074 that all calls have been processed. Basically 'p' local barriers 00075 do not result in a global barrier. 00076 00077 \note This affects the global context 00078 */ 00079 inline void comm_barrier() { 00080 rmi.comm_barrier(); 00081 } 00082 00083 /** 00084 This is a blocking send_to. It send an object T to the target 00085 machine, but waits for the target machine to call recv_from 00086 before returning. Functionally similar to MPI's matched sending/receiving 00087 */ 00088 template <typename U> 00089 inline void send_to(procid_t target, U& t, bool control = false) { 00090 rmi.send_to(target, t, control); 00091 } 00092 00093 /** 00094 A blocking recv_from. Must be matched with a send_to call from the 00095 target before both source and target resumes. 
00096 */ 00097 template <typename U> 00098 inline void recv_from(procid_t source, U& t, bool control = false) { 00099 rmi.recv_from(source, t, control); 00100 } 00101 00102 /** 00103 This function allows one machine to broadcasts a variable to all machines. 00104 00105 The originator calls broadcast with data provided in 00106 in 'data' and originator set to true. 00107 All other callers call with originator set to false. 00108 00109 The originator will then return 'data'. All other machines 00110 will receive the originator's transmission in the "data" parameter. 00111 00112 This call is guaranteed to have barrier-like behavior. That is to say, 00113 this call will block until all machines enter the broadcast function. 00114 00115 \note Behavior is undefined if more than one machine calls broadcast 00116 with originator set to true. 00117 00118 \note Behavior is undefined if multiple threads on the same machine 00119 call broadcast simultaneously. If multiple-thread broadcast is necessary, 00120 each thread should use its own instance of the services class. 00121 */ 00122 template <typename U> 00123 inline void broadcast(U& data, bool originator, bool control = false) { 00124 rmi.broadcast(data, originator, control); 00125 } 00126 00127 /** 00128 * data must be of length data[numprocs]. 00129 * My data is stored in data[dc.procid()]. 00130 * when function returns, machine sendto will have the complete vector 00131 * where data[i] is the data contributed by machine i. 00132 * All machines must have the same parameter for "sendto" 00133 */ 00134 template <typename U> 00135 inline void gather(std::vector<U>& data, procid_t sendto, bool control = false) { 00136 rmi.gather(data, sendto, control); 00137 } 00138 00139 /** 00140 * Each machine creates a vector 'data' with size equivalent to the number of machines. 00141 * Each machine then fills the entry data[procid()] with information that it 00142 * wishes to communicate. 
00143 * After calling all_gather(), all machines will return with identical 00144 * vectors 'data', where data[i] contains the information machine i stored. 00145 */ 00146 template <typename U> 00147 inline void all_gather(std::vector<U>& data, bool control = false) { 00148 rmi.all_gather(data, control); 00149 } 00150 00151 00152 /** 00153 * This function is takes a vector of local elements T which must 00154 * be comparable and constructs a vector of length numprocs where 00155 * each element is a subset of the local contribution from that 00156 * machine and the union of all elements in the union of all local 00157 * contributions and all entries are unique: 00158 * 00159 * Usage: Each process reads the files that are stored locally and 00160 * wants to know which subset of local files to read even when 00161 * multiple processes see the same files. 00162 */ 00163 template <typename U> 00164 inline void gather_partition(const std::vector<U>& local_contribution, 00165 std::vector< std::vector<U> >& ret_partition, 00166 bool control = false) { 00167 rmi.gather_partition(local_contribution, ret_partition, control); 00168 } 00169 00170 /** 00171 A regular barrier equivalent to MPI_Barrier. 00172 A thread machine entering this barrier will wait until one thread on each 00173 machines enter this barrier. 00174 00175 \see full_barrier 00176 */ 00177 inline void barrier() { 00178 rmi.barrier(); 00179 } 00180 00181 00182 /** 00183 This barrier ensures globally across all machines that 00184 all calls issued prior to this barrier are completed before 00185 returning. This function could return prematurely if 00186 other threads are still issuing function calls since we 00187 cannot differentiate between calls issued before the barrier 00188 and calls issued while the barrier is being evaluated. 
00189 00190 Therefore, when used in a multithreaded scenario, the user must ensure 00191 that all other threads which may perform operations using this object 00192 are stopped before the full barrier is initated. 00193 00194 \see barrier 00195 */ 00196 inline void full_barrier() { 00197 rmi.full_barrier(); 00198 } 00199 00200 00201 00202 }; 00203 00204 00205 } // end of namespace graphlab 00206 00207 00208 #include <graphlab/macros_undef.hpp> 00209 #endif 00210
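// 1.7.1  (stray footer left by the documentation extractor -- presumably the generator's version number; not part of the header)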