summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gr-pdu/python/pdu/pdu_lambda.py24
-rwxr-xr-xgr-pdu/python/pdu/qa_add_system_time.py21
-rw-r--r--gr-pdu/python/pdu/qa_pdu.py1
-rwxr-xr-xgr-pdu/python/pdu/qa_pdu_lambda.py3
-rw-r--r--gr-pdu/python/pdu/qa_pdu_split.py65
-rwxr-xr-xgr-pdu/python/pdu/qa_pdu_to_stream.py67
-rwxr-xr-xgr-pdu/python/pdu/qa_tags_to_pdu.py232
-rwxr-xr-xgr-pdu/python/pdu/qa_take_skip_to_pdu.py64
-rwxr-xr-xgr-pdu/python/pdu/qa_time_delta.py37
9 files changed, 313 insertions, 201 deletions
diff --git a/gr-pdu/python/pdu/pdu_lambda.py b/gr-pdu/python/pdu/pdu_lambda.py
index e6a286ed2a..d80e679562 100644
--- a/gr-pdu/python/pdu/pdu_lambda.py
+++ b/gr-pdu/python/pdu/pdu_lambda.py
@@ -10,7 +10,9 @@
from gnuradio import gr
import numpy as np
-import math, pmt, time
+import math
+import pmt
+import time
class pdu_lambda(gr.basic_block):
@@ -40,8 +42,8 @@ class pdu_lambda(gr.basic_block):
def __init__(self, fn, metadict, key=pmt.PMT_NIL):
gr.basic_block.__init__(self,
- name="pdu_lambda",
- in_sig=[],out_sig=[])
+ name="pdu_lambda",
+ in_sig=[], out_sig=[])
self.set_fn(fn)
self.set_key(key)
self.metadict_mode = metadict
@@ -61,7 +63,7 @@ class pdu_lambda(gr.basic_block):
print(e)
pass
self.message_port_pub(pmt.intern("pdu"),
- pmt.cons(meta, pmt.cdr(pdu)));
+ pmt.cons(meta, pmt.cdr(pdu)))
elif self.metadict_mode == "UVEC":
vec = pmt.cdr(pdu)
@@ -72,8 +74,8 @@ class pdu_lambda(gr.basic_block):
pass
self.message_port_pub(pmt.intern("pdu"),
- pmt.cons(pmt.car(pdu), vec));
-
+ pmt.cons(pmt.car(pdu), vec))
+
elif self.metadict_mode == "RAW":
# TODO: This is more of a "message lambda" block, in the future it should be
# a separate block outside the PDU namespace
@@ -83,14 +85,14 @@ class pdu_lambda(gr.basic_block):
print(e)
pass
self.message_port_pub(pmt.intern("pdu"), pdu)
-
+
else:
- raise ValueError("pdu_lambda block instantiated in unknown mode " + repr(self.metadict_mode))
+ raise ValueError(
+ "pdu_lambda block instantiated in unknown mode " + repr(self.metadict_mode))
pass
-
- def set_fn(self,fn):
+ def set_fn(self, fn):
self.fn = fn
- def set_key(self,key):
+ def set_key(self, key):
self.key = key
diff --git a/gr-pdu/python/pdu/qa_add_system_time.py b/gr-pdu/python/pdu/qa_add_system_time.py
index 44d0ad4bef..0688122a6b 100755
--- a/gr-pdu/python/pdu/qa_add_system_time.py
+++ b/gr-pdu/python/pdu/qa_add_system_time.py
@@ -32,29 +32,35 @@ class qa_add_system_time(gr_unittest.TestCase):
def test_001_basic_io(self):
self.tb.start()
# provide two non PDU inputs and one PDU input
- self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("BAD PDU"))
+ self.add_sys_time.to_basic_block()._post(
+ pmt.intern("pdu"), pmt.intern("BAD PDU"))
self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"),
pmt.cons(pmt.from_long(4), pmt.PMT_NIL))
self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"),
pmt.cons(pmt.make_dict(), pmt.init_f32vector(1, [0.0])))
- self.waitFor(lambda: self.debug.num_messages() >= 1, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: self.debug.num_messages() >=
+ 1, timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
# make sure we got one message and it has a systime key
self.assertEqual(1, self.debug.num_messages())
- self.assertTrue(pmt.dict_has_key(pmt.car(self.debug.get_message(0)), pmt.intern('systime')))
+ self.assertTrue(pmt.dict_has_key(
+ pmt.car(self.debug.get_message(0)), pmt.intern('systime')))
def test_002_timing(self):
self.tb.start()
- self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("BAD PDU"))
+ self.add_sys_time.to_basic_block()._post(
+ pmt.intern("pdu"), pmt.intern("BAD PDU"))
self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"),
pmt.cons(pmt.make_dict(), pmt.init_u8vector(1, [0])))
- time.sleep(1.0) # wait for one second to provide a time difference between messages
+ # wait for one second to provide a time difference between messages
+ time.sleep(1.0)
self.add_sys_time.to_basic_block()._post(pmt.intern("pdu"),
pmt.cons(pmt.make_dict(), pmt.init_u8vector(1, [0])))
- self.waitFor(lambda: self.debug.num_messages() == 2, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: self.debug.num_messages() ==
+ 2, timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
@@ -64,7 +70,8 @@ class qa_add_system_time(gr_unittest.TestCase):
t1 = pmt.to_double(pmt.dict_ref(pmt.car(self.debug.get_message(1)),
pmt.intern("systime"),
pmt.from_double(0.0)))
- self.assertTrue(((t1 - t0) - 1) < 0.05) # should be sufficient tolerance
+ # should be sufficient tolerance
+ self.assertTrue(((t1 - t0) - 1) < 0.05)
if __name__ == '__main__':
diff --git a/gr-pdu/python/pdu/qa_pdu.py b/gr-pdu/python/pdu/qa_pdu.py
index 388a93b931..e194b5061a 100644
--- a/gr-pdu/python/pdu/qa_pdu.py
+++ b/gr-pdu/python/pdu/qa_pdu.py
@@ -144,5 +144,6 @@ class test_pdu(gr_unittest.TestCase):
data = pmt.u8vector_elements(pmt.cdr(dbg.get_message(0)))
self.assertEqual([1, 2, 3], data)
+
if __name__ == '__main__':
gr_unittest.run(test_pdu)
diff --git a/gr-pdu/python/pdu/qa_pdu_lambda.py b/gr-pdu/python/pdu/qa_pdu_lambda.py
index f93475b8d9..6f32695d8a 100755
--- a/gr-pdu/python/pdu/qa_pdu_lambda.py
+++ b/gr-pdu/python/pdu/qa_pdu_lambda.py
@@ -11,6 +11,7 @@
from gnuradio import gr, gr_unittest
from gnuradio import pdu
+
class qa_pdu_lambda(gr_unittest.TestCase):
def setUp(self):
@@ -21,7 +22,7 @@ class qa_pdu_lambda(gr_unittest.TestCase):
def test_smoketest(self):
# FIXME: Test will fail until you pass sensible arguments to the constructor
- instance = pdu.pdu_lambda(lambda uvec: uvec*10, False)
+ instance = pdu.pdu_lambda(lambda uvec: uvec * 10, False)
def test_001_descriptive_test_name(self):
# set up fg
diff --git a/gr-pdu/python/pdu/qa_pdu_split.py b/gr-pdu/python/pdu/qa_pdu_split.py
index 988fb6b164..a957c4d97d 100644
--- a/gr-pdu/python/pdu/qa_pdu_split.py
+++ b/gr-pdu/python/pdu/qa_pdu_split.py
@@ -16,13 +16,13 @@ import time
class qa_pdu_split (gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_001_split (self):
+ def test_001_split(self):
split = pdu.pdu_split()
d1 = blocks.message_debug()
d2 = blocks.message_debug()
@@ -30,28 +30,36 @@ class qa_pdu_split (gr_unittest.TestCase):
self.tb.msg_connect((split, 'dict'), (d1, 'store'))
self.tb.msg_connect((split, 'vec'), (d2, 'store'))
- in_meta1 = pmt.dict_add(pmt.make_dict(), pmt.intern('num'), pmt.from_long(4))
- in_meta2 = pmt.dict_add(pmt.make_dict(), pmt.intern('n'), pmt.from_long(99))
+ in_meta1 = pmt.dict_add(
+ pmt.make_dict(), pmt.intern('num'), pmt.from_long(4))
+ in_meta2 = pmt.dict_add(
+ pmt.make_dict(), pmt.intern('n'), pmt.from_long(99))
in_pdu = pmt.cons(in_meta1, pmt.init_u8vector(6, range(6)))
self.tb.start()
split.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("MALFORMED PDU"))
- split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(2, range(2))))
- split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, [])))
+ split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(
+ pmt.PMT_NIL, pmt.init_u8vector(2, range(2))))
+ split.to_basic_block()._post(pmt.intern(
+ "pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, [])))
split.to_basic_block()._post(pmt.intern("pdu"), in_pdu)
- split.to_basic_block()._post(pmt.intern("system"), pmt.cons(pmt.intern("done"), pmt.from_long(1)))
- self.waitFor(lambda: d1.num_messages() == 2, timeout=1.0, poll_interval=0.01)
- self.waitFor(lambda: d2.num_messages() == 2, timeout=1.0, poll_interval=0.01)
+ split.to_basic_block()._post(pmt.intern("system"),
+ pmt.cons(pmt.intern("done"), pmt.from_long(1)))
+ self.waitFor(lambda: d1.num_messages() == 2,
+ timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: d2.num_messages() == 2,
+ timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
self.assertTrue(pmt.equal(d1.get_message(0), in_meta2))
self.assertTrue(pmt.equal(d1.get_message(1), in_meta1))
- self.assertTrue(pmt.equal(d2.get_message(0), pmt.init_u8vector(2, range(2))))
- self.assertTrue(pmt.equal(d2.get_message(1), pmt.init_u8vector(6, range(6))))
+ self.assertTrue(pmt.equal(d2.get_message(
+ 0), pmt.init_u8vector(2, range(2))))
+ self.assertTrue(pmt.equal(d2.get_message(
+ 1), pmt.init_u8vector(6, range(6))))
-
- def test_002_pass_empty (self):
+ def test_002_pass_empty(self):
split = pdu.pdu_split(True)
d1 = blocks.message_debug()
d2 = blocks.message_debug()
@@ -59,26 +67,35 @@ class qa_pdu_split (gr_unittest.TestCase):
self.tb.msg_connect((split, 'dict'), (d1, 'store'))
self.tb.msg_connect((split, 'vec'), (d2, 'store'))
- in_meta1 = pmt.dict_add(pmt.make_dict(), pmt.intern('num'), pmt.from_long(4))
- in_meta2 = pmt.dict_add(pmt.make_dict(), pmt.intern('n'), pmt.from_long(99))
+ in_meta1 = pmt.dict_add(
+ pmt.make_dict(), pmt.intern('num'), pmt.from_long(4))
+ in_meta2 = pmt.dict_add(
+ pmt.make_dict(), pmt.intern('n'), pmt.from_long(99))
in_pdu = pmt.cons(in_meta1, pmt.init_u8vector(6, range(6)))
self.tb.start()
split.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("MALFORMED PDU"))
- split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(2, range(2))))
- split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, [])))
+ split.to_basic_block()._post(pmt.intern("pdu"), pmt.cons(
+ pmt.PMT_NIL, pmt.init_u8vector(2, range(2))))
+ split.to_basic_block()._post(pmt.intern(
+ "pdu"), pmt.cons(in_meta2, pmt.init_u8vector(0, [])))
split.to_basic_block()._post(pmt.intern("pdu"), in_pdu)
- split.to_basic_block()._post(pmt.intern("system"), pmt.cons(pmt.intern("done"), pmt.from_long(1)))
- self.waitFor(lambda: d1.num_messages() == 3, timeout=1.0, poll_interval=0.01)
- self.waitFor(lambda: d2.num_messages() == 3, timeout=1.0, poll_interval=0.01)
+ split.to_basic_block()._post(pmt.intern("system"),
+ pmt.cons(pmt.intern("done"), pmt.from_long(1)))
+ self.waitFor(lambda: d1.num_messages() == 3,
+ timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: d2.num_messages() == 3,
+ timeout=1.0, poll_interval=0.01)
self.tb.wait()
self.assertTrue(pmt.equal(d1.get_message(0), pmt.PMT_NIL))
self.assertTrue(pmt.equal(d1.get_message(1), in_meta2))
self.assertTrue(pmt.equal(d1.get_message(2), in_meta1))
- self.assertTrue(pmt.equal(d2.get_message(0), pmt.init_u8vector(2, range(2))))
+ self.assertTrue(pmt.equal(d2.get_message(
+ 0), pmt.init_u8vector(2, range(2))))
self.assertTrue(pmt.equal(d2.get_message(1), pmt.init_u8vector(0, [])))
- self.assertTrue(pmt.equal(d2.get_message(2), pmt.init_u8vector(6, range(6))))
+ self.assertTrue(pmt.equal(d2.get_message(
+ 2), pmt.init_u8vector(6, range(6))))
if __name__ == '__main__':
diff --git a/gr-pdu/python/pdu/qa_pdu_to_stream.py b/gr-pdu/python/pdu/qa_pdu_to_stream.py
index c2f69c2186..9890c1021b 100755
--- a/gr-pdu/python/pdu/qa_pdu_to_stream.py
+++ b/gr-pdu/python/pdu/qa_pdu_to_stream.py
@@ -13,31 +13,37 @@ import numpy as np
import pmt
import time
+
class qa_pdu_to_bursts (gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
self.tb = gr.top_block()
self.p2s = pdu.pdu_to_stream_c(pdu.EARLY_BURST_APPEND, 64)
self.vs = blocks.vector_sink_c(1)
- self.tag_debug = blocks.tag_debug(gr.sizeof_gr_complex*1, '', "")
+ self.tag_debug = blocks.tag_debug(gr.sizeof_gr_complex * 1, '', "")
self.tag_debug.set_display(True)
self.tb.connect((self.p2s, 0), (self.vs, 0))
self.tb.connect((self.p2s, 0), (self.tag_debug, 0))
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_001_basic (self):
- in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- in_pdu = pmt.cons(pmt.make_dict(), pmt.init_c32vector(len(in_data), in_data))
- e_tag_0 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL))
- e_tag_1 = gr.tag_utils.python_to_tag((len(in_data)-1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL))
+ def test_001_basic(self):
+ in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ in_pdu = pmt.cons(
+ pmt.make_dict(), pmt.init_c32vector(len(in_data), in_data))
+ e_tag_0 = gr.tag_utils.python_to_tag(
+ (0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL))
+ e_tag_1 = gr.tag_utils.python_to_tag(
+ (len(in_data) - 1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL))
self.tb.start()
self.p2s.to_basic_block()._post(pmt.intern("pdus"), pmt.intern("MALFORMED PDU"))
self.p2s.to_basic_block()._post(pmt.intern("pdus"), in_pdu)
- self.waitFor(lambda: len(self.vs.tags()) == 2, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: len(self.vs.tags()) == 2,
+ timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
@@ -51,19 +57,26 @@ class qa_pdu_to_bursts (gr_unittest.TestCase):
self.assertTrue(pmt.equal(tags[1].value, e_tag_1.value))
self.assertTrue((in_data == np.real(self.vs.data())).all())
- def test_002_timed (self):
- in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- tag_time = pmt.make_tuple(pmt.from_uint64(11), pmt.from_double(0.123456))
- in_dict = pmt.dict_add(pmt.make_dict(), pmt.intern("tx_time"), tag_time)
+ def test_002_timed(self):
+ in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ tag_time = pmt.make_tuple(
+ pmt.from_uint64(11), pmt.from_double(0.123456))
+ in_dict = pmt.dict_add(
+ pmt.make_dict(), pmt.intern("tx_time"), tag_time)
in_pdu = pmt.cons(in_dict, pmt.init_c32vector(len(in_data), in_data))
- e_tag_0 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL))
- e_tag_1 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL))
- e_tag_2 = gr.tag_utils.python_to_tag((len(in_data)-1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL))
+ e_tag_0 = gr.tag_utils.python_to_tag(
+ (0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL))
+ e_tag_1 = gr.tag_utils.python_to_tag(
+ (0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL))
+ e_tag_2 = gr.tag_utils.python_to_tag(
+ (len(in_data) - 1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL))
self.tb.start()
self.p2s.to_basic_block()._post(pmt.intern("pdus"), pmt.intern("MALFORMED PDU"))
self.p2s.to_basic_block()._post(pmt.intern("pdus"), in_pdu)
- self.waitFor(lambda: len(self.vs.tags()) == 3, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: len(self.vs.tags()) == 3,
+ timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
@@ -80,19 +93,25 @@ class qa_pdu_to_bursts (gr_unittest.TestCase):
self.assertTrue(pmt.equal(tags[2].value, e_tag_2.value))
self.assertTrue((in_data == np.real(self.vs.data())).all())
- def test_003_timed_long (self):
+ def test_003_timed_long(self):
in_data = np.arange(25000).tolist()
- tag_time = pmt.make_tuple(pmt.from_uint64(11), pmt.from_double(0.123456))
- in_dict = pmt.dict_add(pmt.make_dict(), pmt.intern("tx_time"), tag_time)
+ tag_time = pmt.make_tuple(
+ pmt.from_uint64(11), pmt.from_double(0.123456))
+ in_dict = pmt.dict_add(
+ pmt.make_dict(), pmt.intern("tx_time"), tag_time)
in_pdu = pmt.cons(in_dict, pmt.init_c32vector(len(in_data), in_data))
- e_tag_0 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL))
- e_tag_1 = gr.tag_utils.python_to_tag((0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL))
- e_tag_2 = gr.tag_utils.python_to_tag((len(in_data)-1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL))
+ e_tag_0 = gr.tag_utils.python_to_tag(
+ (0, pmt.intern("tx_sob"), pmt.PMT_T, pmt.PMT_NIL))
+ e_tag_1 = gr.tag_utils.python_to_tag(
+ (0, pmt.intern("tx_time"), tag_time, pmt.PMT_NIL))
+ e_tag_2 = gr.tag_utils.python_to_tag(
+ (len(in_data) - 1, pmt.intern("tx_eob"), pmt.PMT_T, pmt.PMT_NIL))
self.tb.start()
self.p2s.to_basic_block()._post(pmt.intern("pdus"), pmt.intern("MALFORMED PDU"))
self.p2s.to_basic_block()._post(pmt.intern("pdus"), in_pdu)
- self.waitFor(lambda: len(self.vs.tags()) == 3, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: len(self.vs.tags()) == 3,
+ timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
diff --git a/gr-pdu/python/pdu/qa_tags_to_pdu.py b/gr-pdu/python/pdu/qa_tags_to_pdu.py
index de12af4f20..9e3be51199 100755
--- a/gr-pdu/python/pdu/qa_tags_to_pdu.py
+++ b/gr-pdu/python/pdu/qa_tags_to_pdu.py
@@ -13,211 +13,261 @@ import numpy as np
import pmt
import time
+
class qa_tags_to_pdu (gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
pass
- def tearDown (self):
+ def tearDown(self):
pass
- def test_001_simple (self):
- self.tb = gr.top_block ()
+ def test_001_simple(self):
+ self.tb = gr.top_block()
start_time = 0.1
- sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag = gr.tag_utils.python_to_tag((34+(8*31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag = gr.tag_utils.python_to_tag(
+ (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag = gr.tag_utils.python_to_tag(
+ (34 + (8 * 31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
vs = blocks.vector_source_s(range(350), False, 1, [sob_tag, eob_tag])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time)
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 512000, ([]), False, 0, start_time)
t2p.set_eob_parameters(8, 0)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec = pmt.init_s16vector((8*31), range(34,34+(8*31)))
+ expected_vec = pmt.init_s16vector((8 * 31), range(34, 34 + (8 * 31)))
expected_time = start_time + (34 / 512000.0)
- self.tb.run ()
+ self.tb.run()
self.assertEqual(dbg.num_messages(), 1)
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time)
self.tb = None
-
- def test_002_secondSOB (self):
- self.tb = gr.top_block ()
+ def test_002_secondSOB(self):
+ self.tb = gr.top_block()
start_time = 4.999999999
- sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- sob_tag2 = gr.tag_utils.python_to_tag((51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag = gr.tag_utils.python_to_tag((51+(8*26), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
- vs = blocks.vector_source_s(range(350), False, 1, [sob_tag, sob_tag2, eob_tag])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 460800, ([]), False, 0, start_time)
+ sob_tag = gr.tag_utils.python_to_tag(
+ (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag2 = gr.tag_utils.python_to_tag(
+ (51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag = gr.tag_utils.python_to_tag(
+ (51 + (8 * 26), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ vs = blocks.vector_source_s(range(350), False, 1, [
+ sob_tag, sob_tag2, eob_tag])
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 460800, ([]), False, 0, start_time)
t2p.set_eob_parameters(8, 0)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec = pmt.init_s16vector((8*26), range(51,51+(8*26)))
+ expected_vec = pmt.init_s16vector((8 * 26), range(51, 51 + (8 * 26)))
expected_time = start_time + (51 / 460800.0)
- self.tb.run ()
+ self.tb.run()
self.assertEqual(dbg.num_messages(), 1)
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time)
self.tb = None
-
- def test_003_double_eob_rej_tt_update (self):
- self.tb = gr.top_block ()
+ def test_003_double_eob_rej_tt_update(self):
+ self.tb = gr.top_block()
start_time = 0.0
- sob_tag = gr.tag_utils.python_to_tag((51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag = gr.tag_utils.python_to_tag((51+(8*11), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
- time_tuple = pmt.make_tuple(pmt.from_uint64(4), pmt.from_double(0.125), pmt.from_uint64(10000000), pmt.from_double(4000000.0))
- time_tag = gr.tag_utils.python_to_tag((360, pmt.intern("rx_time"), time_tuple, pmt.intern("src")))
- sob_tag2 = gr.tag_utils.python_to_tag((400, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag2e = gr.tag_utils.python_to_tag((409, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag2 = gr.tag_utils.python_to_tag((416, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
- vs = blocks.vector_source_s(range(500), False, 1, [sob_tag, eob_tag, time_tag, sob_tag2, eob_tag2e, eob_tag2])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 1000000, ([]), False, 0, start_time)
+ sob_tag = gr.tag_utils.python_to_tag(
+ (51, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag = gr.tag_utils.python_to_tag(
+ (51 + (8 * 11), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ time_tuple = pmt.make_tuple(pmt.from_uint64(4), pmt.from_double(
+ 0.125), pmt.from_uint64(10000000), pmt.from_double(4000000.0))
+ time_tag = gr.tag_utils.python_to_tag(
+ (360, pmt.intern("rx_time"), time_tuple, pmt.intern("src")))
+ sob_tag2 = gr.tag_utils.python_to_tag(
+ (400, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag2e = gr.tag_utils.python_to_tag(
+ (409, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag2 = gr.tag_utils.python_to_tag(
+ (416, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ vs = blocks.vector_source_s(range(500), False, 1, [
+ sob_tag, eob_tag, time_tag, sob_tag2, eob_tag2e, eob_tag2])
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 1000000, ([]), False, 0, start_time)
t2p.set_eob_parameters(8, 0)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec1 = pmt.init_s16vector((8*11), range(51,51+(8*11)))
- expected_vec2 = pmt.init_s16vector(16, list(range(400,409)) + [0]*7)
+ expected_vec1 = pmt.init_s16vector((8 * 11), range(51, 51 + (8 * 11)))
+ expected_vec2 = pmt.init_s16vector(16, list(range(400, 409)) + [0] * 7)
expected_time1 = start_time + (51 / 1000000.0)
- expected_time2 = 4.125 + ((400-360) / 1000000.0)
+ expected_time2 = 4.125 + ((400 - 360) / 1000000.0)
- self.tb.run ()
+ self.tb.run()
self.assertEqual(dbg.num_messages(), 2)
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec1))
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(1)), expected_vec2))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- time_tuple2 = pmt.dict_ref(pmt.car(dbg.get_message(1)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time1)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple2,0)) + pmt.to_double(pmt.tuple_ref(time_tuple2,1)), expected_time2)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ time_tuple2 = pmt.dict_ref(
+ pmt.car(dbg.get_message(1)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time1)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple2, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple2, 1)), expected_time2)
self.tb = None
- def test_004_boost_time (self):
- self.tb = gr.top_block ()
+ def test_004_boost_time(self):
+ self.tb = gr.top_block()
start_time = 0.1
- sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag = gr.tag_utils.python_to_tag((34+(8*31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag = gr.tag_utils.python_to_tag(
+ (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag = gr.tag_utils.python_to_tag(
+ (34 + (8 * 31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
vs = blocks.vector_source_s(range(350), False, 1, [sob_tag, eob_tag])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time)
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 512000, ([]), False, 0, start_time)
t2p.enable_time_debug(True)
t2p.set_eob_parameters(8, 0)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec = pmt.init_s16vector((8*31), range(34,34+(8*31)))
+ expected_vec = pmt.init_s16vector((8 * 31), range(34, 34 + (8 * 31)))
expected_time = start_time + (34 / 512000.0)
ts = time.time()
- self.tb.run ()
+ self.tb.run()
self.assertEqual(dbg.num_messages(), 1)
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time)
#wct = pmt.to_double(pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("wall_clock_time"), pmt.PMT_NIL))
#self.assertTrue((wct - ts) < 1.0)
self.tb = None
- def test_005_two_sobs_misaligned (self):
+ def test_005_two_sobs_misaligned(self):
# Two SOB tags and the SOB-to-EOB length is not aligned
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
start_time = 0.1
- sob_tag = gr.tag_utils.python_to_tag((34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- sob_tag2 = gr.tag_utils.python_to_tag((35, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag = gr.tag_utils.python_to_tag((34+(8*31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
- vs = blocks.vector_source_s(range(1350), False, 1, [sob_tag, sob_tag2, eob_tag])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time)
+ sob_tag = gr.tag_utils.python_to_tag(
+ (34, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag2 = gr.tag_utils.python_to_tag(
+ (35, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag = gr.tag_utils.python_to_tag(
+ (34 + (8 * 31), pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ vs = blocks.vector_source_s(range(1350), False, 1, [
+ sob_tag, sob_tag2, eob_tag])
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 512000, ([]), False, 0, start_time)
t2p.set_eob_parameters(8, 0)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec = pmt.init_s16vector((8*31), list(range(35,34+(8*31))) + [0])
+ expected_vec = pmt.init_s16vector(
+ (8 * 31), list(range(35, 34 + (8 * 31))) + [0])
expected_time = start_time + (35 / 512000.0)
- self.tb.run ()
+ self.tb.run()
self.assertEqual(dbg.num_messages(), 1)
- #print "got ", dbg.get_message(0)
- #print "expected", expected_vec
- #print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0)))))
+ # print "got ", dbg.get_message(0)
+ # print "expected", expected_vec
+ # print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0)))))
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time)
self.tb = None
- def test_006_max_pdu_size (self):
+ def test_006_max_pdu_size(self):
# two SOB tags exactly max_pdu_size samples apart, with an SOB-to-EOB length that is not divisible by the alignment size
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
start_time = 0.1
max_size = 100
- sob_tag = gr.tag_utils.python_to_tag((10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- eob_tag = gr.tag_utils.python_to_tag((91, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
- sob_tag3 = gr.tag_utils.python_to_tag((11+max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- vs = blocks.vector_source_s(range(1350), False, 1, [sob_tag, eob_tag, sob_tag3])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time)
+ sob_tag = gr.tag_utils.python_to_tag(
+ (10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ eob_tag = gr.tag_utils.python_to_tag(
+ (91, pmt.intern("EOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag3 = gr.tag_utils.python_to_tag(
+ (11 + max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ vs = blocks.vector_source_s(range(1350), False, 1, [
+ sob_tag, eob_tag, sob_tag3])
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 512000, ([]), False, 0, start_time)
t2p.set_eob_parameters(10, 0)
t2p.set_max_pdu_size(max_size)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec = pmt.init_s16vector((9*10), list(range(10,91)) + [0]*9)
+ expected_vec = pmt.init_s16vector(
+ (9 * 10), list(range(10, 91)) + [0] * 9)
expected_time = start_time + (10 / 512000.0)
- self.tb.run ()
+ self.tb.run()
# assertions for the first PDU only, second PDU will exist
self.assertEqual(dbg.num_messages(), 2)
- #print "got ", dbg.get_message(0)
- #print "expected", expected_vec
- #print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0)))))
+ # print "got ", dbg.get_message(0)
+ # print "expected", expected_vec
+ # print "len is {}".format(len(pmt.to_python(pmt.cdr(dbg.get_message(0)))))
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time)
self.tb = None
- def test_007_max_pdu_size_SOBs (self):
+ def test_007_max_pdu_size_SOBs(self):
# two SOB tags exactly max_pdu_size samples apart
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
start_time = 0.1
max_size = 100
- sob_tag = gr.tag_utils.python_to_tag((10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
- sob_tag3 = gr.tag_utils.python_to_tag((10+max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag = gr.tag_utils.python_to_tag(
+ (10, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
+ sob_tag3 = gr.tag_utils.python_to_tag(
+ (10 + max_size, pmt.intern("SOB"), pmt.PMT_T, pmt.intern("src")))
vs = blocks.vector_source_s(range(1350), False, 1, [sob_tag, sob_tag3])
- t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern('EOB'), 1024, 512000, ([]), False, 0, start_time)
+ t2p = pdu.tags_to_pdu_s(pmt.intern('SOB'), pmt.intern(
+ 'EOB'), 1024, 512000, ([]), False, 0, start_time)
t2p.set_eob_parameters(10, 0)
t2p.set_max_pdu_size(max_size)
dbg = blocks.message_debug()
self.tb.connect(vs, t2p)
self.tb.msg_connect((t2p, 'pdus'), (dbg, 'store'))
- expected_vec = pmt.init_s16vector((max_size), range(10,10+max_size))
+ expected_vec = pmt.init_s16vector((max_size), range(10, 10 + max_size))
expected_time = start_time + (10 / 512000.0)
- self.tb.run ()
+ self.tb.run()
# assertions for the first PDU only, second PDU will exist
self.assertEqual(dbg.num_messages(), 2)
- #print "got ", dbg.get_message(0)
- #print "expected", expected_vec
+ # print "got ", dbg.get_message(0)
+ # print "expected", expected_vec
self.assertTrue(pmt.equal(pmt.cdr(dbg.get_message(0)), expected_vec))
- time_tuple1 = pmt.dict_ref(pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
- self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(time_tuple1,0)) + pmt.to_double(pmt.tuple_ref(time_tuple1,1)), expected_time)
+ time_tuple1 = pmt.dict_ref(
+ pmt.car(dbg.get_message(0)), pmt.intern("rx_time"), pmt.PMT_NIL)
+ self.assertAlmostEqual(pmt.to_uint64(pmt.tuple_ref(
+ time_tuple1, 0)) + pmt.to_double(pmt.tuple_ref(time_tuple1, 1)), expected_time)
self.tb = None
diff --git a/gr-pdu/python/pdu/qa_take_skip_to_pdu.py b/gr-pdu/python/pdu/qa_take_skip_to_pdu.py
index 76414c857a..bcc33e6f4f 100755
--- a/gr-pdu/python/pdu/qa_take_skip_to_pdu.py
+++ b/gr-pdu/python/pdu/qa_take_skip_to_pdu.py
@@ -14,6 +14,7 @@ import pmt
import time
import numpy
+
class qa_take_skip_to_pdu_X (gr_unittest.TestCase):
# this method is necessary because by default pmt.equal does not evaluate
@@ -34,67 +35,70 @@ class qa_take_skip_to_pdu_X (gr_unittest.TestCase):
print("vectors not equal? " + repr(vec1) + repr(vec2))
self.assertTrue(False)
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_001_f_32 (self):
- self.source = blocks.vector_source_f(range(0,32*3), False, 1, [])
+ def test_001_f_32(self):
+ self.source = blocks.vector_source_f(range(0, 32 * 3), False, 1, [])
self.ts_pdu = pdu.take_skip_to_pdu_f(32, 32)
self.debug = blocks.message_debug()
self.tb.connect((self.source, 0), (self.ts_pdu, 0))
self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store'))
- dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(0))
- vec = pmt.init_f32vector(32, range(0,32))
- expected = pmt.cons(dic,vec)
- self.tb.run ()
+ dic = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ "pdu_num"), pmt.from_uint64(0))
+ vec = pmt.init_f32vector(32, range(0, 32))
+ expected = pmt.cons(dic, vec)
+ self.tb.run()
actual = self.debug.get_message(0)
self.assertEqualPDU(actual, expected)
- def test_002_c_80 (self):
- self.source = blocks.vector_source_c(range(0,32*3), False, 1, [])
+ def test_002_c_80(self):
+ self.source = blocks.vector_source_c(range(0, 32 * 3), False, 1, [])
self.ts_pdu = pdu.take_skip_to_pdu_c(80, 32)
self.debug = blocks.message_debug()
self.tb.connect((self.source, 0), (self.ts_pdu, 0))
self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store'))
- dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(0))
- vec = pmt.init_c32vector(80, range(0,80))
- expected = pmt.cons(dic,vec)
- self.tb.run ()
+ dic = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ "pdu_num"), pmt.from_uint64(0))
+ vec = pmt.init_c32vector(80, range(0, 80))
+ expected = pmt.cons(dic, vec)
+ self.tb.run()
actual = self.debug.get_message(0)
self.assertEqualPDU(actual, expected)
-
- def test_003_s_2_11_7 (self):
- self.source = blocks.vector_source_s(range(0,32*3), False, 1, [])
+ def test_003_s_2_11_7(self):
+ self.source = blocks.vector_source_s(range(0, 32 * 3), False, 1, [])
self.ts_pdu = pdu.take_skip_to_pdu_s(2, 11)
self.debug = blocks.message_debug()
self.tb.connect((self.source, 0), (self.ts_pdu, 0))
self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store'))
- dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(7))
- vec = pmt.init_s16vector(2, list(range(91,93)))
- expected = pmt.cons(dic,vec)
- self.tb.run ()
+ dic = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ "pdu_num"), pmt.from_uint64(7))
+ vec = pmt.init_s16vector(2, list(range(91, 93)))
+ expected = pmt.cons(dic, vec)
+ self.tb.run()
actual = self.debug.get_message(7)
self.assertEqualPDU(actual, expected)
-
- def test_004_b_512 (self):
- self.source = blocks.vector_source_b(list(range(0,256))*4, False, 1, [])
- self.ts_pdu = pdu.take_skip_to_pdu_b(512,1)
+ def test_004_b_512(self):
+ self.source = blocks.vector_source_b(
+ list(range(0, 256)) * 4, False, 1, [])
+ self.ts_pdu = pdu.take_skip_to_pdu_b(512, 1)
self.debug = blocks.message_debug()
self.tb.connect((self.source, 0), (self.ts_pdu, 0))
self.tb.msg_connect((self.ts_pdu, 'pdus'), (self.debug, 'store'))
- dic = pmt.dict_add(pmt.make_dict(), pmt.intern("pdu_num"), pmt.from_uint64(0))
- vec = pmt.init_u8vector(512, list(range(0,256))*2)
- expected = pmt.cons(dic,vec)
- self.tb.run ()
+ dic = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ "pdu_num"), pmt.from_uint64(0))
+ vec = pmt.init_u8vector(512, list(range(0, 256)) * 2)
+ expected = pmt.cons(dic, vec)
+ self.tb.run()
actual = self.debug.get_message(0)
self.assertEqualPDU(actual, expected)
diff --git a/gr-pdu/python/pdu/qa_time_delta.py b/gr-pdu/python/pdu/qa_time_delta.py
index 0cb0f99020..2003419ba7 100755
--- a/gr-pdu/python/pdu/qa_time_delta.py
+++ b/gr-pdu/python/pdu/qa_time_delta.py
@@ -20,7 +20,8 @@ class qa_time_delta(gr_unittest.TestCase):
def setUp(self):
self.tb = gr.top_block()
- self.time_delta = pdu.time_delta(pmt.intern("sys time delta (ms)"), pmt.intern("system_time"))
+ self.time_delta = pdu.time_delta(pmt.intern(
+ "sys time delta (ms)"), pmt.intern("system_time"))
self.debug = blocks.message_debug()
self.tb.msg_connect((self.time_delta, 'pdu'), (self.debug, 'store'))
@@ -30,8 +31,9 @@ class qa_time_delta(gr_unittest.TestCase):
def test_001_invalid_a(self):
self.tb.start()
- self.time_delta.to_basic_block()._post(pmt.intern("pdu"), pmt.intern("NOT A PDU"))
- time.sleep(0.01) # short delay to ensure the message is processed
+ self.time_delta.to_basic_block()._post(
+ pmt.intern("pdu"), pmt.intern("NOT A PDU"))
+ time.sleep(0.01) # short delay to ensure the message is processed
self.tb.stop()
self.tb.wait()
@@ -39,14 +41,16 @@ class qa_time_delta(gr_unittest.TestCase):
self.assertEqual(0, self.debug.num_messages())
def test_001_invalid_b(self):
- in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- meta = pmt.dict_add(pmt.make_dict(), pmt.intern('sam'), pmt.from_double(25.1))
+ in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ meta = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ 'sam'), pmt.from_double(25.1))
in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data))
# set up fg
self.tb.start()
self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu)
- time.sleep(0.01) # short delay to ensure the message is processed
+ time.sleep(0.01) # short delay to ensure the message is processed
self.tb.stop()
self.tb.wait()
@@ -56,19 +60,25 @@ class qa_time_delta(gr_unittest.TestCase):
def test_002_normal(self):
tnow = time.time()
- in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow - 10.0))
+ in_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ meta = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ 'system_time'), pmt.from_double(tnow - 10.0))
in_pdu = pmt.cons(meta, pmt.init_c32vector(len(in_data), in_data))
- e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
- e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern('system_time'), pmt.from_double(tnow))
- e_meta = pmt.dict_add(e_meta, pmt.intern('sys time delta (ms)'), pmt.from_double(10000.0))
+ e_data = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1]
+ e_meta = pmt.dict_add(pmt.make_dict(), pmt.intern(
+ 'system_time'), pmt.from_double(tnow))
+ e_meta = pmt.dict_add(e_meta, pmt.intern(
+ 'sys time delta (ms)'), pmt.from_double(10000.0))
e_pdu = pmt.cons(e_meta, pmt.init_c32vector(len(e_data), e_data))
# set up fg
self.tb.start()
self.time_delta.to_basic_block()._post(pmt.intern("pdu"), in_pdu)
- self.waitFor(lambda: self.debug.num_messages() == 1, timeout=1.0, poll_interval=0.01)
+ self.waitFor(lambda: self.debug.num_messages() ==
+ 1, timeout=1.0, poll_interval=0.01)
self.tb.stop()
self.tb.wait()
@@ -76,7 +86,8 @@ class qa_time_delta(gr_unittest.TestCase):
self.assertEqual(1, self.debug.num_messages())
a_meta = pmt.car(self.debug.get_message(0))
time_tag = pmt.dict_ref(a_meta, pmt.intern("system_time"), pmt.PMT_NIL)
- delta_tag = pmt.dict_ref(a_meta, pmt.intern("sys time delta (ms)"), pmt.PMT_NIL)
+ delta_tag = pmt.dict_ref(a_meta, pmt.intern(
+ "sys time delta (ms)"), pmt.PMT_NIL)
self.assertAlmostEqual(tnow, pmt.to_double(time_tag), delta=60)
self.assertAlmostEqual(10000, pmt.to_double(delta_tag), delta=10)