dc_sctp_comm.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef DC_SCTP_COMM_HPP
00019 #define DC_SCTP_COMM_HPP
00020 
00021 #include <sys/socket.h>
00022 #include <netinet/in.h>
00023 
00024 #include <vector>
00025 #include <string>
00026 #include <map>
00027 
00028 #include <graphlab/parallel/pthread_tools.hpp>
00029 #include <graphlab/rpc/dc_types.hpp>
00030 #include <graphlab/rpc/dc_internal_types.hpp>
00031 #include <graphlab/rpc/dc_comm_base.hpp>
00032 
00033 namespace graphlab {
00034 namespace dc_impl {
00035   
00036 /**
00037 \ingroup rpc_internal
00038 SCTP implementation of the communications subsystem
00039 This is experimental
00040 */
00041 class dc_sctp_comm:public dc_comm_base {
00042  public:
00043    
00044   dc_sctp_comm();
00045   
00046   size_t capabilities() const {
00047     return COMM_STREAM;
00048   }
00049   
00050   /**
00051    this fuction should pause until all communication has been set up
00052    and returns the number of systems in the network.
00053    After which, all other remaining public functions (numprocs(), send(), etc)
00054    should operate normally. Every received message should immediate trigger the 
00055    attached receiver
00056    
00057    machines: a vector of strings where each string is of the form [IP]:[portnumber]
00058    initopts: unused
00059    curmachineid: The ID of the current machine. machines[curmachineid] will be 
00060                  the listening address of this machine
00061    
00062    recvcallback: A function pointer to the receiving function. This function must be thread-safe
00063    tag: An additional pointer passed to the receiving function.
00064   */
00065   void init(const std::vector<std::string> &machines,
00066             const std::map<std::string,std::string> &initopts,
00067             procid_t curmachineid,
00068             std::vector<dc_receive*> receiver);
00069 
00070   /** shuts down all sockets and cleans up */
00071   void close();
00072   
00073   ~dc_sctp_comm();
00074   
00075   // always true. SCTP can send anywhere
00076   inline bool channel_active(size_t target) const {
00077     return true;
00078   }
00079     
00080   /**
00081     Returns the number of machines in the network.
00082     Only valid after call to init();
00083   */
00084   inline procid_t numprocs() const {
00085     return nprocs;
00086   }
00087   
00088   inline procid_t procid() const {
00089     return curid;
00090   }
00091   
00092   inline size_t network_bytes_sent() const {
00093     return network_bytessent.value;
00094   }
00095 
00096  inline size_t network_bytes_received() const {
00097     //TODO
00098     return 0;
00099   }
00100 
00101   void flush(size_t target);
00102   /**
00103    Sends the string of length len to the target machine dest.
00104    Only valid after call to init();
00105    Establishes a connection if necessary
00106   */
00107   void send(size_t target, const char* buf, size_t len);
00108   
00109   void send2(size_t target, 
00110              const char* buf1, const size_t len1,
00111              const char* buf2, const size_t len2); 
00112 
00113 
00114  private:
00115  
00116   void set_socket_options(int fd);
00117 
00118   // opens the listening sock and spawns a thread to listen on it
00119   void open_listening();  
00120   void open_sending();  
00121   
00122   // constructs a connection to the target machine
00123   void connect(size_t target);
00124 
00125   // wrapper around the standard send. but loops till the buffer is all sent
00126   int sendtosock(int sockfd, size_t target, const char* buf, size_t len, uint16_t stream);
00127   
00128   /// all_addrs[i] will contain the IP address of machine i
00129   std::vector<uint32_t> all_addrs;
00130   std::vector<struct sockaddr_in> all_sock_addrs;
00131   std::map<uint32_t, procid_t> addr2id;
00132   std::vector<uint32_t> portnums;
00133   
00134   procid_t curid; 
00135   procid_t nprocs;
00136   
00137   /// the socket we use to listen on (server socket)
00138   int listensock;
00139   thread listenthread;
00140   
00141   comm_recv_callback_type recvcallback;
00142   void* tag;
00143   
00144   int sendsock;
00145   
00146   atomic<size_t> network_bytessent;
00147   
00148   void server_handler_loop();
00149   
00150   std::vector<dc_receive*> receiver;
00151   
00152   std::vector<char> machines_started;
00153   /// waits for all machines to start up
00154   void wait_for_all_machines();
00155   void handle_control(procid_t src, 
00156                       const char* buf, size_t len);
00157 
00158 };
00159 
00160 } // namespace dc_impl
00161 } // namespace graphlab
00162 #endif
00163