diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2014-12-30 18:42:48 +0100 |
---|---|---|
committer | Tim O'Shea <tim.oshea753@gmail.com> | 2014-12-30 18:42:48 +0100 |
commit | 18944a9a761eb7c2256e4ad450b943f09664c410 (patch) | |
tree | b9a6a5313a1a15e3f9a8f80c4b5998c35aadb8ce /gr-zeromq | |
parent | 29bd7ae09383372afddcbab22fcd99b2333e4c1e (diff) |
zmq: rep/req msg blocks now working
Diffstat (limited to 'gr-zeromq')
-rw-r--r-- | gr-zeromq/lib/rep_msg_sink_impl.cc | 66 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_msg_sink_impl.h | 6 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.cc | 99 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.h | 5 |
4 files changed, 108 insertions, 68 deletions
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc index 72dd5fb71b..0a18a8b3d2 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.cc +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -56,8 +56,8 @@ namespace gr { d_socket->bind (address); message_port_register_in(pmt::mp("in")); - set_msg_handler( pmt::mp("in"), - boost::bind(&rep_msg_sink_impl::handler, this, _1)); +// set_msg_handler( pmt::mp("in"), +// boost::bind(&rep_msg_sink_impl::handler, this, _1)); } rep_msg_sink_impl::~rep_msg_sink_impl() @@ -67,6 +67,19 @@ namespace gr { delete d_context; } + bool rep_msg_sink_impl::start(){ + d_finished = false; + d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) ); + return true; + } + + bool rep_msg_sink_impl::stop(){ + d_finished = true; + d_thread->join(); + return true; + } + +/* void rep_msg_sink_impl::handler(pmt::pmt_t msg){ std::stringbuf sb(""); pmt::serialize( msg, sb ); @@ -75,47 +88,54 @@ namespace gr { memcpy( zmsg.data(), s.c_str(), s.size() ); d_socket->send(zmsg); } +*/ int rep_msg_sink_impl::work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - return noutput_items; -/* - const char *in = (const char *) input_items[0]; + return noutput_items; + } + void rep_msg_sink_impl::readloop(){ + + while(!d_finished){ + + // while we have data, wait for query... + while(!empty_p(pmt::mp("in"))){ + + //std::cout << "wait for req ...\n"; + // wait for query... zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; zmq::poll (&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { + //std::cout << "wait for req ... got req\n"; // receive data request zmq::message_t request; d_socket->recv(&request); int req_output_items = *(static_cast<int*>(request.data())); - int nitems_send = std::min(noutput_items, req_output_items); + if(req_output_items != 1) + throw std::runtime_error("Request was not 1 msg for rep/req request!!"); - // encode the current offset, # tags, and tags into header - std::string header(""); - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); - } - // create message copy and send - zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send); - if(d_pass_tags) - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send); - d_socket->send(msg); + //std::cout << "get pmt in\n"; + pmt::pmt_t msg = delete_head_nowait(pmt::mp("in")); + std::stringbuf sb(""); + pmt::serialize( msg, sb ); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); + memcpy( zmsg.data(), s.c_str(), s.size() ); + //std::cout << "send pmt zmq\n"; + d_socket->send(zmsg); + } // if req - return nitems_send; - } + } // while !empty + + } // while !d_finished - return 0;*/ } } /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h index 40d7969472..25bd0e87a0 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.h +++ b/gr-zeromq/lib/rep_msg_sink_impl.h @@ -35,6 +35,9 @@ namespace gr { int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + boost::thread *d_thread; + bool d_finished; + void readloop(); public: rep_msg_sink_impl(char *address, int timeout); @@ -43,7 +46,8 @@ namespace gr { int work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items); - void handler(pmt::pmt_t msg); + bool start(); + bool stop(); }; } // namespace zeromq diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index 92e26e919c..2dda15208f 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -65,56 +65,67 @@ namespace gr { delete d_context; } + bool req_msg_source_impl::start(){ + d_finished = false; + d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) ); + return true; + } + + bool req_msg_source_impl::stop(){ + d_finished = true; + d_thread->join(); + return true; + } + + void req_msg_source_impl::readloop(){ + while(!d_finished){ + //std::cout << "readloop\n"; + + zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; + zmq::poll (&itemsout[0], 1, d_timeout); + + // If we got a reply, process + if (itemsout[0].revents & ZMQ_POLLOUT) { + // Request data, FIXME non portable? + int nmsg = 1; + zmq::message_t request(sizeof(int)); + memcpy ((void *) request.data (), &nmsg, sizeof(int)); + d_socket->send(request); + //std::cout << "sent request...\n"; + } + + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll (&items[0], 1, d_timeout); + //std::cout << "rx response...\n"; + + // If we got a reply, process + if (items[0].revents & ZMQ_POLLIN) { + //std::cout << "rx response... got data\n"; + + // Receive data + zmq::message_t msg; + d_socket->recv(&msg); + + //std::cout << "got msg...\n"; + + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + //std::cout << m << "\n"; + message_port_pub(pmt::mp("out"), m); + + } else { + usleep(100); + } + } + } + int req_msg_source_impl::work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { return noutput_items; -/* - char *out = (char*)output_items[0]; - - zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; - zmq::poll (&itemsout[0], 1, d_timeout); - - // If we got a reply, process - if (itemsout[0].revents & ZMQ_POLLOUT) { - // Request data, FIXME non portable? - zmq::message_t request(sizeof(int)); - memcpy ((void *) request.data (), &noutput_items, sizeof(int)); - d_socket->send(request); - } - - zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&itemsin[0], 1, d_timeout); - - // If we got a reply, process - if (itemsin[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t reply; - d_socket->recv(&reply); - - // Deserialize header data / tags - std::string buf(static_cast<char*>(reply.data()), reply.size()); - - if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } - - - // Copy to ouput buffer and return - memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); - } - - return 0; - */ } } /* namespace zeromq */ diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h index 635fa45b62..3a691743b5 100644 --- a/gr-zeromq/lib/req_msg_source_impl.h +++ b/gr-zeromq/lib/req_msg_source_impl.h @@ -35,11 +35,16 @@ namespace gr { int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + void readloop(); + boost::thread *d_thread; public: req_msg_source_impl(char *address, int timeout); ~req_msg_source_impl(); + bool start(); + bool stop(); + bool d_finished; int work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items); |