00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef FROM_DISTRIBUTED_GRAPH_INCLUDE
00019 #warning "distributed_graph_synchronization.hpp should not be included directly."
00020 #warning "You should include only distributed_graph.hpp"
00021 #warning "I will fix this for you now, but don't do it again!"
00022
00023 #include <graphlab/distributed2/graph/distributed_graph.hpp>
00024 #include <boost/range/iterator_range.hpp>
00025 #else
00026
00027
00028
00029
00030
00031
00032
00033 template <typename VertexData, typename EdgeData>
00034 void distributed_graph<VertexData, EdgeData>::synchronize_vertex(vertex_id_t vid, bool async) {
00035 vertex_id_t localvid = global2localvid[vid];
00036 if (is_ghost(vid)) {
00037 vertex_conditional_store out;
00038 out.hasdata = localstore.vertex_modified(localvid);
00039 if (out.hasdata) {
00040 localstore.set_vertex_modified(localvid, false);
00041 out.data.first = localstore.vertex_data(localvid);
00042 }
00043 if (async == false) {
00044 vertex_conditional_store v;
00045 v = rmi.remote_request(localvid2owner[localvid],
00046 &distributed_graph<VertexData, EdgeData>::get_vertex_if_version_less_than,
00047 vid,
00048 localstore.vertex_version(localvid),
00049 out);
00050 if (v.hasdata) {
00051 update_vertex_data_and_version(vid, v);
00052 }
00053 }
00054 else {
00055 pending_async_updates.flag.inc();
00056 rmi.remote_call(localvid2owner[localvid],
00057 &distributed_graph<VertexData, EdgeData>::async_get_vertex_if_version_less_than,
00058 rmi.procid(),
00059 vid,
00060 localstore.vertex_version(localvid),
00061 out);
00062 }
00063 }
00064 }
00065
00066
00067
00068
00069
00070 template <typename VertexData, typename EdgeData>
00071 void distributed_graph<VertexData, EdgeData>::synchronize_edge(edge_id_t eid, bool async) {
00072 vertex_id_t localtargetvid = localstore.target(eid);
00073 vertex_id_t targetvid = local2globalvid[localtargetvid];
00074 vertex_id_t localsourcevid = localstore.source(eid);
00075 vertex_id_t sourcevid = local2globalvid[localsourcevid];
00076
00077 if (is_ghost(targetvid)) {
00078
00079 edge_conditional_store out;
00080 out.hasdata = localstore.edge_modified(eid);
00081 if (out.hasdata) {
00082 localstore.set_edge_modified(eid, false);
00083 out.data.first = localstore.edge_data(eid);
00084 }
00085 if (async == false) {
00086 edge_conditional_store e = rmi.remote_request(localvid2owner[localtargetvid],
00087 &distributed_graph<VertexData, EdgeData>::get_edge_if_version_less_than2,
00088 sourcevid,
00089 targetvid,
00090 localstore.edge_version(eid),
00091 out);
00092 if (e.hasdata) {
00093 update_edge_data_and_version2(sourcevid, targetvid, e);
00094 }
00095 }
00096 else {
00097 pending_async_updates.flag.inc();
00098 rmi.remote_call(localvid2owner[localtargetvid],
00099 &distributed_graph<VertexData, EdgeData>::async_get_edge_if_version_less_than2,
00100 rmi.procid(),
00101 sourcevid,
00102 targetvid,
00103 localstore.edge_version(eid),
00104 out);
00105 }
00106 }
00107 }
00108
00109 template <typename VertexData, typename EdgeData>
00110 void distributed_graph<VertexData, EdgeData>::synchronize_scope(vertex_id_t vid, bool async) {
00111
00112 std::map<procid_t, std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> > requests;
00113 synchronize_scope_construct_req(vid, requests);
00114
00115 if (async) {
00116
00117 typename std::map<procid_t,
00118 std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> >::iterator iter;
00119 iter = requests.begin();
00120 size_t replytarget = reinterpret_cast<size_t>(&pending_async_updates);
00121 pending_async_updates.flag.inc(requests.size());
00122
00123 while(iter != requests.end()) {
00124 rmi.remote_call(iter->first,
00125 &distributed_graph<VertexData, EdgeData>::async_get_alot2,
00126 rmi.procid(),
00127 iter->second.first,
00128 replytarget,
00129 0);
00130 ++iter;
00131 }
00132 }
00133 else {
00134
00135 dc_impl::reply_ret_type reply(true, 0);
00136 typename std::map<procid_t,
00137 std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> >::iterator iter;
00138 iter = requests.begin();
00139 size_t replytarget = reinterpret_cast<size_t>(&reply);
00140 reply.flag.inc(requests.size());
00141
00142 while(iter != requests.end()) {
00143 rmi.remote_call(iter->first,
00144 &distributed_graph<VertexData, EdgeData>::async_get_alot2,
00145 rmi.procid(),
00146 iter->second.first,
00147 replytarget,
00148 0);
00149 ++iter;
00150 }
00151
00152 reply.wait();
00153 }
00154
00155
00156
00157
00158
00159 }
00160
00161
00162
00163
00164
00165
00166
00167 template <typename VertexData, typename EdgeData>
00168 void distributed_graph<VertexData, EdgeData>::synchronize_scope_construct_req(vertex_id_t vid,
00169 std::map<procid_t, std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> > &requests) {
00170 ASSERT_FALSE(is_ghost(vid));
00171 if (boundaryscopesset.find(vid) == boundaryscopesset.end()) return;
00172
00173
00174
00175
00176
00177
00178 vertex_id_t localvid = global2localvid[vid];
00179 requests.clear();
00180
00181
00182
00183
00184
00185 foreach(edge_id_t localineid, localstore.in_edge_ids(localvid)) {
00186 vertex_id_t localsourcevid = localstore.source(localineid);
00187 if (localvid_is_ghost(localsourcevid)) {
00188
00189 procid_t sourceowner = localvid2owner[localsourcevid];
00190 block_synchronize_request2 &req = requests[sourceowner].first;
00191 req.vid.push_back(local2globalvid[localsourcevid]);
00192 req.vidversion.push_back(localstore.vertex_version(localsourcevid));
00193 vertex_conditional_store vs;
00194 vs.hasdata = localstore.vertex_modified(localsourcevid);
00195 if (vs.hasdata) {
00196 localstore.set_vertex_modified(localsourcevid, false);
00197 vs.data.first = localstore.vertex_data(localsourcevid);
00198 vs.data.second = localstore.vertex_version(localsourcevid);
00199 }
00200 requests[sourceowner].second=req.vid.end();
00201 req.vstore.push_back(vs);
00202 }
00203 }
00204
00205 foreach(edge_id_t localouteid, localstore.out_edge_ids(localvid)) {
00206 vertex_id_t localtargetvid = localstore.target(localouteid);
00207 procid_t targetowner = localvid2owner[localstore.target(localouteid)];
00208
00209 if (localvid_is_ghost(localtargetvid)) {
00210 block_synchronize_request2 &req = requests[targetowner].first;
00211
00212
00213 if (std::binary_search(req.vid.begin(), requests[targetowner].second, local2globalvid[localtargetvid]) == false) {
00214 req.vid.push_back(local2globalvid[localtargetvid]);
00215 req.vidversion.push_back(localstore.vertex_version(localtargetvid));
00216 vertex_conditional_store vs;
00217 vs.hasdata = localstore.vertex_modified(localtargetvid);
00218 if (vs.hasdata) {
00219 localstore.set_vertex_modified(localtargetvid, false);
00220 vs.data.first = localstore.vertex_data(localtargetvid);
00221 vs.data.second = localstore.vertex_version(localtargetvid);
00222 }
00223 req.vstore.push_back(vs);
00224 }
00225
00226
00227 req.srcdest.push_back(std::pair<vertex_id_t, vertex_id_t>(vid, local2globalvid[localtargetvid]));
00228 req.edgeversion.push_back(localstore.edge_version(localouteid));
00229 edge_conditional_store es;
00230 es.hasdata = localstore.edge_modified(localouteid);
00231 if (es.hasdata) {
00232 localstore.set_edge_modified(localouteid, false);
00233 es.data.first = localstore.edge_data(localouteid);
00234 es.data.second = localstore.edge_version(localouteid);
00235 }
00236 req.estore.push_back(es);
00237 }
00238 }
00239 }
00240
00241
00242
00243 template <typename VertexData, typename EdgeData>
00244 void distributed_graph<VertexData, EdgeData>::async_synchronize_scope_callback(vertex_id_t vid,
00245 boost::function<void (void)> callback){
00246 vertex_id_t localvid = global2localvid[vid];
00247 ASSERT_TRUE(scope_callbacks[localvid].callback == NULL);
00248 ASSERT_TRUE(boundary_scopes_set().find(vid) != boundary_scopes_set().end());
00249
00250 scope_callbacks[localvid].callback = callback;
00251
00252 std::map<procid_t, std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> > requests;
00253 synchronize_scope_construct_req(vid, requests);
00254
00255
00256 typename std::map<procid_t,
00257 std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> >::iterator iter;
00258 iter = requests.begin();
00259
00260 ASSERT_EQ(scope_callbacks[localvid].counter.value, 0);
00261 scope_callbacks[localvid].counter.inc(requests.size());
00262 while(iter != requests.end()) {
00263
00264
00265 rmi.remote_call(iter->first,
00266 &distributed_graph<VertexData, EdgeData>::async_get_alot2,
00267 rmi.procid(),
00268 iter->second.first,
00269 0,
00270 localvid);
00271 ++iter;
00272 }
00273
00274
00275 }
00276
00277
00278
00279
00280 template <typename VertexData, typename EdgeData>
00281 void distributed_graph<VertexData, EdgeData>::wait_for_all_async_syncs() {
00282 pending_async_updates.wait();
00283 }
00284
00285
00286
00287 template <typename VertexData, typename EdgeData>
00288 typename distributed_graph<VertexData, EdgeData>::vertex_conditional_store
00289 distributed_graph<VertexData, EdgeData>::get_vertex_if_version_less_than(vertex_id_t vid,
00290 uint64_t vertexversion,
00291 vertex_conditional_store &vdata) {
00292 vertex_conditional_store ret;
00293 size_t localvid = global2localvid[vid];
00294 uint64_t local_vertex_version = localstore.vertex_version(localvid);
00295
00296 ASSERT_EQ(localvid2owner[localvid], rmi.procid());
00297 #ifdef DGRAPH_DEBUG
00298 logstream(LOG_DEBUG) << "get vertex: " << vid << ":" << vertexversion << " vs " << local_vertex_version << ". " << vdata.hasdata << std::endl;
00299 #endif
00300 ret.hasdata = false;
00301
00302 if (local_vertex_version > vertexversion) {
00303
00304 ret.hasdata = true;
00305 ret.data.first = localstore.vertex_data(localvid);
00306 ret.data.second = local_vertex_version;
00307 }
00308 else if (local_vertex_version == vertexversion) {
00309
00310 if (vdata.hasdata) {
00311 localstore.increment_and_update_vertex(localvid, vdata.data.first);
00312 }
00313 }
00314 else {
00315 logstream(LOG_FATAL) << "Remote node attempted to update vertex "
00316 << vid << " with a newer version than the owner" << std::endl;
00317 }
00318 return ret;
00319 }
00320
00321
00322 template <typename VertexData, typename EdgeData>
00323 typename distributed_graph<VertexData, EdgeData>::edge_conditional_store
00324 distributed_graph<VertexData, EdgeData>::get_edge_if_version_less_than2(vertex_id_t source,
00325 vertex_id_t target,
00326 uint64_t edgeversion,
00327 edge_conditional_store &edata) {
00328 edge_conditional_store ret;
00329 size_t localsource = global2localvid[source];
00330 size_t localtarget = global2localvid[target];
00331
00332
00333 std::pair<bool, edge_id_t> findret = localstore.find(localsource, localtarget);
00334 assert(findret.first);
00335 edge_id_t localeid = findret.second;
00336
00337 uint64_t local_edge_version = localstore.edge_version(localeid);
00338 ret.hasdata = false;
00339 #ifdef DGRAPH_DEBUG
00340
00341 logstream(LOG_DEBUG) << "get edge2: " << "(" << source << "->" << target << ")" << ":" << edgeversion << " vs " << local_edge_version << ". " << edata.hasdata << std::endl;
00342 #endif
00343 if (local_edge_version > edgeversion) {
00344 ret.hasdata = true;
00345 ret.data.first = localstore.edge_data(localeid);
00346 ret.data.second = local_edge_version;
00347 }
00348 else if (local_edge_version == edgeversion) {
00349 if (edata.hasdata) {
00350 localstore.increment_and_update_edge(localeid, edata.data.first);
00351 }
00352 }
00353 else {
00354 logstream(LOG_FATAL) << "Remote node attempted to update edge ("
00355 << source << ", " << target << ") with a newer version than the owner" << std::endl;
00356 }
00357 return ret;
00358 }
00359
00360
00361 template <typename VertexData, typename EdgeData>
00362 void distributed_graph<VertexData, EdgeData>::async_get_vertex_if_version_less_than(
00363 procid_t srcproc,
00364 vertex_id_t vid,
00365 uint64_t vertexversion,
00366 vertex_conditional_store &vdata) {
00367 rmi.remote_call(srcproc,
00368 &distributed_graph<VertexData, EdgeData>::reply_vertex_data_and_version,
00369 vid,
00370 get_vertex_if_version_less_than(vid, vertexversion, vdata));
00371 }
00372
00373 template <typename VertexData, typename EdgeData>
00374 void distributed_graph<VertexData, EdgeData>::async_get_edge_if_version_less_than2(
00375 procid_t srcproc,
00376 vertex_id_t source,
00377 vertex_id_t target,
00378 uint64_t edgeversion,
00379 edge_conditional_store &edata) {
00380 rmi.remote_call(srcproc,
00381 &distributed_graph<VertexData, EdgeData>::reply_edge_data_and_version2,
00382 source,
00383 target,
00384 get_edge_if_version_less_than2(source, target, edgeversion, edata));
00385 }
00386
00387
00388 template <typename VertexData, typename EdgeData>
00389 typename distributed_graph<VertexData, EdgeData>::block_synchronize_request2&
00390 distributed_graph<VertexData, EdgeData>::get_alot2(
00391 distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request) {
00392 std::vector<vertex_conditional_store> vresponse(request.vid.size());
00393 std::vector<edge_conditional_store> eresponse(request.srcdest.size());
00394 for (size_t i = 0;i < request.vid.size(); ++i) {
00395 request.vstore[i] = get_vertex_if_version_less_than(request.vid[i],
00396 request.vidversion[i],
00397 request.vstore[i]);
00398 }
00399 for (size_t i = 0;i < request.srcdest.size(); ++i) {
00400 request.estore[i] = get_edge_if_version_less_than2(request.srcdest[i].first,
00401 request.srcdest[i].second,
00402 request.edgeversion[i],
00403 request.estore[i]);
00404 }
00405 request.vidversion.clear();
00406 request.edgeversion.clear();
00407 return request;
00408 }
00409
00410 template <typename VertexData, typename EdgeData>
00411 void distributed_graph<VertexData, EdgeData>::async_get_alot2(
00412 procid_t srcproc,
00413 distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request,
00414 size_t replytarget,
00415 size_t tag) {
00416 get_alot2(request);
00417 rmi.remote_call(srcproc,
00418 &distributed_graph<VertexData, EdgeData>::reply_alot2,
00419 request,
00420 replytarget,
00421 tag);
00422 }
00423
00424 template <typename VertexData, typename EdgeData>
00425 void distributed_graph<VertexData, EdgeData>::reply_vertex_data_and_version(
00426 vertex_id_t vid,
00427 distributed_graph<VertexData, EdgeData>::vertex_conditional_store &vstore) {
00428 if (vstore.hasdata) update_vertex_data_and_version(vid, vstore);
00429
00430 reply_increment_counter(rmi.dc(), 0,
00431 reinterpret_cast<size_t>(&pending_async_updates), dc_impl::blob());
00432 }
00433
00434 template <typename VertexData, typename EdgeData>
00435 void distributed_graph<VertexData, EdgeData>::reply_edge_data_and_version2(
00436 vertex_id_t source,
00437 vertex_id_t target,
00438 distributed_graph<VertexData, EdgeData>::edge_conditional_store &estore) {
00439 if (estore.hasdata) update_edge_data_and_version2(source, target, estore);
00440 reply_increment_counter(rmi.dc(), 0,
00441 reinterpret_cast<size_t>(&pending_async_updates), dc_impl::blob());
00442
00443 }
00444
00445
00446
00447 template <typename VertexData, typename EdgeData>
00448 void distributed_graph<VertexData, EdgeData>::update_vertex_data_and_version(
00449 vertex_id_t vid,
00450 distributed_graph<VertexData, EdgeData>::vertex_conditional_store &vstore) {
00451 vertex_id_t localvid = global2localvid[vid];
00452
00453 ASSERT_NE(localvid2owner[localvid], rmi.procid());
00454 #ifdef DGRAPH_DEBUG
00455 logstream(LOG_DEBUG) << "Receiving vertex " << vid << "(" << localvid << ") " << ". "
00456 << vstore.data.second << " vs " << localstore.vertex_version(localvid) << std::endl;
00457 #endif
00458 localstore.conditional_update_vertex(localvid, vstore.data.first, vstore.data.second);
00459 }
00460
00461 template <typename VertexData, typename EdgeData>
00462 void distributed_graph<VertexData, EdgeData>::update_edge_data_and_version2(
00463 vertex_id_t source,
00464 vertex_id_t target,
00465 distributed_graph<VertexData, EdgeData>::edge_conditional_store &estore) {
00466 if (estore.hasdata) {
00467 vertex_id_t localsourcevid = global2localvid[source];
00468 vertex_id_t localtargetvid = global2localvid[target];
00469 ASSERT_NE(localvid2owner[localtargetvid], rmi.procid());
00470 std::pair<bool, edge_id_t> findret = localstore.find(localsourcevid, localtargetvid);
00471
00472 assert(findret.first);
00473 #ifdef DGRAPH_DEBUG
00474 logstream(LOG_DEBUG) << "Receiving edge (" << source << ","<<target << ") " << ". "
00475 << estore.data.second << " vs " << localstore.edge_version(findret.second) << std::endl;
00476 #endif
00477
00478 localstore.conditional_update_edge(findret.second, estore.data.first, estore.data.second);
00479 }
00480 }
00481
00482
00483 template <typename VertexData, typename EdgeData>
00484 void distributed_graph<VertexData, EdgeData>::update_alot2(
00485 distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request) {
00486 for (size_t i = 0;i < request.vid.size(); ++i) {
00487 update_vertex_data_and_version(request.vid[i], request.vstore[i]);
00488 }
00489
00490 for (size_t i = 0;i < request.srcdest.size(); ++i) {
00491 update_edge_data_and_version2(request.srcdest[i].first, request.srcdest[i].second, request.estore[i]);
00492 }
00493 }
00494
00495
00496 template <typename VertexData, typename EdgeData>
00497 void distributed_graph<VertexData, EdgeData>::reply_alot2(
00498 distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request,
00499 size_t replytarget,
00500 size_t tag) {
00501 update_alot2(request);
00502
00503
00504 if (replytarget != 0) {
00505 reply_increment_counter(rmi.dc(), 0,
00506 replytarget, dc_impl::blob());
00507 }
00508 else {
00509
00510 vertex_id_t localvid = tag;
00511
00512 ASSERT_TRUE(scope_callbacks[localvid].callback != NULL);
00513 if (scope_callbacks[localvid].counter.dec() == 0) {
00514
00515 boost::function<void (void)> tmp = scope_callbacks[localvid].callback;
00516 scope_callbacks[localvid].callback = NULL;
00517 tmp();
00518 }
00519 }
00520 }
00521
00522
00523 template <typename VertexData, typename EdgeData>
00524 void distributed_graph<VertexData, EdgeData>::synchronize_all_vertices(bool async) {
00525 foreach(vertex_id_t vid, ghostvertices) {
00526 synchronize_vertex(vid, async);
00527 }
00528 }
00529
00530 template <typename VertexData, typename EdgeData>
00531 void distributed_graph<VertexData, EdgeData>::synchronize_all_edges(bool async) {
00532 foreach(vertex_id_t vid, ghostvertices) {
00533 foreach(edge_id_t eid, localstore.in_edge_ids(global2localvid[vid])) {
00534 synchronize_edge(eid, async);
00535 }
00536 }
00537 }
00538
00539 template <typename VertexData, typename EdgeData>
00540 void distributed_graph<VertexData, EdgeData>::synchronize_all_scopes(bool async) {
00541 foreach(vertex_id_t vid, boundaryscopes) {
00542 synchronize_scope(vid, async);
00543 }
00544 }
00545
00546
00547
00548 template <typename VertexData, typename EdgeData>
00549 void distributed_graph<VertexData, EdgeData>::update_vertex_data_and_version_and_reply(
00550 vertex_id_t vid,
00551 distributed_graph<VertexData, EdgeData>::vertex_conditional_store &vstore,
00552 procid_t srcproc,
00553 size_t reply) {
00554
00555 update_vertex_data_and_version(vid, vstore);
00556
00557 if (srcproc != procid_t(-1)) {
00558 rmi.dc().remote_call(srcproc, reply_increment_counter, reply, dc_impl::blob());
00559 }
00560 }
00561
00562 template <typename VertexData, typename EdgeData>
00563 void distributed_graph<VertexData, EdgeData>::update_edge_data_and_version_and_reply2(
00564 vertex_id_t source,
00565 vertex_id_t target,
00566 distributed_graph<VertexData, EdgeData>::edge_conditional_store &estore,
00567 procid_t srcproc, size_t reply) {
00568 update_edge_data_and_version2(source, target, estore);
00569 if (srcproc != procid_t(-1)) {
00570 rmi.dc().remote_call(srcproc, reply_increment_counter, reply, dc_impl::blob());
00571 }
00572 }
00573
00574 template <typename VertexData, typename EdgeData>
00575 void distributed_graph<VertexData, EdgeData>::push_owned_vertex_to_replicas(vertex_id_t vid, bool async, bool untracked) {
00576 vertex_id_t localvid = global2localvid[vid];
00577
00578 const std::vector<procid_t>& replicas = localvid_to_replicas(localvid);
00579
00580 if (replicas.size() <= 1) return;
00581
00582 dc_impl::reply_ret_type ret(true, replicas.size() - 1);
00583
00584
00585 size_t retptr;
00586 if (async == false) {
00587 retptr = reinterpret_cast<size_t>(&ret);
00588 }
00589 else {
00590 retptr = reinterpret_cast<size_t>(&pending_push_updates);
00591 pending_push_updates.flag.inc(replicas.size() - 1);
00592 }
00593
00594
00595
00596
00597
00598 procid_t srcprocid;
00599 if (untracked == false) srcprocid = rmi.procid();
00600 else srcprocid = procid_t(-1);
00601
00602 vertex_conditional_store vstore;
00603 vstore.hasdata = true;
00604 vstore.data.first = localstore.vertex_data(localvid);
00605 vstore.data.second = localstore.vertex_version(localvid);
00606
00607 foreach(procid_t proc, replicas) {
00608 if (proc != rmi.procid()) {
00609 #ifdef DGRAPH_DEBUG
00610 logger(LOG_DEBUG, "Pushing vertex %d to proc %d", vid, proc);
00611 #endif
00612 rmi.remote_call(proc,
00613 &distributed_graph<VertexData, EdgeData>::update_vertex_data_and_version_and_reply,
00614 vid,
00615 vstore,
00616 srcprocid,
00617 retptr);
00618 }
00619 }
00620 if (async == false && untracked == false) ret.wait();
00621 }
00622
00623
00624 template <typename VertexData, typename EdgeData>
00625 void distributed_graph<VertexData, EdgeData>::push_owned_edge_to_replicas(edge_id_t eid, bool async, bool untracked) {
00626
00627 vertex_id_t globalsource = source(eid);
00628 vertex_id_t globaltarget = target(eid);
00629
00630 if (!is_owned(globaltarget)) return;
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640 procid_t sendto;
00641
00642 if (is_owned(globalsource)) {
00643 return;
00644 }
00645 else {
00646
00647 sendto = localvid2owner[localstore.source(eid)];
00648 }
00649
00650 dc_impl::reply_ret_type ret(true, 1);
00651
00652 size_t retptr;
00653 if (async == false) {
00654 retptr = reinterpret_cast<size_t>(&ret);
00655 }
00656 else {
00657 retptr = reinterpret_cast<size_t>(&pending_push_updates);
00658 }
00659
00660
00661
00662
00663
00664 procid_t srcprocid;
00665 if (untracked == false) srcprocid = rmi.procid();
00666 else srcprocid = procid_t(-1);
00667
00668 edge_conditional_store estore;
00669 estore.hasdata = true;
00670 estore.data.first = localstore.edge_data(eid);
00671 estore.data.second = localstore.edge_version(eid);
00672
00673 #ifdef DGRAPH_DEBUG
00674 logstream(LOG_DEBUG) << "Pushing edge (" << globalsource << ", " << globaltarget << ") to proc " << sendto << std::endl;
00675 #endif
00676 rmi.remote_call(sendto,
00677 &distributed_graph<VertexData, EdgeData>::update_edge_data_and_version_and_reply2,
00678 globalsource,
00679 globaltarget,
00680 estore,
00681 srcprocid,
00682 retptr);
00683
00684 if (async == false && untracked == false) ret.wait();
00685 }
00686
00687
00688 template <typename VertexData, typename EdgeData>
00689 void distributed_graph<VertexData, EdgeData>::push_all_owned_vertices_to_replicas() {
00690
00691 std::vector<block_synchronize_request2> blockpushes(rmi.numprocs());
00692
00693 foreach(vertex_id_t vid, boundary_scopes()) {
00694
00695 vertex_id_t localvid = global2localvid[vid];
00696
00697 const std::vector<procid_t>& replicas = localvid_to_replicas(localvid);
00698
00699 if (replicas.size() <= 1) continue;
00700
00701
00702
00703 vertex_conditional_store vstore;
00704 vstore.hasdata = true;
00705 vstore.data.first = localstore.vertex_data(localvid);
00706 vstore.data.second = localstore.vertex_version(localvid);
00707
00708 foreach(procid_t proc, replicas) {
00709 if (proc != rmi.procid()) {
00710 blockpushes[proc].vid.push_back(vid);
00711 blockpushes[proc].vidversion.push_back(localstore.vertex_version(localvid));
00712 blockpushes[proc].vstore.push_back(vstore);
00713 if (blockpushes[proc].vid.size() >= 1024*1024/sizeof(VertexData)) {
00714 rmi.remote_call(proc,
00715 &distributed_graph<VertexData, EdgeData>::update_alot2,
00716 blockpushes[proc]);
00717 blockpushes[proc].clear();
00718 }
00719 }
00720 }
00721 }
00722
00723 for(size_t proc = 0; proc < rmi.numprocs(); ++proc) {
00724 if (blockpushes[proc].vid.size() > 0) {
00725 assert(proc != rmi.procid());
00726 rmi.remote_call(proc,
00727 &distributed_graph<VertexData, EdgeData>::update_alot2,
00728 blockpushes[proc]);
00729 blockpushes[proc].clear();
00730 }
00731 }
00732 }
00733
00734 template <typename VertexData, typename EdgeData>
00735 void distributed_graph<VertexData, EdgeData>::push_all_owned_edges_to_replicas() {
00736 std::vector<std::vector<block_synchronize_request2> > blockpushes(omp_get_max_threads());
00737 for (size_t i = 0;i < blockpushes.size(); ++i) blockpushes[i].resize(rmi.numprocs());
00738
00739
00740 #pragma omp parallel for
00741 for (long i = 0;i < (long)ghostvertices.size(); ++i) {
00742 int thrid = omp_get_thread_num();
00743
00744 vertex_id_t vid = ghost_vertices()[i];
00745 vertex_id_t localvid = global2localvid[vid];
00746 procid_t proc = localvid2owner[localvid];
00747 foreach(edge_id_t localeid, localstore.out_edge_ids(localvid)) {
00748 vertex_id_t targetvid = local2globalvid[localstore.target(localeid)];
00749 edge_conditional_store estore;
00750 estore.hasdata = true;
00751 estore.data.first = localstore.edge_data(localeid);
00752 estore.data.second = localstore.edge_version(localeid);
00753
00754
00755 blockpushes[thrid][proc].srcdest.push_back(std::make_pair<vertex_id_t, vertex_id_t>(vid, targetvid));
00756 blockpushes[thrid][proc].edgeversion.push_back(localstore.edge_version(localeid));
00757 blockpushes[thrid][proc].estore.push_back(estore);
00758 if (blockpushes[thrid][proc].srcdest.size() >= 1*1024*1024/sizeof(EdgeData)) {
00759 rmi.remote_call(proc,
00760 &distributed_graph<VertexData, EdgeData>::update_alot2,
00761 blockpushes[thrid][proc]);
00762 blockpushes[thrid][proc].clear();
00763 }
00764 }
00765 }
00766 for (size_t i = 0;i < blockpushes.size(); ++i) {
00767 for(size_t proc = 0; proc < rmi.numprocs(); ++proc) {
00768 if (blockpushes[i][proc].srcdest.size() > 0) {
00769 assert(proc != rmi.procid());
00770 rmi.remote_call(proc,
00771 &distributed_graph<VertexData, EdgeData>::update_alot2,
00772 blockpushes[i][proc]);
00773 blockpushes[i][proc].clear();
00774 }
00775 }
00776 }
00777 }
00778
00779
00780 template <typename VertexData, typename EdgeData>
00781 void distributed_graph<VertexData, EdgeData>::push_owned_scope_to_replicas(vertex_id_t vid,
00782 bool onlymodified,
00783 bool clearmodified,
00784 bool async,
00785 bool untracked) {
00786
00787 if (boundaryscopesset.find(vid) == boundaryscopesset.end()) return;
00788 if (0) {
00789 if (is_owned(vid)) {
00790 vertex_id_t localvid = global2localvid[vid];
00791 if (localstore.vertex_modified(localvid)) {
00792 localstore.set_vertex_modified(localvid, false);
00793 push_owned_vertex_to_replicas(vid, async, untracked);
00794
00795 }
00796 foreach(edge_id_t eid, in_edge_ids(vid)) {
00797 if (localstore.edge_modified(eid)) {
00798 localstore.set_edge_modified(eid, false);
00799 push_owned_edge_to_replicas(eid, async, untracked);
00800 }
00801 }
00802 }
00803 }
00804 else {
00805 if (is_owned(vid)) {
00806 push_owned_vertex_to_replicas(vid, async, untracked);
00807 foreach(edge_id_t eid, in_edge_ids(vid)) {
00808 push_owned_edge_to_replicas(eid, async, untracked);
00809 }
00810 }
00811 }
00812 }
00813
00814 template <typename VertexData, typename EdgeData>
00815 void distributed_graph<VertexData, EdgeData>::wait_for_all_async_pushes() {
00816 pending_push_updates.wait();
00817 }
00818
00819 #endif
00820
00821