00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef DC_BUFFERED_STREAM_SEND_EXPQUEUE2_HPP
00019 #define DC_BUFFERED_STREAM_SEND_EXPQUEUE2_HPP
00020 #include <iostream>
00021 #include <boost/function.hpp>
00022 #include <boost/bind.hpp>
00023 #include <boost/type_traits/is_base_of.hpp>
00024 #include <graphlab/rpc/dc_internal_types.hpp>
00025 #include <graphlab/rpc/dc_types.hpp>
00026 #include <graphlab/rpc/dc_comm_base.hpp>
00027 #include <graphlab/rpc/dc_send.hpp>
00028 #include <graphlab/parallel/pthread_tools.hpp>
00029 #include <graphlab/util/blocking_queue.hpp>
00030 #include <graphlab/logger/logger.hpp>
00031 namespace graphlab {
00032 class distributed_control;
00033
00034 namespace dc_impl {
00035
00036 struct expqueue2_entry{
00037 procid_t target;
00038 char* c;
00039 size_t len;
00040 };
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 class dc_buffered_stream_send_expqueue2: public dc_send{
00058 public:
00059 dc_buffered_stream_send_expqueue2(distributed_control* dc, dc_comm_base *comm): dc(dc),
00060 comm(comm), done(false) {
00061 thr = launch_in_new_thread(boost::bind(&dc_buffered_stream_send_expqueue2::send_loop,
00062 this));
00063 }
00064
00065 ~dc_buffered_stream_send_expqueue2() {
00066 }
00067
00068
00069 inline bool channel_active(procid_t target) const {
00070 return comm->channel_active(target);
00071 }
00072
00073
00074
00075
00076
00077
00078 void send_data(procid_t target,
00079 unsigned char packet_type_mask,
00080 std::istream &istrm,
00081 size_t len = size_t(-1));
00082
00083
00084
00085
00086 void send_data(procid_t target,
00087 unsigned char packet_type_mask,
00088 char* data, size_t len);
00089
00090 void send_loop();
00091
00092
00093 void shutdown();
00094
00095 inline size_t bytes_sent() {
00096 return bytessent.value;
00097 }
00098
00099 private:
00100
00101 distributed_control* dc;
00102 dc_comm_base *comm;
00103
00104 blocking_queue<expqueue2_entry> sendqueue;
00105
00106 thread thr;
00107 bool done;
00108 atomic<size_t> bytessent;
00109
00110 };
00111
00112
00113
00114 }
00115 }
00116 #endif // DC_BUFFERED_STREAM_SEND_EXPQUEUE_HPP
00117