diff options
author | Brennan Ashton <bashton@brennanashton.com> | 2018-11-03 00:56:56 -0700 |
---|---|---|
committer | Brennan Ashton <bashton@brennanashton.com> | 2018-11-03 01:53:57 -0700 |
commit | fb78c4b0e60ce7a94bed3450d85e52d0fe96adff (patch) | |
tree | 89e61ccd1530ea5f226c757ecbf52a5074c91027 | |
parent | abd594ae7df1dd15f9893e190846d789f752ab2d (diff) |
gr-zeromq: Keep qa_zeromq_pub test from blocking
The core of this issue is the socket may not be ready when
add_socket returns from probe_manager. Add a delay to give the
socket time. Also make recv non-blocking, if the message is not
recevied it will throw an exception.
The thread was also calling poll with None. This is bad since
if socket never gets the message the thread will wait forever.
To help fail fast this patch also sets a time limit on the
polling thread.
This is referenced in #1261
-rw-r--r-- | gr-zeromq/python/zeromq/probe_manager.py | 10 | ||||
-rw-r--r-- | gr-zeromq/python/zeromq/qa_zeromq_pub.py | 4 |
2 files changed, 11 insertions, 3 deletions
diff --git a/gr-zeromq/python/zeromq/probe_manager.py b/gr-zeromq/python/zeromq/probe_manager.py index c224ca7870..e3952e7351 100644 --- a/gr-zeromq/python/zeromq/probe_manager.py +++ b/gr-zeromq/python/zeromq/probe_manager.py @@ -20,6 +20,8 @@ from __future__ import unicode_literals # Boston, MA 02110-1301, USA. # +import time + import zmq import numpy @@ -32,19 +34,23 @@ class probe_manager(object): def add_socket(self, address, data_type, callback_func): socket = self.zmq_context.socket(zmq.SUB) socket.setsockopt(zmq.SUBSCRIBE, b"") + # Do not wait more than 5 seconds for a message + socket.setsockopt(zmq.RCVTIMEO, 100) socket.connect(address) # use a tuple to store interface elements self.interfaces.append((socket, data_type, callback_func)) self.poller.register(socket, zmq.POLLIN) + # Give it time for the socket to be ready + time.sleep(0.5) def watcher(self): - poll = dict(self.poller.poll(None)) + poll = dict(self.poller.poll(1000)) for i in self.interfaces: # i = (socket, data_type, callback_func) if poll.get(i[0]) == zmq.POLLIN: # receive data msg_packed = i[0].recv() # use numpy to unpack the data - msg_unpacked = numpy.fromstring(msg_packed, numpy.dtype(i[1])) + msg_unpacked = numpy.frombuffer(msg_packed, numpy.dtype(i[1])) # invoke callback function i[2](msg_unpacked) diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pub.py b/gr-zeromq/python/zeromq/qa_zeromq_pub.py index 2cee710d2a..01f32c5e7f 100644 --- a/gr-zeromq/python/zeromq/qa_zeromq_pub.py +++ b/gr-zeromq/python/zeromq/qa_zeromq_pub.py @@ -48,7 +48,9 @@ class qa_zeromq_pub (gr_unittest.TestCase): zmq_pull_t.daemon = True zmq_pull_t.start() self.tb.run() - zmq_pull_t.join() + zmq_pull_t.join(6.0) + # Check to see if we timed out + self.assertFalse(zmq_pull_t.is_alive()) self.assertFloatTuplesAlmostEqual(self.rx_data, src_data) def recv_data (self, data): |