diff options
author | Martin Braun <martin.braun@ettus.com> | 2021-02-12 15:55:43 +0100 |
---|---|---|
committer | Martin Braun <martin@gnuradio.org> | 2021-02-15 01:11:10 -0800 |
commit | ec3d116546aa9710b324b1713cea058d80c906a8 (patch) | |
tree | e5f17655468142d8dbed1becab68a4076f017cca /gr-zeromq/lib | |
parent | 0351066c8b2398eb5894d89fd8ed37cdab43368d (diff) |
zeromq: Fix warnings with recv()
The recv() call on a ZMQ socket produces a warning if the return value
is not stored. We follow the advice and check the return value, just in
case.
Signed-off-by: Martin Braun <martin.braun@ettus.com>
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 30 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_msg_source_impl.cc | 16 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_msg_sink_impl.cc | 9 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 9 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.cc | 17 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_msg_source_impl.cc | 19 |
6 files changed, 71 insertions, 29 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index e6bddc603d..3a8cfcfb82 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -210,10 +210,16 @@ bool base_source_impl::load_message(bool wait) /* Get the message */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(d_msg); + const bool ok = bool(d_socket.recv(d_msg)); #else - d_socket.recv(&d_msg); + const bool ok = d_socket.recv(&d_msg); #endif + if (!ok) { + // This shouldn't happen since we polled POLLIN, but ZMQ wants us to check + // the return value. + GR_LOG_WARN(d_logger, "Failed to recv() message."); + return false; + } /* Throw away key and get the first message. Avoid blocking if a multi-part * message is not sent */ @@ -222,14 +228,18 @@ bool base_source_impl::load_message(bool wait) d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); d_msg.rebuild(); - if (is_multipart) + if (is_multipart) { #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(d_msg); + const bool multi_ok = bool(d_socket.recv(d_msg)); #else - d_socket.recv(&d_msg); + const bool multi_ok = d_socket.recv(&d_msg); #endif - else + if (!multi_ok) { + GR_LOG_ERROR(d_logger, "Failure to receive multi-part message."); + } + } else { return false; + } } /* Parse header from the first (or only) message of a multi-part message */ if (d_pass_tags && !more) { @@ -246,10 +256,8 @@ bool base_source_impl::load_message(bool wait) /* Each message must contain an integer multiple of data vectors */ if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) { - throw std::runtime_error( - boost::str(boost::format("Incompatible vector sizes: " - "need a multiple of %1% bytes per message") % - d_vsize)); + throw std::runtime_error("Incompatible vector sizes: need a multiple of " + + std::to_string(d_vsize) + " bytes per message"); } /* We got one ! */ @@ -258,5 +266,3 @@ bool base_source_impl::load_message(bool wait) } /* namespace zeromq */ } /* namespace gr */ - -// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc index 06463ef9cd..220a908565 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.cc +++ b/gr-zeromq/lib/pull_msg_source_impl.cc @@ -15,9 +15,10 @@ #include "pull_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/thread/thread.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -74,6 +75,7 @@ bool pull_msg_source_impl::stop() void pull_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; @@ -85,10 +87,16 @@ void pull_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); @@ -100,7 +108,7 @@ void pull_msg_source_impl::readloop() } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc index d457e7fae7..ecb88deb01 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.cc +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -89,10 +89,15 @@ void rep_msg_sink_impl::readloop() // receive data request zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(request); + const bool ok = bool(d_socket.recv(request)); #else - d_socket.recv(&request); + const bool ok = d_socket.recv(&request); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + break; // Fall back to re-check d_finished + } int req_output_items = *(static_cast<int*>(request.data())); if (req_output_items != 1) diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 920cc1ff27..f50feccbf2 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -59,10 +59,15 @@ int rep_sink_impl::work(int noutput_items, /* Get and parse the request */ zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(request); + bool ok = bool(d_socket.recv(request)); #else - d_socket.recv(&request); + bool ok = d_socket.recv(&request); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + break; + } int nitems_send = noutput_items - done; if (request.size() >= sizeof(uint32_t)) { diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index 43a1852f0b..2bd6435e22 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -15,9 +15,11 @@ #include "req_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/bind.hpp> #include <boost/thread/thread.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -74,6 +76,7 @@ bool req_msg_source_impl::stop() void req_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { // std::cout << "readloop\n"; @@ -103,10 +106,16 @@ void req_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); @@ -118,7 +127,7 @@ void req_msg_source_impl::readloop() } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index fd9229fdfd..b65179107f 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -15,9 +15,10 @@ #include "sub_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> -#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -73,6 +74,7 @@ bool sub_msg_source_impl::stop() void sub_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; @@ -84,10 +86,17 @@ void sub_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } + std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); try { @@ -97,7 +106,7 @@ void sub_msg_source_impl::readloop() GR_LOG_ERROR(d_logger, std::string("Invalid PMT message: ") + e.what()); } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } |