diff options
Diffstat (limited to 'gr-zeromq/lib/pull_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/pull_source_impl.cc | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 87b330a862..3215096f56 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -46,11 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->connect (address); @@ -86,25 +89,22 @@ namespace gr { // check header for tags... std::string buf(static_cast<char*>(msg.data()), msg.size()); if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } - + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } // Copy to ouput buffer and return if (buf.size() >= d_itemsize*d_vlen*noutput_items) { memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); - return noutput_items; } else { memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); } } |