diff options
author | Brian Orr <brian.orr@gmail.com> | 2017-02-12 14:22:08 -0800 |
---|---|---|
committer | Brian Orr <brian.orr@gmail.com> | 2017-04-13 09:06:40 -0700 |
commit | 6f9303b15e3570db1ff94fb856f69b7d348ba48e (patch) | |
tree | ac68822c42bec67618cb14d54852fbb200c3fece /gr-zeromq/lib/base_impl.cc | |
parent | 8a65da6f994948746decfded8ae4576ac95de1e7 (diff) |
Support receiving multi-part ZeroMQ messages
ZeroMQ sink blocks will attempt to load all parts of a multi-part message
before processing tags and outputting items. Allows senders to take
advantage of ZeroMQ's zero-copy message delivery.
Add check for incompatible data sizes between ZMQ endpoints.
Fixes #1080
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 17 |
1 files changed, 15 insertions, 2 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index f33315dd40..76baeafff2 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -165,6 +165,11 @@ namespace gr { if (!(items[0].revents & ZMQ_POLLIN)) return false; + /* Is this the start or continuation of a multi-part message? */ + int64_t more = 0; + size_t more_len = sizeof(more); + d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); + /* Reset */ d_msg.rebuild(); d_tags.clear(); @@ -174,8 +179,8 @@ namespace gr { /* Get the message */ d_socket->recv(&d_msg); - /* Parse header */ - if (d_pass_tags) + /* Parse header from the first (or only) message of a multi-part message */ + if (d_pass_tags && !more) { uint64_t rcv_offset; @@ -188,6 +193,14 @@ namespace gr { } } + /* Each message must contain an integer mutliple of data vectors */ + if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) + { + throw std::runtime_error( + boost::str(boost::format("Incompatible vector sizes: " + "need a multiple of %1% bytes per message") % d_vsize)); + } + /* We got one ! */ return true; } |