diff options
Diffstat (limited to 'gr-zeromq/lib/req_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/req_source_impl.cc | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index 5c5071e15e..f69d447f98 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -46,11 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->connect (address); @@ -82,7 +85,7 @@ namespace gr { } zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&itemsin[0], 1, d_timeout); + zmq::poll(&itemsin[0], 1, d_timeout); // If we got a reply, process if (itemsin[0].revents & ZMQ_POLLIN) { @@ -94,14 +97,14 @@ namespace gr { std::string buf(static_cast<char*>(reply.data()), reply.size()); if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } // Copy to ouput buffer and return |