Distributed Detailed Example

In this section we provide a brief tutorial on how the pieces of GraphLab come together to form a simple GraphLab program. In this tutorial we construct a synthetic application which uses many of the GraphLab concepts. However, many real GraphLab applications will not need all the pieces described in this tutorial. See dist_demo.cpp for the complete source.

This is very similar to the Detailed Example, but modified for the distributed version. Depending on the complexity of your program, moving from the shared memory version to the distributed version can be quite mechanical. See Distributed GraphLab for a condensed list of the changes you have to make.

The Problem

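We begin by describing the synthetic problem we will be solving. We are given a d by d grid of magnetic coins. Each coin interacts only with its 4 neighboring coins, as illustrated by the edges in Figure 1(a). When a coin is flipped, one of the following two outcomes occurs:

1. If all of its neighbors have the same color, the coin takes on that color.
2. Otherwise, the coin becomes red with probability equal to the fraction of its neighbors that are red, and black otherwise.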

Figure 1: The grid of red/black coins. Each coin is connected to its 4 cardinal neighbors (as shown by the edges). (a) An intermediate, mixed state of the system. (b, c) The two stable states of the system.

The only two stable joint states are when all coins are black, Figure 1(b), or when all coins are red, Figure 1(c). We are interested in computing the average number of flips for each coin before a stable state is reached.
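Stability can also be phrased locally: a state is stable when every coin already agrees with all of its neighbors, since then no update can change any color, and on a connected grid this forces all coins to share one color. The standalone sketch below checks that local condition in plain standard C++ (no GraphLab). The function name is_stable and the row-major layout, with coin (i, j) stored at index dim * i + j and red encoded as true, are illustrative assumptions chosen to match the conventions used later in this tutorial.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// colors is row-major: coin (i, j) lives at index dim * i + j,
// with true = red and false = black.
// Returns true when every coin agrees with all of its 4-connected
// neighbors, i.e. no coin can ever change color again.
bool is_stable(const std::vector<bool>& colors, std::size_t dim) {
  assert(colors.size() == dim * dim);
  for (std::size_t i = 0; i < dim; ++i) {
    for (std::size_t j = 0; j < dim; ++j) {
      const bool c = colors[dim * i + j];
      // compare against the right and lower neighbors; the left and upper
      // pairs are covered when those neighbors are visited
      if (j + 1 < dim && colors[dim * i + j + 1] != c) return false;
      if (i + 1 < dim && colors[dim * (i + 1) + j] != c) return false;
    }
  }
  return true;
}
```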

Includes

We first need to include some headers: distributed_graphlab.hpp includes everything you will ever need for Distributed GraphLab.

// standard C++ headers 
#include <iostream> 
// includes the entire graphlab framework 
#include <distributed_graphlab.hpp> 

Getting Started with The Data Graph

The first step to designing a GraphLab program is setting up the data graph. To do this we will need to define the data elements and their dependencies. The primary data element in this simple program is the data at each vertex which records the number of flips so far and the current color of the vertex. Here we will assume the color red is true and the color black is false.

struct vertex_data { 
  size_t numflips; 
  bool color; 
}; 

Distributed GraphLab requires the vertex and edge data types to be serializable (see Serialization), since the data must be transmitted across the network. To make a type serializable, you implement a save and a load function so that GraphLab can understand your data structures. The serialization mechanism is simple to use and understands all basic data types as well as standard STL containers. If an STL container holds non-basic data types (such as a struct), save/load functions must be written for that data type. For vertex_data, the two functions look like this (see Serialization for more details):

struct vertex_data { 
  size_t numflips; 
  bool color; 

  void save(graphlab::oarchive& archive) const { 
    archive << numflips << color; 
  } 

  void load(graphlab::iarchive& archive) { 
    archive >> numflips >> color; 
  } 
};
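To see why save and load must mirror each other, here is a self-contained sketch of a serialization round trip. It deliberately does not use graphlab::oarchive or graphlab::iarchive: the hypothetical helpers write_field and read_field copy the raw bytes of trivially copyable fields into a std::stringstream, which is enough to show that fields have to be read back in exactly the order they were written.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <ostream>
#include <sstream>

// Hypothetical stand-ins for GraphLab's archives: copy the raw bytes of a
// trivially copyable value to / from a stream.
template <typename T>
void write_field(std::ostream& out, const T& value) {
  out.write(reinterpret_cast<const char*>(&value), sizeof(T));
}

template <typename T>
void read_field(std::istream& in, T& value) {
  in.read(reinterpret_cast<char*>(&value), sizeof(T));
}

struct vertex_data {
  std::size_t numflips;
  bool color;

  // load must read the fields in exactly the order save wrote them
  void save(std::ostream& out) const {
    write_field(out, numflips);
    write_field(out, color);
  }
  void load(std::istream& in) {
    read_field(in, numflips);
    read_field(in, color);
  }
};

// Serialize a vertex into a byte buffer and rebuild it, as would happen
// when vertex data is shipped to another machine.
vertex_data round_trip(const vertex_data& v) {
  std::stringstream buffer;
  v.save(buffer);
  vertex_data copy{0, false};
  copy.load(buffer);
  return copy;
}
```

The byte-copying helpers are a simplification that only works for trivially copyable types; the real archives handle basic types and STL containers, as described above.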

In this example, we do not need edge data. However, GraphLab currently does not have a mechanism to completely disable the use of edge data. Therefore, we just put an arbitrary small placeholder type on the edges.

typedef char edge_data;

Note that we do not need to write a save/load function here since GraphLab's serializer already understands basic datatypes. One could imagine an alternative problem where edge weights are associated with each pair of interacting coins.

GraphLab Typedefs

The GraphLab graph is templatized over the vertex data as well as the edge data. Here we define the type of the graph using a typedef for convenience.

typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type;

Since GraphLab is heavily templatized and can be inconvenient to use in its standard form, the graphlab::distributed_types structure provides convenient typedef'd "shortcuts" to the other GraphLab types.

typedef graphlab::distributed_types<graph_type> gl;
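The typedef-shortcut idea can be illustrated without GraphLab. In the sketch below, toy_graph and toy_types are hypothetical stand-ins for graphlab::distributed_graph and graphlab::distributed_types: a single traits structure is instantiated with the graph type and re-exports related types under short names. The real structure provides many more names, such as gl::iscope, gl::icallback and gl::update_task used later in this tutorial.

```cpp
#include <type_traits>

// Hypothetical graph template standing in for graphlab::distributed_graph.
template <typename VertexData, typename EdgeData>
struct toy_graph {
  typedef VertexData vertex_data_type;
  typedef EdgeData edge_data_type;
};

// A "types" helper in the spirit of graphlab::distributed_types: given one
// graph type, it re-exports related types under short names.
template <typename Graph>
struct toy_types {
  typedef Graph graph;
  typedef typename Graph::vertex_data_type vertex_data_type;
  typedef typename Graph::edge_data_type edge_data_type;
};

struct vertex_data {
  unsigned long numflips;
  bool color;
};
typedef char edge_data;
typedef toy_graph<vertex_data, edge_data> graph_type;
typedef toy_types<graph_type> gl;  // user code now writes gl::...

static_assert(std::is_same<gl::vertex_data_type, vertex_data>::value,
              "gl re-exports the vertex data type");
static_assert(std::is_same<gl::edge_data_type, char>::value,
              "gl re-exports the edge data type");
```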

The Graph Data Structure

The next step in constructing a GraphLab program is to construct the actual graph. Distributed GraphLab requires the input data graph to be stored as a disk graph. As described in Graph Creation, there are three basic methods of creating one. Here we use the first method (Conversion of In Memory Graph to Disk Graph), where the graph is fully instantiated in memory before being written out to disk.

We define a simple init_graph function which takes a gl::graph and fills it with a grid. Note that gl::graph is the in-memory graph; the distributed graph is gl::distributed_graph.

void init_graph(gl::graph& g, size_t dim) {
  for (size_t i = 0; i < dim * dim; ++i) { 
    // create the vertex data, randomizing the color 
    vertex_data vdata; 
    vdata.numflips = 0; 
    // Flip a uniform coin to obtain the initial color 
    if (graphlab::random::rand_int(1) == 1) vdata.color = true; 
    else vdata.color = false; 
    // create the vertex 
    g.add_vertex(vdata); 
  }
  edge_data edata; 
  for (size_t i = 0; i < dim; ++i) { 
    for (size_t j = 0; j < dim - 1; ++j) { 
      // add the horizontal edges in both directions 
      g.add_edge(dim * i + j, dim * i + j + 1, edata); 
      g.add_edge(dim * i + j + 1, dim * i + j, edata); 
      // add the vertical edges in both directions 
      g.add_edge(dim * j + i, dim * (j + 1) + i, edata); 
      g.add_edge(dim * (j + 1) + i, dim * j + i, edata); 
    } 
  }
  g.finalize(); 
} // end of init_graph function

We will use this function to create the disk graph a little later.
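Since vertex ids are assigned in insertion order, vertex (i, j) of the grid has id dim * i + j. The sketch below (plain C++; grid_edges and in_degree are hypothetical helpers, not GraphLab functions) builds the same directed edge list as init_graph into a std::vector, which makes it easy to check the edge count and the number of neighbors of each vertex.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

typedef std::vector<std::pair<std::size_t, std::size_t>> edge_vector;

// Same loop structure as init_graph: every undirected grid edge is stored
// twice, once in each direction, as (source, target) pairs.
edge_vector grid_edges(std::size_t dim) {
  edge_vector edges;
  for (std::size_t i = 0; i < dim; ++i) {
    for (std::size_t j = 0; j + 1 < dim; ++j) {
      // horizontal edge between (i, j) and (i, j + 1)
      edges.emplace_back(dim * i + j, dim * i + j + 1);
      edges.emplace_back(dim * i + j + 1, dim * i + j);
      // vertical edge between (j, i) and (j + 1, i)
      edges.emplace_back(dim * j + i, dim * (j + 1) + i);
      edges.emplace_back(dim * (j + 1) + i, dim * j + i);
    }
  }
  return edges;
}

// Number of inbound edges of vertex v, i.e. its number of grid neighbors.
std::size_t in_degree(const edge_vector& edges, std::size_t v) {
  std::size_t count = 0;
  for (const auto& e : edges)
    if (e.second == v) ++count;
  return count;
}
```

A d by d grid therefore has 4 * d * (d - 1) directed edges: each of the d rows and d columns contributes d - 1 undirected edges, each stored in both directions. Corner vertices have 2 inbound edges, other border vertices 3, and interior vertices 4.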

The Update Function

Now we define the update function, which represents the basic block of computation in this program. The update function is applied to each vertex and has read/write access to the data at that vertex, as well as to all adjacent edges and vertices. You may specify more than one update function, but we only need one for this application. Let's first outline the computation that will occur in the update function:

1. Count the number of red neighbors.
2. Draw the new color: red with probability equal to the fraction of red neighbors. The draw is deterministic if all neighbors share the same color.
3. If the color changed, increment the flip counter of the vertex and schedule all of its neighbors.
4. If the draw was not deterministic, reschedule the vertex itself.

Each update function call has the option of inserting new tasks into the scheduler: in this case, itself and its neighbors. The algorithm terminates when there are no tasks remaining. There are other methods for terminating execution, such as registering a termination evaluator with the engine, but we do not describe them here. We now present the entire update function and then discuss each part individually.

void update_function(gl::iscope& scope,
                     gl::icallback& scheduler) {
  // Get a reference to the vertex data and the in edges
  vertex_data& curvdata = scope.vertex_data();
  gl::edge_list in_edges = scope.in_edge_ids();
  // Count the number of red neighbors
  size_t num_red_neighbors = 0;  
  for (size_t i = 0; i < in_edges.size(); ++i) {
    // eid is the current edge id
    size_t eid = in_edges[i];    
    size_t sourcev = scope.source(eid);
    const vertex_data& nbrvertex = scope.neighbor_vertex_data(sourcev);
    if (nbrvertex.color) ++num_red_neighbors;
  }
  // get the total number of neighbors we have
  size_t num_neighbors = in_edges.size();
  // Draw the new color
  bool new_color =
    graphlab::random::rand01() < (double(num_red_neighbors) / num_neighbors);
  // Determine if the draw was deterministic
  bool is_deterministic =
    num_neighbors == num_red_neighbors || num_red_neighbors == 0;
  // see if I flip and update the current vertex data.
  bool color_changed = new_color != curvdata.color;
  if (color_changed) ++curvdata.numflips;
  // Assign the new color 
  curvdata.color = new_color;
  // If I flipped, all my neighbors could be affected, loop through 
  // all my neighboring vertices and add them as tasks. 
  if (color_changed) {
    for (size_t i = 0; i < in_edges.size(); ++i) {
      size_t sourcev = scope.source(in_edges[i]);
      scheduler.add_task(gl::update_task(sourcev, update_function),
                        1.0);
    }
  }
  // Reschedule myself if this was not a deterministic draw
  if (is_deterministic == false) {
    scheduler.add_task(gl::update_task(scope.vertex(), update_function),
                        1.0);
  }
}

All update functions must have the following form:

void update_function(gl::iscope& scope, 
                     gl::icallback& scheduler);

The parameters are described here:

Scope:
The scope provides access to a local neighborhood of the graph and has the type graphlab::iscope. The scope is centered on a particular vertex, scope.vertex(), and includes all adjacent edges and vertices. All vertices are identified by an unsigned integer type vertex_id_t, and all edges are similarly identified by an unsigned integer type edge_id_t. GraphLab guarantees that vertices are numbered sequentially from 0 (so the largest vertex id is num_vertices - 1), and similarly for edges. All edges are directed. The scope provides methods to access the data associated with the vertex, its neighbors, and any inbound or outbound edges.
Scheduler:
There are three basic types of schedulers. The first type consists only of the synchronous scheduler, which is automatically used with the synchronous engine. The second type consists of static asynchronous schedulers, such as the chromatic scheduler. These static schedulers execute a fixed schedule until some termination condition is reached. Neither of these first two types uses the scheduler callback. The last class consists of the task schedulers, which enable dynamic computation and can receive new tasks ((vertex, update function) pairs) from the update functions through the icallback interface. The task schedulers include fifo, multiqueue_fifo, priority, multiqueue_priority and clustered_priority.
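The access pattern used in this tutorial, iterating over in_edge_ids() and calling source() on each edge, can be modeled with a plain edge list. The struct below is a hypothetical simplification, not graphlab::iscope: edges are directed and numbered 0 to E - 1, and the in-edges of the center vertex are found by scanning the edge targets.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

typedef std::size_t vertex_id_t;  // stand-ins for GraphLab's id types
typedef std::size_t edge_id_t;

// Hypothetical, much simplified model of the parts of a scope used here.
struct toy_scope {
  std::vector<vertex_id_t> src;  // src[e] is the source of edge e
  std::vector<vertex_id_t> dst;  // dst[e] is the target of edge e
  vertex_id_t center;            // the vertex the scope is centered on

  // ids of all edges whose target is the center vertex
  std::vector<edge_id_t> in_edge_ids() const {
    std::vector<edge_id_t> ids;
    for (edge_id_t e = 0; e < dst.size(); ++e)
      if (dst[e] == center) ids.push_back(e);
    return ids;
  }
  vertex_id_t source(edge_id_t e) const { return src[e]; }
  vertex_id_t target(edge_id_t e) const { return dst[e]; }
};
```

Because init_graph inserts every grid edge in both directions, the sources of a vertex's in-edges are exactly its neighbors, which is why the update function can reach all of its neighbors through in_edge_ids() alone.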

First we get a mutable reference to the vertex data on this vertex.

  vertex_data& curvdata = scope.vertex_data();

Note that scope.vertex_data() returns a reference to the vertex data, so modifications made through this reference directly modify the data on the graph. Then we get the list of ids of all edges inbound to this vertex (a gl::edge_list, as in the full listing above):

  gl::edge_list in_edges = scope.in_edge_ids();

We now compute the total number of red neighbors by looping over all neighboring vertices and counting the red ones. To do this we have to read the data of the neighboring vertices. The neighbor_vertex_data(vid) function allows us to read the vertex data of a vertex adjacent to the current vertex. (If we had edge data, the function edge_data(eid) would return a reference to the data on edge eid.) Since we do not need to change the neighbor's data, we just grab a const reference. You should always use const references whenever you know that you will not change the data, since GraphLab may be able to make additional optimizations for "read-only" operations. Conversely, you should never cast a const reference to a regular reference: modifying data through such a cast has undefined behavior.

  // Count the number of red neighbors
  size_t num_red_neighbors = 0;  
  for (size_t i = 0; i < in_edges.size(); ++i) {
    size_t eid = in_edges[i];    
    size_t sourcev = scope.source(eid);
    const vertex_data& nbrvertex = scope.neighbor_vertex_data(sourcev);
    if (nbrvertex.color) ++num_red_neighbors;
  }

Now we can decide on the new color of the vertex. The new color is drawn at random: the vertex becomes red with probability equal to the fraction of its neighbors that are red, and black otherwise. In particular, if all neighbors share the same color, the vertex takes on that color. Extensive random number support is provided through graphlab::random; random::rand01() returns a random floating point number between 0 and 1. A number of other distribution-based generators are available (see Random Number Generators).

  // get the total number of neighbors we have
  size_t num_neighbors = in_edges.size();
  // Draw the new color
  bool new_color =
    graphlab::random::rand01() < (double(num_red_neighbors) / num_neighbors);
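The color rule can be isolated as a pure function, which makes the deterministic cases easy to see. In this sketch (the name draw_color is hypothetical), the uniform draw u is passed in explicitly instead of calling graphlab::random::rand01(), and it is assumed to lie in [0, 1); if a generator could return exactly 1.0, an all-red neighborhood would no longer force the color red.

```cpp
#include <cassert>
#include <cstddef>

// Returns the new color (true = red) given the neighborhood counts and a
// uniform draw u in [0, 1). The coin turns red with probability
// num_red / num_neighbors, so it copies its neighbors exactly when they
// all share one color (num_red == 0 or num_red == num_neighbors).
bool draw_color(std::size_t num_red, std::size_t num_neighbors, double u) {
  assert(num_neighbors > 0 && num_red <= num_neighbors);
  assert(u >= 0.0 && u < 1.0);
  return u < static_cast<double>(num_red) / num_neighbors;
}
```

Passing the draw in as a parameter also makes the rule testable with fixed values: for example, draw_color(2, 4, 0.3) is red because 0.3 < 2 / 4.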

Now that we have decided on the new color of the vertex, we can update the vertex data. Recall that curvdata is a reference to the actual graph data, so we can modify it directly. Here we also track whether the draw was deterministic and whether the color changed.

  // Determine if the draw was deterministic
  bool is_deterministic =
    num_neighbors == num_red_neighbors || num_red_neighbors == 0;
  // see if I flip and update the current vertex data.
  bool color_changed = new_color != curvdata.color;
  if (color_changed) ++curvdata.numflips;
  // Assign the new color 
  curvdata.color = new_color;

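Now for task creation. There are two cases. First, if the vertex changed color, each of its neighbors could be affected, so we loop through all neighboring vertices and add them as tasks: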

  if (color_changed) {
    for (size_t i = 0; i < in_edges.size(); ++i) {
      size_t sourcev = scope.source(in_edges[i]);
      scheduler.add_task(gl::update_task(sourcev, update_function),
                        1.0);
    }
  }

Second, if the new color came from a random (non-deterministic) draw, the vertex could switch colors when it is updated again. Therefore it reschedules itself to be updated again in the future:

  // Reschedule myself if this was not a deterministic draw
  if (is_deterministic == false) {
    scheduler.add_task(gl::update_task(scope.vertex(), update_function),
                        1.0);
  }
} // end of update_function
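To see how the update function and a dynamic scheduler interact, the sketch below re-implements the whole computation sequentially in standard C++, without GraphLab. A std::deque plays the role of a FIFO task scheduler and std::mt19937 replaces graphlab::random. Dropping a task for a vertex that is already waiting in the queue is an assumption made to keep the queue bounded; the names simulate, sim_result and all_same are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <random>
#include <vector>

struct sim_result {
  std::size_t flips;        // total number of color changes (sum of numflips)
  std::vector<bool> color;  // final colors, true = red, row-major
};

// true when every coin has the same color
bool all_same(const std::vector<bool>& c) {
  for (bool x : c)
    if (x != c.front()) return false;
  return true;
}

sim_result simulate(std::size_t dim, unsigned seed) {
  assert(dim >= 2);  // every vertex needs at least one neighbor
  std::mt19937 rng(seed);
  std::uniform_real_distribution<double> rand01(0.0, 1.0);  // [0, 1)
  const std::size_t n = dim * dim;

  // neighbor lists of the 4-connected grid; vertex (i, j) has id dim * i + j
  std::vector<std::vector<std::size_t>> nbrs(n);
  for (std::size_t i = 0; i < dim; ++i)
    for (std::size_t j = 0; j < dim; ++j) {
      const std::size_t v = dim * i + j;
      if (i > 0) nbrs[v].push_back(v - dim);
      if (i + 1 < dim) nbrs[v].push_back(v + dim);
      if (j > 0) nbrs[v].push_back(v - 1);
      if (j + 1 < dim) nbrs[v].push_back(v + 1);
    }

  // random initial colors and one task per vertex, as with add_task_to_all
  sim_result result{0, std::vector<bool>(n)};
  std::deque<std::size_t> queue;
  std::vector<bool> queued(n, true);
  for (std::size_t v = 0; v < n; ++v) {
    result.color[v] = rand01(rng) < 0.5;
    queue.push_back(v);
  }
  // add a task unless the vertex is already waiting in the queue
  auto schedule = [&](std::size_t v) {
    if (!queued[v]) { queued[v] = true; queue.push_back(v); }
  };

  while (!queue.empty()) {  // terminate when no tasks remain
    const std::size_t v = queue.front();
    queue.pop_front();
    queued[v] = false;
    std::size_t num_red = 0;
    for (std::size_t u : nbrs[v])
      if (result.color[u]) ++num_red;
    const std::size_t num_nbrs = nbrs[v].size();
    const bool new_color = rand01(rng) < double(num_red) / num_nbrs;
    const bool deterministic = num_red == 0 || num_red == num_nbrs;
    if (new_color != result.color[v]) {
      ++result.flips;
      result.color[v] = new_color;
      for (std::size_t u : nbrs[v]) schedule(u);  // neighbors may be affected
    }
    if (!deterministic) schedule(v);  // a random draw may differ next time
  }
  return result;
}
```

An empty queue implies a stable state: a vertex only stays out of the queue after a deterministic update, which set it to the common color of its neighbors, and any later change of a neighbor would have rescheduled it. Hence all_same(simulate(dim, seed).color) holds whenever simulate returns.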

Shared Variables

The shared variable system serves two roles. First, it provides access to data which is globally accessible from all update functions. Second, it provides the capability to compute aggregations over all graph data. The first capability may not appear useful in the shared memory setting, since one could simply define global variables. However, using the shared data manager allows GraphLab to manage these "global variables" in a platform-independent fashion (for instance, in the distributed setting), which leads to more portable code. Additionally, the shared variable system uses an RCU mechanism to ensure safe access to shared data.
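The text does not describe GraphLab's RCU mechanism in detail. As a rough illustration of the read-copy-update idea only (rcu_shared is a hypothetical class, not GraphLab's implementation), readers below take an immutable std::shared_ptr snapshot, and a writer publishes a new object instead of modifying the old one, so a snapshot held by a reader never changes underneath it. For simplicity a mutex guards the pointer copy; a true RCU scheme avoids locking on the read path.

```cpp
#include <cassert>
#include <memory>
#include <mutex>

template <typename T>
class rcu_shared {
 public:
  explicit rcu_shared(const T& initial)
      : current_(std::make_shared<T>(initial)) {}

  // Returns an immutable snapshot of the current value.
  std::shared_ptr<const T> get() const {
    std::lock_guard<std::mutex> lock(mutex_);  // held only to copy the pointer
    return current_;
  }

  // Publishes a new value; snapshots already handed out keep the old one.
  void set(const T& value) {
    std::shared_ptr<const T> fresh = std::make_shared<T>(value);
    std::lock_guard<std::mutex> lock(mutex_);
    current_.swap(fresh);
    // fresh now owns the old value; it is released after the lock is dropped
  }

 private:
  mutable std::mutex mutex_;
  std::shared_ptr<const T> current_;
};

// Usage: a snapshot taken before set() still sees the old value.
inline bool snapshot_survives_update() {
  rcu_shared<int> counter(1);
  std::shared_ptr<const int> before = counter.get();
  counter.set(2);
  return *before == 1 && *counter.get() == 2;
}
```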

For this application, we are interested in having an incremental counter which provides the total number of flips executed so far, as well as computing the proportion of red vertices in the graph. We can achieve these tasks using the shared data Sync mechanism.

The Sync mechanism allows you to build a 'Fold / Reduce' operation across all the vertices in the graph, and store the results in a shared variable.

We will therefore define the following 3 shared variables:

gl::distributed_glshared<size_t> NUM_VERTICES;
gl::distributed_glshared<double> RED_PROPORTION;
gl::distributed_glshared<size_t> NUM_FLIPS;

Num Vertices

The number of vertices in the graph is a constant and will not change throughout the execution of the GraphLab program. We simply set its value using the set() function (here dim is the dimension of the grid, as in init_shared_data below):

NUM_VERTICES.set(dim * dim);

To get the value of a shared variable:

size_t numvertices = NUM_VERTICES.get_val();

Note that unlike the shared memory version, which has a glshared_const type, there is not yet a distributed_glshared_const type. Users can use either distributed_glshared or regular variables instead.

Red Proportion

A sync is defined by a pair of functions: a reducer and an apply function. The reducer is exactly a fold over all the vertices, and the apply function takes the final value at the end of the reduce and performs whatever transformation is needed before writing it into the shared variable. For instance, an L2 norm can be computed by having the reducer add up the squares of the values at each vertex and having the apply function take the square root.
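The L2 norm example can be written out as a reduce/apply pair in plain C++. The sketch below is not GraphLab code: run_l2_sync is a hypothetical driver that folds the reducer over a vector of vertex values sequentially and then calls the apply function once, which is what a sync does conceptually.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Reducer: fold one vertex value into the accumulator.
void reduce_sum_squares(double vertex_value, double& accumulator) {
  accumulator += vertex_value * vertex_value;
}

// Apply: turn the final accumulator into the value of the shared variable.
void apply_sqrt(double& shared_value, double reduced) {
  shared_value = std::sqrt(reduced);
}

// Hypothetical sequential driver for one sync.
double run_l2_sync(const std::vector<double>& vertex_values) {
  double accumulator = 0.0;  // the initial value given to the sync
  for (double v : vertex_values) reduce_sum_squares(v, accumulator);
  double shared_value = 0.0;
  apply_sqrt(shared_value, accumulator);
  return shared_value;
}
```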

We will use this to implement the RED_PROPORTION sync. The reducer counts the number of red vertices, and the apply function then divides the result by the value of NUM_VERTICES. The reducer is a function of the form:

void reduce_red_proportion( gl::iscope& scope, 
                            graphlab::any& accumulator) {
scope
The scope on the vertex we are currently accessing.
accumulator
The input and output of the fold/reduce operation.

In this reducer, we will simply increment the accumulator if the color of the vertex is red.

  if (scope.vertex_data().color) accumulator.as<double>()++; 
}

The apply function takes the following 2 parameters:

void apply_red_proportion(graphlab::any& current_data, 
                          const graphlab::any& new_data) {
current_data
The current (old) value of the shared variable. Overwriting this will update the value of the shared variable. The type of the data matches the type of the shared variable.
new_data
The result of the reduce operation.

The reduced result in new_data is the number of red vertices. We get the total number of vertices from NUM_VERTICES and compute the proportion of red vertices by dividing the number of red vertices by the total number of vertices.

  size_t numvertices = NUM_VERTICES.get_val(); 
  // new_data is the reduced result, which is the number 
  // of red vertices 
  double numred = new_data.as<double>(); 
  // compute the proportion 
  double proportion = numred / numvertices; 
  // here we can output something as a progress monitor 
  std::cout << "Red Proportion: " << proportion << std::endl; 
  // write the final result into the shared variable 
  current_data.as<double>() = proportion; 
}

Now that both the reduce and apply functions have been defined, we can create the sync by calling the following at the beginning of the program:

core.set_sync(RED_PROPORTION, 
              reduce_red_proportion, 
              apply_red_proportion, 
              double(0), 
              100);

This registers a sync on RED_PROPORTION: reduce_red_proportion is folded over all the vertices starting from the value double(0), and the result is passed to apply_red_proportion. The sync is performed approximately every 100 updates.

Num Flips

GraphLab provides a number of predefined sync operations which allow simple reductions and applies, such as sums or sums of squares, to be implemented very quickly. We implement the NUM_FLIPS entry using one of these predefined operations. Since the vertex data could be of any type, the predefined operations typically require the user to provide a simple function which extracts the information of interest from the vertex data. In this case, we are interested in the numflips field.

size_t get_flip(const vertex_data& v) { 
  return v.numflips; 
}

To create the sync we also use set_sync, but with functions from graphlab::glshared_sync_ops and graphlab::glshared_apply_ops. In this case the reduction is simply a sum, while the apply function does nothing more than copy the result of the reduction into the shared variable.

core.set_sync(NUM_FLIPS, 
              gl::glshared_sync_ops::sum<size_t, get_flip>, 
              gl::glshared_apply_ops::identity<size_t>, 
              size_t(0), 
              100);
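The sum<size_t, get_flip> form works because the getter is passed as a template argument, so one generic reducer can serve any field. The sketch below imitates that idea; sum_reducer and identity_apply are hypothetical stand-ins for the glshared_sync_ops and glshared_apply_ops templates, and the vertex type is an extra template parameter here because there is no gl typedef to supply it.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct vertex_data {
  std::size_t numflips;
  bool color;
};

std::size_t get_flip(const vertex_data& v) { return v.numflips; }

// Generic "sum" reducer: the field extractor is a template argument.
template <typename T, typename VertexData, T (*Getter)(const VertexData&)>
void sum_reducer(const VertexData& vdata, T& accumulator) {
  accumulator += Getter(vdata);
}

// Identity apply: copy the reduced value into the shared variable unchanged.
template <typename T>
void identity_apply(T& shared_value, const T& reduced) {
  shared_value = reduced;
}

// Hypothetical sequential driver computing what NUM_FLIPS would hold.
std::size_t total_flips(const std::vector<vertex_data>& vertices) {
  std::size_t acc = 0;  // plays the role of the size_t(0) initial value
  for (const vertex_data& v : vertices)
    sum_reducer<std::size_t, vertex_data, get_flip>(v, acc);
  std::size_t shared_value = 0;
  identity_apply(shared_value, acc);
  return shared_value;
}
```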

Putting It Together

We can now write a simple init_shared_data function to take a reference to a core and initialize all the shared variables.

void init_shared_data(gl::distributed_core& core, size_t dim) { 
  NUM_VERTICES.set(dim * dim); 
  core.set_sync(RED_PROPORTION, 
                reduce_red_proportion, 
                apply_red_proportion, 
                double(0), 
                100); 
  core.set_sync(NUM_FLIPS, 
                gl::glshared_sync_ops::sum<size_t, get_flip>, 
                gl::glshared_apply_ops::identity<size_t>, 
                size_t(0), 
                100); 
} 

Merge Function

A merge operation can also be provided which will allow the sync operation to be parallelized. This is highly recommended.

If a merge function is defined for a sync, the set of vertices will be partitioned into a collection of disjoint sets. The sync function is then performed on each set in parallel, producing a number of partial results. These partial results are then combined with the merge function. Finally, the apply function is executed on the final result and written to the shared variable.
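The partition, reduce, merge and apply sequence can be sketched with std::thread. This is an illustration, not GraphLab's engine: the hypothetical function parallel_red_proportion splits the vertices into contiguous blocks, counts the red vertices of each block in its own thread, merges the partial counts by summation (the same merge as merge_red_proportion) and then applies the division by the number of vertices.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

double parallel_red_proportion(const std::vector<bool>& colors,
                               std::size_t num_parts) {
  assert(num_parts > 0 && !colors.empty());
  const std::size_t n = colors.size();
  std::vector<double> partial(num_parts, 0.0);  // one accumulator per block
  std::vector<std::thread> workers;
  for (std::size_t p = 0; p < num_parts; ++p) {
    workers.emplace_back([&, p] {
      // block p covers vertices [n * p / num_parts, n * (p + 1) / num_parts)
      for (std::size_t v = n * p / num_parts; v < n * (p + 1) / num_parts; ++v)
        if (colors[v]) partial[p] += 1.0;  // reducer: count red vertices
    });
  }
  for (std::thread& t : workers) t.join();

  double merged = 0.0;
  for (double x : partial) merged += x;  // merge: partial counts add up
  return merged / n;                     // apply: divide by the vertex count
}
```

Each thread writes only its own entry of partial, so no locking is needed, and because the merge combines partial counts the same way the reducer combines vertices, the result does not depend on the number of blocks.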

Merging Red Proportion

Since the intermediate result of the sync is simply a count of the red vertices seen so far, the merge is just a sum.

void merge_red_proportion(graphlab::any& target, 
                          const graphlab::any& source) {
  target.as<double>() += source.as<double>();
}

To sync using the merge operation, the set_sync is called using:

core.set_sync(RED_PROPORTION, 
              reduce_red_proportion, 
              apply_red_proportion, 
              double(0), 
              100,
              merge_red_proportion);

Merging Num Flips

Similarly, a number of predefined merge functions are provided in graphlab::glshared_merge_ops, and the NUM_FLIPS sync can be defined in the same way:

core.set_sync(NUM_FLIPS, 
              gl::glshared_sync_ops::sum<size_t, get_flip>, 
              gl::glshared_apply_ops::identity<size_t>, 
              size_t(0), 
              100,
              gl::glshared_merge_ops::sum<size_t>);

The Main

The main function is where everything begins. Here we demonstrate the minimal code needed to start a GraphLab job using all the parts defined above. We use the GraphLab command line tools to set up a basic engine from command line options. We first present the complete main and then discuss each of its parts.

int main(int argc,  char *argv[]) {
  // Parse the command line using the command line options tool;
  // engine settings such as the scope type are also read from it
  graphlab::command_line_options opts;
  
  size_t dimensions = 20;
  bool makegraph = false;
  opts.use_distributed_options();
  opts.attach_option("dim", 
           &dimensions, size_t(20), 
           "the dimension of the grid");
  opts.attach_option("makegraph", 
           &makegraph, makegraph, 
           "Makes Graph");

  // parse the command line
  bool success = opts.parse(argc, argv);
  if(!success) {
    return EXIT_FAILURE;
  }
  
  if (makegraph) {
    // call init_graph to create the graph
    gl::graph g;
    init_graph(g, dimensions);
    std::vector<graphlab::vertex_id_t> parts;
    g.metis_partition (16, parts);
    graph_partition_to_atomindex(g, parts, "demograph");
    return 0;
  }

  graphlab::mpi_tools::init(argc, argv);
  graphlab::dc_init_param param;
  ASSERT_TRUE(graphlab::init_param_from_mpi(param));
  graphlab::distributed_control dc(param);

  
  // Display the values
  opts.print();

  // create a graphlab core which contains the graph, shared data, and
  // engine
  gl::distributed_core glcore(dc, "demograph.idx");

  // Initialize the core with the command line arguments
  glcore.set_engine_options(opts);
  glcore.build_engine();
  // call create shared_data to create the shared data
  init_shared_data(glcore, dimensions);

  // since we are using a task scheduler, we need to
  // to create tasks. otherwise the engine will just terminate immediately
  // there are DIM * DIM vertices
  glcore.add_task_to_all(update_function, 1.0);
  
  // Run the graphlab engine 
  double runtime = glcore.start();

  // output the runtime
  std::cout << "Completed in " << runtime << " seconds" << std::endl;

  // now we can look up the values using the get_val() function
  size_t numberofflips = NUM_FLIPS.get_val();
  double redprop = RED_PROPORTION.get_val();

  // output some interesting statistics
  std::cout << "Number of flips: " <<  numberofflips << std::endl;
  std::cout << "Red prop: " << redprop << std::endl;

  // output the graph
  // note that here we take advantage of the fact that vertex insertion
  // gives sequential numberings
  if (dc.procid() == 0) {
    size_t ctr = 0;
    for (size_t i = 0;i < dimensions; ++i) {
      for (size_t j = 0;j < dimensions; ++j) {
        std::cout << size_t(glcore.graph().get_vertex_data(ctr).color) << " ";
        ++ctr;
      }
      std::cout << std::endl;
    }
  }
  graphlab::mpi_tools::finalize();
}
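In the makegraph branch, metis_partition(16, parts) fills parts, presumably with one partition id per vertex, before the partitioned graph is written to disk. The sketch below is not METIS: row_band_partition is a hypothetical, naive partitioner that assigns whole bands of grid rows to each part, and edge_cut counts the undirected grid edges whose endpoints land in different parts. That edge cut is the quantity a partitioner such as METIS tries to keep small, since cut edges require communication between machines.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// parts[v] is the part assigned to vertex v = dim * i + j; row i goes to
// part i * num_parts / dim, so each part receives a contiguous band of rows.
std::vector<std::size_t> row_band_partition(std::size_t dim,
                                            std::size_t num_parts) {
  assert(num_parts > 0);
  std::vector<std::size_t> parts(dim * dim);
  for (std::size_t i = 0; i < dim; ++i)
    for (std::size_t j = 0; j < dim; ++j)
      parts[dim * i + j] = i * num_parts / dim;
  return parts;
}

// Number of undirected grid edges whose endpoints are in different parts.
std::size_t edge_cut(const std::vector<std::size_t>& parts, std::size_t dim) {
  std::size_t cut = 0;
  for (std::size_t i = 0; i < dim; ++i)
    for (std::size_t j = 0; j < dim; ++j) {
      const std::size_t v = dim * i + j;
      if (j + 1 < dim && parts[v] != parts[v + 1]) ++cut;    // right neighbor
      if (i + 1 < dim && parts[v] != parts[v + dim]) ++cut;  // lower neighbor
    }
  return cut;
}
```

For a 4 by 4 grid split into 2 bands, only the 4 vertical edges between rows 1 and 2 are cut.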