summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/sub_source_impl.cc
diff options
context:
space:
mode:
authorAndriy Gelman <andriy.gelman@gmail.com>2020-01-12 01:54:44 -0500
committermormj <34754695+mormj@users.noreply.github.com>2020-03-02 18:15:24 -0500
commit1b362a2336f6f9b0db57021e280ff65e32409905 (patch)
treec8fcaef950c4661a652b90862e34a9629bdc42ae /gr-zeromq/lib/sub_source_impl.cc
parent5f18a419e2cec9f46d67147cae957d304d7a0195 (diff)
gr-zeromq: Add optional key filtering
Fixes #2236 Allows to filter a multi-part message by key/topic on a sub source and insert key/topic on a pub sink. Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc24
1 files changed, 17 insertions, 7 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 58271a4110..7fe5eabc3c 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -19,22 +19,32 @@
namespace gr {
namespace zeromq {
-sub_source::sptr sub_source::make(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source::sptr sub_source::make(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
{
return gnuradio::get_initial_sptr(
- new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+ new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key));
}
-sub_source_impl::sub_source_impl(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source_impl::sub_source_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+ base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
{
/* Subscribe */
- d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size());
}
int sub_source_impl::work(int noutput_items,