summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_header_payload_demux.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rw-r--r--gr-digital/python/digital/qa_header_payload_demux.py209
1 files changed, 126 insertions, 83 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index edee14c0c1..76be0905f0 100644
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -33,6 +33,7 @@ class HeaderToMessageBlock(gr.sync_block):
Helps with testing the HPD. Receives a header, stores it, posts
a predetermined message.
"""
+
def __init__(self, itemsize, header_len, messages):
gr.sync_block.__init__(
self,
@@ -57,12 +58,12 @@ class HeaderToMessageBlock(gr.sync_block):
class qa_header_payload_demux (gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
"""Runs before every test."""
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
random.seed(0)
- def tearDown (self):
+ def tearDown(self):
"""Runs after every test."""
self.tb = None
@@ -85,11 +86,17 @@ class qa_header_payload_demux (gr_unittest.TestCase):
)
self.tb.connect((hpd, 1), payload_sink)
- def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30):
+ def run_tb(
+ self,
+ payload_sink,
+ payload_len,
+ header_sink,
+ header_len,
+ timeout=30):
"""Execute self.tb"""
stop_time = time.time() + timeout
self.tb.start()
- while (len(payload_sink.data()) < payload_len or \
+ while (len(payload_sink.data()) < payload_len or
len(header_sink.data()) < header_len) and \
time.time() < stop_time:
time.sleep(.2)
@@ -106,7 +113,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
header = (1, 2, 3)
payload = tuple(range(5, 20))
data_signal = (0,) * n_zeros + header + payload
- trigger_signal = [0,] * len(data_signal)
+ trigger_signal = [0, ] * len(data_signal)
trigger_signal[n_zeros] = 1
# This is dropped:
testtag1 = make_tag('tag1', 0, 0)
@@ -130,7 +137,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
len(header),
[len(payload)]
)
- self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+ # extra system port defined for you
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 2)
payload_sink = blocks.vector_sink_f()
header_sink = blocks.vector_sink_f()
self.connect_all_blocks(
@@ -188,7 +196,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
)
- self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+ # extra system port defined for you
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 2)
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
mock_header_demod = HeaderToMessageBlock(
@@ -196,7 +205,13 @@ class qa_header_payload_demux (gr_unittest.TestCase):
len(header),
[len(payload)]
)
- self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink)
+ self.connect_all_blocks(
+ data_src,
+ None,
+ hpd,
+ mock_header_demod,
+ payload_sink,
+ header_sink)
self.run_tb(payload_sink, len(payload), header_sink, len(header))
# Check results
self.assertEqual(header_sink.data(), list(header))
@@ -220,13 +235,13 @@ class qa_header_payload_demux (gr_unittest.TestCase):
]
self.assertEqual(expected_tags_payload, ptags_payload)
- def test_001_headerpadding (self):
+ def test_001_headerpadding(self):
""" Like test 1, but with header padding. """
n_zeros = 3
header = [1, 2, 3]
header_padding = 1
payload = list(range(5, 20))
- data_signal = [0,] * n_zeros + header + payload
+ data_signal = [0, ] * n_zeros + header + payload
trigger_signal = [0] * len(data_signal)
trigger_signal[n_zeros] = 1
# This is dropped:
@@ -245,15 +260,15 @@ class qa_header_payload_demux (gr_unittest.TestCase):
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
len(header),
- 1, # Items per symbol
- 0, # Guard interval
- "frame_len", # TSB tag key
- "detect", # Trigger tag key
- False, # No symbols please
- gr.sizeof_float, # Item size
- "", # Timing tag key
- 1.0, # Samp rate
- (), # No special tags
+ 1, # Items per symbol
+ 0, # Guard interval
+ "frame_len", # TSB tag key
+ "detect", # Trigger tag key
+ False, # No symbols please
+ gr.sizeof_float, # Item size
+ "", # Timing tag key
+ 1.0, # Samp rate
+ (), # No special tags
header_padding
)
mock_header_demod = HeaderToMessageBlock(
@@ -263,11 +278,17 @@ class qa_header_payload_demux (gr_unittest.TestCase):
)
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
- self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
- self.run_tb(payload_sink, len(payload), header_sink, len(header)+2)
+ self.connect_all_blocks(
+ data_src,
+ trigger_src,
+ hpd,
+ mock_header_demod,
+ payload_sink,
+ header_sink)
+ self.run_tb(payload_sink, len(payload), header_sink, len(header) + 2)
# Check values
# Header now is padded:
- self.assertEqual(header_sink.data(), [0,] + header + [payload[0],])
+ self.assertEqual(header_sink.data(), [0, ] + header + [payload[0], ])
self.assertEqual(payload_sink.data(), payload)
ptags_header = []
for tag in header_sink.tags():
@@ -288,14 +309,14 @@ class qa_header_payload_demux (gr_unittest.TestCase):
]
self.assertEqual(expected_tags_payload, ptags_payload)
- def test_001_headerpadding_payload_offset (self):
+ def test_001_headerpadding_payload_offset(self):
""" Like test 1, but with header padding + payload offset. """
n_zeros = 3
header = [1, 2, 3]
header_padding = 1
payload_offset = -1
payload = list(range(5, 20))
- data_signal = [0,] * n_zeros + header + payload + [0,] * 100
+ data_signal = [0, ] * n_zeros + header + payload + [0, ] * 100
trigger_signal = [0] * len(data_signal)
trigger_signal[n_zeros] = 1
# This goes on output 1, item 3 + 1 (for payload offset)
@@ -308,41 +329,38 @@ class qa_header_payload_demux (gr_unittest.TestCase):
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
len(header),
- 1, # Items per symbol
- 0, # Guard interval
- "frame_len", # TSB tag key
- "detect", # Trigger tag key
- False, # No symbols please
- gr.sizeof_float, # Item size
- "", # Timing tag key
- 1.0, # Samp rate
- (), # No special tags
+ 1, # Items per symbol
+ 0, # Guard interval
+ "frame_len", # TSB tag key
+ "detect", # Trigger tag key
+ False, # No symbols please
+ gr.sizeof_float, # Item size
+ "", # Timing tag key
+ 1.0, # Samp rate
+ (), # No special tags
header_padding
)
- self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+ # extra system port defined for you
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 2)
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
- self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(data_src, (hpd, 0))
self.tb.connect(trigger_src, (hpd, 1))
self.tb.connect((hpd, 0), header_sink)
self.tb.connect((hpd, 1), payload_sink)
self.tb.start()
- time.sleep(.2) # Need this, otherwise, the next message is ignored
- hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
- )
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(pmt.intern('header_data'), pmt.to_pmt(
+ {'frame_len': len(payload), 'payload_offset': payload_offset}))
while len(payload_sink.data()) < len(payload):
time.sleep(.2)
self.tb.stop()
self.tb.wait()
# Header is now padded:
- self.assertEqual(header_sink.data(), [0,] + header + [payload[0],])
+ self.assertEqual(header_sink.data(), [0, ] + header + [payload[0], ])
# Payload is now offset:
- self.assertEqual(
- payload_sink.data(),
- data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
- )
+ self.assertEqual(payload_sink.data(), data_signal[n_zeros + len(
+ header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)])
ptags_payload = {}
for tag in payload_sink.tags():
ptag = gr.tag_to_python(tag)
@@ -354,7 +372,6 @@ class qa_header_payload_demux (gr_unittest.TestCase):
}
self.assertEqual(expected_tags_payload, ptags_payload)
-
def test_002_symbols(self):
"""
Same as before, but operate on symbols
@@ -365,8 +382,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
n_symbols = 4
header = (1, 2, 3)
payload = (1, 2, 3)
- data_signal = (0,) * n_zeros + (0,) + header + ((0,) + payload) * n_symbols
- trigger_signal = [0,] * len(data_signal)
+ data_signal = (0,) * n_zeros + (0,) + header + \
+ ((0,) + payload) * n_symbols
+ trigger_signal = [0, ] * len(data_signal)
trigger_signal[n_zeros] = 1
# This is dropped:
testtag1 = make_tag('tag1', 0, 0)
@@ -375,11 +393,14 @@ class qa_header_payload_demux (gr_unittest.TestCase):
# This goes on output 0, item 0 (middle of the header symbol)
testtag3 = make_tag('tag3', 42, n_zeros + gi + 1)
# This goes on output 1, item 1 (middle of the first payload symbol)
- testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1)
- data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
+ testtag4 = make_tag('tag4', 314, n_zeros +
+ (gi + items_per_symbol) * 2 + 1)
+ data_src = blocks.vector_source_f(
+ data_signal, False, tags=(
+ testtag1, testtag2, testtag3, testtag4))
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
- len(header) // items_per_symbol, # Header length (in symbols)
+ len(header) // items_per_symbol, # Header length (in symbols)
items_per_symbol, # Items per symbols
gi, # Items per guard time
"frame_len", # Frame length tag key
@@ -387,15 +408,16 @@ class qa_header_payload_demux (gr_unittest.TestCase):
True, # Output symbols (not items)
gr.sizeof_float # Bytes per item
)
- self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+ # extra system port defined for you
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 2)
header_sink = blocks.vector_sink_f(items_per_symbol)
payload_sink = blocks.vector_sink_f(items_per_symbol)
- self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(data_src, (hpd, 0))
self.tb.connect(trigger_src, (hpd, 1))
self.tb.connect((hpd, 0), header_sink)
self.tb.connect((hpd, 1), payload_sink)
self.tb.start()
- time.sleep(.2) # Need this, otherwise, the next message is ignored
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
hpd.to_basic_block()._post(
pmt.intern('header_data'),
pmt.from_long(n_symbols)
@@ -404,7 +426,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
time.sleep(.2)
self.tb.stop()
self.tb.wait()
- self.assertEqual(header_sink.data(), header)
+ self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload * n_symbols)
ptags_header = []
for tag in header_sink.tags():
@@ -429,32 +451,32 @@ class qa_header_payload_demux (gr_unittest.TestCase):
"""
Like test 1, but twice, plus one fail
"""
- ### Tx Data
+ # Tx Data
n_zeros = 5
header = [1, 2, 3]
- header_fail = [-1, -2, -4] # Contents don't really matter
+ header_fail = [-1, -2, -4] # Contents don't really matter
payload1 = list(range(5, 20))
- payload2 = [42,]
+ payload2 = [42, ]
sampling_rate = 2
- data_signal = [0,] * n_zeros + header + payload1
- trigger_signal = [0,] * len(data_signal) * 2
+ data_signal = [0, ] * n_zeros + header + payload1
+ trigger_signal = [0, ] * len(data_signal) * 2
trigger_signal[n_zeros] = 1
trigger_signal[len(data_signal)] = 1
- trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1
+ trigger_signal[len(data_signal) + len(header_fail) + n_zeros] = 1
print("Triggers at: {0} {1} {2}".format(
n_zeros,
len(data_signal),
- len(data_signal)+len(header_fail)+n_zeros))
+ len(data_signal) + len(header_fail) + n_zeros))
tx_signal = data_signal + \
- header_fail + [0,] * n_zeros + \
- header + payload2 + [0,] * 1000
+ header_fail + [0, ] * n_zeros + \
+ header + payload2 + [0, ] * 1000
# Timing tag: This is preserved and updated:
timing_tag = make_tag('rx_time', (0, 0), 0)
# Rx freq tags:
rx_freq_tag1 = make_tag('rx_freq', 1.0, 0)
rx_freq_tag2 = make_tag('rx_freq', 1.5, 29)
rx_freq_tag3 = make_tag('rx_freq', 2.0, 30)
- ### Flow graph
+ # Flow graph
data_src = blocks.vector_source_f(
tx_signal, False,
tags=(timing_tag, rx_freq_tag1, rx_freq_tag2, rx_freq_tag3)
@@ -472,15 +494,16 @@ class qa_header_payload_demux (gr_unittest.TestCase):
samp_rate=sampling_rate,
special_tags=('rx_freq',),
)
- self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+ # extra system port defined for you
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 2)
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
- self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(data_src, (hpd, 0))
self.tb.connect(trigger_src, (hpd, 1))
self.tb.connect((hpd, 0), header_sink)
self.tb.connect((hpd, 1), payload_sink)
self.tb.start()
- time.sleep(.2) # Need this, otherwise, the next message is ignored
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
hpd.to_basic_block()._post(
pmt.intern('header_data'),
pmt.from_long(len(payload1))
@@ -511,23 +534,29 @@ class qa_header_payload_demux (gr_unittest.TestCase):
# 31: header 3
# 34: payload 2 (length 1)
# 35: 1000 zeros
- self.assertEqual(header_sink.data(), list(header + header_fail + header))
+ self.assertEqual(
+ header_sink.data(), list(
+ header + header_fail + header))
self.assertEqual(payload_sink.data(), payload1 + payload2)
tags_payload = [gr.tag_to_python(x) for x in payload_sink.tags()]
- tags_payload = sorted([(x.offset, x.key, x.value) for x in tags_payload])
+ tags_payload = sorted([(x.offset, x.key, x.value)
+ for x in tags_payload])
tags_expected_payload = [
- (0, 'frame_len', len(payload1)),
+ (0, 'frame_len', len(payload1)),
(len(payload1), 'frame_len', len(payload2)),
]
tags_header = [gr.tag_to_python(x) for x in header_sink.tags()]
tags_header = sorted([(x.offset, x.key, x.value) for x in tags_header])
tags_expected_header = [
- (0, 'rx_freq', 1.0),
- (0, 'rx_time', (2, 0.5)), # Hard coded time value :( Is n_zeros/sampling_rate
- (len(header), 'rx_freq', 1.0),
- (len(header), 'rx_time', (11, .5)), # Hard coded time value :(. See above.
- (2*len(header), 'rx_freq', 2.0),
- (2*len(header), 'rx_time', (15, .5)), # Hard coded time value :(. See above.
+ (0, 'rx_freq', 1.0),
+ # Hard coded time value :( Is n_zeros/sampling_rate
+ (0, 'rx_time', (2, 0.5)),
+ (len(header), 'rx_freq', 1.0),
+ # Hard coded time value :(. See above.
+ (len(header), 'rx_time', (11, .5)),
+ (2 * len(header), 'rx_freq', 2.0),
+ # Hard coded time value :(. See above.
+ (2 * len(header), 'rx_time', (15, .5)),
]
self.assertEqual(tags_header, tags_expected_header)
self.assertEqual(tags_payload, tags_expected_payload)
@@ -561,6 +590,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
signal += [2] * burst_size
burst_sizes += [burst_size]
return (signal, indexes, total_payload_len, burst_sizes)
+
def indexes_to_triggers(indexes, signal_len):
"""
Convert indexes to a mix of trigger signals and tags
@@ -588,7 +618,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
signal, indexes, total_payload_len, burst_sizes = create_signal(
n_bursts, header_len, max_gap, max_burstsize, fail_rate
)
- trigger_signal, trigger_tags = indexes_to_triggers(indexes, len(signal))
+ trigger_signal, trigger_tags = indexes_to_triggers(
+ indexes, len(signal))
# Flow graph
data_src = blocks.vector_source_f(
signal, False,
@@ -614,10 +645,22 @@ class qa_header_payload_demux (gr_unittest.TestCase):
)
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
- self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
- self.run_tb(payload_sink, total_payload_len, header_sink, header_len*n_bursts)
- self.assertEqual(header_sink.data(), list([1]*header_len*n_bursts))
- self.assertEqual(payload_sink.data(), list([2]*total_payload_len))
+ self.connect_all_blocks(
+ data_src,
+ trigger_src,
+ hpd,
+ mock_header_demod,
+ payload_sink,
+ header_sink)
+ self.run_tb(
+ payload_sink,
+ total_payload_len,
+ header_sink,
+ header_len *
+ n_bursts)
+ self.assertEqual(header_sink.data(), list([1] * header_len * n_bursts))
+ self.assertEqual(payload_sink.data(), list([2] * total_payload_len))
+
if __name__ == '__main__':
gr_unittest.run(qa_header_payload_demux)