00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef GRAPHLAB_DC_HPP
00019 #define GRAPHLAB_DC_HPP
00020 #include <iostream>
00021 #include <boost/iostreams/stream.hpp>
00022 #include <graphlab/parallel/pthread_tools.hpp>
00023 #include <graphlab/util/resizing_array_sink.hpp>
00024 #include <graphlab/util/blocking_queue.hpp>
00025 #include <graphlab/util/multi_blocking_queue.hpp>
00026 #include <graphlab/util/dense_bitset.hpp>
00027 #include <graphlab/serialization/serialization_includes.hpp>
00028 #include <graphlab/metrics/metrics.hpp>
00029
00030 #include <graphlab/rpc/dc_types.hpp>
00031 #include <graphlab/rpc/dc_internal_types.hpp>
00032
00033 #include <graphlab/rpc/dc_receive.hpp>
00034 #include <graphlab/rpc/dc_send.hpp>
00035 #include <graphlab/rpc/dc_comm_base.hpp>
00036 #include <graphlab/rpc/dc_dist_object_base.hpp>
00037
00038 #include <graphlab/rpc/is_rpc_call.hpp>
00039 #include <graphlab/rpc/portable_dispatch.hpp>
00040 #include <graphlab/rpc/portable_issue.hpp>
00041 #include <graphlab/rpc/function_call_issue.hpp>
00042 #include <graphlab/rpc/request_issue.hpp>
00043 #include <graphlab/rpc/reply_increment_counter.hpp>
00044 #include <graphlab/rpc/function_ret_type.hpp>
00045
00046 #include <boost/preprocessor.hpp>
00047 #include <graphlab/rpc/function_arg_types_def.hpp>
00048
00049 namespace graphlab {
00050
00051
00052
00053
00054
00055
/**
 * Bundled construction arguments for distributed_control.
 *
 * The distributed_control(dc_init_param) constructor forwards the fields to
 * init() in this order: machines, initstring, curmachineid,
 * numhandlerthreads, commtype.
 *
 * NOTE(review): this is a plain aggregate with no constructor, so
 * curmachineid, numhandlerthreads and commtype hold indeterminate values
 * unless the caller assigns them. The explicit-argument distributed_control
 * constructor defaults the last two to DEFAULT_NUMHANDLERTHREADS and
 * DEFAULT_COMMTYPE.
 */
struct dc_init_param{
  /// One entry per participating machine/process.
  /// NOTE(review): the entry format (e.g. "host:port") is interpreted by the
  /// comm layer, not here — confirm there.
  std::vector<std::string> machines;
  /// Option string handed to init().
  /// Presumably parsed by distributed_control::parse_options() into
  /// key/value pairs — verify in the implementation file.
  std::string initstring;
  /// Id of the current process (presumably its index into `machines`).
  procid_t curmachineid;
  /// Number of function-call handler threads to start.
  /// Presumably the size of distributed_control's fcallhandlers group.
  size_t numhandlerthreads;
  /// Which communication layer implementation to use.
  dc_comm_type commtype;
};
00092
// Default handler-thread count used as the default argument of the
// explicit-argument distributed_control constructor.
#define DEFAULT_NUMHANDLERTHREADS 8
// Default communication layer used as a default argument of
// distributed_control's constructor and init().
#define DEFAULT_COMMTYPE TCP_COMM

// Forward declarations. dc_services is only referenced through a pointer
// inside distributed_control; its full definition is included after the
// class.
class dc_services;
class distributed_control;
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152 class distributed_control{
00153 public:
00154
00155 struct function_call_block{
00156 function_call_block() {}
00157 function_call_block(procid_t source, const dc_impl::packet_hdr& hdr,
00158 char* data, size_t len):
00159 source(source), hdr(hdr),
00160 data(data), len(len) {}
00161 procid_t source;
00162 dc_impl::packet_hdr hdr;
00163 char* data;
00164 size_t len;
00165 };
00166 private:
00167
00168 void init(const std::vector<std::string> &machines,
00169 const std::string &initstring,
00170 procid_t curmachineid,
00171 size_t numhandlerthreads,
00172 dc_comm_type commtype = DEFAULT_COMMTYPE);
00173
00174
00175 dc_impl::dc_comm_base* comm;
00176
00177
00178 std::vector<dc_impl::dc_receive*> receivers;
00179 std::vector<dc_impl::dc_send*> senders;
00180
00181
00182 thread_group fcallhandlers;
00183
00184
00185 multi_blocking_queue<function_call_block> fcallqueue;
00186
00187
00188 dc_impl::dispatch_map_type portable_dispatch_call_map;
00189 dc_impl::dispatch_map_type portable_dispatch_request_map;
00190
00191
00192
00193 std::vector<void*> registered_objects;
00194 std::vector<dc_impl::dc_dist_object_base*> registered_rmi_instance;
00195
00196
00197 dc_services* distributed_services;
00198
00199
00200 procid_t localprocid;
00201
00202
00203
00204 procid_t localnumprocs;
00205
00206 std::vector<atomic<size_t> > global_calls_sent;
00207 std::vector<atomic<size_t> > global_calls_received;
00208
00209 bool single_sender;
00210
00211
00212 friend void dc_recv_callback(void* tag, procid_t src, const char* buf, size_t len);
00213
00214
00215
00216 template <typename T> friend class dc_dist_object;
00217
00218
00219
00220 distributed_control& operator=(const distributed_control& dc) { return *this; }
00221
00222
00223 std::map<std::string, std::string> parse_options(std::string initstring);
00224
00225 volatile inline size_t num_registered_objects() {
00226 return registered_objects.size();
00227 }
00228
00229
00230 void compute_master_ranks();
00231 procid_t masterid;
00232
00233 metrics rpc_metrics;
00234
00235 public:
00236
00237
00238
00239 distributed_control(dc_init_param initparam) {
00240 init(initparam.machines,
00241 initparam.initstring,
00242 initparam.curmachineid,
00243 initparam.numhandlerthreads,
00244 initparam.commtype);
00245 }
00246
00247 distributed_control(const std::vector<std::string> &machines,
00248 const std::string &initstring,
00249 procid_t curmachineid,
00250 size_t numhandlerthreads = DEFAULT_NUMHANDLERTHREADS,
00251 dc_comm_type commtype = DEFAULT_COMMTYPE) {
00252 init(machines, initstring, curmachineid, numhandlerthreads, commtype);
00253 }
00254
00255 ~distributed_control();
00256
00257
00258 inline procid_t procid() const {
00259 return localprocid;
00260 }
00261
00262
00263 inline procid_t numprocs() const {
00264 return localnumprocs;
00265 }
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282 static unsigned char set_sequentialization_key(unsigned char newkey);
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303 static unsigned char new_sequentialization_key();
00304
00305
00306 static unsigned char get_sequentialization_key();
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325 #define GENARGS(Z,N,_) BOOST_PP_CAT(const T, N) BOOST_PP_CAT(&i, N)
00326 #define GENI(Z,N,_) BOOST_PP_CAT(i, N)
00327 #define GENT(Z,N,_) BOOST_PP_CAT(T, N)
00328 #define GENARC(Z,N,_) arc << BOOST_PP_CAT(i, N);
00329
00330 #define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
00331 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
00332 void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
00333 ASSERT_LT(target, senders.size()); \
00334 BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
00335 <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
00336 ::exec(senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
00337 } \
00338
00339
00340
00341
00342 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (remote_call, dc_impl::remote_call_issue, STANDARD_CALL) )
00343 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (fast_remote_call,dc_impl::remote_call_issue, FAST_CALL) )
00344 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (control_call, dc_impl::remote_call_issue, (FAST_CALL | CONTROL_PACKET)) )
00345
00346
00347 #define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
00348 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
00349 BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
00350 ASSERT_LT(target, senders.size()); \
00351 return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
00352 <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
00353 ::exec(senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
00354 } \
00355
00356
00357
00358
00359
00360 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type remote_request, dc_impl::remote_request_issue, STANDARD_CALL) )
00361
00362 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type fast_remote_request, dc_impl::remote_request_issue, FAST_CALL) )
00363 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type control_request, dc_impl::remote_request_issue, (FAST_CALL | CONTROL_PACKET)) )
00364
00365
00366
00367 #undef RPC_INTERFACE_GENERATOR
00368 #undef REQUEST_INTERFACE_GENERATOR
00369 #undef GENARC
00370 #undef GENT
00371 #undef GENI
00372 #undef GENARGS
00373
00374
00375
00376
00377
00378
00379 void exec_function_call(procid_t source, const dc_impl::packet_hdr& hdr, std::istream &istrm);
00380
00381
00382
00383
00384
00385
00386
00387
00388 void deferred_function_call(procid_t source, const dc_impl::packet_hdr& hdr,
00389 char* buf, size_t len);
00390
00391
00392
00393
00394
00395 void fcallhandler_loop(size_t id);
00396
00397 inline void inc_calls_sent(procid_t procid) {
00398
00399 global_calls_sent[procid].inc();
00400 }
00401
00402 inline void inc_calls_received(procid_t procid) {
00403
00404 if (!full_barrier_in_effect) {
00405 global_calls_received[procid].inc();
00406 }
00407 else {
00408
00409
00410
00411 if (global_calls_received[procid].inc() == calls_to_receive[procid]) {
00412
00413 if (procs_complete.set_bit(procid) == false) {
00414
00415
00416
00417 full_barrier_lock.lock();
00418 if (num_proc_recvs_incomplete.dec() == 0) {
00419 full_barrier_cond.signal();
00420 }
00421 full_barrier_lock.unlock();
00422 }
00423 }
00424 }
00425 }
00426
00427
00428 inline size_t calls_sent() const {
00429 size_t ctr = 0;
00430 for (size_t i = 0;i < numprocs(); ++i) {
00431 ctr += global_calls_sent[i].value;
00432 }
00433 return ctr;
00434 }
00435
00436 inline size_t calls_received() const {
00437 size_t ctr = 0;
00438 for (size_t i = 0;i < numprocs(); ++i) {
00439 ctr += global_calls_received[i].value;
00440 }
00441 return ctr;
00442 }
00443
00444 inline size_t bytes_sent() const {
00445 if (single_sender) {
00446 return senders[0]->bytes_sent();
00447 }
00448 else {
00449 size_t ret = 0;
00450 for (size_t i = 0;i < senders.size(); ++i) ret += senders[i]->bytes_sent();
00451 return ret;
00452 }
00453 }
00454
00455
00456 inline size_t network_bytes_sent() const {
00457 return comm->network_bytes_sent();
00458 }
00459
00460 inline size_t bytes_received() const {
00461 size_t ret = 0;
00462 for (size_t i = 0;i < receivers.size(); ++i) ret += receivers[i]->bytes_received();
00463 return ret;
00464 }
00465
00466
00467
00468
00469
00470 inline bool is_master_rank() const {
00471 return masterid == procid();
00472 }
00473
00474
00475
00476
00477
00478
00479 inline procid_t master_rank() const {
00480 return masterid;
00481 }
00482
00483
00484
00485
00486 template <typename F, F f>
00487 void register_rpc(std::string c) {
00488 portable_dispatch_request_map[c] = (dc_impl::dispatch_type)
00489 dc_impl::portable_detail::find_dispatcher<F,
00490 __GLRPC_FRESULT,
00491 boost::function_traits<
00492 typename boost::remove_pointer<F>::type
00493 >::arity ,
00494 f,
00495 typename dc_impl::is_rpc_call<F>::type
00496 >::dispatch_request_fn();
00497
00498 portable_dispatch_call_map[c] = (dc_impl::dispatch_type)
00499 dc_impl::portable_detail::find_dispatcher<F,
00500 __GLRPC_FRESULT,
00501 boost::function_traits<
00502 typename boost::remove_pointer<F>::type
00503 >::arity ,
00504 f,
00505 typename dc_impl::is_rpc_call<F>::type
00506 >::dispatch_call_fn();
00507 }
00508
00509
00510 inline size_t register_object(void* v, dc_impl::dc_dist_object_base *rmiinstance) {
00511 ASSERT_NE(v, (void*)NULL);
00512 registered_objects.push_back(v);
00513 registered_rmi_instance.push_back(rmiinstance);
00514 return registered_objects.size() - 1;
00515 }
00516
00517
00518 inline void* get_registered_object(size_t id) {
00519 while(id >= num_registered_objects()) sched_yield();
00520 ASSERT_NE(registered_objects[id], (void*)NULL);
00521 return registered_objects[id];
00522 }
00523
00524 inline dc_impl::dc_dist_object_base* get_rmi_instance(size_t id) {
00525 while(id >= num_registered_objects()) sched_yield();
00526 ASSERT_NE(registered_rmi_instance[id], (void*)NULL);
00527 return registered_rmi_instance[id];
00528 }
00529 inline void clear_registered_object(size_t id) {
00530 registered_objects[id] = (void*)NULL;
00531 registered_rmi_instance[id] = NULL;
00532 }
00533
00534
00535
00536
00537
00538
00539 __attribute__((__deprecated__)) dc_services& services();
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550 void comm_barrier(procid_t targetmachine);
00551
00552
00553
00554
00555
00556
00557
00558 void comm_barrier();
00559
00560
00561
00562
00563 long long int total_bytes_sent;
00564 long long int get_total_bytes_sent() {
00565 return total_bytes_sent;
00566 }
00567
00568
00569
00570
00571
00572
00573 template <typename U>
00574 inline void send_to(procid_t target, U& t, bool control = false);
00575
00576
00577
00578
00579
00580 template <typename U>
00581 inline void recv_from(procid_t source, U& t, bool control = false);
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604 template <typename U>
00605 inline void broadcast(U& data, bool originator, bool control = false);
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616 template <typename U>
00617 inline void gather(std::vector<U>& data, procid_t sendto, bool control = false);
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627 template <typename U>
00628 inline void all_gather(std::vector<U>& data, bool control = false);
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642 template <typename U>
00643 inline void gather_partition(const std::vector<U>& local_contribution,
00644 std::vector< std::vector<U> >& ret_partition,
00645 bool control = false);
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655 void barrier();
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678 void full_barrier();
00679 private:
00680 mutex full_barrier_lock;
00681 conditional full_barrier_cond;
00682 std::vector<size_t> calls_to_receive;
00683
00684
00685
00686 bool full_barrier_in_effect;
00687
00688
00689
00690 atomic<size_t> num_proc_recvs_incomplete;
00691
00692
00693 dense_bitset procs_complete;
00694
00695
00696
00697
00698
00699 private:
00700 struct collected_statistics {
00701 size_t callssent;
00702 size_t bytessent;
00703 size_t network_bytessent;
00704 collected_statistics(): callssent(0), bytessent(0), network_bytessent(0) { }
00705 void save(oarchive &oarc) const {
00706 oarc << callssent << bytessent << network_bytessent;
00707 }
00708 void load(iarchive &iarc) {
00709 iarc >> callssent >> bytessent >> network_bytessent;
00710 }
00711 };
00712 public:
00713
00714
00715
00716 std::map<std::string, size_t> gather_statistics();
00717
00718
00719
00720
00721 void fill_metrics();
00722
00723
00724
00725
00726 inline metrics get_metrics() {
00727 return rpc_metrics;
00728 }
00729
00730 inline void reset_metrics() {
00731 logstream(LOG_WARNING) << "Metrics cannot be reset on distributed control" << std::endl;
00732 }
00733
00734
00735
00736
00737 inline void report_metrics(imetrics_reporter &reporter) {
00738 rpc_metrics.report(reporter);
00739 }
00740
00741 };
00742
00743
00744
00745
00746 }
00747
// Registers free function `f` with the distributed_control object `dc`
// under its stringized name; e.g. REGISTER_RPC(dc, foo) registers foo as
// "foo".
// __typeof__ is used instead of the GNU-only `typeof` keyword, which GCC
// and Clang disable under strict -std=c++NN modes. `dc` is parenthesized
// so expression arguments such as `*dcptr` expand correctly.
#define REGISTER_RPC(dc, f) (dc).register_rpc<__typeof__(f)*, f>(std::string(BOOST_PP_STRINGIZE(f)))
00749
00750 #include <graphlab/rpc/function_arg_types_undef.hpp>
00751 #include <graphlab/rpc/function_call_dispatch.hpp>
00752 #include <graphlab/rpc/request_dispatch.hpp>
00753 #include <graphlab/rpc/dc_dist_object.hpp>
00754 #include <graphlab/rpc/dc_services.hpp>
00755
00756 namespace graphlab {
00757
00758 template <typename U>
00759 inline void distributed_control::send_to(procid_t target, U& t, bool control) {
00760 distributed_services->send_to(target, t, control);
00761 }
00762
00763 template <typename U>
00764 inline void distributed_control::recv_from(procid_t source, U& t, bool control) {
00765 distributed_services->recv_from(source, t, control);
00766 }
00767
00768 template <typename U>
00769 inline void distributed_control::broadcast(U& data, bool originator, bool control) {
00770 distributed_services->broadcast(data, originator, control);
00771 }
00772
00773 template <typename U>
00774 inline void distributed_control::gather(std::vector<U>& data, procid_t sendto, bool control) {
00775 distributed_services->gather(data, sendto, control);
00776 }
00777
00778 template <typename U>
00779 inline void distributed_control::all_gather(std::vector<U>& data, bool control) {
00780 distributed_services->all_gather(data, control);
00781 }
00782
00783 template <typename U>
00784 inline void distributed_control::gather_partition(const std::vector<U>& local_contribution,
00785 std::vector< std::vector<U> >& ret_partition,
00786 bool control) {
00787 distributed_services->gather_partition(local_contribution, ret_partition, control);
00788 }
00789
00790
00791
00792 }
00793 #endif
00794