diff options
author | Josh Morman <jmorman@gnuradio.org> | 2021-11-24 12:36:26 -0500 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-11-24 14:41:53 -0500 |
commit | aa58bb7d9a37212f46af3cb52e39bfdd6e1bdaf6 (patch) | |
tree | dff8b2ba7b410598f4390695dd619ed417b14ce0 | |
parent | 0ec71943a5a1f4dd22841e57e5f503795e86b931 (diff) |
pdu: pep8 formatting
Signed-off-by: Josh Morman <jmorman@gnuradio.org>
-rw-r--r-- | gr-pdu/python/pdu/pdu_lambda.py | 24 | ||||
-rwxr-xr-x | gr-pdu/python/pdu/qa_add_system_time.py | 21 | ||||
-rw-r--r-- | gr-pdu/python/pdu/qa_pdu.py | 1 | ||||
-rwxr-xr-x | gr-pdu/python/pdu/qa_pdu_lambda.py | 3 | ||||
-rw-r--r-- | gr-pdu/python/pdu/qa_pdu_split.py | 65 | ||||
-rwxr-xr-x | gr-pdu/python/pdu/qa_pdu_to_stream.py | 67 | ||||
-rwxr-xr-x | gr-pdu/python/pdu/qa_tags_to_pdu.py | 232 | ||||
-rwxr-xr-x | gr-pdu/python/pdu/qa_take_skip_to_pdu.py | 64 | ||||
-rwxr-xr-x | gr-pdu/python/pdu/qa_time_delta.py | 37 |
9 files changed, 313 insertions, 201 deletions
diff --git a/gr-pdu/python/pdu/pdu_lambda.py b/gr-pdu/python/pdu/pdu_lambda.py index e6a286ed2a..d80e679562 100644 --- a/gr-pdu/python/pdu/pdu_lambda.py +++ b/gr-pdu/python/pdu/pdu_lambda.py @@ -10,7 +10,9 @@ from gnuradio import gr import numpy as np -import math, pmt, time +import math +import pmt +import time class pdu_lambda(gr.basic_block): @@ -40,8 +42,8 @@ class pdu_lambda(gr.basic_block): def __init__(self, fn, metadict, key=pmt.PMT_NIL): gr.basic_block.__init__(self, - name="pdu_lambda", - in_sig=[],out_sig=[]) + name="pdu_lambda", + in_sig=[], out_sig=[]) self.set_fn(fn) self.set_key(key) self.metadict_mode = metadict @@ -61,7 +63,7 @@ class pdu_lambda(gr.basic_block): print(e) pass self.message_port_pub(pmt.intern("pdu"), - pmt.cons(meta, pmt.cdr(pdu))); + pmt.cons(meta, pmt.cdr(pdu))) elif self.metadict_mode == "UVEC": vec = pmt.cdr(pdu) @@ -72,8 +74,8 @@ class pdu_lambda(gr.basic_block): pass self.message_port_pub(pmt.intern("pdu"), - pmt.cons(pmt.car(pdu), vec)); - + pmt.cons(pmt.car(pdu), vec)) + elif self.metadict_mode == "RAW": # TODO: This is more of a "message lambda" block, in the future it should be # a separate block outside the PDU namespace @@ -83,14 +85,14 @@ class pdu_lambda(gr.basic_block): print(e) pass self.message_port_pub(pmt.intern("pdu"), pdu) - + else: - raise ValueError("pdu_lambda block instantiated in unknown mode " + repr(self.metadict_mode)) + raise ValueError( + "pdu_lambda block instantiated in unknown mode " + repr(self.metadict_mode)) pass - - def set_fn(self,fn): + def set_fn(self, fn): self.fn = fn - def set_key(self,key): + def set_key(self, key): self.key = key diff --git a/gr-pdu/python/pdu/qa_add_system_time.py b/gr-pdu/python/pdu/qa_add_system_time.py index 44d0ad4bef..0688122a6b 100755 --- a/gr-pdu/python/pdu/qa_add_system_time.py +++ b/gr-pdu/python/pdu/qa_add_system_time.py @@ -32,29 +32,35 @@ class qa_add_system_time(gr_unittest.TestCase): def test_001_basic_io(self): self.tb.start() # provide two non PDU inputs and one PDU input - self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("BAD PDU")) + self.add_sys_time.to_basic_block()._post( + pmt.intern("pdu"), pmt.intern("BAD PDU")) self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.from_long(4), pmt.PMT_NIL)) self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.make_dict(), pmt.init_f32vector(1, [0.0]))) - self.waitFor(lambda: self.debug.num_messages() >= 1, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: self.debug.num_messages() >= + 1, timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() # make sure we got one message and it has a systime key self.assertEqual(1, self.debug.num_messages()) - self.assertTrue(pmt.dict_has_key(pmt.car(self.debug.get_message(0)), pmt.intern('systime'))) + self.assertTrue(pmt.dict_has_key( + pmt.car(self.debug.get_message(0)), pmt.intern('systime'))) def test_002_timing(self): self.tb.start() - self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("BAD PDU")) + self.add_sys_time.to_basic_block()._post( + pmt.intern("pdu"), pmt.intern("BAD PDU")) self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.make_dict(), pmt.init_u8vector(1, [0]))) - time.sleep(1.0) # wait for one second to provide a time difference between messages + # wait for one second to provide a time difference between messages + time.sleep(1.0) self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.make_dict(), pmt.init_u8vector(1, [0]))) - self.waitFor(lambda: self.debug.num_messages() == 2, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: self.debug.num_messages() == + 2, timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() @@ -64,7 +70,8 @@ class qa_add_system_time(gr_unittest.TestCase): t1 = pmt.to_double(pmt.dict_ref(pmt.car(self.debug.get_message(1)), pmt.intern("systime"), pmt.from_double(0.0))) - self.assertTrue(((t1 - t0) - 1) < 0.05) # should be sufficient tolerance + # should be sufficient tolerance + self.assertTrue(((t1 - t0) - 1) < 0.05) if __name__ == '__main__': diff --git a/gr-pdu/python/pdu/qa_pdu.py b/gr-pdu/python/pdu/qa_pdu.py index 388a93b931..e194b5061a 100644 --- a/gr-pdu/python/pdu/qa_pdu.py +++ b/gr-pdu/python/pdu/qa_pdu.py @@ -144,5 +144,6 @@ class test_pdu(gr_unittest.TestCase): data = pmt.u8vector_elements(pmt.cdr(dbg.get_message(0))) self.assertEqual([1, 2, 3], data) + if __name__ == '__main__': gr_unittest.run(test_pdu) diff --git a/gr-pdu/python/pdu/qa_pdu_lambda.py b/gr-pdu/python/pdu/qa_pdu_lambda.py index f93475b8d9..6f32695d8a 100755 --- a/gr-pdu/python/pdu/qa_pdu_lambda.py +++ b/gr-pdu/python/pdu/qa_pdu_lambda.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest from gnuradio import pdu + class qa_pdu_lambda(gr_unittest.TestCase): def setUp(self): @@ -21,7 +22,7 @@ class qa_pdu_lambda(gr_unittest.TestCase): def test_smoketest(self): # FIXME: Test will fail until you pass sensible arguments to the constructor - instance = pdu.pdu_lambda(lambda uvec: uvec*10, False) + instance = pdu.pdu_lambda(lambda uvec: uvec * 10, False) def test_001_descriptive_test_name(self): # set up fg diff --git a/gr-pdu/python/pdu/qa_pdu_split.py b/gr-pdu/python/pdu/qa_pdu_split.py index 988fb6b164..a957c4d97d 100644 --- a/gr-pdu/python/pdu/qa_pdu_split.py +++ b/gr-pdu/python/pdu/qa_pdu_split.py @@ -16,13 +16,13 @@ import time class qa_pdu_split (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_split (self): + def test_001_split(self): split = pdu.pdu_split() d1 = blocks.message_debug() d2 = blocks.message_debug() @@ -30,28 +30,36 @@ class qa_pdu_split (gr_unittest.TestCase): self.tb.msg_connect((split, 'dict'), (d1, 'store')) self.tb.msg_connect((split, 'vec'), (d2, 'store')) - in_meta1 = pmt.dict_add(pmt.make_dict(), pmt.intern('num'), pmt.from_long(4)) - in_meta2 = pmt.dict_add(pmt.make_dict(), pmt.intern('n'), pmt.from_long(99)) + in_meta1 = pmt.dict_add( + pmt.make_dict(), pmt.intern('num'), pmt.from_long(4)) + in_meta2 = pmt.dict_add( + pmt.make_dict(), pmt.intern('n'), pmt.from_long(99)) in_pdu = pmt.cons(in_meta1, pmt.init_u8vector(6, range(6))) self.tb.start() split.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("MALFORMED PDU")) - split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(2, range(2)))) - split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, []))) + split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons( + pmt.PMT_NIL, pmt.init_u8vector(2, range(2)))) + split.to_basic_block()._post(pmt.intern( + "pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, []))) split.to_basic_block()._post(pmt.intern("pdu"), in_pdu) - split.to_basic_block()._post(pmt.intern("system"), pmt.cons(pmt.intern("done"), pmt.from_long(1))) - self.waitFor(lambda: d1.num_messages() == 2, timeout=1.0, poll_interval=0.01) - self.waitFor(lambda: d2.num_messages() == 2, timeout=1.0, poll_interval=0.01) + split.to_basic_block()._post(pmt.intern("system"), + pmt.cons(pmt.intern("done"), pmt.from_long(1))) + self.waitFor(lambda: d1.num_messages() == 2, + timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: d2.num_messages() == 2, + timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() self.assertTrue(pmt.equal(d1.get_message(0), in_meta2)) self.assertTrue(pmt.equal(d1.get_message(1), in_meta1)) - self.assertTrue(pmt.equal(d2.get_message(0), pmt.init_u8vector(2, range(2)))) - self.assertTrue(pmt.equal(d2.get_message(1), pmt.init_u8vector(6, range(6)))) + self.assertTrue(pmt.equal(d2.get_message( + 0), pmt.init_u8vector(2, range(2)))) + self.assertTrue(pmt.equal(d2.get_message( + 1), pmt.init_u8vector(6, range(6)))) - - def test_002_pass_empty (self): + def test_002_pass_empty(self): split = pdu.pdu_split(True) d1 = blocks.message_debug() d2 = blocks.message_debug() @@ -59,26 +67,35 @@ class qa_pdu_split (gr_unittest.TestCase): self.tb.msg_connect((split, 'dict'), (d1, 'store')) self.tb.msg_connect((split, 'vec'), (d2, 'store')) - in_meta1 = pmt.dict_add(pmt.make_dict(), pmt.intern('num'), pmt.from_long(4)) - in_meta2 = pmt.dict_add(pmt.make_dict(), pmt.intern('n'), pmt.from_long(99)) + in_meta1 = pmt.dict_add( + pmt.make_dict(), pmt.intern('num'), pmt.from_long(4)) + in_meta2 = pmt.dict_add( + pmt.make_dict(), pmt.intern('n'), pmt.from_long(99)) in_pdu = pmt.cons(in_meta1, pmt.init_u8vector(6, range(6))) self.tb.start() split.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("MALFORMED PDU")) - split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(2, range(2)))) - split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, []))) + split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons( + pmt.PMT_NIL, pmt.init_u8vector(2, range(2)))) + split.to_basic_block()._post(pmt.intern( + "pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, []))) split.to_basic_block()._post(pmt.intern("pdu"), in_pdu) - split.to_basic_block()._post(pmt.intern("system"), pmt.cons(pmt.intern("done"), pmt.from_long(1))) - self.waitFor(lambda: d1.num_messages() == 3, timeout=1.0, poll_interval=0.01) - self.waitFor(lambda: d2.num_messages() == 3, timeout=1.0, poll_interval=0.01) + split.to_basic_block()._post(pmt.intern("system"), + pmt.cons(pmt.intern("done"), pmt.from_long(1))) + self.waitFor(lambda: d1.num_messages() == 3, + timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: d2.num_messages() == 3, + timeout=1.0, poll_interval=0.01) self.tb.wait() self.assertTrue(pmt.equal(d1.get_message(0), pmt.PMT_NIL)) self.assertTrue(pmt.equal(d1.get_message(1), in_meta2)) self.assertTrue(pmt.equal(d1.get_message(2), in_meta1)) - self.assertTrue(pmt.equal(d2.get_message(0), pmt.init_u8vector(2, range(2)))) + self.assertTrue(pmt.equal(d2.get_message( + 0), pmt.init_u8vector(2, range(2)))) self.assertTrue(pmt.equal(d2.get_message(1), pmt.init_u8vector(0, []))) - self.assertTrue(pmt.equal(d2.get_message(2), pmt.init_u8vector(6, range(6)))) + self.assertTrue(pmt.equal(d2.get_message( + 2), pmt.init_u8vector(6, range(6)))) if __name__ == '__main__': diff --git a/gr-pdu/python/pdu/qa_pdu_to_stream.py b/gr-pdu/python/pdu/qa_pdu_to_stream.py index c2f69c2186..9890c1021b 100755 --- a/gr-pdu/python/pdu/qa_pdu_to_stream.py +++ b/gr-pdu/python/pdu/qa_pdu_to_stream.py @@ -13,31 +13,37 @@ import numpy as np import pmt import time + class qa_pdu_to_bursts (gr_unittest.TestCase): - def setUp (self): + def setUp(self): self.tb = gr.top_block() self.p2s = pdu.pdu_to_stream_c(pdu.EARLY_BURST_APPEND, 64) self.vs = blocks.vector_sink_c(1) - self.tag_debug = blocks.tag_debug(gr.sizeof_gr_complex*1, '', "") + self.tag_debug = blocks.tag_debug(gr.sizeof_gr_complex * 1, '', "") self.tag_debug.set_display(True) self.tb.connect((self.p2s, 0), (self.vs, 0)) self.tb.connect((self.p2s, 0), (self.tag_debug, 0)) - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_basic (self): - in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - in_pdu = pmt.cons(pmt.make_dict(), pmt.init_c32vector(len(in_data), in_data)) - e_tag_0 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL)) - e_tag_1 = gr.tag_utils.python_to_tag((len(in_data)-1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL)) + def test_001_basic(self): + in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + in_pdu = pmt.cons( + pmt.make_dict(), pmt.init_c32vector(len(in_data), in_data)) + e_tag_0 = gr.tag_utils.python_to_tag( + (0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL)) + e_tag_1 = gr.tag_utils.python_to_tag( + (len(in_data) - 1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL)) self.tb.start() self.p2s.to_basic_block()._post(pmt.intern("pdus"), pmt.intern("MALFORMED PDU")) self.p2s.to_basic_block()._post(pmt.intern("pdus"), in_pdu) - self.waitFor(lambda: len(self.vs.tags()) == 2, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: len(self.vs.tags()) == 2, + timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() @@ -51,19 +57,26 @@ class qa_pdu_to_bursts (gr_unittest.TestCase): self.assertTrue(pmt.equal(tags[1].value, e_tag_1.value)) self.assertTrue((in_data == np.real(self.vs.data())).all()) - def test_002_timed (self): - in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - tag_time = pmt.make_tuple(pmt.from_uint64(11), pmt.from_double(0.123456)) - in_dict = pmt.dict_add(pmt.make_dict(), pmt.intern("tx_time"), tag_time) + def test_002_timed(self): + in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + tag_time = pmt.make_tuple( + pmt.from_uint64(11), pmt.from_double(0.123456)) + in_dict = pmt.dict_add( + pmt.make_dict(), pmt.intern("tx_time"), tag_time) in_pdu = pmt.cons(in_dict, pmt.init_c32vector(len(in_data), in_data)) - e_tag_0 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL)) - e_tag_1 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL)) - e_tag_2 = gr.tag_utils.python_to_tag((len(in_data)-1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL)) + e_tag_0 = gr.tag_utils.python_to_tag( + (0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL)) + e_tag_1 = gr.tag_utils.python_to_tag( + (0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL)) + e_tag_2 = gr.tag_utils.python_to_tag( + (len(in_data) - 1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL)) self.tb.start() self.p2s.to_basic_block()._post(pmt.intern("pdus"), pmt.intern("MALFORMED PDU")) self.p2s.to_basic_block()._post(pmt.intern("pdus"), in_pdu) - self.waitFor(lambda: len(self.vs.tags()) == 3, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: len(self.vs.tags()) == 3, + timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() @@ -80,19 +93,25 @@ class qa_pdu_to_bursts (gr_unittest.TestCase): self.assertTrue(pmt.equal(tags[2].value, e_tag_2.value)) self.assertTrue((in_data == np.real(self.vs.data())).all()) - def test_003_timed_long (self): + def test_003_timed_long(self): in_data = np.arange(25000).tolist() - tag_time = pmt.make_tuple(pmt.from_uint64(11), pmt.from_double(0.123456)) - in_dict = pmt.dict_add(pmt.make_dict(), pmt.intern("tx_time"), tag_time) + tag_time = pmt.make_tuple( + pmt.from_uint64(11), pmt.from_double(0.123456)) + in_dict = pmt.dict_add( + pmt.make_dict(), pmt.intern("tx_time"), tag_time) in_pdu = pmt.cons(in_dict, pmt.init_c32vector(len(in_data), in_data)) - e_tag_0 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL)) - e_tag_1 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL)) - e_tag_2 = gr.tag_utils.python_to_tag((len(in_data)-1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL)) + e_tag_0 = gr.tag_utils.python_to_tag( + (0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL)) + e_tag_1 = gr.tag_utils.python_to_tag( + (0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL)) + e_tag_2 = gr.tag_utils.python_to_tag( + (len(in_data) - 1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL)) self.tb.start() self.p2s.to_basic_block()._post(pmt.intern("pdus"), pmt.intern("MALFORMED PDU")) self.p2s.to_basic_block()._post(pmt.intern("pdus"), in_pdu) - self.waitFor(lambda: len(self.vs.tags()) == 3, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: len(self.vs.tags()) == 3, + timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() diff --git a/gr-pdu/python/pdu/qa_tags_to_pdu.py b/gr-pdu/python/pdu/qa_tags_to_pdu.py index de12af4f20..9e3be51199 100755 --- a/gr-pdu/python/pdu/qa_tags_to_pdu.py +++ b/gr-pdu/python/pdu/qa_tags_to_pdu.py @@ -13,211 +13,261 @@ import numpy as np import pmt import time + class qa_tags_to_pdu (gr_unittest.TestCase): - def setUp (self): + def setUp(self): pass - def tearDown (self): + def tearDown(self): pass - def test_001_simple (self): - self.tb = gr.top_block () + def test_001_simple(self): + self.tb = gr.top_block() start_time = 0.1 - sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag = gr.tag_utils.python_to_tag((34+(8*31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag = gr.tag_utils.python_to_tag( + (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag = gr.tag_utils.python_to_tag( + (34 + (8 * 31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) vs = blocks.vector_source_s(range(350), False, 1, [sob_tag, eob_tag]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 512000, ([]), False, 0, start_time) t2p.set_eob_parameters(8, 0) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec = pmt.init_s16vector((8*31), range(34,34+(8*31))) + expected_vec = pmt.init_s16vector((8 * 31), range(34, 34 + (8 * 31))) expected_time = start_time + (34 / 512000.0) - self.tb.run () + self.tb.run() self.assertEqual(dbg.num_messages(), 1) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time) self.tb = None - - def test_002_secondSOB (self): - self.tb = gr.top_block () + def test_002_secondSOB(self): + self.tb = gr.top_block() start_time = 4.999999999 - sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - sob_tag2 = gr.tag_utils.python_to_tag((51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag = gr.tag_utils.python_to_tag((51+(8*26), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) - vs = blocks.vector_source_s(range(350), False, 1, [sob_tag, sob_tag2, eob_tag]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 460800, ([]), False, 0, start_time) + sob_tag = gr.tag_utils.python_to_tag( + (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag2 = gr.tag_utils.python_to_tag( + (51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag = gr.tag_utils.python_to_tag( + (51 + (8 * 26), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + vs = blocks.vector_source_s(range(350), False, 1, [ + sob_tag, sob_tag2, eob_tag]) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 460800, ([]), False, 0, start_time) t2p.set_eob_parameters(8, 0) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec = pmt.init_s16vector((8*26), range(51,51+(8*26))) + expected_vec = pmt.init_s16vector((8 * 26), range(51, 51 + (8 * 26))) expected_time = start_time + (51 / 460800.0) - self.tb.run () + self.tb.run() self.assertEqual(dbg.num_messages(), 1) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time) self.tb = None - - def test_003_double_eob_rej_tt_update (self): - self.tb = gr.top_block () + def test_003_double_eob_rej_tt_update(self): + self.tb = gr.top_block() start_time = 0.0 - sob_tag = gr.tag_utils.python_to_tag((51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag = gr.tag_utils.python_to_tag((51+(8*11), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) - time_tuple = pmt.make_tuple(pmt.from_uint64(4), pmt.from_double(0.125), pmt.from_uint64(10000000), pmt.from_double(4000000.0)) - time_tag = gr.tag_utils.python_to_tag((360, pmt.intern("rx_time"), time_tuple, pmt.intern("src"))) - sob_tag2 = gr.tag_utils.python_to_tag((400, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag2e = gr.tag_utils.python_to_tag((409, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag2 = gr.tag_utils.python_to_tag((416, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) - vs = blocks.vector_source_s(range(500), False, 1, [sob_tag, eob_tag, time_tag, sob_tag2, eob_tag2e, eob_tag2]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 1000000, ([]), False, 0, start_time) + sob_tag = gr.tag_utils.python_to_tag( + (51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag = gr.tag_utils.python_to_tag( + (51 + (8 * 11), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + time_tuple = pmt.make_tuple(pmt.from_uint64(4), pmt.from_double( + 0.125), pmt.from_uint64(10000000), pmt.from_double(4000000.0)) + time_tag = gr.tag_utils.python_to_tag( + (360, pmt.intern("rx_time"), time_tuple, pmt.intern("src"))) + sob_tag2 = gr.tag_utils.python_to_tag( + (400, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag2e = gr.tag_utils.python_to_tag( + (409, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag2 = gr.tag_utils.python_to_tag( + (416, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + vs = blocks.vector_source_s(range(500), False, 1, [ + sob_tag, eob_tag, time_tag, sob_tag2, eob_tag2e, eob_tag2]) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 1000000, ([]), False, 0, start_time) t2p.set_eob_parameters(8, 0) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec1 = pmt.init_s16vector((8*11), range(51,51+(8*11))) - expected_vec2 = pmt.init_s16vector(16, list(range(400,409)) + [0]*7) + expected_vec1 = pmt.init_s16vector((8 * 11), range(51, 51 + (8 * 11))) + expected_vec2 = pmt.init_s16vector(16, list(range(400, 409)) + [0] * 7) expected_time1 = start_time + (51 / 1000000.0) - expected_time2 = 4.125 + ((400-360) / 1000000.0) + expected_time2 = 4.125 + ((400 - 360) / 1000000.0) - self.tb.run () + self.tb.run() self.assertEqual(dbg.num_messages(), 2) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec1)) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(1)), expected_vec2)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - time_tuple2 = pmt.dict_ref(pmt.car(dbg.get_message(1)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time1) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple2,0)) + pmt.to_double(pmt.tuple_ref(time_tuple2,1)), expected_time2) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + time_tuple2 = pmt.dict_ref( + pmt.car(dbg.get_message(1)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time1) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple2, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple2, 1)), expected_time2) self.tb = None - def test_004_boost_time (self): - self.tb = gr.top_block () + def test_004_boost_time(self): + self.tb = gr.top_block() start_time = 0.1 - sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag = gr.tag_utils.python_to_tag((34+(8*31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag = gr.tag_utils.python_to_tag( + (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag = gr.tag_utils.python_to_tag( + (34 + (8 * 31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) vs = blocks.vector_source_s(range(350), False, 1, [sob_tag, eob_tag]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 512000, ([]), False, 0, start_time) t2p.enable_time_debug(True) t2p.set_eob_parameters(8, 0) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec = pmt.init_s16vector((8*31), range(34,34+(8*31))) + expected_vec = pmt.init_s16vector((8 * 31), range(34, 34 + (8 * 31))) expected_time = start_time + (34 / 512000.0) ts = time.time() - self.tb.run () + self.tb.run() self.assertEqual(dbg.num_messages(), 1) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time) #wct = pmt.to_double(pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("wall_clock_time"), pmt.PMT_NIL)) #self.assertTrue((wct - ts) < 1.0) self.tb = None - def test_005_two_sobs_misaligned (self): + def test_005_two_sobs_misaligned(self): # Two SOB tags and the SOB-to-EOB length is not aligned - self.tb = gr.top_block () + self.tb = gr.top_block() start_time = 0.1 - sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - sob_tag2 = gr.tag_utils.python_to_tag((35, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag = gr.tag_utils.python_to_tag((34+(8*31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) - vs = blocks.vector_source_s(range(1350), False, 1, [sob_tag, sob_tag2, eob_tag]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time) + sob_tag = gr.tag_utils.python_to_tag( + (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag2 = gr.tag_utils.python_to_tag( + (35, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag = gr.tag_utils.python_to_tag( + (34 + (8 * 31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + vs = blocks.vector_source_s(range(1350), False, 1, [ + sob_tag, sob_tag2, eob_tag]) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 512000, ([]), False, 0, start_time) t2p.set_eob_parameters(8, 0) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec = pmt.init_s16vector((8*31), list(range(35,34+(8*31))) + [0]) + expected_vec = pmt.init_s16vector( + (8 * 31), list(range(35, 34 + (8 * 31))) + [0]) expected_time = start_time + (35 / 512000.0) - self.tb.run () + self.tb.run() self.assertEqual(dbg.num_messages(), 1) - #print "got ", dbg.get_message(0) - #print "expected", expected_vec - #print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0))))) + # print "got ", dbg.get_message(0) + # print "expected", expected_vec + # print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0))))) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time) self.tb = None - def test_006_max_pdu_size (self): + def test_006_max_pdu_size(self): # two SOB tags exactly max_pdu_size samples apart, with an SOB-to-EOB length that is not divisible by the alignment size - self.tb = gr.top_block () + self.tb = gr.top_block() start_time = 0.1 max_size = 100 - sob_tag = gr.tag_utils.python_to_tag((10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - eob_tag = gr.tag_utils.python_to_tag((91, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) - sob_tag3 = gr.tag_utils.python_to_tag((11+max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - vs = blocks.vector_source_s(range(1350), False, 1, [sob_tag, eob_tag, sob_tag3]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time) + sob_tag = gr.tag_utils.python_to_tag( + (10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + eob_tag = gr.tag_utils.python_to_tag( + (91, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag3 = gr.tag_utils.python_to_tag( + (11 + max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + vs = blocks.vector_source_s(range(1350), False, 1, [ + sob_tag, eob_tag, sob_tag3]) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 512000, ([]), False, 0, start_time) t2p.set_eob_parameters(10, 0) t2p.set_max_pdu_size(max_size) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec = pmt.init_s16vector((9*10), list(range(10,91)) + [0]*9) + expected_vec = pmt.init_s16vector( + (9 * 10), list(range(10, 91)) + [0] * 9) expected_time = start_time + (10 / 512000.0) - self.tb.run () + self.tb.run() # assertions for the first PDU only, second PDU will exist self.assertEqual(dbg.num_messages(), 2) - #print "got ", dbg.get_message(0) - #print "expected", expected_vec - #print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0))))) + # print "got ", dbg.get_message(0) + # print "expected", expected_vec + # print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0))))) self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time) self.tb = None - def test_007_max_pdu_size_SOBs (self): + def test_007_max_pdu_size_SOBs(self): # two SOB tags exactly max_pdu_size samples apart - self.tb = gr.top_block () + self.tb = gr.top_block() start_time = 0.1 max_size = 100 - sob_tag = gr.tag_utils.python_to_tag((10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) - sob_tag3 = gr.tag_utils.python_to_tag((10+max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag = gr.tag_utils.python_to_tag( + (10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) + sob_tag3 = gr.tag_utils.python_to_tag( + (10 + max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src"))) vs = blocks.vector_source_s(range(1350), False, 1, [sob_tag, sob_tag3]) - t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time) + t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern( + 'EOB'), 1024, 512000, ([]), False, 0, start_time) t2p.set_eob_parameters(10, 0) t2p.set_max_pdu_size(max_size) dbg = blocks.message_debug() self.tb.connect(vs, t2p) self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store')) - expected_vec = pmt.init_s16vector((max_size), range(10,10+max_size)) + expected_vec = pmt.init_s16vector((max_size), range(10, 10 + max_size)) expected_time = start_time + (10 / 512000.0) - self.tb.run () + self.tb.run() # assertions for the first PDU only, second PDU will exist self.assertEqual(dbg.num_messages(), 2) - #print "got ", dbg.get_message(0) - #print "expected", expected_vec + # print "got ", dbg.get_message(0) + # print "expected", expected_vec self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec)) - time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) - self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time) + time_tuple1 = pmt.dict_ref( + pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL) + self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref( + time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time) self.tb = None diff --git a/gr-pdu/python/pdu/qa_take_skip_to_pdu.py b/gr-pdu/python/pdu/qa_take_skip_to_pdu.py index 76414c857a..bcc33e6f4f 100755 --- a/gr-pdu/python/pdu/qa_take_skip_to_pdu.py +++ b/gr-pdu/python/pdu/qa_take_skip_to_pdu.py @@ -14,6 +14,7 @@ import pmt import time import numpy + class qa_take_skip_to_pdu_X (gr_unittest.TestCase): # this method is necessary because by default pmt.equal does not evaluate @@ -34,67 +35,70 @@ class qa_take_skip_to_pdu_X (gr_unittest.TestCase): print("vectors not equal? " + repr(vec1) + repr(vec2)) self.assertTrue(False) - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_f_32 (self): - self.source = blocks.vector_source_f(range(0,32*3), False, 1, []) + def test_001_f_32(self): + self.source = blocks.vector_source_f(range(0, 32 * 3), False, 1, []) self.ts_pdu = pdu.take_skip_to_pdu_f(32, 32) self.debug = blocks.message_debug() self.tb.connect((self.source, 0), (self.ts_pdu, 0)) self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store')) - dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(0)) - vec = pmt.init_f32vector(32, range(0,32)) - expected = pmt.cons(dic,vec) - self.tb.run () + dic = pmt.dict_add(pmt.make_dict(), pmt.intern( + "pdu_num"), pmt.from_uint64(0)) + vec = pmt.init_f32vector(32, range(0, 32)) + expected = pmt.cons(dic, vec) + self.tb.run() actual = self.debug.get_message(0) self.assertEqualPDU(actual, expected) - def test_002_c_80 (self): - self.source = blocks.vector_source_c(range(0,32*3), False, 1, []) + def test_002_c_80(self): + self.source = blocks.vector_source_c(range(0, 32 * 3), False, 1, []) self.ts_pdu = pdu.take_skip_to_pdu_c(80, 32) self.debug = blocks.message_debug() self.tb.connect((self.source, 0), (self.ts_pdu, 0)) self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store')) - dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(0)) - vec = pmt.init_c32vector(80, range(0,80)) - expected = pmt.cons(dic,vec) - self.tb.run () + dic = pmt.dict_add(pmt.make_dict(), pmt.intern( + "pdu_num"), pmt.from_uint64(0)) + vec = pmt.init_c32vector(80, range(0, 80)) + expected = pmt.cons(dic, vec) + self.tb.run() actual = self.debug.get_message(0) self.assertEqualPDU(actual, expected) - - def test_003_s_2_11_7 (self): - self.source = blocks.vector_source_s(range(0,32*3), False, 1, []) + def test_003_s_2_11_7(self): + self.source = blocks.vector_source_s(range(0, 32 * 3), False, 1, []) self.ts_pdu = pdu.take_skip_to_pdu_s(2, 11) self.debug = blocks.message_debug() self.tb.connect((self.source, 0), (self.ts_pdu, 0)) self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store')) - dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(7)) - vec = pmt.init_s16vector(2, list(range(91,93))) - expected = pmt.cons(dic,vec) - self.tb.run () + dic = pmt.dict_add(pmt.make_dict(), pmt.intern( + "pdu_num"), pmt.from_uint64(7)) + vec = pmt.init_s16vector(2, list(range(91, 93))) + expected = pmt.cons(dic, vec) + self.tb.run() actual = self.debug.get_message(7) self.assertEqualPDU(actual, expected) - - def test_004_b_512 (self): - self.source = blocks.vector_source_b(list(range(0,256))*4, False, 1, []) - self.ts_pdu = pdu.take_skip_to_pdu_b(512,1) + def test_004_b_512(self): + self.source = blocks.vector_source_b( + list(range(0, 256)) * 4, False, 1, []) + self.ts_pdu = pdu.take_skip_to_pdu_b(512, 1) self.debug = blocks.message_debug() self.tb.connect((self.source, 0), (self.ts_pdu, 0)) self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store')) - dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(0)) - vec = pmt.init_u8vector(512, list(range(0,256))*2) - expected = pmt.cons(dic,vec) - self.tb.run () + dic = pmt.dict_add(pmt.make_dict(), pmt.intern( + "pdu_num"), pmt.from_uint64(0)) + vec = pmt.init_u8vector(512, list(range(0, 256)) * 2) + expected = pmt.cons(dic, vec) + self.tb.run() actual = self.debug.get_message(0) self.assertEqualPDU(actual, expected) diff --git a/gr-pdu/python/pdu/qa_time_delta.py b/gr-pdu/python/pdu/qa_time_delta.py index 0cb0f99020..2003419ba7 100755 --- a/gr-pdu/python/pdu/qa_time_delta.py +++ b/gr-pdu/python/pdu/qa_time_delta.py @@ -20,7 +20,8 @@ class qa_time_delta(gr_unittest.TestCase): def setUp(self): self.tb = gr.top_block() - self.time_delta = pdu.time_delta(pmt.intern("sys time delta (ms)"), pmt.intern("system_time")) + self.time_delta = pdu.time_delta(pmt.intern( + "sys time delta (ms)"), pmt.intern("system_time")) self.debug = blocks.message_debug() self.tb.msg_connect((self.time_delta, 'pdu'), (self.debug, 'store')) @@ -30,8 +31,9 @@ class qa_time_delta(gr_unittest.TestCase): def test_001_invalid_a(self): self.tb.start() - self.time_delta.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("NOT A PDU")) - time.sleep(0.01) # short delay to ensure the message is processed + self.time_delta.to_basic_block()._post( + pmt.intern("pdu"), pmt.intern("NOT A PDU")) + time.sleep(0.01) # short delay to ensure the message is processed self.tb.stop() self.tb.wait() @@ -39,14 +41,16 @@ class qa_time_delta(gr_unittest.TestCase): self.assertEqual(0, self.debug.num_messages()) def test_001_invalid_b(self): - in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - meta = pmt.dict_add(pmt.make_dict(), pmt.intern('sam'), pmt.from_double(25.1)) + in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + meta = pmt.dict_add(pmt.make_dict(), pmt.intern( + 'sam'), pmt.from_double(25.1)) in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data)) # set up fg self.tb.start() self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu) - time.sleep(0.01) # short delay to ensure the message is processed + time.sleep(0.01) # short delay to ensure the message is processed self.tb.stop() self.tb.wait() @@ -56,19 +60,25 @@ class qa_time_delta(gr_unittest.TestCase): def test_002_normal(self): tnow = time.time() - in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow - 10.0)) + in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + meta = pmt.dict_add(pmt.make_dict(), pmt.intern( + 'system_time'), pmt.from_double(tnow - 10.0)) in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data)) - e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] - e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow)) - e_meta = pmt.dict_add(e_meta, pmt.intern('sys time delta (ms)'), pmt.from_double(10000.0)) + e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1] + e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern( + 'system_time'), pmt.from_double(tnow)) + e_meta = pmt.dict_add(e_meta, pmt.intern( + 'sys time delta (ms)'), pmt.from_double(10000.0)) e_pdu = pmt.cons(e_meta, pmt.init_c32vector(len(e_data), e_data)) # set up fg self.tb.start() self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu) - self.waitFor(lambda: self.debug.num_messages() == 1, timeout=1.0, poll_interval=0.01) + self.waitFor(lambda: self.debug.num_messages() == + 1, timeout=1.0, poll_interval=0.01) self.tb.stop() self.tb.wait() @@ -76,7 +86,8 @@ class qa_time_delta(gr_unittest.TestCase): self.assertEqual(1, self.debug.num_messages()) a_meta = pmt.car(self.debug.get_message(0)) time_tag = pmt.dict_ref(a_meta, pmt.intern("system_time"), pmt.PMT_NIL) - delta_tag = pmt.dict_ref(a_meta, pmt.intern("sys time delta (ms)"), pmt.PMT_NIL) + delta_tag = pmt.dict_ref(a_meta, pmt.intern( + "sys time delta (ms)"), pmt.PMT_NIL) self.assertAlmostEqual(tnow, pmt.to_double(time_tag), delta=60) self.assertAlmostEqual(10000, pmt.to_double(delta_tag), delta=10) |