#!/usr/bin/env python # Copyright 2012-2016,2018 Free Software Foundation, Inc. # # This file is part of GNU Radio # # SPDX-License-Identifier: GPL-3.0-or-later # # import time import random import numpy from gnuradio import gr from gnuradio import gr_unittest from gnuradio import digital from gnuradio import blocks import pmt def make_tag(key, value, offset): """Create a gr.tag_t() from key, value, offset.""" tag = gr.tag_t() tag.offset = offset tag.key = pmt.string_to_symbol(key) tag.value = pmt.to_pmt(value) return tag class HeaderToMessageBlock(gr.sync_block): """ Helps with testing the HPD. Receives a header, stores it, posts a predetermined message. forecast() is not currently working in Python, so use a local buffer to accumulate header data. """ def __init__(self, itemsize, header_len, messages): gr.sync_block.__init__( self, name="HeaderToMessageBlock", in_sig=[itemsize], out_sig=[itemsize], ) self.header_len = header_len self.message_port_register_out(pmt.intern('header_data')) self.messages = messages self.msg_count = 0 self.buf = [] def work(self, input_items, output_items): """Where the magic happens.""" self.buf.extend(input_items[0]) for _ in range(len(self.buf) // self.header_len): msg = self.messages[self.msg_count] or False self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg)) self.msg_count += 1 del self.buf[:self.header_len] output_items[0][:] = input_items[0][:] return len(input_items[0]) class qa_header_payload_demux (gr_unittest.TestCase): def setUp(self): """Runs before every test.""" self.tb = gr.top_block() random.seed(0) def tearDown(self): """Runs after every test.""" self.tb = None def connect_all_blocks(self, data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink): """ Connect the standard HPD test flowgraph """ self.tb.connect(data_src, (hpd, 0)) if trigger_src is not None: self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), mock_header_demod) self.tb.connect(mock_header_demod, header_sink) self.tb.msg_connect( mock_header_demod, 'header_data', hpd, 'header_data' ) self.tb.connect((hpd, 1), payload_sink) def run_tb( self, payload_sink, payload_len, header_sink, header_len, timeout=30): """Execute self.tb""" stop_time = time.time() + timeout self.tb.start() while (len(payload_sink.data()) < payload_len or len(header_sink.data()) < header_len) and \ time.time() < stop_time: time.sleep(.2) self.tb.stop() self.tb.wait() def test_001_t(self): """ Simplest possible test: put in zeros, then header, then payload, trigger signal, try to demux. The return signal from the header parser is faked via _post() Add in some tags for fun. """ n_zeros = 1 header = (1, 2, 3) payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload trigger_signal = [0, ] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float ) mock_header_demod = HeaderToMessageBlock( numpy.float32, len(header), [len(payload)] ) # extra system port defined for you self.assertEqual(pmt.length(hpd.message_ports_in()), 2) payload_sink = blocks.vector_sink_f() header_sink = blocks.vector_sink_f() self.connect_all_blocks( data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) self.run_tb(payload_sink, len(payload), header_sink, len(header)) self.assertEqual(header_sink.data(), list(header)) self.assertEqual(payload_sink.data(), list(payload)) ptags_header = [] for tag in header_sink.tags(): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ {'key': 'tag2', 'offset': 0}, {'key': 'tag3', 'offset': 2}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ {'key': 'frame_len', 'offset': 0}, {'key': 'tag4', 'offset': 3}, ] self.assertEqual(expected_tags_payload, ptags_payload) def test_001_t_tags(self): """ Like the previous test, but use a trigger tag instead of a trigger signal. """ n_zeros = 1 header = (1, 2, 3) payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload # Trigger tag trigger_tag = make_tag('detect', True, n_zeros) # This is dropped: testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4) ) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float ) # extra system port defined for you self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() mock_header_demod = HeaderToMessageBlock( numpy.float32, len(header), [len(payload)] ) self.connect_all_blocks( data_src, None, hpd, mock_header_demod, payload_sink, header_sink) self.run_tb(payload_sink, len(payload), header_sink, len(header)) # Check results self.assertEqual(header_sink.data(), list(header)) self.assertEqual(payload_sink.data(), list(payload)) ptags_header = [] for tag in header_sink.tags(): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ {'key': 'tag2', 'offset': 0}, {'key': 'tag3', 'offset': 2}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ {'key': 'frame_len', 'offset': 0}, {'key': 'tag4', 'offset': 3}, ] self.assertEqual(expected_tags_payload, ptags_payload) def test_001_headerpadding(self): """ Like test 1, but with header padding. """ n_zeros = 3 header = [1, 2, 3] header_padding = 1 payload = list(range(5, 20)) data_signal = [0, ] * n_zeros + header + payload trigger_signal = [0] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, # Items per symbol 0, # Guard interval "frame_len", # TSB tag key "detect", # Trigger tag key False, # No symbols please gr.sizeof_float, # Item size "", # Timing tag key 1.0, # Samp rate (), # No special tags header_padding ) mock_header_demod = HeaderToMessageBlock( numpy.float32, len(header), [len(payload)] ) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() self.connect_all_blocks( data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) self.run_tb(payload_sink, len(payload), header_sink, len(header) + 2) # Check values # Header now is padded: self.assertEqual(header_sink.data(), [0, ] + header + [payload[0], ]) self.assertEqual(payload_sink.data(), payload) ptags_header = [] for tag in header_sink.tags(): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ {'key': 'tag2', 'offset': 1}, {'key': 'tag3', 'offset': 3}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ {'key': 'frame_len', 'offset': 0}, {'key': 'tag4', 'offset': 3}, ] self.assertEqual(expected_tags_payload, ptags_payload) def test_001_headerpadding_payload_offset(self): """ Like test 1, but with header padding + payload offset. """ n_zeros = 3 header = [1, 2, 3] header_padding = 1 payload_offset = -1 payload = list(range(5, 20)) data_signal = [0, ] * n_zeros + header + payload + [0, ] * 100 trigger_signal = [0] * len(data_signal) trigger_signal[n_zeros] = 1 # This goes on output 1, item 3 + 1 (for payload offset) testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, tags=(testtag4,) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, # Items per symbol 0, # Guard interval "frame_len", # TSB tag key "detect", # Trigger tag key False, # No symbols please gr.sizeof_float, # Item size "", # Timing tag key 1.0, # Samp rate (), # No special tags header_padding ) # extra system port defined for you self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() self.tb.connect(data_src, (hpd, 0)) self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), header_sink) self.tb.connect((hpd, 1), payload_sink) self.tb.start() time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post(pmt.intern('header_data'), pmt.to_pmt( {'frame_len': len(payload), 'payload_offset': payload_offset})) while len(payload_sink.data()) < len(payload): time.sleep(.2) self.tb.stop() self.tb.wait() # Header is now padded: self.assertEqual(header_sink.data(), [0, ] + header + [payload[0], ]) # Payload is now offset: self.assertEqual(payload_sink.data(), data_signal[n_zeros + len( header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]) ptags_payload = {} for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) ptags_payload[ptag.key] = ptag.offset expected_tags_payload = { 'frame_len': 0, 'payload_offset': 0, 'tag4': 3 - payload_offset, } self.assertEqual(expected_tags_payload, ptags_payload) def test_002_symbols(self): """ Same as before, but operate on symbols """ n_zeros = 1 items_per_symbol = 3 gi = 1 n_symbols = 4 header = (1, 2, 3) payload = (1, 2, 3) data_signal = (0,) * n_zeros + (0,) + header + \ ((0,) + payload) * n_symbols trigger_signal = [0, ] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0 (from the GI) testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 0 (middle of the header symbol) testtag3 = make_tag('tag3', 42, n_zeros + gi + 1) # This goes on output 1, item 1 (middle of the first payload symbol) testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1) data_src = blocks.vector_source_f( data_signal, False, tags=( testtag1, testtag2, testtag3, testtag4)) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header) // items_per_symbol, # Header length (in symbols) items_per_symbol, # Items per symbols gi, # Items per guard time "frame_len", # Frame length tag key "detect", # Trigger tag key True, # Output symbols (not items) gr.sizeof_float # Bytes per item ) # extra system port defined for you self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f(items_per_symbol) payload_sink = blocks.vector_sink_f(items_per_symbol) self.tb.connect(data_src, (hpd, 0)) self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), header_sink) self.tb.connect((hpd, 1), payload_sink) self.tb.start() time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( pmt.intern('header_data'), pmt.from_long(n_symbols) ) while len(payload_sink.data()) < len(payload) * n_symbols: time.sleep(.2) self.tb.stop() self.tb.wait() self.assertEqual(header_sink.data(), list(header)) self.assertEqual(payload_sink.data(), list(payload * n_symbols)) ptags_header = [] for tag in header_sink.tags(): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ {'key': 'tag2', 'offset': 0}, {'key': 'tag3', 'offset': 0}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ {'key': 'frame_len', 'offset': 0}, {'key': 'tag4', 'offset': 1}, ] self.assertEqual(expected_tags_payload, ptags_payload) def test_003_t(self): """ Like test 1, but twice, plus one fail """ # Tx Data n_zeros = 5 header = [1, 2, 3] header_fail = [-1, -2, -4] # Contents don't really matter payload1 = list(range(5, 20)) payload2 = [42, ] sampling_rate = 2 data_signal = [0, ] * n_zeros + header + payload1 trigger_signal = [0, ] * len(data_signal) * 2 trigger_signal[n_zeros] = 1 trigger_signal[len(data_signal)] = 1 trigger_signal[len(data_signal) + len(header_fail) + n_zeros] = 1 print("Triggers at: {0} {1} {2}".format( n_zeros, len(data_signal), len(data_signal) + len(header_fail) + n_zeros)) tx_signal = data_signal + \ header_fail + [0, ] * n_zeros + \ header + payload2 + [0, ] * 1000 # Timing tag: This is preserved and updated: timing_tag = make_tag('rx_time', (0, 0), 0) # Rx freq tags: rx_freq_tag1 = make_tag('rx_freq', 1.0, 0) rx_freq_tag2 = make_tag('rx_freq', 1.5, 29) rx_freq_tag3 = make_tag('rx_freq', 2.0, 30) # Flow graph data_src = blocks.vector_source_f( tx_signal, False, tags=(timing_tag, rx_freq_tag1, rx_freq_tag2, rx_freq_tag3) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( header_len=len(header), items_per_symbol=1, guard_interval=0, length_tag_key="frame_len", trigger_tag_key="detect", output_symbols=False, itemsize=gr.sizeof_float, timing_tag_key='rx_time', samp_rate=sampling_rate, special_tags=('rx_freq',), ) # extra system port defined for you self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() self.tb.connect(data_src, (hpd, 0)) self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), header_sink) self.tb.connect((hpd, 1), payload_sink) self.tb.start() time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( pmt.intern('header_data'), pmt.from_long(len(payload1)) ) while len(payload_sink.data()) < len(payload1): time.sleep(.2) hpd.to_basic_block()._post( pmt.intern('header_data'), pmt.PMT_F ) # This next command is a bit of a showstopper, but there's no condition to check upon # to see if the previous msg handling is finished time.sleep(.7) hpd.to_basic_block()._post( pmt.intern('header_data'), pmt.from_long(len(payload2)) ) while len(payload_sink.data()) < len(payload1) + len(payload2): time.sleep(.2) self.tb.stop() self.tb.wait() # Signal description: # 0: 5 zeros # 5: header 1 # 8: payload 1 (length: 15) # 23: header 2 (fail) # 26: 5 zeros # 31: header 3 # 34: payload 2 (length 1) # 35: 1000 zeros self.assertEqual( header_sink.data(), list( header + header_fail + header)) self.assertEqual(payload_sink.data(), payload1 + payload2) tags_payload = [gr.tag_to_python(x) for x in payload_sink.tags()] tags_payload = sorted([(x.offset, x.key, x.value) for x in tags_payload]) tags_expected_payload = [ (0, 'frame_len', len(payload1)), (len(payload1), 'frame_len', len(payload2)), ] tags_header = [gr.tag_to_python(x) for x in header_sink.tags()] tags_header = sorted([(x.offset, x.key, x.value) for x in tags_header]) tags_expected_header = [ (0, 'rx_freq', 1.0), # Hard coded time value :( Is n_zeros/sampling_rate (0, 'rx_time', (2, 0.5)), (len(header), 'rx_freq', 1.0), # Hard coded time value :(. See above. (len(header), 'rx_time', (11, .5)), (2 * len(header), 'rx_freq', 2.0), # Hard coded time value :(. See above. (2 * len(header), 'rx_time', (15, .5)), ] self.assertEqual(tags_header, tags_expected_header) self.assertEqual(tags_payload, tags_expected_payload) def test_004_fuzz(self): """ Long random test """ def create_signal( n_bursts, header_len, max_gap, max_burstsize, fail_rate, ): signal = [] indexes = [] burst_sizes = [] total_payload_len = 0 for _ in range(n_bursts): gap_size = random.randint(0, max_gap) signal += [0] * gap_size is_failure = random.random() < fail_rate if not is_failure: burst_size = random.randint(0, max_burstsize) else: burst_size = 0 total_payload_len += burst_size indexes += [len(signal)] signal += [1] * header_len signal += [2] * burst_size burst_sizes += [burst_size] return (signal, indexes, total_payload_len, burst_sizes) def indexes_to_triggers(indexes, signal_len): """ Convert indexes to a mix of trigger signals and tags """ trigger_signal = [0] * signal_len trigger_tags = [] for index in indexes: if random.random() > 0.5: trigger_signal[index] = 1 else: trigger_tags += [make_tag('detect', True, index)] return (trigger_signal, trigger_tags) ### Go, go, go # Uncomment this if you want true randomness -- good for actual fuzzing # The divide-by-20 means we'll usually get the same random seed # between the first run and the XML run. # random_seed = int(time.time() / 20) # random.seed(random_seed) # print("Random seed: {0}".format(random_seed)) n_bursts = 400 header_len = 5 max_gap = 50 max_burstsize = 100 fail_rate = 0.05 signal, indexes, total_payload_len, burst_sizes = create_signal( n_bursts, header_len, max_gap, max_burstsize, fail_rate ) trigger_signal, trigger_tags = indexes_to_triggers( indexes, len(signal)) # Flow graph data_src = blocks.vector_source_f( signal, False, tags=trigger_tags ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( header_len=header_len, items_per_symbol=1, guard_interval=0, length_tag_key="frame_len", trigger_tag_key="detect", output_symbols=False, itemsize=gr.sizeof_float, timing_tag_key='rx_time', samp_rate=1.0, special_tags=('rx_freq',), ) mock_header_demod = HeaderToMessageBlock( numpy.float32, header_len, burst_sizes ) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() self.connect_all_blocks( data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) self.run_tb( payload_sink, total_payload_len, header_sink, header_len * n_bursts) self.assertEqual(header_sink.data(), list([1] * header_len * n_bursts)) self.assertEqual(payload_sink.data(), list([2] * total_payload_len)) if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux)