00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef DC_BUFFERED_STREAM_SEND_EXPQUEUE_Z_HPP
00019 #define DC_BUFFERED_STREAM_SEND_EXPQUEUE_Z_HPP
00020 #include <iostream>
00021 #include <boost/function.hpp>
00022 #include <boost/bind.hpp>
00023 #include <boost/type_traits/is_base_of.hpp>
00024 #include <zlib.h>
00025 #include <graphlab/rpc/dc_internal_types.hpp>
00026 #include <graphlab/rpc/dc_types.hpp>
00027 #include <graphlab/rpc/dc_comm_base.hpp>
00028 #include <graphlab/rpc/dc_send.hpp>
00029 #include <graphlab/parallel/pthread_tools.hpp>
00030 #include <graphlab/util/blocking_queue.hpp>
00031 #include <graphlab/logger/logger.hpp>
00032
00033
00034 namespace graphlab {
00035 class distributed_control;
00036
00037 namespace dc_impl {
00038
00039 struct expqueue_z_entry{
00040 char* c;
00041 size_t len;
00042 };
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 class dc_buffered_stream_send_expqueue_z: public dc_send{
00057 public:
00058 dc_buffered_stream_send_expqueue_z(distributed_control* dc, dc_comm_base *comm, procid_t target, bool zlib = false): dc(dc),
00059 comm(comm), target(target), done(false) {
00060
00061 zstrm.zalloc = Z_NULL;
00062 zstrm.zfree = Z_NULL;
00063 zstrm.opaque = Z_NULL;
00064 ASSERT_TRUE(deflateInit(&zstrm, 1) == Z_OK);
00065 thr = launch_in_new_thread(boost::bind(&dc_buffered_stream_send_expqueue_z::send_loop,
00066 this));
00067 }
00068
00069 ~dc_buffered_stream_send_expqueue_z() {
00070 }
00071
00072
00073 inline bool channel_active(procid_t target) const {
00074 return comm->channel_active(target);
00075 }
00076
00077
00078
00079
00080
00081
00082 void send_data(procid_t target,
00083 unsigned char packet_type_mask,
00084 std::istream &istrm,
00085 size_t len = size_t(-1));
00086
00087
00088
00089
00090 void send_data(procid_t target,
00091 unsigned char packet_type_mask,
00092 char* data, size_t len);
00093
00094 void send_loop();
00095
00096
00097 void shutdown();
00098
00099 inline size_t bytes_sent() {
00100 return bytessent.value;
00101 }
00102
00103 private:
00104
00105 distributed_control* dc;
00106 dc_comm_base *comm;
00107 procid_t target;
00108
00109 blocking_queue<expqueue_z_entry> sendqueue;
00110
00111 thread thr;
00112 bool done;
00113 atomic<size_t> bytessent;
00114
00115 z_stream zstrm;
00116
00117 };
00118
00119
00120
00121 }
00122 }
00123 #endif // DC_BUFFERED_STREAM_SEND_EXPQUEUE_HPP
00124