dc_buffered_stream_send_expqueue_z.hpp

00001 /*
00002 This file is part of GraphLab.
00003 
00004 GraphLab is free software: you can redistribute it and/or modify
00005 it under the terms of the GNU Lesser General Public License as 
00006 published by the Free Software Foundation, either version 3 of 
00007 the License, or (at your option) any later version.
00008 
00009 GraphLab is distributed in the hope that it will be useful,
00010 but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 GNU Lesser General Public License for more details.
00013 
00014 You should have received a copy of the GNU Lesser General Public 
00015 License along with GraphLab.  If not, see <http://www.gnu.org/licenses/>.
00016 */
00017 
00018 #ifndef DC_BUFFERED_STREAM_SEND_EXPQUEUE_Z_HPP
00019 #define DC_BUFFERED_STREAM_SEND_EXPQUEUE_Z_HPP
00020 #include <iostream>
00021 #include <boost/function.hpp>
00022 #include <boost/bind.hpp>
00023 #include <boost/type_traits/is_base_of.hpp>
00024 #include <zlib.h>
00025 #include <graphlab/rpc/dc_internal_types.hpp>
00026 #include <graphlab/rpc/dc_types.hpp>
00027 #include <graphlab/rpc/dc_comm_base.hpp>
00028 #include <graphlab/rpc/dc_send.hpp>
00029 #include <graphlab/parallel/pthread_tools.hpp>
00030 #include <graphlab/util/blocking_queue.hpp>
00031 #include <graphlab/logger/logger.hpp>
00032 
00033 
00034 namespace graphlab {
00035 class distributed_control;
00036 
00037 namespace dc_impl {
00038 
00039 struct expqueue_z_entry{
00040   char* c;
00041   size_t len;
00042 };
00043 
00044 /**
00045    \ingroup rpc
00046   Sender for the dc class.
00047   The job of the sender is to take as input data blocks of
00048   pieces which should be sent to a single destination socket.
00049   This can be thought of as a sending end of a multiplexor.
00050   This class performs compressed ZLib transmissions and is the matching
00051   sender for dc_stream_receive_z.
00052 
00053   \ref dc_stream_receive_z
00054 */
00055 
00056 class dc_buffered_stream_send_expqueue_z: public dc_send{
00057  public:
00058   dc_buffered_stream_send_expqueue_z(distributed_control* dc, dc_comm_base *comm, procid_t target, bool zlib = false): dc(dc), 
00059                                     comm(comm), target(target), done(false) { 
00060   
00061     zstrm.zalloc = Z_NULL;
00062     zstrm.zfree = Z_NULL;
00063     zstrm.opaque = Z_NULL;
00064     ASSERT_TRUE(deflateInit(&zstrm, 1) == Z_OK); // level 6 out of 0-9 compression
00065     thr = launch_in_new_thread(boost::bind(&dc_buffered_stream_send_expqueue_z::send_loop, 
00066                                       this));
00067   }
00068   
00069   ~dc_buffered_stream_send_expqueue_z() {
00070   }
00071   
00072 
00073   inline bool channel_active(procid_t target) const {
00074     return comm->channel_active(target);
00075   }
00076 
00077   /**
00078    Called by the controller when there is data to send.
00079    if len is -1, the function has to compute the length by itself,
00080    or send the data from the stream directly. the strm is not copyable.
00081   */
00082   void send_data(procid_t target, 
00083                  unsigned char packet_type_mask,
00084                  std::istream &istrm,
00085                  size_t len = size_t(-1));
00086                  
00087   /** Another possible interface the controller can
00088   call with when there is data to send. The caller has
00089   responsibility for freeing the pointer when this call returns*/
00090   void send_data(procid_t target, 
00091                  unsigned char packet_type_mask,
00092                  char* data, size_t len);
00093 
00094   void send_loop();
00095   
00096 
00097   void shutdown();
00098   
00099   inline size_t bytes_sent() {
00100     return bytessent.value;
00101   }
00102 
00103  private:
00104   /// pointer to the owner
00105   distributed_control* dc;
00106   dc_comm_base *comm;
00107   procid_t target;
00108 
00109   blocking_queue<expqueue_z_entry> sendqueue;
00110 
00111   thread thr;
00112   bool done;
00113   atomic<size_t> bytessent;
00114   
00115   z_stream zstrm;
00116 
00117 };
00118 
00119 
00120 
00121 } // namespace dc_impl
00122 } // namespace graphlab
00123 #endif // DC_BUFFERED_STREAM_SEND_EXPQUEUE_HPP
00124