Shared Data

The shared data system provides controlled, thread-safe access to global variables. It is mainly used to provide two capabilities: globally shared variables and Sync operations.

The detailed example provides a good description of how this is used.

Globally Shared Variables

All shared data must be global variables, declared using the following syntax:

gl::glshared<T> var;

Here T is the data type of the variable. Arbitrary data types can be used, and access to them is guaranteed to be thread-safe. The current implementation keeps two reference-counted copies of the data and so consumes twice the memory of a regular variable.

To read the value of the variable by value:

T val = var.get_val();

To read the value of the variable by reference:

boost::shared_ptr<T> ptr = var.get_ptr();

The get_ptr() function returns a pointer to the data. The pointer is a boost::shared_ptr, which can be dereferenced like an ordinary pointer using the dereference operator (*). While the pointer is still in scope, the values read through it are guaranteed not to change. Under particular conditions, holding on to the shared pointer can prevent writes to the variable from progressing. The pointer should therefore be released as soon as possible, either by letting it go out of scope or by calling ptr.reset() explicitly.

The variable is modified using the set_val() function, which is quite self-explanatory.

var.set_val(T newval);
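The snapshot behaviour of get_ptr() can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: toy_shared is a hypothetical class, not the GraphLab implementation, and it uses std::shared_ptr and a mutex instead of the two-copy scheme described above. Unlike the real glshared, this toy never blocks a writer while a pointer is held.

```cpp
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for gl::glshared<T>; not the GraphLab implementation.
template <typename T>
class toy_shared {
 public:
  explicit toy_shared(const T& init = T())
      : cur_(std::make_shared<const T>(init)) {}

  // Read by value: copies the current value while holding the lock.
  T get_val() const {
    std::lock_guard<std::mutex> guard(lock_);
    return *cur_;
  }

  // Read by reference: hands out a reference-counted, read-only snapshot.
  std::shared_ptr<const T> get_ptr() const {
    std::lock_guard<std::mutex> guard(lock_);
    return cur_;
  }

  // Write: installs a fresh copy; snapshots handed out earlier keep the old value.
  void set_val(const T& newval) {
    std::shared_ptr<const T> fresh = std::make_shared<const T>(newval);
    std::lock_guard<std::mutex> guard(lock_);
    cur_ = std::move(fresh);
  }

 private:
  mutable std::mutex lock_;
  std::shared_ptr<const T> cur_;
};
```

In this model, a pointer obtained before a set_val() call keeps reading the old value, while get_val() returns the new one. Resetting the pointer drops the last reference to the old copy.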

The shared data type also provides atomic operations.

Atomics

In addition to the regular get/set functions, the shared variable also provides two basic atomic operations.

T oldval = glshared<T>::exchange(T newval)

Writes newval into the variable and returns a copy of the previous value.
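The difference between exchange() and a separate get_val() followed by set_val() is that the read and the write happen as one indivisible step. A minimal sketch, assuming a mutex-protected value (toy_exchange_var is a hypothetical name, and the real glshared may implement this differently):

```cpp
#include <mutex>
#include <utility>

// Hypothetical illustration of exchange semantics; not GraphLab code.
template <typename T>
class toy_exchange_var {
 public:
  explicit toy_exchange_var(const T& init) : value_(init) {}

  // Store newval and return the previous value under a single lock,
  // so no other writer can slip in between the read and the write.
  T exchange(T newval) {
    std::lock_guard<std::mutex> guard(lock_);
    std::swap(value_, newval);
    return newval;  // after the swap, newval holds the old value
  }

  T get_val() const {
    std::lock_guard<std::mutex> guard(lock_);
    return value_;
  }

 private:
  mutable std::mutex lock_;
  T value_;
};
```

With a variable initialised to 5, exchange(7) returns 5 and leaves 7 stored.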

void glshared<T>::apply(apply_function_type fun, const any& closure)

Calls the function fun with a reference to the value of the shared variable. The type of the function is

void(*apply_function_type)(any& current_value,
                           const any& closure);

Unfortunately, due to current design limitations, current_value must be passed to the apply function as an any type. The value held inside the any container, however, has the same type as the original glshared variable. For instance, if the shared variable is declared using:

      gl::glshared<size_t> shared_counts;

and I write an apply function which adds the closure value to shared_counts, the resulting code will be:

      void add_counts(any& current_value, const any& increment) {
        current_value.as<size_t>() += increment.as<size_t>();
      }

Observe that the current_value can be accessed (and written to) through the graphlab::any::as() function and the type to be passed to the as() function matches the type of the shared variable. The apply function is allowed to make changes to the current_value, and future reads from this variable will return the new value.
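The mechanics of apply() can be tried out without GraphLab. The sketch below is an assumption-laden model: std::any (C++17) stands in for graphlab::any, std::any_cast stands in for as<T>(), and toy_any_shared is a hypothetical class that only mimics the call pattern.

```cpp
#include <any>
#include <cstddef>
#include <mutex>
#include <utility>

// Same shape as apply_function_type, with std::any replacing graphlab::any.
using toy_apply_fn = void (*)(std::any& current_value, const std::any& closure);

// Hypothetical holder that runs an apply function under a lock.
class toy_any_shared {
 public:
  explicit toy_any_shared(std::any init) : value_(std::move(init)) {}

  // Calls fun with a mutable reference to the stored value.
  void apply(toy_apply_fn fun, const std::any& closure) {
    std::lock_guard<std::mutex> guard(lock_);
    fun(value_, closure);
  }

  // Returns a copy of the stored value; T must match the stored type.
  template <typename T>
  T get_val() const {
    std::lock_guard<std::mutex> guard(lock_);
    return std::any_cast<T>(value_);
  }

 private:
  mutable std::mutex lock_;
  std::any value_;
};

// Counterpart of add_counts above: the cast type matches the stored type.
inline void add_counts(std::any& current_value, const std::any& increment) {
  std::any_cast<std::size_t&>(current_value) +=
      std::any_cast<std::size_t>(increment);
}
```

Applying add_counts with an increment of 5 to a stored value of 10 leaves 15 in the variable. A cast to any type other than the stored one throws std::bad_any_cast in this model.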

Sync

Sync performs a function accumulation (also called a fold or reduce) across all the vertices in the graph and writes the result to an associated entry in the shared data table. A Sync operation is defined by a pair of functions: a sync function and an apply function. This concept is best explained with an example. For instance, consider a graph with a double on each vertex.

typedef graphlab::graph<double, double> graph_type;
typedef graphlab::types<graph_type> gl;

and I would like to compute the L2 norm of all the vertex values (i.e. the square root of the sum of squares). I would define a shared variable to hold the final result:

      gl::glshared<double> l2norm_value;

As well as the following sync and apply functions:

void squared_sum_sync(gl::iscope& scope, 
                 graphlab::any& accumulator) { 

  accumulator.as<double>() += 
         scope.const_vertex_data() * scope.const_vertex_data();
}

void square_root_apply(graphlab::any& current_data,  
                 const graphlab::any& new_data) {

  current_data.as<double>() = sqrt(new_data.as<double>());
}

The variable is associated with the sync/apply functions using:

core.set_sync(l2norm_value,       // shared variable
              squared_sum_sync,   // sync function
              square_root_apply,  // apply function
              double(0.0),        // initial sync value
              100);               // sync frequency

When evaluated, the sync function (squared_sum_sync) will be called on the scope of each vertex in turn. The accumulator is passed from one function call to the next, accumulating the values as it goes along. On each call, squared_sum_sync receives the scope of the vertex currently being evaluated and a reference to the accumulator.

When all the vertices are done, the apply function (square_root_apply) is evaluated on the result of the accumulation. The new_data variable contains the final value of the accumulator during the accumulation stage. At this point current_data is a reference to the shared variable l2norm_value. That is to say: the value of current_data is equivalent to the value of l2norm_value.get_val(). Modifications to current_data will be reflected in future accesses to l2norm_value.
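The two phases described above (fold over every vertex, then apply once) can be mimicked on a plain vector. This is a sketch only: the function names are hypothetical, a std::vector<double> stands in for the graph's vertex data, and plain doubles replace the any containers.

```cpp
#include <cmath>
#include <vector>

// Sync step: fold one vertex value into the accumulator.
inline void toy_squared_sum_sync(double vertex_value, double& accumulator) {
  accumulator += vertex_value * vertex_value;
}

// Apply step: turn the final accumulator into the shared value.
inline void toy_square_root_apply(double& current_data, double new_data) {
  current_data = std::sqrt(new_data);
}

// Runs one sync: sequential accumulation, then a single apply.
inline double run_toy_sync(const std::vector<double>& vertex_values,
                           double initial_value) {
  double accumulator = initial_value;
  for (double v : vertex_values) {
    toy_squared_sum_sync(v, accumulator);  // accumulator carried call to call
  }
  double shared_value = 0.0;  // plays the role of l2norm_value
  toy_square_root_apply(shared_value, accumulator);
  return shared_value;
}
```

For vertex values 3 and 4 with an initial value of 0, the accumulator ends at 25 and the stored result is 5.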

A sync is created using the graphlab::iengine::set_sync() member function of the engine, or the graphlab::core::set_sync() member function of the core (the core simply forwards the call to the engine it contains).

Common Syncs and Applies

A collection of commonly used Syncs and Applies is provided in gl::glshared_sync_ops and gl::glshared_apply_ops.

To use the glshared_sync_ops, you must provide an accessor function of the form AccumulationType accessor(const vertex_data& v);

You can then use the provided sync functions, for example:

double accessor(const double& v) {
  return v;
}

core.set_sync(l2norm_value,
              gl::glshared_sync_ops::l2sum<double, accessor>,
              gl::glshared_apply_ops::sqrt<double>,
              double(0.0),
              100);
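Passing the accessor as a template argument, as in l2sum<double, accessor>, relies on function pointers being usable as non-type template parameters. The sketch below shows that pattern in isolation. The names toy_accessor and toy_l2sum are hypothetical, and the assumption that l2sum accumulates the square of the accessed value is inferred from its pairing with sqrt, not taken from the library source.

```cpp
// Accessor: extracts the value to accumulate from the vertex data.
inline double toy_accessor(const double& v) {
  return v;
}

// Hypothetical sync step parameterised on the accumulation type and on
// the accessor function, which is bound at compile time.
template <typename AccumT, AccumT (*Accessor)(const double&)>
void toy_l2sum(const double& vertex_data, AccumT& accumulator) {
  const AccumT x = Accessor(vertex_data);
  accumulator += x * x;  // assumed behaviour: sum of squares
}
```

Because the accessor is a template argument, toy_l2sum<double, toy_accessor> is an ordinary function with a fixed signature, which is what allows it to be handed over as a plain function pointer.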

Merge

The Sync operation as described cannot be parallelized. To permit parallelism, an additional merge function which combines partial synced results must be defined.

If a merge function is defined, the set of vertices will be partitioned into a collection of disjoint sets. The sync function is then performed on each set in parallel, producing a number of partial results. These partial results are then combined with the merge function. Finally, the apply function is executed on the final result and written to the shared variable.

For instance, in the earlier sync example where the L2 Norm of the graph vertex data is computed, intermediate results can be combined using:

static void sum_merge(any& dest,
                const any& src) {
  dest.as<double>() += src.as<double>();
}

The set_sync call must be updated to pass the merge function:

core.set_sync(l2norm_value,       // shared variable
              squared_sum_sync,   // sync function
              square_root_apply,  // apply function
              double(0.0),        // initial sync value
              100,                // sync frequency
              sum_merge);         // merge function
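The partition, partial sync, merge, and apply sequence can be sketched on a plain vector. The code below is an illustration under assumptions: the function names are hypothetical, the partitioning scheme (contiguous chunks) is chosen for simplicity, and the partitions are processed one after another here, although each touches only its own accumulator and could therefore run on its own thread.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Merge step: combine one partial result into another.
inline void toy_sum_merge(double& dest, double src) {
  dest += src;
}

// Hypothetical driver: splits the values into num_parts disjoint chunks,
// accumulates each chunk separately, merges the partials, then applies sqrt.
inline double toy_partitioned_l2norm(const std::vector<double>& values,
                                     std::size_t num_parts) {
  assert(num_parts > 0);
  const std::size_t chunk = (values.size() + num_parts - 1) / num_parts;
  std::vector<double> partial(num_parts, 0.0);  // one accumulator per chunk

  for (std::size_t p = 0; p < num_parts; ++p) {
    const std::size_t begin = std::min(values.size(), p * chunk);
    const std::size_t end = std::min(values.size(), begin + chunk);
    for (std::size_t i = begin; i < end; ++i) {
      partial[p] += values[i] * values[i];  // sync step on this chunk only
    }
  }

  double merged = 0.0;
  for (double part : partial) {
    toy_sum_merge(merged, part);  // merge step
  }
  return std::sqrt(merged);  // apply step
}
```

The result is independent of the number of partitions only because the merge matches the sync: summing partial sums of squares gives the same total as one sequential pass.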

Similarly, a collection of common Merges is provided in gl::glshared_merge_ops.

Example:

core.set_sync(l2norm_value,
              gl::glshared_sync_ops::l2sum<double, accessor>,
              gl::glshared_apply_ops::sqrt<double>,
              double(0.0),
              100,
              gl::glshared_merge_ops::sum<double>);