summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/req_msg_source_impl.cc
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-12-30 18:42:48 +0100
committerTim O'Shea <tim.oshea753@gmail.com>2014-12-30 18:42:48 +0100
commit18944a9a761eb7c2256e4ad450b943f09664c410 (patch)
treeb9a6a5313a1a15e3f9a8f80c4b5998c35aadb8ce /gr-zeromq/lib/req_msg_source_impl.cc
parent29bd7ae09383372afddcbab22fcd99b2333e4c1e (diff)
zmq: rep/req msg blocks now working
Diffstat (limited to 'gr-zeromq/lib/req_msg_source_impl.cc')
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc99
1 files changed, 55 insertions, 44 deletions
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 92e26e919c..2dda15208f 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -65,56 +65,67 @@ namespace gr {
delete d_context;
}
+ bool req_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) );
+ return true;
+ }
+
+ bool req_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+ void req_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+ zmq::poll (&itemsout[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (itemsout[0].revents & ZMQ_POLLOUT) {
+ // Request data, FIXME non portable?
+ int nmsg = 1;
+ zmq::message_t request(sizeof(int));
+ memcpy ((void *) request.data (), &nmsg, sizeof(int));
+ d_socket->send(request);
+ //std::cout << "sent request...\n";
+ }
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+ //std::cout << "rx response...\n";
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+ //std::cout << "rx response... got data\n";
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
int
req_msg_source_impl::work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
return noutput_items;
-/*
- char *out = (char*)output_items[0];
-
- zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
- zmq::poll (&itemsout[0], 1, d_timeout);
-
- // If we got a reply, process
- if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable?
- zmq::message_t request(sizeof(int));
- memcpy ((void *) request.data (), &noutput_items, sizeof(int));
- d_socket->send(request);
- }
-
- zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&itemsin[0], 1, d_timeout);
-
- // If we got a reply, process
- if (itemsin[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t reply;
- d_socket->recv(&reply);
-
- // Deserialize header data / tags
- std::string buf(static_cast<char*>(reply.data()), reply.size());
-
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
-
-
- // Copy to ouput buffer and return
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
- }
-
- return 0;
- */
}
} /* namespace zeromq */