00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef DISTRIBUTED_CHROMATIC_ENGINE_HPP
00019 #define DISTRIBUTED_CHROMATIC_ENGINE_HPP
00020
00021 #include <functional>
00022 #include <algorithm>
00023 #include <ext/functional>
00024 #include <boost/bind.hpp>
00025 #include <graphlab/parallel/pthread_tools.hpp>
00026 #include <graphlab/parallel/atomic.hpp>
00027 #include <graphlab/util/timer.hpp>
00028 #include <graphlab/util/random.hpp>
00029 #include <graphlab/util/dense_bitset.hpp>
00030 #include <graphlab/util/mutable_queue.hpp>
00031
00032 #include <graphlab/engine/iengine.hpp>
00033 #include <graphlab/scope/iscope.hpp>
00034 #include <graphlab/tasks/update_task.hpp>
00035 #include <graphlab/logger/logger.hpp>
00036 #include <graphlab/metrics/metrics.hpp>
00037 #include <graphlab/schedulers/support/redirect_scheduler_callback.hpp>
00038
00039 #include <graphlab/rpc/dc.hpp>
00040 #include <graphlab/distributed2/distributed_glshared_manager.hpp>
00041 #include <graphlab/distributed2/graph/dgraph_scope.hpp>
00042
00043 #include <graphlab/macros_def.hpp>
00044
00045 namespace graphlab {
00046
00047
00048
00049
00050
00051
00052
00053
00054 template <typename Graph>
00055 class distributed_chromatic_engine : public iengine<Graph> {
00056 public:
00057 typedef iengine<Graph> iengine_base;
00058 typedef typename iengine_base::update_task_type update_task_type;
00059 typedef typename iengine_base::update_function_type update_function_type;
00060 typedef typename iengine_base::termination_function_type termination_function_type;
00061 typedef typename iengine_base::iscope_type iscope_type;
00062
00063 typedef typename iengine_base::sync_function_type sync_function_type;
00064 typedef typename iengine_base::merge_function_type merge_function_type;
00065
00066
00067 typedef imonitor<Graph> imonitor_type;
00068
00069 typedef redirect_scheduler_callback<Graph,
00070 distributed_chromatic_engine<Graph> > callback_type;
00071 typedef icallback<Graph> icallback_type;
00072
00073 private:
00074
00075 dc_dist_object<distributed_chromatic_engine<Graph> > rmi;
00076
00077
00078 Graph &graph;
00079
00080 callback_type callback;
00081
00082
00083 distributed_glshared_manager glshared_manager;
00084
00085
00086 size_t ncpus;
00087
00088
00089 bool use_cpu_affinity;
00090
00091
00092 bool use_sched_yield;
00093
00094
00095
00096 std::vector<size_t> update_counts;
00097
00098 atomic<size_t> numsyncs;
00099
00100
00101 std::vector<termination_function_type> term_functions;
00102
00103
00104 size_t timeout_millis;
00105 timer ti;
00106
00107
00108 bool force_stop;
00109
00110
00111 size_t task_budget;
00112
00113 size_t randomize_schedule;
00114
00115
00116 atomic<size_t> num_pending_tasks;
00117
00118
00119
00120 exec_status termination_reason;
00121
00122 scope_range::scope_range_enum default_scope_range;
00123
00124 std::vector<std::vector<vertex_id_t> > color_block;
00125 dense_bitset scheduled_vertices;
00126
00127
00128 update_function_type update_function;
00129 size_t max_iterations;
00130 double barrier_time;
00131 size_t num_dist_barriers_called;
00132
00133
00134 bool const_nbr_vertices, const_edges;
00135 struct sync_task {
00136 sync_function_type sync_fun;
00137 merge_function_type merge_fun;
00138 distributed_glshared_base::apply_function_type apply_fun;
00139 size_t sync_interval;
00140 size_t next_time;
00141 any zero;
00142 vertex_id_t rangelow;
00143 vertex_id_t rangehigh;
00144 distributed_glshared_base *sharedvariable;
00145 any mergeval;
00146 std::vector<any> thread_intermediate;
00147 sync_task() :
00148 sync_fun(NULL), merge_fun(NULL), apply_fun(NULL),
00149 sync_interval(-1),
00150 next_time(0), rangelow(0),
00151 rangehigh(vertex_id_t(-1)), sharedvariable(NULL) { }
00152 };
00153
00154
00155 std::vector<sync_task> sync_tasks;
00156
00157
00158 std::vector<sync_task*> active_sync_tasks;
00159
00160
00161 metrics engine_metrics;
00162
00163 public:
00164 distributed_chromatic_engine(distributed_control &dc,
00165 Graph& graph,
00166 size_t ncpus = 1):
00167 rmi(dc, this),
00168 graph(graph),
00169 callback(this),
00170 glshared_manager(dc),
00171 ncpus( std::max(ncpus, size_t(1)) ),
00172 use_cpu_affinity(false),
00173 use_sched_yield(true),
00174 update_counts(std::max(ncpus, size_t(1)), 0),
00175 timeout_millis(0),
00176 force_stop(false),
00177 task_budget(0),
00178 randomize_schedule(0),
00179 termination_reason(EXEC_UNSET),
00180 scheduled_vertices(graph.owned_vertices().size()),
00181 update_function(NULL),
00182 max_iterations(0),
00183 barrier_time(0.0),
00184 const_nbr_vertices(true),
00185 const_edges(false),
00186 engine_metrics("engine"),
00187 thread_color_barrier(ncpus){
00188 rmi.barrier();
00189 }
00190
00191 ~distributed_chromatic_engine() {
00192 rmi.barrier();
00193 }
00194
00195
00196
00197 size_t get_ncpus() const { return ncpus; }
00198
00199
00200 void set_sched_yield(bool value) {
00201 use_sched_yield = value;
00202 rmi.barrier();
00203 }
00204
00205 void set_cpu_affinities(bool value) {
00206 use_cpu_affinity = value;
00207 rmi.barrier();
00208 }
00209
00210
00211
00212
00213
00214
00215 void set_default_scope(scope_range::scope_range_enum default_scope_range_) {
00216 default_scope_range = default_scope_range_;
00217 rmi.barrier();
00218 }
00219
00220 using iengine<Graph>::exec_status_as_string;
00221
00222
00223
00224
00225
00226 exec_status last_exec_status() const {
00227 return termination_reason;
00228 }
00229
00230
00231
00232
00233
00234
00235 size_t thisproc_update_counts() const {
00236 size_t sum = 0;
00237 for(size_t i = 0; i < update_counts.size(); ++i)
00238 sum += update_counts[i];
00239 return sum;
00240 }
00241
00242
00243
00244
00245
00246 size_t last_update_count() const {
00247 return total_update_count;
00248 }
00249
00250
00251
00252
00253
00254
00255 void add_terminator(termination_function_type term) {
00256 term_functions.push_back(term);
00257 rmi.barrier();
00258 }
00259
00260
00261
00262
00263
00264
00265 void clear_terminators() {
00266 term_functions.clear();
00267 rmi.barrier();
00268 }
00269
00270
00271
00272
00273
00274
00275
00276 void set_timeout(size_t timeout_seconds = 0) {
00277 timeout_millis = timeout_seconds * 1000;
00278 rmi.barrier();
00279 }
00280
00281
00282
00283
00284
00285
00286 void set_task_budget(size_t max_tasks) {
00287 task_budget = max_tasks;
00288 rmi.barrier();
00289 }
00290
00291
00292
00293
00294
00295
00296
00297
00298 void add_task(update_task_type task, double priority) {
00299 if (update_function != NULL) assert(update_function == task.function());
00300 else update_function = task.function();
00301
00302 if (graph.is_owned(task.vertex())) {
00303 num_pending_tasks.inc(!
00304 scheduled_vertices.set_bit(graph.globalvid_to_localvid(task.vertex()))
00305 );
00306
00307 }
00308 else {
00309 rmi.remote_call(graph.globalvid_to_owner(task.vertex()),
00310 &distributed_chromatic_engine<Graph>::add_task,
00311 task,
00312 priority);
00313 }
00314 }
00315
00316
00317
00318
00319
00320
00321 void add_tasks(const std::vector<vertex_id_t>& vertices,
00322 update_function_type func, double priority) {
00323
00324 for (size_t i = 0;i < vertices.size(); ++i) {
00325 add_task(update_task_type(vertices[i], func), priority);
00326 }
00327 }
00328
00329
00330 void add_task_to_all_from_remote(size_t func,
00331 double priority) {
00332 add_task_to_all_impl(reinterpret_cast<update_function_type>(func), priority);
00333 }
00334
00335
00336 void add_task_to_all_impl(update_function_type func,
00337 double priority) {
00338 if (update_function != NULL) assert(update_function == func);
00339 else update_function = func;
00340
00341 scheduled_vertices.fill();
00342 num_pending_tasks.value = graph.owned_vertices().size();
00343
00344 }
00345
00346
00347
00348
00349
00350
00351 void add_task_to_all(update_function_type func,
00352 double priority) {
00353 add_task_to_all_impl(func,priority);
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363 rmi.barrier();
00364 }
00365
00366
00367
00368
00369
00370 void set_sync(glshared_base& shared,
00371 sync_function_type sync,
00372 glshared_base::apply_function_type apply,
00373 const any& zero,
00374 size_t sync_interval = 0,
00375 merge_function_type merge = NULL,
00376 vertex_id_t rangelow = 0,
00377 vertex_id_t rangehigh = -1) {
00378 ASSERT_MSG(merge != NULL, "merge is required for the distributed engine");
00379 sync_task st;
00380 st.sync_fun = sync;
00381 st.merge_fun = merge;
00382 st.apply_fun = apply;
00383 st.sync_interval = sync_interval;
00384 st.next_time = 0;
00385 st.zero = zero;
00386 st.rangelow = rangelow;
00387 st.rangehigh = rangehigh;
00388 st.sharedvariable = dynamic_cast<distributed_glshared_base*>(&shared) ;
00389 sync_tasks.push_back(st);
00390 rmi.barrier();
00391 }
00392
00393
00394 void generate_color_blocks() {
00395
00396
00397
00398
00399
00400
00401 std::vector<std::vector<std::pair<size_t, vertex_id_t> > > color_block_and_weight;
00402 const size_t num_colors(graph.recompute_num_colors());
00403
00404 color_block_and_weight.resize(num_colors);
00405
00406 foreach(vertex_id_t v, graph.owned_vertices()) {
00407 color_block_and_weight[graph.get_color(v)].push_back(
00408 std::make_pair(graph.globalvid_to_replicas(v).size(),
00409 graph.globalvid_to_localvid(v)));
00410 }
00411 color_block.clear();
00412 color_block.resize(num_colors);
00413 if (randomize_schedule) {
00414 for (size_t i = 0; i < color_block_and_weight.size(); ++i) {
00415 random::shuffle(color_block_and_weight[i].begin(),
00416 color_block_and_weight[i].end());
00417 }
00418 }
00419 else {
00420
00421
00422
00423 for (size_t i = 0; i < color_block_and_weight.size(); ++i) {
00424 std::sort(color_block_and_weight[i].rbegin(),
00425 color_block_and_weight[i].rend());
00426 }
00427 }
00428
00429
00430 for (size_t i = 0;i < color_block_and_weight.size(); ++i ) {
00431 std::transform(color_block_and_weight[i].begin(),
00432 color_block_and_weight[i].end(),
00433 std::back_inserter(color_block[i]),
00434 __gnu_cxx::select2nd<std::pair<size_t, vertex_id_t> >());
00435 }
00436
00437 }
00438
00439
00440 private:
00441
00442 atomic<size_t> curidx;
00443 barrier thread_color_barrier;
00444 public:
00445
00446 struct termination_evaluation{
00447 size_t pending_tasks;
00448 size_t executed_tasks;
00449 bool terminator;
00450 bool timeout;
00451 bool force_stop;
00452 termination_evaluation(): pending_tasks(0),
00453 executed_tasks(0),
00454 terminator(false),
00455 timeout(false),
00456 force_stop(false) { }
00457
00458 void save(oarchive &oarc) const {
00459 oarc << pending_tasks
00460 << executed_tasks
00461 << terminator
00462 << timeout
00463 << force_stop;
00464 }
00465
00466 void load(iarchive &iarc) {
00467 iarc >> pending_tasks
00468 >> executed_tasks
00469 >> terminator
00470 >> timeout
00471 >> force_stop;
00472 }
00473 };
00474
00475
00476
00477
00478 void init_syncs() {
00479 active_sync_tasks.clear();
00480
00481 for (size_t i = 0;i < sync_tasks.size(); ++i) {
00482 sync_tasks[i].thread_intermediate.clear();
00483 sync_tasks[i].thread_intermediate.resize(ncpus, sync_tasks[i].zero);
00484
00485 active_sync_tasks.push_back(&(sync_tasks[i]));
00486 }
00487 }
00488
00489
00490
00491
00492
00493 void eval_syncs(vertex_id_t curvertex, iscope_type& scope, size_t threadid) {
00494
00495 foreach(sync_task* task, active_sync_tasks) {
00496
00497 if (task->rangelow <= curvertex && curvertex <= task->rangehigh) {
00498 task->sync_fun(scope, task->thread_intermediate[threadid]);
00499 }
00500 }
00501 }
00502
00503
00504 void sync_end_iteration(size_t threadid) {
00505
00506 for (size_t curtask = threadid; curtask < active_sync_tasks.size(); curtask += ncpus) {
00507 sync_task* task = active_sync_tasks[curtask];
00508 task->mergeval = task->thread_intermediate[0];
00509 task->thread_intermediate[0] = task->zero;
00510 for(size_t i = 1; i < task->thread_intermediate.size(); ++i) {
00511 task->merge_fun(task->mergeval, task->thread_intermediate[i]);
00512 task->thread_intermediate[i] = task->zero;
00513 }
00514
00515 task->thread_intermediate.clear();
00516 task->thread_intermediate.resize(ncpus, sync_tasks[curtask].zero);
00517
00518 }
00519
00520 thread_color_barrier.wait();
00521
00522
00523 if (threadid == 0) {
00524 for (size_t i = 0;i < active_sync_tasks.size(); ++i) {
00525 sync_task* task = active_sync_tasks[i];
00526 procid_t target = task->sharedvariable->preferred_machine();
00527 std::vector<any> gathervals(rmi.numprocs());
00528 gathervals[rmi.procid()] = task->mergeval;
00529 rmi.gather(gathervals, target);
00530
00531
00532 if (target == rmi.procid()) {
00533 task->mergeval = gathervals[0];
00534 for (size_t i = 1; i < gathervals.size(); ++i) {
00535 task->merge_fun(task->mergeval, gathervals[i]);
00536 }
00537
00538 task->sharedvariable->apply(task->apply_fun, task->mergeval);
00539 numsyncs.inc();
00540 }
00541 }
00542 }
00543 }
00544
00545
00546
00547 void compute_sync_schedule(size_t num_executed_tasks) {
00548
00549 for (size_t i = 0;i < active_sync_tasks.size(); ++i) {
00550 sync_tasks[i].next_time = num_executed_tasks + sync_tasks[i].sync_interval;
00551
00552
00553 if (sync_tasks[i].sync_interval == 0) {
00554 sync_tasks[i].next_time = size_t(-1);
00555 }
00556 }
00557 active_sync_tasks.clear();
00558
00559 for (size_t i = 0;i < sync_tasks.size(); ++i) {
00560 if (sync_tasks[i].next_time < num_executed_tasks) {
00561 active_sync_tasks.push_back(&(sync_tasks[i]));
00562 }
00563 }
00564 }
00565
00566
00567
00568 size_t check_global_termination(bool check_dynamic_schedule) {
00569 std::vector<termination_evaluation> termination_test;
00570 termination_test.resize(rmi.numprocs());
00571
00572 if (check_dynamic_schedule) {
00573 termination_test[rmi.procid()].pending_tasks = num_pending_tasks.value;
00574 }
00575
00576 size_t numupdates = 0;
00577 for (size_t i = 0; i < update_counts.size(); ++i) numupdates += update_counts[i];
00578 termination_test[rmi.procid()].executed_tasks = numupdates;
00579
00580 if (timeout_millis > 0 && ti.current_time_millis() > timeout_millis) {
00581 termination_test[rmi.procid()].timeout = true;
00582 }
00583
00584 for (size_t i = rmi.procid(); i < term_functions.size(); i += rmi.numprocs()) {
00585 if (term_functions[i]()) {
00586 termination_test[rmi.procid()].terminator = true;
00587 break;
00588 }
00589 }
00590 termination_test[rmi.procid()].force_stop = force_stop;
00591
00592
00593 rmi.gather(termination_test, 0);
00594
00595 termination_evaluation aggregate;
00596 if (rmi.procid() == 0) {
00597 for (size_t i = 0;i < termination_test.size(); ++i) {
00598 aggregate.pending_tasks += termination_test[i].pending_tasks;
00599 aggregate.executed_tasks += termination_test[i].executed_tasks;
00600 aggregate.terminator |= termination_test[i].terminator;
00601 aggregate.timeout |= termination_test[i].timeout;
00602 aggregate.force_stop |= termination_test[i].force_stop;
00603 }
00604
00605 if (check_dynamic_schedule && aggregate.pending_tasks == 0) {
00606 termination_reason = EXEC_TASK_DEPLETION;
00607 }
00608 else if (task_budget > 0 && aggregate.executed_tasks >= task_budget) {
00609 termination_reason = EXEC_TASK_BUDGET_EXCEEDED;
00610 }
00611 else if (timeout_millis > 0 && aggregate.timeout) {
00612 termination_reason = EXEC_TIMEOUT;
00613 }
00614 else if (aggregate.terminator) {
00615 termination_reason = EXEC_TERM_FUNCTION;
00616 }
00617 else if (aggregate.force_stop) {
00618 termination_reason = EXEC_FORCED_ABORT;
00619 }
00620 }
00621 size_t treason = termination_reason;
00622
00623
00624 std::pair<size_t, size_t> reason_and_task(treason, aggregate.executed_tasks);
00625 rmi.broadcast(reason_and_task, rmi.procid() == 0);
00626 termination_reason = exec_status(reason_and_task.first);
00627 return reason_and_task.second;
00628 }
00629
00630 void start_thread(size_t threadid) {
00631
00632 dgraph_scope<Graph> scope;
00633 timer ti;
00634
00635
00636 size_t iter = 0;
00637 bool usestatic = max_iterations > 0;
00638 while(1) {
00639
00640 if (usestatic && iter >= max_iterations) {
00641 termination_reason = EXEC_TASK_DEPLETION;
00642 break;
00643 }
00644 bool hassynctasks = active_sync_tasks.size() > 0;
00645
00646 for (size_t c = 0;c < color_block.size(); ++c) {
00647
00648 while(1) {
00649
00650 size_t i = curidx.inc_ret_last();
00651
00652 if (i >= color_block[c].size()) break;
00653
00654 vertex_id_t localvid = color_block[c][i];
00655 vertex_id_t globalvid = graph.localvid_to_globalvid(color_block[c][i]);
00656 if (usestatic || scheduled_vertices.clear_bit(localvid)) {
00657 if (!usestatic) num_pending_tasks.dec();
00658
00659
00660 scope.init(&graph, globalvid);
00661
00662 update_function(scope, callback);
00663
00664 if (hassynctasks) eval_syncs(globalvid, scope, threadid);
00665 scope.commit_async_untracked();
00666 update_counts[threadid]++;
00667 }
00668 else {
00669
00670
00671 scope.init(&graph, globalvid);
00672 if (hassynctasks) eval_syncs(globalvid, scope, threadid);
00673 scope.commit_async_untracked();
00674 }
00675 }
00676
00677 thread_color_barrier.wait();
00678 curidx.value = 0;
00679
00680
00681 if (threadid == 0) {
00682 ti.start();
00683 graph.wait_for_all_async_syncs();
00684
00685
00686
00687 if (const_nbr_vertices == false || const_edges == false) rmi.dc().barrier();
00688 rmi.dc().full_barrier();
00689 num_dist_barriers_called++;
00690
00691 barrier_time += ti.current_time();
00692 }
00693 thread_color_barrier.wait();
00694
00695 }
00696
00697
00698 sync_end_iteration(threadid);
00699 thread_color_barrier.wait();
00700 if (threadid == 0) {
00701 ti.start();
00702
00703 size_t numtasksdone = check_global_termination(!usestatic);
00704
00705
00706 compute_sync_schedule(numtasksdone);
00707 barrier_time += ti.current_time();
00708 }
00709
00710 thread_color_barrier.wait();
00711
00712 ++iter;
00713 if (termination_reason != EXEC_UNSET) {
00714
00715 break;
00716 }
00717 }
00718 }
00719
00720 void set_const_edges(bool const_edges_ = true) {
00721 const_edges = const_edges_;
00722 }
00723
00724 void set_const_nbr_vertices(bool const_nbr_vertices_ = true) {
00725 const_nbr_vertices = const_nbr_vertices_;
00726 }
00727
00728
00729
00730 void start() {
00731 assert(update_function != NULL);
00732
00733 if (default_scope_range == scope_range::FULL_CONSISTENCY) {
00734 const_nbr_vertices = false;
00735 }
00736
00737
00738 generate_color_blocks();
00739 init_syncs();
00740 termination_reason = EXEC_UNSET;
00741 barrier_time = 0.0;
00742 force_stop = false;
00743 numsyncs.value = 0;
00744 num_dist_barriers_called = 0;
00745 std::fill(update_counts.begin(), update_counts.end(), 0);
00746
00747 rmi.dc().full_barrier();
00748 rmi.dc().full_barrier();
00749
00750
00751 curidx.value = 0;
00752 ti.start();
00753
00754 thread_group thrgrp;
00755 for (size_t i = 0;i < ncpus; ++i) {
00756 size_t aff = use_cpu_affinity ? i : -1;
00757 thrgrp.launch(boost::bind(
00758 &distributed_chromatic_engine<Graph>::start_thread,
00759 this, i), aff);
00760 }
00761
00762 thrgrp.join();
00763 rmi.barrier();
00764
00765
00766
00767
00768 std::vector<size_t> procupdatecounts(rmi.numprocs(), 0);
00769 procupdatecounts[rmi.procid()] = thisproc_update_counts();
00770 rmi.gather(procupdatecounts, 0);
00771
00772 std::vector<double> barrier_times(rmi.numprocs(), 0);
00773 barrier_times[rmi.procid()] = barrier_time;
00774 rmi.gather(barrier_times, 0);
00775
00776 std::map<std::string, size_t> ret = rmi.gather_statistics();
00777
00778 if (rmi.procid() == 0) {
00779 engine_metrics.add("runtime",
00780 ti.current_time(), TIME);
00781 total_update_count = 0;
00782 for(size_t i = 0; i < procupdatecounts.size(); ++i) {
00783 engine_metrics.add_vector_entry("updatecount", i, procupdatecounts[i]);
00784 total_update_count += procupdatecounts[i];
00785 }
00786 total_barrier_time = 0;
00787 for(size_t i = 0; i < barrier_times.size(); ++i) {
00788 engine_metrics.add_vector_entry("barrier_time", i, barrier_times[i]);
00789 total_barrier_time += barrier_times[i];
00790 }
00791
00792 engine_metrics.set("termination_reason",
00793 exec_status_as_string(termination_reason));
00794 engine_metrics.add("dist_barriers_issued",
00795 num_dist_barriers_called, INTEGER);
00796
00797 engine_metrics.set("num_vertices", graph.num_vertices(), INTEGER);
00798 engine_metrics.set("num_edges", graph.num_edges(), INTEGER);
00799 engine_metrics.add("num_syncs", numsyncs.value, INTEGER);
00800 engine_metrics.set("isdynamic", max_iterations == 0, INTEGER);
00801 engine_metrics.add("iterations", max_iterations, INTEGER);
00802 engine_metrics.set("total_calls_sent", ret["total_calls_sent"], INTEGER);
00803 engine_metrics.set("total_bytes_sent", ret["total_bytes_sent"], INTEGER);
00804 total_bytes_sent = ret["total_bytes_sent"];
00805 }
00806
00807
00808
00809 }
00810
00811
00812
00813
00814
00815
00816
00817 void sync_now(glshared_base& shared) {
00818
00819 }
00820
00821
00822 void set_engine_options(const scheduler_options& opts) {
00823 opts.get_int_option("max_iterations", max_iterations);
00824 opts.get_int_option("randomize_schedule", randomize_schedule);
00825 any uf;
00826 if (opts.get_any_option("update_function", uf)) {
00827 update_function = uf.as<update_function_type>();
00828 }
00829 rmi.barrier();
00830 }
00831
00832 void set_scheduler_options(const scheduler_options& opts) {
00833 }
00834
00835 void set_randomize_schedule(bool randomize_schedule_) {
00836 randomize_schedule = randomize_schedule_;
00837 rmi.barrier();
00838 }
00839
00840
00841
/**
 * Writes a short description of the supported engine options.
 * \param out stream receiving the three help lines
 */
static void print_options_help(std::ostream &out) {
  out << "max_iterations = [integer, default = 0]\n"
      << "randomize_schedule = [integer, default = 0]\n"
      << "update_function = [update_function_type,"
         "default = set on add_task]\n";
}
00848
00849
00850 void stop() {
00851 force_stop = true;
00852 }
00853
00854 void register_monitor(imonitor_type* listener) {
00855 logger(LOG_FATAL, "distributed engine does not support register monitor");
00856 }
00857
00858 size_t total_update_count;
00859
00860 size_t get_tasks_done() const {
00861 return total_update_count;
00862 }
00863
00864 double total_barrier_time;
00865 double get_barrier_time() const {
00866 return total_barrier_time;
00867 }
00868
00869 long long int total_bytes_sent;
00870 long long int get_total_bytes_sent() {
00871 return total_bytes_sent;
00872 }
00873
00874 metrics get_metrics() {
00875 return engine_metrics;
00876 }
00877
00878
00879 void reset_metrics() {
00880 engine_metrics.clear();
00881 }
00882
00883 };
00884
00885 }
00886
00887 #include <graphlab/macros_undef.hpp>
00888
00889 #endif // DISTRIBUTED_CHROMATIC_ENGINE_HPP
00890