summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_serializer_vcc.py')
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_serializer_vcc.py120
1 files changed, 39 insertions, 81 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
index 69997ce981..8a60b97882 100755
--- a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
+++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2012,2013 Free Software Foundation, Inc.
+# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -29,6 +29,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "ts_last"
def tearDown (self):
self.tb = None
@@ -42,21 +43,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(1, 16)) + (0, 0, 0)
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, "", False)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, serializer, sink)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len)
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key, "", 0, "", False)
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
+ self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 1)
- result_tag = sink.tags()[0]
- self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
- self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0]))
+ self.assertEqual(sink.data()[0], expected_result)
def test_001b_shifted (self):
""" Same as before, but shifted, because that's the normal mode in OFDM Rx """
@@ -69,21 +61,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(18))
occupied_carriers = ((13, 14, 15, 1, 2, 3), (-4, -2, -1, 1, 2, 4),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, serializer, sink)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len)
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
+ self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 1)
- result_tag = sink.tags()[0]
- self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
- self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0]))
+ self.assertEqual(sink.data()[0], expected_result)
def test_002_with_offset (self):
""" Standard test, carrier offset """
@@ -96,32 +79,24 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(1, 16)) + (0, 0, 0)
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
offsettag = gr.tag_t()
offsettag.offset = 0
offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offsettag.value = pmt.from_long(carr_offset)
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag))
- sink = blocks.vector_sink_c()
+ src = blocks.vector_source_c(tx_symbols, False, fft_len, (offsettag,))
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
serializer = digital.ofdm_serializer_vcc(
fft_len,
occupied_carriers,
- tag_name,
+ self.tsb_key,
"", 0,
"ofdm_sync_carr_offset",
False
)
- self.tb.connect(src, serializer, sink)
+ self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 2)
- for tag in sink.tags():
- if pmt.symbol_to_string(tag.key) == tag_name:
- self.assertEqual(pmt.to_long(tag.value), n_syms * len(occupied_carriers[0]))
+ self.assertEqual(sink.data()[0], expected_result)
+ self.assertEqual(len(sink.tags()), 1)
def test_003_connect (self):
""" Connect carrier_allocator to ofdm_serializer,
@@ -133,19 +108,14 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
pilot_symbols = ((1j,),(-1j,))
#tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)])
tx_data = (1, 2, 3, 4)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_data))
- src = blocks.vector_source_c(tx_data, False, 1, (tag,))
+ src = blocks.vector_source_c(tx_data, False, 1)
alloc = digital.ofdm_carrier_allocator_cvc(
fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols,
(), # No sync word
- tag_name,
+ self.tsb_key,
True # Output is shifted (default)
)
serializer = digital.ofdm_serializer_vcc(
@@ -155,10 +125,10 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
"", # Carrier offset key
True # Input is shifted (default)
)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, alloc, serializer, sink)
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
+ self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data), self.tsb_key), alloc, serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), tx_data)
+ self.assertEqual(sink.data()[0], tx_data)
def test_004_connect (self):
"""
@@ -176,33 +146,30 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
pilot_carriers = ((-3,),(3,))
pilot_symbols = ((1j,),(-1j,))
tx_data = (1, 2, 3, 4)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_data))
offsettag = gr.tag_t()
offsettag.offset = 0
offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offsettag.value = pmt.from_long(carr_offset)
- src = blocks.vector_source_c(tx_data, False, 1, (tag, offsettag))
+ src = blocks.vector_source_c(tx_data, False, 1, (offsettag,))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols, (),
- tag_name)
+ self.tsb_key)
tx_ifft = fft.fft_vcc(fft_len, False, (1.0/fft_len,)*fft_len, True)
oscillator = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0)
mixer = blocks.multiply_cc()
rx_fft = fft.fft_vcc(fft_len, True, (), True)
- sink2 = blocks.vector_sink_c(fft_len)
+ sink2 = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key)
self.tb.connect(rx_fft, sink2)
serializer = digital.ofdm_serializer_vcc(
alloc, "", 0, "ofdm_sync_carr_offset", True
)
- sink = blocks.vector_sink_c()
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
self.tb.connect(
- src, alloc, tx_ifft,
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data), self.tsb_key),
+ alloc, tx_ifft,
blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len),
(mixer, 0),
blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len),
@@ -210,7 +177,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
)
self.tb.connect(oscillator, (mixer, 1))
self.tb.run ()
- self.assertComplexTuplesAlmostEqual(sink.data()[-len(occupied_carriers[0]):], tx_data, places=4)
+ self.assertComplexTuplesAlmostEqual(sink.data()[0][-len(occupied_carriers[0]):], tx_data, places=4)
def test_005_packet_len_tag (self):
""" Standard test """
@@ -222,32 +189,23 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(1, 16))
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
+ packet_len_tsb_key = "packet_len"
tag2 = gr.tag_t()
tag2.offset = 0
tag2.key = pmt.string_to_symbol("packet_len")
tag2.value = pmt.from_long(len(expected_result))
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, "", False)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, serializer, sink)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag2,))
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key, packet_len_tsb_key , 0, "", False)
+ sink = blocks.tsb_vector_sink_c(tsb_key=packet_len_tsb_key)
+ self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 1)
- result_tag = sink.tags()[0]
- self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len")
- self.assertEqual(pmt.to_long(result_tag.value), len(expected_result))
+ self.assertEqual(sink.data()[0], expected_result)
def test_099 (self):
""" Make sure it fails if it should """
fft_len = 16
- occupied_carriers = ((1, 3, 4, 11, 12, 112),)
- tag_name = "len"
- self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name)
+ occupied_carriers = ((1, 3, 4, 11, 12, 112),) # Something invalid
+ self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, self.tsb_key)
if __name__ == '__main__':