summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_ofdm_txrx.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_txrx.py')
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_txrx.py30
1 files changed, 26 insertions, 4 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_txrx.py b/gr-digital/python/digital/qa_ofdm_txrx.py
index adf93ee356..6f5bd2ed10 100755
--- a/gr-digital/python/digital/qa_ofdm_txrx.py
+++ b/gr-digital/python/digital/qa_ofdm_txrx.py
@@ -31,11 +31,11 @@ from gnuradio.digital.utils import tagged_streams
LOG_DEBUG_INFO=False
class ofdm_tx_fg (gr.top_block):
- def __init__(self, data, len_tag_key):
+ def __init__(self, data, len_tag_key, scramble_bits=False):
gr.top_block.__init__(self, "ofdm_tx")
tx_data, tags = tagged_streams.packets_to_vectors((data,), len_tag_key)
src = blocks.vector_source_b(data, False, 1, tags)
- self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO)
+ self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO, scramble_bits=scramble_bits)
self.sink = blocks.vector_sink_c()
self.connect(src, self.tx, self.sink)
@@ -43,12 +43,12 @@ class ofdm_tx_fg (gr.top_block):
return self.sink.data()
class ofdm_rx_fg (gr.top_block):
- def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100):
+ def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100, scramble_bits=False):
gr.top_block.__init__(self, "ofdm_rx")
if prepend_zeros:
samples = (0,) * prepend_zeros + tuple(samples)
src = blocks.vector_source_c(tuple(samples) + (0,) * 1000)
- self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO)
+ self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO, scramble_bits=scramble_bits)
if channel is not None:
self.connect(src, channel, self.rx)
else:
@@ -113,6 +113,28 @@ class test_ofdm_txrx (gr_unittest.TestCase):
self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2))
self.assertEqual(test_data, rx_data)
+ def test_003_tx1packet_scramble(self):
+ """ Same as before, use scrambler. """
+ len_tag_key = 'frame_len'
+ n_bytes = 21
+ fft_len = 64
+ test_data = tuple([random.randint(0, 255) for x in range(n_bytes)])
+ # 1.0/fft_len is one sub-carrier, a fine freq offset stays below that
+ freq_offset = 1.0 / fft_len * 0.7
+ #channel = channels.channel_model(0.01, freq_offset)
+ channel = None
+ # Tx
+ tx_fg = ofdm_tx_fg(test_data, len_tag_key, scramble_bits=True)
+ tx_fg.run()
+ tx_samples = tx_fg.get_tx_samples()
+ # Rx
+ rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100, scramble_bits=True)
+ rx_fg.run()
+ rx_data = rx_fg.get_rx_bytes()
+ self.assertEqual(tuple(tx_fg.tx.sync_word1), tuple(rx_fg.rx.sync_word1))
+ self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2))
+ self.assertEqual(test_data, rx_data)
+
def test_004_tx1packet_large_fO(self):
""" Transmit one packet, with slight AWGN and large frequency offset.
Check packet is received and no bit errors have occurred. """