diff options
author | Andriy Gelman <andriy.gelman@gmail.com> | 2020-01-12 01:54:44 -0500 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2020-03-02 18:15:24 -0500 |
commit | 1b362a2336f6f9b0db57021e280ff65e32409905 (patch) | |
tree | c8fcaef950c4661a652b90862e34a9629bdc42ae | |
parent | 5f18a419e2cec9f46d67147cae957d304d7a0195 (diff) |
gr-zeromq: Add optional key filtering
Fixes #2236
Allows to filter a multi-part message by key/topic on a sub source and
insert key/topic on a pub sink.
Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
-rw-r--r-- | gr-zeromq/grc/zeromq_pub_sink.block.yml | 6 | ||||
-rw-r--r-- | gr-zeromq/grc/zeromq_sub_source.block.yml | 9 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/pub_sink.h | 18 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/sub_source.h | 4 | ||||
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 45 | ||||
-rw-r--r-- | gr-zeromq/lib/base_impl.h | 14 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 22 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.h | 3 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.cc | 24 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.h | 3 | ||||
-rw-r--r-- | gr-zeromq/python/zeromq/qa_zeromq_pubsub.py | 49 | ||||
-rwxr-xr-x | gr-zeromq/python/zeromq/qa_zeromq_sub.py | 45 |
12 files changed, 208 insertions, 34 deletions
diff --git a/gr-zeromq/grc/zeromq_pub_sink.block.yml b/gr-zeromq/grc/zeromq_pub_sink.block.yml index a576e265ec..1c4c42c344 100644 --- a/gr-zeromq/grc/zeromq_pub_sink.block.yml +++ b/gr-zeromq/grc/zeromq_pub_sink.block.yml @@ -35,6 +35,10 @@ parameters: dtype: int default: '-1' hide: ${ ('part' if hwm == -1 else 'none') } +- id: key + label: Filter Key + dtype: string + default: '' inputs: - domain: stream @@ -44,7 +48,7 @@ inputs: templates: imports: from gnuradio import zeromq make: zeromq.pub_sink(${type.itemsize}, ${vlen}, ${address}, ${timeout}, ${pass_tags}, - ${hwm}) + ${hwm}, ${key}) cpp_templates: includes: [ '#include <gnuradio/zeromq/pub_sink.h>' ] diff --git a/gr-zeromq/grc/zeromq_sub_source.block.yml b/gr-zeromq/grc/zeromq_sub_source.block.yml index 9c7431188b..6e480ad960 100644 --- a/gr-zeromq/grc/zeromq_sub_source.block.yml +++ b/gr-zeromq/grc/zeromq_sub_source.block.yml @@ -35,6 +35,10 @@ parameters: dtype: int default: '-1' hide: ${ ('part' if hwm == -1 else 'none') } +- id: key + label: Filter Key + dtype: string + default: '' outputs: - domain: stream @@ -44,7 +48,7 @@ outputs: templates: imports: from gnuradio import zeromq make: zeromq.sub_source(${type.itemsize}, ${vlen}, ${address}, ${timeout}, ${pass_tags}, - ${hwm}) + ${hwm}, ${key}) cpp_templates: includes: [ '#include <gnuradio/zeromq/sub_source.h>' ] @@ -54,7 +58,8 @@ cpp_templates: const_cast<char *>(${address}${'.c_str())' if str(address)[0] not in '"\'' else ')'}, ${timeout}, ${pass_tags}, - ${hwm}); + ${hwm}, + ${key}); link: ['gnuradio-zeromq'] translations: 'True': 'true' diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h index cef6a82a54..d9410c48cf 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h @@ -22,11 +22,17 @@ namespace zeromq { * \ingroup zeromq * * \details - * This block acts a a streaming sink for a GNU Radio flowgraph - * and writes its contents to a ZMQ PUB socket. A PUB socket may + * This block acts as a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ PUB socket. A PUB socket may * have subscribers and will pass all incoming stream data to each - * subscriber. Subscribers can be either another gr-zeromq source - * block or a non-GNU Radio ZMQ socket. + * subscriber with a matching key. If the publisher's key is set to + * "GNURadio", the following example subscriber keys will match: "G", + * "GN", .., "GNURadio". In other words, the subscriber must contain + * the first set of characters from the publisher's key. If the subscriber + * sets an empty key "", it will accept all input messages from the + * publisher (including the key itself if one is set). Subscribers + * can either be another gr-zeromq source block or a non-GNU Radio + * ZMQ socket. */ class ZEROMQ_API pub_sink : virtual public gr::sync_block { @@ -42,13 +48,15 @@ public: * \param timeout Receive timeout in milliseconds, default is 100ms, 1us increments. * \param pass_tags Whether sink will serialize and pass tags over the link. * \param hwm High Watermark to configure the socket to (-1 => zmq's default) + * \param key Prepend a key/topic to the start of each message (default is none) */ static sptr make(size_t itemsize, size_t vlen, char* address, int timeout = 100, bool pass_tags = false, - int hwm = -1); + int hwm = -1, + const std::string& key = ""); /*! * \brief Return a std::string of ZMQ_LAST_ENDPOINT from the underlying ZMQ socket. diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_source.h index 88d286b3b8..c2be39e615 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h @@ -39,13 +39,15 @@ public: * \param timeout Receive timeout in milliseconds, default is 100ms, 1us increments. * \param pass_tags Whether source will look for and deserialize tags. * \param hwm High Watermark to configure the socket to (-1 => zmq's default) + * \param key Subscriber filter key. Leave empty to pass all messages. */ static sptr make(size_t itemsize, size_t vlen, char* address, int timeout = 100, bool pass_tags = false, - int hwm = -1); + int hwm = -1, + const std::string& key = ""); /*! * \brief Return a std::string of ZMQ_LAST_ENDPOINT from the underlying ZMQ socket. diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index cae6c0af82..67f800274f 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -19,8 +19,13 @@ namespace gr { namespace zeromq { -base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) - : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) +base_impl::base_impl(int type, + size_t itemsize, + size_t vlen, + int timeout, + bool pass_tags, + const std::string& key) + : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key) { /* "Fix" timeout value (ms for new API, us for old API) */ int major, minor, patch; @@ -57,8 +62,9 @@ base_sink_impl::base_sink_impl(int type, char* address, int timeout, bool pass_tags, - int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags) + int hwm, + const std::string& key) + : base_impl(type, itemsize, vlen, timeout, pass_tags, key) { /* Set high watermark */ if (hwm >= 0) { @@ -78,6 +84,16 @@ int base_sink_impl::send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset) { + /* Send key if it exists */ + if (d_key.size() > 0) { + zmq::message_t key_message(d_key.size()); + memcpy(key_message.data(), d_key.data(), d_key.size()); +#if USE_NEW_CPPZMQ_SEND_RECV + d_socket->send(key_message, zmq::send_flags::sndmore); +#else + d_socket->send(key_message, ZMQ_SNDMORE); +#endif + } /* Meta-data header */ std::string header(""); if (d_pass_tags) { @@ -115,8 +131,9 @@ base_source_impl::base_source_impl(int type, char* address, int timeout, bool pass_tags, - int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags), + int hwm, + const std::string& key) + : base_impl(type, itemsize, vlen, timeout, pass_tags, key), d_consumed_bytes(0), d_consumed_items(0) { @@ -192,6 +209,22 @@ bool base_source_impl::load_message(bool wait) d_socket->recv(&d_msg); #endif + /* Throw away key and get the first message. Avoid blocking if a multi-part + * message is not sent */ + if (d_key.size() > 0 && !more) { + int64_t is_multipart; + d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); + + d_msg.rebuild(); + if (is_multipart) +#if USE_NEW_CPPZMQ_SEND_RECV + d_socket->recv(d_msg); +#else + d_socket->recv(&d_msg); +#endif + else + return false; + } /* Parse header from the first (or only) message of a multi-part message */ if (d_pass_tags && !more) { uint64_t rcv_offset; diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h index 69f2464106..c9b5245fdb 100644 --- a/gr-zeromq/lib/base_impl.h +++ b/gr-zeromq/lib/base_impl.h @@ -20,7 +20,12 @@ namespace zeromq { class base_impl : public virtual gr::sync_block { public: - base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags); + base_impl(int type, + size_t itemsize, + size_t vlen, + int timeout, + bool pass_tags, + const std::string& key = ""); virtual ~base_impl(); protected: @@ -30,6 +35,7 @@ protected: size_t d_vsize; int d_timeout; bool d_pass_tags; + const std::string d_key; }; class base_sink_impl : public base_impl @@ -41,7 +47,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key = ""); protected: int send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset); @@ -56,7 +63,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key = ""); protected: zmq::message_t d_msg; diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 098888bade..84a7364152 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -19,19 +19,29 @@ namespace gr { namespace zeromq { -pub_sink::sptr pub_sink::make( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +pub_sink::sptr pub_sink::make(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) { return gnuradio::get_initial_sptr( - new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); + new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key)); } -pub_sink_impl::pub_sink_impl( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +pub_sink_impl::pub_sink_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) : gr::sync_block("pub_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm) + base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm, key) { /* All is delegated */ } diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index 295380558e..c97f6c40ba 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -26,7 +26,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key); int work(int noutput_items, gr_vector_const_void_star& input_items, diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 58271a4110..7fe5eabc3c 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -19,22 +19,32 @@ namespace gr { namespace zeromq { -sub_source::sptr sub_source::make( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +sub_source::sptr sub_source::make(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) { return gnuradio::get_initial_sptr( - new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); + new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key)); } -sub_source_impl::sub_source_impl( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +sub_source_impl::sub_source_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) : gr::sync_block("sub_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm) + base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key) { /* Subscribe */ - d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size()); } int sub_source_impl::work(int noutput_items, diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h index 78d3ddb868..08221381b0 100644 --- a/gr-zeromq/lib/sub_source_impl.h +++ b/gr-zeromq/lib/sub_source_impl.h @@ -26,7 +26,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key); int work(int noutput_items, gr_vector_const_void_star& input_items, diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py index baa2e5a206..44a936ec46 100644 --- a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py +++ b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py @@ -12,8 +12,22 @@ from gnuradio import gr, gr_unittest from gnuradio import blocks, zeromq +import pmt import time +def make_tag(key, value, offset, srcid=None): + tag = gr.tag_t() + tag.key = pmt.string_to_symbol(key) + tag.value = pmt.to_pmt(value) + tag.offset = offset + if srcid is not None: + tag.srcid = pmt.to_pmt(srcid) + return tag + +def compare_tags(a, b): + return a.offset == b.offset and pmt.equal(a.key, b.key) and \ + pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid) + class qa_zeromq_pubsub (gr_unittest.TestCase): def setUp (self): @@ -44,5 +58,40 @@ class qa_zeromq_pubsub (gr_unittest.TestCase): self.send_tb.wait() self.assertFloatTuplesAlmostEqual(sink.data(), src_data) + def test_002 (self): + # same as test_001, but insert a tag and set key filter + vlen = 10 + src_data = list(range(vlen))*100 + + src_tags = tuple([make_tag('key', 'val', 0, 'src'), make_tag('key', 'val', 1, 'src')]) + + src = blocks.vector_source_f(src_data, False, vlen, tags=src_tags) + zeromq_pub_sink = zeromq.pub_sink(gr.sizeof_float, vlen, "tcp://127.0.0.1:0", 0, pass_tags=True, key="filter_key") + address = zeromq_pub_sink.last_endpoint() + zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, address, 0, pass_tags=True, key="filter_key") + sink = blocks.vector_sink_f(vlen) + self.send_tb.connect(src, zeromq_pub_sink) + self.recv_tb.connect(zeromq_sub_source, sink) + + # start both flowgraphs + self.recv_tb.start() + time.sleep(0.5) + self.send_tb.start() + time.sleep(0.5) + self.recv_tb.stop() + self.send_tb.stop() + self.recv_tb.wait() + self.send_tb.wait() + + # compare data + self.assertFloatTuplesAlmostEqual(sink.data(), src_data) + + # compare all tags + rx_tags = sink.tags() + self.assertEqual(len(src_tags), len(rx_tags)) + + for in_tag, out_tag in zip(src_tags, rx_tags): + self.assertTrue(compare_tags(in_tag, out_tag)) + if __name__ == '__main__': gr_unittest.run(qa_zeromq_pubsub) diff --git a/gr-zeromq/python/zeromq/qa_zeromq_sub.py b/gr-zeromq/python/zeromq/qa_zeromq_sub.py index 63d8a978ef..9e82420c24 100755 --- a/gr-zeromq/python/zeromq/qa_zeromq_sub.py +++ b/gr-zeromq/python/zeromq/qa_zeromq_sub.py @@ -48,7 +48,6 @@ class qa_zeromq_sub (gr_unittest.TestCase): def test_002 (self): vlen = 10 - # Construct multipart source data to publish raw_data = [numpy.array(range(vlen), 'float32')*100, numpy.array(range(vlen, 2*vlen), 'float32')*100] src_data = [a.tostring() for a in raw_data] @@ -67,6 +66,50 @@ class qa_zeromq_sub (gr_unittest.TestCase): expected_data = numpy.concatenate(raw_data) self.assertFloatTuplesAlmostEqual(sink.data(), expected_data) + def test_003 (self): + # Check that message is received when correct key is used + # Construct multipart source data to publish + vlen = 10 + raw_data = [numpy.array(range(vlen), 'float32')*100, numpy.array(range(vlen, 2*vlen), 'float32')*100] + src_data = [a.tostring() for a in raw_data] + + src_data = [b"filter_key"] + src_data + + zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, self._address, key="filter_key") + sink = blocks.vector_sink_f(vlen) + self.tb.connect(zeromq_sub_source, sink) + + self.tb.start() + time.sleep(0.05) + self.pub_socket.send_multipart(src_data) + time.sleep(0.5) + self.tb.stop() + self.tb.wait() + + # Source block will concatenate everything together + expected_data = numpy.concatenate(raw_data) + self.assertFloatTuplesAlmostEqual(sink.data(), expected_data) + + def test_004 (self): + # Test that no message is received when wrong key is used + vlen = 10 + raw_data = [numpy.array(range(vlen), 'float32')*100, numpy.array(range(vlen, 2*vlen), 'float32')*100] + src_data = [a.tostring() for a in raw_data] + + src_data = [b"filter_key"] + src_data + + zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, self._address, key="wrong_filter_key") + sink = blocks.vector_sink_f(vlen) + self.tb.connect(zeromq_sub_source, sink) + + self.tb.start() + time.sleep(0.05) + self.pub_socket.send_multipart(src_data) + time.sleep(0.5) + self.tb.stop() + self.tb.wait() + + assert( len(sink.data()) == 0 ) if __name__ == '__main__': gr_unittest.run(qa_zeromq_sub) |