summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/req_msg_source_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/req_msg_source_impl.cc')
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc17
1 files changed, 13 insertions, 4 deletions
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 43a1852f0b..2bd6435e22 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -15,9 +15,11 @@
#include "req_msg_source_impl.h"
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
+#include <chrono>
#include <memory>
+#include <thread>
namespace gr {
namespace zeromq {
@@ -74,6 +76,7 @@ bool req_msg_source_impl::stop()
void req_msg_source_impl::readloop()
{
+ using namespace std::chrono_literals;
while (!d_finished) {
// std::cout << "readloop\n";
@@ -103,10 +106,16 @@ void req_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket.recv(msg);
+ const bool ok = bool(d_socket.recv(msg));
#else
- d_socket.recv(&msg);
+ const bool ok = d_socket.recv(&msg);
#endif
+ if (!ok) {
+ // Should not happen, we've checked POLLIN.
+ GR_LOG_ERROR(d_logger, "Failed to receive message.");
+ std::this_thread::sleep_for(100ms);
+ continue;
+ }
std::string buf(static_cast<char*>(msg.data()), msg.size());
std::stringbuf sb(buf);
@@ -118,7 +127,7 @@ void req_msg_source_impl::readloop()
}
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ std::this_thread::sleep_for(100ms);
}
}
}