summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/base_impl.cc
diff options
context:
space:
mode:
authorBrian Orr <brian.orr@gmail.com>2017-02-12 14:22:08 -0800
committerBrian Orr <brian.orr@gmail.com>2017-04-13 09:06:40 -0700
commit6f9303b15e3570db1ff94fb856f69b7d348ba48e (patch)
treeac68822c42bec67618cb14d54852fbb200c3fece /gr-zeromq/lib/base_impl.cc
parent8a65da6f994948746decfded8ae4576ac95de1e7 (diff)
Support receiving multi-part ZeroMQ messages
ZeroMQ sink blocks will attempt to load all parts of a multi-part message before processing tags and outputting items. Allows senders to take advantage of ZeroMQ's zero-copy message delivery. Add check for incompatible data sizes between ZMQ endpoints. Fixes #1080
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r--gr-zeromq/lib/base_impl.cc17
1 files changed, 15 insertions, 2 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index f33315dd40..76baeafff2 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -165,6 +165,11 @@ namespace gr {
if (!(items[0].revents & ZMQ_POLLIN))
return false;
+ /* Is this the start or continuation of a multi-part message? */
+ int64_t more = 0;
+ size_t more_len = sizeof(more);
+ d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
+
/* Reset */
d_msg.rebuild();
d_tags.clear();
@@ -174,8 +179,8 @@ namespace gr {
/* Get the message */
d_socket->recv(&d_msg);
- /* Parse header */
- if (d_pass_tags)
+ /* Parse header from the first (or only) message of a multi-part message */
+ if (d_pass_tags && !more)
{
uint64_t rcv_offset;
@@ -188,6 +193,14 @@ namespace gr {
}
}
+ /* Each message must contain an integer mutliple of data vectors */
+ if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0)
+ {
+ throw std::runtime_error(
+ boost::str(boost::format("Incompatible vector sizes: "
+ "need a multiple of %1% bytes per message") % d_vsize));
+ }
+
/* We got one ! */
return true;
}