distributed_core.hpp

/*
This file is part of GraphLab.

GraphLab is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of
the License, or (at your option) any later version.

GraphLab is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef GRAPHLAB_DISTRIBUTED_CORE_HPP
#define GRAPHLAB_DISTRIBUTED_CORE_HPP

#include <graphlab/engine/iengine.hpp>
#include <graphlab/engine/engine_options.hpp>
#include <graphlab/distributed2/distributed2_includes.hpp>
#include <graphlab/util/command_line_options.hpp>
#include <graphlab/util/mpi_tools.hpp>
#include <graphlab/rpc/dc.hpp>
#include <graphlab/rpc/dc_init_from_mpi.hpp>
#include <graphlab/rpc/dc_init_from_env.hpp>

#include <graphlab/schedulers/ischeduler.hpp>
#include <graphlab/scope/iscope.hpp>
#include <graphlab/graph/graph.hpp>

#include <graphlab/metrics/metrics.hpp>
#include <graphlab/metrics/reporters/null_reporter.hpp>
#include <graphlab/metrics/reporters/basic_reporter.hpp>
#include <graphlab/metrics/reporters/file_reporter.hpp>
#include <graphlab/metrics/reporters/html_reporter.hpp>

#include <graphlab/macros_def.hpp>
namespace graphlab {

  // Predeclaration
  template<typename Graph> struct distributed_types;


  /**
     \brief A GraphLab core is the base (or core) data structure in GraphLab.

     This is the distributed counterpart of \ref graphlab::core.

     The core is templatized over the VertexType and EdgeType; however,
     by using the provided types typedef, one can simply create a core
     as follows:

     \code
     gl::distributed_core glcore;
     \endcode

     The core contains the

     \li Data Graph: represents the structured data dependencies.
     \li Engine: the computational structure which contains the
     scheduling and execution statistics for the GraphLab program. The
     core provides pass-through calls for many engine functions.

     The core also manages the engine and scheduler construction
     parameters.

     The distributed core is more limited than the shared memory
     \ref graphlab::core version. In particular, engine construction
     must be performed manually through build_engine(), and the engine
     options / scheduler options cannot be modified after the engine
     has been constructed.

     Also, some functions must be called by all machines simultaneously,
     while others are "parallel", allowing any machine to call the
     function separately. This behavior is documented for each function.
     The user must take care to obey this requirement, or unexpected
     behavior may result.
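
     For example, a minimal sketch of a typical setup sequence (the
     vertex/edge data types, the update function \c my_update, and the
     atom index path below are placeholders, and \c dc is assumed to be
     an already-initialized distributed_control):

     \code
     typedef graphlab::distributed_core<vertex_data, edge_data> core_type;
     core_type core(dc, "atom_index.txt");    // all machines, simultaneously
     core.set_engine_type("dist_chromatic");  // choose the distributed engine
     core.set_scope_type("edge");             // choose the consistency model
     core.build_engine();                     // options are frozen after this
     core.add_task_to_all(my_update, 1.0);    // schedule the initial updates
     double runtime = core.start();           // run to completion
     \endcode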
  */
  template <typename VertexType, typename EdgeType>
  class distributed_core {
  public:
    typedef graphlab::distributed_types<graphlab::distributed_graph<VertexType, EdgeType> > distributed_types;

  public:
    /** Constructor. The graph is constructed using the atom index.
     * All machines must construct simultaneously.
     */
    distributed_core(distributed_control &dc, std::string atomindex) :
      dc(dc),
      mgraph(dc, atomindex),
      mengine(NULL),
      coremetrics("distributed_core"), reporter(new null_reporter) { }
  private:
    //! Core is not copyable
    distributed_core(const distributed_core& other);
    //! Core is not assignable
    distributed_core& operator=(const distributed_core& other);

  public:

    /**
     * Destructor.
     * All machines must call simultaneously.
     */
    ~distributed_core() {
      if (meopts.get_metrics_type() != "none") {
        // Write options to metrics and emit the final report
        fill_metrics();
        report_metrics();
      }
      delete mengine;
      delete reporter;
    }

    /** Get a modifiable reference to the graph associated with this core.
     * This function is parallel.
     */
    typename distributed_types::distributed_graph& graph() { return mgraph; }

    /** Get a constant reference to the graph associated with this core.
     * This function is parallel.
     */
    const typename distributed_types::distributed_graph& graph() const { return mgraph; }

    /**
     * \brief Set the type of scheduler.
     * The engine must not be constructed yet.
     * All machines must call simultaneously.
     */
    void set_scheduler_type(const std::string& scheduler_type) {
      ASSERT_EQ(mengine, NULL);
      bool success = meopts.set_scheduler_type(scheduler_type);
      ASSERT_TRUE(success);
    }

    /**
     * \brief Set the scope consistency model used in this engine.
     *
     * The engine must not be constructed yet.
     * All machines must call simultaneously.
     * The available scopes are:
     *
     *  \li \b "full" This ensures full data consistency within the scope
     *  \li \b "edge" This ensures data consistency over the vertex and
     *     its adjacent edges
     *  \li \b "vertex" This ensures that a vertex cannot be updated
     *     by two processors simultaneously
     *
     * See \ref Scopes for details.
     */
    void set_scope_type(const std::string& scope_type) {
      ASSERT_EQ(mengine, NULL);
      bool success = meopts.set_scope_type(scope_type);
      ASSERT_TRUE(success);
    }

    /**
     * \brief Set the engine type.
     *
     * The engine must not be constructed yet.
     * All machines must call simultaneously.
     *
     *  \li \b "dist_locking" Distributed engine with consistency ensured
     *                        through locking
     *  \li \b "dist_chromatic" Distributed engine with consistency ensured
     *                          through graph coloring
     */
    void set_engine_type(const std::string& engine_type) {
      ASSERT_EQ(mengine, NULL);
      bool success = meopts.set_engine_type(engine_type);
      ASSERT_TRUE(success);
    }

    /**
     * \brief Sets the output format of any recorded metrics.
     *  This function is parallel.
     *
     *  \li \b "none" No reporting
     *  \li \b "basic" Outputs to screen
     *  \li \b "file" Outputs to a text file graphlab_metrics.txt
     *  \li \b "html" Outputs to an HTML file graphlab_metrics.html
     */
    void set_metrics_type(const std::string& metrics_type) {
      bool metrics_set_success = meopts.set_metrics_type(metrics_type);
      ASSERT_TRUE(metrics_set_success);

      delete reporter;
      if (meopts.get_metrics_type() == "file") {
        reporter = new file_reporter("graphlab_metrics.txt");
      } else if (meopts.get_metrics_type() == "html") {
        reporter = new html_reporter("graphlab_metrics.html");
      } else if (meopts.get_metrics_type() == "basic") {
        reporter = new basic_reporter;
      } else {
        reporter = new null_reporter;
      }
    }

    /**
     * \brief Set the number of CPUs that the engine will use.
     *
     * The engine must not be constructed yet.
     * All machines must call simultaneously.
     */
    void set_ncpus(size_t ncpus) {
      ASSERT_EQ(mengine, NULL);
      meopts.set_ncpus(ncpus);
    }

    /**
     * Get a reference to the active engine.
     * build_engine() must be called prior to this.
     * This function is parallel.
     */
    typename distributed_types::iengine& engine() {
      ASSERT_NE(mengine, NULL);
      return *mengine;
    }

    /**
     * \brief Constructs the engine using the currently defined options.
     * Once the engine is constructed, options cannot be modified.
     * All machines must call simultaneously.
     */
    bool build_engine() {
      ASSERT_EQ(mengine, NULL);
      // create the engine
      mengine = distributed_engine_factory::new_engine(dc, meopts, mgraph);
      if(mengine == NULL) return false;
      mengine->set_engine_options(meopts.get_engine_options());
      return true;
    }

    /**
     * \brief Set the engine options by passing in an engine options object.
     * The engine must not be constructed yet.
     * All machines must call simultaneously.
     */
    void set_engine_options(const engine_options& opts) {
      ASSERT_EQ(mengine, NULL);
      meopts = opts;

      delete reporter;
      if (meopts.get_metrics_type() == "file") {
        reporter = new file_reporter("graphlab_metrics.txt");
      } else if (meopts.get_metrics_type() == "html") {
        reporter = new html_reporter("graphlab_metrics.html");
      } else if (meopts.get_metrics_type() == "basic") {
        reporter = new basic_reporter;
      } else {
        reporter = new null_reporter;
      }
    }

    /**
     * \brief Gets the metrics reporter.
     * This function is parallel.
     */
    imetrics_reporter& get_reporter() {
      return *reporter;
    }

    /**
     * \brief Returns the engine options.
     * This function is parallel.
     */
    const engine_options& get_engine_options() const {
      return meopts;
    }

    /**
     * \brief Returns a modifiable reference to the scheduler options.
     *
     * This function is parallel, <b>but</b> any modifications to the options
     * must be made identically across all machines.
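     *
     * For instance, a sketch of setting a scheduler option identically on
     * every machine (the option name and value are illustrative placeholders,
     * and this assumes the scheduler_options::add_option(name, value) setter
     * used elsewhere in GraphLab):
     *
     * \code
     * core.sched_options().add_option("max_iterations", 100);
     * \endcode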
     */
    scheduler_options& sched_options() {
      return meopts.get_scheduler_options();
    }

    /**
     * \brief Returns a constant reference to the scheduler options.
     * This function is parallel.
     */
    const scheduler_options& sched_options() const {
      return meopts.get_scheduler_options();
    }

    /**
     * \brief Set the engine options by simply parsing the command line
     * arguments.
     * The engine must not be constructed yet.
     * All machines must call simultaneously.
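     *
     * For example, a minimal sketch of use from main() (the vertex/edge data
     * types and the atom index path are placeholders):
     *
     * \code
     * graphlab::distributed_core<vertex_data, edge_data> core(dc, "atom_index.txt");
     * bool success = core.parse_engine_options(argc, argv);
     * ASSERT_TRUE(success);
     * \endcode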
     */
    bool parse_engine_options(int argc, char **argv) {
      ASSERT_EQ(mengine, NULL);
      command_line_options clopts;
      bool success = clopts.parse(argc, argv);
      ASSERT_TRUE(success);
      // set_engine_options() returns void, so report success explicitly
      set_engine_options(clopts);
      return true;
    }

    /**
     * \brief Run the engine until a termination condition is reached or
     * there are no more tasks remaining to execute. This function
     * will call build_engine() internally if the engine has not yet been
     * constructed.
     * All machines must call simultaneously.
     */
    double start() {
      if (mengine == NULL) {
        bool success = build_engine();
        ASSERT_TRUE(success);
        ASSERT_NE(mengine, NULL);
      }
      // merge in options from command line and other manually set options
      mengine->set_scheduler_options( meopts.get_scheduler_options() );
      graphlab::timer ti;
      ti.start();
      mengine->start();
      return ti.current_time();
    }

    /**
     * \brief Add a single update function to a single vertex.
     * This function is parallel. Engine must have been constructed
     * using build_engine() prior to calling this function.
     */
    void add_task(vertex_id_t vertex,
                  typename distributed_types::update_function func,
                  double priority) {
      typename distributed_types::update_task task(vertex, func);
      add_task(task, priority);
    }

    /**
     * \brief Add a single task with a fixed priority.
     * This function is parallel. Engine must have been constructed
     * using build_engine() prior to calling this function.
     */
    void add_task(typename distributed_types::update_task task, double priority) {
      engine().add_task(task, priority);
    }

    /**
     * \brief Add the update function to all the vertices in the provided
     * vector with the given priority.
     * This function is parallel. Engine must have been constructed
     * using build_engine() prior to calling this function.
     */
    void add_tasks(const std::vector<vertex_id_t>& vertices,
                   typename distributed_types::update_function func, double priority) {
      engine().add_tasks(vertices, func, priority);
    }

    /**
     * \brief Add the given function to all vertices using the given priority.
     * This function is parallel. Engine must have been constructed
     * using build_engine() prior to calling this function.
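     *
     * For example, a sketch of scheduling an initial pass over the whole
     * graph (\c my_update is a placeholder for a user-defined update
     * function compatible with this graph type):
     *
     * \code
     * core.build_engine();
     * core.add_task_to_all(my_update, 1.0);
     * core.start();
     * \endcode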
     */
    void add_task_to_all(typename distributed_types::update_function func,
                         double priority) {
      engine().add_task_to_all(func, priority);
    }

    /**
     * \brief Get the number of updates executed by the engine
     * This function is parallel. Engine must have been constructed
     * using build_engine() prior to calling this function.
     */
    size_t last_update_count() {
      ASSERT_NE(mengine, NULL);
      return mengine->last_update_count();
    }

    /**
     * \brief Fills the metrics with the engine options.
     * This function is parallel.
     */
    void fill_metrics() {
      coremetrics.set("ncpus", meopts.get_ncpus());
      coremetrics.set("engine", meopts.get_engine_type());
      coremetrics.set("scope", meopts.get_scope_type());
      coremetrics.set("scheduler", meopts.get_scheduler_type());
      coremetrics.set("affinities", meopts.get_cpu_affinities() ? "true" : "false");
      coremetrics.set("schedyield", meopts.get_sched_yield() ? "true" : "false");
      coremetrics.set("compile_flags", meopts.get_compile_flags());
    }

    /**
     * \brief Clears all recorded metrics. This function is parallel.
     */
    void reset_metrics() {
      coremetrics.clear();
      if (mengine) engine().reset_metrics();
    }

    /**
     * \brief Outputs the recorded metrics. This function is parallel.
     */
    void report_metrics() {
      coremetrics.report(get_reporter());
      engine().report_metrics(get_reporter());
    }

    /**
     * \brief Registers a sync with the engine.
     *
     * Registers a sync with the engine. All machines must call simultaneously.
     *
     * The sync will be performed approximately every "interval" updates,
     * and will perform a reduction over all vertices from rangelow
     * to rangehigh inclusive.
     * The merge function may be NULL, in which case it will not be used.
     * However, it is highly recommended to provide a merge function since
     * this allows the sync operation to be parallelized.
     *
     * The sync operation is guaranteed to be strictly sequentially consistent
     * with all other execution.
     *
     * \param shared The shared variable to synchronize
     * \param sync The reduction function
     * \param apply The final apply function which writes to the shared value
     * \param zero The initial zero value passed to the reduction
     * \param sync_interval Frequency at which the sync is initiated.
     *                      Corresponds approximately to the number of
     *                      update function calls before the sync is reevaluated.
     *                      If 0, the sync will only be evaluated once
     *                      at engine start, and will never be evaluated again.
     * \param merge Merge function used to combine intermediate reduction
     *              values. May be NULL (see above), but strongly recommended.
     * \param rangelow The lower range of vertex id to start syncing.
     *                 The range is inclusive, i.e. the vertex with id 'rangelow'
     *                 and the vertex with id 'rangehigh' will be included.
     *                 Defaults to 0.
     * \param rangehigh The upper range of vertex id to stop syncing.
     *                  The range is inclusive, i.e. the vertex with id 'rangelow'
     *                  and the vertex with id 'rangehigh' will be included.
     *                  Defaults to infinity.
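     *
     * For example, a sketch of registering a sum sync (the shared variable
     * \c SUM and the \c sum_fun / \c apply_fun / \c merge_fun functions are
     * placeholders whose signatures must match the engine's
     * sync_function_type, apply_function_type, and merge_function_type):
     *
     * \code
     * core.set_sync(SUM, sum_fun, apply_fun,
     *               graphlab::any(0.0), 100, merge_fun);
     * \endcode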
     */
    void set_sync(distributed_glshared_base& shared,
                  typename distributed_types::iengine::sync_function_type sync,
                  glshared_base::apply_function_type apply,
                  const any& zero,
                  size_t sync_interval,
                  typename distributed_types::iengine::merge_function_type merge,
                  vertex_id_t rangelow = 0,
                  vertex_id_t rangehigh = -1) {
      engine().set_sync(shared, sync, apply, zero,
                        sync_interval, merge, rangelow, rangehigh);
    }


    /**
     * Performs a sync immediately. This function requires that the shared
     * variable already be registered with the engine.
     * Not implemented.
     */
    void sync_now(glshared_base& shared);

  private:

    distributed_control& dc;
    // graph and data objects
    typename distributed_types::distributed_graph mgraph;
    engine_options meopts;
    typename distributed_types::iengine *mengine;

    metrics coremetrics;
    imetrics_reporter* reporter;
  };

}
#include <graphlab/macros_undef.hpp>
#endif