summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrennan Ashton <bashton@brennanashton.com>2018-11-03 00:56:56 -0700
committerBrennan Ashton <bashton@brennanashton.com>2018-11-03 01:53:57 -0700
commitfb78c4b0e60ce7a94bed3450d85e52d0fe96adff (patch)
tree89e61ccd1530ea5f226c757ecbf52a5074c91027
parentabd594ae7df1dd15f9893e190846d789f752ab2d (diff)
gr-zeromq: Keep qa_zeromq_pub test from blocking
The core of this issue is the socket may not be ready when add_socket returns from probe_manager. Add a delay to give the socket time. Also make recv non-blocking, if the message is not recevied it will throw an exception. The thread was also calling poll with None. This is bad since if socket never gets the message the thread will wait forever. To help fail fast this patch also sets a time limit on the polling thread. This is referenced in #1261
-rw-r--r--gr-zeromq/python/zeromq/probe_manager.py10
-rw-r--r--gr-zeromq/python/zeromq/qa_zeromq_pub.py4
2 files changed, 11 insertions, 3 deletions
diff --git a/gr-zeromq/python/zeromq/probe_manager.py b/gr-zeromq/python/zeromq/probe_manager.py
index c224ca7870..e3952e7351 100644
--- a/gr-zeromq/python/zeromq/probe_manager.py
+++ b/gr-zeromq/python/zeromq/probe_manager.py
@@ -20,6 +20,8 @@ from __future__ import unicode_literals
# Boston, MA 02110-1301, USA.
#
+import time
+
import zmq
import numpy
@@ -32,19 +34,23 @@ class probe_manager(object):
def add_socket(self, address, data_type, callback_func):
socket = self.zmq_context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
+ # Do not wait more than 5 seconds for a message
+ socket.setsockopt(zmq.RCVTIMEO, 100)
socket.connect(address)
# use a tuple to store interface elements
self.interfaces.append((socket, data_type, callback_func))
self.poller.register(socket, zmq.POLLIN)
+ # Give it time for the socket to be ready
+ time.sleep(0.5)
def watcher(self):
- poll = dict(self.poller.poll(None))
+ poll = dict(self.poller.poll(1000))
for i in self.interfaces:
# i = (socket, data_type, callback_func)
if poll.get(i[0]) == zmq.POLLIN:
# receive data
msg_packed = i[0].recv()
# use numpy to unpack the data
- msg_unpacked = numpy.fromstring(msg_packed, numpy.dtype(i[1]))
+ msg_unpacked = numpy.frombuffer(msg_packed, numpy.dtype(i[1]))
# invoke callback function
i[2](msg_unpacked)
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pub.py b/gr-zeromq/python/zeromq/qa_zeromq_pub.py
index 2cee710d2a..01f32c5e7f 100644
--- a/gr-zeromq/python/zeromq/qa_zeromq_pub.py
+++ b/gr-zeromq/python/zeromq/qa_zeromq_pub.py
@@ -48,7 +48,9 @@ class qa_zeromq_pub (gr_unittest.TestCase):
zmq_pull_t.daemon = True
zmq_pull_t.start()
self.tb.run()
- zmq_pull_t.join()
+ zmq_pull_t.join(6.0)
+ # Check to see if we timed out
+ self.assertFalse(zmq_pull_t.is_alive())
self.assertFloatTuplesAlmostEqual(self.rx_data, src_data)
def recv_data (self, data):