In this section we provide a brief tutorial on how the pieces of GraphLab come together to form a simple GraphLab program. In this tutorial we construct a synthetic application which uses many of the GraphLab concepts. However, many real GraphLab applications will not need all the pieces described in this tutorial. See dist_demo.cpp for the complete source.
This is very similar to the Detailed Example, but modified for the distributed version. Depending on the complexity of your program, moving from the shared memory version to the distributed version can be quite mechanical. See Distributed GraphLab for a condensed list of the changes you have to make.
We begin by first describing the synthetic problem we will be solving. We are given a d by d grid of magnetic coins. Each coin only interacts with its 4 neighboring coins as illustrated by the edges in Figure 1(a). When a coin is flipped one of the following two outcomes occurs:
Figure 1: A grid of red/black coins. Each coin is connected to its 4 cardinal neighbors (as shown by the edges). (a) An intermediate state of the system. (b, c) The two stable states of the system.
The only two stable joint states are when all coins are black, Figure 1(b), or where all coins are red, Figure 1(c). We are interested in computing the average number of flips for each coin before a stable state is reached.
We first need to include some headers: distributed_graphlab.hpp includes everything you will ever need for Distributed GraphLab.
// standard C++ headers
#include <iostream>
// includes the entire graphlab framework
#include <distributed_graphlab.hpp>
The first step to designing a GraphLab program is setting up the data graph. To do this we will need to define the data elements and their dependencies. The primary data element in this simple program is the data at each vertex which records the number of flips so far and the current color of the vertex. Here we will assume the color red is true and the color black is false.
struct vertex_data {
  size_t numflips;
  bool color;
};
Distributed GraphLab requires the vertex and edge data types to be serializable (see Serialization), since the data must be transmittable across the network. To do so, you must implement a save and a load function so that GraphLab can understand your data structures. The serialization mechanism is simple to use and it understands all basic datatypes as well as standard STL containers. If the STL container contains non-basic datatypes (such as a struct), save/load functions must be written for that datatype as well. For vertex_data, we implement the following functions. See Serialization for more details.
struct vertex_data {
  size_t numflips;
  bool color;
  void save(graphlab::oarchive& archive) const {
    archive << numflips << color;
  }
  void load(graphlab::iarchive& archive) {
    archive >> numflips >> color;
  }
};
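The save/load pattern can be tried out without GraphLab. The following is a minimal sketch, assuming toy archive classes built on string streams: toy_oarchive, toy_iarchive, toy_vertex_data and round_trip are hypothetical stand-ins introduced here for illustration, not GraphLab's graphlab::oarchive and graphlab::iarchive. It only shows that fields must be read back in the same order in which they were written.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical stand-in for an output archive (NOT graphlab::oarchive).
// It writes trivially copyable values as raw bytes.
struct toy_oarchive {
  std::ostringstream out;
  template <typename T>
  toy_oarchive& operator<<(const T& value) {
    out.write(reinterpret_cast<const char*>(&value), sizeof(T));
    return *this;
  }
};

// Hypothetical stand-in for an input archive (NOT graphlab::iarchive).
struct toy_iarchive {
  std::istringstream in;
  explicit toy_iarchive(const std::string& bytes) : in(bytes) {}
  template <typename T>
  toy_iarchive& operator>>(T& value) {
    in.read(reinterpret_cast<char*>(&value), sizeof(T));
    return *this;
  }
};

// Same fields as the tutorial's vertex_data, using the toy archives.
struct toy_vertex_data {
  std::size_t numflips;
  bool color;
  void save(toy_oarchive& archive) const { archive << numflips << color; }
  void load(toy_iarchive& archive) { archive >> numflips >> color; }
};

// Serialize a vertex to bytes and read it back into a fresh object.
toy_vertex_data round_trip(const toy_vertex_data& original) {
  toy_oarchive oarc;
  original.save(oarc);
  toy_iarchive iarc(oarc.out.str());
  toy_vertex_data copy = {0, false};
  copy.load(iarc);
  assert(!iarc.in.fail());  // every field was read successfully
  return copy;
}
```

Calling round_trip on a vertex with numflips = 7 and color = true returns an object with the same two field values.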
In this example, we do not need edge data. However GraphLab currently does not have a mechanism to completely disable the use of edge data. Therefore, we will just put an arbitrary small placeholder type on the edges.
typedef char edge_data;
Note that we do not need to write a save/load function here since GraphLab's serializer already understands basic datatypes. One could imagine an alternative problem where edge weights are associated with each pair of interacting coins.
The GraphLab graph is templatized over the vertex data as well as the edge data. Here we define the type of the graph using a typedef for convenience.
typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type;
Since graphlab is heavily templatized and can be inconvenient to use in its standard form, the graphlab::distributed_types structure provides convenient typedefed "shortcuts" to figure out the other graphlab types easily.
typedef graphlab::distributed_types<graph_type> gl;
The next step in constructing a GraphLab program is to construct the actual graph. Distributed GraphLab requires that the data graph be represented as a disk graph for input. As stated in Graph Creation, there are three basic methods of creating a graph. Here, we will use the first method (Conversion of In Memory Graph to Disk Graph), where the graph is fully instantiated in memory before it is written out to disk.
We define a simple init_graph function which takes a gl::graph type (gl::graph is an in memory graph. The distributed graph is gl::distributed_graph), and creates a grid.
void init_graph(gl::graph& g, size_t dim) {
  for (size_t i = 0; i < dim * dim; ++i) {
    // create the vertex data, randomizing the color
    vertex_data vdata;
    vdata.numflips = 0;
    // Flip a uniform coin to obtain the initial color
    if (graphlab::random::rand_int(1) == 1) vdata.color = true;
    else vdata.color = false;
    // create the vertex
    g.add_vertex(vdata);
  }
  edge_data edata;
  for (size_t i = 0; i < dim; ++i) {
    for (size_t j = 0; j < dim - 1; ++j) {
      // add the horizontal edges in both directions
      g.add_edge(dim * i + j, dim * i + j + 1, edata);
      g.add_edge(dim * i + j + 1, dim * i + j, edata);
      // add the vertical edges in both directions
      g.add_edge(dim * j + i, dim * (j + 1) + i, edata);
      g.add_edge(dim * (j + 1) + i, dim * j + i, edata);
    }
  }
  g.finalize();
} // end of init_graph function
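The index arithmetic in the edge loop is easy to get wrong, so it can help to check it in isolation. The sketch below reproduces the same double loop without GraphLab and records the directed edges as (source, target) pairs. The names grid_edge, grid_edges and in_degree are hypothetical helpers introduced for this illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// A directed edge as a (source, target) pair of row-major vertex ids.
typedef std::pair<std::size_t, std::size_t> grid_edge;

// Enumerate the directed edges of a dim x dim grid using the same
// loop structure as init_graph.
std::vector<grid_edge> grid_edges(std::size_t dim) {
  assert(dim > 0);  // dim - 1 below would wrap around for dim == 0
  std::vector<grid_edge> edges;
  for (std::size_t i = 0; i < dim; ++i) {
    for (std::size_t j = 0; j < dim - 1; ++j) {
      // horizontal edges in row i, both directions
      edges.push_back(grid_edge(dim * i + j, dim * i + j + 1));
      edges.push_back(grid_edge(dim * i + j + 1, dim * i + j));
      // vertical edges in column i, both directions
      edges.push_back(grid_edge(dim * j + i, dim * (j + 1) + i));
      edges.push_back(grid_edge(dim * (j + 1) + i, dim * j + i));
    }
  }
  return edges;
}

// Count the edges that point into vertex v.
std::size_t in_degree(const std::vector<grid_edge>& edges, std::size_t v) {
  std::size_t count = 0;
  for (std::size_t k = 0; k < edges.size(); ++k)
    if (edges[k].second == v) ++count;
  return count;
}
```

For a 3 by 3 grid this yields 4 * 3 * 2 = 24 directed edges. The center vertex (id 4) has in-degree 4, an edge vertex such as id 1 has in-degree 3, and a corner such as id 0 has in-degree 2.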
We will use this function to create the disk graph a little later.
Now we define the update function which represents the basic block of computation in this program. The update function is applied to each vertex and has read/write access to the data at that vertex, as well as all adjacent edges and vertices. You may specify more than one update function, but we only need one for this application. Let's first outline the computation that will occur in the update function. Below is a list of the steps:
Each update function call has the option of inserting new tasks into the scheduler: in this case, tasks for itself and its neighbors. The algorithm terminates when there are no tasks remaining. There are other methods for terminating execution, such as registering a termination evaluator with the engine, but we are not going to describe that here. We now present the entire update function and then discuss each part individually.
void update_function(gl::iscope& scope, gl::icallback& scheduler) {
  // Get a reference to the vertex data and the in edges
  vertex_data& curvdata = scope.vertex_data();
  gl::edge_list in_edges = scope.in_edge_ids();
  // Count the number of red neighbors
  size_t num_red_neighbors = 0;
  for (size_t i = 0; i < in_edges.size(); ++i) {
    // eid is the current edge id
    size_t eid = in_edges[i];
    size_t sourcev = scope.source(eid);
    const vertex_data& nbrvertex = scope.neighbor_vertex_data(sourcev);
    if (nbrvertex.color) ++num_red_neighbors;
  }
  // get the total number of neighbors we have
  size_t num_neighbors = in_edges.size();
  // Draw the new color
  bool new_color = graphlab::random::rand01() <
                   (double(num_red_neighbors) / num_neighbors);
  // Determine if the draw was deterministic
  bool is_deterministic = num_neighbors == num_red_neighbors ||
                          num_red_neighbors == 0;
  // see if I flip and update the current vertex data.
  bool color_changed = new_color != curvdata.color;
  if (color_changed) ++curvdata.numflips;
  // Assign the new color
  curvdata.color = new_color;
  // If I flipped, all my neighbors could be affected, loop through
  // all my neighboring vertices and add them as tasks.
  if (color_changed) {
    for (size_t i = 0; i < in_edges.size(); ++i) {
      size_t sourcev = scope.source(in_edges[i]);
      scheduler.add_task(gl::update_task(sourcev, update_function), 1.0);
    }
  }
  // Reschedule myself if this was not a deterministic draw
  if (is_deterministic == false) {
    scheduler.add_task(gl::update_task(scope.vertex(), update_function), 1.0);
  }
}
All update functions must have the following form:
void update_function(gl::iscope& scope,
gl::icallback& scheduler) {
The parameters are described here:
|num_vertices| - 1), and similarly for edges. All edges are directed. The scope provides methods to access the data associated with the vertex, its neighbors, and any inbound or outbound edges.
First we get a mutable reference to the vertex data on this vertex.
vertex_data& curvdata = scope.vertex_data();
Note that the function scope.vertex_data() returns a reference to the vertex data. Modifications to this reference will directly modify the data on the graph. Then we get a constant reference to the vector of edge ids for all edges inbound to this vertex:
const std::vector<gl::edge_id_t>& in_edges = scope.in_edge_ids();
We now compute the total number of red neighbors. This is done by looping through all neighboring vertices and counting the red ones. To do this, we have to look at the data of each neighboring vertex. The neighbor_vertex_data(vid) function allows us to read the vertex data of a vertex adjacent to the current vertex. If we had edge data, the function edge_data(eid) would return a reference to the edge data on the edge eid. Since we are not going to change the neighbor data, we can just grab a const reference. You should always use const references when you know that you will not be changing the data, since GraphLab could make additional optimizations for "read-only" operations. Similarly, you should never cast a constant reference to a regular reference. Modifying data through a constant reference has undefined behavior.
// Count the number of red neighbors
size_t num_red_neighbors = 0;
for (size_t i = 0; i < in_edges.size(); ++i) {
  size_t eid = in_edges[i];
  size_t sourcev = scope.source(eid);
  const vertex_data& nbrvertex = scope.neighbor_vertex_data(sourcev);
  if (nbrvertex.color) ++num_red_neighbors;
}
Now we can decide on the new color of the vertex. The new color is drawn at random: the vertex becomes red with probability equal to the fraction of its neighbors that are red. When all neighbors share the same color, the vertex therefore always takes that color. Extensive random number support is provided through graphlab::random. random::rand01() provides a random floating point number between 0 and 1. There are a number of other distribution-based generators available in Random Number Generators.
// Draw the new color
bool new_color = graphlab::random::rand01() <
                 (double(num_red_neighbors) / num_neighbors);
Now that we have decided on the new color of the vertex, we can go ahead and update the value of the vertex. Once again, recall that curvdata is a reference to the actual graph data. Therefore we can just modify it directly. Here we also track whether the draw was deterministic and whether a new color was chosen.
// Determine if the draw was deterministic
bool is_deterministic = num_neighbors == num_red_neighbors ||
                        num_red_neighbors == 0;
// see if I flip and update the current vertex data.
bool color_changed = new_color != curvdata.color;
if (color_changed) ++curvdata.numflips;
// Assign the new color
curvdata.color = new_color;
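To make the draw concrete, here is a small standalone sketch of the two quantities computed above. The helper names red_probability and is_deterministic_draw are hypothetical and exist only for this illustration. They restate the expressions from the update function as plain functions.

```cpp
#include <cassert>
#include <cstddef>

// Probability that the vertex becomes red: the fraction of red neighbors.
double red_probability(std::size_t num_red_neighbors,
                       std::size_t num_neighbors) {
  assert(num_neighbors > 0 && num_red_neighbors <= num_neighbors);
  return double(num_red_neighbors) / double(num_neighbors);
}

// The draw has a fixed outcome when all neighbors share one color.
bool is_deterministic_draw(std::size_t num_red_neighbors,
                           std::size_t num_neighbors) {
  return num_red_neighbors == 0 || num_red_neighbors == num_neighbors;
}
```

For an interior coin with 4 neighbors, 1 red neighbor gives a probability of 0.25 of turning red and the draw is not deterministic. With 0 or 4 red neighbors, the probability is 0 or 1 and the draw is deterministic.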
Now for the task creation algorithm. There are 2 basic cases:
if (color_changed) {
  for (size_t i = 0; i < in_edges.size(); ++i) {
    size_t sourcev = scope.source(in_edges[i]);
    scheduler.add_task(gl::update_task(sourcev, update_function), 1.0);
  }
}
There is one more special case. If the new color was the result of a random draw, the vertex could switch colors the next time it is updated. Therefore it should be rescheduled and updated again in the future.
  // Reschedule myself if this was not a deterministic draw
  if (is_deterministic == false) {
    scheduler.add_task(gl::update_task(scope.vertex(), update_function), 1.0);
  }
} // end of update_function
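The scheduling logic can be exercised without GraphLab using a sequential simulation. The following is a rough sketch under several assumptions: a single-threaded FIFO task queue that allows duplicate tasks stands in for the GraphLab scheduler, std::mt19937 stands in for graphlab::random, and sim_result, grid_neighbors and simulate are hypothetical names. It is not how the distributed engine executes tasks. It only illustrates why an empty task queue implies that a stable state has been reached.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <random>
#include <vector>

struct sim_result {
  std::size_t total_flips;  // flips summed over all coins
  bool reached_stable;      // true if the task queue drained
  std::vector<bool> color;  // final colors, true = red
};

// Neighbors of vertex v in a dim x dim grid (row-major numbering).
std::vector<std::size_t> grid_neighbors(std::size_t v, std::size_t dim) {
  std::vector<std::size_t> nbrs;
  const std::size_t row = v / dim, col = v % dim;
  if (col > 0) nbrs.push_back(v - 1);
  if (col + 1 < dim) nbrs.push_back(v + 1);
  if (row > 0) nbrs.push_back(v - dim);
  if (row + 1 < dim) nbrs.push_back(v + dim);
  return nbrs;
}

sim_result simulate(std::size_t dim, unsigned seed, std::size_t max_updates) {
  assert(dim >= 2);  // every vertex needs at least one neighbor
  std::mt19937 rng(seed);
  std::uniform_real_distribution<double> rand01(0.0, 1.0);
  std::bernoulli_distribution coin(0.5);

  sim_result res;
  res.total_flips = 0;
  res.color.resize(dim * dim);
  std::deque<std::size_t> tasks;
  for (std::size_t v = 0; v < dim * dim; ++v) {
    res.color[v] = coin(rng);  // random initial color
    tasks.push_back(v);        // one initial task per vertex
  }

  for (std::size_t step = 0; step < max_updates && !tasks.empty(); ++step) {
    const std::size_t v = tasks.front();
    tasks.pop_front();
    const std::vector<std::size_t> nbrs = grid_neighbors(v, dim);
    std::size_t num_red = 0;
    for (std::size_t i = 0; i < nbrs.size(); ++i)
      if (res.color[nbrs[i]]) ++num_red;
    const bool new_color = rand01(rng) < double(num_red) / nbrs.size();
    const bool deterministic = (num_red == 0 || num_red == nbrs.size());
    if (new_color != res.color[v]) {
      ++res.total_flips;
      res.color[v] = new_color;
      // a flip may affect every neighbor, so schedule them all
      for (std::size_t i = 0; i < nbrs.size(); ++i) tasks.push_back(nbrs[i]);
    }
    // a random draw could come out differently next time
    if (!deterministic) tasks.push_back(v);
  }
  res.reached_stable = tasks.empty();
  return res;
}
```

When the queue drains, the last update of every vertex was deterministic and none of its neighbors has changed since, so each vertex agrees with all of its neighbors and the whole grid has one color. For example, simulate(4, 12345u, 1000000) runs a 4 by 4 grid with a cap of one million updates. The cap is only a safety net for the sketch.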
The shared variable system serves two roles. First, it provides access to data which is globally accessible from all update functions. Second, the shared variables provide the capability to compute aggregations of all graph data. The first capability may not appear to be useful in the shared memory setting, since one could simply define global variables. However, using the shared data manager allows GraphLab to manage these "global variables" in a platform-independent fashion, such as in the distributed setting, and leads to more portable code. Additionally, the shared variable system uses an RCU mechanism to ensure safe access to shared data.
For this application, we are interested in having an incremental counter which provides the total number of flips executed so far, as well as computing the proportion of red vertices in the graph. We can achieve these tasks using the shared data Sync mechanism.
The Sync mechanism allows you to build a 'Fold / Reduce' operation across all the vertices in the graph, and store the results in a shared variable.
We will therefore define the following 3 shared variables:
gl::distributed_glshared<size_t> NUM_VERTICES;
gl::distributed_glshared<double> RED_PROPORTION;
gl::distributed_glshared<size_t> NUM_FLIPS;
The number of vertices in the graph is a constant and will not change throughout the execution of the GraphLab program. We simply set its value using the set() function.
NUM_VERTICES.set(DIM * DIM);
To get the value of a shared variable:
size_t numvertices = NUM_VERTICES.get_val();
Note that unlike the shared memory version which has a glshared_const type, we do not yet have a distributed_glshared_const type. Users could either use distributed_glshared, or could use regular variables.
A sync is defined by a pair of functions: a reducer and an apply. The reducer is exactly a fold over all the vertices. The apply takes the final value at the end of the reduce and performs whatever transformation it needs before writing it into the shared variable. For instance, an L2 norm can be computed by having the reducer add the squares of the values at each vertex, with the apply function then taking the square root.
We will use this to implement the RED_PROPORTION sync. We use the reducer to count the number of red vertices. The apply function then divides the result by the value of the NUM_VERTICES shared variable. The reducer is a function of the form:
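The reducer/apply split for the L2 example can be sketched in plain C++. This is a standalone illustration, not GraphLab's sync API: reduce_sum_squares, apply_sqrt and sync_l2_norm are hypothetical names, and plain double values stand in for vertex data.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Reducer: fold one vertex value into the running sum of squares.
void reduce_sum_squares(double vertex_value, double& accumulator) {
  accumulator += vertex_value * vertex_value;
}

// Apply: transform the reduced value before it is stored.
double apply_sqrt(double reduced) {
  assert(reduced >= 0.0);  // a sum of squares is never negative
  return std::sqrt(reduced);
}

// Run the fold over all values, then the apply on the final result.
double sync_l2_norm(const std::vector<double>& values) {
  double accumulator = 0.0;  // starting value of the fold
  for (std::size_t i = 0; i < values.size(); ++i)
    reduce_sum_squares(values[i], accumulator);
  return apply_sqrt(accumulator);
}
```

With the two values 3 and 4, the reducer accumulates 9 + 16 = 25 and the apply step returns 5.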
void reduce_red_proportion( gl::iscope& scope, graphlab::any& accumulator) {
In this reducer, we will simply increment the accumulator if the color of the vertex is red.
  if (scope.vertex_data().color) accumulator.as<double>()++;
}
The apply function takes the following 2 parameters:
void apply_red_proportion(graphlab::any& current_data, const graphlab::any& new_data) {
The reduced result in new_data will be the count of the number of red vertices. We can get the total number of vertices from NUM_VERTICES, and compute the proportion of red vertices by dividing the number of red vertices by the total number of vertices.
  size_t numvertices = NUM_VERTICES.get_val();
  // new_data is the reduced result, which is the number
  // of red vertices
  double numred = new_data.as<double>();
  // compute the proportion
  double proportion = numred / numvertices;
  // here we can output something as a progress monitor
  std::cout << "Red Proportion: " << proportion << std::endl;
  // write the final result into the shared variable
  current_data.as<double>() = proportion;
}
Now that both the reduce and apply functions have been defined, we can create the sync by calling set_sync at the beginning of the program:
core.set_sync(RED_PROPORTION,
reduce_red_proportion,
apply_red_proportion,
double(0),
100);
Here we have set the RED_PROPORTION shared variable to store the result of running reduce_red_proportion as a fold over all the vertices, with starting value double(0), and with the result applied using apply_red_proportion. The operation will be performed approximately every 100 updates.
GraphLab provides a number of predefined syncing operations which allow simple reductions / applies to be implemented very quickly, for instance computing sums or sums of squares. We will implement the NUM_FLIPS entry using one of these predefined operations. Since the vertex data could be any arbitrary type, the predefined operations typically require the user to provide a simple function which extracts the information of interest from the vertex data. In this case, we are interested in the numflips field.
size_t get_flip(const vertex_data& v) { return v.numflips; }
To create the sync, we use the set_sync function as well, but with functions from graphlab::glshared_sync_ops and graphlab::glshared_apply_ops. In this case, our reduction function is simply a "sum", while our apply function should do nothing more than copy the result of the reduction into the shared variable.
core.set_sync(NUM_FLIPS,
gl::glshared_sync_ops::sum<size_t, get_flip>,
gl::glshared_apply_ops::identity<size_t>,
size_t(0),
100);
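One way such a predefined "sum" reducer could take the extractor as a template argument is sketched below. This is an assumption made for illustration and not GraphLab's actual implementation of glshared_sync_ops::sum. The names counted_vertex, extract_flips, sum_reducer, identity_apply and sync_num_flips are all hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical vertex type with the field we want to sum.
struct counted_vertex {
  std::size_t numflips;
  bool color;
};

// Extractor: pulls the value of interest out of the vertex data.
std::size_t extract_flips(const counted_vertex& v) { return v.numflips; }

// Hypothetical generic reducer: the extractor is a compile-time
// template argument, so no function pointer is stored at run time.
template <typename T, T (*Extract)(const counted_vertex&)>
void sum_reducer(const counted_vertex& v, T& accumulator) {
  accumulator += Extract(v);
}

// Hypothetical identity apply: copy the reduced value unchanged.
template <typename T>
void identity_apply(T& shared_value, const T& reduced) {
  shared_value = reduced;
}

// Fold over all vertices, then apply into the "shared variable".
std::size_t sync_num_flips(const std::vector<counted_vertex>& vertices) {
  std::size_t accumulator = 0;  // starting value size_t(0)
  for (std::size_t i = 0; i < vertices.size(); ++i)
    sum_reducer<std::size_t, extract_flips>(vertices[i], accumulator);
  std::size_t shared_value = 0;
  identity_apply(shared_value, accumulator);
  assert(shared_value == accumulator);  // identity leaves the value as is
  return shared_value;
}
```

For three vertices with 2, 0 and 5 flips, sync_num_flips returns 7.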
We can now write a simple init_shared_data function to take a reference to a core and initialize all the shared variables.
void init_shared_data(gl::core& core, size_t dim) {
  NUM_VERTICES.set(dim * dim);
  core.set_sync(RED_PROPORTION,
                reduce_red_proportion,
                apply_red_proportion,
                double(0),
                100);
  core.set_sync(NUM_FLIPS,
                gl::glshared_sync_ops::sum<size_t, get_flip>,
                gl::glshared_apply_ops::identity<size_t>,
                size_t(0),
                100);
}
A merge operation can also be provided which will allow the sync operation to be parallelized. This is highly recommended.
If a merge function is defined for a sync, the set of vertices will be partitioned into a collection of disjoint sets. The sync function is then performed on each set in parallel, producing a number of partial results. These partial results are then combined with the merge function. Finally, the apply function is executed on the final result and written to the shared variable.
Since the intermediate result of the sync is simply a count of the number of red vertices seen so far, the merge is just a sum.
void merge_red_proportion(graphlab::any& target,
                          const graphlab::any& source) {
  target.as<double>() += source.as<double>();
}
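The reduce, merge and apply pipeline described above can be sketched without GraphLab. In this standalone illustration the partitions are processed one after another rather than in parallel, plain double values stand in for graphlab::any, and reduce_red, merge_red, apply_red and sync_red_proportion are hypothetical names. The point is that the result does not depend on how the vertices are partitioned.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Reduce: fold one vertex color into a partial count of red vertices.
void reduce_red(bool is_red, double& accumulator) {
  if (is_red) accumulator += 1.0;
}

// Merge: combine two partial counts.
void merge_red(double& target, const double& source) { target += source; }

// Apply: turn the final count into a proportion.
double apply_red(double num_red, std::size_t num_vertices) {
  return num_red / double(num_vertices);
}

double sync_red_proportion(const std::vector<bool>& colors,
                           std::size_t num_parts) {
  assert(num_parts > 0 && !colors.empty());
  const std::size_t n = colors.size();
  const std::size_t chunk = (n + num_parts - 1) / num_parts;
  // One partial result per disjoint, contiguous set of vertices.
  // A real engine could compute these in parallel.
  std::vector<double> partial(num_parts, 0.0);
  for (std::size_t p = 0; p < num_parts; ++p) {
    const std::size_t begin = std::min(p * chunk, n);
    const std::size_t end = std::min(begin + chunk, n);
    for (std::size_t v = begin; v < end; ++v)
      reduce_red(colors[v], partial[p]);
  }
  // Combine the partial results, then apply once on the final value.
  double total = 0.0;
  for (std::size_t p = 0; p < num_parts; ++p) merge_red(total, partial[p]);
  return apply_red(total, n);
}
```

For 8 vertices of which 4 are red, the function returns 0.5 whether the vertices are split into 1, 3 or 8 parts.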
To sync using the merge operation, the set_sync is called using:
core.set_sync(RED_PROPORTION,
reduce_red_proportion,
apply_red_proportion,
double(0),
100,
merge_red_proportion);
Similarly, a number of predefined merge functions are provided in graphlab::glshared_merge_ops, and the NUM_FLIPS sync can be defined in the same way.
core.set_sync(NUM_FLIPS,
gl::glshared_sync_ops::sum<size_t, get_flip>,
gl::glshared_apply_ops::identity<size_t>,
size_t(0),
100,
gl::glshared_merge_ops::sum<size_t>);
The main function is where everything begins. Here, we will demonstrate the minimal code needed to start a GraphLab job using all the parts we defined above. We will use the GraphLab command line tools to set up a basic engine using command line options. We first present the complete main and then discuss each of the parts.
int main(int argc, char *argv[]) {
  // Parse the command line using the command line options tool
  // and scope type on the command line
  graphlab::command_line_options opts;
  size_t dimensions = 20;
  bool makegraph = false;
  opts.use_distributed_options();
  opts.attach_option("dim", &dimensions, size_t(20),
                     "the dimension of the grid");
  opts.attach_option("makegraph", &makegraph, makegraph, "Makes Graph");
  // parse the command line
  bool success = opts.parse(argc, argv);
  if (!success) {
    return EXIT_FAILURE;
  }
  if (makegraph) {
    // call init_graph to create the graph
    gl::graph g;
    init_graph(g, dimensions);
    std::vector<graphlab::vertex_id_t> parts;
    g.metis_partition(16, parts);
    graph_partition_to_atomindex(g, parts, "demograph");
    return 0;
  }
  graphlab::mpi_tools::init(argc, argv);
  graphlab::dc_init_param param;
  ASSERT_TRUE(graphlab::init_param_from_mpi(param));
  graphlab::distributed_control dc(param);
  // Display the values
  opts.print();
  // create a graphlab core which contains the graph, shared data, and
  // engine
  gl::distributed_core glcore(dc, "demograph.idx");
  // Initialize the core with the command line arguments
  glcore.set_engine_options(opts);
  glcore.build_engine();
  // call init_shared_data to create the shared data
  init_shared_data(glcore, dimensions);
  // since we are using a task scheduler, we need to create tasks,
  // otherwise the engine will just terminate immediately
  // there are DIM * DIM vertices
  glcore.add_task_to_all(update_function, 1.0);
  // Run the graphlab engine
  double runtime = glcore.start();
  // output the runtime
  std::cout << "Completed in " << runtime << " seconds" << std::endl;
  // now we can look at the values using the get_val() function
  size_t numberofflips = NUM_FLIPS.get_val();
  double redprop = RED_PROPORTION.get_val();
  // output some interesting statistics
  std::cout << "Number of flips: " << numberofflips << std::endl;
  std::cout << "Red prop: " << redprop << std::endl;
  // output the graph
  // note that here we take advantage of the fact that vertex insertion
  // gives sequential numberings
  if (dc.procid() == 0) {
    size_t ctr = 0;
    for (size_t i = 0; i < dimensions; ++i) {
      for (size_t j = 0; j < dimensions; ++j) {
        std::cout << size_t(glcore.graph().get_vertex_data(ctr).color) << " ";
        ++ctr;
      }
      std::cout << std::endl;
    }
  }
  graphlab::mpi_tools::finalize();
}