summaryrefslogtreecommitdiff
path: root/gr-zeromq
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-10-27 18:15:20 -0400
committerTim O'Shea <tim.oshea753@gmail.com>2014-10-27 18:15:20 -0400
commite8cfb9953c5daf77ac249804e014f28c10523631 (patch)
tree00f7624f7c395f84987103647938d0b3b5b4e9b7 /gr-zeromq
parentbd90b2505476af3e07ac882ff9f810e8d457bdb6 (diff)
zmq: hoisting header encoding to helper function
Diffstat (limited to 'gr-zeromq')
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc25
-rw-r--r--gr-zeromq/lib/tag_headers.h24
2 files changed, 31 insertions, 18 deletions
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 7b57e27ccc..43819f3fd8 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -26,6 +26,7 @@
#include <gnuradio/io_signature.h>
#include "pub_sink_impl.h"
+#include "tag_headers.h"
#include <sstream>
#include <cstring>
@@ -72,31 +73,19 @@ namespace gr {
const char *in = (const char *)input_items[0];
// encode the current offset, # tags, and tags into header
- size_t headlen(0);
- std::stringstream ss;
+ std::string header("");
if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
std::vector<gr::tag_t> tags;
get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- size_t ntags = tags.size();
- ss.write( reinterpret_cast< const char* >( nitems_read(0) ), sizeof(uint64_t) ); // offset
- ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags
- std::stringbuf sb("");
- for(size_t i=0; i<tags.size(); i++){
- ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset
- sb.str("");
- pmt::serialize( tags[i].key, sb ); // key
- pmt::serialize( tags[i].value, sb ); // value
- pmt::serialize( tags[i].srcid, sb ); // srcid
- ss.write( sb.str().c_str() , sb.str().length() ); // offset
- }
- headlen = ss.gcount();
+ header = gen_tag_header( offset, tags );
}
// create message copy and send
- zmq::message_t msg(headlen + d_itemsize*d_vlen*noutput_items);
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
if(d_pass_tags)
- memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() );
- memcpy((uint8_t *)msg.data() + headlen, in, d_itemsize*d_vlen*noutput_items);
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
d_socket->send(msg);
return noutput_items;
diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h
new file mode 100644
index 0000000000..0a4dfee46e
--- /dev/null
+++ b/gr-zeromq/lib/tag_headers.h
@@ -0,0 +1,24 @@
+
+#include <gnuradio/io_signature.h>
+#include <gnuradio/block.h>
+#include <sstream>
+#include <cstring>
+
+std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags){
+ std::stringstream ss;
+ size_t ntags = tags.size();
+ ss.write( reinterpret_cast< const char* >( offset ), sizeof(uint64_t) ); // offset
+ ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags
+ std::stringbuf sb("");
+ for(size_t i=0; i<tags.size(); i++){
+ ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset
+ sb.str("");
+ pmt::serialize( tags[i].key, sb ); // key
+ pmt::serialize( tags[i].value, sb ); // value
+ pmt::serialize( tags[i].srcid, sb ); // srcid
+ ss.write( sb.str().c_str() , sb.str().length() ); // offset
+ }
+ return ss.str();
+}
+
+