summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_ofdm_txrx.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_txrx.py')
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_txrx.py120
1 files changed, 99 insertions, 21 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_txrx.py b/gr-digital/python/digital/qa_ofdm_txrx.py
index 681041d47d..adf93ee356 100755
--- a/gr-digital/python/digital/qa_ofdm_txrx.py
+++ b/gr-digital/python/digital/qa_ofdm_txrx.py
@@ -21,12 +21,44 @@
#
import random
-
import numpy
-from gnuradio import gr, gr_unittest, digital, blocks
+
+from gnuradio import gr, gr_unittest, digital, blocks, channels
from gnuradio.digital.ofdm_txrx import ofdm_tx, ofdm_rx
from gnuradio.digital.utils import tagged_streams
+# Set this to true if you need to write out data
+LOG_DEBUG_INFO=False
+
+class ofdm_tx_fg (gr.top_block):
+ def __init__(self, data, len_tag_key):
+ gr.top_block.__init__(self, "ofdm_tx")
+ tx_data, tags = tagged_streams.packets_to_vectors((data,), len_tag_key)
+ src = blocks.vector_source_b(data, False, 1, tags)
+ self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO)
+ self.sink = blocks.vector_sink_c()
+ self.connect(src, self.tx, self.sink)
+
+ def get_tx_samples(self):
+ return self.sink.data()
+
+class ofdm_rx_fg (gr.top_block):
+ def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100):
+ gr.top_block.__init__(self, "ofdm_rx")
+ if prepend_zeros:
+ samples = (0,) * prepend_zeros + tuple(samples)
+ src = blocks.vector_source_c(tuple(samples) + (0,) * 1000)
+ self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO)
+ if channel is not None:
+ self.connect(src, channel, self.rx)
+ else:
+ self.connect(src, self.rx)
+ self.sink = blocks.vector_sink_b()
+ self.connect(self.rx, self.sink)
+
+ def get_rx_bytes(self):
+ return self.sink.data()
+
class test_ofdm_txrx (gr_unittest.TestCase):
def setUp (self):
@@ -35,26 +67,72 @@ class test_ofdm_txrx (gr_unittest.TestCase):
def tearDown (self):
self.tb = None
- def test_001 (self):
- pass
- #len_tag_key = 'frame_len'
- #n_bytes = 100
- #test_data = [random.randint(0, 255) for x in range(n_bytes)]
- #tx_data, tags = tagged_streams.packets_to_vectors((test_data,), len_tag_key)
- #src = blocks.vector_source_b(test_data, False, 1, tags)
- #tx = ofdm_tx(frame_length_tag_key=len_tag_key)
- #rx = ofdm_rx(frame_length_tag_key=len_tag_key)
- #self.assertEqual(tx.sync_word1, rx.sync_word1)
- #self.assertEqual(tx.sync_word2, rx.sync_word2)
- #delay = blocks.delay(gr.sizeof_gr_complex, 100)
- #noise = analog.noise_source_c(analog.GR_GAUSSIAN, 0.05)
- #add = blocks.add_cc()
- #sink = blocks.vector_sink_b()
- ##self.tb.connect(src, tx, add, rx, sink)
- ##self.tb.connect(noise, (add, 1))
- #self.tb.connect(src, tx, blocks.null_sink(gr.sizeof_gr_complex))
- #self.tb.run()
+ def test_001_tx (self):
+ """ Just make sure the Tx works in general """
+ len_tag_key = 'frame_len'
+ n_bytes = 52
+ n_samples_expected = (numpy.ceil(1.0 * (n_bytes + 4) / 6) + 3) * 80
+ test_data = [random.randint(0, 255) for x in range(n_bytes)]
+ tx_data, tags = tagged_streams.packets_to_vectors((test_data,), len_tag_key)
+ src = blocks.vector_source_b(test_data, False, 1, tags)
+ tx = ofdm_tx(packet_length_tag_key=len_tag_key)
+ tx_fg = ofdm_tx_fg(test_data, len_tag_key)
+ tx_fg.run()
+ self.assertEqual(len(tx_fg.get_tx_samples()), n_samples_expected)
+
+ def test_002_rx_only_noise(self):
+ """ Run the RX with only noise, check it doesn't crash
+ or return a burst. """
+ len_tag_key = 'frame_len'
+ samples = (0,) * 1000
+ channel = channels.channel_model(0.1)
+ rx_fg = ofdm_rx_fg(samples, len_tag_key, channel)
+ rx_fg.run()
+ self.assertEqual(len(rx_fg.get_rx_bytes()), 0)
+
+ def test_003_tx1packet(self):
+ """ Transmit one packet, with slight AWGN and slight frequency + timing offset.
+ Check packet is received and no bit errors have occurred. """
+ len_tag_key = 'frame_len'
+ n_bytes = 21
+ fft_len = 64
+ test_data = tuple([random.randint(0, 255) for x in range(n_bytes)])
+ # 1.0/fft_len is one sub-carrier, a fine freq offset stays below that
+ freq_offset = 1.0 / fft_len * 0.7
+ #channel = channels.channel_model(0.01, freq_offset)
+ channel = None
+ # Tx
+ tx_fg = ofdm_tx_fg(test_data, len_tag_key)
+ tx_fg.run()
+ tx_samples = tx_fg.get_tx_samples()
+ # Rx
+ rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100)
+ rx_fg.run()
+ rx_data = rx_fg.get_rx_bytes()
+ self.assertEqual(tuple(tx_fg.tx.sync_word1), tuple(rx_fg.rx.sync_word1))
+ self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2))
+ self.assertEqual(test_data, rx_data)
+ def test_004_tx1packet_large_fO(self):
+ """ Transmit one packet, with slight AWGN and large frequency offset.
+ Check packet is received and no bit errors have occurred. """
+ fft_len = 64
+ len_tag_key = 'frame_len'
+ n_bytes = 21
+ test_data = tuple([random.randint(0, 255) for x in range(n_bytes)])
+ #test_data = tuple([255 for x in range(n_bytes)])
+ # 1.0/fft_len is one sub-carrier
+ frequency_offset = 1.0 / fft_len * 2.5
+ channel = channels.channel_model(0.00001, frequency_offset)
+ # Tx
+ tx_fg = ofdm_tx_fg(test_data, len_tag_key)
+ tx_fg.run()
+ tx_samples = tx_fg.get_tx_samples()
+ # Rx
+ rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100)
+ rx_fg.run()
+ rx_data = rx_fg.get_rx_bytes()
+ self.assertEqual(test_data, rx_data)
if __name__ == '__main__':
gr_unittest.run(test_ofdm_txrx, "test_ofdm_txrx.xml")