diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 18:31:03 -0400 |
---|---|---|
committer | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 18:31:03 -0400 |
commit | 169a6c796d7fd3ff5dfc5114e38e6a05000068d5 (patch) | |
tree | f26815fd6dbd17fcee560413393d1df1bdb5feb5 /gr-zeromq/lib/sub_source_impl.cc | |
parent | e8cfb9953c5daf77ac249804e014f28c10523631 (diff) |
zmq: hoisting tag parsing into helper function, should now be easy to use in other zmq blocks
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.cc | 37 |
1 files changed, 13 insertions, 24 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 6c9da1bbbe..9276a5e281 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -26,6 +26,7 @@ #include <gnuradio/io_signature.h> #include "sub_source_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { @@ -83,38 +84,26 @@ namespace gr { d_socket->recv(&msg); // Deserialize header data / tags - std::istringstream iss( std::string(static_cast<char*>(msg.data()), msg.size())); + std::string buf(static_cast<char*>(msg.data()), msg.size()); if(d_pass_tags){ - uint64_t rcv_offset; - size_t rcv_ntags; - iss.read( (char*)&rcv_offset, sizeof(uint64_t ) ); - iss.read( (char*)&rcv_ntags, sizeof(size_t ) ); - for(size_t i=0; i<rcv_ntags; i++){ - uint64_t tag_offset; - iss.read( (char*)&tag_offset, sizeof(uint64_t ) ); - std::stringbuf sb( iss.str() ); - pmt::pmt_t key = pmt::deserialize( sb ); - pmt::pmt_t val = pmt::deserialize( sb ); - pmt::pmt_t src = pmt::deserialize( sb ); - uint64_t new_tag_offset = tag_offset + nitems_read(0) - rcv_offset; - add_item_tag(0, new_tag_offset, key, val, src); - iss.str(sb.str()); + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_read(0); + add_item_tag(0, tags[i]); + } } - } - - // Pass sample data along - std::vector<char> samp(iss.gcount()); - iss.read( &samp[0], iss.gcount() ); // Copy to ouput buffer and return - if (samp.size() >= d_itemsize*d_vlen*noutput_items) { - memcpy(out, (void *)&samp[0], d_itemsize*d_vlen*noutput_items); + if (buf.size() >= d_itemsize*d_vlen*noutput_items) { + memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); return noutput_items; } else { - memcpy(out, (void *)&samp[0], samp.size()); - return samp.size()/(d_itemsize*d_vlen); + memcpy(out, (void *)&buf[0], buf.size()); + return buf.size()/(d_itemsize*d_vlen); } } else { |