summaryrefslogtreecommitdiff
path: root/gr-pdu/python/pdu/qa_time_delta.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-pdu/python/pdu/qa_time_delta.py')
-rwxr-xr-xgr-pdu/python/pdu/qa_time_delta.py37
1 files changed, 24 insertions, 13 deletions
diff --git a/gr-pdu/python/pdu/qa_time_delta.py b/gr-pdu/python/pdu/qa_time_delta.py
index 0cb0f99020..2003419ba7 100755
--- a/gr-pdu/python/pdu/qa_time_delta.py
+++ b/gr-pdu/python/pdu/qa_time_delta.py
@@ -20,7 +20,8 @@ class qa_time_delta(gr_unittest.TestCase):
def setUp(self):
self.tb = gr.top_block()
- self.time_delta = pdu.time_delta(pmt.intern("sys time delta (ms)"), pmt.intern("system_time"))
+ self.time_delta = pdu.time_delta(pmt.intern(
+ "sys time delta (ms)"), pmt.intern("system_time"))
self.debug = blocks.message_debug()
self.tb.msg_connect((self.time_delta, 'pdu'), (self.debug, 'store'))
@@ -30,8 +31,9 @@ class qa_time_delta(gr_unittest.TestCase):
def test_001_invalid_a(self):
self.tb.start()
- self.time_delta.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("NOT A PDU"))
- time.sleep(0.01) # short delay to ensure the message is processed
+ self.time_delta.to_basic_block()._post(
+ pmt.intern("pdu"), pmt.intern("NOT A PDU"))
+ time.sleep(0.01) # short delay to ensure the message is processed
self.tb.stop()
self.tb.wait()
@@ -39,14 +41,16 @@ class qa_time_delta(gr_unittest.TestCase):
self.assertEqual(0, self.debug.num_messages())
def test_001_invalid_b(self):
- in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- meta = pmt.dict_add(pmt.make_dict(), pmt.intern('sam'), pmt.from_double(25.1))
+ in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ meta = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ 'sam'), pmt.from_double(25.1))
in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data))
# set up fg
self.tb.start()
self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu)
- time.sleep(0.01) # short delay to ensure the message is processed
+ time.sleep(0.01) # short delay to ensure the message is processed
self.tb.stop()
self.tb.wait()
@@ -56,19 +60,25 @@ class qa_time_delta(gr_unittest.TestCase):
def test_002_normal(self):
tnow = time.time()
- in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow - 10.0))
+ in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ meta = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ 'system_time'), pmt.from_double(tnow - 10.0))
in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data))
- e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow))
- e_meta = pmt.dict_add(e_meta, pmt.intern('sys time delta (ms)'), pmt.from_double(10000.0))
+ e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ 'system_time'), pmt.from_double(tnow))
+ e_meta = pmt.dict_add(e_meta, pmt.intern(
+ 'sys time delta (ms)'), pmt.from_double(10000.0))
e_pdu = pmt.cons(e_meta, pmt.init_c32vector(len(e_data), e_data))
# set up fg
self.tb.start()
self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu)
- self.waitFor(lambda: self.debug.num_messages() == 1, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: self.debug.num_messages() ==
+ 1, timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
@@ -76,7 +86,8 @@ class qa_time_delta(gr_unittest.TestCase):
self.assertEqual(1, self.debug.num_messages())
a_meta = pmt.car(self.debug.get_message(0))
time_tag = pmt.dict_ref(a_meta, pmt.intern("system_time"), pmt.PMT_NIL)
- delta_tag = pmt.dict_ref(a_meta, pmt.intern("sys time delta (ms)"), pmt.PMT_NIL)
+ delta_tag = pmt.dict_ref(a_meta, pmt.intern(
+ "sys time delta (ms)"), pmt.PMT_NIL)
self.assertAlmostEqual(tnow, pmt.to_double(time_tag), delta=60)
self.assertAlmostEqual(10000, pmt.to_double(delta_tag), delta=10)