summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/sub_source_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc25
1 files changed, 13 insertions, 12 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 813ff5a1c0..1242688a90 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -46,12 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
- //int time = 0;
+
d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
d_socket->connect (address);
}
@@ -87,17 +89,16 @@ namespace gr {
std::string buf(static_cast<char*>(msg.data()), msg.size());
if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- //int olen = buf.size();
- buf = parse_tag_header(buf, rcv_offset, tags);
- //std::cout << "SUB: Header Len = " << olen - buf.size() << ", data len = " << buf.size() << "\n";
- for(size_t i=0; i<tags.size(); i++){
- //std::cout << "add item tag ... (offset = " << tags[i].offset << " rcv_offset = " << rcv_offset << " nitems_read(0) = " << nitems_written(0) << "\n";
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+
+ buf = parse_tag_header(buf, rcv_offset, tags);
+
+ for(size_t i=0; i<tags.size(); i++) {
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
// Copy to ouput buffer and return
if (buf.size() >= d_itemsize*d_vlen*noutput_items) {