diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 16:05:43 -0400 |
---|---|---|
committer | Tim O'Shea <tim.oshea753@gmail.com> | 2014-10-27 16:05:43 -0400 |
commit | ffad094471e749351aa09cbd149d53a7a0e77a61 (patch) | |
tree | f98c377bc32b432ec4cedd8c089661953ce08bc9 /gr-zeromq/lib/sub_source_impl.cc | |
parent | 721a043aa572d71dd2122c9a108f70e3d57d301e (diff) |
zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.cc | 33 |
1 files changed, 27 insertions, 6 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 38ddc78e59..4cc9c25def 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -81,16 +81,37 @@ namespace gr { // Receive data zmq::message_t msg; d_socket->recv(&msg); - // Copy to ouput buffer and return - if (msg.size() >= d_itemsize*d_vlen*noutput_items) { - memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items); + // Deserialize header data / tags + std::istringstream iss( std::string(static_cast<char*>(msg.data()), msg.size())); + uint64_t rcv_offset; + size_t rcv_ntags; + iss.read( (char*)&rcv_offset, sizeof(uint64_t ) ); + iss.read( (char*)&rcv_ntags, sizeof(size_t ) ); + for(size_t i=0; i<rcv_ntags; i++){ + uint64_t tag_offset; + iss.read( (char*)&tag_offset, sizeof(uint64_t ) ); + std::stringbuf sb( iss.str() ); + pmt::pmt_t key = pmt::deserialize( sb ); + pmt::pmt_t val = pmt::deserialize( sb ); + pmt::pmt_t src = pmt::deserialize( sb ); + uint64_t new_tag_offset = tag_offset + nitems_read(0) - rcv_offset; + add_item_tag(0, new_tag_offset, key, val, src); + iss.str(sb.str()); + } + + // Pass sample data along + std::vector<char> samp(iss.gcount()); + iss.read( &samp[0], iss.gcount() ); + + // Copy to ouput buffer and return + if (samp.size() >= d_itemsize*d_vlen*noutput_items) { + memcpy(out, (void *)&samp[0], d_itemsize*d_vlen*noutput_items); return noutput_items; } else { - memcpy(out, (void *)msg.data(), msg.size()); - - return msg.size()/(d_itemsize*d_vlen); + memcpy(out, (void *)&samp[0], samp.size()); + return samp.size()/(d_itemsize*d_vlen); } } else { |