summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2013-03-15 09:57:31 -0700
committerJohnathan Corgan <johnathan@corganlabs.com>2013-03-15 09:57:31 -0700
commite965a5bb209ad46a509ccd21f393667fd69d95f9 (patch)
tree40d683ef8f80980f12eac6cfe7f3423c49b45f87 /gr-digital/python
parent2bf9c4cb4b0b426690f353fc1662a13e70c0d5e0 (diff)
parent27990ca9e236931e39a830e48f0a1efe13ec085f (diff)
Merge branch 'ofdm-master' into ofdm-next
Added fixups for next branch changes Conflicts: CMakeLists.txt gnuradio-core/src/lib/io/gr_message_sink.cc gnuradio-core/src/lib/io/gr_message_sink.h gnuradio-core/src/lib/io/gr_message_sink.i gnuradio-core/src/lib/io/gr_message_source.cc gnuradio-core/src/lib/io/gr_message_source.h gnuradio-core/src/lib/io/gr_message_source.i gr-blocks/CMakeLists.txt gr-digital/CMakeLists.txt gr-digital/grc/digital_block_tree.xml gr-digital/include/digital/CMakeLists.txt gr-digital/include/digital_ofdm_cyclic_prefixer.h gr-digital/lib/CMakeLists.txt gr-digital/lib/digital_ofdm_cyclic_prefixer.cc gr-digital/lib/ofdm_cyclic_prefixer_impl.h gr-digital/python/CMakeLists.txt gr-digital/swig/CMakeLists.txt gr-digital/swig/digital_swig.i
Diffstat (limited to 'gr-digital/python')
-rw-r--r--gr-digital/python/CMakeLists.txt7
-rw-r--r--gr-digital/python/__init__.py2
-rw-r--r--gr-digital/python/fftshift.py31
-rw-r--r--gr-digital/python/ofdm_txrx.py268
-rwxr-xr-xgr-digital/python/qa_crc32_bb.py150
-rwxr-xr-xgr-digital/python/qa_header_payload_demux.py78
-rwxr-xr-xgr-digital/python/qa_ofdm_carrier_allocator_cvc.py154
-rwxr-xr-xgr-digital/python/qa_ofdm_chanest_vcvc.py284
-rwxr-xr-xgr-digital/python/qa_ofdm_cyclic_prefixer.py95
-rwxr-xr-xgr-digital/python/qa_ofdm_frame_equalizer_vcvc.py161
-rwxr-xr-xgr-digital/python/qa_ofdm_serializer_vcc.py214
-rwxr-xr-xgr-digital/python/qa_ofdm_sync_sc_cfb.py203
-rwxr-xr-xgr-digital/python/qa_ofdm_txrx.py58
-rwxr-xr-xgr-digital/python/qa_packet_headergenerator_bb.py157
-rwxr-xr-xgr-digital/python/qa_packet_headerparser_b.py72
-rwxr-xr-xgr-digital/python/qa_scale_tags.py57
-rw-r--r--gr-digital/python/qa_ts_insert_zeros.py78
-rw-r--r--gr-digital/python/utils/tagged_streams.py112
18 files changed, 2180 insertions, 1 deletions
diff --git a/gr-digital/python/CMakeLists.txt b/gr-digital/python/CMakeLists.txt
index 7f35e3c2be..4dfaac362a 100644
--- a/gr-digital/python/CMakeLists.txt
+++ b/gr-digital/python/CMakeLists.txt
@@ -28,6 +28,7 @@ GR_PYTHON_INSTALL(
bpsk.py
cpm.py
crc.py
+ fftshift.py
generic_mod_demod.py
gmsk.py
gfsk.py
@@ -39,6 +40,7 @@ GR_PYTHON_INSTALL(
ofdm_sync_ml.py
ofdm_sync_pnac.py
ofdm_sync_pn.py
+ ofdm_txrx.py
packet_utils.py
pkt.py
psk.py
@@ -55,6 +57,7 @@ GR_PYTHON_INSTALL(
utils/gray_code.py
utils/mod_codes.py
utils/alignment.py
+ utils/tagged_streams.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio/digital/utils
COMPONENT "digital_python"
)
@@ -73,8 +76,10 @@ list(APPEND GR_TEST_PYTHON_DIRS
${CMAKE_BINARY_DIR}/gr-analog/swig
${CMAKE_BINARY_DIR}/gr-blocks/python
${CMAKE_BINARY_DIR}/gr-blocks/swig
+ ${CMAKE_BINARY_DIR}/gr-fft/python
+ ${CMAKE_BINARY_DIR}/gr-fft/swig
)
-list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft gnuradio-analog)
+list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft gnuradio-analog gnuradio-blocks)
include(GrTest)
file(GLOB py_qa_test_files "qa_*.py")
diff --git a/gr-digital/python/__init__.py b/gr-digital/python/__init__.py
index 962f210324..28b74261f7 100644
--- a/gr-digital/python/__init__.py
+++ b/gr-digital/python/__init__.py
@@ -42,6 +42,8 @@ from ofdm_sync_fixed import *
from ofdm_sync_ml import *
from ofdm_sync_pnac import *
from ofdm_sync_pn import *
+from fftshift import fftshift, ifftshift
+from ofdm_txrx import ofdm_tx, ofdm_rx
import packet_utils
import ofdm_packet_utils
diff --git a/gr-digital/python/fftshift.py b/gr-digital/python/fftshift.py
new file mode 100644
index 0000000000..c8c7c7f140
--- /dev/null
+++ b/gr-digital/python/fftshift.py
@@ -0,0 +1,31 @@
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+# Aliases for fftpack.(i)fftshift()
+
+from scipy import fftpack
+
+def fftshift(x):
+ return fftpack.fftshift(x)
+
+def ifftshift(x):
+ return fftpack.ifftshift(x)
+
diff --git a/gr-digital/python/ofdm_txrx.py b/gr-digital/python/ofdm_txrx.py
new file mode 100644
index 0000000000..2734e9cc1c
--- /dev/null
+++ b/gr-digital/python/ofdm_txrx.py
@@ -0,0 +1,268 @@
+#
+# Copyright 2005,2006,2007 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+"""
+OFDM Transmitter / Receiver hier blocks.
+
+For simple configurations, no need to connect all the relevant OFDM blocks
+to form an OFDM Tx/Rx--simply use these.
+"""
+
+import numpy
+from gnuradio import gr
+import digital_swig as digital
+from utils import tagged_streams
+
+try:
+ # This will work when feature #505 is added.
+ from gnuradio import fft
+ from gnuradio import blocks
+ from gnuradio import analog
+except ImportError:
+ # Until then this will work.
+ import fft_swig as fft
+ import blocks_swig as blocks
+ import analog_swig as analog
+
+_def_fft_len = 64
+_def_cp_len = 16
+_def_frame_length_tag_key = "frame_length"
+_def_packet_length_tag_key = "frame_length"
+_def_packet_num_tag_key = ""
+_def_occupied_carriers=(range(1, 27) + range(38, 64),)
+_def_pilot_carriers=((0,),)
+_def_pilot_symbols=((100,),)
+_seq_seed = 42
+
+def _make_sync_word(fft_len, occupied_carriers, constellation):
+ """ Makes a random sync sequence """
+ occupied_carriers = list(occupied_carriers[0])
+ occupied_carriers = [occupied_carriers[x] + fft_len if occupied_carriers[x] < 0 else occupied_carriers[x] for x in range(len(occupied_carriers))]
+ numpy.random.seed(_seq_seed)
+ sync_sequence = [constellation.map_to_points_v(numpy.random.randint(constellation.arity()))[0] * numpy.sqrt(2) if x in occupied_carriers and x % 3 else 0 for x in range(fft_len)]
+ return sync_sequence
+
+def _get_constellation(bps):
+ """ Returns a modulator block for a given number of bits per symbol """
+ constellation = {
+ 1: digital.constellation_bpsk(),
+ 2: digital.constellation_qpsk(),
+ 3: digital.constellation_8psk()
+ }
+ try:
+ return constellation[bps]
+ except KeyError:
+ print 'Modulation not supported.'
+ exit(1)
+
+class ofdm_tx(gr.hier_block2):
+ """
+ Hierarchical block for OFDM modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ fft_len: The length of FFT (integer).
+ cp_len: The length of cyclic prefix (integer).
+ occupied_carriers: ??
+ pilot_carriers: ??
+ pilot_symbols: ??
+ length_tag_key: The name of the tag giving packet length.
+ """
+ def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len,
+ frame_length_tag_key=_def_frame_length_tag_key,
+ occupied_carriers=_def_occupied_carriers,
+ pilot_carriers=_def_pilot_carriers,
+ pilot_symbols=_def_pilot_symbols,
+ bps_header=1,
+ bps_payload=1,
+ sync_word1=None,
+ sync_word2=None,
+ rolloff=0
+ ):
+ gr.hier_block2.__init__(self, "ofdm_tx",
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
+ self.fft_len = fft_len
+ self.cp_len = cp_len
+ self.frame_length_tag_key = frame_length_tag_key
+ self.occupied_carriers = occupied_carriers
+ self.pilot_carriers = pilot_carriers
+ self.pilot_symbols = pilot_symbols
+ self.bps_header = bps_header
+ self.bps_payload = bps_payload
+ n_sync_words = 1
+ header_constellation = _get_constellation(bps_header)
+ header_mod = digital.chunks_to_symbols_bc(header_constellation.points())
+ self.sync_word1 = sync_word1
+ if sync_word1 is None:
+ self.sync_word1 = _make_sync_word(fft_len, occupied_carriers, header_constellation)
+ else:
+ if len(sync_word1) != self.fft_len:
+ raise ValueError("Length of sync sequence(s) must be FFT length.")
+ total_sync_word = self.sync_word1
+ self.sync_word2 = ()
+ if sync_word2 is not None:
+ if len(sync_word2) != fft_len:
+ raise ValueError("Length of sync sequence(s) must be FFT length.")
+ self.sync_word2 = sync_word2
+ n_sync_words = 2
+ total_sync_word = sync_word1 + sync_word2
+ crc = digital.crc32_bb(False, self.frame_length_tag_key)
+ formatter_object = digital.packet_header_ofdm(
+ occupied_carriers, 1, "", "", "",
+ bps_header
+ )
+ header_gen = digital.packet_headergenerator_bb(formatter_object.base())
+ header_payload_mux = blocks.tagged_stream_mux(gr.sizeof_gr_complex*1, self.frame_length_tag_key)
+ self.connect(self, crc, header_gen, header_mod, (header_payload_mux, 0))
+ payload_constellation = _get_constellation(bps_payload)
+ payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points())
+ self.connect(
+ crc,
+ blocks.repack_bits_bb(8, bps_payload, frame_length_tag_key),
+ payload_mod,
+ (header_payload_mux, 1)
+ )
+ self.connect(payload_mod, gr.tag_debug(gr.sizeof_gr_complex, "pmod"))
+ sync_word_gen = gr.vector_source_c(
+ total_sync_word, True, self.fft_len,
+ tagged_streams.make_lengthtags((n_sync_words,), (0,), self.frame_length_tag_key)
+ )
+ allocator = digital.ofdm_carrier_allocator_cvc(
+ self.fft_len,
+ occupied_carriers=self.occupied_carriers,
+ pilot_carriers=self.pilot_carriers,
+ pilot_symbols=self.pilot_symbols,
+ len_tag_key=self.frame_length_tag_key
+ )
+ syncword_data_mux = blocks.tagged_stream_mux(gr.sizeof_gr_complex*self.fft_len, self.frame_length_tag_key)
+ self.connect(sync_word_gen, (syncword_data_mux, 0))
+ self.connect(header_payload_mux, allocator, (syncword_data_mux, 1))
+ ffter = fft.fft_vcc(self.fft_len, False, (), False)
+ cyclic_prefixer = digital.ofdm_cyclic_prefixer(
+ self.fft_len,
+ self.fft_len+self.cp_len,
+ rolloff,
+ self.frame_length_tag_key
+ )
+ self.connect(syncword_data_mux, ffter, cyclic_prefixer, self)
+
+
+class ofdm_rx(gr.hier_block2):
+ """
+ Hierarchical block for OFDM demodulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ fft_len: The length of FFT (integer).
+ cp_len: The length of cyclic prefix (integer).
+ occupied_carriers: ??
+ pilot_carriers: ??
+ pilot_symbols: ??
+ length_tag_key: The name of the tag giving packet length.
+ """
+ def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len,
+ frame_length_tag_key=_def_frame_length_tag_key,
+ packet_length_tag_key=_def_packet_length_tag_key,
+ packet_num_tag_key=_def_packet_num_tag_key,
+ occupied_carriers=_def_occupied_carriers,
+ pilot_carriers=_def_pilot_carriers,
+ pilot_symbols=_def_pilot_symbols,
+ bps_header=1,
+ bps_payload=1,
+ sync_word1=None,
+ sync_word2=None
+ ):
+ gr.hier_block2.__init__(self, "ofdm_rx",
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(1, 1, gr.sizeof_char))
+ self.fft_len = fft_len
+ self.cp_len = cp_len
+ self.frame_length_tag_key = frame_length_tag_key
+ self.packet_length_tag_key = packet_length_tag_key
+ self.occupied_carriers = occupied_carriers
+ self.bps_header = bps_header
+ self.bps_payload = bps_payload
+ n_sync_words = 1
+ header_constellation = _get_constellation(bps_header)
+ if sync_word1 is None:
+ self.sync_word1 = _make_sync_word(fft_len, occupied_carriers, header_constellation)
+ else:
+ if len(sync_word1) != self.fft_len:
+ raise ValueError("Length of sync sequence(s) must be FFT length.")
+ self.sync_word1 = sync_word1
+ self.sync_word2 = ()
+ if sync_word2 is not None:
+ if len(sync_word2) != fft_len:
+ raise ValueError("Length of sync sequence(s) must be FFT length.")
+ self.sync_word2 = sync_word2
+ n_sync_words = 2
+ else:
+ sync_word2 = ()
+ # Receiver path
+ sync_detect = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
+ oscillator = analog.frequency_modulator_fc(-2.0 / fft_len)
+ delay = gr.delay(gr.sizeof_gr_complex, fft_len+cp_len)
+ mixer = gr.multiply_cc()
+ hpd = digital.header_payload_demux(n_sync_words, fft_len, cp_len,
+ frame_length_tag_key, "", True)
+ self.connect(self, sync_detect)
+ self.connect((sync_detect, 0), oscillator, (mixer, 0))
+ self.connect(self, delay, (mixer, 1))
+ self.connect(mixer, (hpd, 0))
+ self.connect((sync_detect, 1), (hpd, 1))
+ # Header demodulation
+ header_fft = fft.fft_vcc(self.fft_len, True, (), True)
+ chanest = digital.ofdm_chanest_vcvc(self.sync_word1, self.sync_word2, 1)
+ header_equalizer = digital.ofdm_equalizer_simpledfe(
+ fft_len, header_constellation.base(),
+ occupied_carriers, pilot_carriers, pilot_symbols
+ )
+ header_eq = digital.ofdm_frame_equalizer_vcvc(header_equalizer.base(), frame_length_tag_key, True)
+ header_serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers)
+ header_constellation = _get_constellation(bps_header)
+ header_demod = digital.constellation_decoder_cb(header_constellation.base())
+ header_formatter = digital.packet_header_ofdm(
+ occupied_carriers, 1,
+ packet_length_tag_key,
+ frame_length_tag_key,
+ packet_num_tag_key,
+ bps_header
+ )
+ header_parser = digital.packet_headerparser_b(header_formatter.formatter())
+ self.connect((hpd, 0), header_fft, chanest, header_eq, header_serializer, header_demod, header_parser)
+ self.msg_connect(header_parser, "header_data", hpd, "header_data")
+ # Payload demodulation
+ payload_fft = fft.fft_vcc(self.fft_len, True, (), True)
+ payload_equalizer = digital.ofdm_equalizer_simpledfe(
+ fft_len, header_constellation.base(),
+ occupied_carriers, pilot_carriers, pilot_symbols, 1
+ )
+ payload_eq = digital.ofdm_frame_equalizer_vcvc(payload_equalizer.base(), frame_length_tag_key)
+ payload_serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers)
+ payload_constellation = _get_constellation(bps_payload)
+ payload_demod = digital.constellation_decoder_cb(payload_constellation.base())
+ bit_packer = blocks.repack_bits_bb(bps_payload, 8, packet_length_tag_key, True)
+ self.connect((hpd, 1), payload_fft, payload_eq, payload_serializer, payload_demod, bit_packer, self)
+
diff --git a/gr-digital/python/qa_crc32_bb.py b/gr-digital/python/qa_crc32_bb.py
new file mode 100755
index 0000000000..4574b9dca7
--- /dev/null
+++ b/gr-digital/python/qa_crc32_bb.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import digital_swig as digital
+try: import pmt
+except: from gruel import pmt
+
+class qa_crc32_bb (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_crc_len (self):
+ """ Make sure the output of a CRC set is 4 bytes longer than the input. """
+ data = range(16)
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
+ src = gr.vector_source_b(data, False, 1, (tag,))
+ crc = digital.crc32_bb(False, tag_name)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, crc, sink)
+ self.tb.run()
+ # Check that the packets before crc_check are 4 bytes longer that the input.
+ self.assertEqual(len(data)+4, len(sink.data()))
+
+ def test_002_crc_equal (self):
+ """ Go through CRC set / CRC check and make sure the output
+ is the same as the input. """
+ data = (0, 1, 2, 3, 4, 5, 6, 7, 8)
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
+ src = gr.vector_source_b(data, False, 1, (tag,))
+ crc = digital.crc32_bb(False, tag_name)
+ crc_check = digital.crc32_bb(True, tag_name)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, crc, crc_check, sink)
+ self.tb.run()
+ # Check that the packets after crc_check are the same as input.
+ self.assertEqual(data, sink.data())
+
+ def test_003_crc_correct_lentag (self):
+ tag_name = "length"
+ pack_len = 8
+ packets = range(pack_len*2)
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tag_name)
+ tag1.value = pmt.from_long(pack_len)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = pack_len
+ tag2.key = pmt.string_to_symbol(tag_name)
+ tag2.value = pmt.from_long(pack_len)
+ testtag1 = gr.gr_tag_t()
+ testtag1.offset = 1
+ testtag1.key = pmt.string_to_symbol("tag1")
+ testtag1.value = pmt.from_long(0)
+ testtag2 = gr.gr_tag_t()
+ testtag2.offset = pack_len
+ testtag2.key = pmt.string_to_symbol("tag2")
+ testtag2.value = pmt.from_long(0)
+ testtag3 = gr.gr_tag_t()
+ testtag3.offset = len(packets)-1
+ testtag3.key = pmt.string_to_symbol("tag3")
+ testtag3.value = pmt.from_long(0)
+ src = gr.vector_source_b(packets, False, 1, (tag1, tag2, testtag1, testtag2, testtag3))
+ crc = digital.crc32_bb(False, tag_name)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, crc, sink)
+ self.tb.run()
+ self.assertEqual(len(sink.data()), 2*(pack_len+4))
+ correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19}
+ tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
+ for tag in sink.tags():
+ key = pmt.symbol_to_string(tag.key)
+ if key in correct_offsets.keys():
+ tags_found[key] = True
+ self.assertEqual(correct_offsets[key], tag.offset)
+ if key == tag_name:
+ self.assertTrue(tag.offset == 0 or tag.offset == pack_len+4)
+ self.assertTrue(all(tags_found.values()))
+
+
+ def test_004_fail (self):
+ """ Corrupt the data and make sure it fails CRC test. """
+ data = (0, 1, 2, 3, 4, 5, 6, 7)
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
+ src = gr.vector_source_b(data, False, 1, (tag,))
+ crc = digital.crc32_bb(False, tag_name)
+ crc_check = digital.crc32_bb(True, tag_name)
+ corruptor = blocks.add_const_bb(1)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, crc, corruptor, crc_check, sink)
+ self.tb.run()
+ # crc_check will drop invalid packets
+ self.assertEqual(len(sink.data()), 0)
+
+ def test_005_tag_propagation (self):
+ """ Make sure tags on the CRC aren't lost. """
+ data = (0, 1, 2, 3, 4, 5, 6, 7, 8, 230, 166, 39, 8)
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
+ testtag = gr.gr_tag_t()
+ testtag.offset = len(data)-1
+ testtag.key = pmt.string_to_symbol('tag1')
+ testtag.value = pmt.from_long(0)
+ src = gr.vector_source_b(data, False, 1, (tag, testtag))
+ crc_check = digital.crc32_bb(True, tag_name)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, crc_check, sink)
+ self.tb.run()
+ self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1'])
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_crc32_bb, "qa_crc32_bb.xml")
diff --git a/gr-digital/python/qa_header_payload_demux.py b/gr-digital/python/qa_header_payload_demux.py
new file mode 100755
index 0000000000..4073f24ace
--- /dev/null
+++ b/gr-digital/python/qa_header_payload_demux.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import digital_swig as digital
+import time
+
+class qa_header_payload_demux (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_t (self):
+ """ Simplest possible test: put in zeros, then header,
+ then payload, trigger signal, try to demux.
+ The return signal from the header parser is faked via _post()
+ """
+ n_zeros = 100
+ header = (1, 2, 3)
+ payload = tuple(range(17))
+ data_signal = (0,) * n_zeros + header + payload
+ trigger_signal = [0,] * len(data_signal)
+ trigger_signal[n_zeros] = 1
+
+ data_src = gr.vector_source_f(data_signal, False)
+ trigger_src = gr.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
+ )
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
+ header_sink = gr.vector_sink_f()
+ payload_sink = gr.vector_sink_f()
+
+ self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), header_sink)
+ self.tb.connect((hpd, 1), payload_sink)
+ self.tb.start()
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload))
+ )
+ while len(payload_sink.data()) < len(payload):
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+
+ self.assertEqual(header_sink.data(), header)
+ self.assertEqual(payload_sink.data(), payload)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml")
+
diff --git a/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py b/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py
new file mode 100755
index 0000000000..2105727e04
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import digital_swig as digital
+
+class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_t (self):
+ """
+ pretty simple
+ """
+ fft_len = 6
+ tx_symbols = (1, 2, 3)
+ pilot_symbols = ((1j,),)
+ occupied_carriers = ((0, 1, 2),)
+ pilot_carriers = ((3,),)
+ expected_result = (1, 2, 3, 1j, 0, 0)
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_symbols))
+ src = gr.vector_source_c(tx_symbols, False, 1, (tag,))
+ alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ tag_name)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, alloc, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result)
+
+ def test_002_t (self):
+ """
+ same, but using negative carrier indices
+ """
+ fft_len = 6
+ tx_symbols = (1, 2, 3)
+ pilot_symbols = ((1j,),)
+ occupied_carriers = ((-1, 1, 2),)
+ pilot_carriers = ((3,),)
+ expected_result = (0, 2, 3, 1j, 0, 1)
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_symbols))
+ src = gr.vector_source_c(tx_symbols, False, 1, (tag,))
+ alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ tag_name)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, alloc, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result)
+
+ def test_003_t (self):
+ """
+ more advanced:
+ - 6 symbols per carrier
+ - 2 pilots per carrier
+ - have enough data for nearly 3 OFDM symbols
+ - send that twice
+ - add some random tags
+ """
+ tx_symbols = range(1, 16); # 15 symbols
+ pilot_symbols = ((1j, 2j), (3j, 4j))
+ occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
+ pilot_carriers = ((2, 13), (3, 12))
+ expected_result = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0,
+ 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0,
+ 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0)
+ fft_len = 16
+ tag_name = "len"
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tag_name)
+ tag1.value = pmt.from_long(len(tx_symbols))
+ tag2 = gr.gr_tag_t()
+ tag2.offset = len(tx_symbols)
+ tag2.key = pmt.string_to_symbol(tag_name)
+ tag2.value = pmt.from_long(len(tx_symbols))
+ testtag1 = gr.gr_tag_t()
+ testtag1.offset = 0
+ testtag1.key = pmt.string_to_symbol('tag1')
+ testtag1.value = pmt.from_long(0)
+ testtag2 = gr.gr_tag_t()
+ testtag2.offset = 7 # On the 2nd OFDM symbol
+ testtag2.key = pmt.string_to_symbol('tag2')
+ testtag2.value = pmt.from_long(0)
+ testtag3 = gr.gr_tag_t()
+ testtag3.offset = len(tx_symbols)+1 # First OFDM symbol of packet 2
+ testtag3.key = pmt.string_to_symbol('tag3')
+ testtag3.value = pmt.from_long(0)
+ testtag4 = gr.gr_tag_t()
+ testtag4.offset = 2*len(tx_symbols)-1 # Last OFDM symbol of packet 2
+ testtag4.key = pmt.string_to_symbol('tag4')
+ testtag4.value = pmt.from_long(0)
+ src = gr.vector_source_c(tx_symbols * 2, False, 1, (tag1, tag2, testtag1, testtag2, testtag3, testtag4))
+ alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ tag_name)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, alloc, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result * 2)
+ tags_found = {'tag1': False, 'tag2': False, 'tag3': False, 'tag4': False}
+ correct_offsets = {'tag1': 0, 'tag2': 1, 'tag3': 3, 'tag4': 5}
+ for tag in sink.tags():
+ key = pmt.symbol_to_string(tag.key)
+ if key in tags_found.keys():
+ tags_found[key] = True
+ self.assertEqual(correct_offsets[key], tag.offset)
+ if key == tag_name:
+ self.assertTrue(tag.offset == 0 or tag.offset == 3)
+ self.assertTrue(pmt.to_long(tag.value) == 3)
+ self.assertTrue(all(tags_found.values()))
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_digital_carrier_allocator_cvc, "qa_digital_carrier_allocator_cvc.xml")
+
diff --git a/gr-digital/python/qa_ofdm_chanest_vcvc.py b/gr-digital/python/qa_ofdm_chanest_vcvc.py
new file mode 100755
index 0000000000..c7c0d83a84
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_chanest_vcvc.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import blocks_swig as blocks
+import analog_swig as analog
+import digital_swig as digital
+import sys
+import numpy
+import random
+
+def shift_tuple(vec, N):
+ """ Shifts a vector by N elements. Fills up with zeros. """
+ if N > 0:
+ return (0,) * N + tuple(vec[0:-N])
+ else:
+ N = -N
+ return tuple(vec[N:]) + (0,) * N
+
+def rand_range(min_val, max_val):
+ """ Returns a random value (uniform) from the interval min_val, max_val """
+ return random.random() * (max_val - min_val) + min_val
+
+
+class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_offset_2sym (self):
+ """ Add a frequency offset, check if it's correctly detected.
+ Also add some random tags and see if they come out at the correct
+ position. """
+ fft_len = 16
+ carr_offset = -2
+ sync_symbol1 = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
+ sync_symbol2 = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ tx_data = shift_tuple(sync_symbol1, carr_offset) + \
+ shift_tuple(sync_symbol2, carr_offset) + \
+ shift_tuple(data_symbol, carr_offset)
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol("test_tag_1")
+ tag1.value = pmt.from_long(23)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 2
+ tag2.key = pmt.string_to_symbol("test_tag_2")
+ tag2.value = pmt.from_long(42)
+ src = gr.vector_source_c(tx_data, False, fft_len, (tag1, tag2))
+ chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, chanest, sink)
+ self.tb.run()
+ self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
+ tags = sink.tags()
+ detected_tags = {
+ 'ofdm_sync_carr_offset': False,
+ 'test_tag_1': False,
+ 'test_tag_2': False
+ }
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ carr_offset_hat = pmt.to_long(tag.value)
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'test_tag_1':
+ self.assertEqual(tag.offset, 0)
+ if pmt.symbol_to_string(tag.key) == 'test_tag_2':
+ self.assertEqual(tag.offset, 0)
+ detected_tags[pmt.symbol_to_string(tag.key)] = True
+ self.assertTrue(all(detected_tags.values()))
+
+ def test_002_offset_1sym (self):
+ """ Add a frequency offset, check if it's correctly detected.
+ Difference to previous test is, it only uses one synchronisation symbol. """
+ fft_len = 16
+ carr_offset = -2
+ # This will not correct for +2 because it thinks carrier 14 is used
+ # (because of interpolation)
+ sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
+ data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ tx_data = shift_tuple(sync_symbol, carr_offset) + \
+ shift_tuple(data_symbol, carr_offset)
+ src = gr.vector_source_c(tx_data, False, fft_len)
+ # 17 is out of bounds!
+ chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1, 0, 17)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, chanest, sink)
+ self.tb.run()
+ self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
+ tags = sink.tags()
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ carr_offset_hat = pmt.to_long(tag.value)
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+
+ def test_003_channel_no_carroffset (self):
+ """ Add a channel, check if it's correctly estimated """
+ fft_len = 16
+ carr_offset = 0
+ sync_symbol1 = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
+ sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1, 0, 0)
+ data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ tx_data = sync_symbol1 + sync_symbol2 + data_symbol
+ channel = (0, 0, 0, 2, -2, 2, 3j, 2, 0, 2, 2, 2, 2, 3, 0, 0)
+ src = gr.vector_source_c(tx_data, False, fft_len)
+ chan = blocks.multiply_const_vcc(channel)
+ chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, chan, chanest, sink)
+ self.tb.run()
+ tags = sink.tags()
+ self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel)))
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ self.assertEqual(pmt.c32vector_elements(tag.value), channel)
+
+ def test_004_channel_no_carroffset_1sym (self):
+ """ Add a channel, check if it's correctly estimated.
+ Only uses 1 synchronisation symbol. """
+ fft_len = 16
+ carr_offset = 0
+ sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
+ data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ tx_data = sync_symbol + data_symbol
+ channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
+ src = gr.vector_source_c(tx_data, False, fft_len)
+ chan = blocks.multiply_const_vcc(channel)
+ chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, chan, chanest, sink)
+ self.tb.run()
+ tags = sink.tags()
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ self.assertEqual(pmt.c32vector_elements(tag.value), channel)
+
+ def test_005_both_1sym_force (self):
+ """ Add a channel, check if it's correctly estimated.
+ Only uses 1 synchronisation symbol. """
+ fft_len = 16
+ carr_offset = 0
+ sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
+ ref_symbol = (0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0)
+ data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ tx_data = sync_symbol + data_symbol
+ channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
+ src = gr.vector_source_c(tx_data, False, fft_len)
+ chan = blocks.multiply_const_vcc(channel)
+ chanest = digital.ofdm_chanest_vcvc(sync_symbol, ref_symbol, 1)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, chan, chanest, sink)
+ self.tb.run()
+ tags = sink.tags()
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ self.assertEqual(pmt.c32vector_elements(tag.value), channel)
+
+ def test_006_channel_and_carroffset (self):
+ """ Add a channel, check if it's correctly estimated """
+ fft_len = 16
+ carr_offset = 2
+ # Index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ sync_symbol1 = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
+ sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1, 0, 0)
+ data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
+ # Channel 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ # Shifted (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1)
+ tx_data = shift_tuple(sync_symbol1, carr_offset) + \
+ shift_tuple(sync_symbol2, carr_offset) + \
+ shift_tuple(data_symbol, carr_offset)
+ channel = range(fft_len)
+ src = gr.vector_source_c(tx_data, False, fft_len)
+ chan = blocks.multiply_const_vcc(channel)
+ chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, chan, chanest, sink)
+ self.tb.run()
+ tags = sink.tags()
+ chan_est = None
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ chan_est = pmt.c32vector_elements(tag.value)
+ for i in range(fft_len):
+ if shift_tuple(sync_symbol2, carr_offset)[i]: # Only here the channel can be estimated
+ self.assertEqual(chan_est[i], channel[i])
+ self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel)))
+
+
+ def test_999_all_at_once(self):
+ """docstring for test_999_all_at_once"""
+ fft_len = 32
+ # 6 carriers empty, 10 carriers full, 1 DC carrier, 10 carriers full, 5 carriers empty
+ syncsym_mask = (0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0)
+ carrier_mask = (0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0)
+ max_offset = 4
+ wgn_amplitude = 0.05
+ min_chan_ampl = 0.1
+ max_chan_ampl = 5
+ n_iter = 100
+ def run_flow_graph(sync_sym1, sync_sym2, data_sym):
+ top_block = gr.top_block()
+ carr_offset = random.randint(-max_offset/2, max_offset/2) * 2
+ tx_data = shift_tuple(sync_sym1, carr_offset) + \
+ shift_tuple(sync_sym2, carr_offset) + \
+ shift_tuple(data_sym, carr_offset)
+ channel = [rand_range(min_chan_ampl, max_chan_ampl) * numpy.exp(1j * rand_range(0, 2 * numpy.pi)) for x in range(fft_len)]
+ src = gr.vector_source_c(tx_data, False, fft_len)
+ chan = blocks.multiply_const_vcc(channel)
+ noise = analog.noise_source_c(analog.GR_GAUSSIAN, wgn_amplitude)
+ add = blocks.add_cc(fft_len)
+ chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1)
+ sink = gr.vector_sink_c(fft_len)
+ top_block.connect(src, chan, (add, 0), chanest, sink)
+ top_block.connect(noise, blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), (add, 1))
+ top_block.run()
+ channel_est = None
+ carr_offset_hat = 0
+ rx_sym_est = [0,] * fft_len
+ tags = sink.tags()
+ for tag in tags:
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ carr_offset_hat = pmt.to_long(tag.value)
+ self.assertEqual(carr_offset, carr_offset_hat)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ channel_est = pmt.c32vector_elements(tag.value)
+ shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
+ for i in range(fft_len):
+ if shifted_carrier_mask[i] and channel_est[i]:
+ self.assertAlmostEqual(channel[i], channel_est[i], places=0)
+ rx_sym_est[i] = (sink.data()[i] / channel_est[i]).real
+ return (carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat)))
+ bit_errors = 0
+ for k in xrange(n_iter):
+ sync_sym = [(random.randint(0, 1) * 2 - 1) * syncsym_mask[i] for i in range(fft_len)]
+ ref_sym = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)]
+ data_sym = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)]
+ data_sym[26] = 1
+ (carr_offset, rx_sym) = run_flow_graph(sync_sym, ref_sym, data_sym)
+ rx_sym_est = [0,] * fft_len
+ for i in xrange(fft_len):
+ if carrier_mask[i] == 0:
+ continue
+ rx_sym_est[i] = {True: 1, False: -1}[rx_sym[i] > 0]
+ if rx_sym_est[i] != data_sym[i]:
+ bit_errors += 1
+ # This is much more than we could allow
+ self.assertTrue(bit_errors < n_iter)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_ofdm_sync_eqinit_vcvc, "qa_ofdm_sync_eqinit_vcvc.xml")
+
diff --git a/gr-digital/python/qa_ofdm_cyclic_prefixer.py b/gr-digital/python/qa_ofdm_cyclic_prefixer.py
new file mode 100755
index 0000000000..003e987e95
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_cyclic_prefixer.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+#
+# Copyright 2007,2010,2011 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import digital_swig as digital
+
+class test_ofdm_cyclic_prefixer (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_wo_tags_no_rolloff(self):
+ " The easiest test: make sure the CP is added correctly. "
+ fft_len = 8
+ cp_len = 2
+ expected_result = (6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+ 6, 7, 0, 1, 2, 3, 4, 5, 6, 7)
+ src = gr.vector_source_c(range(fft_len) * 2, False, fft_len)
+ cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, cp, sink)
+ self.tb.run()
+ self.assertEqual(sink.data(), expected_result)
+
+ def test_wo_tags_2s_rolloff(self):
+ " No tags, but have a 2-sample rolloff "
+ fft_len = 8
+ cp_len = 2
+ rolloff = 2
+ expected_result = (7.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, # 1.0/2
+ 7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8)
+ src = gr.vector_source_c(range(1, fft_len+1) * 2, False, fft_len)
+ cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, rolloff)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, cp, sink)
+ self.tb.run()
+ self.assertEqual(sink.data(), expected_result)
+
+ def test_with_tags_2s_rolloff(self):
+ " With tags and a 2-sample rolloff "
+ fft_len = 8
+ cp_len = 2
+ tag_name = "length"
+ expected_result = (7.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, # 1.0/2
+ 7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0/2)
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(2)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 1
+ tag2.key = pmt.string_to_symbol("random_tag")
+ tag2.value = pmt.from_long(42)
+ src = gr.vector_source_c(range(1, fft_len+1) * 2, False, fft_len, (tag, tag2))
+ cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, 2, tag_name)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, cp, sink)
+ self.tb.run()
+ self.assertEqual(sink.data(), expected_result)
+ tags = [gr.tag_to_python(x) for x in sink.tags()]
+ tags = sorted([(x.offset, x.key, x.value) for x in tags])
+ expected_tags = [
+ (0, tag_name, len(expected_result)),
+ (fft_len+cp_len, "random_tag", 42)
+ ]
+ self.assertEqual(tags, expected_tags)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_ofdm_cyclic_prefixer, "test_ofdm_cyclic_prefixer.xml")
+
diff --git a/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
new file mode 100755
index 0000000000..9faface03f
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import numpy
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import digital_swig as digital
+
+class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_simple (self):
+ """ Very simple functionality testing """
+ fft_len = 8
+ equalizer = digital.ofdm_equalizer_static(fft_len)
+ n_syms = 3
+ len_tag_key = "frame_len"
+ tx_data = (1,) * fft_len * n_syms
+ len_tag = gr.gr_tag_t()
+ len_tag.offset = 0
+ len_tag.key = pmt.string_to_symbol(len_tag_key)
+ len_tag.value = pmt.from_long(n_syms)
+ chan_tag = gr.gr_tag_t()
+ chan_tag.offset = 0
+ chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
+ chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len)
+ src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, eq, sink)
+ self.tb.run ()
+ # Check data
+ self.assertEqual(tx_data, sink.data())
+ for tag in sink.tags():
+ self.assertEqual(pmt.symbol_to_string(tag.key), len_tag_key)
+ self.assertEqual(pmt.to_long(tag.value), n_syms)
+
+ def test_002_static (self):
+ fft_len = 8
+ # 4 5 6 7 0 1 2 3
+ tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0
+ -1, -1, 0, 2, -1, 2, 0, -1, # 8
+ -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols)
+ -1, -1, 1, 1, -1, 0, 2, -1] # 24
+ cnst = digital.constellation_qpsk()
+ tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data]
+ occupied_carriers = ((1, 2, 6, 7),)
+ pilot_carriers = ((), (), (1, 2, 6, 7), ())
+ pilot_symbols = (
+ [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], []
+ )
+ equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols)
+ channel = [
+ 0, 0, 1, 1, 0, 1, 1, 0,
+ 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly...
+ 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here!
+ 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here.
+ ]
+ for idx in range(fft_len, 2*fft_len):
+ channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5))
+ idx2 = idx+2*fft_len
+ channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5))
+ len_tag_key = "frame_len"
+ len_tag = gr.gr_tag_t()
+ len_tag.offset = 0
+ len_tag.key = pmt.string_to_symbol(len_tag_key)
+ len_tag.value = pmt.from_long(4)
+ chan_tag = gr.gr_tag_t()
+ chan_tag.offset = 0
+ chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
+ chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len])
+ src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, eq, sink)
+ self.tb.run ()
+ rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()]
+ self.assertEqual(tx_data, rx_data)
+ for tag in sink.tags():
+ if pmt.symbol_to_string(tag.key) == len_tag_key:
+ self.assertEqual(pmt.to_long(tag.value), 4)
+ if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
+ self.assertEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:])
+
+ def test_002_simpledfe (self):
+ fft_len = 8
+ # 4 5 6 7 0 1 2 3
+ tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0
+ -1, -1, 0, 2, -1, 2, 0, -1, # 8
+ -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols)
+ -1, -1, 1, 1, -1, 0, 2, -1] # 24
+ cnst = digital.constellation_qpsk()
+ tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data]
+ occupied_carriers = ((1, 2, 6, 7),)
+ pilot_carriers = ((), (), (1, 2, 6, 7), ())
+ pilot_symbols = (
+ [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], []
+ )
+ equalizer = digital.ofdm_equalizer_simpledfe(
+ fft_len, cnst.base(), occupied_carriers, pilot_carriers, pilot_symbols, 0, 0.01
+ )
+ channel = [
+ 0, 0, 1, 1, 0, 1, 1, 0,
+ 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly...
+ 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here!
+ 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here.
+ ]
+ for idx in range(fft_len, 2*fft_len):
+ channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5))
+ idx2 = idx+2*fft_len
+ channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5))
+ len_tag_key = "frame_len"
+ len_tag = gr.gr_tag_t()
+ len_tag.offset = 0
+ len_tag.key = pmt.string_to_symbol(len_tag_key)
+ len_tag.value = pmt.from_long(4)
+ chan_tag = gr.gr_tag_t()
+ chan_tag.offset = 0
+ chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
+ chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len])
+ src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True)
+ sink = gr.vector_sink_c(fft_len)
+ self.tb.connect(src, eq, sink)
+ self.tb.run ()
+ rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()]
+ self.assertEqual(tx_data, rx_data)
+ for tag in sink.tags():
+ if pmt.symbol_to_string(tag.key) == len_tag_key:
+ self.assertEqual(pmt.to_long(tag.value), 4)
+ if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
+ self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:], places=1)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_ofdm_frame_equalizer_vcvc, "qa_ofdm_frame_equalizer_vcvc.xml")
+
diff --git a/gr-digital/python/qa_ofdm_serializer_vcc.py b/gr-digital/python/qa_ofdm_serializer_vcc.py
new file mode 100755
index 0000000000..107d6076c5
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_serializer_vcc.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import fft_swig as fft
+import analog_swig as analog
+import digital_swig as digital
+try: import pmt
+except: from gruel import pmt
+import numpy
+
+class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_simple (self):
+ """ Standard test """
+ fft_len = 16
+ tx_symbols = range(1, 16);
+ tx_symbols = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0,
+ 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0,
+ 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0)
+ expected_result = tuple(range(1, 16)) + (0, 0, 0)
+ occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
+ n_syms = len(tx_symbols)/fft_len
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
+ src = gr.vector_source_c(tx_symbols, False, fft_len, (tag,))
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, serializer, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(len(sink.tags()), 1)
+ result_tag = sink.tags()[0]
+ self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
+ self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0]))
+
+ def test_002_with_offset (self):
+ """ Standard test, carrier offset """
+ fft_len = 16
+ tx_symbols = range(1, 16);
+ tx_symbols = (0, 0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6,
+ 0, 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12,
+ 0, 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0)
+ carr_offset = 1 # Compare this with tx_symbols from the previous test
+ expected_result = tuple(range(1, 16)) + (0, 0, 0)
+ occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
+ n_syms = len(tx_symbols)/fft_len
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
+ offsettag = gr.gr_tag_t()
+ offsettag.offset = 0
+ offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
+ offsettag.value = pmt.from_long(carr_offset)
+ src = gr.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag))
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, serializer, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(len(sink.tags()), 2)
+ for tag in sink.tags():
+ if pmt.symbol_to_string(tag.key) == tag_name:
+ self.assertEqual(pmt.to_long(tag.value), n_syms * len(occupied_carriers[0]))
+
+ def test_003_connect (self):
+ """ Connect carrier_allocator to ofdm_serializer,
+ make sure output==input """
+ fft_len = 8
+ n_syms = 10
+ occupied_carriers = ((1, 2, 6, 7),)
+ pilot_carriers = ((3,),(5,))
+ pilot_symbols = ((1j,),(-1j,))
+ tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)])
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_data))
+ src = gr.vector_source_c(tx_data, False, 1, (tag,))
+ alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ tag_name)
+ serializer = digital.ofdm_serializer_vcc(alloc)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, alloc, serializer, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), tx_data)
+
+ def test_004_connect (self):
+ """
+ Advanced test:
+ - Allocator -> IFFT -> Frequency offset -> FFT -> Serializer
+ - FFT does shift (moves DC to middle)
+ - Make sure input == output
+ - Frequency offset is -2 carriers
+ """
+ fft_len = 8
+ n_syms = 2
+ carr_offset = -2
+ freq_offset = 2 * numpy.pi * carr_offset / fft_len # If the sampling rate == 1
+ occupied_carriers = ((1, 2, -2, -1),)
+ pilot_carriers = ((3,),(5,))
+ pilot_symbols = ((1j,),(-1j,))
+ tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)])
+ #tx_data = (1,) * occupied_carriers[0] * n_syms
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_data))
+ offsettag = gr.gr_tag_t()
+ offsettag.offset = 0
+ offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
+ offsettag.value = pmt.from_long(carr_offset)
+ src = gr.vector_source_c(tx_data, False, 1, (tag, offsettag))
+ alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ tag_name)
+ tx_ifft = fft.fft_vcc(fft_len, False, ())
+ offset_sig = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0)
+ mixer = blocks.multiply_cc()
+ rx_fft = fft.fft_vcc(fft_len, True, (), True)
+ serializer = digital.ofdm_serializer_vcc(alloc)
+ sink = gr.vector_sink_c()
+ self.tb.connect(
+ src, alloc, tx_ifft,
+ blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len),
+ (mixer, 0),
+ blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len),
+ rx_fft, serializer, sink
+ )
+ self.tb.connect(offset_sig, (mixer, 1))
+ self.tb.run ()
+ # FIXME check this
+ #self.assertEqual(sink.data(), tx_data)
+
+ def test_005_packet_len_tag (self):
+ """ Standard test """
+ fft_len = 16
+ tx_symbols = range(1, 16);
+ tx_symbols = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0,
+ 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0,
+ 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0)
+ expected_result = tuple(range(1, 16))
+ occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
+ n_syms = len(tx_symbols)/fft_len
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 0
+ tag2.key = pmt.string_to_symbol("packet_len")
+ tag2.value = pmt.from_long(len(expected_result))
+ src = gr.vector_source_c(tx_symbols, False, fft_len, (tag, tag2))
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, False)
+ sink = gr.vector_sink_c()
+ self.tb.connect(src, serializer, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(len(sink.tags()), 1)
+ result_tag = sink.tags()[0]
+ self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len")
+ self.assertEqual(pmt.to_long(result_tag.value), len(expected_result))
+
+ def test_099 (self):
+ """ Make sure it fails if it should """
+ fft_len = 16
+ occupied_carriers = ((1, 3, 4, 11, 12, 17),)
+ tag_name = "len"
+ self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name)
+
+
+if __name__ == '__main__':
+ #gr_unittest.run(qa_ofdm_serializer_vcc, "qa_ofdm_serializer_vcc.xml")
+ gr_unittest.run(qa_ofdm_serializer_vcc)
+
diff --git a/gr-digital/python/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/qa_ofdm_sync_sc_cfb.py
new file mode 100755
index 0000000000..806ef931de
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_sync_sc_cfb.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import numpy
+import random
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import analog_swig as analog
+
+try:
+ # This will work when feature #505 is added.
+ from gnuradio import digital
+ from gnuradio.digital.utils import tagged_streams
+ from gnuradio.digital.ofdm_txrx import ofdm_tx
+except ImportError:
+ # Until then this will work.
+ import digital_swig as digital
+ from utils import tagged_streams
+ from ofdm_txrx import ofdm_tx
+
+
+class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_detect (self):
+ """ Send two bursts, with zeros in between, and check
+ they are both detected at the correct position and no
+ false alarms occur """
+ n_zeros = 15
+ fft_len = 32
+ cp_len = 4
+ sig_len = (fft_len + cp_len) * 10
+ sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len/2)] * 2
+ tx_signal = [0,] * n_zeros + \
+ sync_symbol[-cp_len:] + \
+ sync_symbol + \
+ [(random.randint(0, 1)*2)-1 for x in range(sig_len)]
+ tx_signal = tx_signal * 2
+ add = blocks.add_cc()
+ sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
+ sink_freq = gr.vector_sink_f()
+ sink_detect = gr.vector_sink_b()
+ self.tb.connect(gr.vector_source_c(tx_signal), (add, 0))
+ self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1))
+ self.tb.connect(add, sync)
+ self.tb.connect((sync, 0), sink_freq)
+ self.tb.connect((sync, 1), sink_detect)
+ self.tb.run()
+ sig1_detect = sink_detect.data()[0:len(tx_signal)/2]
+ sig2_detect = sink_detect.data()[len(tx_signal)/2:]
+ self.assertTrue(abs(sig1_detect.index(1) - (n_zeros + fft_len + cp_len)) < cp_len)
+ self.assertTrue(abs(sig2_detect.index(1) - (n_zeros + fft_len + cp_len)) < cp_len)
+ self.assertEqual(numpy.sum(sig1_detect), 1)
+ self.assertEqual(numpy.sum(sig2_detect), 1)
+
+
+ def test_002_freq (self):
+ """ Add a fine frequency offset and see if that get's detected properly """
+ fft_len = 32
+ cp_len = 4
+ freq_offset = 0.1 # Must stay < 2*pi/fft_len = 0.196 (otherwise, it's coarse)
+ sig_len = (fft_len + cp_len) * 10
+ sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len/2)] * 2
+ tx_signal = sync_symbol[-cp_len:] + \
+ sync_symbol + \
+ [(random.randint(0, 1)*2)-1 for x in range(sig_len)]
+ mult = blocks.multiply_cc()
+ add = blocks.add_cc()
+ sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
+ sink_freq = gr.vector_sink_f()
+ sink_detect = gr.vector_sink_b()
+ self.tb.connect(gr.vector_source_c(tx_signal), (mult, 0), (add, 0))
+ self.tb.connect(analog.sig_source_c(2 * numpy.pi, analog.GR_SIN_WAVE, freq_offset, 1.0), (mult, 1))
+ self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1))
+ self.tb.connect(add, sync)
+ self.tb.connect((sync, 0), sink_freq)
+ self.tb.connect((sync, 1), sink_detect)
+ self.tb.run()
+ phi_hat = sink_freq.data()[sink_detect.data().index(1)]
+ est_freq_offset = 2 * phi_hat / fft_len
+ self.assertAlmostEqual(est_freq_offset, freq_offset, places=2)
+
+
+ def test_003_multiburst (self):
+ """ Send several bursts, see if the number of detects is correct.
+ Burst lengths and content are random.
+ """
+ n_bursts = 42
+ fft_len = 32
+ cp_len = 4
+ tx_signal = []
+ for i in xrange(n_bursts):
+ sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len/2)] * 2
+ tx_signal += [0,] * random.randint(0, 2*fft_len) + \
+ sync_symbol[-cp_len:] + \
+ sync_symbol + \
+ [(random.randint(0, 1)*2)-1 for x in range(fft_len * random.randint(5,23))]
+ add = blocks.add_cc()
+ sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
+ sink_freq = gr.vector_sink_f()
+ sink_detect = gr.vector_sink_b()
+ self.tb.connect(gr.vector_source_c(tx_signal), (add, 0))
+ self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .005), (add, 1))
+ self.tb.connect(add, sync)
+ self.tb.connect((sync, 0), sink_freq)
+ self.tb.connect((sync, 1), sink_detect)
+ self.tb.run()
+ self.assertEqual(numpy.sum(sink_detect.data()), n_bursts,
+ msg="""Because of statistics, it is possible (though unlikely)
+that the number of detected bursts differs slightly. If the number of detects is
+off by one or two, run the test again and see what happen.
+Detection error was: %d """ % (numpy.sum(sink_detect.data()) - n_bursts)
+ )
+
+ # FIXME ofdm_mod is currently not working
+ #def test_004_ofdm_packets (self):
+ #"""
+ #Send several bursts, see if the number of detects is correct.
+ #Burst lengths and content are random.
+ #"""
+ #n_bursts = 42
+ #fft_len = 64
+ #cp_len = 12
+ #tx_signal = []
+ #packets = []
+ #tagname = "length"
+ #min_packet_length = 100
+ #max_packet_length = 100
+ #sync_sequence = [random.randint(0, 1)*2-1 for x in range(fft_len/2)]
+ #for i in xrange(n_bursts):
+ #packet_length = random.randint(min_packet_length,
+ #max_packet_length+1)
+ #packet = [random.randint(0, 255) for i in range(packet_length)]
+ #packets.append(packet)
+ #data, tags = tagged_streams.packets_to_vectors(
+ #packets, tagname, vlen=1)
+ #total_length = len(data)
+
+ #src = gr.vector_source_b(data, False, 1, tags)
+ #mod = ofdm_tx(
+ #fft_len=fft_len,
+ #cp_len=cp_len,
+ #length_tag_name=tagname,
+ #occupied_carriers=(range(1, 27) + range(38, 64),),
+ #pilot_carriers=((0,),),
+ #pilot_symbols=((100,),),
+ #)
+ #rate_in = 16000
+ #rate_out = 48000
+ #ratio = float(rate_out) / rate_in
+ #throttle1 = gr.throttle(gr.sizeof_gr_complex, rate_in)
+ #insert_zeros = digital.ts_insert_zeros_cc(tagname)
+ #throttle2 = gr.throttle(gr.sizeof_gr_complex, rate_out)
+ #sink_countbursts = gr.vector_sink_c()
+ #head = gr.head(gr.sizeof_gr_complex, int(total_length * ratio*2))
+ #add = gr.add_cc()
+ #sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
+ #sink_freq = gr.vector_sink_f()
+ #sink_detect = gr.vector_sink_b()
+ #noise_level = 0.01
+ #noise = gr.noise_source_c(gr.GR_GAUSSIAN, noise_level)
+ #self.tb.connect(src, mod, gr.null_sink(gr.sizeof_gr_complex))
+ #self.tb.connect(insert_zeros, sink_countbursts)
+ #self.tb.connect(noise, (add, 1))
+ #self.tb.connect(add, sync)
+ #self.tb.connect((sync, 0), sink_freq)
+ #self.tb.connect((sync, 1), sink_detect)
+ #self.tb.run()
+ #count_data = sink_countbursts.data()
+ #count_tags = sink_countbursts.tags()
+ #burstcount = tagged_streams.count_bursts(count_data, count_tags, tagname)
+ #self.assertEqual(numpy.sum(sink_detect.data()), burstcount)
+
+
+if __name__ == '__main__':
+ #gr_unittest.run(qa_ofdm_sync_sc_cfb, "qa_ofdm_sync_sc_cfb.xml")
+ gr_unittest.run(qa_ofdm_sync_sc_cfb)
+
diff --git a/gr-digital/python/qa_ofdm_txrx.py b/gr-digital/python/qa_ofdm_txrx.py
new file mode 100755
index 0000000000..778f03f515
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_txrx.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import numpy
+from gnuradio import gr, gr_unittest
+import digital_swig
+
+class test_ofdm_txrx (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001 (self):
+ pass
+ #len_tag_key = 'frame_len'
+ #n_bytes = 100
+ #test_data = [random.randint(0, 255) for x in range(n_bytes)]
+ #tx_data, tags = tagged_streams.packets_to_vectors((test_data,), len_tag_key)
+ #src = gr.vector_source_b(test_data, False, 1, tags)
+ #tx = ofdm_tx(frame_length_tag_key=len_tag_key)
+ #rx = ofdm_rx(frame_length_tag_key=len_tag_key)
+ #self.assertEqual(tx.sync_word1, rx.sync_word1)
+ #self.assertEqual(tx.sync_word2, rx.sync_word2)
+ #delay = gr.delay(gr.sizeof_gr_complex, 100)
+ #noise = gr.noise_source_c(gr.GR_GAUSSIAN, 0.05)
+ #add = gr.add_cc()
+ #sink = gr.vector_sink_b()
+ ##self.tb.connect(src, tx, add, rx, sink)
+ ##self.tb.connect(noise, (add, 1))
+ #self.tb.connect(src, tx, gr.null_sink(gr.sizeof_gr_complex))
+ #self.tb.run()
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_ofdm_txrx, "test_ofdm_txrx.xml")
+
diff --git a/gr-digital/python/qa_packet_headergenerator_bb.py b/gr-digital/python/qa_packet_headergenerator_bb.py
new file mode 100755
index 0000000000..2e6e401566
--- /dev/null
+++ b/gr-digital/python/qa_packet_headergenerator_bb.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import digital_swig as digital
+try: import pmt
+except: from gruel import pmt
+
+class qa_packet_headergenerator_bb (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_12bits (self):
+ # 3 PDUs: | | | |
+ data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
+ tagname = "packet_len"
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 4
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
+ tag3 = gr.gr_tag_t()
+ tag3.offset = 6
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
+ src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
+ header = digital.packet_headergenerator_bb(12, tagname)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, header, sink)
+ self.tb.run()
+ expected_data = (
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0
+ )
+ self.assertEqual(sink.data(), expected_data)
+
+
+ def test_002_32bits (self):
+ # 3 PDUs: | | | |
+ data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
+ tagname = "packet_len"
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 4
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
+ tag3 = gr.gr_tag_t()
+ tag3.offset = 6
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
+ src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
+ header = digital.packet_headergenerator_bb(32, tagname)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, header, sink)
+ self.tb.run()
+ expected_data = (
+ # | Number of symbols | Packet number | Parity
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+ )
+ self.assertEqual(sink.data(), expected_data)
+
+
+ def test_003_12bits_formatter_object (self):
+ # 3 PDUs: | | | |
+ data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
+ tagname = "packet_len"
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 4
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
+ tag3 = gr.gr_tag_t()
+ tag3.offset = 6
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
+ src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
+ formatter_object = digital.packet_header_default(12, tagname)
+ header = digital.packet_headergenerator_bb(formatter_object.formatter())
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, header, sink)
+ self.tb.run()
+ expected_data = (
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0
+ )
+ self.assertEqual(sink.data(), expected_data)
+
+ def test_004_8bits_formatter_ofdm (self):
+ occupied_carriers = ((1, 2, 3, 5, 6, 7),)
+ # 3 PDUs: | | | |
+ data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
+ tagname = "packet_len"
+ tag1 = gr.gr_tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
+ tag2 = gr.gr_tag_t()
+ tag2.offset = 4
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
+ tag3 = gr.gr_tag_t()
+ tag3.offset = 6
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
+ src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
+ formatter_object = digital.packet_header_ofdm(occupied_carriers, 1, tagname)
+ self.assertEqual(formatter_object.header_len(), 6)
+ self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), tagname)
+ header = digital.packet_headergenerator_bb(formatter_object.formatter())
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, header, sink)
+ self.tb.run()
+ expected_data = (
+ 0, 0, 1, 0, 0, 0,
+ 0, 1, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0
+ )
+ self.assertEqual(sink.data(), expected_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_packet_headergenerator_bb, "qa_packet_headergenerator_bb.xml")
+
diff --git a/gr-digital/python/qa_packet_headerparser_b.py b/gr-digital/python/qa_packet_headerparser_b.py
new file mode 100755
index 0000000000..aec2f96b57
--- /dev/null
+++ b/gr-digital/python/qa_packet_headerparser_b.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import time
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import blocks_swig as blocks
+import digital_swig as digital
+
+class qa_packet_headerparser_b (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_t (self):
+ expected_data = (
+ # | Number of symbols | Packet number | Parity
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0
+ )
+ tagname = "packet_len"
+
+ src = gr.vector_source_b(expected_data)
+ parser = digital.packet_headerparser_b(32, tagname)
+ sink = blocks.message_debug()
+
+ self.tb.connect(src, parser)
+ self.tb.msg_connect(parser, "header_data", sink, "store")
+ self.tb.start ()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+
+ self.assertEqual(sink.num_messages(), 3)
+ msg = sink.get_message(0)
+ #try:
+ #self.assertEqual(4, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol(tagname), pmt.PMT_F)))
+ #self.assertEqual(0, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol("packet_num"), pmt.PMT_F)))
+
+ #except:
+ #self.fail()
+ # msg1: length 4, number 0
+ # msg2: length 2, number 1
+ # msg3: PMT_F because parity fail
+
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_packet_headerparser_b, "qa_packet_headerparser_b.xml")
diff --git a/gr-digital/python/qa_scale_tags.py b/gr-digital/python/qa_scale_tags.py
new file mode 100755
index 0000000000..deee775579
--- /dev/null
+++ b/gr-digital/python/qa_scale_tags.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import time
+import itertools
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import blocks_swig as blocks
+import digital_swig as digital
+from utils import tagged_streams
+
+class qa_scale_tags (gr_unittest.TestCase):
+
+ def test_utils(self):
+ packets = ((1, 2, 3), (4, 5, 6, 7, 8), (9, 10))
+ tagname = "vector_length"
+ data, tags = tagged_streams.packets_to_vectors(packets, tagname)
+ new_packets = tagged_streams.vectors_to_packets(data, tags, tagname)
+ for np, op in zip(new_packets, packets):
+ for n, o in zip(np, op):
+ self.assertEqual(n, o)
+
+ def test_001_t (self):
+ packets = ((1, 2, 3), (4, 5, 6, 7, 8), (9, 10))
+ tagname = "packet_length"
+ data, tags = tagged_streams.packets_to_vectors(packets, tagname)
+ tb = gr.top_block()
+ src = gr.vector_source_b(data, False, 1, tags)
+ tag_scaler = digital.scale_tags(1, tagname, 2)
+ unpacker = blocks.packed_to_unpacked_bb(4, blocks.GR_MSB_FIRST)
+ snk = gr.vector_sink_b()
+ tb.connect(src, unpacker, tag_scaler, snk)
+ tb.run()
+ packets = tagged_streams.vectors_to_packets(snk.data(), snk.tags(), tagname)
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_scale_tags, "qa_scale_tags.xml")
diff --git a/gr-digital/python/qa_ts_insert_zeros.py b/gr-digital/python/qa_ts_insert_zeros.py
new file mode 100644
index 0000000000..d13a4c1c2b
--- /dev/null
+++ b/gr-digital/python/qa_ts_insert_zeros.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import time
+import itertools
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import blocks_swig as blocks
+import digital_swig as digital
+from utils import tagged_streams
+
+class qa_ts_insert_zeros (gr_unittest.TestCase):
+
+ def test_one(self):
+ n_packets = 10
+ packet_length = 1000
+ packets = [[i]*packet_length for i in range(1, n_packets+1)]
+ tagname = "packet_length"
+ data, tags = tagged_streams.packets_to_vectors(packets, tagname)
+ tb = gr.top_block()
+ src = gr.vector_source_c(data, False, 1, tags)
+ rate_in = 16000
+ rate_out = 48000
+ ratio = float(rate_out) / rate_in
+ throttle1 = blocks.throttle(gr.sizeof_gr_complex, rate_in)
+ insert_zeros = digital.ts_insert_zeros_cc(tagname)
+ throttle2 = blocks.throttle(gr.sizeof_gr_complex, rate_out)
+ head = gr.head(gr.sizeof_gr_complex, int(n_packets * packet_length * ratio*2))
+ snk = gr.vector_sink_c()
+ tb.connect(src, throttle1, insert_zeros, throttle2, head, snk)
+ tb.run()
+ data = snk.data()
+ state = 1
+ pos = 0
+ last_non_zero = 0
+ for i, d in enumerate(data):
+ if d != 0:
+ last_non_zero = i
+ if pos == 0:
+ if (d == state):
+ pos = pos + 1
+ elif (d != 0):
+ raise ValueError("Invalid")
+ elif pos > 0:
+ if (d != state):
+ raise ValueError("Invalid")
+ pos = pos + 1
+ if pos == packet_length:
+ state += 1
+ pos = 0
+ min_ratio = ratio-1
+ max_ratio = ratio+1
+ self.assertEqual(state-1, n_packets)
+ self.assertTrue(last_non_zero > min_ratio*packet_length*n_packets)
+ self.assertTrue(last_non_zero < max_ratio*packet_length*n_packets)
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_ts_insert_zeros, "qa_ts_insert_zeros.xml")
diff --git a/gr-digital/python/utils/tagged_streams.py b/gr-digital/python/utils/tagged_streams.py
new file mode 100644
index 0000000000..6a956aa642
--- /dev/null
+++ b/gr-digital/python/utils/tagged_streams.py
@@ -0,0 +1,112 @@
+from gnuradio import gr
+try: import pmt
+except: from gruel import pmt
+
+def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
+ tags = []
+ assert(len(offsets) == len(lengths))
+ for offset, length in zip(offsets, lengths):
+ tag = gr.gr_tag_t()
+ tag.offset = offset/vlen
+ tag.key = pmt.string_to_symbol(tagname)
+ tag.value = pmt.from_long(length/vlen)
+ tags.append(tag)
+ return tags
+
+def string_to_vector(string):
+ v = []
+ for s in string:
+ v.append(ord(s))
+ return v
+
+def strings_to_vectors(strings, lengthtagname):
+ vs = [string_to_vector(string) for string in strings]
+ return packets_to_vectors(vs, lengthtagname)
+
+def vector_to_string(v):
+ s = []
+ for d in v:
+ s.append(chr(d))
+ return ''.join(s)
+
+def vectors_to_strings(data, tags, lengthtagname):
+ packets = vectors_to_packets(data, tags, lengthtagname)
+ return [vector_to_string(packet) for packet in packets]
+
+def count_bursts(data, tags, lengthtagname, vlen=1):
+ lengthtags = [t for t in tags
+ if pmt.symbol_to_string(t.key) == lengthtagname]
+ lengths = {}
+ for tag in lengthtags:
+ if tag.offset in lengths:
+ raise ValueError(
+ "More than one tags with key {0} with the same offset={1}."
+ .format(lengthtagname, tag.offset))
+ lengths[tag.offset] = pmt.to_long(tag.value)*vlen
+ in_burst = False
+ in_packet = False
+ packet_length = None
+ packet_pos = None
+ burst_count = 0
+ for pos in range(len(data)):
+ if pos in lengths:
+ if in_packet:
+ print("Got tag at pos {0} current packet_pos is {1}".format(pos, packet_pos))
+ raise StandardError("Received packet tag while in packet.")
+ packet_pos = -1
+ packet_length = lengths[pos]
+ in_packet = True
+ if not in_burst:
+ burst_count += 1
+ in_burst = True
+ elif not in_packet:
+ in_burst = False
+ if in_packet:
+ packet_pos += 1
+ if packet_pos == packet_length-1:
+ in_packet = False
+ packet_pos = None
+ return burst_count
+
+def vectors_to_packets(data, tags, lengthtagname, vlen=1):
+ lengthtags = [t for t in tags
+ if pmt.symbol_to_string(t.key) == lengthtagname]
+ lengths = {}
+ for tag in lengthtags:
+ if tag.offset in lengths:
+ raise ValueError(
+ "More than one tags with key {0} with the same offset={1}."
+ .format(lengthtagname, tag.offset))
+ lengths[tag.offset] = pmt.to_long(tag.value)*vlen
+ if 0 not in lengths:
+ raise ValueError("There is no tag with key {0} and an offset of 0"
+ .format(lengthtagname))
+ pos = 0
+ packets = []
+ while pos < len(data):
+ if pos not in lengths:
+ raise ValueError("There is no tag with key {0} and an offset of {1}."
+ "We were expecting one."
+ .format(lengthtagname, pos))
+ length = lengths[pos]
+ if length == 0:
+ raise ValueError("Packets cannot have zero length.")
+ if pos+length > len(data):
+ raise ValueError("The final packet is incomplete.")
+ packets.append(data[pos: pos+length])
+ pos += length
+ return packets
+
+def packets_to_vectors(packets, lengthtagname, vlen=1):
+ tags = []
+ data = []
+ offset = 0
+ for packet in packets:
+ data.extend(packet)
+ tag = gr.gr_tag_t()
+ tag.offset = offset/vlen
+ tag.key = pmt.string_to_symbol(lengthtagname)
+ tag.value = pmt.from_long(len(packet)/vlen)
+ tags.append(tag)
+ offset = offset + len(packet)
+ return data, tags