Distributed GraphLab

To work with distributed GraphLab, an MPI installation is required. We have tested with MPICH2 and with the MPI distribution that ships with OS X; other MPI distributions (such as Open MPI) should work as well.

Distributed GraphLab is functionally quite similar to regular GraphLab. However, due to the Law of Leaky Abstractions (http://www.joelonsoftware.com/articles/LeakyAbstractions.html), there are some issues the user must take into account.

Firstly, the graph cannot be created on the fly; it must be created beforehand and loaded from an atom index file (see Distributed Graph Creation).

Next, since multiple instances of the program are started across the network, some coordination is needed to ensure that all instances behave identically. We strongly encourage the use of graphlab::core (graphlab::distributed_core in the distributed setting), since it reduces the amount of management needed.

Attention:
The general rule is that any operation with a global effect (for instance, changing a configuration setting) must be called by all instances simultaneously.

Users who have read the shared memory detailed_example, or who are already familiar with shared memory GraphLab, can simply read on. Otherwise, a distributed version of the shared memory detailed_example is available: Distributed Detailed Example

Depending on the complexity of your program, moving from the shared memory version to the distributed version can be quite mechanical. The key differences are listed here.

An easy way to see what changes are needed to move a shared memory program to the distributed version is to diff tests/demo/demo.cpp against tests/dist_demo/dist_demo.cpp.

Types

Instead of including graphlab.hpp, you should include distributed_graphlab.hpp. Similarly, the distributed version of the core is called graphlab::distributed_core, and the distributed version of the types struct is graphlab::distributed_types.

We recommend using the graphlab::distributed_types system:

typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type;
typedef graphlab::distributed_types<graph_type> gl;

since this allows you to easily "copy and paste" code from your shared memory implementation, while the type system changes the back-end implementations.

For instance, your update function should not use graphlab::edge_list, but should use gl::edge_list since the exact type is different for the shared memory graph and for the distributed graph.
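To illustrate why the gl:: names keep code portable, here is a minimal sketch of the "types struct" pattern. The two back ends (shared_graph and dist_graph), the types template and count_edges() are all hypothetical and not part of GraphLab; only the idea of re-exporting a graph's associated types under fixed names mirrors graphlab::distributed_types.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <vector>

// Two hypothetical back ends (not GraphLab classes) whose edge lists
// have different concrete types.
struct shared_graph { using edge_list = std::vector<int>; };
struct dist_graph   { using edge_list = std::vector<long>; };

// Hypothetical "types struct": re-exports the graph's associated types
// under fixed names, in the spirit of graphlab::distributed_types.
template <typename Graph>
struct types {
  using graph_type = Graph;
  using edge_list = typename Graph::edge_list;
};

static_assert(!std::is_same<types<shared_graph>::edge_list,
                            types<dist_graph>::edge_list>::value,
              "the back ends really do use different edge_list types");

// Written only against gl::edge_list, so the same code compiles for
// either back end; only the typedef for gl has to change.
template <typename gl>
std::size_t count_edges(const typename gl::edge_list& edges) {
  return edges.size();
}
```

Because count_edges() never names a concrete container, switching the gl typedef from one back end to the other requires no change to its body.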

Similarly, gl::distributed_core should be used instead of core, and gl::distributed_glshared instead of glshared.

Use The Disk Graph

Unlike the shared memory version, the graph cannot be created on the fly. The user must first create a disk graph, then construct the distributed_core / distributed_graph from the disk graph.

Distributed Execution

The user must constantly keep in mind that in the distributed setting there is a collection of "independent" copies of the program being executed, all communicating through MPI or through the GraphLab RPC system.

All distributed_core / engine functions generally require all machines to execute the exact same code path, running the same functions at the same time. For instance, the following code:

  core.set_sync(...);
  core.set_engine_options(clopts);
  core.build_engine();

should be executed by all machines in the same order at the same time. Some of these functions (for instance, core.build_engine()) have an internal distributed barrier, which ensures that all machines reach the build_engine() call before execution can proceed.

There are a few exceptions to this rule (such as add_task()) and these are documented.
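The effect of a collective call with an internal barrier can be sketched with threads standing in for MPI processes. This is a simulation under stated assumptions: sim_barrier, run_machines() and the "set options, then build" steps are hypothetical and use neither GraphLab nor MPI; they only show that no machine gets past the barrier until every machine has reached it.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal reusable barrier (C++11), standing in for the distributed
// barrier inside a collective call such as build_engine().
class sim_barrier {
 public:
  explicit sim_barrier(int n) : n_(n) {}
  void wait() {
    std::unique_lock<std::mutex> lock(m_);
    int gen = generation_;
    if (++arrived_ == n_) {  // last arrival releases everyone
      arrived_ = 0;
      ++generation_;
      cv_.notify_all();
    } else {
      cv_.wait(lock, [&] { return gen != generation_; });
    }
  }
 private:
  std::mutex m_;
  std::condition_variable cv_;
  int n_;
  int arrived_ = 0;
  int generation_ = 0;
};

// Each thread plays one "machine" running the same code path. seen[id]
// records how many machines had configured their options when machine
// id got past the barrier.
std::vector<int> run_machines(int nmachines) {
  sim_barrier b(nmachines);
  std::mutex m;
  int configured = 0;
  std::vector<int> seen(nmachines, 0);
  std::vector<std::thread> threads;
  for (int id = 0; id < nmachines; ++id) {
    threads.emplace_back([&, id] {
      { std::lock_guard<std::mutex> g(m); ++configured; }  // "set options"
      b.wait();                                            // "build_engine()"
      std::lock_guard<std::mutex> g(m);
      seen[id] = configured;  // every machine has configured by now
    });
  }
  for (auto& t : threads) t.join();
  return seen;
}
```

If any one simulated machine skipped its b.wait() call, the others would block forever; this is the same failure mode as one MPI process taking a different code path around build_engine().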

Distributed Graph Limitations

The regular graph has functions that return vertex and edge data by reference. The distributed_graph has similar functions, but because each machine holds only its own partition of the graph, they are restricted to vertex and edge data "owned" by the local partition. For other data, the alternative functions get_vertex_data() / set_vertex_data() and their edge counterparts should be used instead. These return by value and can get/set vertex and edge data across the network.

The user should keep in mind that these remote gets/sets over the network are inherently slow; this is where the Law of Leaky Abstractions shows through. If a large number of vertex/edge reads and writes are needed, we recommend using the RPC operations to gather/scatter the information in bulk instead of accessing the data one item at a time. Alternatively, the graph can be saved to disk and loaded on a single machine using the disk_graph class.
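A toy cost model makes the difference between one-at-a-time and bulk access concrete. In this sketch, remote_store, get_one() and get_bulk() are hypothetical; it assumes that vertex v is owned by machine v % nmachines and that each request to another machine costs exactly one round trip, which is a simplification of real network behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Toy cost model (an assumption, not GraphLab's RPC layer).
struct remote_store {
  remote_store(int n, int local) : nmachines(n), local_machine(local) {}
  int nmachines;
  int local_machine;
  std::size_t round_trips = 0;

  int owner(int v) const { return v % nmachines; }
  double value(int v) const { return 0.5 * v; }  // fake vertex data

  // One-at-a-time access, like repeated get_vertex_data() calls:
  // every remote vertex costs its own round trip.
  double get_one(int v) {
    if (owner(v) != local_machine) ++round_trips;
    return value(v);
  }

  // Bulk access: group the requested vertices by owner, then send one
  // request per remote machine.
  std::vector<double> get_bulk(const std::vector<int>& vs) {
    std::map<int, std::vector<int>> by_owner;
    for (int v : vs) by_owner[owner(v)].push_back(v);
    for (const auto& kv : by_owner)
      if (kv.first != local_machine) ++round_trips;
    std::vector<double> out;
    out.reserve(vs.size());
    for (int v : vs) out.push_back(value(v));
    return out;
  }
};
```

With 4 machines and 100 vertices read from machine 0, one-at-a-time access costs 75 round trips in this model, while grouping by owner costs 3.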

Build The Engine

The shared memory core automatically destroys and reconstructs the engine as options change. This is unfortunately difficult to coordinate in the distributed setting. Therefore the user must explicitly construct the engine once all options are set through the graphlab::distributed_core::build_engine() function.

Once the engine has been constructed, it can be used repeatedly. However, engine options can no longer be modified.
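The build-once lifecycle can be sketched as a small state machine. toy_core and its methods are hypothetical stand-ins for graphlab::distributed_core, not its API; the sketch only encodes the two rules described here: options may change until the engine is built, and the built engine may then be run repeatedly.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the build-once lifecycle; none of these
// methods are GraphLab's API.
class toy_core {
 public:
  // Options may change freely until the engine is built.
  void set_engine_option(const std::string& key, int value) {
    if (built_) throw std::logic_error("engine already built: options are frozen");
    options_[key] = value;
  }
  int engine_option(const std::string& key) const { return options_.at(key); }

  // Builds the engine from the current options. (In distributed GraphLab,
  // build_engine() also contains a distributed barrier.)
  void build_engine() { built_ = true; }

  // Once built, the engine can be run any number of times.
  int run() {
    if (!built_) throw std::logic_error("build_engine() must be called first");
    return ++runs_;
  }

 private:
  std::map<std::string, int> options_;
  bool built_ = false;
  int runs_ = 0;
};
```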

Distributed Command Line Options

The graphlab::command_line_options object automatically injects options for the shared memory engines. To get options for the distributed engine,

  opts.use_distributed_options();

must be called before calling opts.parse().

Distributed Engine Types

The user will notice that the new command line options include an --engine option. Distributed GraphLab implements two engines with very different characteristics: the chromatic engine (--engine=dist_chromatic) and the locking engine (--engine=dist_locking). The chromatic engine is the default.

Chromatic Engine

The chromatic engine operates by associating an update_task with each vertex. It then uses the graph coloring to run update tasks in parallel. For instance, if the graph has 3 colors, it runs all update tasks on vertices of color 0 in parallel, stops, then runs all tasks on vertices of color 1 in parallel, and so on.
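The color-by-color execution can be sketched in plain C++. This illustrates chromatic scheduling on a single machine, not the distributed engine: chromatic_sweep() is hypothetical, and it spawns one std::thread per vertex purely for brevity, where a real engine would use a pool of workers.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical chromatic sweep: vertices are processed one color class
// at a time; within a class, updates run in parallel. Joining all
// threads of a class is the "stop" before the next color starts.
template <typename Update>
void chromatic_sweep(const std::vector<int>& color, int ncolors, Update update) {
  for (int c = 0; c < ncolors; ++c) {
    std::vector<std::thread> workers;
    for (std::size_t v = 0; v < color.size(); ++v)
      if (color[v] == c) workers.emplace_back(update, static_cast<int>(v));
    for (auto& t : workers) t.join();  // all color-c updates finish first
  }
}
```

On the path 0-1-2-3 colored 0,1,0,1, an update that reads its left neighbor and writes its own slot never races with another update in the same phase, because neighbors never share a color.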

The chromatic engine therefore does not use a scheduler, since it essentially has a built-in chromatic scheduler. However, it assumes that the graph coloring is valid for the requested scope type. If an edge scope is requested, the graph coloring must ensure that neighboring vertices do not have the same color. If a full scope is requested, the graph coloring must ensure that vertices within two hops of each other (neighbors, and neighbors of neighbors) do not have the same color. If the coloring constraints are violated, the engine still runs, but the consistency guarantees no longer hold.
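The two coloring requirements can be checked directly. A minimal sketch, assuming an undirected graph stored as a hypothetical adjacency list (adj[v] lists the neighbors of v): the edge-scope check is an ordinary proper-coloring test, and the full-scope check additionally rejects two vertices that share a neighbor and have the same color (a distance-2 coloring test).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical representation: adj[v] lists v's neighbors.
using adjacency = std::vector<std::vector<int>>;

// Edge scope: adjacent vertices must have different colors.
bool valid_for_edge_scope(const adjacency& adj, const std::vector<int>& color) {
  for (std::size_t v = 0; v < adj.size(); ++v)
    for (int u : adj[v])
      if (color[u] == color[v]) return false;
  return true;
}

// Full scope: vertices within two hops (neighbors, and neighbors of
// neighbors) must have different colors.
bool valid_for_full_scope(const adjacency& adj, const std::vector<int>& color) {
  if (!valid_for_edge_scope(adj, color)) return false;
  for (std::size_t v = 0; v < adj.size(); ++v)
    for (int u : adj[v])
      for (int w : adj[u])
        if (w != static_cast<int>(v) && color[w] == color[v]) return false;
  return true;
}
```

For the path 0-1-2, the coloring 0,1,0 is valid for an edge scope but not for a full scope, because vertices 0 and 2 share neighbor 1; the coloring 0,1,2 is valid for both.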

Engine Options

Two engine options are available: "max_iterations=N" sets the maximum number of iterations, and "randomize_schedule=0 or 1" controls whether the vertex update order is permuted. These can be set on the command line:

--engine="dist_chromatic(max_iterations=10,randomize_schedule=1)"
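The option string has the form name(key=value,...). The following parser only illustrates that syntax; engine_spec and parse_engine_spec() are hypothetical, not GraphLab functions, and GraphLab's own parser may accept more forms. Note that the surrounding double quotes on the command line are consumed by the shell, so the program sees the string without them.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Hypothetical result type: an engine name plus its key=value options.
struct engine_spec {
  std::string name;
  std::map<std::string, std::string> options;
};

// Hypothetical parser for strings such as
//   dist_chromatic(max_iterations=10,randomize_schedule=1)
engine_spec parse_engine_spec(const std::string& s) {
  engine_spec spec;
  const std::size_t open = s.find('(');
  spec.name = s.substr(0, open);  // npos -> whole string, no options
  if (open == std::string::npos) return spec;
  const std::size_t close = s.rfind(')');
  const std::size_t len = (close == std::string::npos || close < open)
                              ? std::string::npos
                              : close - open - 1;
  std::istringstream body(s.substr(open + 1, len));
  std::string item;
  while (std::getline(body, item, ',')) {  // split on commas
    const std::size_t eq = item.find('=');
    if (eq != std::string::npos)
      spec.options[item.substr(0, eq)] = item.substr(eq + 1);
  }
  return spec;
}
```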

Limitations

The chromatic engine is therefore somewhat limited: it supports only a chromatic schedule and only one update function type, and it requires a valid graph coloring. However, it has a much smaller overhead than the more general locking engine, and it is therefore the default engine.

Locking Engine

The locking engine is the generalization of the shared memory implementation to the distributed setting. It makes use of distributed locking to ensure consistency, and uses a variety of pipelining methods to keep a large number of operations in flight at any time. It can work with a subset of the shared memory schedulers.

The locking engine is much more general and operates almost identically to the shared memory engines. However, the implementation complexity leads to some loss of efficiency.

Engine Options

The locking engine takes one option, max_deferred_tasks_per_node, which is the maximum number of update tasks to keep in flight in the locking pipeline. The default value is 1000. It can be set on the command line using:

--engine="dist_locking(max_deferred_tasks_per_node=10)"
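Why a larger in-flight limit can help is visible in a toy discrete-time model. This sketch is an assumption, not the locking engine's actual algorithm: each deferred task waits a fixed number of ticks for its remote locks, and at most max_deferred tasks may wait at once. simulate_pipeline() is hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>

// Toy model: returns the number of ticks needed to run ntasks updates
// when each lock request takes `latency` ticks and at most
// `max_deferred` tasks may be in flight. max_seen reports the peak
// number of tasks in flight.
int simulate_pipeline(int ntasks, int latency, int max_deferred, int& max_seen) {
  std::deque<int> ready_at;  // tick at which each in-flight task's locks arrive
  int issued = 0, done = 0, tick = 0;
  max_seen = 0;
  while (done < ntasks) {
    // Issue new lock requests while there is room in the pipeline.
    while (issued < ntasks && static_cast<int>(ready_at.size()) < max_deferred) {
      ready_at.push_back(tick + latency);
      ++issued;
    }
    max_seen = std::max(max_seen, static_cast<int>(ready_at.size()));
    ++tick;
    // Run every task whose locks have been granted.
    while (!ready_at.empty() && ready_at.front() <= tick) {
      ready_at.pop_front();
      ++done;
    }
  }
  return tick;
}
```

With 100 tasks and a lock latency of 10 ticks, a limit of 1 takes 1000 ticks, a limit of 10 takes 100 ticks, and any limit of at least 100 takes 10 ticks: in this model, more tasks in flight hide more of the locking latency.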

Incomplete Implementations

distributed_glshared_const is not yet implemented. The user can "emulate" it with a regular global variable, ensuring that all processes set its value on start-up. Alternatively, distributed_glshared can be used.
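A minimal sketch of the global-variable emulation, assuming a hypothetical parameter named damping and a hypothetical --damping flag: every process runs init_globals() with the same arguments before the engine starts, so every copy of the global holds the same value without any communication.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical constant shared value, emulated by a plain global that
// every process sets identically at start-up.
namespace globals {
double damping = 0.0;  // hypothetical parameter name
}

// Call on every process (e.g. first thing in main()) with the same argv,
// before the engine runs; afterwards, treat the value as read-only.
void init_globals(int argc, char** argv) {
  globals::damping = 0.85;  // default
  for (int i = 1; i + 1 < argc; ++i)
    if (std::string(argv[i]) == "--damping")  // hypothetical flag
      globals::damping = std::atof(argv[i + 1]);
}
```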

The sync_now() operation of the engine / core is not yet implemented.