diff options
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 30 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_msg_source_impl.cc | 16 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_msg_sink_impl.cc | 9 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 9 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.cc | 17 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_msg_source_impl.cc | 19 | ||||
-rw-r--r-- | gr-zeromq/python/zeromq/qa_zeromq_pull_msg_source.py | 7 |
7 files changed, 75 insertions, 32 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index e6bddc603d..3a8cfcfb82 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -210,10 +210,16 @@ bool base_source_impl::load_message(bool wait) /* Get the message */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(d_msg); + const bool ok = bool(d_socket.recv(d_msg)); #else - d_socket.recv(&d_msg); + const bool ok = d_socket.recv(&d_msg); #endif + if (!ok) { + // This shouldn't happen since we polled POLLIN, but ZMQ wants us to check + // the return value. + GR_LOG_WARN(d_logger, "Failed to recv() message."); + return false; + } /* Throw away key and get the first message. Avoid blocking if a multi-part * message is not sent */ @@ -222,14 +228,18 @@ bool base_source_impl::load_message(bool wait) d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); d_msg.rebuild(); - if (is_multipart) + if (is_multipart) { #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(d_msg); + const bool multi_ok = bool(d_socket.recv(d_msg)); #else - d_socket.recv(&d_msg); + const bool multi_ok = d_socket.recv(&d_msg); #endif - else + if (!multi_ok) { + GR_LOG_ERROR(d_logger, "Failure to receive multi-part message."); + } + } else { return false; + } } /* Parse header from the first (or only) message of a multi-part message */ if (d_pass_tags && !more) { @@ -246,10 +256,8 @@ bool base_source_impl::load_message(bool wait) /* Each message must contain an integer multiple of data vectors */ if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) { - throw std::runtime_error( - boost::str(boost::format("Incompatible vector sizes: " - "need a multiple of %1% bytes per message") % - d_vsize)); + throw std::runtime_error("Incompatible vector sizes: need a multiple of " + + std::to_string(d_vsize) + " bytes per message"); } /* We got one ! */ @@ -258,5 +266,3 @@ bool base_source_impl::load_message(bool wait) } /* namespace zeromq */ } /* namespace gr */ - -// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc index 06463ef9cd..220a908565 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.cc +++ b/gr-zeromq/lib/pull_msg_source_impl.cc @@ -15,9 +15,10 @@ #include "pull_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/thread/thread.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -74,6 +75,7 @@ bool pull_msg_source_impl::stop() void pull_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; @@ -85,10 +87,16 @@ void pull_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); @@ -100,7 +108,7 @@ void pull_msg_source_impl::readloop() } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc index d457e7fae7..ecb88deb01 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.cc +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -89,10 +89,15 @@ void rep_msg_sink_impl::readloop() // receive data request zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(request); + const bool ok = bool(d_socket.recv(request)); #else - d_socket.recv(&request); + const bool ok = d_socket.recv(&request); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + break; // Fall back to re-check d_finished + } int req_output_items = *(static_cast<int*>(request.data())); if (req_output_items != 1) diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 920cc1ff27..f50feccbf2 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -59,10 +59,15 @@ int rep_sink_impl::work(int noutput_items, /* Get and parse the request */ zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(request); + bool ok = bool(d_socket.recv(request)); #else - d_socket.recv(&request); + bool ok = d_socket.recv(&request); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + break; + } int nitems_send = noutput_items - done; if (request.size() >= sizeof(uint32_t)) { diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index 43a1852f0b..2bd6435e22 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -15,9 +15,11 @@ #include "req_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/bind.hpp> #include <boost/thread/thread.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -74,6 +76,7 @@ bool req_msg_source_impl::stop() void req_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { // std::cout << "readloop\n"; @@ -103,10 +106,16 @@ void req_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); @@ -118,7 +127,7 @@ void req_msg_source_impl::readloop() } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index fd9229fdfd..b65179107f 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -15,9 +15,10 @@ #include "sub_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> -#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -73,6 +74,7 @@ bool sub_msg_source_impl::stop() void sub_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; @@ -84,10 +86,17 @@ void sub_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } + std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); try { @@ -97,7 +106,7 @@ void sub_msg_source_impl::readloop() GR_LOG_ERROR(d_logger, std::string("Invalid PMT message: ") + e.what()); } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pull_msg_source.py b/gr-zeromq/python/zeromq/qa_zeromq_pull_msg_source.py index 0a2e541acf..af1dc696f1 100644 --- a/gr-zeromq/python/zeromq/qa_zeromq_pull_msg_source.py +++ b/gr-zeromq/python/zeromq/qa_zeromq_pull_msg_source.py @@ -48,15 +48,16 @@ class qa_zeromq_pull_msg_source(gr_unittest.TestCase): """Test receiving of valid PMT messages""" msg = pmt.to_pmt('test_valid_pmt') self.zmq_sock.send(pmt.serialize_str(msg)) - - time.sleep(0.1) + for _ in range(10): + if self.message_debug.num_messages() > 0: + break + time.sleep(0.2) self.assertEqual(1, self.message_debug.num_messages()) self.assertTrue(pmt.equal(msg, self.message_debug.get_message(0))) def test_invalid_pmt(self): """Test receiving of invalid PMT messages""" self.zmq_sock.send_string('test_invalid_pmt') - time.sleep(0.1) self.assertEqual(0, self.message_debug.num_messages()) |