summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib
diff options
context:
space:
mode:
authorAndriy Gelman <andriy.gelman@gmail.com>2020-01-12 01:54:44 -0500
committermormj <34754695+mormj@users.noreply.github.com>2020-03-02 18:15:24 -0500
commit1b362a2336f6f9b0db57021e280ff65e32409905 (patch)
treec8fcaef950c4661a652b90862e34a9629bdc42ae /gr-zeromq/lib
parent5f18a419e2cec9f46d67147cae957d304d7a0195 (diff)
gr-zeromq: Add optional key filtering
Fixes #2236 Allows to filter a multi-part message by key/topic on a sub source and insert key/topic on a pub sink. Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r--gr-zeromq/lib/base_impl.cc45
-rw-r--r--gr-zeromq/lib/base_impl.h14
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc22
-rw-r--r--gr-zeromq/lib/pub_sink_impl.h3
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc24
-rw-r--r--gr-zeromq/lib/sub_source_impl.h3
6 files changed, 87 insertions, 24 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index cae6c0af82..67f800274f 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -19,8 +19,13 @@
namespace gr {
namespace zeromq {
-base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags)
- : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+base_impl::base_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ int timeout,
+ bool pass_tags,
+ const std::string& key)
+ : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key)
{
/* "Fix" timeout value (ms for new API, us for old API) */
int major, minor, patch;
@@ -57,8 +62,9 @@ base_sink_impl::base_sink_impl(int type,
char* address,
int timeout,
bool pass_tags,
- int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags)
+ int hwm,
+ const std::string& key)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags, key)
{
/* Set high watermark */
if (hwm >= 0) {
@@ -78,6 +84,16 @@ int base_sink_impl::send_message(const void* in_buf,
const int in_nitems,
const uint64_t in_offset)
{
+ /* Send key if it exists */
+ if (d_key.size() > 0) {
+ zmq::message_t key_message(d_key.size());
+ memcpy(key_message.data(), d_key.data(), d_key.size());
+#if USE_NEW_CPPZMQ_SEND_RECV
+ d_socket->send(key_message, zmq::send_flags::sndmore);
+#else
+ d_socket->send(key_message, ZMQ_SNDMORE);
+#endif
+ }
/* Meta-data header */
std::string header("");
if (d_pass_tags) {
@@ -115,8 +131,9 @@ base_source_impl::base_source_impl(int type,
char* address,
int timeout,
bool pass_tags,
- int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags),
+ int hwm,
+ const std::string& key)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags, key),
d_consumed_bytes(0),
d_consumed_items(0)
{
@@ -192,6 +209,22 @@ bool base_source_impl::load_message(bool wait)
d_socket->recv(&d_msg);
#endif
+ /* Throw away key and get the first message. Avoid blocking if a multi-part
+ * message is not sent */
+ if (d_key.size() > 0 && !more) {
+ int64_t is_multipart;
+ d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
+
+ d_msg.rebuild();
+ if (is_multipart)
+#if USE_NEW_CPPZMQ_SEND_RECV
+ d_socket->recv(d_msg);
+#else
+ d_socket->recv(&d_msg);
+#endif
+ else
+ return false;
+ }
/* Parse header from the first (or only) message of a multi-part message */
if (d_pass_tags && !more) {
uint64_t rcv_offset;
diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h
index 69f2464106..c9b5245fdb 100644
--- a/gr-zeromq/lib/base_impl.h
+++ b/gr-zeromq/lib/base_impl.h
@@ -20,7 +20,12 @@ namespace zeromq {
class base_impl : public virtual gr::sync_block
{
public:
- base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags);
+ base_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ int timeout,
+ bool pass_tags,
+ const std::string& key = "");
virtual ~base_impl();
protected:
@@ -30,6 +35,7 @@ protected:
size_t d_vsize;
int d_timeout;
bool d_pass_tags;
+ const std::string d_key;
};
class base_sink_impl : public base_impl
@@ -41,7 +47,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key = "");
protected:
int send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset);
@@ -56,7 +63,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key = "");
protected:
zmq::message_t d_msg;
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 098888bade..84a7364152 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -19,19 +19,29 @@
namespace gr {
namespace zeromq {
-pub_sink::sptr pub_sink::make(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+pub_sink::sptr pub_sink::make(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
{
return gnuradio::get_initial_sptr(
- new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+ new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key));
}
-pub_sink_impl::pub_sink_impl(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+pub_sink_impl::pub_sink_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
: gr::sync_block("pub_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+ base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
{
/* All is delegated */
}
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 295380558e..c97f6c40ba 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -26,7 +26,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key);
int work(int noutput_items,
gr_vector_const_void_star& input_items,
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 58271a4110..7fe5eabc3c 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -19,22 +19,32 @@
namespace gr {
namespace zeromq {
-sub_source::sptr sub_source::make(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source::sptr sub_source::make(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
{
return gnuradio::get_initial_sptr(
- new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+ new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key));
}
-sub_source_impl::sub_source_impl(
- size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+sub_source_impl::sub_source_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm,
+ const std::string& key)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+ base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
{
/* Subscribe */
- d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size());
}
int sub_source_impl::work(int noutput_items,
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 78d3ddb868..08221381b0 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -26,7 +26,8 @@ public:
char* address,
int timeout,
bool pass_tags,
- int hwm);
+ int hwm,
+ const std::string& key);
int work(int noutput_items,
gr_vector_const_void_star& input_items,