distributed_graph_synchronization.hpp

/*
This file is part of GraphLab.

GraphLab is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of
the License, or (at your option) any later version.

GraphLab is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef FROM_DISTRIBUTED_GRAPH_INCLUDE
#warning "distributed_graph_synchronization.hpp should not be included directly."
#warning "You should include only distributed_graph.hpp"
#warning "I will fix this for you now, but don't do it again!"

#include <graphlab/distributed2/graph/distributed_graph.hpp>
#include <boost/range/iterator_range.hpp>
#else

//#define DGRAPH_DEBUG

00029 /**
00030  * synchronize the data on vertex with global id vid
00031  * vid must be a ghost
00032  */
00033 template <typename VertexData, typename EdgeData>
00034 void distributed_graph<VertexData, EdgeData>::synchronize_vertex(vertex_id_t vid, bool async) {
00035   vertex_id_t localvid = global2localvid[vid];
00036   if (is_ghost(vid)) {
00037     vertex_conditional_store out;
00038     out.hasdata = localstore.vertex_modified(localvid);
00039     if (out.hasdata) {
00040       localstore.set_vertex_modified(localvid, false);
00041       out.data.first = localstore.vertex_data(localvid);
00042     }
00043     if (async == false) {
00044       vertex_conditional_store v;
00045       v = rmi.remote_request(localvid2owner[localvid],
00046                             &distributed_graph<VertexData, EdgeData>::get_vertex_if_version_less_than,
00047                             vid,
00048                             localstore.vertex_version(localvid),
00049                             out);
00050      if (v.hasdata) {
00051         update_vertex_data_and_version(vid, v);
00052       }
00053     }
00054     else {
00055       pending_async_updates.flag.inc();
00056       rmi.remote_call(localvid2owner[localvid],
00057                        &distributed_graph<VertexData, EdgeData>::async_get_vertex_if_version_less_than,
00058                        rmi.procid(),
00059                        vid,
00060                        localstore.vertex_version(localvid),
00061                        out);
00062     }
00063   }
00064 }
00065 
00066 /**
00067  * synchronize the data on edge with global id eid
00068  * target of edge must be a ghost
00069  */
00070 template <typename VertexData, typename EdgeData>
00071 void distributed_graph<VertexData, EdgeData>::synchronize_edge(edge_id_t eid, bool async) {
00072   vertex_id_t localtargetvid = localstore.target(eid);
00073   vertex_id_t targetvid = local2globalvid[localtargetvid];
00074   vertex_id_t localsourcevid = localstore.source(eid);
00075   vertex_id_t sourcevid = local2globalvid[localsourcevid];
00076 
00077   if (is_ghost(targetvid)) {
00078 
00079     edge_conditional_store out;
00080     out.hasdata = localstore.edge_modified(eid);
00081     if (out.hasdata) {
00082       localstore.set_edge_modified(eid, false);
00083       out.data.first = localstore.edge_data(eid);
00084     }
00085     if (async == false) {
00086       edge_conditional_store e = rmi.remote_request(localvid2owner[localtargetvid],
00087                                                     &distributed_graph<VertexData, EdgeData>::get_edge_if_version_less_than2,
00088                                                     sourcevid,
00089                                                     targetvid,
00090                                                     localstore.edge_version(eid),
00091                                                     out);
00092       if (e.hasdata) {
00093         update_edge_data_and_version2(sourcevid, targetvid, e);
00094       }
00095     }
00096     else {
00097       pending_async_updates.flag.inc();
00098       rmi.remote_call(localvid2owner[localtargetvid],
00099                         &distributed_graph<VertexData, EdgeData>::async_get_edge_if_version_less_than2,
00100                         rmi.procid(),
00101                         sourcevid,
00102                         targetvid,
00103                         localstore.edge_version(eid),
00104                         out);
00105     }
00106   }
00107 }
00108 
00109 template <typename VertexData, typename EdgeData>
00110 void distributed_graph<VertexData, EdgeData>::synchronize_scope(vertex_id_t vid, bool async) {
00111   // construct he requests
00112   std::map<procid_t, std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> > requests;
00113   synchronize_scope_construct_req(vid, requests);
00114   
00115   if (async) {
00116     // if asynchronous, the reply goes to pending_async_updates
00117     typename std::map<procid_t, 
00118           std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> >::iterator iter;
00119     iter = requests.begin();
00120     size_t replytarget = reinterpret_cast<size_t>(&pending_async_updates);
00121     pending_async_updates.flag.inc(requests.size());
00122 
00123     while(iter != requests.end()) {
00124       rmi.remote_call(iter->first,
00125                       &distributed_graph<VertexData, EdgeData>::async_get_alot2,
00126                       rmi.procid(),
00127                       iter->second.first,
00128                       replytarget,
00129                       0);
00130       ++iter;
00131     }
00132   }
00133   else {
00134     // otherwise we collect it into a local reply ret tye
00135     dc_impl::reply_ret_type reply(true, 0);
00136     typename std::map<procid_t, 
00137           std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> >::iterator iter;
00138     iter = requests.begin();
00139     size_t replytarget = reinterpret_cast<size_t>(&reply);
00140     reply.flag.inc(requests.size());
00141 
00142     while(iter != requests.end()) {
00143       rmi.remote_call(iter->first,
00144                       &distributed_graph<VertexData, EdgeData>::async_get_alot2,
00145                       rmi.procid(),
00146                       iter->second.first,
00147                       replytarget,
00148                       0);
00149       ++iter;
00150     }
00151 
00152     reply.wait();
00153   }
00154   
00155   
00156   
00157   
00158   
00159 }
00160 
00161 
00162 
00163 /**
00164  * Constructs the request for synchronizing the scope for vertex vid
00165  * vid must be owned by the current machine. 
00166  */
00167 template <typename VertexData, typename EdgeData>
00168 void distributed_graph<VertexData, EdgeData>::synchronize_scope_construct_req(vertex_id_t vid, 
00169       std::map<procid_t, std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> > &requests) {
00170   ASSERT_FALSE(is_ghost(vid));
00171   if (boundaryscopesset.find(vid) == boundaryscopesset.end()) return;
00172 
00173   // now this is very annoying. A significant amount of code is identical here.
00174   // whether with edge canonical numbering on or not. But I cannot refactor it
00175   // easily because the types are different and I don't really want to introduce
00176   // templates here for something so trivial.
00177   
00178     vertex_id_t localvid = global2localvid[vid];
00179     requests.clear();
00180     // I should have all the in-edges. but I need the in vertices.
00181     // need to track the vertices added so I don't add duplicate vertices
00182     // if the vertex has both in-out edges to this vertex.
00183     // trick! vids are ordered!
00184 
00185   foreach(edge_id_t localineid, localstore.in_edge_ids(localvid)) {
00186     vertex_id_t localsourcevid = localstore.source(localineid);
00187     if (localvid_is_ghost(localsourcevid)) {
00188       // need to synchronize incoming vertex
00189       procid_t sourceowner = localvid2owner[localsourcevid];
00190       block_synchronize_request2 &req = requests[sourceowner].first;
00191       req.vid.push_back(local2globalvid[localsourcevid]);
00192       req.vidversion.push_back(localstore.vertex_version(localsourcevid));
00193       vertex_conditional_store vs;
00194       vs.hasdata = localstore.vertex_modified(localsourcevid);
00195       if (vs.hasdata) {
00196         localstore.set_vertex_modified(localsourcevid, false);
00197         vs.data.first = localstore.vertex_data(localsourcevid);
00198         vs.data.second = localstore.vertex_version(localsourcevid);
00199       }
00200       requests[sourceowner].second=req.vid.end();
00201       req.vstore.push_back(vs);
00202     }
00203   }
00204   // now for the out edges
00205   foreach(edge_id_t localouteid, localstore.out_edge_ids(localvid)) {
00206     vertex_id_t localtargetvid = localstore.target(localouteid);
00207     procid_t targetowner = localvid2owner[localstore.target(localouteid)];
00208 
00209     if (localvid_is_ghost(localtargetvid)) {
00210       block_synchronize_request2 &req = requests[targetowner].first;
00211       // need to synchronize outgoing vertex and outgoing edge
00212       // do outgoing vertex first
00213       if (std::binary_search(req.vid.begin(), requests[targetowner].second, local2globalvid[localtargetvid]) == false) {
00214         req.vid.push_back(local2globalvid[localtargetvid]);
00215         req.vidversion.push_back(localstore.vertex_version(localtargetvid));
00216         vertex_conditional_store vs;
00217         vs.hasdata = localstore.vertex_modified(localtargetvid);
00218         if (vs.hasdata) {
00219           localstore.set_vertex_modified(localtargetvid, false);
00220           vs.data.first = localstore.vertex_data(localtargetvid);
00221           vs.data.second = localstore.vertex_version(localtargetvid);
00222         }
00223         req.vstore.push_back(vs);
00224       }
00225       // now for the outgoing edge
00226       
00227       req.srcdest.push_back(std::pair<vertex_id_t, vertex_id_t>(vid, local2globalvid[localtargetvid]));
00228       req.edgeversion.push_back(localstore.edge_version(localouteid));
00229       edge_conditional_store es;
00230       es.hasdata = localstore.edge_modified(localouteid);
00231       if (es.hasdata) {
00232         localstore.set_edge_modified(localouteid, false);
00233         es.data.first = localstore.edge_data(localouteid);
00234         es.data.second = localstore.edge_version(localouteid);
00235       }
00236       req.estore.push_back(es);
00237     }
00238   }
00239 }
00240 
00241 
00242 
00243 template <typename VertexData, typename EdgeData> 
00244 void distributed_graph<VertexData, EdgeData>::async_synchronize_scope_callback(vertex_id_t vid, 
00245                                                 boost::function<void (void)> callback){
00246   vertex_id_t localvid = global2localvid[vid];
00247   ASSERT_TRUE(scope_callbacks[localvid].callback == NULL);
00248   ASSERT_TRUE(boundary_scopes_set().find(vid) != boundary_scopes_set().end());
00249   // register the callback
00250   scope_callbacks[localvid].callback = callback;
00251   // construct the requests
00252   std::map<procid_t, std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> > requests;
00253   synchronize_scope_construct_req(vid, requests);
00254   
00255   //send the stuff
00256   typename std::map<procid_t, 
00257         std::pair<block_synchronize_request2, std::vector<vertex_id_t>::iterator> >::iterator iter;
00258   iter = requests.begin();
00259   
00260   ASSERT_EQ(scope_callbacks[localvid].counter.value, 0);
00261   scope_callbacks[localvid].counter.inc(requests.size());
00262   while(iter != requests.end()) {
00263 
00264     // the reply target is 0. see reply_alot2
00265     rmi.remote_call(iter->first,
00266                     &distributed_graph<VertexData, EdgeData>::async_get_alot2,
00267                     rmi.procid(),
00268                     iter->second.first,
00269                     0,
00270                     localvid);
00271     ++iter;
00272   }
00273 
00274 
00275 }
00276 
00277 /**
00278  * Waits for all asynchronous data synchronizations to complete
00279  */
00280 template <typename VertexData, typename EdgeData> 
00281 void distributed_graph<VertexData, EdgeData>::wait_for_all_async_syncs() {
00282   pending_async_updates.wait();
00283 }
00284 
00285 
00286 
00287 template <typename VertexData, typename EdgeData> 
00288 typename distributed_graph<VertexData, EdgeData>::vertex_conditional_store 
00289 distributed_graph<VertexData, EdgeData>::get_vertex_if_version_less_than(vertex_id_t vid,
00290                                                          uint64_t  vertexversion,
00291                                                          vertex_conditional_store &vdata) {
00292   vertex_conditional_store ret;
00293   size_t localvid = global2localvid[vid];
00294   uint64_t local_vertex_version = localstore.vertex_version(localvid);
00295   // Now I must the the owner of this vertex
00296   ASSERT_EQ(localvid2owner[localvid], rmi.procid());
00297   #ifdef DGRAPH_DEBUG
00298   logstream(LOG_DEBUG) << "get vertex: " << vid << ":" << vertexversion << " vs " << local_vertex_version << ". " << vdata.hasdata << std::endl;
00299   #endif
00300   ret.hasdata = false;
00301   
00302   if (local_vertex_version  > vertexversion) {
00303     // send if I have a later version
00304     ret.hasdata = true;
00305     ret.data.first = localstore.vertex_data(localvid);
00306     ret.data.second = local_vertex_version;
00307   }
00308   else if (local_vertex_version == vertexversion) {
00309     // if version is the same and there is data, store and increment the version    
00310     if (vdata.hasdata) {
00311       localstore.increment_and_update_vertex(localvid, vdata.data.first);
00312     }
00313   }
00314   else {
00315     logstream(LOG_FATAL) << "Remote node attempted to update vertex " 
00316                         << vid << " with a newer version than the owner" << std::endl;
00317   }
00318   return ret;
00319 }
00320 
00321 
00322 template <typename VertexData, typename EdgeData> 
00323 typename distributed_graph<VertexData, EdgeData>::edge_conditional_store 
00324 distributed_graph<VertexData, EdgeData>::get_edge_if_version_less_than2(vertex_id_t source,
00325                                                       vertex_id_t target,
00326                                                       uint64_t  edgeversion,
00327                                                       edge_conditional_store &edata) {
00328   edge_conditional_store ret;
00329   size_t localsource = global2localvid[source];
00330   size_t localtarget = global2localvid[target];
00331 
00332 
00333   std::pair<bool, edge_id_t> findret = localstore.find(localsource, localtarget);
00334   assert(findret.first);
00335   edge_id_t localeid = findret.second;
00336   
00337   uint64_t  local_edge_version = localstore.edge_version(localeid);
00338   ret.hasdata = false;
00339   #ifdef DGRAPH_DEBUG
00340   
00341   logstream(LOG_DEBUG) << "get edge2: " << "(" << source << "->" << target << ")" << ":" << edgeversion << " vs " << local_edge_version << ". " << edata.hasdata << std::endl;
00342   #endif
00343   if (local_edge_version > edgeversion) {
00344     ret.hasdata = true;
00345     ret.data.first = localstore.edge_data(localeid);
00346     ret.data.second = local_edge_version;
00347   }
00348   else if (local_edge_version == edgeversion) {
00349     if (edata.hasdata) {
00350       localstore.increment_and_update_edge(localeid, edata.data.first);
00351     }
00352   }
00353   else {
00354     logstream(LOG_FATAL) << "Remote node attempted to update edge (" 
00355                         << source <<  ", " << target << ") with a newer version than the owner" << std::endl;
00356   }
00357   return ret;
00358 }
00359 
00360 
00361 template <typename VertexData, typename EdgeData> 
00362 void distributed_graph<VertexData, EdgeData>::async_get_vertex_if_version_less_than(
00363                                                               procid_t srcproc, 
00364                                                               vertex_id_t vid, 
00365                                                               uint64_t vertexversion,
00366                                                               vertex_conditional_store &vdata) {
00367   rmi.remote_call(srcproc,
00368                   &distributed_graph<VertexData, EdgeData>::reply_vertex_data_and_version,
00369                   vid,
00370                   get_vertex_if_version_less_than(vid, vertexversion, vdata));
00371 }
00372 
00373 template <typename VertexData, typename EdgeData> 
00374 void distributed_graph<VertexData, EdgeData>::async_get_edge_if_version_less_than2(
00375                                               procid_t srcproc, 
00376                                               vertex_id_t source, 
00377                                               vertex_id_t target, 
00378                                               uint64_t edgeversion,
00379                                               edge_conditional_store &edata) {
00380   rmi.remote_call(srcproc,
00381                   &distributed_graph<VertexData, EdgeData>::reply_edge_data_and_version2,
00382                   source,
00383                   target,
00384                   get_edge_if_version_less_than2(source, target, edgeversion, edata));
00385 }
00386 
00387 
00388 template <typename VertexData, typename EdgeData> 
00389 typename distributed_graph<VertexData, EdgeData>::block_synchronize_request2& 
00390 distributed_graph<VertexData, EdgeData>::get_alot2(
00391                           distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request) {
00392   std::vector<vertex_conditional_store> vresponse(request.vid.size());
00393   std::vector<edge_conditional_store> eresponse(request.srcdest.size());
00394   for (size_t i = 0;i < request.vid.size(); ++i) {
00395     request.vstore[i] = get_vertex_if_version_less_than(request.vid[i], 
00396                                                         request.vidversion[i], 
00397                                                         request.vstore[i]);
00398   }
00399   for (size_t i = 0;i < request.srcdest.size(); ++i) {
00400     request.estore[i] = get_edge_if_version_less_than2(request.srcdest[i].first, 
00401                                                        request.srcdest[i].second, 
00402                                                        request.edgeversion[i], 
00403                                                        request.estore[i]);
00404   }
00405   request.vidversion.clear();
00406   request.edgeversion.clear();
00407   return request;
00408 }
00409 
00410 template <typename VertexData, typename EdgeData> 
00411 void distributed_graph<VertexData, EdgeData>::async_get_alot2(
00412                     procid_t srcproc,
00413                     distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request,
00414                     size_t replytarget,
00415                     size_t tag) {
00416   get_alot2(request);
00417   rmi.remote_call(srcproc,
00418                   &distributed_graph<VertexData, EdgeData>::reply_alot2,
00419                   request,
00420                   replytarget, 
00421                   tag);
00422 }
00423 
00424 template <typename VertexData, typename EdgeData> 
00425 void distributed_graph<VertexData, EdgeData>::reply_vertex_data_and_version(
00426                         vertex_id_t vid, 
00427                         distributed_graph<VertexData, EdgeData>::vertex_conditional_store &vstore) {
00428   if (vstore.hasdata) update_vertex_data_and_version(vid, vstore);
00429   // the dc and procid are meaningless. Just pass something
00430   reply_increment_counter(rmi.dc(), 0, 
00431                           reinterpret_cast<size_t>(&pending_async_updates), dc_impl::blob());
00432 }
00433 
00434 template <typename VertexData, typename EdgeData> 
00435 void distributed_graph<VertexData, EdgeData>::reply_edge_data_and_version2(
00436                   vertex_id_t source, 
00437                   vertex_id_t target, 
00438                   distributed_graph<VertexData, EdgeData>::edge_conditional_store &estore) {
00439   if (estore.hasdata) update_edge_data_and_version2(source, target, estore);
00440   reply_increment_counter(rmi.dc(), 0, 
00441                           reinterpret_cast<size_t>(&pending_async_updates), dc_impl::blob());
00442 
00443 }
00444 
00445 
00446 
00447 template <typename VertexData, typename EdgeData> 
00448 void distributed_graph<VertexData, EdgeData>::update_vertex_data_and_version(
00449                         vertex_id_t vid, 
00450                         distributed_graph<VertexData, EdgeData>::vertex_conditional_store &vstore) {
00451   vertex_id_t localvid = global2localvid[vid];
00452   // this must be a ghost
00453   ASSERT_NE(localvid2owner[localvid], rmi.procid());
00454 #ifdef DGRAPH_DEBUG
00455   logstream(LOG_DEBUG) << "Receiving vertex " << vid << "(" << localvid << ")  "  << ". "
00456                        << vstore.data.second << " vs " << localstore.vertex_version(localvid) << std::endl;
00457 #endif
00458   localstore.conditional_update_vertex(localvid, vstore.data.first, vstore.data.second);
00459 }
00460 
00461 template <typename VertexData, typename EdgeData> 
00462 void distributed_graph<VertexData, EdgeData>::update_edge_data_and_version2(
00463                           vertex_id_t source, 
00464                           vertex_id_t target, 
00465                           distributed_graph<VertexData, EdgeData>::edge_conditional_store &estore) {
00466   if (estore.hasdata) {
00467     vertex_id_t localsourcevid = global2localvid[source];
00468     vertex_id_t localtargetvid = global2localvid[target];
00469     ASSERT_NE(localvid2owner[localtargetvid], rmi.procid());
00470     std::pair<bool, edge_id_t> findret = localstore.find(localsourcevid, localtargetvid);
00471     
00472     assert(findret.first);
00473     #ifdef DGRAPH_DEBUG
00474     logstream(LOG_DEBUG) << "Receiving edge (" << source << ","<<target << ")  " << ". "
00475                        << estore.data.second << " vs " << localstore.edge_version(findret.second) << std::endl;
00476     #endif
00477 
00478     localstore.conditional_update_edge(findret.second, estore.data.first, estore.data.second);
00479   }
00480 }
00481 
00482 
00483 template <typename VertexData, typename EdgeData> 
00484 void distributed_graph<VertexData, EdgeData>::update_alot2(
00485                   distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request) {
00486   for (size_t i = 0;i < request.vid.size(); ++i) {
00487     update_vertex_data_and_version(request.vid[i], request.vstore[i]);
00488   }
00489 
00490   for (size_t i = 0;i < request.srcdest.size(); ++i) {
00491     update_edge_data_and_version2(request.srcdest[i].first, request.srcdest[i].second, request.estore[i]);
00492   }
00493 }
00494 
00495 
00496 template <typename VertexData, typename EdgeData> 
00497 void distributed_graph<VertexData, EdgeData>::reply_alot2(
00498                     distributed_graph<VertexData, EdgeData>::block_synchronize_request2 &request,
00499                     size_t replytarget,
00500                     size_t tag) {
00501   update_alot2(request);
00502   
00503   // special handling for callbacks
00504   if (replytarget != 0)  {
00505     reply_increment_counter(rmi.dc(), 0, 
00506                           replytarget, dc_impl::blob());  
00507   }
00508   else {
00509     // tag is local vid
00510     vertex_id_t localvid = tag;
00511 
00512     ASSERT_TRUE(scope_callbacks[localvid].callback != NULL);
00513     if (scope_callbacks[localvid].counter.dec() == 0) {
00514       // make a copy of it and clear the callbacks entry.
00515       boost::function<void (void)> tmp = scope_callbacks[localvid].callback;
00516       scope_callbacks[localvid].callback = NULL;
00517       tmp();
00518     }
00519   } 
00520 }
00521 
00522 
00523 template <typename VertexData, typename EdgeData>
00524 void distributed_graph<VertexData, EdgeData>::synchronize_all_vertices(bool async) {
00525   foreach(vertex_id_t vid, ghostvertices) {
00526     synchronize_vertex(vid, async);
00527   }
00528 }
00529 
00530 template <typename VertexData, typename EdgeData>
00531 void distributed_graph<VertexData, EdgeData>::synchronize_all_edges(bool async) {
00532   foreach(vertex_id_t vid, ghostvertices) {
00533     foreach(edge_id_t eid, localstore.in_edge_ids(global2localvid[vid])) {
00534       synchronize_edge(eid, async);
00535     }
00536   }
00537 }
00538 
00539 template <typename VertexData, typename EdgeData>
00540 void distributed_graph<VertexData, EdgeData>::synchronize_all_scopes(bool async) {
00541   foreach(vertex_id_t vid, boundaryscopes) {
00542     synchronize_scope(vid, async);
00543   }
00544 }
00545 
00546 
00547 
00548 template <typename VertexData, typename EdgeData> 
00549 void distributed_graph<VertexData, EdgeData>::update_vertex_data_and_version_and_reply(
00550                         vertex_id_t vid, 
00551                         distributed_graph<VertexData, EdgeData>::vertex_conditional_store &vstore,
00552                         procid_t srcproc,
00553                         size_t reply) {
00554                         
00555   update_vertex_data_and_version(vid, vstore);
00556 
00557   if (srcproc != procid_t(-1)) {
00558     rmi.dc().remote_call(srcproc, reply_increment_counter, reply, dc_impl::blob());
00559   }
00560 }
00561 
00562 template <typename VertexData, typename EdgeData> 
00563 void distributed_graph<VertexData, EdgeData>::update_edge_data_and_version_and_reply2(
00564                           vertex_id_t source, 
00565                           vertex_id_t target, 
00566                           distributed_graph<VertexData, EdgeData>::edge_conditional_store &estore,
00567                           procid_t srcproc, size_t reply) {
00568   update_edge_data_and_version2(source, target, estore);
00569   if (srcproc != procid_t(-1)) {
00570     rmi.dc().remote_call(srcproc, reply_increment_counter, reply, dc_impl::blob());
00571   }
00572 }
00573 
00574 template <typename VertexData, typename EdgeData>
00575 void distributed_graph<VertexData, EdgeData>::push_owned_vertex_to_replicas(vertex_id_t vid, bool async, bool untracked) {
00576   vertex_id_t localvid = global2localvid[vid];
00577   // get the replicas
00578   const std::vector<procid_t>& replicas = localvid_to_replicas(localvid);
00579   // owner is a replica too. if there are no other replicas quit
00580   if (replicas.size() <= 1) return;
00581   
00582   dc_impl::reply_ret_type ret(true, replicas.size() - 1);
00583   
00584   // if async, set the return reply to go to the global pending push updates
00585   size_t retptr;
00586   if (async == false) {
00587     retptr = reinterpret_cast<size_t>(&ret);
00588   }
00589   else {
00590     retptr = reinterpret_cast<size_t>(&pending_push_updates);
00591     pending_push_updates.flag.inc(replicas.size() - 1);
00592   }
00593   
00594   /**
00595   If untracked, set the reply procid to -1. That will mean that
00596   no reply will be returned at all
00597   */
00598   procid_t srcprocid;
00599   if (untracked == false) srcprocid = rmi.procid();
00600   else srcprocid = procid_t(-1);
00601   // build the store
00602   vertex_conditional_store vstore;
00603   vstore.hasdata = true;
00604   vstore.data.first = localstore.vertex_data(localvid);
00605   vstore.data.second = localstore.vertex_version(localvid);
00606   
00607  foreach(procid_t proc, replicas) {
00608     if (proc != rmi.procid()) {
00609 #ifdef DGRAPH_DEBUG
00610       logger(LOG_DEBUG, "Pushing vertex %d to proc %d", vid, proc);
00611 #endif
00612       rmi.remote_call(proc,
00613                       &distributed_graph<VertexData, EdgeData>::update_vertex_data_and_version_and_reply,
00614                       vid,
00615                       vstore,
00616                       srcprocid,
00617                       retptr);
00618     }
00619   }
00620   if (async == false && untracked == false)  ret.wait();
00621 }
00622 
00623 
00624 template <typename VertexData, typename EdgeData>
00625 void distributed_graph<VertexData, EdgeData>::push_owned_edge_to_replicas(edge_id_t eid, bool async, bool untracked) {
00626   // get the replicas
00627   vertex_id_t globalsource = source(eid);
00628   vertex_id_t globaltarget = target(eid);
00629   // firstly I must own this edge
00630   if (!is_owned(globaltarget)) return;
00631   
00632   // Now, there are 2 cases. 
00633   // Case 1, I own both source and target.
00634   //         in that case there is nothing to sync. Any other
00635   //         machine at most has ghosts of source and target, but will
00636   //         not have the end itself
00637   // Case 2, I own the target, but not the source
00638   //         then the only replica is the owner of the source
00639 
00640   procid_t sendto;
00641   
00642   if (is_owned(globalsource)) {
00643     return;
00644   }
00645   else {
00646     // otherwise, it is the owner of the source
00647     sendto = localvid2owner[localstore.source(eid)];
00648   }
00649 
00650   dc_impl::reply_ret_type ret(true, 1);  
00651   // if async, set the return reply to go to the global pending push updates
00652   size_t retptr;
00653   if (async == false) {
00654     retptr = reinterpret_cast<size_t>(&ret);
00655   }
00656   else {
00657     retptr = reinterpret_cast<size_t>(&pending_push_updates);
00658   }
00659   
00660   /**
00661   If untracked, set the reply procid to -1. That will mean that
00662   no reply will be returned at all
00663   */
00664   procid_t srcprocid;
00665   if (untracked == false) srcprocid = rmi.procid();
00666   else srcprocid = procid_t(-1);
00667   // build the store
00668   edge_conditional_store estore;
00669   estore.hasdata = true;
00670   estore.data.first = localstore.edge_data(eid);
00671   estore.data.second = localstore.edge_version(eid);
00672   
00673 #ifdef DGRAPH_DEBUG
00674   logstream(LOG_DEBUG) << "Pushing edge (" << globalsource << ", " << globaltarget << ") to proc " << sendto << std::endl;
00675 #endif
00676   rmi.remote_call(sendto,
00677                   &distributed_graph<VertexData, EdgeData>::update_edge_data_and_version_and_reply2,
00678                   globalsource,
00679                   globaltarget,
00680                   estore,
00681                   srcprocid,
00682                   retptr);
00683 
00684   if (async == false && untracked == false)  ret.wait();
00685 }
00686 
00687 
00688 template <typename VertexData, typename EdgeData>
00689 void distributed_graph<VertexData, EdgeData>::push_all_owned_vertices_to_replicas() {
00690   // perform block collective
00691   std::vector<block_synchronize_request2> blockpushes(rmi.numprocs()); 
00692 
00693   foreach(vertex_id_t vid, boundary_scopes()) {
00694 
00695     vertex_id_t localvid = global2localvid[vid];
00696     // get the replicas
00697     const std::vector<procid_t>& replicas = localvid_to_replicas(localvid);
00698     // owner is a replica too. if there are no other replicas quit
00699     if (replicas.size() <= 1) continue;
00700   
00701  
00702    // build the store
00703     vertex_conditional_store vstore;
00704     vstore.hasdata = true;
00705     vstore.data.first = localstore.vertex_data(localvid);
00706     vstore.data.second = localstore.vertex_version(localvid);
00707   
00708     foreach(procid_t proc, replicas) {
00709       if (proc != rmi.procid()) {
00710         blockpushes[proc].vid.push_back(vid);
00711         blockpushes[proc].vidversion.push_back(localstore.vertex_version(localvid));
00712         blockpushes[proc].vstore.push_back(vstore);
00713         if (blockpushes[proc].vid.size() >= 1024*1024/sizeof(VertexData)) {
00714           rmi.remote_call(proc,
00715                           &distributed_graph<VertexData, EdgeData>::update_alot2,
00716                           blockpushes[proc]);
00717           blockpushes[proc].clear();
00718         }
00719       }
00720     }
00721   }
00722 
00723   for(size_t proc = 0; proc < rmi.numprocs(); ++proc) {
00724     if (blockpushes[proc].vid.size() > 0) {
00725       assert(proc != rmi.procid());
00726       rmi.remote_call(proc,
00727                       &distributed_graph<VertexData, EdgeData>::update_alot2,
00728                       blockpushes[proc]);
00729       blockpushes[proc].clear();
00730     }
00731   }
00732 }
00733 
00734 template <typename VertexData, typename EdgeData>
00735 void distributed_graph<VertexData, EdgeData>::push_all_owned_edges_to_replicas() {
00736   std::vector<std::vector<block_synchronize_request2> > blockpushes(omp_get_max_threads()); 
00737   for (size_t i = 0;i < blockpushes.size(); ++i) blockpushes[i].resize(rmi.numprocs());
00738   
00739   
00740   #pragma omp parallel for
00741   for (long i = 0;i < (long)ghostvertices.size(); ++i) {
00742     int thrid = omp_get_thread_num();
00743     
00744     vertex_id_t vid = ghost_vertices()[i];
00745     vertex_id_t localvid = global2localvid[vid];
00746     procid_t proc = localvid2owner[localvid];
00747     foreach(edge_id_t localeid, localstore.out_edge_ids(localvid)) {
00748       vertex_id_t targetvid = local2globalvid[localstore.target(localeid)];
00749       edge_conditional_store estore;
00750       estore.hasdata = true;
00751       estore.data.first = localstore.edge_data(localeid);
00752       estore.data.second = localstore.edge_version(localeid);
00753   
00754 
00755       blockpushes[thrid][proc].srcdest.push_back(std::make_pair<vertex_id_t, vertex_id_t>(vid, targetvid));
00756       blockpushes[thrid][proc].edgeversion.push_back(localstore.edge_version(localeid));
00757       blockpushes[thrid][proc].estore.push_back(estore);
00758       if (blockpushes[thrid][proc].srcdest.size() >= 1*1024*1024/sizeof(EdgeData)) {
00759         rmi.remote_call(proc,
00760                         &distributed_graph<VertexData, EdgeData>::update_alot2,
00761                         blockpushes[thrid][proc]);
00762         blockpushes[thrid][proc].clear();
00763       }
00764     }
00765   }
00766   for (size_t i = 0;i < blockpushes.size(); ++i) {
00767     for(size_t proc = 0; proc < rmi.numprocs(); ++proc) {
00768       if (blockpushes[i][proc].srcdest.size() > 0) {
00769         assert(proc != rmi.procid());
00770         rmi.remote_call(proc,
00771                         &distributed_graph<VertexData, EdgeData>::update_alot2,
00772                         blockpushes[i][proc]);
00773         blockpushes[i][proc].clear();
00774       }
00775     }
00776   }
00777 }
00778 
00779 
00780 template <typename VertexData, typename EdgeData>
00781 void distributed_graph<VertexData, EdgeData>::push_owned_scope_to_replicas(vertex_id_t vid, 
00782                                                                             bool onlymodified, 
00783                                                                             bool clearmodified, 
00784                                                                             bool async,
00785                                                                             bool untracked) {
00786   // fast exit if this is not on a boundary
00787   if (boundaryscopesset.find(vid) == boundaryscopesset.end()) return;
00788   if (0) {
00789    if (is_owned(vid)) {
00790       vertex_id_t localvid = global2localvid[vid];
00791       if (localstore.vertex_modified(localvid)) {
00792         localstore.set_vertex_modified(localvid, false);
00793         push_owned_vertex_to_replicas(vid, async, untracked);
00794         
00795       }
00796       foreach(edge_id_t eid, in_edge_ids(vid)) {
00797         if (localstore.edge_modified(eid)) {
00798           localstore.set_edge_modified(eid, false);
00799           push_owned_edge_to_replicas(eid, async, untracked);
00800         }
00801       }
00802     }
00803   }
00804   else {
00805     if (is_owned(vid)) {
00806       push_owned_vertex_to_replicas(vid, async, untracked);
00807       foreach(edge_id_t eid, in_edge_ids(vid)) {
00808         push_owned_edge_to_replicas(eid, async, untracked);
00809       }
00810     }
00811   }
00812 }
00813 
template <typename VertexData, typename EdgeData>
void distributed_graph<VertexData, EdgeData>::wait_for_all_async_pushes() {
  // Block until all outstanding tracked asynchronous pushes have been
  // acknowledged.  pending_push_updates appears to be the shared reply
  // object the push_* routines register as their reply target — confirm
  // against the reply-handler code before relying on this.
  pending_push_updates.wait();
}
00818 
00819 #endif
00820 
00821