dc_buffered_stream_send_expqueue2.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef DC_BUFFERED_STREAM_SEND_EXPQUEUE2_HPP
00019 #define DC_BUFFERED_STREAM_SEND_EXPQUEUE2_HPP
00020 #include <iostream>
00021 #include <boost/function.hpp>
00022 #include <boost/bind.hpp>
00023 #include <boost/type_traits/is_base_of.hpp>
00024 #include <graphlab/rpc/dc_internal_types.hpp>
00025 #include <graphlab/rpc/dc_types.hpp>
00026 #include <graphlab/rpc/dc_comm_base.hpp>
00027 #include <graphlab/rpc/dc_send.hpp>
00028 #include <graphlab/parallel/pthread_tools.hpp>
00029 #include <graphlab/util/blocking_queue.hpp>
00030 #include <graphlab/logger/logger.hpp>
00031 namespace graphlab {
00032 class distributed_control;
00033 
00034 namespace dc_impl {
00035 
00036 struct expqueue2_entry{
00037   procid_t target;
00038   char* c;
00039   size_t len;
00040 };
00041 
00042 /**
00043    \ingroup rpc
00044 Sender for the dc class.
00045   The job of the sender is to take as input data blocks of
00046   pieces which should be sent to a single destination socket.
00047   This can be thought of as a sending end of a multiplexor.
00048   This class performs buffered transmissions using a blocking
00049   queue with one call per queue entry.
00050   A separate thread is used to transmit queue entries. Unlike
00051   dc_buffered_stream_send_expqueue, no write combining is performed.
00052   
00053   This can be enabled by passing "buffered_queued_send_single=yes"
00054   in the distributed control initstring.
00055 */
00056 
00057 class dc_buffered_stream_send_expqueue2: public dc_send{
00058  public:
00059   dc_buffered_stream_send_expqueue2(distributed_control* dc, dc_comm_base *comm): dc(dc), 
00060                                     comm(comm), done(false) { 
00061     thr = launch_in_new_thread(boost::bind(&dc_buffered_stream_send_expqueue2::send_loop, 
00062                                       this));
00063   }
00064   
00065   ~dc_buffered_stream_send_expqueue2() {
00066   }
00067   
00068 
00069   inline bool channel_active(procid_t target) const {
00070     return comm->channel_active(target);
00071   }
00072 
00073   /**
00074    Called by the controller when there is data to send.
00075    if len is -1, the function has to compute the length by itself,
00076    or send the data from the stream directly. the strm is not copyable.
00077   */
00078   void send_data(procid_t target, 
00079                  unsigned char packet_type_mask,
00080                  std::istream &istrm,
00081                  size_t len = size_t(-1));
00082                  
00083   /** Another possible interface the controller can
00084   call with when there is data to send. The caller has
00085   responsibility for freeing the pointer when this call returns*/
00086   void send_data(procid_t target, 
00087                  unsigned char packet_type_mask,
00088                  char* data, size_t len);
00089 
00090   void send_loop();
00091   
00092 
00093   void shutdown();
00094   
00095   inline size_t bytes_sent() {
00096     return bytessent.value;
00097   }
00098 
00099  private:
00100   /// pointer to the owner
00101   distributed_control* dc;
00102   dc_comm_base *comm;
00103 
00104   blocking_queue<expqueue2_entry> sendqueue;
00105 
00106   thread thr;
00107   bool done;
00108   atomic<size_t> bytessent;
00109   
00110 };
00111 
00112 
00113 
00114 } // namespace dc_impl
00115 } // namespace graphlab
00116 #endif // DC_BUFFERED_STREAM_SEND_EXPQUEUE2_HPP
00117