summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python')
-rw-r--r--gr-digital/python/digital/ofdm_txrx.py48
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_txrx.py30
-rwxr-xr-xgr-digital/python/digital/qa_scrambler.py39
3 files changed, 104 insertions, 13 deletions
diff --git a/gr-digital/python/digital/ofdm_txrx.py b/gr-digital/python/digital/ofdm_txrx.py
index a48a48db41..76bf337b8d 100644
--- a/gr-digital/python/digital/ofdm_txrx.py
+++ b/gr-digital/python/digital/ofdm_txrx.py
@@ -145,7 +145,8 @@ class ofdm_tx(gr.hier_block2):
sync_word1=None,
sync_word2=None,
rolloff=0,
- debug_log=False
+ debug_log=False,
+ scramble_bits=False
):
gr.hier_block2.__init__(self, "ofdm_tx",
gr.io_signature(1, 1, gr.sizeof_char),
@@ -193,13 +194,32 @@ class ofdm_tx(gr.hier_block2):
### Payload modulation ###############################################
payload_constellation = _get_constellation(bps_payload)
payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points())
+ if scramble_bits:
+ self.connect(
+ crc,
+ digital.additive_scrambler_bb(
+ 0x8a, 0x7f, 7,
+ bits_per_byte=bps_payload,
+ reset_tag_key=self.packet_length_tag_key
+ ),
+ blocks.repack_bits_bb(
+ 8, # Unpack 8 bits per byte
+ bps_payload,
+ self.packet_length_tag_key
+ ),
+ payload_mod
+ )
+ else:
+ self.connect(
+ crc,
+ blocks.repack_bits_bb(
+ 8, # Unpack 8 bits per byte
+ bps_payload,
+ self.packet_length_tag_key
+ ),
+ payload_mod
+ )
self.connect(
- crc,
- blocks.repack_bits_bb(
- 8, # Unpack 8 bits per byte
- bps_payload,
- self.packet_length_tag_key
- ),
payload_mod,
(header_payload_mux, 1)
)
@@ -262,7 +282,8 @@ class ofdm_rx(gr.hier_block2):
bps_payload=1,
sync_word1=None,
sync_word2=None,
- debug_log=False
+ debug_log=False,
+ scramble_bits=False
):
gr.hier_block2.__init__(self, "ofdm_rx",
gr.io_signature(1, 1, gr.sizeof_gr_complex),
@@ -372,7 +393,16 @@ class ofdm_rx(gr.hier_block2):
payload_demod = digital.constellation_decoder_cb(payload_constellation.base())
repack = blocks.repack_bits_bb(bps_payload, 8, self.packet_length_tag_key, True)
crc = digital.crc32_bb(True, self.packet_length_tag_key)
- self.connect((hpd, 1), payload_fft, payload_eq, payload_serializer, payload_demod, repack, crc, self)
+ self.connect((hpd, 1), payload_fft, payload_eq, payload_serializer, payload_demod, repack)
+ if scramble_bits:
+ descrambler = digital.additive_scrambler_bb(
+ 0x8a, 0x7f, 7,
+ bits_per_byte=bps_payload,
+ reset_tag_key=self.packet_length_tag_key
+ )
+ self.connect(repack, descrambler, crc, self)
+ else:
+ self.connect(repack, crc, self)
if debug_log:
self.connect((hpd, 1), blocks.tag_debug(gr.sizeof_gr_complex*fft_len, 'post-hpd'));
self.connect(payload_fft, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-fft.dat'))
diff --git a/gr-digital/python/digital/qa_ofdm_txrx.py b/gr-digital/python/digital/qa_ofdm_txrx.py
index adf93ee356..6f5bd2ed10 100755
--- a/gr-digital/python/digital/qa_ofdm_txrx.py
+++ b/gr-digital/python/digital/qa_ofdm_txrx.py
@@ -31,11 +31,11 @@ from gnuradio.digital.utils import tagged_streams
LOG_DEBUG_INFO=False
class ofdm_tx_fg (gr.top_block):
- def __init__(self, data, len_tag_key):
+ def __init__(self, data, len_tag_key, scramble_bits=False):
gr.top_block.__init__(self, "ofdm_tx")
tx_data, tags = tagged_streams.packets_to_vectors((data,), len_tag_key)
src = blocks.vector_source_b(data, False, 1, tags)
- self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO)
+ self.tx = ofdm_tx(packet_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO, scramble_bits=scramble_bits)
self.sink = blocks.vector_sink_c()
self.connect(src, self.tx, self.sink)
@@ -43,12 +43,12 @@ class ofdm_tx_fg (gr.top_block):
return self.sink.data()
class ofdm_rx_fg (gr.top_block):
- def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100):
+ def __init__(self, samples, len_tag_key, channel=None, prepend_zeros=100, scramble_bits=False):
gr.top_block.__init__(self, "ofdm_rx")
if prepend_zeros:
samples = (0,) * prepend_zeros + tuple(samples)
src = blocks.vector_source_c(tuple(samples) + (0,) * 1000)
- self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO)
+ self.rx = ofdm_rx(frame_length_tag_key=len_tag_key, debug_log=LOG_DEBUG_INFO, scramble_bits=scramble_bits)
if channel is not None:
self.connect(src, channel, self.rx)
else:
@@ -113,6 +113,28 @@ class test_ofdm_txrx (gr_unittest.TestCase):
self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2))
self.assertEqual(test_data, rx_data)
+ def test_003_tx1packet_scramble(self):
+ """ Same as before, use scrambler. """
+ len_tag_key = 'frame_len'
+ n_bytes = 21
+ fft_len = 64
+ test_data = tuple([random.randint(0, 255) for x in range(n_bytes)])
+ # 1.0/fft_len is one sub-carrier, a fine freq offset stays below that
+ freq_offset = 1.0 / fft_len * 0.7
+ #channel = channels.channel_model(0.01, freq_offset)
+ channel = None
+ # Tx
+ tx_fg = ofdm_tx_fg(test_data, len_tag_key, scramble_bits=True)
+ tx_fg.run()
+ tx_samples = tx_fg.get_tx_samples()
+ # Rx
+ rx_fg = ofdm_rx_fg(tx_samples, len_tag_key, channel, prepend_zeros=100, scramble_bits=True)
+ rx_fg.run()
+ rx_data = rx_fg.get_rx_bytes()
+ self.assertEqual(tuple(tx_fg.tx.sync_word1), tuple(rx_fg.rx.sync_word1))
+ self.assertEqual(tuple(tx_fg.tx.sync_word2), tuple(rx_fg.rx.sync_word2))
+ self.assertEqual(test_data, rx_data)
+
def test_004_tx1packet_large_fO(self):
""" Transmit one packet, with slight AWGN and large frequency offset.
Check packet is received and no bit errors have occurred. """
diff --git a/gr-digital/python/digital/qa_scrambler.py b/gr-digital/python/digital/qa_scrambler.py
index 05daebd389..e19e963cd3 100755
--- a/gr-digital/python/digital/qa_scrambler.py
+++ b/gr-digital/python/digital/qa_scrambler.py
@@ -21,6 +21,7 @@
#
from gnuradio import gr, gr_unittest, digital, blocks
+import pmt
class test_scrambler(gr_unittest.TestCase):
@@ -60,5 +61,43 @@ class test_scrambler(gr_unittest.TestCase):
self.tb.run()
self.assertEqual(src_data, dst.data())
+ def test_additive_scrambler_reset_3bpb(self):
+ src_data = (5,)*2000
+ src = blocks.vector_source_b(src_data, False)
+ scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 3)
+ descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 3)
+ dst = blocks.vector_sink_b()
+ dst2 = blocks.vector_sink_b()
+ self.tb.connect(src, scrambler, descrambler, dst)
+ self.tb.connect(scrambler, dst2)
+ self.tb.run()
+ if not (src_data == dst.data()):
+ self.fail('Not equal.')
+ self.assertEqual(src_data, src_data)
+
+ def test_additive_scrambler_tags(self):
+ print 'tags'
+ src_data = (1,)*1000
+ src = blocks.vector_source_b(src_data, False)
+ scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100)
+ descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100)
+ reset_tag_key = 'reset_lfsr'
+ reset_tag1 = gr.tag_t()
+ reset_tag1.key = pmt.string_to_symbol(reset_tag_key)
+ reset_tag1.offset = 17
+ reset_tag2 = gr.tag_t()
+ reset_tag2.key = pmt.string_to_symbol(reset_tag_key)
+ reset_tag2.offset = 110
+ reset_tag3 = gr.tag_t()
+ reset_tag3.key = pmt.string_to_symbol(reset_tag_key)
+ reset_tag3.offset = 523
+ src = blocks.vector_source_b(src_data, False, 1, (reset_tag1, reset_tag2, reset_tag3))
+ scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 1, reset_tag_key)
+ descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100, 1, reset_tag_key)
+ dst = blocks.vector_sink_b()
+ self.tb.connect(src, scrambler, descrambler, dst)
+ self.tb.run()
+ self.assertEqual(src_data, dst.data())
+
if __name__ == '__main__':
gr_unittest.run(test_scrambler, "test_scrambler.xml")