diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 18:15:20 -0400 |
---|---|---|
committer | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 18:15:20 -0400 |
commit | e8cfb9953c5daf77ac249804e014f28c10523631 (patch) | |
tree | 00f7624f7c395f84987103647938d0b3b5b4e9b7 /gr-zeromq | |
parent | bd90b2505476af3e07ac882ff9f810e8d457bdb6 (diff) |
zmq: hoisting header encoding to helper function
Diffstat (limited to 'gr-zeromq')
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 25 | ||||
-rw-r--r-- | gr-zeromq/lib/tag_headers.h | 24 |
2 files changed, 31 insertions, 18 deletions
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 7b57e27ccc..43819f3fd8 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -26,6 +26,7 @@ #include <gnuradio/io_signature.h> #include "pub_sink_impl.h" +#include "tag_headers.h" #include <sstream> #include <cstring> @@ -72,31 +73,19 @@ namespace gr { const char *in = (const char *)input_items[0]; // encode the current offset, # tags, and tags into header - size_t headlen(0); - std::stringstream ss; + std::string header(""); if(d_pass_tags){ + uint64_t offset = nitems_read(0); std::vector<gr::tag_t> tags; get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - size_t ntags = tags.size(); - ss.write( reinterpret_cast< const char* >( nitems_read(0) ), sizeof(uint64_t) ); // offset - ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags - std::stringbuf sb(""); - for(size_t i=0; i<tags.size(); i++){ - ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset - sb.str(""); - pmt::serialize( tags[i].key, sb ); // key - pmt::serialize( tags[i].value, sb ); // value - pmt::serialize( tags[i].srcid, sb ); // srcid - ss.write( sb.str().c_str() , sb.str().length() ); // offset - } - headlen = ss.gcount(); + header = gen_tag_header( offset, tags ); } // create message copy and send - zmq::message_t msg(headlen + d_itemsize*d_vlen*noutput_items); + zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); if(d_pass_tags) - memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() ); - memcpy((uint8_t *)msg.data() + headlen, in, d_itemsize*d_vlen*noutput_items); + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); d_socket->send(msg); return noutput_items; diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h new file mode 100644 index 0000000000..0a4dfee46e --- /dev/null +++ b/gr-zeromq/lib/tag_headers.h @@ -0,0 +1,24 @@ + +#include <gnuradio/io_signature.h> +#include <gnuradio/block.h> +#include <sstream> +#include <cstring> + +std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags){ + std::stringstream ss; + size_t ntags = tags.size(); + ss.write( reinterpret_cast< const char* >( offset ), sizeof(uint64_t) ); // offset + ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags + std::stringbuf sb(""); + for(size_t i=0; i<tags.size(); i++){ + ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset + sb.str(""); + pmt::serialize( tags[i].key, sb ); // key + pmt::serialize( tags[i].value, sb ); // value + pmt::serialize( tags[i].srcid, sb ); // srcid + ss.write( sb.str().c_str() , sb.str().length() ); // offset + } + return ss.str(); +} + + |