diff options
author | Andriy Gelman <andriy.gelman@gmail.com> | 2020-01-12 01:54:44 -0500 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2020-03-02 18:15:24 -0500 |
commit | 1b362a2336f6f9b0db57021e280ff65e32409905 (patch) | |
tree | c8fcaef950c4661a652b90862e34a9629bdc42ae /gr-zeromq/lib | |
parent | 5f18a419e2cec9f46d67147cae957d304d7a0195 (diff) |
gr-zeromq: Add optional key filtering
Fixes #2236
Allows to filter a multi-part message by key/topic on a sub source and
insert key/topic on a pub sink.
Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 45 | ||||
-rw-r--r-- | gr-zeromq/lib/base_impl.h | 14 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 22 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.h | 3 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.cc | 24 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.h | 3 |
6 files changed, 87 insertions, 24 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index cae6c0af82..67f800274f 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -19,8 +19,13 @@ namespace gr { namespace zeromq { -base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) - : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) +base_impl::base_impl(int type, + size_t itemsize, + size_t vlen, + int timeout, + bool pass_tags, + const std::string& key) + : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key) { /* "Fix" timeout value (ms for new API, us for old API) */ int major, minor, patch; @@ -57,8 +62,9 @@ base_sink_impl::base_sink_impl(int type, char* address, int timeout, bool pass_tags, - int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags) + int hwm, + const std::string& key) + : base_impl(type, itemsize, vlen, timeout, pass_tags, key) { /* Set high watermark */ if (hwm >= 0) { @@ -78,6 +84,16 @@ int base_sink_impl::send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset) { + /* Send key if it exists */ + if (d_key.size() > 0) { + zmq::message_t key_message(d_key.size()); + memcpy(key_message.data(), d_key.data(), d_key.size()); +#if USE_NEW_CPPZMQ_SEND_RECV + d_socket->send(key_message, zmq::send_flags::sndmore); +#else + d_socket->send(key_message, ZMQ_SNDMORE); +#endif + } /* Meta-data header */ std::string header(""); if (d_pass_tags) { @@ -115,8 +131,9 @@ base_source_impl::base_source_impl(int type, char* address, int timeout, bool pass_tags, - int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags), + int hwm, + const std::string& key) + : base_impl(type, itemsize, vlen, timeout, pass_tags, key), d_consumed_bytes(0), d_consumed_items(0) { @@ -192,6 +209,22 @@ bool base_source_impl::load_message(bool wait) d_socket->recv(&d_msg); #endif + /* Throw away key and get the first message. Avoid blocking if a multi-part + * message is not sent */ + if (d_key.size() > 0 && !more) { + int64_t is_multipart; + d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); + + d_msg.rebuild(); + if (is_multipart) +#if USE_NEW_CPPZMQ_SEND_RECV + d_socket->recv(d_msg); +#else + d_socket->recv(&d_msg); +#endif + else + return false; + } /* Parse header from the first (or only) message of a multi-part message */ if (d_pass_tags && !more) { uint64_t rcv_offset; diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h index 69f2464106..c9b5245fdb 100644 --- a/gr-zeromq/lib/base_impl.h +++ b/gr-zeromq/lib/base_impl.h @@ -20,7 +20,12 @@ namespace zeromq { class base_impl : public virtual gr::sync_block { public: - base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags); + base_impl(int type, + size_t itemsize, + size_t vlen, + int timeout, + bool pass_tags, + const std::string& key = ""); virtual ~base_impl(); protected: @@ -30,6 +35,7 @@ protected: size_t d_vsize; int d_timeout; bool d_pass_tags; + const std::string d_key; }; class base_sink_impl : public base_impl @@ -41,7 +47,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key = ""); protected: int send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset); @@ -56,7 +63,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key = ""); protected: zmq::message_t d_msg; diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 098888bade..84a7364152 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -19,19 +19,29 @@ namespace gr { namespace zeromq { -pub_sink::sptr pub_sink::make( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +pub_sink::sptr pub_sink::make(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) { return gnuradio::get_initial_sptr( - new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); + new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key)); } -pub_sink_impl::pub_sink_impl( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +pub_sink_impl::pub_sink_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) : gr::sync_block("pub_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm) + base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm, key) { /* All is delegated */ } diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index 295380558e..c97f6c40ba 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -26,7 +26,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key); int work(int noutput_items, gr_vector_const_void_star& input_items, diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 58271a4110..7fe5eabc3c 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -19,22 +19,32 @@ namespace gr { namespace zeromq { -sub_source::sptr sub_source::make( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +sub_source::sptr sub_source::make(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) { return gnuradio::get_initial_sptr( - new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); + new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm, key)); } -sub_source_impl::sub_source_impl( - size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +sub_source_impl::sub_source_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm, + const std::string& key) : gr::sync_block("sub_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm) + base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key) { /* Subscribe */ - d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size()); } int sub_source_impl::work(int noutput_items, diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h index 78d3ddb868..08221381b0 100644 --- a/gr-zeromq/lib/sub_source_impl.h +++ b/gr-zeromq/lib/sub_source_impl.h @@ -26,7 +26,8 @@ public: char* address, int timeout, bool pass_tags, - int hwm); + int hwm, + const std::string& key); int work(int noutput_items, gr_vector_const_void_star& input_items, |