summaryrefslogtreecommitdiff
path: root/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/python/zeromq/qa_zeromq_pubsub.py')
-rw-r--r--gr-zeromq/python/zeromq/qa_zeromq_pubsub.py49
1 files changed, 49 insertions, 0 deletions
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
index baa2e5a206..44a936ec46 100644
--- a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
+++ b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
@@ -12,8 +12,22 @@
from gnuradio import gr, gr_unittest
from gnuradio import blocks, zeromq
+import pmt
import time
+def make_tag(key, value, offset, srcid=None):
+ tag = gr.tag_t()
+ tag.key = pmt.string_to_symbol(key)
+ tag.value = pmt.to_pmt(value)
+ tag.offset = offset
+ if srcid is not None:
+ tag.srcid = pmt.to_pmt(srcid)
+ return tag
+
+def compare_tags(a, b):
+ return a.offset == b.offset and pmt.equal(a.key, b.key) and \
+ pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid)
+
class qa_zeromq_pubsub (gr_unittest.TestCase):
def setUp (self):
@@ -44,5 +58,40 @@ class qa_zeromq_pubsub (gr_unittest.TestCase):
self.send_tb.wait()
self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
+ def test_002 (self):
+ # same as test_001, but insert a tag and set key filter
+ vlen = 10
+ src_data = list(range(vlen))*100
+
+ src_tags = tuple([make_tag('key', 'val', 0, 'src'), make_tag('key', 'val', 1, 'src')])
+
+ src = blocks.vector_source_f(src_data, False, vlen, tags=src_tags)
+ zeromq_pub_sink = zeromq.pub_sink(gr.sizeof_float, vlen, "tcp://127.0.0.1:0", 0, pass_tags=True, key="filter_key")
+ address = zeromq_pub_sink.last_endpoint()
+ zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, address, 0, pass_tags=True, key="filter_key")
+ sink = blocks.vector_sink_f(vlen)
+ self.send_tb.connect(src, zeromq_pub_sink)
+ self.recv_tb.connect(zeromq_sub_source, sink)
+
+ # start both flowgraphs
+ self.recv_tb.start()
+ time.sleep(0.5)
+ self.send_tb.start()
+ time.sleep(0.5)
+ self.recv_tb.stop()
+ self.send_tb.stop()
+ self.recv_tb.wait()
+ self.send_tb.wait()
+
+ # compare data
+ self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
+
+ # compare all tags
+ rx_tags = sink.tags()
+ self.assertEqual(len(src_tags), len(rx_tags))
+
+ for in_tag, out_tag in zip(src_tags, rx_tags):
+ self.assertTrue(compare_tags(in_tag, out_tag))
+
if __name__ == '__main__':
gr_unittest.run(qa_zeromq_pubsub)