distributed_chromatic_engine.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef DISTRIBUTED_CHROMATIC_ENGINE_HPP
00019 #define DISTRIBUTED_CHROMATIC_ENGINE_HPP
00020 
00021 #include <functional>
00022 #include <algorithm>
00023 #include <ext/functional> // for select1st
00024 #include <boost/bind.hpp>
00025 #include <graphlab/parallel/pthread_tools.hpp>
00026 #include <graphlab/parallel/atomic.hpp>
00027 #include <graphlab/util/timer.hpp>
00028 #include <graphlab/util/random.hpp>
00029 #include <graphlab/util/dense_bitset.hpp>
00030 #include <graphlab/util/mutable_queue.hpp>
00031 
00032 #include <graphlab/engine/iengine.hpp>
00033 #include <graphlab/scope/iscope.hpp>
00034 #include <graphlab/tasks/update_task.hpp>
00035 #include <graphlab/logger/logger.hpp>
00036 #include <graphlab/metrics/metrics.hpp>
00037 #include <graphlab/schedulers/support/redirect_scheduler_callback.hpp>
00038 
00039 #include <graphlab/rpc/dc.hpp>
00040 #include <graphlab/distributed2/distributed_glshared_manager.hpp>
00041 #include <graphlab/distributed2/graph/dgraph_scope.hpp>
00042 
00043 #include <graphlab/macros_def.hpp>
00044 
00045 namespace graphlab {
00046 
00047 
00048 
00049 /**
00050 All processes must receive the same options at the same time.
00051 i.e. if set_cpu_affinities is called, all processes must call it at the same time.
00052 This is true for all set_* functions.
00053 */
00054 template <typename Graph>
00055 class distributed_chromatic_engine : public iengine<Graph> {
00056  public:
00057   typedef iengine<Graph> iengine_base;
        // Types re-exported from the iengine<Graph> base so the rest of the
        // class can name them without the dependent-type boilerplate.
00058   typedef typename iengine_base::update_task_type update_task_type;
00059   typedef typename iengine_base::update_function_type update_function_type;
00060   typedef typename iengine_base::termination_function_type termination_function_type;
00061   typedef typename iengine_base::iscope_type iscope_type;
00062   
00063   typedef typename iengine_base::sync_function_type sync_function_type;
00064   typedef typename iengine_base::merge_function_type merge_function_type;
00065 
00066   // Only used as the parameter type of register_monitor(), which this
00067   typedef imonitor<Graph> imonitor_type;
        // engine does not support.
00068 
        // Callback object type handed to update functions; it is constructed
        // with a pointer to this engine (see the constructor).
00069   typedef redirect_scheduler_callback<Graph, 
00070                                       distributed_chromatic_engine<Graph> > callback_type;
00071   typedef icallback<Graph> icallback_type;
00072 
00073  private:
00074   // The local RMI endpoint: used for barriers, gathers, broadcasts and remote calls
00075   dc_dist_object<distributed_chromatic_engine<Graph> > rmi;
00076   
00077   // The distributed graph being processed (held by reference, not owned)
00078   Graph &graph;
00079   
        // Callback passed to every update function invocation
00080   callback_type callback;
00081   
00082   // The manager will automatically attach to all the glshared variables
00083   distributed_glshared_manager glshared_manager; 
00084   
00085   /** Number of worker threads launched by start(); clamped to >= 1 */
00086   size_t ncpus; 
00087 
00088   /** If true, start() pins thread i to cpu i */
00089   bool use_cpu_affinity;
00090 
00091   /** Use schedule yielding when waiting on the scheduler.
            NOTE(review): stored by set_sched_yield() but not read in the code visible here. */
00092   bool use_sched_yield;
00093  
00094   
00095   /** Number of updates executed, one counter per worker thread (indexed by threadid) */
00096   std::vector<size_t> update_counts;
00097 
        /** Incremented each time this machine applies a completed sync */
00098   atomic<size_t> numsyncs;
00099 
00100   /** User-registered termination predicates */ 
00101   std::vector<termination_function_type> term_functions;
00102 
00103   /** The timeout time in millis; 0 disables the timeout */
00104   size_t timeout_millis;
        /** Started in start(); compared against timeout_millis and reported as "runtime" */
00105   timer ti;
00106   
00107   /// Set by stop(); reported to machine 0 during the global termination check
00108   bool force_stop;
00109   
00110   /** Maximum number of tasks to execute globally; 0 disables the budget */
00111   size_t task_budget;
00112   
        /** Non-zero: shuffle each color block instead of sorting it by replica count */
00113   size_t randomize_schedule;
00114   
00115   /** If dynamic scheduling is used, the number of scheduled tasks */
00116   atomic<size_t> num_pending_tasks;
00117   
00118   
00119   /** The cause of the last termination condition (EXEC_UNSET while running) */
00120   exec_status termination_reason;
00121 
00122   scope_range::scope_range_enum default_scope_range; // NOTE(review): not initialized by the constructor, yet read by start()
00123  
00124   std::vector<std::vector<vertex_id_t> > color_block; // set of localvids in each color
00125   dense_bitset scheduled_vertices;  // take advantage that local vertices
00126                                     // are always the first N
00127   
        // The single update function run on every scheduled vertex
00128   update_function_type update_function;
        // > 0: run exactly this many sweeps over all owned vertices ("static" mode);
        // 0: run only scheduled vertices until termination ("dynamic" mode)
00129   size_t max_iterations;
        // Seconds thread 0 spent in distributed barriers / termination checks
00130   double barrier_time;
        // Number of per-color distributed full barriers issued by thread 0
00131   size_t num_dist_barriers_called;
00132   
00133   // other optimizations: if either flag is false an extra dc barrier is
        // issued at the end of every color (see start_thread)
00134   bool const_nbr_vertices, const_edges;
00135   struct sync_task {
          // One registered sync (fold / merge / apply) over a vertex range.
00136     sync_function_type sync_fun;    // folds one vertex scope into an accumulator
00137     merge_function_type merge_fun;  // merges one accumulator into another
00138     distributed_glshared_base::apply_function_type apply_fun; // writes the result to the shared variable
00139     size_t sync_interval;           // tasks between runs; 0 means "run once"
00140     size_t next_time;               // executed-task count after which the sync runs again
00141     any zero;                       // initial accumulator value
00142     vertex_id_t rangelow;           // inclusive lower bound of the vertex range
00143     vertex_id_t rangehigh;          // inclusive upper bound of the vertex range
00144     distributed_glshared_base *sharedvariable; // target of apply_fun (not owned)
00145     any mergeval;                   // per-machine merged accumulator
00146     std::vector<any> thread_intermediate; // one accumulator per worker thread
          // Defaults: no functions, unbounded vertex range, no shared variable.
00147     sync_task() :
00148       sync_fun(NULL), merge_fun(NULL), apply_fun(NULL),
00149       sync_interval(-1),
00150       next_time(0), rangelow(0), 
00151       rangehigh(vertex_id_t(-1)), sharedvariable(NULL) { }
00152   };
00153   
00154   /// A list of all registered sync tasks (filled by set_sync)
00155   std::vector<sync_task> sync_tasks;
00156   
00157   /// The subset of sync_tasks being accumulated during the current iteration;
        /// holds pointers into sync_tasks
00158   std::vector<sync_task*> active_sync_tasks;
00159   
00160 
        /// Metrics filled in by machine 0 at the end of start()
00161   metrics engine_metrics;  
00162 
00163  public:
00164   distributed_chromatic_engine(distributed_control &dc,
00165                                     Graph& graph,
00166                                     size_t ncpus = 1):
00167                             rmi(dc, this),
00168                             graph(graph),
00169                             callback(this),
00170                             glshared_manager(dc),
00171                             ncpus( std::max(ncpus, size_t(1)) ),
00172                             use_cpu_affinity(false),
00173                             use_sched_yield(true),
00174                             update_counts(std::max(ncpus, size_t(1)), 0),
00175                             timeout_millis(0),
00176                             force_stop(false),
00177                             task_budget(0),
00178                             randomize_schedule(0),
00179                             termination_reason(EXEC_UNSET),
00180                             scheduled_vertices(graph.owned_vertices().size()),
00181                             update_function(NULL),
00182                             max_iterations(0),
00183                             barrier_time(0.0),
                                  num_dist_barriers_called(0),
00184                             const_nbr_vertices(true),
00185                             const_edges(false),
00186                             engine_metrics("engine"),
                                  // must match the number of threads start() launches,
                                  // i.e. the clamped value, not the raw argument
00187                             thread_color_barrier(std::max(ncpus, size_t(1))),
                                  total_update_count(0),
                                  total_barrier_time(0.0),
                                  total_bytes_sent(0) { 
          // Constructs the engine for `graph` using `ncpus` worker threads per
          // process (values below 1 are treated as 1).
          //
          // Inside the initializer list `graph` and `ncpus` name the constructor
          // parameters, not the members. The counters exposed through
          // last_update_count(), get_barrier_time() and get_total_bytes_sent()
          // start at zero so they are well defined before the first start().
          //
          // NOTE(review): default_scope_range is still left uninitialized here
          // although start() reads it; call set_default_scope() before start()
          // or pick an explicit default.
          //
          // Ends with an rmi barrier, like the other configuration calls.
00188     rmi.barrier();
00189   }
00190   
00191   ~distributed_chromatic_engine() {
          // Issues an rmi barrier on destruction, mirroring the one at the end
          // of the constructor.
00192     rmi.barrier();
00193   }
00194   
00195   
00196   //! Get the number of worker threads used per process (always >= 1)
00197   size_t get_ncpus() const { return ncpus; }
00198 
00199   //! Stores the sched-yield flag, then issues an rmi barrier.
00200   void set_sched_yield(bool value) {
00201     use_sched_yield = value;
00202     rmi.barrier();
00203   }
00204 
00205   void set_cpu_affinities(bool value) {
          // When enabled, start() launches worker thread i with affinity i.
00206     use_cpu_affinity = value;
00207     rmi.barrier();
00208   }
00209 
00210 
00211   /**
00212    * Set the default scope range.  The scope ranges are defined in
00213    * iscope.hpp. start() treats scope_range::FULL_CONSISTENCY specially:
         * it clears const_nbr_vertices. Ends with an rmi barrier.
00214    */
00215   void set_default_scope(scope_range::scope_range_enum default_scope_range_) {
00216     default_scope_range = default_scope_range_;
00217     rmi.barrier();
00218   }
00219   
00220   using iengine<Graph>::exec_status_as_string;   // make the dependent-base member visible; used by start()
00221   
00222 
00223   /**
00224    * Return the reason why the engine last terminated
         * (EXEC_UNSET until a run has finished).
00225    */
00226   exec_status last_exec_status() const {
00227     return termination_reason;
00228   }
00229 
00230   /**
00231    * Sums the per-thread update counters of this process.
00232    * The counters are read without synchronization, so the result is an
00233    * underestimate while the engine is running.
00234    */
00235   size_t thisproc_update_counts() const {
00236     size_t total = 0;
00237     for (std::vector<size_t>::const_iterator it = update_counts.begin();
               it != update_counts.end(); ++it) {
00238       total += *it;
          }
00239     return total;
00240   } 
00241 
00242 
00243   /**
00244    * Returns the total number of updates executed across all processes
         * in the last run. NOTE(review): start() only assigns this total on
         * process 0; other processes do not receive it.
00245    */
00246   size_t last_update_count() const {
00247     return total_update_count;
00248   } // end of last_update_count
00249 
00250 
00251   /**
00252    * Add a terminator to the engine.
00253    * Must be called by all machines simultaneously.
         * Each machine evaluates only a strided subset of the registered
         * terminators (see check_global_termination).
00254    */
00255   void add_terminator(termination_function_type term) {
00256     term_functions.push_back(term);
00257     rmi.barrier();
00258   }
00259 
00260 
00261   /**
00262    * Remove every registered terminator from the engine.
00263    * Must be called by all machines simultaneously.
00264    */
00265    void clear_terminators() {
00266     term_functions.clear();
00267     rmi.barrier();
00268   }
00269 
00270 
00271 
00272   /**
00273    * Set a timeout in seconds (stored internally in milliseconds).
         * Disabled if set to 0.
00274    * Must be called by all machines simultaneously.
00275    */
00276   void set_timeout(size_t timeout_seconds = 0) {
00277     timeout_millis = timeout_seconds * 1000;
00278     rmi.barrier();
00279   }
00280   
00281   /**
00282    * Sets a Task budget - max number of tasks to allow, counted globally
         * over all machines and checked once per iteration.
00283    * Disabled if set to 0.
00284    * Must be called by all machines simultaneously.
00285    */
00286   void set_task_budget(size_t max_tasks) {
00287     task_budget = max_tasks;
00288     rmi.barrier();
00289   }
00290   
00291 
00292   /**
00293    * \brief Schedules an update task on one vertex.
00294    * May be called by any machine for any vertex: tasks for vertices owned
00295    * elsewhere are forwarded to the owner with an asynchronous remote call,
00296    * which may not be completed until a full_barrier is issued.
         * The engine runs a single update function; the first task fixes it and
         * later tasks are asserted to use the same one. `priority` is only
         * forwarded, never interpreted.
00297    */
00298   void add_task(update_task_type task, double priority) {
00299     if (update_function == NULL) update_function = task.function();
00300     else assert(update_function == task.function());
00301     
00302     if (!graph.is_owned(task.vertex())) {
            // not ours: hand the task to the owning machine
00309       rmi.remote_call(graph.globalvid_to_owner(task.vertex()),
00310                       &distributed_chromatic_engine<Graph>::add_task,
00311                       task,
00312                       priority);
            return;
00313     }
          // set_bit reports whether the bit was already set; count the vertex
          // as pending only when it was not scheduled before
          const bool already_scheduled =
00304         scheduled_vertices.set_bit(graph.globalvid_to_localvid(task.vertex()));
00303     num_pending_tasks.inc(!already_scheduled);
00314   }
00315 
00316   /**
00317    * \brief Schedules `func` on every vertex listed in 'vertices', all
00318    * with the same priority.
00319    * Each vertex goes through add_task() individually.
00320    */
00321   void add_tasks(const std::vector<vertex_id_t>& vertices,
00322                          update_function_type func, double priority) {
00323     // one add_task per vertex; simple rather than efficient
00324     for (std::vector<vertex_id_t>::const_iterator vid = vertices.begin();
               vid != vertices.end(); ++vid) {
00325       add_task(update_task_type(*vid, func), priority);
00326     }
00327   }
00328 
00329   /// \internal Remote entry point for add_task_to_all: the update function
        /// arrives as an integer and is cast back to a function pointer.
        /// The only call site visible here is commented out.
00330   void add_task_to_all_from_remote(size_t func,
00331                                   double priority) {
00332     add_task_to_all_impl(reinterpret_cast<update_function_type>(func), priority);
00333   }
00334   
00335   /// \internal Marks every locally owned vertex as scheduled.
        /// `priority` is accepted for interface symmetry and is not used.
00336   void add_task_to_all_impl(update_function_type func,
00337                             double priority) {
00338     if (update_function == NULL) update_function = func;
00339     else assert(update_function == func);
00340     
          // every owned vertex is now pending
00342     num_pending_tasks.value = graph.owned_vertices().size();
00341     scheduled_vertices.fill();
00344   }
00345  
00346   /**
00347    * \brief Creates a collection of tasks on all the vertices in the graph,
00348    * with the same update function and priority
00349    * Must be called by all machines simultaneously
         * Each machine schedules only the vertices it owns, which is why the
         * forwarding loop below is disabled.
00350    */
00351   void add_task_to_all(update_function_type func,
00352                                double priority) {
00353     add_task_to_all_impl(func,priority);
00354     // forward add_task_to_all to all machines
00355    /*for (size_t i = 0;i < rmi.numprocs(); ++i) {
00356       if (i != rmi.procid()) {
00357         rmi.remote_call(i,
00358                         &distributed_chromatic_engine<Graph>::add_task_to_all_from_remote,
00359                         reinterpret_cast<size_t>(func),
00360                         priority);
00361       }
00362     }*/
00363     rmi.barrier();
00364   }
00365 
00366   /**
00367     Registers a sync operation.
00368     Must be called by all machine simultaneously
          \param shared        shared variable receiving the result; must be a
                               distributed_glshared_base
          \param sync          folds one vertex scope into the accumulator
          \param apply         writes the final accumulator into `shared`
          \param zero          initial accumulator value
          \param sync_interval number of executed tasks between syncs; 0 runs
                               the sync only during the first iteration
          \param merge         combines two accumulators; required here
          \param rangelow      first vertex id included (inclusive)
          \param rangehigh     last vertex id included (inclusive)
00369   */
00370   void set_sync(glshared_base& shared,
00371                 sync_function_type sync,
00372                 glshared_base::apply_function_type apply,
00373                 const any& zero,
00374                 size_t sync_interval = 0,
00375                 merge_function_type merge = NULL,
00376                 vertex_id_t rangelow = 0,
00377                 vertex_id_t rangehigh = -1) {
00378     ASSERT_MSG(merge != NULL, "merge is required for the distributed engine");
          // sync_end_iteration dereferences the distributed variable, so a
          // failed cast must be rejected here instead of crashing later
          distributed_glshared_base* dist_shared =
00388         dynamic_cast<distributed_glshared_base*>(&shared);
          ASSERT_MSG(dist_shared != NULL,
                     "set_sync requires a distributed glshared variable");
00379     sync_task st;
00380     st.sync_fun = sync;
00381     st.merge_fun = merge;
00382     st.apply_fun = apply;
00383     st.sync_interval = sync_interval;
00384     st.next_time = 0;
00385     st.zero = zero;
00386     st.rangelow = rangelow;
00387     st.rangehigh = rangehigh;
          st.sharedvariable = dist_shared;
00389     sync_tasks.push_back(st);
00390     rmi.barrier();
00391   }
00392 
00393   
00394   void generate_color_blocks() {
00395     // Rebuilds color_block: for each color, the local ids of the owned
00396     // vertices having that color. Every vertex is first paired with its
00397     // number of replicas; (replicas - 1) is the amount of communication
00398     // needed to synchronize a modification of that vertex.
00399 
00400     typedef std::pair<size_t, vertex_id_t> weighted_vid; // (replica count, local vid)
00402     const size_t num_colors(graph.recompute_num_colors());
00401     std::vector<std::vector<weighted_vid> > weighted_blocks;
00404     weighted_blocks.resize(num_colors);
00405     
00406     foreach(vertex_id_t v, graph.owned_vertices()) {
00407       weighted_blocks[graph.get_color(v)].push_back(
00408                                     std::make_pair(graph.globalvid_to_replicas(v).size(), 
00409                                               graph.globalvid_to_localvid(v)));
00410     }
00411     color_block.clear();
00412     color_block.resize(num_colors);
00413 
          for (size_t c = 0; c < weighted_blocks.size(); ++c) {
            std::vector<weighted_vid>& bucket = weighted_blocks[c];
00414       if (randomize_schedule) {
00415         random::shuffle(bucket.begin(), bucket.end());
00418       }
00419       else {
00420         // Descending order: vertices needing the most communication run
00421         // first, giving that communication the most time to complete.
00424         std::sort(bucket.rbegin(), bucket.rend());
00427       }
00429       // keep only the local vertex id, in the order chosen above
00430       for (size_t k = 0; k < bucket.size(); ++k) {
00433         color_block[c].push_back(bucket[k].second);
00435       }
          }
00437   }
00438   
00439   /************  Actual Execution Engine ****************/
00440  private:
00441 
00442   atomic<size_t> curidx;          // shared work index into the current color block
00443   barrier thread_color_barrier;   // rendezvous point for the worker threads of this process
00444  public: 
00445   
00446   struct termination_evaluation{
          // Per-machine termination report gathered on machine 0 by
          // check_global_termination(); serializable through save/load.
00447     size_t pending_tasks;    // scheduled but not yet executed tasks
00448     size_t executed_tasks;   // updates executed so far
00449     bool terminator;         // a termination function returned true
00450     bool timeout;            // the timeout has elapsed
00451     bool force_stop;         // stop() was called
00452 
          termination_evaluation()
00453       : pending_tasks(0), executed_tasks(0),
00454         terminator(false), timeout(false), force_stop(false) { }
00457                               
          // Writes the fields in declaration order.
00458     void save(oarchive &oarc) const {
00459       oarc << pending_tasks;
00460       oarc << executed_tasks;
00461       oarc << terminator;
00462       oarc << timeout;
00463       oarc << force_stop;
00464     }
00465     
          // Reads the fields in the order save() wrote them.
00466     void load(iarchive &iarc) {
00467       iarc >> pending_tasks;
00468       iarc >> executed_tasks;
00469       iarc >> terminator;
00470       iarc >> timeout;
00471       iarc >> force_stop;
00472     }
00473   };
00474 
00475   /**
00476    * Prepares the sync tasks for a run. Called by start().
         * Every registered sync gets one zero-valued accumulator per thread
         * and is made active for the first iteration.
00477    */
00478   void init_syncs() {
00479     active_sync_tasks.clear();
00481     for (size_t i = 0;i < sync_tasks.size(); ++i) {
            sync_task& st = sync_tasks[i];
00482       // fresh per-thread accumulators, all starting at the task's zero
00483       st.thread_intermediate.assign(ncpus, st.zero);
00484       // everyone runs at the start even if scheduling interval is 0
00485       active_sync_tasks.push_back(&st);
00486     }
00487   }
00488 
00489   /**
00490    * Called for each vertex visited by a worker thread.
00491    * Folds the vertex scope into the calling thread's accumulator of every
         * active sync whose [rangelow, rangehigh] range contains the vertex.
00492    */
00493   void eval_syncs(vertex_id_t curvertex, iscope_type& scope, size_t threadid) {
00495     for (size_t k = 0; k < active_sync_tasks.size(); ++k) {
            sync_task* task = active_sync_tasks[k];
00497       const bool in_range =
                task->rangelow <= curvertex && curvertex <= task->rangehigh;
            if (!in_range) continue;
00498       task->sync_fun(scope, task->thread_intermediate[threadid]);
00500     }
00501   }
00502 
00503   /** Called at the end of the iteration. Called by all threads after a barrier.
            Phase 1 (all threads): each thread takes a strided share of the active
            syncs and merges their per-thread accumulators into task->mergeval.
            Phase 2 (thread 0 only, after a thread barrier): for each active sync
            the per-machine values are gathered on the shared variable's preferred
            machine, which performs the final merge and the apply. */
00504   void sync_end_iteration(size_t threadid) {
00505     // merge all the per-thread values. distribute the work among the threads
00506     for (size_t curtask = threadid; curtask < active_sync_tasks.size(); curtask += ncpus) {
00507       sync_task* task = active_sync_tasks[curtask];
00508       task->mergeval = task->thread_intermediate[0];
00510       for(size_t t = 1; t < task->thread_intermediate.size(); ++t) {
00511         task->merge_fun(task->mergeval, task->thread_intermediate[t]);
00513       }
00514       // Reset the accumulators with this task's own zero. curtask indexes
            // active_sync_tasks, so it must not be used to index sync_tasks.
00516       task->thread_intermediate.assign(ncpus, task->zero);
00518     }
00519     
          // every mergeval must be complete before thread 0 ships it
00520     thread_color_barrier.wait();
00521 
00522     // one thread of each machine participates in |active_sync_tasks| gathers
00523     if (threadid == 0) {
00524       for (size_t i = 0;i < active_sync_tasks.size(); ++i) {
00525         sync_task* task = active_sync_tasks[i];
00526         procid_t target = task->sharedvariable->preferred_machine();
00527         std::vector<any> gathervals(rmi.numprocs());
00528         gathervals[rmi.procid()] = task->mergeval;
00529         rmi.gather(gathervals, target);
00530 
00531         // now if I am target I need to do the final merge and apply
00532         if (target == rmi.procid()) {
00533           task->mergeval = gathervals[0];
00534           for (size_t p = 1; p < gathervals.size(); ++p) {
00535             task->merge_fun(task->mergeval, gathervals[p]);
00536           }
00537           // apply!!!
00538           task->sharedvariable->apply(task->apply_fun, task->mergeval);
00539           numsyncs.inc();
00540         }
00541       }
00542     }
00543   }
00544 
00545   /** Reschedules the syncs that just ran and selects the syncs to run next.
00546       Called by one thread from each machine after sync_end_iteration.
            \param num_executed_tasks global number of update tasks executed so far */
00547   void compute_sync_schedule(size_t num_executed_tasks) {
00548     // Update next_time of the tasks that were active this iteration.
          // The active list holds pointers into sync_tasks and may be a proper
          // subset of it, so go through the pointers rather than reusing the
          // active-list index on sync_tasks.
00549     for (size_t i = 0;i < active_sync_tasks.size(); ++i) {
            sync_task* task = active_sync_tasks[i];
00551       // if sync interval of 0, this was the first iteration.
00552       // then I just set next time to infinity and it will never be run again
00553       if (task->sync_interval == 0) {
00554         task->next_time = size_t(-1);
00555       }
            else {
00550         task->next_time = num_executed_tasks + task->sync_interval;
            }
00556     }
00557     active_sync_tasks.clear();
00558     // figure out what to run next: every task whose deadline has passed
00559     for (size_t i = 0;i < sync_tasks.size(); ++i) {
00560       if (sync_tasks[i].next_time < num_executed_tasks) {
00561         active_sync_tasks.push_back(&(sync_tasks[i]));
00562       }
00563     }
00564   }
00565 
00566   /** Checks all machines for termination and sets the termination reason.
00567       Also returns the number of update tasks completed globally.
            Collective: performs one gather to machine 0 and one broadcast from
            machine 0, so exactly one thread of every machine must call it.
            \param check_dynamic_schedule if true, running out of pending tasks
                   on all machines counts as termination (EXEC_TASK_DEPLETION) */
00568   size_t check_global_termination(bool check_dynamic_schedule) {
00569     std::vector<termination_evaluation> termination_test;
00570     termination_test.resize(rmi.numprocs());
00571     
          // fill in only this machine's slot; the gather collects the rest
00572     if (check_dynamic_schedule) {
00573       termination_test[rmi.procid()].pending_tasks = num_pending_tasks.value;
00574     }
00575 
00576     size_t numupdates = 0;
00577     for (size_t i = 0; i < update_counts.size(); ++i) numupdates += update_counts[i];
00578     termination_test[rmi.procid()].executed_tasks = numupdates;
00579   
00580     if (timeout_millis > 0 && ti.current_time_millis() > timeout_millis) {
00581       termination_test[rmi.procid()].timeout = true;
00582     }
00583     
          // the terminators are split across machines: machine p evaluates
          // terminators p, p + numprocs, ... and stops at the first that fires
00584     for (size_t i = rmi.procid(); i < term_functions.size(); i += rmi.numprocs()) {
00585       if (term_functions[i]()) {
00586         termination_test[rmi.procid()].terminator = true;
00587         break;
00588       }
00589     }
00590     termination_test[rmi.procid()].force_stop = force_stop;
00591     // gather all to 0.
00592     // machine 0 evaluates termination
00593     rmi.gather(termination_test, 0);
00594     // used to globally evaluate termination
00595     termination_evaluation aggregate;
00596     if (rmi.procid() == 0) {
00597       for (size_t i = 0;i < termination_test.size(); ++i) {
00598         aggregate.pending_tasks += termination_test[i].pending_tasks;
00599         aggregate.executed_tasks += termination_test[i].executed_tasks;
00600         aggregate.terminator |= termination_test[i].terminator;
00601         aggregate.timeout |= termination_test[i].timeout;
00602         aggregate.force_stop |= termination_test[i].force_stop;
00603       }
00604       
            // at most one reason is recorded, in this order of precedence
00605       if (check_dynamic_schedule && aggregate.pending_tasks == 0) {
00606         termination_reason = EXEC_TASK_DEPLETION;
00607       }
00608       else if (task_budget > 0 && aggregate.executed_tasks >= task_budget) {
00609         termination_reason = EXEC_TASK_BUDGET_EXCEEDED;
00610       }
00611       else if (timeout_millis > 0 && aggregate.timeout) {
00612         termination_reason = EXEC_TIMEOUT;
00613       }
00614       else if (aggregate.terminator) {
00615         termination_reason = EXEC_TERM_FUNCTION;
00616       }
00617       else if (aggregate.force_stop) {
00618         termination_reason = EXEC_FORCED_ABORT;
00619       }
00620     }
00621     size_t treason = termination_reason;
00622     // note this is OK because only machine 0 will have the right value for
00623     // executed_tasks. And everyone is receiving from machine 0
00624     std::pair<size_t, size_t> reason_and_task(treason, aggregate.executed_tasks);
00625     rmi.broadcast(reason_and_task, rmi.procid() == 0);
          // adopt machine 0's verdict so that all machines agree
00626     termination_reason = exec_status(reason_and_task.first);
00627     return reason_and_task.second;
00628   }
00629  
00630   void start_thread(size_t threadid) {
          // Body of one worker thread; start() launches ncpus of these.
          // Each iteration sweeps the colors in order. Within a color the
          // threads pull vertices from a shared index; after each color the
          // threads rendezvous and thread 0 runs the distributed barrier.
          // After the last color the syncs are finalized and thread 0 runs the
          // global termination check.
00631     // create the scope
00632     dgraph_scope<Graph> scope;
          // local timer for barrier_time; shadows the member `ti`
00633     timer ti;
00634 
00635     // loop over iterations
00636     size_t iter = 0;
          // static mode: run every owned vertex for max_iterations sweeps
00637     bool usestatic = max_iterations > 0;
00638     while(1) {
00639       // if max_iterations is defined, quit
00640       if (usestatic && iter >= max_iterations) {
00641         termination_reason = EXEC_TASK_DEPLETION;
00642         break;
00643       }
00644       bool hassynctasks = active_sync_tasks.size() > 0;
00645       // loop over colors    
00646       for (size_t c = 0;c < color_block.size(); ++c) {
00647         // internal loop over vertices in the color
00648         while(1) {
00649           // grab a vertex  
00650           size_t i = curidx.inc_ret_last();  
00651           // if index out of scope, we are done with this color. break
00652           if (i >= color_block[c].size()) break;
00653           // otherwise, get the local and globalvid
00654           vertex_id_t localvid = color_block[c][i];
00655           vertex_id_t globalvid = graph.localvid_to_globalvid(color_block[c][i]);
                // dynamic mode: run only if the scheduled bit was set (and clear it)
00656           if (usestatic || scheduled_vertices.clear_bit(localvid)) {
00657             if (!usestatic) num_pending_tasks.dec();
00658             // otherwise. run the vertex
00659             // create the scope
00660             scope.init(&graph, globalvid);
00661             // run the update function
00662             update_function(scope, callback);
00663             // check if there are tasks to run
00664             if (hassynctasks) eval_syncs(globalvid, scope, threadid);
00665             scope.commit_async_untracked();
00666             update_counts[threadid]++;
00667           }
00668           else {
00669             // ok this vertex is not scheduled. But if there are syncs
00670             // to run I will still need to get the scope
                  // NOTE(review): the scope is initialized and committed even
                  // when hassynctasks is false.
00671             scope.init(&graph, globalvid);
00672             if (hassynctasks) eval_syncs(globalvid, scope, threadid);
00673             scope.commit_async_untracked();
00674           }
00675         }
00676         // wait for all threads to synchronize on this color.
00677         thread_color_barrier.wait();
              // every thread resets the shared index; all are past the loop above
00678         curidx.value = 0;
00679         // full barrier on the color
00680         // this will complete synchronization of all add tasks as well
00681         if (threadid == 0) {
00682           ti.start();
00683           graph.wait_for_all_async_syncs();
00684           // TODO! If synchronize() calls were made then this barrier is necessary
00685           // but the time needed to figure out if a synchronize call is required 
00686           // could be as long as the barrier itself
00687           if (const_nbr_vertices == false || const_edges == false)  rmi.dc().barrier();
00688           rmi.dc().full_barrier();
00689           num_dist_barriers_called++;
00690           //std::cout << rmi.procid() << ": Full Barrier at end of color" << std::endl;
00691           barrier_time += ti.current_time();
00692         }
              // hold the other threads until thread 0 finishes the distributed barrier
00693         thread_color_barrier.wait();
00694 
00695       }
00696 
00697 
00698       sync_end_iteration(threadid);
00699       thread_color_barrier.wait();
00700       if (threadid == 0) {
00701         ti.start();
00702         //std::cout << rmi.procid() << ": End of all colors" << std::endl;
00703         size_t numtasksdone = check_global_termination(!usestatic);
00704 
00705         //std::cout << numtasksdone << " tasks done" << std::endl;
00706         compute_sync_schedule(numtasksdone);
00707         barrier_time += ti.current_time();
00708       }
00709       // all threads must wait for 0
00710       thread_color_barrier.wait();
00711 
00712       ++iter;
            // termination_reason was set (or left unset) by thread 0 above
00713       if (termination_reason != EXEC_UNSET) {
00714         //std::cout << rmi.procid() << ": Termination Reason: " << termination_reason << std::endl;
00715         break;
00716       }
00717     }
00718   }
00719   
00720   void set_const_edges(bool const_edges_ = true) {
          // Local setting only (no rmi barrier). When both const flags are
          // true the extra per-color dc barrier in start_thread is skipped.
00721     const_edges = const_edges_;
00722   }
00723 
00724   void set_const_nbr_vertices(bool const_nbr_vertices_ = true) {
          // Local setting only (no rmi barrier). start() forces this flag to
          // false when the default scope is FULL_CONSISTENCY.
00725     const_nbr_vertices = const_nbr_vertices_;
00726   }
00727 
00728   
00729   /** Execute the engine. Requires an update function to have been set
            (via add_task*, or set_engine_options). Resets the per-run state,
            runs ncpus worker threads to completion, then gathers update counts,
            barrier times and RMI statistics on process 0, which records them in
            engine_metrics. */
00730   void start() {
00731     assert(update_function != NULL);
00732     
          // NOTE(review): default_scope_range is only assigned by
          // set_default_scope(); it is read here even if that was never called.
00733     if (default_scope_range == scope_range::FULL_CONSISTENCY) {
00734       const_nbr_vertices = false;
00735     }
00736     // generate colors then
00737     // wait for everyone to enter start    
00738     generate_color_blocks();
00739     init_syncs();
00740     termination_reason = EXEC_UNSET;
00741     barrier_time = 0.0;
00742     force_stop = false;
00743     numsyncs.value = 0;
00744     num_dist_barriers_called = 0;
00745     std::fill(update_counts.begin(), update_counts.end(), 0);
00746     // two full barriers to complete flush replies
00747     rmi.dc().full_barrier();
00748     rmi.dc().full_barrier();
00749 
00750     // reset indices
00751     curidx.value = 0;
00752     ti.start();
00753     // spawn threads
00754     thread_group thrgrp; 
00755     for (size_t i = 0;i < ncpus; ++i) {
            // thread i gets affinity i, or size_t(-1) when affinities are off
00756       size_t aff = use_cpu_affinity ? i : -1;
00757       thrgrp.launch(boost::bind(
00758                             &distributed_chromatic_engine<Graph>::start_thread,
00759                             this, i), aff);
00760     }
00761     
00762     thrgrp.join();              
00763     rmi.barrier();
00764     
00765 
00766     
00767     // proc 0 gathers all update counts
00768     std::vector<size_t> procupdatecounts(rmi.numprocs(), 0);
00769     procupdatecounts[rmi.procid()] = thisproc_update_counts();
00770     rmi.gather(procupdatecounts, 0);
00771     
00772     std::vector<double> barrier_times(rmi.numprocs(), 0);
00773     barrier_times[rmi.procid()] = barrier_time;
00774     rmi.gather(barrier_times, 0);
00775     // get RMI statistics
00776     std::map<std::string, size_t> ret = rmi.gather_statistics();
00777 
          // only process 0 fills in the metrics and the total_* members
00778     if (rmi.procid() == 0) {
00779       engine_metrics.add("runtime",
00780                         ti.current_time(), TIME);
00781       total_update_count = 0;
00782       for(size_t i = 0; i < procupdatecounts.size(); ++i) {
00783         engine_metrics.add_vector_entry("updatecount", i, procupdatecounts[i]);
00784         total_update_count +=  procupdatecounts[i];
00785       }
00786       total_barrier_time = 0;
00787       for(size_t i = 0; i < barrier_times.size(); ++i) {
00788         engine_metrics.add_vector_entry("barrier_time", i, barrier_times[i]);
00789         total_barrier_time += barrier_times[i];
00790       }
00791 
00792       engine_metrics.set("termination_reason", 
00793                         exec_status_as_string(termination_reason));
00794       engine_metrics.add("dist_barriers_issued",
00795                         num_dist_barriers_called, INTEGER);
00796 
00797       engine_metrics.set("num_vertices", graph.num_vertices(), INTEGER);
00798       engine_metrics.set("num_edges", graph.num_edges(), INTEGER);
00799       engine_metrics.add("num_syncs", numsyncs.value, INTEGER);
00800       engine_metrics.set("isdynamic", max_iterations == 0, INTEGER);
00801       engine_metrics.add("iterations", max_iterations, INTEGER);
00802       engine_metrics.set("total_calls_sent", ret["total_calls_sent"], INTEGER);
00803       engine_metrics.set("total_bytes_sent", ret["total_bytes_sent"], INTEGER);
00804       total_bytes_sent = ret["total_bytes_sent"];
00805     }
00806     
00807     
00808     
00809   }
00810   
00811   /**
00812    * Intended to perform a sync immediately. This function requires that the
00813    * shared variable already be registered with the engine,
00814    * and that the engine is not currently running.
00815    * All processes must call simultaneously.
         * NOTE: not implemented yet; the body is empty and `shared` is ignored.
00816    */
00817   void sync_now(glshared_base& shared) {
00818     // TODO
00819   }
00820   
00821     /** \brief Update the engine options from `opts`: "max_iterations",
              "randomize_schedule" and, if present, "update_function".
              Ends with an rmi barrier. */
00822   void set_engine_options(const scheduler_options& opts) {
00823     opts.get_int_option("max_iterations", max_iterations);
00824     opts.get_int_option("randomize_schedule", randomize_schedule);
00825     any uf;
          // the update function is replaced only when the option is present
00826     if (opts.get_any_option("update_function", uf)) {
00827       update_function = uf.as<update_function_type>();
00828     }
00829     rmi.barrier();
00830   }
00831 
00832   void set_scheduler_options(const scheduler_options& opts) {
          // Intentionally empty: `opts` is ignored by this engine.
00833   }
00834   
00835   void set_randomize_schedule(bool randomize_schedule_) {
          // true: shuffle the vertices of each color instead of sorting them
          // by replica count (see generate_color_blocks). Ends with an rmi barrier.
00836     randomize_schedule = randomize_schedule_;
00837     rmi.barrier();
00838   }
00839 
00840   
00841   
00842   static void print_options_help(std::ostream &out) {
          // Writes one line per option understood by set_engine_options().
00843     out << "max_iterations = [integer, default = 0]\n"
00844         << "randomize_schedule = [integer, default = 0]\n"
00845         << "update_function = [update_function_type,"
00846            "default = set on add_task]\n";
00847   }
00848 
00849 
00850   void stop() {
          // Only raises a flag; it is picked up by the next global termination
          // check (once per iteration) and reported as EXEC_FORCED_ABORT.
00851     force_stop = true;
00852   }
00853 
00854   void register_monitor(imonitor_type* listener) {
          // Monitors are not supported: `listener` is ignored and a fatal
          // message is logged.
00855     logger(LOG_FATAL, "distributed engine does not support register monitor");
00856   }   
00857   
00858   size_t total_update_count;  // sum of all processes' update counts; assigned on process 0 by start()
00859   
        // Returns total_update_count (same value as last_update_count()).
00860   size_t get_tasks_done() const {
00861     return  total_update_count;
00862   }
00863   
00864   double total_barrier_time;  // sum of all processes' barrier_time; assigned on process 0 by start()
        // Returns the summed barrier time of the last run.
00865   double get_barrier_time() const {
00866       return total_barrier_time;
00867   }
00868   
00869   long long int total_bytes_sent;  // "total_bytes_sent" RMI statistic; assigned on process 0 by start()
        // Returns the bytes-sent statistic of the last run.
00870   long long int get_total_bytes_sent() {
00871      return total_bytes_sent;
00872   }
00873 
00874   metrics get_metrics() {
          // Returns a copy of the engine metrics (filled in by process 0 in start()).
00875     return engine_metrics;
00876   }
00877 
00878 
00879   void reset_metrics() {
          // Discards everything recorded in engine_metrics so far.
00880     engine_metrics.clear();
00881   }
00882 
00883 };
00884 
00885 } // namespace graphlab
00886 
00887 #include <graphlab/macros_undef.hpp>
00888 
00889 #endif // DISTRIBUTED_CHROMATIC_ENGINE_HPP
00890