diff options
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_txrx.py')
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_txrx.py | 120 |
1 files changed, 99 insertions, 21 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_txrx.py b/gr-digital/python/digital/qa_ofdm_txrx.py index 681041d47d..adf93ee356 100755 --- a/gr-digital/python/digital/qa_ofdm_txrx.py +++ b/gr-digital/python/digital/qa_ofdm_txrx.py @@ -21,12 +21,44 @@ # import random - import numpy -from gnuradio import gr, gr_unittest, digital, blocks + +from gnuradio import gr, gr_unittest, digital, blocks, channels from gnuradio.digital.ofdm_txrx import ofdm_tx, ofdm_rx from gnuradio.digital.utils import tagged_streams +# Set this to true if you need to write out data +LOG_DEBUG_INFO=False + +class ofdm_tx_fg (gr.top_block): + def __init__(self, data, len_tag_key): + gr.top_block.__init__(self, "ofdm_tx") + tx_data, tags = tagged_streams.packets_to_vectors((data,), len_tag_key) + src = blocks.vector_source_b(data, False, 1, tags) + self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO) + self.sink = blocks.vector_sink_c() + self.connect(src, self.tx, self.sink) + + def get_tx_samples(self): + return self.sink.data() + +class ofdm_rx_fg (gr.top_block): + def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100): + gr.top_block.__init__(self, "ofdm_rx") + if prepend_zeros: + samples = (0,) * prepend_zeros + tuple(samples) + src = blocks.vector_source_c(tuple(samples) + (0,) * 1000) + self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO) + if channel is not None: + self.connect(src, channel, self.rx) + else: + self.connect(src, self.rx) + self.sink = blocks.vector_sink_b() + self.connect(self.rx, self.sink) + + def get_rx_bytes(self): + return self.sink.data() + class test_ofdm_txrx (gr_unittest.TestCase): def setUp (self): @@ -35,26 +67,72 @@ class test_ofdm_txrx (gr_unittest.TestCase): def tearDown (self): self.tb = None - def test_001 (self): - pass - #len_tag_key = 'frame_len' - #n_bytes = 100 - #test_data = [random.randint(0, 255) for x in range(n_bytes)] - #tx_data, tags = tagged_streams.packets_to_vectors((test_data,), len_tag_key) - #src = blocks.vector_source_b(test_data, False, 1, tags) - #tx = ofdm_tx(frame_length_tag_key=len_tag_key) - #rx = ofdm_rx(frame_length_tag_key=len_tag_key) - #self.assertEqual(tx.sync_word1, rx.sync_word1) - #self.assertEqual(tx.sync_word2, rx.sync_word2) - #delay = blocks.delay(gr.sizeof_gr_complex, 100) - #noise = analog.noise_source_c(analog.GR_GAUSSIAN, 0.05) - #add = blocks.add_cc() - #sink = blocks.vector_sink_b() - ##self.tb.connect(src, tx, add, rx, sink) - ##self.tb.connect(noise, (add, 1)) - #self.tb.connect(src, tx, blocks.null_sink(gr.sizeof_gr_complex)) - #self.tb.run() + def test_001_tx (self): + """ Just make sure the Tx works in general """ + len_tag_key = 'frame_len' + n_bytes = 52 + n_samples_expected = (numpy.ceil(1.0 * (n_bytes + 4) / 6) + 3) * 80 + test_data = [random.randint(0, 255) for x in range(n_bytes)] + tx_data, tags = tagged_streams.packets_to_vectors((test_data,), len_tag_key) + src = blocks.vector_source_b(test_data, False, 1, tags) + tx = ofdm_tx(packet_length_tag_key=len_tag_key) + tx_fg = ofdm_tx_fg(test_data, len_tag_key) + tx_fg.run() + self.assertEqual(len(tx_fg.get_tx_samples()), n_samples_expected) + + def test_002_rx_only_noise(self): + """ Run the RX with only noise, check it doesn't crash + or return a burst. """ + len_tag_key = 'frame_len' + samples = (0,) * 1000 + channel = channels.channel_model(0.1) + rx_fg = ofdm_rx_fg(samples, len_tag_key, channel) + rx_fg.run() + self.assertEqual(len(rx_fg.get_rx_bytes()), 0) + + def test_003_tx1packet(self): + """ Transmit one packet, with slight AWGN and slight frequency + timing offset. + Check packet is received and no bit errors have occurred. """ + len_tag_key = 'frame_len' + n_bytes = 21 + fft_len = 64 + test_data = tuple([random.randint(0, 255) for x in range(n_bytes)]) + # 1.0/fft_len is one sub-carrier, a fine freq offset stays below that + freq_offset = 1.0 / fft_len * 0.7 + #channel = channels.channel_model(0.01, freq_offset) + channel = None + # Tx + tx_fg = ofdm_tx_fg(test_data, len_tag_key) + tx_fg.run() + tx_samples = tx_fg.get_tx_samples() + # Rx + rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100) + rx_fg.run() + rx_data = rx_fg.get_rx_bytes() + self.assertEqual(tuple(tx_fg.tx.sync_word1), tuple(rx_fg.rx.sync_word1)) + self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2)) + self.assertEqual(test_data, rx_data) + def test_004_tx1packet_large_fO(self): + """ Transmit one packet, with slight AWGN and large frequency offset. + Check packet is received and no bit errors have occurred. """ + fft_len = 64 + len_tag_key = 'frame_len' + n_bytes = 21 + test_data = tuple([random.randint(0, 255) for x in range(n_bytes)]) + #test_data = tuple([255 for x in range(n_bytes)]) + # 1.0/fft_len is one sub-carrier + frequency_offset = 1.0 / fft_len * 2.5 + channel = channels.channel_model(0.00001, frequency_offset) + # Tx + tx_fg = ofdm_tx_fg(test_data, len_tag_key) + tx_fg.run() + tx_samples = tx_fg.get_tx_samples() + # Rx + rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100) + rx_fg.run() + rx_data = rx_fg.get_rx_bytes() + self.assertEqual(test_data, rx_data) if __name__ == '__main__': gr_unittest.run(test_ofdm_txrx, "test_ofdm_txrx.xml") |