summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/rep_msg_sink_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/rep_msg_sink_impl.cc')
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.cc66
1 files changed, 43 insertions, 23 deletions
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 72dd5fb71b..0a18a8b3d2 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -56,8 +56,8 @@ namespace gr {
d_socket->bind (address);
message_port_register_in(pmt::mp("in"));
- set_msg_handler( pmt::mp("in"),
- boost::bind(&rep_msg_sink_impl::handler, this, _1));
+// set_msg_handler( pmt::mp("in"),
+// boost::bind(&rep_msg_sink_impl::handler, this, _1));
}
rep_msg_sink_impl::~rep_msg_sink_impl()
@@ -67,6 +67,19 @@ namespace gr {
delete d_context;
}
+ bool rep_msg_sink_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) );
+ return true;
+ }
+
+ bool rep_msg_sink_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+/*
void rep_msg_sink_impl::handler(pmt::pmt_t msg){
std::stringbuf sb("");
pmt::serialize( msg, sb );
@@ -75,47 +88,54 @@ namespace gr {
memcpy( zmsg.data(), s.c_str(), s.size() );
d_socket->send(zmsg);
}
+*/
int
rep_msg_sink_impl::work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- return noutput_items;
-/*
- const char *in = (const char *) input_items[0];
+ return noutput_items;
+ }
+ void rep_msg_sink_impl::readloop(){
+
+ while(!d_finished){
+
+ // while we have data, wait for query...
+ while(!empty_p(pmt::mp("in"))){
+
+ //std::cout << "wait for req ...\n";
+ // wait for query...
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
zmq::poll (&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
+ //std::cout << "wait for req ... got req\n";
// receive data request
zmq::message_t request;
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
- int nitems_send = std::min(noutput_items, req_output_items);
+ if(req_output_items != 1)
+ throw std::runtime_error("Request was not 1 msg for rep/req request!!");
- // encode the current offset, # tags, and tags into header
- std::string header("");
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
- }
-
// create message copy and send
- zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
- if(d_pass_tags)
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
- d_socket->send(msg);
+ //std::cout << "get pmt in\n";
+ pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ //std::cout << "send pmt zmq\n";
+ d_socket->send(zmsg);
+ } // if req
- return nitems_send;
- }
+ } // while !empty
+
+ } // while !d_finished
- return 0;*/
}
} /* namespace zeromq */
} /* namespace gr */