diff options
Diffstat (limited to 'gr-digital/python')
-rw-r--r-- | gr-digital/python/digital/ofdm_txrx.py | 48 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_txrx.py | 30 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_scrambler.py | 39 |
3 files changed, 104 insertions, 13 deletions
diff --git a/gr-digital/python/digital/ofdm_txrx.py b/gr-digital/python/digital/ofdm_txrx.py index a48a48db41..76bf337b8d 100644 --- a/gr-digital/python/digital/ofdm_txrx.py +++ b/gr-digital/python/digital/ofdm_txrx.py @@ -145,7 +145,8 @@ class ofdm_tx(gr.hier_block2): sync_word1=None, sync_word2=None, rolloff=0, - debug_log=False + debug_log=False, + scramble_bits=False ): gr.hier_block2.__init__(self, "ofdm_tx", gr.io_signature(1, 1, gr.sizeof_char), @@ -193,13 +194,32 @@ class ofdm_tx(gr.hier_block2): ### Payload modulation ############################################### payload_constellation = _get_constellation(bps_payload) payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points()) + if scramble_bits: + self.connect( + crc, + digital.additive_scrambler_bb( + 0x8a, 0x7f, 7, + bits_per_byte=bps_payload, + reset_tag_key=self.packet_length_tag_key + ), + blocks.repack_bits_bb( + 8, # Unpack 8 bits per byte + bps_payload, + self.packet_length_tag_key + ), + payload_mod + ) + else: + self.connect( + crc, + blocks.repack_bits_bb( + 8, # Unpack 8 bits per byte + bps_payload, + self.packet_length_tag_key + ), + payload_mod + ) self.connect( - crc, - blocks.repack_bits_bb( - 8, # Unpack 8 bits per byte - bps_payload, - self.packet_length_tag_key - ), payload_mod, (header_payload_mux, 1) ) @@ -262,7 +282,8 @@ class ofdm_rx(gr.hier_block2): bps_payload=1, sync_word1=None, sync_word2=None, - debug_log=False + debug_log=False, + scramble_bits=False ): gr.hier_block2.__init__(self, "ofdm_rx", gr.io_signature(1, 1, gr.sizeof_gr_complex), @@ -372,7 +393,16 @@ class ofdm_rx(gr.hier_block2): payload_demod = digital.constellation_decoder_cb(payload_constellation.base()) repack = blocks.repack_bits_bb(bps_payload, 8, self.packet_length_tag_key, True) crc = digital.crc32_bb(True, self.packet_length_tag_key) - self.connect((hpd, 1), payload_fft, payload_eq, payload_serializer, payload_demod, repack, crc, self) + self.connect((hpd, 1), payload_fft, payload_eq, payload_serializer, payload_demod, repack) + if scramble_bits: + descrambler = digital.additive_scrambler_bb( + 0x8a, 0x7f, 7, + bits_per_byte=bps_payload, + reset_tag_key=self.packet_length_tag_key + ) + self.connect(repack, descrambler, crc, self) + else: + self.connect(repack, crc, self) if debug_log: self.connect((hpd, 1), blocks.tag_debug(gr.sizeof_gr_complex*fft_len, 'post-hpd')); self.connect(payload_fft, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-fft.dat')) diff --git a/gr-digital/python/digital/qa_ofdm_txrx.py b/gr-digital/python/digital/qa_ofdm_txrx.py index adf93ee356..6f5bd2ed10 100755 --- a/gr-digital/python/digital/qa_ofdm_txrx.py +++ b/gr-digital/python/digital/qa_ofdm_txrx.py @@ -31,11 +31,11 @@ from gnuradio.digital.utils import tagged_streams LOG_DEBUG_INFO=False class ofdm_tx_fg (gr.top_block): - def __init__(self, data, len_tag_key): + def __init__(self, data, len_tag_key, scramble_bits=False): gr.top_block.__init__(self, "ofdm_tx") tx_data, tags = tagged_streams.packets_to_vectors((data,), len_tag_key) src = blocks.vector_source_b(data, False, 1, tags) - self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO) + self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO, scramble_bits=scramble_bits) self.sink = blocks.vector_sink_c() self.connect(src, self.tx, self.sink) @@ -43,12 +43,12 @@ class ofdm_tx_fg (gr.top_block): return self.sink.data() class ofdm_rx_fg (gr.top_block): - def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100): + def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100, scramble_bits=False): gr.top_block.__init__(self, "ofdm_rx") if prepend_zeros: samples = (0,) * prepend_zeros + tuple(samples) src = blocks.vector_source_c(tuple(samples) + (0,) * 1000) - self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO) + self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO, scramble_bits=scramble_bits) if channel is not None: self.connect(src, channel, self.rx) else: @@ -113,6 +113,28 @@ class test_ofdm_txrx (gr_unittest.TestCase): self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2)) self.assertEqual(test_data, rx_data) + def test_003_tx1packet_scramble(self): + """ Same as before, use scrambler. """ + len_tag_key = 'frame_len' + n_bytes = 21 + fft_len = 64 + test_data = tuple([random.randint(0, 255) for x in range(n_bytes)]) + # 1.0/fft_len is one sub-carrier, a fine freq offset stays below that + freq_offset = 1.0 / fft_len * 0.7 + #channel = channels.channel_model(0.01, freq_offset) + channel = None + # Tx + tx_fg = ofdm_tx_fg(test_data, len_tag_key, scramble_bits=True) + tx_fg.run() + tx_samples = tx_fg.get_tx_samples() + # Rx + rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100, scramble_bits=True) + rx_fg.run() + rx_data = rx_fg.get_rx_bytes() + self.assertEqual(tuple(tx_fg.tx.sync_word1), tuple(rx_fg.rx.sync_word1)) + self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2)) + self.assertEqual(test_data, rx_data) + def test_004_tx1packet_large_fO(self): """ Transmit one packet, with slight AWGN and large frequency offset. Check packet is received and no bit errors have occurred. """ diff --git a/gr-digital/python/digital/qa_scrambler.py b/gr-digital/python/digital/qa_scrambler.py index 05daebd389..e19e963cd3 100755 --- a/gr-digital/python/digital/qa_scrambler.py +++ b/gr-digital/python/digital/qa_scrambler.py @@ -21,6 +21,7 @@ # from gnuradio import gr, gr_unittest, digital, blocks +import pmt class test_scrambler(gr_unittest.TestCase): @@ -60,5 +61,43 @@ class test_scrambler(gr_unittest.TestCase): self.tb.run() self.assertEqual(src_data, dst.data()) + def test_additive_scrambler_reset_3bpb(self): + src_data = (5,)*2000 + src = blocks.vector_source_b(src_data, False) + scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 3) + descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 3) + dst = blocks.vector_sink_b() + dst2 = blocks.vector_sink_b() + self.tb.connect(src, scrambler, descrambler, dst) + self.tb.connect(scrambler, dst2) + self.tb.run() + if not (src_data == dst.data()): + self.fail('Not equal.') + self.assertEqual(src_data, src_data) + + def test_additive_scrambler_tags(self): + print 'tags' + src_data = (1,)*1000 + src = blocks.vector_source_b(src_data, False) + scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100) + descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100) + reset_tag_key = 'reset_lfsr' + reset_tag1 = gr.tag_t() + reset_tag1.key = pmt.string_to_symbol(reset_tag_key) + reset_tag1.offset = 17 + reset_tag2 = gr.tag_t() + reset_tag2.key = pmt.string_to_symbol(reset_tag_key) + reset_tag2.offset = 110 + reset_tag3 = gr.tag_t() + reset_tag3.key = pmt.string_to_symbol(reset_tag_key) + reset_tag3.offset = 523 + src = blocks.vector_source_b(src_data, False, 1, (reset_tag1, reset_tag2, reset_tag3)) + scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 1, reset_tag_key) + descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 1, reset_tag_key) + dst = blocks.vector_sink_b() + self.tb.connect(src, scrambler, descrambler, dst) + self.tb.run() + self.assertEqual(src_data, dst.data()) + if __name__ == '__main__': gr_unittest.run(test_scrambler, "test_scrambler.xml") |