diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 21:31:33 -0400 |
---|---|---|
committer | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 21:31:33 -0400 |
commit | cb4e8856fdb7b1d0f30d01852d57566efe692cd2 (patch) | |
tree | 5ef5729f9c24b6b1fe9377cf0f56acace7a8ca95 | |
parent | b161a35d47f4143b8fe460fee9ffd38a348d08e3 (diff) |
zmq: sync blocks now all support tag headers
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/push_sink.h | 2 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/rep_sink.h | 2 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 2 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.cc | 30 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.h | 3 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 38 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.h | 3 |
7 files changed, 49 insertions, 31 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h index b54a1e40d8..1b8999e409 100644 --- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h @@ -55,7 +55,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h index 1da325257f..6d3c47b626 100644 --- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h @@ -53,7 +53,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 43819f3fd8..45739900b3 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -27,8 +27,6 @@ #include <gnuradio/io_signature.h> #include "pub_sink_impl.h" #include "tag_headers.h" -#include <sstream> -#include <cstring> namespace gr { namespace zeromq { diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index d949a7f95f..4cc9ab9c2a 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "push_sink_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { push_sink::sptr - push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout) + push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new push_sink_impl(itemsize, vlen, address, timeout)); + (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags)); } - push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout) + push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("push_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -74,10 +75,23 @@ namespace gr { // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { - // create message copy and send - zmq::message_t msg(d_itemsize*d_vlen*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - d_socket->send(msg); + + // encode the current offset, # tags, and tags into header + std::string header(""); + + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } + + // create message copy and send + zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); + d_socket->send(msg); return noutput_items; } diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h index 9a10065eba..2590a7f692 100644 --- a/gr-zeromq/lib/push_sink_impl.h +++ b/gr-zeromq/lib/push_sink_impl.h @@ -37,9 +37,10 @@ namespace gr { float d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout); + push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags=false); ~push_sink_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index a8fd5881e5..88ed6c11c0 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "rep_sink_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { rep_sink::sptr - rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout) + rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new rep_sink_impl(itemsize, vlen, address, timeout)); + (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags)); } - rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout) + rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -78,22 +79,25 @@ namespace gr { zmq::message_t request; d_socket->recv(&request); int req_output_items = *(static_cast<int*>(request.data())); + int nitems_send = std::min(noutput_items, req_output_items); + + // encode the current offset, # tags, and tags into header + std::string header(""); + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } // create message copy and send - if (noutput_items < req_output_items) { - zmq::message_t msg(d_itemsize*d_vlen*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - d_socket->send(msg); + zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send); + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send); + d_socket->send(msg); - return noutput_items; - } - else { - zmq::message_t msg(d_itemsize*d_vlen*req_output_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items); - d_socket->send(msg); - - return req_output_items; - } + return nitems_send; } return 0; diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h index ff69735757..68bb9eb776 100644 --- a/gr-zeromq/lib/rep_sink_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -37,9 +37,10 @@ namespace gr { int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout); + rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags=false); ~rep_sink_impl(); int work(int noutput_items, |