summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_serializer_vcc.py')
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_serializer_vcc.py122
1 files changed, 83 insertions, 39 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
index 8a5e8f4893..fbef2b1f27 100755
--- a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
+++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
@@ -22,7 +22,7 @@
import numpy
-from gnuradio import gr, gr_unittest, blocks. fft, analog, digital
+from gnuradio import gr, gr_unittest, blocks, fft, analog, digital
import pmt
class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
@@ -36,7 +36,6 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
def test_001_simple (self):
""" Standard test """
fft_len = 16
- tx_symbols = range(1, 16);
tx_symbols = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0,
0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0,
0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0)
@@ -44,12 +43,39 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
tag_name = "len"
- tag = gr.gr_tag_t()
+ tag = gr.tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,))
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, "", False)
+ sink = blocks.vector_sink_c()
+ self.tb.connect(src, serializer, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(len(sink.tags()), 1)
+ result_tag = sink.tags()[0]
+ self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
+ self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0]))
+
+ def test_001b_shifted (self):
+ """ Same as before, but shifted, because that's the normal mode in OFDM Rx """
+ fft_len = 16
+ tx_symbols = (
+ 0, 0, 0, 0, 0, 0, 1, 2, 0, 3, 4, 5, 0, 0, 0, 0,
+ 0, 0, 0, 0, 6, 1j, 7, 8, 0, 9, 10, 1j, 11, 0, 0, 0,
+ 0, 0, 0, 0, 0, 12, 13, 14, 0, 15, 16, 17, 0, 0, 0, 0,
+ )
+ expected_result = tuple(range(18))
+ occupied_carriers = ((13, 14, 15, 1, 2, 3), (-4, -2, -1, 1, 2, 4),)
+ n_syms = len(tx_symbols)/fft_len
+ tag_name = "len"
+ tag = gr.tag_t()
tag.offset = 0
tag.key = pmt.string_to_symbol(tag_name)
tag.value = pmt.from_long(n_syms)
src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False)
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name)
sink = blocks.vector_sink_c()
self.tb.connect(src, serializer, sink)
self.tb.run ()
@@ -71,17 +97,24 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
tag_name = "len"
- tag = gr.gr_tag_t()
+ tag = gr.tag_t()
tag.offset = 0
tag.key = pmt.string_to_symbol(tag_name)
tag.value = pmt.from_long(n_syms)
- offsettag = gr.gr_tag_t()
+ offsettag = gr.tag_t()
offsettag.offset = 0
offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offsettag.value = pmt.from_long(carr_offset)
src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False)
sink = blocks.vector_sink_c()
+ serializer = digital.ofdm_serializer_vcc(
+ fft_len,
+ occupied_carriers,
+ tag_name,
+ "", 0,
+ "ofdm_sync_carr_offset",
+ False
+ )
self.tb.connect(src, serializer, sink)
self.tb.run ()
self.assertEqual(sink.data(), expected_result)
@@ -93,29 +126,39 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
def test_003_connect (self):
""" Connect carrier_allocator to ofdm_serializer,
make sure output==input """
- fft_len = 32
- n_syms = 10
+ fft_len = 8
+ n_syms = 1
occupied_carriers = ((1, 2, 6, 7),)
pilot_carriers = ((3,),(5,))
pilot_symbols = ((1j,),(-1j,))
- sync_word = (range(fft_len),)
- tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)])
+ #tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)])
+ tx_data = (1, 2, 3, 4)
tag_name = "len"
- tag = gr.gr_tag_t()
+ tag = gr.tag_t()
tag.offset = 0
tag.key = pmt.string_to_symbol(tag_name)
tag.value = pmt.from_long(len(tx_data))
src = blocks.vector_source_c(tx_data, False, 1, (tag,))
- alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
- occupied_carriers,
- pilot_carriers,
- pilot_symbols, sync_word,
- tag_name)
- serializer = digital.ofdm_serializer_vcc(alloc)
+ alloc = digital.ofdm_carrier_allocator_cvc(
+ fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ (), # No sync word
+ tag_name,
+ True # Output is shifted (default)
+ )
+ serializer = digital.ofdm_serializer_vcc(
+ alloc,
+ "", # Len tag key
+ 0, # Symbols skipped
+ "", # Carrier offset key
+ True # Input is shifted (default)
+ )
sink = blocks.vector_sink_c()
self.tb.connect(src, alloc, serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data()[4:], tx_data)
+ self.assertEqual(sink.data(), tx_data)
def test_004_connect (self):
"""
@@ -126,21 +169,19 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
- Frequency offset is -2 carriers
"""
fft_len = 8
- n_syms = 2
+ n_syms = 1
carr_offset = -2
- freq_offset = 2 * numpy.pi * carr_offset / fft_len # If the sampling rate == 1
- occupied_carriers = ((1, 2, -2, -1),)
- pilot_carriers = ((3,),(5,))
+ freq_offset = 1.0 / fft_len * carr_offset # Normalized frequency
+ occupied_carriers = ((-2, -1, 1, 2),)
+ pilot_carriers = ((-3,),(3,))
pilot_symbols = ((1j,),(-1j,))
- sync_word = (range(fft_len),)
- tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)])
- #tx_data = (1,) * occupied_carriers[0] * n_syms
+ tx_data = (1, 2, 3, 4)
tag_name = "len"
- tag = gr.gr_tag_t()
+ tag = gr.tag_t()
tag.offset = 0
tag.key = pmt.string_to_symbol(tag_name)
tag.value = pmt.from_long(len(tx_data))
- offsettag = gr.gr_tag_t()
+ offsettag = gr.tag_t()
offsettag.offset = 0
offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offsettag.value = pmt.from_long(carr_offset)
@@ -148,13 +189,17 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
pilot_carriers,
- pilot_symbols, sync_word,
+ pilot_symbols, (),
tag_name)
- tx_ifft = fft.fft_vcc(fft_len, False, ())
- offset_sig = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0)
+ tx_ifft = fft.fft_vcc(fft_len, False, (1.0/fft_len,)*fft_len, True)
+ oscillator = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0/fft_len)
mixer = blocks.multiply_cc()
rx_fft = fft.fft_vcc(fft_len, True, (), True)
- serializer = digital.ofdm_serializer_vcc(alloc)
+ sink2 = blocks.vector_sink_c(fft_len)
+ self.tb.connect(rx_fft, sink2)
+ serializer = digital.ofdm_serializer_vcc(
+ alloc, "", 0, "ofdm_sync_carr_offset", True
+ )
sink = blocks.vector_sink_c()
self.tb.connect(
src, alloc, tx_ifft,
@@ -163,10 +208,9 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len),
rx_fft, serializer, sink
)
- self.tb.connect(offset_sig, (mixer, 1))
+ self.tb.connect(oscillator, (mixer, 1))
self.tb.run ()
- # FIXME check this
- #self.assertEqual(sink.data(), tx_data)
+ self.assertComplexTuplesAlmostEqual(sink.data()[-len(occupied_carriers[0]):], tx_data, places=4)
def test_005_packet_len_tag (self):
""" Standard test """
@@ -179,16 +223,16 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
tag_name = "len"
- tag = gr.gr_tag_t()
+ tag = gr.tag_t()
tag.offset = 0
tag.key = pmt.string_to_symbol(tag_name)
tag.value = pmt.from_long(n_syms)
- tag2 = gr.gr_tag_t()
+ tag2 = gr.tag_t()
tag2.offset = 0
tag2.key = pmt.string_to_symbol("packet_len")
tag2.value = pmt.from_long(len(expected_result))
src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, False)
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, "", False)
sink = blocks.vector_sink_c()
self.tb.connect(src, serializer, sink)
self.tb.run ()
@@ -201,7 +245,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
def test_099 (self):
""" Make sure it fails if it should """
fft_len = 16
- occupied_carriers = ((1, 3, 4, 11, 12, 17),)
+ occupied_carriers = ((1, 3, 4, 11, 12, 112),)
tag_name = "len"
self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name)