summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2021-02-12 15:55:43 +0100
committerMartin Braun <martin@gnuradio.org>2021-02-15 01:11:10 -0800
commitec3d116546aa9710b324b1713cea058d80c906a8 (patch)
treee5f17655468142d8dbed1becab68a4076f017cca /gr-zeromq/lib
parent0351066c8b2398eb5894d89fd8ed37cdab43368d (diff)
zeromq: Fix warnings with recv()
The recv() call on a ZMQ socket produces a warning if the return value is not stored. We follow the advice and check the return value, just in case. Signed-off-by: Martin Braun <martin.braun@ettus.com>
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r--gr-zeromq/lib/base_impl.cc30
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.cc16
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.cc9
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc9
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc17
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.cc19
6 files changed, 71 insertions, 29 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index e6bddc603d..3a8cfcfb82 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -210,10 +210,16 @@ bool base_source_impl::load_message(bool wait)
/* Get the message */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(d_msg);
+ const bool ok = bool(d_socket.recv(d_msg));
#else
- d_socket.recv(&d_msg);
+ const bool ok = d_socket.recv(&d_msg);
#endif
+ if (!ok) {
+ // This shouldn't happen since we polled POLLIN, but ZMQ wants us to check
+ // the return value.
+ GR_LOG_WARN(d_logger, "Failed to recv() message.");
+ return false;
+ }
/* Throw away key and get the first message. Avoid blocking if a multi-part
* message is not sent */
@@ -222,14 +228,18 @@ bool base_source_impl::load_message(bool wait)
d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
d_msg.rebuild();
- if (is_multipart)
+ if (is_multipart) {
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(d_msg);
+ const bool multi_ok = bool(d_socket.recv(d_msg));
#else
- d_socket.recv(&d_msg);
+ const bool multi_ok = d_socket.recv(&d_msg);
#endif
- else
+ if (!multi_ok) {
+ GR_LOG_ERROR(d_logger, "Failure to receive multi-part message.");
+ }
+ } else {
return false;
+ }
}
/* Parse header from the first (or only) message of a multi-part message */
if (d_pass_tags && !more) {
@@ -246,10 +256,8 @@ bool base_source_impl::load_message(bool wait)
/* Each message must contain an integer multiple of data vectors */
if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) {
- throw std::runtime_error(
- boost::str(boost::format("Incompatible vector sizes: "
- "need a multiple of %1% bytes per message") %
- d_vsize));
+ throw std::runtime_error("Incompatible vector sizes: need a multiple of " +
+ std::to_string(d_vsize) + " bytes per message");
}
/* We got one ! */
@@ -258,5 +266,3 @@ bool base_source_impl::load_message(bool wait)
} /* namespace zeromq */
} /* namespace gr */
-
-// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
index 06463ef9cd..220a908565 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -15,9 +15,10 @@
#include "pull_msg_source_impl.h"
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
+#include <chrono>
#include <memory>
+#include <thread>
namespace gr {
namespace zeromq {
@@ -74,6 +75,7 @@ bool pull_msg_source_impl::stop()
void pull_msg_source_impl::readloop()
{
+ using namespace std::chrono_literals;
while (!d_finished) {
zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
@@ -85,10 +87,16 @@ void pull_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(msg);
+ const bool ok = bool(d_socket.recv(msg));
#else
- d_socket.recv(&msg);
+ const bool ok = d_socket.recv(&msg);
#endif
+ if (!ok) {
+ // Should not happen, we've checked POLLIN.
+ GR_LOG_ERROR(d_logger, "Failed to receive message.");
+ std::this_thread::sleep_for(100ms);
+ continue;
+ }
std::string buf(static_cast<char*>(msg.data()), msg.size());
std::stringbuf sb(buf);
@@ -100,7 +108,7 @@ void pull_msg_source_impl::readloop()
}
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ std::this_thread::sleep_for(100ms);
}
}
}
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index d457e7fae7..ecb88deb01 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -89,10 +89,15 @@ void rep_msg_sink_impl::readloop()
// receive data request
zmq::message_t request;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(request);
+ const bool ok = bool(d_socket.recv(request));
#else
- d_socket.recv(&request);
+ const bool ok = d_socket.recv(&request);
#endif
+ if (!ok) {
+ // Should not happen, we've checked POLLIN.
+ GR_LOG_ERROR(d_logger, "Failed to receive message.");
+ break; // Fall back to re-check d_finished
+ }
int req_output_items = *(static_cast<int*>(request.data()));
if (req_output_items != 1)
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 920cc1ff27..f50feccbf2 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -59,10 +59,15 @@ int rep_sink_impl::work(int noutput_items,
/* Get and parse the request */
zmq::message_t request;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(request);
+ bool ok = bool(d_socket.recv(request));
#else
- d_socket.recv(&request);
+ bool ok = d_socket.recv(&request);
#endif
+ if (!ok) {
+ // Should not happen, we've checked POLLIN.
+ GR_LOG_ERROR(d_logger, "Failed to receive message.");
+ break;
+ }
int nitems_send = noutput_items - done;
if (request.size() >= sizeof(uint32_t)) {
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 43a1852f0b..2bd6435e22 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -15,9 +15,11 @@
#include "req_msg_source_impl.h"
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
+#include <chrono>
#include <memory>
+#include <thread>
namespace gr {
namespace zeromq {
@@ -74,6 +76,7 @@ bool req_msg_source_impl::stop()
void req_msg_source_impl::readloop()
{
+ using namespace std::chrono_literals;
while (!d_finished) {
// std::cout << "readloop\n";
@@ -103,10 +106,16 @@ void req_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(msg);
+ const bool ok = bool(d_socket.recv(msg));
#else
- d_socket.recv(&msg);
+ const bool ok = d_socket.recv(&msg);
#endif
+ if (!ok) {
+ // Should not happen, we've checked POLLIN.
+ GR_LOG_ERROR(d_logger, "Failed to receive message.");
+ std::this_thread::sleep_for(100ms);
+ continue;
+ }
std::string buf(static_cast<char*>(msg.data()), msg.size());
std::stringbuf sb(buf);
@@ -118,7 +127,7 @@ void req_msg_source_impl::readloop()
}
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ std::this_thread::sleep_for(100ms);
}
}
}
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc
index fd9229fdfd..b65179107f 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.cc
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -15,9 +15,10 @@
#include "sub_msg_source_impl.h"
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/thread/thread.hpp>
+#include <boost/bind.hpp>
+#include <chrono>
#include <memory>
+#include <thread>
namespace gr {
namespace zeromq {
@@ -73,6 +74,7 @@ bool sub_msg_source_impl::stop()
void sub_msg_source_impl::readloop()
{
+ using namespace std::chrono_literals;
while (!d_finished) {
zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
@@ -84,10 +86,17 @@ void sub_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(msg);
+ const bool ok = bool(d_socket.recv(msg));
#else
- d_socket.recv(&msg);
+ const bool ok = d_socket.recv(&msg);
#endif
+ if (!ok) {
+ // Should not happen, we've checked POLLIN.
+ GR_LOG_ERROR(d_logger, "Failed to receive message.");
+ std::this_thread::sleep_for(100ms);
+ continue;
+ }
+
std::string buf(static_cast<char*>(msg.data()), msg.size());
std::stringbuf sb(buf);
try {
@@ -97,7 +106,7 @@ void sub_msg_source_impl::readloop()
GR_LOG_ERROR(d_logger, std::string("Invalid PMT message: ") + e.what());
}
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ std::this_thread::sleep_for(100ms);
}
}
}