diff options
-rw-r--r-- | gr-zeromq/lib/pull_source_impl.cc | 22 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.cc | 36 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 19 | ||||
-rw-r--r-- | gr-zeromq/lib/req_source_impl.cc | 21 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.cc | 25 |
5 files changed, 67 insertions, 56 deletions
diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 87b330a862..3215096f56 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -46,11 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->connect (address); @@ -86,25 +89,22 @@ namespace gr { // check header for tags... std::string buf(static_cast<char*>(msg.data()), msg.size()); if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } - + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } // Copy to ouput buffer and return if (buf.size() >= d_itemsize*d_vlen*noutput_items) { memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); - return noutput_items; } else { memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); } } diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index 4cc9ab9c2a..677de1096e 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -46,11 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->bind (address); @@ -71,32 +74,33 @@ namespace gr { const char *in = (const char *) input_items[0]; zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; - zmq::poll (&itemsout[0], 1, d_timeout); + zmq::poll(&itemsout[0], 1, d_timeout); // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { - // encode the current offset, # tags, and tags into header - std::string header(""); - - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); + // encode the current offset, # tags, and tags into header + std::string header(""); + + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header(offset, tags); } - // create message copy and send - zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); - if(d_pass_tags) - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - d_socket->send(msg); + // create message copy and send + zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); + + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length()); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); + d_socket->send(msg); return noutput_items; } else { - return 0; + return 0; // FIXME: when scheduler supports return blocking } } diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 88ed6c11c0..85f9a786c9 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -46,11 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REP); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->bind (address); @@ -71,7 +74,7 @@ namespace gr { const char *in = (const char *) input_items[0]; zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); + zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { @@ -80,20 +83,20 @@ namespace gr { d_socket->recv(&request); int req_output_items = *(static_cast<int*>(request.data())); int nitems_send = std::min(noutput_items, req_output_items); - + // encode the current offset, # tags, and tags into header std::string header(""); if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); - } + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } // create message copy and send zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send); if(d_pass_tags) - memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((void*) msg.data(), header.c_str(), header.length() ); memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send); d_socket->send(msg); diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index 5c5071e15e..f69d447f98 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -46,11 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->connect (address); @@ -82,7 +85,7 @@ namespace gr { } zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&itemsin[0], 1, d_timeout); + zmq::poll(&itemsin[0], 1, d_timeout); // If we got a reply, process if (itemsin[0].revents & ZMQ_POLLIN) { @@ -94,14 +97,14 @@ namespace gr { std::string buf(static_cast<char*>(reply.data()), reply.size()); if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } // Copy to ouput buffer and return diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 813ff5a1c0..1242688a90 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -46,12 +46,14 @@ namespace gr { { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); - //int time = 0; + d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); d_socket->connect (address); } @@ -87,17 +89,16 @@ namespace gr { std::string buf(static_cast<char*>(msg.data()), msg.size()); if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - //int olen = buf.size(); - buf = parse_tag_header(buf, rcv_offset, tags); - //std::cout << "SUB: Header Len = " << olen - buf.size() << ", data len = " << buf.size() << "\n"; - for(size_t i=0; i<tags.size(); i++){ - //std::cout << "add item tag ... (offset = " << tags[i].offset << " rcv_offset = " << rcv_offset << " nitems_read(0) = " << nitems_written(0) << "\n"; - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + + buf = parse_tag_header(buf, rcv_offset, tags); + + for(size_t i=0; i<tags.size(); i++) { + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } // Copy to ouput buffer and return if (buf.size() >= d_itemsize*d_vlen*noutput_items) { |