00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef GRAPHLAB_DISTRIBUTED_CORE_HPP
00019 #define GRAPHLAB_DISTRIBUTED_CORE_HPP
00020
00021 #include <graphlab/engine/iengine.hpp>
00022 #include <graphlab/engine/engine_options.hpp>
00023 #include <graphlab/distributed2/distributed2_includes.hpp>
00024 #include <graphlab/util/command_line_options.hpp>
00025 #include <graphlab/util/mpi_tools.hpp>
00026 #include <graphlab/rpc/dc.hpp>
00027 #include <graphlab/rpc/dc_init_from_mpi.hpp>
00028 #include <graphlab/rpc/dc_init_from_env.hpp>
00029
00030 #include <graphlab/schedulers/ischeduler.hpp>
00031 #include <graphlab/scope/iscope.hpp>
00032 #include <graphlab/graph/graph.hpp>
00033
00034
00035
00036 #include <graphlab/metrics/metrics.hpp>
00037 #include <graphlab/metrics/reporters/null_reporter.hpp>
00038 #include <graphlab/metrics/reporters/basic_reporter.hpp>
00039 #include <graphlab/metrics/reporters/file_reporter.hpp>
00040 #include <graphlab/metrics/reporters/html_reporter.hpp>
00041
00042
00043
00044 #include <graphlab/macros_def.hpp>
00045 namespace graphlab {
00046
00047
// Forward declaration of the traits template keyed on a graph type.
// distributed_core in this file reads the nested names distributed_graph,
// iengine, update_function and update_task from its specialisation for
// distributed_graph<VertexType, EdgeType>; the definition is not in this
// header and is presumably supplied by one of the included headers.
00048 template<typename Graph> struct distributed_types;
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085 template <typename VertexType, typename EdgeType>
00086 class distributed_core {
00087 public:
00088 typedef graphlab::distributed_types<graphlab::distributed_graph<VertexType, EdgeType> > distributed_types;
00089
00090 public:
00091
00092
00093
00094 distributed_core(distributed_control &dc, std::string atomindex) :
00095 dc(dc),
00096 mgraph(dc, atomindex),
00097 mengine(NULL),
00098 coremetrics("distributed_core"), reporter(new null_reporter) { }
00099 private:
00100
00101 distributed_core(const distributed_core& other);
00102
00103 distributed_core& operator=(const distributed_core& other);
00104
00105 public:
00106
00107
00108
00109
00110
00111 ~distributed_core() {
00112 if (meopts.get_metrics_type() != "none") {
00113
00114 fill_metrics();
00115 report_metrics();
00116 }
00117 delete mengine;
00118 delete reporter;
00119 }
00120
00121
00122
00123
00124 typename distributed_types::distributed_graph& graph() { return mgraph; }
00125
00126
00127
00128
00129 const typename distributed_types::distributed_graph& graph() const { return mgraph; }
00130
00131
00132
00133
00134
00135
00136 void set_scheduler_type(const std::string& scheduler_type) {
00137 ASSERT_EQ(mengine, NULL);
00138 bool success = meopts.set_scheduler_type(scheduler_type);
00139 ASSERT_TRUE(success);
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157 void set_scope_type(const std::string& scope_type) {
00158 ASSERT_EQ(mengine, NULL);
00159 bool success = meopts.set_scope_type(scope_type);
00160 ASSERT_TRUE(success);
00161 }
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175 void set_engine_type(const std::string& engine_type) {
00176 ASSERT_EQ(mengine, NULL);
00177 bool success = meopts.set_engine_type(engine_type);
00178 ASSERT_TRUE(success);
00179 }
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190 void set_metrics_type(const std::string& metrics_type) {
00191 bool metrics_set_success = meopts.set_metrics_type(metrics_type);
00192 ASSERT_TRUE(metrics_set_success);
00193
00194 delete reporter;
00195 if (meopts.get_metrics_type() == "file") {
00196 reporter = new file_reporter("graphlab_metrics.txt");
00197 } else if (meopts.get_metrics_type() == "html") {
00198 reporter = new html_reporter("graphlab_metrics.html");
00199 } else if (meopts.get_metrics_type() == "basic") {
00200 reporter = new basic_reporter;
00201 } else {
00202 reporter = new null_reporter;
00203 }
00204 }
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214 void set_ncpus(size_t ncpus) {
00215 ASSERT_EQ(mengine, NULL);
00216 meopts.set_ncpus(ncpus);
00217 }
00218
00219
00220
00221
00222
00223
00224
00225 typename distributed_types::iengine& engine() {
00226 ASSERT_NE(mengine, NULL);
00227 return *mengine;
00228 }
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238 bool build_engine() {
00239 ASSERT_EQ(mengine, NULL);
00240
00241 mengine = distributed_engine_factory::new_engine(dc, meopts, mgraph);
00242 if(mengine == NULL) return false;
00243 mengine->set_engine_options(meopts.get_engine_options());
00244 return true;
00245 }
00246
00247
00248
00249
00250
00251
00252 void set_engine_options(const engine_options& opts) {
00253 ASSERT_EQ(mengine, NULL);
00254 meopts = opts;
00255
00256 delete reporter;
00257 if (meopts.get_metrics_type() == "file") {
00258 reporter = new file_reporter("graphlab_metrics.txt");
00259 } else if (meopts.get_metrics_type() == "html") {
00260 reporter = new html_reporter("graphlab_metrics.html");
00261 } else if (meopts.get_metrics_type() == "basic") {
00262 reporter = new basic_reporter;
00263 } else {
00264 reporter = new null_reporter;
00265 }
00266 }
00267
00268
00269
00270
00271
00272 imetrics_reporter& get_reporter() {
00273 return *reporter;
00274 }
00275
00276
00277
00278
00279
00280 const engine_options& get_engine_options() const {
00281 return meopts;
00282 }
00283
00284
00285
00286
00287
00288
00289
00290 scheduler_options& sched_options() {
00291 return meopts.get_scheduler_options();
00292 }
00293
00294
00295
00296
00297
00298 const scheduler_options& sched_options() const{
00299 return meopts.get_scheduler_options();
00300 }
00301
00302
00303
00304
00305
00306
00307
00308
00309 bool parse_engine_options(int argc, char **argv) {
00310 ASSERT_EQ(mengine, NULL);
00311 command_line_options clopts;
00312 bool success = clopts.parse(argc, argv);
00313 ASSERT_TRUE(success);
00314 return set_engine_options(clopts);
00315 }
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325 double start() {
00326 if (mengine == NULL) {
00327 bool success = build_engine();
00328 ASSERT_TRUE(success);
00329 ASSERT_NE(mengine, NULL);
00330 }
00331
00332 mengine->set_scheduler_options( meopts.get_scheduler_options() );
00333 graphlab::timer ti;
00334 ti.start();
00335 mengine->start();
00336 return ti.current_time();
00337 }
00338
00339
00340
00341
00342
00343
00344
00345 void add_task(vertex_id_t vertex,
00346 typename distributed_types::update_function func,
00347 double priority) {
00348 typename distributed_types::update_task task(vertex, func);
00349 add_task(task, priority);
00350 }
00351
00352
00353
00354
00355
00356
00357
00358 void add_task(typename distributed_types::update_task task, double priority) {
00359 engine().add_task(task, priority);
00360 }
00361
00362
00363
00364
00365
00366
00367
00368 void add_tasks(const std::vector<vertex_id_t>& vertices,
00369 typename distributed_types::update_function func, double priority) {
00370 engine().add_tasks(vertices, func, priority);
00371 }
00372
00373
00374
00375
00376
00377
00378
00379 void add_task_to_all(typename distributed_types::update_function func,
00380 double priority) {
00381 engine().add_task_to_all(func, priority);
00382 }
00383
00384
00385
00386
00387
00388
00389 size_t last_update_count() {
00390 ASSERT_NE(mengine, NULL);
00391 return mengine->last_update_count();
00392 }
00393
00394
00395
00396
00397
00398 void fill_metrics() {
00399 coremetrics.set("ncpus", meopts.get_ncpus());
00400 coremetrics.set("engine", meopts.get_engine_type());
00401 coremetrics.set("scope", meopts.get_scope_type());
00402 coremetrics.set("scheduler", meopts.get_scheduler_type());
00403 coremetrics.set("affinities", meopts.get_cpu_affinities() ? "true" : "false");
00404 coremetrics.set("schedyield", meopts.get_sched_yield() ? "true" : "false");
00405 coremetrics.set("compile_flags", meopts.get_compile_flags());
00406 }
00407
00408
00409
00410
00411 void reset_metrics() {
00412 coremetrics.clear();
00413 if (mengine) engine().reset_metrics();
00414 }
00415
00416
00417
00418
00419 void report_metrics() {
00420 coremetrics.report(get_reporter());
00421 engine().report_metrics(get_reporter());
00422 }
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458 void set_sync(distributed_glshared_base& shared,
00459 typename distributed_types::iengine::sync_function_type sync,
00460 glshared_base::apply_function_type apply,
00461 const any& zero,
00462 size_t sync_interval ,
00463 typename distributed_types::iengine::merge_function_type merge ,
00464 vertex_id_t rangelow = 0,
00465 vertex_id_t rangehigh = -1) {
00466 engine().set_sync(shared, sync, apply, zero,
00467 sync_interval, merge, rangelow, rangehigh);
00468
00469 }
00470
00471
00472
00473
00474
00475
00476
00477 void sync_now(glshared_base& shared) ;
00478 private:
00479
00480
00481 distributed_control& dc;
00482
00483 typename distributed_types::distributed_graph mgraph;
00484 engine_options meopts;
00485 typename distributed_types::iengine *mengine;
00486
00487 metrics coremetrics;
00488 imetrics_reporter* reporter;
00489 };
00490
00491 }
00492 #include <graphlab/macros_undef.hpp>
00493 #endif
00494