core.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef GRAPHLAB_CORE_HPP
00019 #define GRAPHLAB_CORE_HPP
00020 
00021 #include <graphlab/engine/iengine.hpp>
00022 #include <graphlab/engine/engine_options.hpp>
00023 #include <graphlab/engine/engine_factory.hpp>
00024 
00025 #include <graphlab/util/command_line_options.hpp>
00026 
00027 #include <graphlab/schedulers/ischeduler.hpp>
00028 #include <graphlab/scope/iscope.hpp>
00029 #include <graphlab/graph/graph.hpp>
00030 
00031 
00032 
00033 #include <graphlab/metrics/metrics.hpp>
00034 #include <graphlab/metrics/reporters/null_reporter.hpp>
00035 #include <graphlab/metrics/reporters/basic_reporter.hpp>
00036 #include <graphlab/metrics/reporters/file_reporter.hpp>
00037 #include <graphlab/metrics/reporters/html_reporter.hpp>
00038 
00039 
00040 
00041 #include <graphlab/macros_def.hpp>
00042 namespace graphlab {
00043 
00044   // Predeclaration of graphlab::types (used below to derive core's typedefs)
00045   template<typename Graph> struct types;
00046   
00047 
00048 
00049   /**
00050      \brief A GraphLab core is the base (or core) data structure in GraphLab.
00051      
00052      Because many GraphLab programs will consist of a graph and an
00053      engine we have created a single data-structure, called a core,
00054      which manages all the pieces of GraphLab including engine and
00055      scheduler construction parameters.
00056 
00057      The core is templatized over the VertexType and EdgeType however
00058      by using the ref types typedef, one can simply create a core by
00059      doing the following:
00060    
00061      \code
00062      gl::core glcore;
00063      \endcode
00064    
00065      The core contains the following:
00066    
00067      \li Data Graph: which represents the structured data dependencies.
00068      \li Engine: The computational structure which contains the
00069      scheduling and execution statistics for the GraphLab program. The
00070      core provides pass-through calls for many engine functions.
00071         
00072      The core also manages the engine and scheduler construction
00073      parameters.
00074    
00075      The core will invisibly recreate the engine each time engine
00076      options are modified. This will mean that this internal behavior of
00077      the core should be pretty much "transparent" for the typical use
00078      case where engine options and scheduler options are defined before
00079      tasks are added to the scheduler.
00080    
00081      Otherwise, modifications to the engine options will result in the
00082      clearing of all scheduler tasks.
00083   */
00084   template <typename VertexType, typename EdgeType>
00085   class core {
00086   public:
00087     typedef graphlab::types<graphlab::graph<VertexType, EdgeType> > types;
00088 
00089   public:
00090     /// default constructor does nothing
00091     core() : 
00092       mengine(NULL),
00093       engine_has_been_modified(false), 
00094       coremetrics("core"), reporter(new null_reporter) { }
00095   private:
00096     //! Core is not copyable
00097     core(const core& other);
00098     //! Core is not copyable
00099     core& operator=(const core& other);
00100 
00101   public:
00102 
00103 
00104     ~core() { 
00105       if (meopts.get_metrics_type() != "none") {        
00106         // Write options to metrics
00107         fill_metrics();
00108         report_metrics();
00109       }
00110       destroy_engine(); 
00111       delete reporter;
00112     } 
00113        
00114     /// \brief Get a modifiable reference to the graph associated with this core
00115     typename types::graph& graph() { return mgraph; }
00116 
00117     /// \brief Get a constant reference to the graph associated with this core
00118     const typename types::graph& graph() const { return mgraph; }
00119 
00120     /**
00121      * \brief Set the type of scheduler.
00122      *
00123      * This will destroy the current engine and any tasks currently
00124      * associated with the scheduler.  See \ref Schedulers for the
00125      * list of supported schedulers.
00126      */
00127     void set_scheduler_type(const std::string& scheduler_type) {
00128       check_engine_modification();
00129       bool success = meopts.set_scheduler_type(scheduler_type);
00130       ASSERT_TRUE(success);
00131       destroy_engine();
00132     }
00133 
00134     /**
00135      * \brief Set the scope consistency model used in this engine.
00136      *
00137      * This will destroy the current engine and any tasks associated
00138      * with the current scheduler.  The available scopes are:
00139      * 
00140      *  \li \b "full" This ensures full data consistency within the scope
00141      *  \li \b "edge" This ensures data consistency with just the
00142      *     vertex and edges
00143      *  \li \b "vertex" This ensures that a vertex cannot be updated
00144      *     by two processors simultaneously
00145      *  \li \b "none" This eliminates all locking 
00146      *
00147      * See \ref Scopes for details
00148      */
00149     void set_scope_type(const std::string& scope_type) {
00150       check_engine_modification();
00151       bool success = meopts.set_scope_type(scope_type);
00152       ASSERT_TRUE(success);
00153       destroy_engine();
00154     }
00155 
00156 
00157     /**
00158      * \brief Set the engine type.
00159      *
00160      * This will destroy the current engine and any tasks associated
00161      * with the current scheduler. 
00162      *
00163      *  \li \b "async" This is the regular multithreaded engine
00164      *  \li \b "async_sim" This is a single threaded engine. But it can be 
00165      *                     be started with multiple "simulated threads".
00166      *                     The simulation is low-fidelity however, and should
00167      *                     be used with caution.
00168      */
00169     void set_engine_type(const std::string& engine_type) {
00170       check_engine_modification();
00171       bool success = meopts.set_engine_type(engine_type);
00172       ASSERT_TRUE(success);
00173       destroy_engine();
00174     }
00175     
00176     /**
00177      * \brief Sets the output format of any recorded metrics
00178      *  \li \b "none" No reporting
00179      *  \li \b "basic" Outputs to screen
00180      *  \li \b "file" Outputs to a text file graphlab_metrics.txt
00181      *  \li \b "html" Outputs to a html file graphlab_metrics.html
00182      */
00183     void set_metrics_type(const std::string& metrics_type) {
00184       bool metrics_set_success = meopts.set_metrics_type(metrics_type);
00185       ASSERT_TRUE(metrics_set_success);
00186       
00187       delete reporter;
00188       if (meopts.get_metrics_type() == "file") {
00189         reporter = new file_reporter("graphlab_metrics.txt");
00190       } else if (meopts.get_metrics_type() == "html") {
00191         reporter = new  html_reporter("graphlab_metrics.html");
00192       } else if (meopts.get_metrics_type() == "basic") {
00193         reporter = new basic_reporter;
00194       } else {
00195         reporter = new null_reporter;
00196       }
00197     }
00198 
00199     /**
00200        \brief Destroys a created engine (if any).
00201     */
00202     void reset() {
00203       engine_has_been_modified = false;
00204       destroy_engine();
00205     }
00206     
00207     /**
00208      * \brief Set the number of cpus that the engine will use.
00209      *
00210      * This will destroy the current engine and any tasks associated
00211      * with the current scheduler. 
00212      *
00213      */
00214     void set_ncpus(size_t ncpus) {
00215       check_engine_modification();
00216       meopts.set_ncpus(ncpus);
00217       destroy_engine();
00218     }
00219 
00220 
00221     /**
00222      * Get a reference to the active engine.  If no engine exists one is
00223      * created.
00224      */
00225     typename types::iengine& engine() { 
00226       bool engine_build_success = auto_build_engine();
00227       ASSERT_TRUE(engine_build_success);
00228       return *mengine; 
00229     }
00230 
00231 
00232 
00233 
00234     /**
00235      * \brief Destroys and reconstructs the current engine,
00236      * reprocessing the engine arguments.  
00237      */
00238     bool rebuild_engine() {
00239       destroy_engine();
00240       ASSERT_EQ(mengine, NULL);
00241       return auto_build_engine();
00242     }
00243 
00244     /**
00245      * \brief Set the engine options by passing in an engine options object.
00246      */
00247     void set_engine_options(const engine_options& opts) {
00248       check_engine_modification();
00249       meopts = opts;
00250       
00251       delete reporter;
00252       if (meopts.get_metrics_type() == "file") {
00253         reporter = new file_reporter("graphlab_metrics.txt");
00254       } else if (meopts.get_metrics_type() == "html") {
00255         reporter = new  html_reporter("graphlab_metrics.html");
00256       } else if (meopts.get_metrics_type() == "basic") {
00257         reporter = new basic_reporter;
00258       } else {
00259         reporter = new null_reporter;
00260       }
00261     }
00262 
00263     imetrics_reporter& get_reporter() {
00264       return *reporter;
00265     }
00266 
00267     /**
00268      * \brief Returns the engine options
00269      */
00270     const engine_options& get_engine_options() const { 
00271       return meopts;
00272     }
00273 
00274     /**
00275      * \brief Returns a modifiable reference to the scheduler options
00276      */
00277     scheduler_options& sched_options() {
00278       return meopts.get_scheduler_options();
00279     }
00280 
00281     /**
00282      * \brief Returns a constant reference to the scheduler options
00283      */
00284     const scheduler_options& sched_options() const{
00285       return meopts.get_scheduler_options();
00286     }
00287 
00288 
00289     /**
00290      * \brief Set the engine options by simply parsing the command line
00291      * arguments. 
00292      */
00293     bool parse_engine_options(int argc, char **argv) {
00294       check_engine_modification();
00295       command_line_options clopts;
00296       bool success = clopts.parse(argc, argv);
00297       ASSERT_TRUE(success);
00298       return set_engine_options(clopts);
00299     }
00300 
00301 
00302     /**
00303      * \brief Run the engine until a termination condition is reached or
00304      * there are no more tasks remaining to execute.
00305      */
00306     double start() {
00307       bool success = auto_build_engine();
00308       ASSERT_TRUE(success);
00309       ASSERT_NE(mengine, NULL);
00310       // merge in options from command line and other manually set options
00311       mengine->set_scheduler_options( meopts.get_scheduler_options() );
00312       graphlab::timer ti;
00313       ti.start();
00314       mengine->start();
00315       return ti.current_time();
00316     }
00317   
00318 
00319     /**
00320      * \brief Add a single update function to a single vertex.
00321      */
00322     void add_task(vertex_id_t vertex,
00323                   typename types::update_function func,
00324                   double priority) {
00325       engine_has_been_modified = true;
00326       typename types::update_task task(vertex, func);
00327       add_task(task, priority);
00328     }
00329 
00330 
00331     /**
00332      * \brief Add a single task with a fixed priority.
00333      */
00334     void add_task(typename types::update_task task, double priority) {
00335       engine_has_been_modified = true;
00336       engine().add_task(task, priority);
00337     }
00338 
00339     /**
00340      * \brief Add the update function to all the veritces in the provided
00341      * vector with the given priority.
00342      */
00343     void add_tasks(const std::vector<vertex_id_t>& vertices, 
00344                    typename types::update_function func, double priority) {
00345       engine_has_been_modified = true;
00346       engine().add_tasks(vertices, func, priority);
00347     }
00348 
00349 
00350     /**
00351      * \brief Add the given function to all vertices using the given priority
00352      */
00353     void add_task_to_all(typename types::update_function func, 
00354                          double priority) {
00355       engine_has_been_modified = true;
00356       engine().add_task_to_all(func, priority);
00357     }
00358     
00359     /**
00360      * \brief Get the number of updates executed by the engine
00361      */
00362     size_t last_update_count() {
00363       if(mengine == NULL) return 0;
00364       else return mengine->last_update_count();
00365     }
00366     
00367     void fill_metrics() {
00368       coremetrics.set("ncpus", meopts.get_ncpus());
00369       coremetrics.set("engine", meopts.get_engine_type());
00370       coremetrics.set("scope", meopts.get_scope_type());
00371       coremetrics.set("scheduler", meopts.get_scheduler_type());
00372       coremetrics.set("affinities", meopts.get_cpu_affinities() ? "true" : "false");
00373       coremetrics.set("schedyield", meopts.get_sched_yield() ? "true" : "false");
00374       coremetrics.set("compile_flags", meopts.get_compile_flags());
00375     }
00376 
00377     void reset_metrics() {
00378       coremetrics.clear();
00379       engine().reset_metrics();
00380     }
00381       
00382     /**
00383        \brief Outputs the recorded metrics
00384     */
00385     void report_metrics() {
00386       coremetrics.report(get_reporter());
00387       engine().report_metrics(get_reporter());
00388     }
00389     
00390     /**
00391      * \brief Registers a sync with the engine.
00392      *
00393      * Registers a sync with the engine.
00394      * The sync will be performed approximately every "interval" updates,
00395      * and will perform a reduction over all vertices from rangelow
00396      * to rangehigh inclusive.
00397      * The merge function may be NULL, in which it will not be used.
00398      * However, it is highly recommended to provide a merge function since
00399      * this allow the sync operation to be parallelized.
00400      *
00401      * The sync operation is guaranteed to be strictly sequentially consistent
00402      * with all other execution.
00403      *
00404      * \param shared The shared variable to synchronize
00405      * \param sync The reduction function
00406      * \param apply The final apply function which writes to the shared value
00407      * \param zero The initial zero value passed to the reduction
00408      * \param sync_interval Frequency at which the sync is initiated.
00409      *                      Corresponds approximately to the number of
00410      *                     update function calls before the sync is reevaluated.
00411      *                     If 0, the sync will only be evaluated once
00412      *                     at engine start,  and will never be evaluated again.
00413      *                     Defaults to 0.
00414      * \param merge Combined intermediate reduction value. defaults to NULL.
00415      *              in which case, it will not be used.
00416      * \param rangelow he lower range of vertex id to start syncing.
00417      *                 The range is inclusive. i.e. vertex with id 'rangelow'
00418      *                 and vertex with id 'rangehigh' will be included.
00419      *                 Defaults to 0.
00420      * \param rangehigh The upper range of vertex id to stop syncing.
00421      *                  The range is inclusive. i.e. vertex with id 'rangelow'
00422      *                  and vertex with id 'rangehigh' will be included.
00423      *                  Defaults to infinity.
00424      */
00425     void set_sync(glshared_base& shared,
00426                   typename types::iengine::sync_function_type sync,
00427                   glshared_base::apply_function_type apply,
00428                   const any& zero,
00429                   size_t sync_interval = 0,
00430                   typename types::iengine::merge_function_type merge = NULL,
00431                   vertex_id_t rangelow = 0,
00432                   vertex_id_t rangehigh = -1) { 
00433       engine_has_been_modified = true;
00434       engine().set_sync(shared, sync, apply, zero, 
00435                         sync_interval, merge, rangelow, rangehigh);
00436       
00437     }
00438     
00439 
00440     /**
00441      * Performs a sync immediately. This function requires that the shared
00442      * variable already be registered with the engine.
00443      */
00444     void sync_now(glshared_base& shared) { 
00445       engine().sync_now(shared);
00446     };
00447   private:
00448 
00449     /**
00450      * Build the engine if it has not already been built.
00451      */
00452     bool auto_build_engine() {
00453       if(mengine == NULL) {
00454         // create the engine
00455         mengine = engine_factory::new_engine(meopts, mgraph);
00456         if(mengine == NULL) return false;
00457         mengine->set_engine_options(meopts.get_engine_options());
00458       }
00459       // scheduler options is one parameter that is allowed
00460       // to change without rebuilding the engine
00461       return true;
00462     }
00463 
00464     /**
00465      * Destroy the engine if one exists.
00466      */
00467     void destroy_engine() {
00468       if(mengine != NULL) {
00469         delete mengine;
00470         mengine = NULL;
00471       }
00472     }
00473 
00474 
00475 
00476     /** Save the core to a file */
00477     void save(const std::string& filename) const {
00478       std::ofstream fout(filename.c_str());
00479       ASSERT_TRUE(fout.good());
00480       oarchive oarc(fout);
00481       oarc << *this;
00482       fout.close();
00483     } // end of save
00484     
00485     /** Save the core to an archive */
00486     void save(oarchive& arc) const {
00487       arc << mgraph
00488           << meopts;
00489     } // end of save
00490 
00491 
00492     /** Load the core from a file. */
00493     void load(const std::string& filename) {
00494       std::ifstream fin(filename.c_str());
00495       ASSERT_TRUE(fin.good());
00496       iarchive iarc(fin);
00497       iarc >> *this;
00498       fin.close();
00499     } // end of load
00500 
00501 
00502     /** Load the core from an archive. */
00503     void load(iarchive& arc) {
00504       arc >> mgraph
00505           >> meopts;
00506     } // end of load
00507 
00508 
00509     void check_engine_modification() {
00510       ASSERT_MSG(engine_has_been_modified == false, 
00511                  "Modifications to the engine/scheduler parameters are not"
00512                  "allowed once tasks have been inserted into the engine.");
00513     }
00514     
00515     // graph and data objects
00516     typename types::graph mgraph;
00517     engine_options meopts;
00518     typename types::iengine *mengine;
00519     /** For error tracking. Once engine has been modified, any scheduler/
00520      * engine parameter modifications will reset the modifications
00521      */
00522     bool engine_has_been_modified;
00523     metrics coremetrics;
00524 
00525     imetrics_reporter* reporter;
00526   };
00527 
00528 }
00529 #include <graphlab/macros_undef.hpp>
00530 #endif
00531