From 1b362a2336f6f9b0db57021e280ff65e32409905 Mon Sep 17 00:00:00 2001
From: Andriy Gelman <andriy.gelman@gmail.com>
Date: Sun, 12 Jan 2020 01:54:44 -0500
Subject: gr-zeromq: Add optional key filtering

Fixes #2236

Allows to filter a multi-part message by key/topic on a sub source and
insert key/topic on a pub sink.

Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
---
 gr-zeromq/lib/sub_source_impl.cc | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

(limited to 'gr-zeromq/lib/sub_source_impl.cc')

diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 58271a4110..7fe5eabc3c 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -19,22 +19,32 @@
 namespace gr {
 namespace zeromq {
 
-sub_source::sptr sub_source::make(
-    size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source::sptr sub_source::make(size_t itemsize,
+                                  size_t vlen,
+                                  char* address,
+                                  int timeout,
+                                  bool pass_tags,
+                                  int hwm,
+                                  const std::string& key)
 {
     return gnuradio::get_initial_sptr(
-        new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+        new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key));
 }
 
-sub_source_impl::sub_source_impl(
-    size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source_impl::sub_source_impl(size_t itemsize,
+                                 size_t vlen,
+                                 char* address,
+                                 int timeout,
+                                 bool pass_tags,
+                                 int hwm,
+                                 const std::string& key)
     : gr::sync_block("sub_source",
                      gr::io_signature::make(0, 0, 0),
                      gr::io_signature::make(1, 1, itemsize * vlen)),
-      base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+      base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
 {
     /* Subscribe */
-    d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+    d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size());
 }
 
 int sub_source_impl::work(int noutput_items,
-- 
cgit v1.2.3