summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndriy Gelman <andriy.gelman@gmail.com>2020-01-12 01:54:44 -0500
committermormj <34754695+mormj@users.noreply.github.com>2020-03-02 18:15:24 -0500
commit1b362a2336f6f9b0db57021e280ff65e32409905 (patch)
treec8fcaef950c4661a652b90862e34a9629bdc42ae
parent5f18a419e2cec9f46d67147cae957d304d7a0195 (diff)
gr-zeromq: Add optional key filtering
Fixes #2236 Allows to filter a multi-part message by key/topic on a sub source and insert key/topic on a pub sink. Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
-rw-r--r--gr-zeromq/grc/zeromq_pub_sink.block.yml6
-rw-r--r--gr-zeromq/grc/zeromq_sub_source.block.yml9
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pub_sink.h18
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/sub_source.h4
-rw-r--r--gr-zeromq/lib/base_impl.cc45
-rw-r--r--gr-zeromq/lib/base_impl.h14
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc22
-rw-r--r--gr-zeromq/lib/pub_sink_impl.h3
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc24
-rw-r--r--gr-zeromq/lib/sub_source_impl.h3
-rw-r--r--gr-zeromq/python/zeromq/qa_zeromq_pubsub.py49
-rwxr-xr-xgr-zeromq/python/zeromq/qa_zeromq_sub.py45
12 files changed, 208 insertions, 34 deletions
diff --git a/gr-zeromq/grc/zeromq_pub_sink.block.yml b/gr-zeromq/grc/zeromq_pub_sink.block.yml
index a576e265ec..1c4c42c344 100644
--- a/gr-zeromq/grc/zeromq_pub_sink.block.yml
+++ b/gr-zeromq/grc/zeromq_pub_sink.block.yml
@@ -35,6 +35,10 @@ parameters:
dtype: int
default: '-1'
hide: ${ ('part' if hwm == -1 else 'none') }
+- id: key
+ label: Filter Key
+ dtype: string
+ default: ''
inputs:
- domain: stream
@@ -44,7 +48,7 @@ inputs:
templates:
imports: from gnuradio import zeromq
make: zeromq.pub_sink(${type.itemsize}, ${vlen}, ${address}, ${timeout}, ${pass_tags},
- ${hwm})
+ ${hwm}, ${key})
cpp_templates:
includes: [ '#include <gnuradio/zeromq/pub_sink.h>' ]
diff --git a/gr-zeromq/grc/zeromq_sub_source.block.yml b/gr-zeromq/grc/zeromq_sub_source.block.yml
index 9c7431188b..6e480ad960 100644
--- a/gr-zeromq/grc/zeromq_sub_source.block.yml
+++ b/gr-zeromq/grc/zeromq_sub_source.block.yml
@@ -35,6 +35,10 @@ parameters:
dtype: int
default: '-1'
hide: ${ ('part' if hwm == -1 else 'none') }
+- id: key
+ label: Filter Key
+ dtype: string
+ default: ''
outputs:
- domain: stream
@@ -44,7 +48,7 @@ outputs:
templates:
imports: from gnuradio import zeromq
make: zeromq.sub_source(${type.itemsize}, ${vlen}, ${address}, ${timeout}, ${pass_tags},
- ${hwm})
+ ${hwm}, ${key})
cpp_templates:
includes: [ '#include <gnuradio/zeromq/sub_source.h>' ]
@@ -54,7 +58,8 @@ cpp_templates:
const_cast<char *>(${address}${'.c_str())' if str(address)[0] not in '"\'' else ')'},
${timeout},
${pass_tags},
- ${hwm});
+ ${hwm},
+ ${key});
link: ['gnuradio-zeromq']
translations:
'True': 'true'
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
index cef6a82a54..d9410c48cf 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
@@ -22,11 +22,17 @@ namespace zeromq {
* \ingroup zeromq
*
* \details
- * This block acts a a streaming sink for a GNU Radio flowgraph
- * and writes its contents to a ZMQ PUB socket. A PUB socket may
+ * This block acts as a streaming sink for a GNU Radio flowgraph
+ * and writes its contents to a ZMQ PUB socket. A PUB socket may
* have subscribers and will pass all incoming stream data to each
- * subscriber. Subscribers can be either another gr-zeromq source
- * block or a non-GNU Radio ZMQ socket.
+ * subscriber with a matching key. If the publisher's key is set to
+ * "GNURadio", the following example subscriber keys will match: "G",
+ * "GN", .., "GNURadio". In other words, the subscriber must contain
+ * the first set of characters from the publisher's key. If the subscriber
+ * sets an empty key "", it will accept all input messages from the
+ * publisher (including the key itself if one is set). Subscribers
+ * can either be another gr-zeromq source block or a non-GNU Radio
+ * ZMQ socket.
*/
class ZEROMQ_API pub_sink : virtual public gr::sync_block
{
@@ -42,13 +48,15 @@ public:
* \param timeout Receive timeout in milliseconds, default is 100ms, 1us increments.
* \param pass_tags Whether sink will serialize and pass tags over the link.
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
+ * \param key Prepend a key/topic to the start of each message (default is none)
*/
static sptr make(size_t itemsize,
size_t vlen,
char* address,
int timeout = 100,
bool pass_tags = false,
- int hwm = -1);
+ int hwm = -1,
+ const std::string& key = "");
/*!
* \brief Return a std::string of ZMQ_LAST_ENDPOINT from the underlying ZMQ socket.
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
index 88d286b3b8..c2be39e615 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
@@ -39,13 +39,15 @@ public:
* \param timeout Receive timeout in milliseconds, default is 100ms, 1us increments.
* \param pass_tags Whether source will look for and deserialize tags.
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
+ * \param key Subscriber filter key. Leave empty to pass all messages.
*/
static sptr make(size_t itemsize,
size_t vlen,
char* address,
int timeout = 100,
bool pass_tags = false,
- int hwm = -1);
+ int hwm = -1,
+ const std::string& key = "");
/*!
* \brief Return a std::string of ZMQ_LAST_ENDPOINT from the underlying ZMQ socket.
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index cae6c0af82..67f800274f 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -19,8 +19,13 @@
namespace gr {
namespace zeromq {
-base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags)
- : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+base_impl::base_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ int timeout,
+ bool pass_tags,
+ const std::string& key)
+ : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key)
{
/* "Fix" timeout value (ms for new API, us for old API) */
int major, minor, patch;
@@ -57,8 +62,9 @@ base_sink_impl::base_sink_impl(int type,
char* address,
int timeout,
bool pass_tags,
- int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags)
+ int hwm,
+ const std::string& key)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags, key)
{
/* Set high watermark */
if (hwm >= 0) {
@@ -78,6 +84,16 @@ int base_sink_impl::send_message(const void* in_buf,
const int in_nitems,
const uint64_t in_offset)
{
+ /* Send key if it exists */
+ if (d_key.size() > 0) {
+ zmq::message_t key_message(d_key.size());
+ memcpy(key_message.data(), d_key.data(), d_key.size());
+#if USE_NEW_CPPZMQ_SEND_RECV
+ d_socket->send(key_message, zmq::send_flags::sndmore);
+#else
+ d_socket->send(key_message, ZMQ_SNDMORE);
+#endif
+ }
/* Meta-data header */
std::string header("");
if (d_pass_tags) {
@@ -115,8 +131,9 @@ base_source_impl::base_source_impl(int type,
char* address,
int timeout,
bool pass_tags,
- int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags),
+ int hwm,
+ const std::string& key)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags, key),
d_consumed_bytes(0),
d_consumed_items(0)
{
@@ -192,6 +209,22 @@ bool base_source_impl::load_message(bool wait)
d_socket->recv(&d_msg);
#endif
+ /* Throw away key and get the first message. Avoid blocking if a multi-part
+ * message is not sent */
+ if (d_key.size() > 0 && !more) {
+ int64_t is_multipart;
+ d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
+
+ d_msg.rebuild();
+ if (is_multipart)
+#if USE_NEW_CPPZMQ_SEND_RECV
+ d_socket->recv(d_msg);
+#else
+ d_socket->recv(&d_msg);
+#endif
+ else
+ return false;
+ }
/* Parse header from the first (or only) message of a multi-part message */
if (d_pass_tags && !more) {
uint64_t rcv_offset;
diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h
index 69f2464106..c9b5245fdb 100644
--- a/gr-zeromq/lib/base_impl.h
+++ b/gr-zeromq/lib/base_impl.h
@@ -20,7 +20,12 @@ namespace zeromq {
class base_impl : public virtual gr::sync_block
{
public:
- base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags);
+ base_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ int timeout,
+ bool pass_tags,
+ const std::string& key = "");
virtual ~base_impl();
protected:
@@ -30,6 +35,7 @@ protected:
size_t d_vsize;
int d_timeout;
bool d_pass_tags;
+ const std::string d_key;
};
class base_sink_impl : public base_impl
@@ -41,7 +47,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key = "");
protected:
int send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset);
@@ -56,7 +63,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key = "");
protected:
zmq::message_t d_msg;
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 098888bade..84a7364152 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -19,19 +19,29 @@
namespace gr {
namespace zeromq {
-pub_sink::sptr pub_sink::make(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+pub_sink::sptr pub_sink::make(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
{
return gnuradio::get_initial_sptr(
- new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+ new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key));
}
-pub_sink_impl::pub_sink_impl(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+pub_sink_impl::pub_sink_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
: gr::sync_block("pub_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+ base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
{
/* All is delegated */
}
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 295380558e..c97f6c40ba 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -26,7 +26,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key);
int work(int noutput_items,
gr_vector_const_void_star& input_items,
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 58271a4110..7fe5eabc3c 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -19,22 +19,32 @@
namespace gr {
namespace zeromq {
-sub_source::sptr sub_source::make(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source::sptr sub_source::make(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
{
return gnuradio::get_initial_sptr(
- new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+ new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key));
}
-sub_source_impl::sub_source_impl(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source_impl::sub_source_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+ base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
{
/* Subscribe */
- d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size());
}
int sub_source_impl::work(int noutput_items,
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 78d3ddb868..08221381b0 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -26,7 +26,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key);
int work(int noutput_items,
gr_vector_const_void_star& input_items,
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
index baa2e5a206..44a936ec46 100644
--- a/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
+++ b/gr-zeromq/python/zeromq/qa_zeromq_pubsub.py
@@ -12,8 +12,22 @@
from gnuradio import gr, gr_unittest
from gnuradio import blocks, zeromq
+import pmt
import time
+def make_tag(key, value, offset, srcid=None):
+ tag = gr.tag_t()
+ tag.key = pmt.string_to_symbol(key)
+ tag.value = pmt.to_pmt(value)
+ tag.offset = offset
+ if srcid is not None:
+ tag.srcid = pmt.to_pmt(srcid)
+ return tag
+
+def compare_tags(a, b):
+ return a.offset == b.offset and pmt.equal(a.key, b.key) and \
+ pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid)
+
class qa_zeromq_pubsub (gr_unittest.TestCase):
def setUp (self):
@@ -44,5 +58,40 @@ class qa_zeromq_pubsub (gr_unittest.TestCase):
self.send_tb.wait()
self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
+ def test_002 (self):
+ # same as test_001, but insert a tag and set key filter
+ vlen = 10
+ src_data = list(range(vlen))*100
+
+ src_tags = tuple([make_tag('key', 'val', 0, 'src'), make_tag('key', 'val', 1, 'src')])
+
+ src = blocks.vector_source_f(src_data, False, vlen, tags=src_tags)
+ zeromq_pub_sink = zeromq.pub_sink(gr.sizeof_float, vlen, "tcp://127.0.0.1:0", 0, pass_tags=True, key="filter_key")
+ address = zeromq_pub_sink.last_endpoint()
+ zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, address, 0, pass_tags=True, key="filter_key")
+ sink = blocks.vector_sink_f(vlen)
+ self.send_tb.connect(src, zeromq_pub_sink)
+ self.recv_tb.connect(zeromq_sub_source, sink)
+
+ # start both flowgraphs
+ self.recv_tb.start()
+ time.sleep(0.5)
+ self.send_tb.start()
+ time.sleep(0.5)
+ self.recv_tb.stop()
+ self.send_tb.stop()
+ self.recv_tb.wait()
+ self.send_tb.wait()
+
+ # compare data
+ self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
+
+ # compare all tags
+ rx_tags = sink.tags()
+ self.assertEqual(len(src_tags), len(rx_tags))
+
+ for in_tag, out_tag in zip(src_tags, rx_tags):
+ self.assertTrue(compare_tags(in_tag, out_tag))
+
if __name__ == '__main__':
gr_unittest.run(qa_zeromq_pubsub)
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_sub.py b/gr-zeromq/python/zeromq/qa_zeromq_sub.py
index 63d8a978ef..9e82420c24 100755
--- a/gr-zeromq/python/zeromq/qa_zeromq_sub.py
+++ b/gr-zeromq/python/zeromq/qa_zeromq_sub.py
@@ -48,7 +48,6 @@ class qa_zeromq_sub (gr_unittest.TestCase):
def test_002 (self):
vlen = 10
-
# Construct multipart source data to publish
raw_data = [numpy.array(range(vlen), 'float32')*100, numpy.array(range(vlen, 2*vlen), 'float32')*100]
src_data = [a.tostring() for a in raw_data]
@@ -67,6 +66,50 @@ class qa_zeromq_sub (gr_unittest.TestCase):
expected_data = numpy.concatenate(raw_data)
self.assertFloatTuplesAlmostEqual(sink.data(), expected_data)
+ def test_003 (self):
+ # Check that message is received when correct key is used
+ # Construct multipart source data to publish
+ vlen = 10
+ raw_data = [numpy.array(range(vlen), 'float32')*100, numpy.array(range(vlen, 2*vlen), 'float32')*100]
+ src_data = [a.tostring() for a in raw_data]
+
+ src_data = [b"filter_key"] + src_data
+
+ zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, self._address, key="filter_key")
+ sink = blocks.vector_sink_f(vlen)
+ self.tb.connect(zeromq_sub_source, sink)
+
+ self.tb.start()
+ time.sleep(0.05)
+ self.pub_socket.send_multipart(src_data)
+ time.sleep(0.5)
+ self.tb.stop()
+ self.tb.wait()
+
+ # Source block will concatenate everything together
+ expected_data = numpy.concatenate(raw_data)
+ self.assertFloatTuplesAlmostEqual(sink.data(), expected_data)
+
+ def test_004 (self):
+ # Test that no message is received when wrong key is used
+ vlen = 10
+ raw_data = [numpy.array(range(vlen), 'float32')*100, numpy.array(range(vlen, 2*vlen), 'float32')*100]
+ src_data = [a.tostring() for a in raw_data]
+
+ src_data = [b"filter_key"] + src_data
+
+ zeromq_sub_source = zeromq.sub_source(gr.sizeof_float, vlen, self._address, key="wrong_filter_key")
+ sink = blocks.vector_sink_f(vlen)
+ self.tb.connect(zeromq_sub_source, sink)
+
+ self.tb.start()
+ time.sleep(0.05)
+ self.pub_socket.send_multipart(src_data)
+ time.sleep(0.5)
+ self.tb.stop()
+ self.tb.wait()
+
+ assert( len(sink.data()) == 0 )
if __name__ == '__main__':
gr_unittest.run(qa_zeromq_sub)