00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef REPLY_INCREMENT_COUNTER_HPP
00019 #define REPLY_INCREMENT_COUNTER_HPP
00020 #include <string>
00021 #include <graphlab/parallel/atomic.hpp>
00022 #include <graphlab/parallel/pthread_tools.hpp>
00023
00024 namespace graphlab {
00025
00026 class distributed_control;
00027
00028 namespace dc_impl {
00029
00030
00031
00032
00033
00034 struct blob {
00035
00036 blob(char* c, size_t len):c(c),len(len) { };
00037 blob():c(NULL), len(0){ };
00038
00039 char *c;
00040 size_t len;
00041
00042
00043
00044 void save(oarchive& oarc) const {
00045 oarc << len;
00046 if (len > 0) serialize(oarc, c, len);
00047 }
00048
00049
00050 void load(iarchive& iarc) {
00051 if (c) ::free(c);
00052 c = NULL;
00053 iarc >> len;
00054 if (len > 0) {
00055 c = (char*) malloc(len);
00056 deserialize(iarc, c, len);
00057 }
00058 }
00059
00060
00061 void free() {
00062 if (c) {
00063 ::free(c);
00064 c = NULL;
00065 len = 0;
00066 }
00067 }
00068 };
00069
00070
00071
00072
00073
00074
00075
00076
00077 struct reply_ret_type{
00078 atomic<size_t> flag;
00079 blob val;
00080 bool usemutex;
00081 mutex mut;
00082 conditional cond;
00083
00084
00085
00086
00087 reply_ret_type(bool usemutex, size_t retcount = 1):flag(retcount),
00088 usemutex(true) {
00089 }
00090
00091 ~reply_ret_type() { }
00092
00093
00094
00095
00096
00097 inline void wait() {
00098 if (usemutex) {
00099 mut.lock();
00100 while(flag.value != 0) cond.wait(mut);
00101 mut.unlock();
00102 }
00103 else {
00104 while(flag.value != 0) sched_yield();
00105 }
00106 }
00107 };
00108
00109 }
00110
00111
00112
00113
00114
00115
00116
00117
00118 void reply_increment_counter(distributed_control &dc, procid_t src,
00119 size_t ptr, dc_impl::blob ret);
00120
00121 }
00122
00123 #endif
00124