summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py89
1 files changed, 49 insertions, 40 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
index 610d883b6c..5b67d868a0 100644
--- a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
+++ b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
@@ -22,23 +22,37 @@
from __future__ import division
-import numpy
import random
+import numpy
from gnuradio import gr, gr_unittest, blocks, analog, channels
from gnuradio import digital
from gnuradio.digital.utils import tagged_streams
from gnuradio.digital.ofdm_txrx import ofdm_tx
+def make_bpsk_burst(fft_len, cp_len, num_bits):
+ """ Create a burst of a sync symbol and some BPSK bits """
+ sync_symbol = [
+ (random.randint(0, 1)*2)-1
+ for x in range(fft_len // 2)
+ ] * 2
+ sync_symbols = sync_symbol[-cp_len:] + sync_symbol
+ mod_symbols = [
+ (random.randint(0, 1)*2)-1
+ for x in range(num_bits)
+ ]
+ return sync_symbols + mod_symbols
+
class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
+ random.seed(0)
self.tb = gr.top_block ()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_001_detect (self):
+ def test_001_detect(self):
""" Send two bursts, with zeros in between, and check
they are both detected at the correct position and no
false alarms occur """
@@ -46,15 +60,11 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
fft_len = 32
cp_len = 4
sig_len = (fft_len + cp_len) * 10
- sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len // 2)] * 2
- tx_signal = [0,] * n_zeros + \
- sync_symbol[-cp_len:] + \
- sync_symbol + \
- [(random.randint(0, 1)*2)-1 for x in range(sig_len)]
+ tx_signal = [0,] * n_zeros + make_bpsk_burst(fft_len, cp_len, sig_len)
tx_signal = tx_signal * 2
add = blocks.add_cc()
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
- sink_freq = blocks.vector_sink_f()
+ sink_freq = blocks.vector_sink_f()
sink_detect = blocks.vector_sink_b()
self.tb.connect(blocks.vector_source_c(tx_signal), (add, 0))
self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1))
@@ -69,7 +79,7 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
self.assertEqual(numpy.sum(sig1_detect), 1)
self.assertEqual(numpy.sum(sig2_detect), 1)
- def test_002_freq (self):
+ def test_002_freq(self):
""" Add a fine frequency offset and see if that gets detected properly """
fft_len = 32
cp_len = 4
@@ -77,12 +87,9 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
max_freq_offset = 2*numpy.pi/fft_len # Otherwise, it's coarse
freq_offset = ((2 * random.random()) - 1) * max_freq_offset
sig_len = (fft_len + cp_len) * 10
- sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len // 2)] * 2
- tx_signal = sync_symbol[-cp_len:] + \
- sync_symbol + \
- [(random.randint(0, 1)*2)-1 for x in range(sig_len)]
+ tx_signal = make_bpsk_burst(fft_len, cp_len, sig_len)
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len, True)
- sink_freq = blocks.vector_sink_f()
+ sink_freq = blocks.vector_sink_f()
sink_detect = blocks.vector_sink_b()
channel = channels.channel_model(0.005, freq_offset / 2.0 / numpy.pi)
self.tb.connect(blocks.vector_source_c(tx_signal), channel, sync)
@@ -93,39 +100,45 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
est_freq_offset = 2 * phi_hat / fft_len
self.assertAlmostEqual(est_freq_offset, freq_offset, places=2)
- def test_003_multiburst (self):
+ def test_003_multiburst(self):
""" Send several bursts, see if the number of detects is correct.
Burst lengths and content are random.
+
+ The channel is assumed AWGN for this test.
"""
n_bursts = 42
fft_len = 32
cp_len = 4
tx_signal = []
- for i in range(n_bursts):
- sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len // 2)] * 2
- tx_signal += [0,] * random.randint(0, 2*fft_len) + \
- sync_symbol[-cp_len:] + \
- sync_symbol + \
- [(random.randint(0, 1)*2)-1 for x in range(fft_len * random.randint(5,23))]
- add = blocks.add_cc()
+ for _ in range(n_bursts):
+ gap = [0,] * random.randint(0, 2*fft_len)
+ tx_signal += \
+ gap + \
+ make_bpsk_burst(fft_len, cp_len, fft_len * random.randint(5, 23))
+ # Very loose definition of SNR here
+ snr = 20 # dB
+ sigma = 10**(-snr/10)
+ # Add noise -- we don't use the channel model blocks, we want to keep
+ # this test as self-contained as possible, and all randomness should
+ # derive from random.seed() above
+ complex_randn = \
+ lambda N: (numpy.random.randn(N) + 1j * numpy.random.randn(N)) * sigma / numpy.sqrt(2)
+ tx_signal += complex_randn(len(tx_signal))
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
- sink_freq = blocks.vector_sink_f()
+ sink_freq = blocks.vector_sink_f()
sink_detect = blocks.vector_sink_b()
- channel = channels.channel_model(0.005)
- self.tb.connect(blocks.vector_source_c(tx_signal), channel, sync)
+ self.tb.connect(blocks.vector_source_c(tx_signal), sync)
self.tb.connect((sync, 0), sink_freq)
self.tb.connect((sync, 1), sink_detect)
self.tb.run()
n_bursts_detected = numpy.sum(sink_detect.data())
- # We allow for one false alarm or missed burst
- self.assertTrue(abs(n_bursts_detected - n_bursts) <= 1,
- msg="""Because of statistics, it is possible (though unlikely)
-that the number of detected bursts differs slightly. If the number of detects is
-off by one or two, run the test again and see what happen.
-Detection error was: %d """ % (numpy.sum(sink_detect.data()) - n_bursts)
+ self.assertEqual(
+ n_bursts_detected, n_bursts,
+ msg="Detection error (missed bursts): {}".format(
+ (numpy.sum(sink_detect.data()) - n_bursts))
)
- def test_004_ofdm_packets (self):
+ def test_004_ofdm_packets(self):
"""
Send several bursts using ofdm_tx, see if the number of detects is correct.
Burst lengths and content are random.
@@ -136,23 +149,20 @@ Detection error was: %d """ % (numpy.sum(sink_detect.data()) - n_bursts)
# Here, coarse freq offset is allowed
max_freq_offset = 2*numpy.pi/fft_len * 4
freq_offset = ((2 * random.random()) - 1) * max_freq_offset
- tx_signal = []
packets = []
tagname = "packet_length"
min_packet_length = 10
max_packet_length = 50
- sync_sequence = [random.randint(0, 1)*2-1 for x in range(fft_len // 2)]
- for i in range(n_bursts):
+ for _ in range(n_bursts):
packet_length = random.randint(min_packet_length,
max_packet_length+1)
packet = [random.randint(0, 255) for i in range(packet_length)]
packets.append(packet)
data, tags = tagged_streams.packets_to_vectors(packets, tagname, vlen=1)
- total_length = len(data)
src = blocks.vector_source_b(data, False, 1, tags)
mod = ofdm_tx(packet_length_tag_key=tagname)
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
- sink_freq = blocks.vector_sink_f()
+ sink_freq = blocks.vector_sink_f()
sink_detect = blocks.vector_sink_b()
noise_level = 0.005
channel = channels.channel_model(noise_level, freq_offset / 2 / numpy.pi)
@@ -164,4 +174,3 @@ Detection error was: %d """ % (numpy.sum(sink_detect.data()) - n_bursts)
if __name__ == '__main__':
gr_unittest.run(qa_ofdm_sync_sc_cfb, "qa_ofdm_sync_sc_cfb.xml")
-