summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/sub_source_impl.cc
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-10-27 18:31:03 -0400
committerTim O'Shea <tim.oshea753@gmail.com>2014-10-27 18:31:03 -0400
commit169a6c796d7fd3ff5dfc5114e38e6a05000068d5 (patch)
treef26815fd6dbd17fcee560413393d1df1bdb5feb5 /gr-zeromq/lib/sub_source_impl.cc
parente8cfb9953c5daf77ac249804e014f28c10523631 (diff)
zmq: hoisting tag parsing into helper function, should now be easy to use in other zmq blocks
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc37
1 files changed, 13 insertions, 24 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 6c9da1bbbe..9276a5e281 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -26,6 +26,7 @@
#include <gnuradio/io_signature.h>
#include "sub_source_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
@@ -83,38 +84,26 @@ namespace gr {
d_socket->recv(&msg);
// Deserialize header data / tags
- std::istringstream iss( std::string(static_cast<char*>(msg.data()), msg.size()));
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
if(d_pass_tags){
- uint64_t rcv_offset;
- size_t rcv_ntags;
- iss.read( (char*)&rcv_offset, sizeof(uint64_t ) );
- iss.read( (char*)&rcv_ntags, sizeof(size_t ) );
- for(size_t i=0; i<rcv_ntags; i++){
- uint64_t tag_offset;
- iss.read( (char*)&tag_offset, sizeof(uint64_t ) );
- std::stringbuf sb( iss.str() );
- pmt::pmt_t key = pmt::deserialize( sb );
- pmt::pmt_t val = pmt::deserialize( sb );
- pmt::pmt_t src = pmt::deserialize( sb );
- uint64_t new_tag_offset = tag_offset + nitems_read(0) - rcv_offset;
- add_item_tag(0, new_tag_offset, key, val, src);
- iss.str(sb.str());
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ for(size_t i=0; i<tags.size(); i++){
+ tags[i].offset -= rcv_offset - nitems_read(0);
+ add_item_tag(0, tags[i]);
+ }
}
- }
-
- // Pass sample data along
- std::vector<char> samp(iss.gcount());
- iss.read( &samp[0], iss.gcount() );
// Copy to ouput buffer and return
- if (samp.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)&samp[0], d_itemsize*d_vlen*noutput_items);
+ if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
+ memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
return noutput_items;
}
else {
- memcpy(out, (void *)&samp[0], samp.size());
- return samp.size()/(d_itemsize*d_vlen);
+ memcpy(out, (void *)&buf[0], buf.size());
+ return buf.size()/(d_itemsize*d_vlen);
}
}
else {