From ffad094471e749351aa09cbd149d53a7a0e77a61 Mon Sep 17 00:00:00 2001
From: Tim O'Shea <tim.oshea753@gmail.com>
Date: Mon, 27 Oct 2014 16:05:43 -0400
Subject: zmq: tags should now be serializing and deserializing correctly for
 pub_sink/sub_source

---
 gr-zeromq/lib/sub_source_impl.cc | 33 +++++++++++++++++++++++++++------
 1 file changed, 27 insertions(+), 6 deletions(-)

(limited to 'gr-zeromq/lib/sub_source_impl.cc')

diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 38ddc78e59..4cc9c25def 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -81,16 +81,37 @@ namespace gr {
         // Receive data
         zmq::message_t msg;
         d_socket->recv(&msg);
-        // Copy to ouput buffer and return
-        if (msg.size() >= d_itemsize*d_vlen*noutput_items) {
-          memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items);
 
+        // Deserialize header data / tags
+        std::istringstream iss( std::string(static_cast<char*>(msg.data()), msg.size()));
+        uint64_t rcv_offset;
+        size_t   rcv_ntags;
+        iss.read( (char*)&rcv_offset, sizeof(uint64_t ) );
+        iss.read( (char*)&rcv_ntags,  sizeof(size_t   ) );
+        for(size_t i=0; i<rcv_ntags; i++){
+            uint64_t tag_offset;
+            iss.read( (char*)&tag_offset, sizeof(uint64_t ) );
+            std::stringbuf sb( iss.str() );
+            pmt::pmt_t key = pmt::deserialize( sb );
+            pmt::pmt_t val = pmt::deserialize( sb );
+            pmt::pmt_t src = pmt::deserialize( sb );
+            uint64_t new_tag_offset = tag_offset + nitems_read(0) - rcv_offset;
+            add_item_tag(0, new_tag_offset, key, val, src);
+            iss.str(sb.str());
+            }
+
+        // Pass sample data along
+        std::vector<char> samp(iss.gcount());
+        iss.read( &samp[0], iss.gcount() );
+
+        // Copy to ouput buffer and return
+        if (samp.size() >= d_itemsize*d_vlen*noutput_items) {
+          memcpy(out, (void *)&samp[0], d_itemsize*d_vlen*noutput_items);
           return noutput_items;
         }
         else {
-          memcpy(out, (void *)msg.data(), msg.size());
-
-          return msg.size()/(d_itemsize*d_vlen);
+          memcpy(out, (void *)&samp[0], samp.size());
+          return samp.size()/(d_itemsize*d_vlen);
         }
       }
       else {
-- 
cgit v1.2.3