diff options
Diffstat (limited to 'gr-zeromq/lib/sub_msg_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/sub_msg_source_impl.cc | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index fd9229fdfd..b65179107f 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -15,9 +15,10 @@ #include "sub_msg_source_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> -#include <boost/date_time/posix_time/posix_time.hpp> -#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> +#include <chrono> #include <memory> +#include <thread> namespace gr { namespace zeromq { @@ -73,6 +74,7 @@ bool sub_msg_source_impl::stop() void sub_msg_source_impl::readloop() { + using namespace std::chrono_literals; while (!d_finished) { zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; @@ -84,10 +86,17 @@ void sub_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket.recv(msg); + const bool ok = bool(d_socket.recv(msg)); #else - d_socket.recv(&msg); + const bool ok = d_socket.recv(&msg); #endif + if (!ok) { + // Should not happen, we've checked POLLIN. + GR_LOG_ERROR(d_logger, "Failed to receive message."); + std::this_thread::sleep_for(100ms); + continue; + } + std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); try { @@ -97,7 +106,7 @@ void sub_msg_source_impl::readloop() GR_LOG_ERROR(d_logger, std::string("Invalid PMT message: ") + e.what()); } } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + std::this_thread::sleep_for(100ms); } } } |