diff options
Diffstat (limited to 'gr-zeromq/python/zeromq/qa_zeromq_pubsub.py')
-rw-r--r-- | gr-zeromq/python/zeromq/qa_zeromq_pubsub.py | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py index baa2e5a206..44a936ec46 100644 --- a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py +++ b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py @@ -12,8 +12,22 @@ from gnuradio import gr, gr_unittest from gnuradio import blocks, zeromq +import pmt import time +def make_tag(key, value, offset, srcid=None): + tag = gr.tag_t() + tag.key = pmt.string_to_symbol(key) + tag.value = pmt.to_pmt(value) + tag.offset = offset + if srcid is not None: + tag.srcid = pmt.to_pmt(srcid) + return tag + +def compare_tags(a, b): + return a.offset == b.offset and pmt.equal(a.key, b.key) and \ + pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid) + class qa_zeromq_pubsub (gr_unittest.TestCase): def setUp (self): @@ -44,5 +58,40 @@ class qa_zeromq_pubsub (gr_unittest.TestCase): self.send_tb.wait() self.assertFloatTuplesAlmostEqual(sink.data(), src_data) + def test_002 (self): + # same as test_001, but insert a tag and set key filter + vlen = 10 + src_data = list(range(vlen))*100 + + src_tags = tuple([make_tag('key', 'val', 0, 'src'), make_tag('key', 'val', 1, 'src')]) + + src = blocks.vector_source_f(src_data, False, vlen, tags=src_tags) + zeromq_pub_sink = zeromq.pub_sink(gr.sizeof_float, vlen, "tcp://127.0.0.1:0", 0, pass_tags=True, key="filter_key") + address = zeromq_pub_sink.last_endpoint() + zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, address, 0, pass_tags=True, key="filter_key") + sink = blocks.vector_sink_f(vlen) + self.send_tb.connect(src, zeromq_pub_sink) + self.recv_tb.connect(zeromq_sub_source, sink) + + # start both flowgraphs + self.recv_tb.start() + time.sleep(0.5) + self.send_tb.start() + time.sleep(0.5) + self.recv_tb.stop() + self.send_tb.stop() + self.recv_tb.wait() + self.send_tb.wait() + + # compare data + self.assertFloatTuplesAlmostEqual(sink.data(), src_data) + + # compare all tags + rx_tags = sink.tags() + self.assertEqual(len(src_tags), len(rx_tags)) + + for in_tag, out_tag in zip(src_tags, rx_tags): + self.assertTrue(compare_tags(in_tag, out_tag)) + if __name__ == '__main__': gr_unittest.run(qa_zeromq_pubsub) |