summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-10-27 21:31:33 -0400
committerTim O'Shea <tim.oshea753@gmail.com>2014-10-27 21:31:33 -0400
commitcb4e8856fdb7b1d0f30d01852d57566efe692cd2 (patch)
tree5ef5729f9c24b6b1fe9377cf0f56acace7a8ca95
parentb161a35d47f4143b8fe460fee9ffd38a348d08e3 (diff)
zmq: sync blocks now all support tag headers
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/push_sink.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/rep_sink.h2
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc2
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc30
-rw-r--r--gr-zeromq/lib/push_sink_impl.h3
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc38
-rw-r--r--gr-zeromq/lib/rep_sink_impl.h3
7 files changed, 49 insertions, 31 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index b54a1e40d8..1b8999e409 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 1da325257f..6d3c47b626 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -53,7 +53,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 43819f3fd8..45739900b3 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -27,8 +27,6 @@
#include <gnuradio/io_signature.h>
#include "pub_sink_impl.h"
#include "tag_headers.h"
-#include <sstream>
-#include <cstring>
namespace gr {
namespace zeromq {
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index d949a7f95f..4cc9ab9c2a 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "push_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout));
+ (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -74,10 +75,23 @@ namespace gr {
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // create message copy and send
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
+
+ // create message copy and send
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
+ d_socket->send(msg);
return noutput_items;
}
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 9a10065eba..2590a7f692 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
float d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags=false);
~push_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index a8fd5881e5..88ed6c11c0 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "rep_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout));
+ (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("rep_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -78,22 +79,25 @@ namespace gr {
zmq::message_t request;
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
+ int nitems_send = std::min(noutput_items, req_output_items);
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
// create message copy and send
- if (noutput_items < req_output_items) {
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
+ d_socket->send(msg);
- return noutput_items;
- }
- else {
- zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
- d_socket->send(msg);
-
- return req_output_items;
- }
+ return nitems_send;
}
return 0;
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index ff69735757..68bb9eb776 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags=false);
~rep_sink_impl();
int work(int noutput_items,