summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc24
1 files changed, 22 insertions, 2 deletions
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 13f86045d7..56802039ba 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -26,6 +26,8 @@
#include <gnuradio/io_signature.h>
#include "pub_sink_impl.h"
+#include <sstream>
+#include <cstring>
namespace gr {
namespace zeromq {
@@ -69,9 +71,27 @@ namespace gr {
{
const char *in = (const char *)input_items[0];
+ // encode the current offset, # tags, and tags into header
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ std::stringstream ss;
+ size_t ntags = tags.size();
+ ss.write( reinterpret_cast< const char* >( nitems_read(0) ), sizeof(uint64_t) ); // offset
+ ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags
+ std::stringbuf sb("");
+ for(size_t i=0; i<tags.size(); i++){
+ ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset
+ sb.str("");
+ pmt::serialize( tags[i].key, sb ); // key
+ pmt::serialize( tags[i].value, sb ); // value
+ ss.write( sb.str().c_str() , sb.str().length() ); // offset
+ }
+ size_t headlen( ss.gcount() );
+
// create message copy and send
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
+ zmq::message_t msg(headlen + d_itemsize*d_vlen*noutput_items);
+ memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() );
+ memcpy((uint8_t *)msg.data() + headlen, in, d_itemsize*d_vlen*noutput_items);
d_socket->send(msg);
return noutput_items;