summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/sub_source_impl.cc
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-10-27 16:05:43 -0400
committerTim O'Shea <tim.oshea753@gmail.com>2014-10-27 16:05:43 -0400
commitffad094471e749351aa09cbd149d53a7a0e77a61 (patch)
treef98c377bc32b432ec4cedd8c089661953ce08bc9 /gr-zeromq/lib/sub_source_impl.cc
parent721a043aa572d71dd2122c9a108f70e3d57d301e (diff)
zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc33
1 files changed, 27 insertions, 6 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 38ddc78e59..4cc9c25def 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -81,16 +81,37 @@ namespace gr {
// Receive data
zmq::message_t msg;
d_socket->recv(&msg);
- // Copy to ouput buffer and return
- if (msg.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items);
+ // Deserialize header data / tags
+ std::istringstream iss( std::string(static_cast<char*>(msg.data()), msg.size()));
+ uint64_t rcv_offset;
+ size_t rcv_ntags;
+ iss.read( (char*)&rcv_offset, sizeof(uint64_t ) );
+ iss.read( (char*)&rcv_ntags, sizeof(size_t ) );
+ for(size_t i=0; i<rcv_ntags; i++){
+ uint64_t tag_offset;
+ iss.read( (char*)&tag_offset, sizeof(uint64_t ) );
+ std::stringbuf sb( iss.str() );
+ pmt::pmt_t key = pmt::deserialize( sb );
+ pmt::pmt_t val = pmt::deserialize( sb );
+ pmt::pmt_t src = pmt::deserialize( sb );
+ uint64_t new_tag_offset = tag_offset + nitems_read(0) - rcv_offset;
+ add_item_tag(0, new_tag_offset, key, val, src);
+ iss.str(sb.str());
+ }
+
+ // Pass sample data along
+ std::vector<char> samp(iss.gcount());
+ iss.read( &samp[0], iss.gcount() );
+
+ // Copy to ouput buffer and return
+ if (samp.size() >= d_itemsize*d_vlen*noutput_items) {
+ memcpy(out, (void *)&samp[0], d_itemsize*d_vlen*noutput_items);
return noutput_items;
}
else {
- memcpy(out, (void *)msg.data(), msg.size());
-
- return msg.size()/(d_itemsize*d_vlen);
+ memcpy(out, (void *)&samp[0], samp.size());
+ return samp.size()/(d_itemsize*d_vlen);
}
}
else {