00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef analyzer_listener_HPP
00020 #define analyzer_listener_HPP
00021
#include <cstdio>
#include <string>
#include <vector>

#include <graphlab/app_support/appstats.hpp>
#include <graphlab/graph/graph.hpp>
#include <graphlab/util/timer.hpp>
#include <graphlab/parallel/atomic.hpp>
#include <graphlab/monitoring/imonitor.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
00031
00032 namespace graphlab {
00033
00034
00035 typedef std::vector<double>(*global_dumper)(blob_graph &g);
00036
00037 class analyzer_listener :
00038 public imonitor<blob_graph> {
00039
00040 typedef imonitor<blob_graph>::update_task_type
00041 update_task_type;
00042
00043 typedef imonitor<blob_graph>::iengine_type iengine_type;
00044
00045 blob_graph* g;
00046 std::vector<vertex_stats> stats;
00047 timer t;
00048
00049 std::string appname;
00050
00051
00052 mutex startlock;
00053 conditional startcond;
00054
00055 bool pause;
00056
00057 global_dumper dumperfunc;
00058 int dumpfreq;
00059 std::vector<std::string> dumpheaders;
00060 FILE * dumpfile;
00061
00062 atomic<size_t> taskcount;
00063
00064
00065 public:
00066
00067 analyzer_listener(blob_graph* _g, std::string _appname) :
00068 stats(_g->num_vertices()),taskcount(0) {
00069 g = _g;
00070 appname = _appname;
00071 for(unsigned int i=0; i<g->num_vertices(); i++) {
00072 stats[i].vertexid = i;
00073 stats[i].priority = 0.0f;
00074 stats[i].last_update_time = 0.0f;
00075 stats[i].value = 0.0f;
00076 stats[i].updatecount = 0;
00077 }
00078 pause = false;
00079 dumperfunc = NULL;
00080 }
00081
00082 ~analyzer_listener() {
00083 }
00084
00085 virtual void set_global_dumper(std::vector<std::string> headers, global_dumper dumpfunc, int dumpfreq_updates=5000) {
00086 dumperfunc = dumpfunc;
00087 dumpfreq = dumpfreq_updates;
00088 dumpheaders = headers;
00089
00090
00091 std::string filename = appname + "_dump.dat";
00092 dumpfile = fopen(filename.c_str(), "w");
00093
00094
00095 fprintf(dumpfile, "updates\t");
00096 for(unsigned int i=0; i<headers.size(); i++) {
00097 fprintf(dumpfile, "\t");
00098 fprintf(dumpfile, "%s", headers[i].c_str());
00099 }
00100 fprintf(dumpfile,"\n");
00101 }
00102
00103 virtual void write_graph_datafile() {
00104 std::string filename = appname + "_graph.dat";
00105 FILE * f = fopen(filename.c_str(), "w");
00106 fprintf(f, "indegree\toutdegree\n");
00107 for(vertex_id_t vid = 0; vid< g->num_vertices(); vid++) {
00108 fprintf(f, "%u\t%u\n", (unsigned int)g->num_in_neighbors(vid),
00109 (unsigned int)g->num_out_neighbors(vid));
00110 }
00111 fclose(f);
00112
00113 printf("Wrote %s \n", filename.c_str());
00114 }
00115
00116 virtual void write_final_stats() {
00117 std::string filename = appname + "_updates.dat";
00118 FILE * f = fopen(filename.c_str(), "w");
00119 fprintf(f, "num_updates\n");
00120
00121 for(vertex_id_t vid = 0; vid< g->num_vertices(); vid++) {
00122 fprintf(f, "%u\n", (unsigned int)stats[vid].updatecount);
00123 }
00124
00125 fclose(f);
00126 }
00127
00128
00129 virtual void dump(size_t tc) {
00130 std::vector<double> newdump = dumperfunc(*g);
00131 fprintf(dumpfile, "%u", (unsigned int) tc);
00132 for(unsigned int i=0; i<newdump.size(); i++) {
00133 fprintf(dumpfile, "\t%lf", newdump[i]);
00134 }
00135 fprintf(dumpfile, "\n");
00136 }
00137
00138 virtual void init(iengine_type* engine) {
00139 printf("====== ANALYZER SERVER LISTENER STARTED ===== \n");
00140
00141
00142 write_graph_datafile();
00143 }
00144
00145
00146
00147
00148 virtual void pauseExec() {
00149 startlock.lock();
00150 pause = true;
00151 startlock.unlock();
00152 }
00153
00154 virtual void continueExec() {
00155 startlock.lock();
00156 pause = false;
00157 startcond.broadcast();
00158 startlock.unlock();
00159 }
00160
00161
00162 void scheduler_task_scheduled(update_task_type task, double current_max_priority) {
00163 vertex_id_t vid = task.vertex();
00164 stats[vid].updatecount++;
00165
00166
00167 while(pause) {
00168
00169 startlock.lock();
00170 if (pause) {
00171 startcond.wait(startlock);
00172 }
00173 startlock.unlock();
00174 }
00175
00176
00177 if (dumperfunc != NULL) {
00178 size_t tc = taskcount.inc();
00179 if (tc % dumpfreq == 0) {
00180
00181 pauseExec();
00182 dump(tc);
00183 continueExec();
00184 }
00185 }
00186
00187 }
00188
00189
00190 std::vector<vertex_stats> get_stats_snapshot() {
00191 return stats;
00192 }
00193
00194
00195 virtual void scheduler_task_added(update_task_type task,
00196 double priority) { }
00197
00198 virtual void scheduler_task_promoted(update_task_type task,
00199 double diffpriority,
00200 double totalpriority) { }
00201
00202
00203
00204
00205 virtual void app_set_vertex_value(vertex_id_t vid, double value) {
00206 stats[vid].value = (float)value;
00207 }
00208
00209
00210 virtual void app_set_vertex_value_scale(double min, double max) { }
00211
00212 virtual void engine_worker_dies(size_t cpuid, int tcc) {
00213 if (cpuid == 0) {
00214 write_final_stats();
00215 size_t tc = taskcount.inc()-1;
00216 dump(tc);
00217
00218 }
00219 }
00220
00221
00222 };
00223
00224
00225 }
00226
00227 #endif
00228