diff options
Diffstat (limited to 'gr-pdu/python/pdu/qa_time_delta.py')
-rwxr-xr-x | gr-pdu/python/pdu/qa_time_delta.py | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/gr-pdu/python/pdu/qa_time_delta.py b/gr-pdu/python/pdu/qa_time_delta.py index 0cb0f99020..2003419ba7 100755 --- a/gr-pdu/python/pdu/qa_time_delta.py +++ b/gr-pdu/python/pdu/qa_time_delta.py @@ -20,7 +20,8 @@ class qa_time_delta(gr_unittest.TestCase): def setUp(self): self.tb = gr.top_block() - self.time_delta = pdu.time_delta(pmt.intern("sys time delta (ms)"), pmt.intern("system_time")) + self.time_delta = pdu.time_delta(pmt.intern( + "sys time delta (ms)"), pmt.intern("system_time")) self.debug = blocks.message_debug() self.tb.msg_connect((self.time_delta, 'pdu'), (self.debug, 'store')) @@ -30,8 +31,9 @@ class qa_time_delta(gr_unittest.TestCase): def test_001_invalid_a(self): self.tb.start() - self.time_delta.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("NOT A PDU")) - time.sleep(0.01) # short delay to ensure the message is processed + self.time_delta.to_basic_block()._post( + pmt.intern("pdu"), pmt.intern("NOT A PDU")) + time.sleep(0.01) # short delay to ensure the message is processed self.tb.stop() self.tb.wait() @@ -39,14 +41,16 @@ class qa_time_delta(gr_unittest.TestCase): self.assertEqual(0, self.debug.num_messages()) def test_001_invalid_b(self): - in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - meta = pmt.dict_add(pmt.make_dict(), pmt.intern('sam'), pmt.from_double(25.1)) + in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + meta = pmt.dict_add(pmt.make_dict(), pmt.intern( + 'sam'), pmt.from_double(25.1)) in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data)) # set up fg self.tb.start() self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu) - time.sleep(0.01) # short delay to ensure the message is processed + time.sleep(0.01) # short delay to ensure the message is processed self.tb.stop() self.tb.wait() @@ -56,19 +60,25 @@ class qa_time_delta(gr_unittest.TestCase): def test_002_normal(self): tnow = time.time() - in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow - 10.0)) + in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + meta = pmt.dict_add(pmt.make_dict(), pmt.intern( + 'system_time'), pmt.from_double(tnow - 10.0)) in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data)) - e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow)) - e_meta = pmt.dict_add(e_meta, pmt.intern('sys time delta (ms)'), pmt.from_double(10000.0)) + e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern( + 'system_time'), pmt.from_double(tnow)) + e_meta = pmt.dict_add(e_meta, pmt.intern( + 'sys time delta (ms)'), pmt.from_double(10000.0)) e_pdu = pmt.cons(e_meta, pmt.init_c32vector(len(e_data), e_data)) # set up fg self.tb.start() self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu) - self.waitFor(lambda: self.debug.num_messages() == 1, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: self.debug.num_messages() == + 1, timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() @@ -76,7 +86,8 @@ class qa_time_delta(gr_unittest.TestCase): self.assertEqual(1, self.debug.num_messages()) a_meta = pmt.car(self.debug.get_message(0)) time_tag = pmt.dict_ref(a_meta, pmt.intern("system_time"), pmt.PMT_NIL) - delta_tag = pmt.dict_ref(a_meta, pmt.intern("sys time delta (ms)"), pmt.PMT_NIL) + delta_tag = pmt.dict_ref(a_meta, pmt.intern( + "sys time delta (ms)"), pmt.PMT_NIL) self.assertAlmostEqual(tnow, pmt.to_double(time_tag), delta=60) self.assertAlmostEqual(10000, pmt.to_double(delta_tag), delta=10) |