analyzer_listener.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 
00019 #ifndef analyzer_listener_HPP
00020 #define analyzer_listener_HPP
00021 
#include <cstdio>
#include <string>
#include <vector>

#include <graphlab/app_support/appstats.hpp>
#include <graphlab/graph/graph.hpp>
#include <graphlab/util/timer.hpp>
#include <graphlab/parallel/atomic.hpp>
#include <graphlab/monitoring/imonitor.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
00031 
00032 namespace graphlab {
00033   
00034   // Sorry about the naming, but I like it.
00035   typedef std::vector<double>(*global_dumper)(blob_graph  &g);
00036   
00037   class analyzer_listener : 
00038     public imonitor<blob_graph> {
00039     
00040     typedef imonitor<blob_graph>::update_task_type
00041     update_task_type;
00042 
00043     typedef imonitor<blob_graph>::iengine_type iengine_type;
00044 
00045     blob_graph* g;
00046     std::vector<vertex_stats> stats;
00047     timer t;
00048     
00049     std::string appname;
00050     
00051     /* Locks */
00052     mutex startlock;
00053     conditional startcond;
00054     
00055     bool pause;
00056     
00057     global_dumper dumperfunc;
00058     int dumpfreq;
00059     std::vector<std::string> dumpheaders;
00060     FILE * dumpfile;
00061     
00062     atomic<size_t> taskcount;
00063     
00064     
00065   public:
00066         
00067     analyzer_listener(blob_graph* _g, std::string _appname) : 
00068       stats(_g->num_vertices()),taskcount(0) {      
00069       g = _g;
00070       appname = _appname;
00071       for(unsigned int i=0; i<g->num_vertices(); i++) {
00072         stats[i].vertexid = i;
00073         stats[i].priority = 0.0f;
00074         stats[i].last_update_time = 0.0f;
00075         stats[i].value = 0.0f;
00076         stats[i].updatecount = 0;
00077       }
00078       pause = false;
00079       dumperfunc = NULL;
00080     }
00081     
00082     ~analyzer_listener() {
00083     } 
00084     
00085     virtual void set_global_dumper(std::vector<std::string> headers, global_dumper dumpfunc, int dumpfreq_updates=5000) {
00086       dumperfunc = dumpfunc;
00087       dumpfreq = dumpfreq_updates;
00088       dumpheaders = headers;
00089       
00090       /* Open file */
00091       std::string filename = appname + "_dump.dat";
00092       dumpfile = fopen(filename.c_str(), "w");
00093       
00094       /* Write headers */
00095       fprintf(dumpfile, "updates\t");
00096       for(unsigned int i=0; i<headers.size(); i++) {
00097         fprintf(dumpfile, "\t");
00098         fprintf(dumpfile, "%s", headers[i].c_str());
00099       }
00100       fprintf(dumpfile,"\n");
00101     }
00102     
00103     virtual void write_graph_datafile() {
00104       std::string filename = appname + "_graph.dat";
00105       FILE * f = fopen(filename.c_str(), "w");      
00106       fprintf(f, "indegree\toutdegree\n");
00107       for(vertex_id_t vid = 0; vid< g->num_vertices(); vid++) {
00108         fprintf(f, "%u\t%u\n", (unsigned int)g->num_in_neighbors(vid), 
00109                 (unsigned int)g->num_out_neighbors(vid));
00110       }
00111       fclose(f);
00112       
00113       printf("Wrote %s \n", filename.c_str());
00114     }
00115     
00116     virtual void write_final_stats() {
00117       std::string filename = appname + "_updates.dat";
00118       FILE * f = fopen(filename.c_str(), "w");      
00119       fprintf(f, "num_updates\n");
00120       
00121       for(vertex_id_t vid = 0; vid< g->num_vertices(); vid++) {
00122         fprintf(f, "%u\n", (unsigned int)stats[vid].updatecount);
00123       }
00124       
00125       fclose(f);
00126     }
00127     
00128     
00129     virtual void dump(size_t tc) {
00130       std::vector<double> newdump = dumperfunc(*g);
00131       fprintf(dumpfile, "%u", (unsigned int) tc);
00132       for(unsigned int i=0; i<newdump.size(); i++) {
00133         fprintf(dumpfile, "\t%lf", newdump[i]); 
00134       }
00135       fprintf(dumpfile, "\n");
00136     }
00137     
00138     virtual void init(iengine_type* engine) {
00139       printf("====== ANALYZER SERVER LISTENER STARTED ===== \n");
00140       // Todo: start web server and wait for connection.
00141       
00142       write_graph_datafile();
00143     }
00144     
00145     /**
00146      * Pause & continue hacks
00147      */
00148     virtual void pauseExec() {
00149       startlock.lock();
00150       pause = true;
00151       startlock.unlock();
00152     }
00153     
00154     virtual void continueExec() {
00155       startlock.lock();
00156       pause = false;
00157       startcond.broadcast();
00158       startlock.unlock();
00159     }
00160     
00161     
00162     void scheduler_task_scheduled(update_task_type task, double current_max_priority) {
00163       vertex_id_t vid = task.vertex();
00164       stats[vid].updatecount++;
00165       
00166       /* Hack to pause - don't return control to scheduler/engine */
00167       while(pause) {
00168 
00169         startlock.lock();
00170         if (pause) {
00171           startcond.wait(startlock);
00172         }
00173         startlock.unlock();
00174       }
00175       
00176       /* Dump */
00177       if (dumperfunc != NULL) {
00178         size_t tc = taskcount.inc();
00179         if (tc % dumpfreq == 0) {
00180           /* Pause - there can be some odd updates, but maybe this is ok anyway */
00181           pauseExec();
00182           dump(tc);
00183           continueExec();
00184         }
00185       }
00186       
00187     }
00188     
00189     
00190     std::vector<vertex_stats> get_stats_snapshot() {
00191       return stats;
00192     }
00193     
00194     /* Scheduler calls */
00195     virtual void scheduler_task_added(update_task_type task, 
00196                                       double priority) { }
00197     
00198     virtual void scheduler_task_promoted(update_task_type task, 
00199                                          double diffpriority, 
00200                                          double totalpriority) {  }  
00201     
00202     
00203     
00204     /* Application calls */
00205     virtual void app_set_vertex_value(vertex_id_t vid, double value) { 
00206       stats[vid].value = (float)value;
00207     }
00208     
00209     /* Called by application to help visualizers to scale values properly */
00210     virtual void app_set_vertex_value_scale(double min, double max) { }
00211     
00212     virtual void engine_worker_dies(size_t cpuid, int tcc) { 
00213       if (cpuid == 0) {
00214         write_final_stats();
00215         size_t tc = taskcount.inc()-1;
00216         dump(tc);
00217         //DB fclose(dumpfile); //potential problem in a loop
00218       }
00219     }
00220     
00221     
00222   };
00223   
00224   
00225 }
00226 
00227 #endif
00228