summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python')
-rw-r--r--gr-digital/python/CMakeLists.txt12
-rw-r--r--gr-digital/python/__init__.py4
-rw-r--r--gr-digital/python/bpsk.py149
-rw-r--r--gr-digital/python/cpm.py83
-rw-r--r--gr-digital/python/crc.py6
-rw-r--r--gr-digital/python/digital_voice.py.real103
-rw-r--r--gr-digital/python/generic_mod_demod.py217
-rw-r--r--gr-digital/python/gfsk.py74
-rw-r--r--gr-digital/python/gmsk.py108
-rw-r--r--gr-digital/python/modulation_utils.py9
-rw-r--r--gr-digital/python/ofdm.py61
-rw-r--r--gr-digital/python/ofdm_packet_utils.py34
-rw-r--r--gr-digital/python/ofdm_receiver.py69
-rw-r--r--gr-digital/python/ofdm_sync_fixed.py3
-rw-r--r--gr-digital/python/ofdm_sync_ml.py78
-rw-r--r--gr-digital/python/ofdm_sync_pn.py58
-rw-r--r--gr-digital/python/ofdm_sync_pnac.py52
-rw-r--r--gr-digital/python/packet_utils.py28
-rw-r--r--gr-digital/python/pkt.py43
-rw-r--r--gr-digital/python/psk.py76
-rwxr-xr-xgr-digital/python/qa_binary_slicer_fb.py30
-rwxr-xr-xgr-digital/python/qa_chunks_to_symbols.py6
-rwxr-xr-xgr-digital/python/qa_clock_recovery_mm.py50
-rwxr-xr-xgr-digital/python/qa_cma_equalizer.py8
-rwxr-xr-xgr-digital/python/qa_constellation.py59
-rwxr-xr-xgr-digital/python/qa_constellation_decoder_cb.py52
-rwxr-xr-xgr-digital/python/qa_constellation_receiver.py66
-rwxr-xr-xgr-digital/python/qa_correlate_access_code.py37
-rwxr-xr-xgr-digital/python/qa_costas_loop_cc.py39
-rwxr-xr-xgr-digital/python/qa_cpm.py28
-rwxr-xr-xgr-digital/python/qa_crc32.py26
-rwxr-xr-xgr-digital/python/qa_crc32_bb.py47
-rwxr-xr-xgr-digital/python/qa_diff_encoder.py8
-rwxr-xr-xgr-digital/python/qa_diff_phasor_cc.py26
-rwxr-xr-xgr-digital/python/qa_digital.py8
-rwxr-xr-xgr-digital/python/qa_fll_band_edge.py27
-rwxr-xr-xgr-digital/python/qa_framer_sink.py8
-rwxr-xr-xgr-digital/python/qa_glfsr_source.py14
-rwxr-xr-xgr-digital/python/qa_header_payload_demux.py6
-rwxr-xr-xgr-digital/python/qa_lfsr.py (renamed from gr-digital/python/qa_bytes_to_syms.py)38
-rwxr-xr-xgr-digital/python/qa_lms_equalizer.py10
-rwxr-xr-xgr-digital/python/qa_map.py10
-rwxr-xr-xgr-digital/python/qa_mpsk_receiver.py105
-rwxr-xr-xgr-digital/python/qa_mpsk_snr_est.py63
-rwxr-xr-xgr-digital/python/qa_ofdm_carrier_allocator_cvc.py36
-rwxr-xr-xgr-digital/python/qa_ofdm_chanest_vcvc.py84
-rwxr-xr-xgr-digital/python/qa_ofdm_cyclic_prefixer.py8
-rwxr-xr-xgr-digital/python/qa_ofdm_frame_equalizer_vcvc.py44
-rwxr-xr-xgr-digital/python/qa_ofdm_insert_preamble.py21
-rwxr-xr-xgr-digital/python/qa_ofdm_serializer_vcc.py54
-rwxr-xr-xgr-digital/python/qa_ofdm_sync_sc_cfb.py20
-rwxr-xr-xgr-digital/python/qa_packet_headergenerator_bb.py50
-rwxr-xr-xgr-digital/python/qa_packet_headerparser_b.py3
-rwxr-xr-xgr-digital/python/qa_pfb_clock_sync.py43
-rwxr-xr-xgr-digital/python/qa_pn_correlator_cc.py5
-rwxr-xr-xgr-digital/python/qa_probe_density.py24
-rwxr-xr-xgr-digital/python/qa_scrambler.py2
-rwxr-xr-xgr-digital/python/qa_simple_correlator.py14
-rwxr-xr-xgr-digital/python/qa_simple_framer.py27
-rw-r--r--gr-digital/python/qam.py40
-rw-r--r--gr-digital/python/qpsk.py189
-rw-r--r--gr-digital/python/utils/mod_codes.py1
-rw-r--r--gr-digital/python/utils/tagged_streams.py16
63 files changed, 1496 insertions, 1223 deletions
diff --git a/gr-digital/python/CMakeLists.txt b/gr-digital/python/CMakeLists.txt
index f820736c2f..7846345970 100644
--- a/gr-digital/python/CMakeLists.txt
+++ b/gr-digital/python/CMakeLists.txt
@@ -69,14 +69,16 @@ if(ENABLE_TESTING)
list(APPEND GR_TEST_PYTHON_DIRS
${CMAKE_BINARY_DIR}/gr-digital/python
${CMAKE_BINARY_DIR}/gr-digital/swig
- ${CMAKE_BINARY_DIR}/gr-fft/python
- ${CMAKE_BINARY_DIR}/gr-fft/swig
- ${CMAKE_BINARY_DIR}/gr-blocks/python
- ${CMAKE_BINARY_DIR}/gr-blocks/swig
+ ${CMAKE_BINARY_DIR}/gr-filter/python
+ ${CMAKE_BINARY_DIR}/gr-filter/swig
${CMAKE_BINARY_DIR}/gr-analog/python
${CMAKE_BINARY_DIR}/gr-analog/swig
+ ${CMAKE_BINARY_DIR}/gr-blocks/python
+ ${CMAKE_BINARY_DIR}/gr-blocks/swig
+ ${CMAKE_BINARY_DIR}/gr-fft/python
+ ${CMAKE_BINARY_DIR}/gr-fft/swig
)
-list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft)
+list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft gnuradio-analog gnuradio-blocks)
include(GrTest)
file(GLOB py_qa_test_files "qa_*.py")
diff --git a/gr-digital/python/__init__.py b/gr-digital/python/__init__.py
index 1c7958f178..6bbe8160e8 100644
--- a/gr-digital/python/__init__.py
+++ b/gr-digital/python/__init__.py
@@ -19,8 +19,7 @@
#
'''
-This is the gr-digital package. It contains all of the blocks,
-utilities, and examples for doing digital modulation and demodulation.
+Blocks and utilities for digital modulation and demodulation.
'''
# The presence of this file turns this directory into a Python package
@@ -28,6 +27,7 @@ utilities, and examples for doing digital modulation and demodulation.
from digital_swig import *
from psk import *
from qam import *
+from qamlike import *
from bpsk import *
from qpsk import *
from gmsk import *
diff --git a/gr-digital/python/bpsk.py b/gr-digital/python/bpsk.py
index 0d8f05c4c1..57cf2534f4 100644
--- a/gr-digital/python/bpsk.py
+++ b/gr-digital/python/bpsk.py
@@ -28,21 +28,15 @@ from cmath import exp
from gnuradio import gr
from gnuradio.digital.generic_mod_demod import generic_mod, generic_demod
+from gnuradio.digital.generic_mod_demod import shared_mod_args, shared_demod_args
import digital_swig
import modulation_utils
-# Default number of points in constellation.
-_def_constellation_points = 2
-# Whether differential coding is used.
-_def_differential = False
-
# /////////////////////////////////////////////////////////////////////////////
# BPSK constellation
# /////////////////////////////////////////////////////////////////////////////
-def bpsk_constellation(m=_def_constellation_points):
- if m != _def_constellation_points:
- raise ValueError("BPSK can only have 2 constellation points.")
+def bpsk_constellation():
return digital_swig.constellation_bpsk()
# /////////////////////////////////////////////////////////////////////////////
@@ -50,115 +44,102 @@ def bpsk_constellation(m=_def_constellation_points):
# /////////////////////////////////////////////////////////////////////////////
class bpsk_mod(generic_mod):
+ """
+ Hierarchical block for RRC-filtered BPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+
+ Args:
+ mod_code: Argument is not used. It exists purely to simplify generation of the block in grc.
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
+
+ def __init__(self, mod_code=None, differential=False, *args, **kwargs):
- def __init__(self, constellation_points=_def_constellation_points,
- differential=False, *args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered BPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_mod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
constellation = digital_swig.constellation_bpsk()
- if constellation_points != 2:
- raise ValueError('Number of constellation points must be 2 for BPSK.')
super(bpsk_mod, self).__init__(constellation=constellation,
differential=differential, *args, **kwargs)
-
+
+
# /////////////////////////////////////////////////////////////////////////////
# BPSK demodulator
#
# /////////////////////////////////////////////////////////////////////////////
class bpsk_demod(generic_demod):
-
- def __init__(self, constellation_points=_def_constellation_points,
- differential=False, *args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered BPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_demod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
+ """
+ Hierarchical block for RRC-filtered BPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Argument is not used. It exists purely to simplify generation of the block in grc.
+ differential: whether to use differential encoding (boolean)
+ """
+ # See generic_demod for additional arguments
+ __doc__ += shared_demod_args
+
+ def __init__(self, mod_code=None, differential=False, *args, **kwargs):
constellation = digital_swig.constellation_bpsk()
- if constellation_points != 2:
- raise ValueError('Number of constellation points must be 2 for BPSK.')
super(bpsk_demod, self).__init__(constellation=constellation,
differential=differential, *args, **kwargs)
-
+#bpsk_demod.__doc__ += shared_demod_args
# /////////////////////////////////////////////////////////////////////////////
# DBPSK constellation
# /////////////////////////////////////////////////////////////////////////////
-def dbpsk_constellation(m=_def_constellation_points):
- if m != _def_constellation_points:
- raise ValueError("DBPSK can only have 2 constellation points.")
+def dbpsk_constellation():
return digital_swig.constellation_dbpsk()
# /////////////////////////////////////////////////////////////////////////////
# DBPSK modulator
# /////////////////////////////////////////////////////////////////////////////
-class dbpsk_mod(generic_mod):
-
- def __init__(self, constellation_points=_def_constellation_points,
- differential=True, *args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered DBPSK modulation.
+class dbpsk_mod(bpsk_mod):
+ """
+ Hierarchical block for RRC-filtered DBPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Argument is not used. It exists purely to simplify generation of the block in grc.
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
+ def __init__(self, mod_code=None, *args, **kwargs):
- See generic_mod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
- constellation = digital_swig.constellation_bpsk()
- if constellation_points != 2:
- raise ValueError('Number of constellation points must be 2 for DBPSK.')
- super(dbpsk_mod, self).__init__(constellation=constellation,
- differential=True,
- *args, **kwargs)
+ super(dbpsk_mod, self).__init__(*args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
# DBPSK demodulator
#
# /////////////////////////////////////////////////////////////////////////////
-class dbpsk_demod(generic_demod):
-
- def __init__(self, constellation_points=_def_constellation_points,
- differential=True, *args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered DBPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_demod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
- constellation = digital_swig.constellation_bpsk()
- if constellation_points != 2:
- raise ValueError('Number of constellation points must be 2 for DBPSK.')
- super(dbpsk_demod, self).__init__(constellation=constellation,
- differential=True,
- *args, **kwargs)
+class dbpsk_demod(bpsk_demod):
+ """
+ Hierarchical block for RRC-filtered DBPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Argument is not used. It exists purely to simplify generation of the block in grc.
+ """
+ # See generic_demod for additional arguments
+ __doc__ += shared_demod_args
+
+ def __init__(self, mod_code=None, *args, **kwargs):
+
+ super(dbpsk_demod, self).__init__(*args, **kwargs)
#
# Add these to the mod/demod registry
diff --git a/gr-digital/python/cpm.py b/gr-digital/python/cpm.py
index 05032336d4..b27fb098f5 100644
--- a/gr-digital/python/cpm.py
+++ b/gr-digital/python/cpm.py
@@ -24,7 +24,9 @@
# See gnuradio-examples/python/digital for examples
-from gnuradio import gr, blks2
+from gnuradio import gr, filter
+from gnuradio import analog
+from gnuradio import blocks
from math import pi
import numpy
@@ -49,6 +51,31 @@ _def_log = False
# /////////////////////////////////////////////////////////////////////////////
class cpm_mod(gr.hier_block2):
+ """
+ Hierarchical block for Continuous Phase modulation.
+
+ The input is a byte stream (unsigned char) representing packed
+ bits and the output is the complex modulated signal at baseband.
+
+ See Proakis for definition of generic CPM signals:
+ s(t)=exp(j phi(t))
+ phi(t)= 2 pi h int_0^t f(t') dt'
+ f(t)=sum_k a_k g(t-kT)
+ (normalizing assumption: int_0^infty g(t) dt = 1/2)
+
+ Args:
+ samples_per_symbol: samples per baud >= 2 (integer)
+ bits_per_symbol: bits per symbol (integer)
+ h_numerator: numerator of modulation index (integer)
+ h_denominator: denominator of modulation index (numerator and denominator must be relative primes) (integer)
+ cpm_type: supported types are: 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL (integer)
+ bt: bandwidth symbol time product for GMSK (float)
+ symbols_per_pulse: shaping pulse duration in symbols (integer)
+ generic_taps: define a generic CPM pulse shape (sum = samples_per_symbol/2) (list/array of floats)
+ verbose: Print information about modulator? (boolean)
+ debug: Print modulation data to files? (boolean)
+ """
+
def __init__(self,
samples_per_symbol=_def_samples_per_symbol,
bits_per_symbol=_def_bits_per_symbol,
@@ -60,42 +87,6 @@ class cpm_mod(gr.hier_block2):
generic_taps=_def_generic_taps,
verbose=_def_verbose,
log=_def_log):
- """
- Hierarchical block for Continuous Phase
- modulation.
-
- The input is a byte stream (unsigned char)
- representing packed bits and the
- output is the complex modulated signal at baseband.
-
- See Proakis for definition of generic CPM signals:
- s(t)=exp(j phi(t))
- phi(t)= 2 pi h int_0^t f(t') dt'
- f(t)=sum_k a_k g(t-kT)
- (normalizing assumption: int_0^infty g(t) dt = 1/2)
-
- @param samples_per_symbol: samples per baud >= 2
- @type samples_per_symbol: integer
- @param bits_per_symbol: bits per symbol
- @type bits_per_symbol: integer
- @param h_numerator: numerator of modulation index
- @type h_numerator: integer
- @param h_denominator: denominator of modulation index (numerator and denominator must be relative primes)
- @type h_denominator: integer
- @param cpm_type: supported types are: 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL
- @type cpm_type: integer
- @param bt: bandwidth symbol time product for GMSK
- @type bt: float
- @param symbols_per_pulse: shaping pulse duration in symbols
- @type symbols_per_pulse: integer
- @param generic_taps: define a generic CPM pulse shape (sum = samples_per_symbol/2)
- @type generic_taps: array of floats
-
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param debug: Print modulation data to files?
- @type debug: bool
- """
gr.hier_block2.__init__(self, "cpm_mod",
gr.io_signature(1, 1, gr.sizeof_char), # Input signature
@@ -127,17 +118,17 @@ class cpm_mod(gr.hier_block2):
sensitivity = 2 * pi * h_numerator / h_denominator / samples_per_symbol
# Unpack Bytes into bits_per_symbol groups
- self.B2s = gr.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST)
+ self.B2s = blocks.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST)
# Turn it into symmetric PAM data.
- self.pam = gr.chunks_to_symbols_bf(self.sym_alphabet,1)
+ self.pam = digital_swig.chunks_to_symbols_bf(self.sym_alphabet,1)
# Generate pulse (sum of taps = samples_per_symbol/2)
if cpm_type == 0: # CPFSK
self.taps= (1.0/self._symbols_per_pulse/2,) * self.ntaps
elif cpm_type == 1: # GMSK
- gaussian_taps = gr.firdes.gaussian(
+ gaussian_taps = filter.firdes.gaussian(
1.0/2, # gain
samples_per_symbol, # symbol_rate
bt, # bandwidth * symbol time
@@ -153,10 +144,10 @@ class cpm_mod(gr.hier_block2):
else:
raise TypeError, ("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
- self.filter = blks2.pfb_arb_resampler_fff(samples_per_symbol, self.taps)
+ self.filter = filter.pfb.arb_resampler_fff(samples_per_symbol, self.taps)
# FM modulation
- self.fmmod = gr.frequency_modulator_fc(sensitivity)
+ self.fmmod = analog.frequency_modulator_fc(sensitivity)
if verbose:
self._print_verbage()
@@ -205,13 +196,13 @@ class cpm_mod(gr.hier_block2):
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.B2s,
- gr.file_sink(gr.sizeof_float, "symbols.dat"))
+ blocks.file_sink(gr.sizeof_float, "symbols.dat"))
self.connect(self.pam,
- gr.file_sink(gr.sizeof_float, "pam.dat"))
+ blocks.file_sink(gr.sizeof_float, "pam.dat"))
self.connect(self.filter,
- gr.file_sink(gr.sizeof_float, "filter.dat"))
+ blocks.file_sink(gr.sizeof_float, "filter.dat"))
self.connect(self.fmmod,
- gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat"))
+ blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat"))
def add_options(parser):
diff --git a/gr-digital/python/crc.py b/gr-digital/python/crc.py
index 198ab059f5..e228faaa98 100644
--- a/gr-digital/python/crc.py
+++ b/gr-digital/python/crc.py
@@ -20,11 +20,11 @@
#
from gnuradio import gru
-import digital_swig
+import digital_swig as digital
import struct
def gen_and_append_crc32(s):
- crc = digital_swig.crc32(s)
+ crc = digital.crc32(s)
return s + struct.pack(">I", gru.hexint(crc) & 0xFFFFFFFF)
def check_crc32(s):
@@ -32,7 +32,7 @@ def check_crc32(s):
return (False, '')
msg = s[:-4]
#print "msg = '%s'" % (msg,)
- actual = digital_swig.crc32(msg)
+ actual = digital.crc32(msg)
(expected,) = struct.unpack(">I", s[-4:])
# print "actual =", hex(actual), "expected =", hex(expected)
return (actual == expected, msg)
diff --git a/gr-digital/python/digital_voice.py.real b/gr-digital/python/digital_voice.py.real
new file mode 100644
index 0000000000..4a2ef7721f
--- /dev/null
+++ b/gr-digital/python/digital_voice.py.real
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+"""
+Digital voice Tx and Rx using GSM 13kbit vocoder and GMSK.
+
+Runs channel at 32kbit/sec. Currently uses fake channel coding,
+but there's room for a rate 1/2 coder.
+"""
+
+from gnuradio import gr, gru
+from gnuradio import blocks
+from gnuradio.blksimpl.gmsk import gmsk_mod, gmsk_demod
+
+from gnuradio.vocoder import gsm_full_rate
+
+# Size of gsm full rate speech encoder output packet in bytes
+
+GSM_FRAME_SIZE = 33
+
+# Size of packet in bytes that we send to GMSK modulator:
+#
+# Target: 256kS/sec air rate.
+#
+# 256kS 1 sym 1 bit 1 byte 0.020 sec 80 bytes
+# ---- * ----- * ----- * ------ * --------- = --------
+# sec 8 S 1 sym 8 bits frame frame
+#
+# gr_simple_framer add 10 bytes of overhead.
+
+AIR_FRAME_SIZE = 70
+
+
+class digital_voice_tx(gr.hier_block):
+ """
+ Hierarchical block for digital voice tranmission.
+
+ The input is 8kS/sec floating point audio in the range [-1,+1]
+ The output is 256kS/sec GMSK modulated complex baseband signal in the range [-1,+1].
+ """
+ def __init__(self, fg):
+ samples_per_symbol = 8
+ symbol_rate = 32000
+ bt = 0.3 # Gaussian filter bandwidth * symbol time
+
+ src_scale = blocks.multiply_const_ff(32767)
+ f2s = blocks.float_to_short()
+ voice_coder = gsm_full_rate.encode_sp()
+
+ channel_coder = gr.fake_channel_encoder_pp(GSM_FRAME_SIZE, AIR_FRAME_SIZE)
+ p2s = gr.parallel_to_serial(gr.sizeof_char, AIR_FRAME_SIZE)
+
+ mod = gmsk_mod(fg, sps=samples_per_symbol,
+ symbol_rate=symbol_rate, bt=bt,
+ p_size=AIR_FRAME_SIZE)
+
+ fg.connect(src_scale, f2s, voice_coder, channel_coder, p2s, mod)
+ gr.hier_block.__init__(self, fg, src_scale, mod)
+
+
+class digital_voice_rx(gr.hier_block):
+ """
+ Hierarchical block for digital voice reception.
+
+ The input is 256kS/sec GMSK modulated complex baseband signal.
+ The output is 8kS/sec floating point audio in the range [-1,+1]
+ """
+ def __init__(self, fg):
+ samples_per_symbol = 8
+ symbol_rate = 32000
+
+ demod = gmsk_demod(fg, sps=samples_per_symbol,
+ symbol_rate=symbol_rate,
+ p_size=AIR_FRAME_SIZE)
+
+ s2p = gr.serial_to_parallel(gr.sizeof_char, AIR_FRAME_SIZE)
+ channel_decoder = gr.fake_channel_decoder_pp(AIR_FRAME_SIZE, GSM_FRAME_SIZE)
+
+ voice_decoder = gsm_full_rate.decode_ps()
+ s2f = blocks.short_to_float ()
+ sink_scale = blocks.multiply_const_ff(1.0/32767.)
+
+ fg.connect(demod, s2p, channel_decoder, voice_decoder, s2f, sink_scale)
+ gr.hier_block.__init__(self, fg, demod, sink_scale)
diff --git a/gr-digital/python/generic_mod_demod.py b/gr-digital/python/generic_mod_demod.py
index a6c4f3445a..b812fe1c37 100644
--- a/gr-digital/python/generic_mod_demod.py
+++ b/gr-digital/python/generic_mod_demod.py
@@ -31,6 +31,21 @@ from utils import mod_codes
import digital_swig as digital
import math
+try:
+ from gnuradio import blocks
+except ImportError:
+ import blocks_swig as blocks
+
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
+try:
+ from gnuradio import analog
+except ImportError:
+ import analog_swig as analog
+
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 2
_def_excess_bw = 0.35
@@ -74,42 +89,40 @@ def add_common_options(parser):
# /////////////////////////////////////////////////////////////////////////////
class generic_mod(gr.hier_block2):
+ """
+ Hierarchical block for RRC-filtered differential generic modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ constellation: determines the modulation type (gnuradio.digital.digital_constellation)
+ samples_per_symbol: samples per baud >= 2 (float)
+ differential: whether to use differential encoding (boolean)
+ pre_diff_code: whether to use apply a pre-differential mapping (boolean)
+ excess_bw: Root-raised cosine filter excess bandwidth (float)
+ verbose: Print information about modulator? (boolean)
+ log: Log modulation data to files? (boolean)
+ """
def __init__(self, constellation,
- samples_per_symbol=_def_samples_per_symbol,
differential=_def_differential,
+ samples_per_symbol=_def_samples_per_symbol,
+ pre_diff_code=True,
excess_bw=_def_excess_bw,
- gray_coded=True,
verbose=_def_verbose,
log=_def_log):
- """
- Hierarchical block for RRC-filtered differential generic modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- @param constellation: determines the modulation type
- @type constellation: gnuradio.digital.gr_constellation
- @param samples_per_symbol: samples per baud >= 2
- @type samples_per_symbol: float
- @param excess_bw: Root-raised cosine filter excess bandwidth
- @type excess_bw: float
- @param gray_coded: turn gray coding on/off
- @type gray_coded: bool
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param log: Log modulation data to files?
- @type log: bool
- """
gr.hier_block2.__init__(self, "generic_mod",
gr.io_signature(1, 1, gr.sizeof_char), # Input signature
gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
- self._constellation = constellation.base()
+ self._constellation = constellation
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
self._differential = differential
+ # Only apply a predifferential coding if the constellation also supports it.
+ self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code()
if self._samples_per_symbol < 2:
raise TypeError, ("sbp must be >= 2, is %f" % self._samples_per_symbol)
@@ -118,9 +131,9 @@ class generic_mod(gr.hier_block2):
# turn bytes into k-bit vectors
self.bytes2chunks = \
- gr.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST)
+ blocks.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST)
- if gray_coded == True:
+ if self.pre_diff_code:
self.symbol_mapper = digital.map_bb(self._constellation.pre_diff_code())
if differential:
@@ -131,23 +144,23 @@ class generic_mod(gr.hier_block2):
# pulse shaping filter
nfilts = 32
ntaps = nfilts * 11 * int(self._samples_per_symbol) # make nfilts filters of ntaps each
- self.rrc_taps = gr.firdes.root_raised_cosine(
+ self.rrc_taps = filter.firdes.root_raised_cosine(
nfilts, # gain
nfilts, # sampling rate based on 32 filters in resampler
1.0, # symbol rate
self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
- self.rrc_filter = gr.pfb_arb_resampler_ccf(self._samples_per_symbol,
- self.rrc_taps)
+ self.rrc_filter = filter.pfb_arb_resampler_ccf(self._samples_per_symbol,
+ self.rrc_taps)
# Connect
- blocks = [self, self.bytes2chunks]
- if gray_coded == True:
- blocks.append(self.symbol_mapper)
+ self._blocks = [self, self.bytes2chunks]
+ if self.pre_diff_code:
+ self._blocks.append(self.symbol_mapper)
if differential:
- blocks.append(self.diffenc)
- blocks += [self.chunks2symbols, self.rrc_filter, self]
- self.connect(*blocks)
+ self._blocks.append(self.diffenc)
+ self._blocks += [self.chunks2symbols, self.rrc_filter, self]
+ self.connect(*self._blocks)
if verbose:
self._print_verbage()
@@ -185,17 +198,17 @@ class generic_mod(gr.hier_block2):
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.bytes2chunks,
- gr.file_sink(gr.sizeof_char, "tx_bytes2chunks.8b"))
- if self._constellation.apply_pre_diff_code():
+ blocks.file_sink(gr.sizeof_char, "tx_bytes2chunks.8b"))
+ if self.pre_diff_code:
self.connect(self.symbol_mapper,
- gr.file_sink(gr.sizeof_char, "tx_symbol_mapper.8b"))
+ blocks.file_sink(gr.sizeof_char, "tx_symbol_mapper.8b"))
if self._differential:
self.connect(self.diffenc,
- gr.file_sink(gr.sizeof_char, "tx_diffenc.8b"))
+ blocks.file_sink(gr.sizeof_char, "tx_diffenc.8b"))
self.connect(self.chunks2symbols,
- gr.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.32fc"))
+ blocks.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.32fc"))
self.connect(self.rrc_filter,
- gr.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.32fc"))
+ blocks.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.32fc"))
# /////////////////////////////////////////////////////////////////////////////
@@ -206,48 +219,41 @@ class generic_mod(gr.hier_block2):
# /////////////////////////////////////////////////////////////////////////////
class generic_demod(gr.hier_block2):
+ """
+ Hierarchical block for RRC-filtered differential generic demodulation.
+
+ The input is the complex modulated signal at baseband.
+ The output is a stream of bits packed 1 bit per byte (LSB)
+
+ Args:
+ constellation: determines the modulation type (gnuradio.digital.digital_constellation)
+ samples_per_symbol: samples per baud >= 2 (float)
+ differential: whether to use differential encoding (boolean)
+ pre_diff_code: whether to use apply a pre-differential mapping (boolean)
+ excess_bw: Root-raised cosine filter excess bandwidth (float)
+ freq_bw: loop filter lock-in bandwidth (float)
+ timing_bw: timing recovery loop lock-in bandwidth (float)
+ phase_bw: phase recovery loop bandwidth (float)
+ verbose: Print information about modulator? (boolean)
+ log: Log modulation data to files? (boolean)
+ """
def __init__(self, constellation,
- samples_per_symbol=_def_samples_per_symbol,
differential=_def_differential,
+ samples_per_symbol=_def_samples_per_symbol,
+ pre_diff_code=True,
excess_bw=_def_excess_bw,
- gray_coded=True,
freq_bw=_def_freq_bw,
timing_bw=_def_timing_bw,
phase_bw=_def_phase_bw,
verbose=_def_verbose,
log=_def_log):
- """
- Hierarchical block for RRC-filtered differential generic demodulation.
-
- The input is the complex modulated signal at baseband.
- The output is a stream of bits packed 1 bit per byte (LSB)
-
- @param constellation: determines the modulation type
- @type constellation: gnuradio.digital.gr_constellation
- @param samples_per_symbol: samples per symbol >= 2
- @type samples_per_symbol: float
- @param excess_bw: Root-raised cosine filter excess bandwidth
- @type excess_bw: float
- @param gray_coded: turn gray coding on/off
- @type gray_coded: bool
- @param freq_bw: loop filter lock-in bandwidth
- @type freq_bw: float
- @param timing_bw: timing recovery loop lock-in bandwidth
- @type timing_bw: float
- @param phase_bw: phase recovery loop bandwidth
- @type phase_bw: float
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param debug: Print modualtion data to files?
- @type debug: bool
- """
gr.hier_block2.__init__(self, "generic_demod",
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
- self._constellation = constellation.base()
+ self._constellation = constellation
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
self._phase_bw = phase_bw
@@ -259,13 +265,16 @@ class generic_demod(gr.hier_block2):
if self._samples_per_symbol < 2:
raise TypeError, ("sbp must be >= 2, is %d" % self._samples_per_symbol)
+ # Only apply a predifferential coding if the constellation also supports it.
+ self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code()
+
arity = pow(2,self.bits_per_symbol())
nfilts = 32
ntaps = 11 * int(self._samples_per_symbol*nfilts)
# Automatic gain control
- self.agc = gr.agc2_cc(0.6e-1, 1e-3, 1, 1, 100)
+ self.agc = analog.agc2_cc(0.6e-1, 1e-3, 1, 1, 100)
# Frequency correction
fll_ntaps = 55
@@ -273,8 +282,8 @@ class generic_demod(gr.hier_block2):
fll_ntaps, self._freq_bw)
# symbol timing recovery with RRC data filter
- taps = gr.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol,
- 1.0, self._excess_bw, ntaps)
+ taps = filter.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol,
+ 1.0, self._excess_bw, ntaps)
self.time_recov = digital.pfb_clock_sync_ccf(self._samples_per_symbol,
self._timing_bw, taps,
nfilts, nfilts//2, self._timing_max_dev)
@@ -282,19 +291,19 @@ class generic_demod(gr.hier_block2):
fmin = -0.25
fmax = 0.25
self.receiver = digital.constellation_receiver_cb(
- self._constellation, self._phase_bw,
+ self._constellation.base(), self._phase_bw,
fmin, fmax)
# Do differential decoding based on phase change of symbols
if differential:
self.diffdec = digital.diff_decoder_bb(arity)
- if gray_coded:
+ if self.pre_diff_code:
self.symbol_mapper = digital.map_bb(
mod_codes.invert_code(self._constellation.pre_diff_code()))
# unpack the k bit vector into a stream of bits
- self.unpack = gr.unpack_k_bits_bb(self.bits_per_symbol())
+ self.unpack = blocks.unpack_k_bits_bb(self.bits_per_symbol())
if verbose:
self._print_verbage()
@@ -303,14 +312,14 @@ class generic_demod(gr.hier_block2):
self._setup_logging()
# Connect and Initialize base class
- blocks = [self, self.agc, self.freq_recov,
- self.time_recov, self.receiver]
+ self._blocks = [self, self.agc, self.freq_recov,
+ self.time_recov, self.receiver]
if differential:
- blocks.append(self.diffdec)
- if self._constellation.apply_pre_diff_code():
- blocks.append(self.symbol_mapper)
- blocks += [self.unpack, self]
- self.connect(*blocks)
+ self._blocks.append(self.diffdec)
+ if self.pre_diff_code:
+ self._blocks.append(self.symbol_mapper)
+ self._blocks += [self.unpack, self]
+ self.connect(*self._blocks)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -329,39 +338,39 @@ class generic_demod(gr.hier_block2):
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.agc,
- gr.file_sink(gr.sizeof_gr_complex, "rx_agc.32fc"))
+ blocks.file_sink(gr.sizeof_gr_complex, "rx_agc.32fc"))
self.connect((self.freq_recov, 0),
- gr.file_sink(gr.sizeof_gr_complex, "rx_freq_recov.32fc"))
+ blocks.file_sink(gr.sizeof_gr_complex, "rx_freq_recov.32fc"))
self.connect((self.freq_recov, 1),
- gr.file_sink(gr.sizeof_float, "rx_freq_recov_freq.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_freq_recov_freq.32f"))
self.connect((self.freq_recov, 2),
- gr.file_sink(gr.sizeof_float, "rx_freq_recov_phase.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_freq_recov_phase.32f"))
self.connect((self.freq_recov, 3),
- gr.file_sink(gr.sizeof_float, "rx_freq_recov_error.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_freq_recov_error.32f"))
self.connect((self.time_recov, 0),
- gr.file_sink(gr.sizeof_gr_complex, "rx_time_recov.32fc"))
+ blocks.file_sink(gr.sizeof_gr_complex, "rx_time_recov.32fc"))
self.connect((self.time_recov, 1),
- gr.file_sink(gr.sizeof_float, "rx_time_recov_error.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_time_recov_error.32f"))
self.connect((self.time_recov, 2),
- gr.file_sink(gr.sizeof_float, "rx_time_recov_rate.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_time_recov_rate.32f"))
self.connect((self.time_recov, 3),
- gr.file_sink(gr.sizeof_float, "rx_time_recov_phase.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_time_recov_phase.32f"))
self.connect((self.receiver, 0),
- gr.file_sink(gr.sizeof_char, "rx_receiver.8b"))
+ blocks.file_sink(gr.sizeof_char, "rx_receiver.8b"))
self.connect((self.receiver, 1),
- gr.file_sink(gr.sizeof_float, "rx_receiver_error.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_receiver_error.32f"))
self.connect((self.receiver, 2),
- gr.file_sink(gr.sizeof_float, "rx_receiver_phase.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_receiver_phase.32f"))
self.connect((self.receiver, 3),
- gr.file_sink(gr.sizeof_float, "rx_receiver_freq.32f"))
+ blocks.file_sink(gr.sizeof_float, "rx_receiver_freq.32f"))
if self._differential:
self.connect(self.diffdec,
- gr.file_sink(gr.sizeof_char, "rx_diffdec.8b"))
- if self._constellation.apply_pre_diff_code():
+ blocks.file_sink(gr.sizeof_char, "rx_diffdec.8b"))
+ if self.pre_diff_code:
self.connect(self.symbol_mapper,
- gr.file_sink(gr.sizeof_char, "rx_symbol_mapper.8b"))
+ blocks.file_sink(gr.sizeof_char, "rx_symbol_mapper.8b"))
self.connect(self.unpack,
- gr.file_sink(gr.sizeof_char, "rx_unpack.8b"))
+ blocks.file_sink(gr.sizeof_char, "rx_unpack.8b"))
def add_options(parser):
"""
@@ -385,3 +394,17 @@ class generic_demod(gr.hier_block2):
return extract_kwargs_from_options_for_class(cls, options)
extract_kwargs_from_options=classmethod(extract_kwargs_from_options)
+shared_demod_args = """ samples_per_symbol: samples per baud >= 2 (float)
+ excess_bw: Root-raised cosine filter excess bandwidth (float)
+ freq_bw: loop filter lock-in bandwidth (float)
+ timing_bw: timing recovery loop lock-in bandwidth (float)
+ phase_bw: phase recovery loop bandwidth (float)
+ verbose: Print information about modulator? (boolean)
+ log: Log modulation data to files? (boolean)
+"""
+
+shared_mod_args = """ samples_per_symbol: samples per baud >= 2 (float)
+ excess_bw: Root-raised cosine filter excess bandwidth (float)
+ verbose: Print information about modulator? (boolean)
+ log: Log modulation data to files? (boolean)
+"""
diff --git a/gr-digital/python/gfsk.py b/gr-digital/python/gfsk.py
index c85fdf0e00..54c94b88fd 100644
--- a/gr-digital/python/gfsk.py
+++ b/gr-digital/python/gfsk.py
@@ -2,7 +2,7 @@
# GFSK modulation and demodulation.
#
#
-# Copyright 2005,2006,2007 Free Software Foundation, Inc.
+# Copyright 2005-2007,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -25,6 +25,8 @@
# See gnuradio-examples/python/digital for examples
from gnuradio import gr
+from gnuradio import analog
+from gnuradio import blocks
import modulation_utils
import digital_swig as digital
from math import pi
@@ -32,6 +34,11 @@ import numpy
from pprint import pprint
import inspect
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 2
_def_sensitivity = 1
@@ -68,14 +75,11 @@ class gfsk_mod(gr.hier_block2):
The input is a byte stream (unsigned char) and the
output is the complex modulated signal at baseband.
- @param samples_per_symbol: samples per baud >= 2
- @type samples_per_symbol: integer
- @param bt: Gaussian filter bandwidth * symbol time
- @type bt: float
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param debug: Print modualtion data to files?
- @type debug: bool
+ Args:
+ samples_per_symbol: samples per baud >= 2 (integer)
+ bt: Gaussian filter bandwidth * symbol time (float)
+ verbose: Print information about modulator? (bool)
+ debug: Print modualtion data to files? (bool)
"""
gr.hier_block2.__init__(self, "gfsk_mod",
@@ -94,11 +98,13 @@ class gfsk_mod(gr.hier_block2):
#sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
# Turn it into NRZ data.
- self.nrz = gr.bytes_to_syms()
+ #self.nrz = digital.bytes_to_syms()
+ self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ self.nrz = digital.chunks_to_symbols_bf([-1, 1])
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
- self.gaussian_taps = gr.firdes.gaussian(
+ self.gaussian_taps = filter.firdes.gaussian(
1.0, # gain
samples_per_symbol, # symbol_rate
bt, # bandwidth * symbol time
@@ -107,13 +113,13 @@ class gfsk_mod(gr.hier_block2):
self.sqwave = (1,) * samples_per_symbol # rectangular window
self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = gr.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
# FM modulation
- self.fmmod = gr.frequency_modulator_fc(sensitivity)
+ self.fmmod = frequency.frequency_modulator_fc(sensitivity)
# small amount of output attenuation to prevent clipping USRP sink
- self.amp = gr.multiply_const_cc(0.999)
+ self.amp = blocks.multiply_const_cc(0.999)
if verbose:
self._print_verbage()
@@ -122,7 +128,7 @@ class gfsk_mod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
+ self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -140,11 +146,11 @@ class gfsk_mod(gr.hier_block2):
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.nrz,
- gr.file_sink(gr.sizeof_float, "nrz.dat"))
+ blocks.file_sink(gr.sizeof_float, "nrz.dat"))
self.connect(self.gaussian_filter,
- gr.file_sink(gr.sizeof_float, "gaussian_filter.dat"))
+ blocks.file_sink(gr.sizeof_float, "gaussian_filter.dat"))
self.connect(self.fmmod,
- gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat"))
+ blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat"))
def add_options(parser):
@@ -188,23 +194,19 @@ class gfsk_demod(gr.hier_block2):
The input is the complex modulated signal at baseband.
The output is a stream of bits packed 1 bit per byte (the LSB)
- @param samples_per_symbol: samples per baud
- @type samples_per_symbol: integer
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param log: Print modualtion data to files?
- @type log: bool
+ Args:
+ samples_per_symbol: samples per baud (integer)
+ verbose: Print information about modulator? (bool)
+ log: Print modualtion data to files? (bool)
Clock recovery parameters. These all have reasonble defaults.
- @param gain_mu: controls rate of mu adjustment
- @type gain_mu: float
- @param mu: fractional delay [0.0, 1.0]
- @type mu: float
- @param omega_relative_limit: sets max variation in omega
- @type omega_relative_limit: float, typically 0.000200 (200 ppm)
- @param freq_error: bit rate error as a fraction
- @param float
+ Args:
+ gain_mu: controls rate of mu adjustment (float)
+ mu: fractional delay [0.0, 1.0] (float)
+ omega_relative_limit: sets max variation in omega (float, typically 0.000200 (200 ppm))
+ freq_error: bit rate error as a fraction
+ float:
"""
gr.hier_block2.__init__(self, "gfsk_demod",
@@ -230,7 +232,7 @@ class gfsk_demod(gr.hier_block2):
# Demodulate FM
#sensitivity = (pi / 2) / samples_per_symbol
- self.fmdemod = gr.quadrature_demod_cf(1.0 / sensitivity)
+ self.fmdemod = analog.quadrature_demod_cf(1.0 / sensitivity)
# the clock recovery block tracks the symbol clock and resamples as needed.
# the output of the block is a stream of soft symbols (float)
@@ -270,11 +272,11 @@ class gfsk_demod(gr.hier_block2):
def _setup_logging(self):
print "Demodulation logging turned on."
self.connect(self.fmdemod,
- gr.file_sink(gr.sizeof_float, "fmdemod.dat"))
+ blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
self.connect(self.clock_recovery,
- gr.file_sink(gr.sizeof_float, "clock_recovery.dat"))
+ blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
self.connect(self.slicer,
- gr.file_sink(gr.sizeof_char, "slicer.dat"))
+ blocks.file_sink(gr.sizeof_char, "slicer.dat"))
def add_options(parser):
"""
diff --git a/gr-digital/python/gmsk.py b/gr-digital/python/gmsk.py
index 2c9be056c2..055fc6002b 100644
--- a/gr-digital/python/gmsk.py
+++ b/gr-digital/python/gmsk.py
@@ -2,7 +2,7 @@
# GMSK modulation and demodulation.
#
#
-# Copyright 2005,2006,2007 Free Software Foundation, Inc.
+# Copyright 2005-2007,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -25,6 +25,8 @@
# See gnuradio-examples/python/digital for examples
from gnuradio import gr
+from gnuradio import blocks
+from gnuradio import analog
import modulation_utils
import digital_swig as digital
from math import pi
@@ -32,6 +34,12 @@ import numpy
from pprint import pprint
import inspect
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
+
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 2
_def_bt = 0.35
@@ -53,28 +61,25 @@ _def_omega_relative_limit = 0.005
# /////////////////////////////////////////////////////////////////////////////
class gmsk_mod(gr.hier_block2):
+ """
+ Hierarchical block for Gaussian Minimum Shift Key (GMSK)
+ modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ samples_per_symbol: samples per baud >= 2 (integer)
+ bt: Gaussian filter bandwidth * symbol time (float)
+ verbose: Print information about modulator? (boolean)
+ debug: Print modulation data to files? (boolean)
+ """
def __init__(self,
samples_per_symbol=_def_samples_per_symbol,
bt=_def_bt,
verbose=_def_verbose,
log=_def_log):
- """
- Hierarchical block for Gaussian Minimum Shift Key (GMSK)
- modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- @param samples_per_symbol: samples per baud >= 2
- @type samples_per_symbol: integer
- @param bt: Gaussian filter bandwidth * symbol time
- @type bt: float
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param debug: Print modualtion data to files?
- @type debug: bool
- """
gr.hier_block2.__init__(self, "gmsk_mod",
gr.io_signature(1, 1, gr.sizeof_char), # Input signature
@@ -92,11 +97,13 @@ class gmsk_mod(gr.hier_block2):
sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
# Turn it into NRZ data.
- self.nrz = gr.bytes_to_syms()
+ #self.nrz = digital.bytes_to_syms()
+ self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ self.nrz = digital.chunks_to_symbols_bf([-1, 1], 1)
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
- self.gaussian_taps = gr.firdes.gaussian(
+ self.gaussian_taps = filter.firdes.gaussian(
1, # gain
samples_per_symbol, # symbol_rate
bt, # bandwidth * symbol time
@@ -105,10 +112,10 @@ class gmsk_mod(gr.hier_block2):
self.sqwave = (1,) * samples_per_symbol # rectangular window
self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = gr.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
# FM modulation
- self.fmmod = gr.frequency_modulator_fc(sensitivity)
+ self.fmmod = analog.frequency_modulator_fc(sensitivity)
if verbose:
self._print_verbage()
@@ -117,7 +124,7 @@ class gmsk_mod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self)
+ self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -135,11 +142,11 @@ class gmsk_mod(gr.hier_block2):
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.nrz,
- gr.file_sink(gr.sizeof_float, "nrz.dat"))
+ blocks.file_sink(gr.sizeof_float, "nrz.dat"))
self.connect(self.gaussian_filter,
- gr.file_sink(gr.sizeof_float, "gaussian_filter.dat"))
+ blocks.file_sink(gr.sizeof_float, "gaussian_filter.dat"))
self.connect(self.fmmod,
- gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat"))
+ blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat"))
def add_options(parser):
@@ -166,7 +173,23 @@ class gmsk_mod(gr.hier_block2):
# /////////////////////////////////////////////////////////////////////////////
class gmsk_demod(gr.hier_block2):
-
+ """
+ Hierarchical block for Gaussian Minimum Shift Key (GMSK)
+ demodulation.
+
+ The input is the complex modulated signal at baseband.
+ The output is a stream of bits packed 1 bit per byte (the LSB)
+
+ Args:
+ samples_per_symbol: samples per baud (integer)
+ verbose: Print information about modulator? (boolean)
+ log: Print modualtion data to files? (boolean)
+ gain_mu: controls rate of mu adjustment (float)
+ mu: fractional delay [0.0, 1.0] (float)
+ omega_relative_limit: sets max variation in omega (float)
+ freq_error: bit rate error as a fraction (float)
+ """
+
def __init__(self,
samples_per_symbol=_def_samples_per_symbol,
gain_mu=_def_gain_mu,
@@ -175,31 +198,6 @@ class gmsk_demod(gr.hier_block2):
freq_error=_def_freq_error,
verbose=_def_verbose,
log=_def_log):
- """
- Hierarchical block for Gaussian Minimum Shift Key (GMSK)
- demodulation.
-
- The input is the complex modulated signal at baseband.
- The output is a stream of bits packed 1 bit per byte (the LSB)
-
- @param samples_per_symbol: samples per baud
- @type samples_per_symbol: integer
- @param verbose: Print information about modulator?
- @type verbose: bool
- @param log: Print modualtion data to files?
- @type log: bool
-
- Clock recovery parameters. These all have reasonble defaults.
-
- @param gain_mu: controls rate of mu adjustment
- @type gain_mu: float
- @param mu: fractional delay [0.0, 1.0]
- @type mu: float
- @param omega_relative_limit: sets max variation in omega
- @type omega_relative_limit: float, typically 0.000200 (200 ppm)
- @param freq_error: bit rate error as a fraction
- @param float
- """
gr.hier_block2.__init__(self, "gmsk_demod",
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
@@ -224,7 +222,7 @@ class gmsk_demod(gr.hier_block2):
# Demodulate FM
sensitivity = (pi / 2) / samples_per_symbol
- self.fmdemod = gr.quadrature_demod_cf(1.0 / sensitivity)
+ self.fmdemod = analog.quadrature_demod_cf(1.0 / sensitivity)
# the clock recovery block tracks the symbol clock and resamples as needed.
# the output of the block is a stream of soft symbols (float)
@@ -264,11 +262,11 @@ class gmsk_demod(gr.hier_block2):
def _setup_logging(self):
print "Demodulation logging turned on."
self.connect(self.fmdemod,
- gr.file_sink(gr.sizeof_float, "fmdemod.dat"))
+ blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
self.connect(self.clock_recovery,
- gr.file_sink(gr.sizeof_float, "clock_recovery.dat"))
+ blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
self.connect(self.slicer,
- gr.file_sink(gr.sizeof_char, "slicer.dat"))
+ blocks.file_sink(gr.sizeof_char, "slicer.dat"))
def add_options(parser):
"""
diff --git a/gr-digital/python/modulation_utils.py b/gr-digital/python/modulation_utils.py
index cb3a9812d4..d499094d05 100644
--- a/gr-digital/python/modulation_utils.py
+++ b/gr-digital/python/modulation_utils.py
@@ -74,11 +74,10 @@ def extract_kwargs_from_options(function, excluded_args, options):
but in that case the default provided in the __init__ argument
list will be used since there is no kwargs entry.)
- @param function: the function whose parameter list will be examined
- @param excluded_args: function arguments that are NOT to be added to the dictionary
- @type excluded_args: sequence of strings
- @param options: result of command argument parsing
- @type options: optparse.Values
+ Args:
+ function: the function whose parameter list will be examined
+ excluded_args: function arguments that are NOT to be added to the dictionary (sequence of strings)
+ options: result of command argument parsing (optparse.Values)
"""
# Try this in C++ ;)
diff --git a/gr-digital/python/ofdm.py b/gr-digital/python/ofdm.py
index 9f57920efc..bf129675ab 100644
--- a/gr-digital/python/ofdm.py
+++ b/gr-digital/python/ofdm.py
@@ -21,8 +21,9 @@
#
import math
-from gnuradio import gr
-import digital_swig
+from gnuradio import gr, fft
+from gnuradio import blocks
+import digital_swig as digital
import ofdm_packet_utils
from ofdm_receiver import ofdm_receiver
import gnuradio.gr.gr_threading as _threading
@@ -46,10 +47,10 @@ class ofdm_mod(gr.hier_block2):
Packets to be sent are enqueued by calling send_pkt.
The output is the complex modulated signal at baseband.
- @param options: pass modulation options from higher layers (fft length, occupied tones, etc.)
- @param msgq_limit: maximum number of messages in message queue
- @type msgq_limit: int
- @param pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples
+ Args:
+ options: pass modulation options from higher layers (fft length, occupied tones, etc.)
+ msgq_limit: maximum number of messages in message queue (int)
+ pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples
"""
gr.hier_block2.__init__(self, "ofdm_mod",
@@ -97,17 +98,17 @@ class ofdm_mod(gr.hier_block2):
constel = qam.qam_constellation(arity)
rotated_const = map(lambda pt: pt * rot, constel.points())
#print rotated_const
- self._pkt_input = digital_swig.ofdm_mapper_bcv(rotated_const,
- msgq_limit,
- options.occupied_tones,
- options.fft_length)
+ self._pkt_input = digital.ofdm_mapper_bcv(rotated_const,
+ msgq_limit,
+ options.occupied_tones,
+ options.fft_length)
- self.preambles = digital_swig.ofdm_insert_preamble(self._fft_length,
- padded_preambles)
- self.ifft = gr.fft_vcc(self._fft_length, False, win, True)
- self.cp_adder = digital_swig.ofdm_cyclic_prefixer(self._fft_length,
- symbol_length)
- self.scale = gr.multiply_const_cc(1.0 / math.sqrt(self._fft_length))
+ self.preambles = digital.ofdm_insert_preamble(self._fft_length,
+ padded_preambles)
+ self.ifft = fft.fft_vcc(self._fft_length, False, win, True)
+ self.cp_adder = digital.ofdm_cyclic_prefixer(self._fft_length,
+ symbol_length)
+ self.scale = blocks.multiply_const_cc(1.0 / math.sqrt(self._fft_length))
self.connect((self._pkt_input, 0), (self.preambles, 0))
self.connect((self._pkt_input, 1), (self.preambles, 1))
@@ -117,21 +118,21 @@ class ofdm_mod(gr.hier_block2):
self._print_verbage()
if options.log:
- self.connect(self._pkt_input, gr.file_sink(gr.sizeof_gr_complex*options.fft_length,
+ self.connect(self._pkt_input, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length,
"ofdm_mapper_c.dat"))
- self.connect(self.preambles, gr.file_sink(gr.sizeof_gr_complex*options.fft_length,
+ self.connect(self.preambles, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length,
"ofdm_preambles.dat"))
- self.connect(self.ifft, gr.file_sink(gr.sizeof_gr_complex*options.fft_length,
+ self.connect(self.ifft, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length,
"ofdm_ifft_c.dat"))
- self.connect(self.cp_adder, gr.file_sink(gr.sizeof_gr_complex,
+ self.connect(self.cp_adder, blocks.file_sink(gr.sizeof_gr_complex,
"ofdm_cp_adder_c.dat"))
def send_pkt(self, payload='', eof=False):
"""
Send the payload.
- @param payload: data to send
- @type payload: string
+ Args:
+ payload: data to send (string)
"""
if eof:
msg = gr.message(1) # tell self._pkt_input we're not sending any more packets
@@ -188,9 +189,9 @@ class ofdm_demod(gr.hier_block2):
The input is the complex modulated signal at baseband.
Demodulated packets are sent to the handler.
- @param options: pass modulation options from higher layers (fft length, occupied tones, etc.)
- @param callback: function of two args: ok, payload
- @type callback: ok: bool; payload: string
+ Args:
+ options: pass modulation options from higher layers (fft length, occupied tones, etc.)
+ callback: function of two args: ok, payload (ok: bool; payload: string)
"""
gr.hier_block2.__init__(self, "ofdm_demod",
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
@@ -240,10 +241,10 @@ class ofdm_demod(gr.hier_block2):
phgain = 0.25
frgain = phgain*phgain / 4.0
- self.ofdm_demod = digital_swig.ofdm_frame_sink(rotated_const, range(arity),
- self._rcvd_pktq,
- self._occupied_tones,
- phgain, frgain)
+ self.ofdm_demod = digital.ofdm_frame_sink(rotated_const, range(arity),
+ self._rcvd_pktq,
+ self._occupied_tones,
+ phgain, frgain)
self.connect(self, self.ofdm_recv)
self.connect((self.ofdm_recv, 0), (self.ofdm_demod, 0))
@@ -255,7 +256,7 @@ class ofdm_demod(gr.hier_block2):
if options.log:
self.connect(self.ofdm_demod,
- gr.file_sink(gr.sizeof_gr_complex*self._occupied_tones,
+ blocks.file_sink(gr.sizeof_gr_complex*self._occupied_tones,
"ofdm_frame_sink_c.dat"))
else:
self.connect(self.ofdm_demod,
diff --git a/gr-digital/python/ofdm_packet_utils.py b/gr-digital/python/ofdm_packet_utils.py
index d0000e6db5..c49dfe4f8e 100644
--- a/gr-digital/python/ofdm_packet_utils.py
+++ b/gr-digital/python/ofdm_packet_utils.py
@@ -101,14 +101,12 @@ def make_packet(payload, samples_per_symbol, bits_per_symbol,
"""
Build a packet, given access code, payload, and whitener offset
- @param payload: packet payload, len [0, 4096]
- @param samples_per_symbol: samples per symbol (needed for padding calculation)
- @type samples_per_symbol: int
- @param bits_per_symbol: (needed for padding calculation)
- @type bits_per_symbol: int
- @param whitener_offset offset into whitener string to use [0-16)
- @param whitening: Turn whitener on or off
- @type whitening: bool
+ Args:
+ payload: packet payload, len [0, 4096]
+ samples_per_symbol: samples per symbol (needed for padding calculation) (int)
+ bits_per_symbol: (needed for padding calculation) (int)
+ whitener_offset: offset into whitener string to use [0-16)
+ whitening: Turn whitener on or off (bool)
Packet will have access code at the beginning, followed by length, payload
and finally CRC-32.
@@ -150,13 +148,13 @@ def _npadding_bytes(pkt_byte_len, samples_per_symbol, bits_per_symbol):
we want to pad so that after modulation the resulting packet
is a multiple of 128 samples.
- @param ptk_byte_len: len in bytes of packet, not including padding.
- @param samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK)
- @type samples_per_symbol: int
- @param bits_per_symbol: bits per symbol (log2(modulation order))
- @type bits_per_symbol: int
+ Args:
+ ptk_byte_len: len in bytes of packet, not including padding.
+ samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK) (int)
+ bits_per_symbol: bits per symbol (log2(modulation order)) (int)
- @returns number of bytes of padding to append.
+ Returns:
+ number of bytes of padding to append.
"""
modulus = 128
byte_modulus = gru.lcm(modulus/8, samples_per_symbol) * bits_per_symbol / samples_per_symbol
@@ -170,10 +168,10 @@ def unmake_packet(whitened_payload_with_crc, whitener_offset=0, dewhitening=1):
"""
Return (ok, payload)
- @param whitened_payload_with_crc: string
- @param whitener_offset offset into whitener string to use [0-16)
- @param dewhitening: Turn whitener on or off
- @type dewhitening: bool
+ Args:
+ whitened_payload_with_crc: string
+ whitener_offset: offset into whitener string to use [0-16)
+ dewhitening: Turn whitener on or off (bool)
"""
if dewhitening:
diff --git a/gr-digital/python/ofdm_receiver.py b/gr-digital/python/ofdm_receiver.py
index 9d4d6e559d..4fbf76251a 100644
--- a/gr-digital/python/ofdm_receiver.py
+++ b/gr-digital/python/ofdm_receiver.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2006, 2007, 2008 Free Software Foundation, Inc.
+# Copyright 2006-2008 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -23,13 +23,21 @@
import math
from numpy import fft
from gnuradio import gr
+from gnuradio import analog
+from gnuradio import blocks
+from gnuradio import filter
-import digital_swig
+import digital_swig as digital
from ofdm_sync_pn import ofdm_sync_pn
from ofdm_sync_fixed import ofdm_sync_fixed
from ofdm_sync_pnac import ofdm_sync_pnac
from ofdm_sync_ml import ofdm_sync_ml
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
class ofdm_receiver(gr.hier_block2):
"""
Performs receiver synchronization on OFDM symbols.
@@ -47,18 +55,13 @@ class ofdm_receiver(gr.hier_block2):
The input is the complex modulated signal at baseband.
Synchronized packets are sent back to the demodulator.
- @param fft_length: total number of subcarriers
- @type fft_length: int
- @param cp_length: length of cyclic prefix as specified in subcarriers (<= fft_length)
- @type cp_length: int
- @param occupied_tones: number of subcarriers used for data
- @type occupied_tones: int
- @param snr: estimated signal to noise ratio used to guide cyclic prefix synchronizer
- @type snr: float
- @param ks: known symbols used as preambles to each packet
- @type ks: list of lists
- @param logging: turn file logging on or off
- @type logging: bool
+ Args:
+ fft_length: total number of subcarriers (int)
+ cp_length: length of cyclic prefix as specified in subcarriers (<= fft_length) (int)
+ occupied_tones: number of subcarriers used for data (int)
+ snr: estimated signal to noise ratio used to guide cyclic prefix synchronizer (float)
+ ks: known symbols used as preambles to each packet (list of lists)
+ logging: turn file logging on or off (bool)
"""
gr.hier_block2.__init__(self, "ofdm_receiver",
@@ -67,12 +70,12 @@ class ofdm_receiver(gr.hier_block2):
bw = (float(occupied_tones) / float(fft_length)) / 2.0
tb = bw*0.08
- chan_coeffs = gr.firdes.low_pass (1.0, # gain
- 1.0, # sampling rate
- bw+tb, # midpoint of trans. band
- tb, # width of trans. band
- gr.firdes.WIN_HAMMING) # filter type
- self.chan_filt = gr.fft_filter_ccc(1, chan_coeffs)
+ chan_coeffs = filter.firdes.low_pass (1.0, # gain
+ 1.0, # sampling rate
+ bw+tb, # midpoint of trans. band
+ tb, # width of trans. band
+ filter.firdes.WIN_HAMMING) # filter type
+ self.chan_filt = filter.fft_filter_ccc(1, chan_coeffs)
win = [1 for i in range(fft_length)]
@@ -107,7 +110,7 @@ class ofdm_receiver(gr.hier_block2):
# for testing only; do not user over the air
# remove filter and filter delay for this
elif SYNC == "fixed":
- self.chan_filt = gr.multiply_const_cc(1.0)
+ self.chan_filt = blocks.multiply_const_cc(1.0)
nsymbols = 18 # enter the number of symbols per packet
freq_offset = 0.0 # if you use a frequency offset, enter it here
nco_sensitivity = -2.0/fft_length # correct for fine frequency
@@ -119,11 +122,11 @@ class ofdm_receiver(gr.hier_block2):
# Set up blocks
- self.nco = gr.frequency_modulator_fc(nco_sensitivity) # generate a signal proportional to frequency error of sync block
- self.sigmix = gr.multiply_cc()
- self.sampler = digital_swig.ofdm_sampler(fft_length, fft_length+cp_length)
- self.fft_demod = gr.fft_vcc(fft_length, True, win, True)
- self.ofdm_frame_acq = digital_swig.ofdm_frame_acquisition(occupied_tones,
+ self.nco = analog.frequency_modulator_fc(nco_sensitivity) # generate a signal proportional to frequency error of sync block
+ self.sigmix = blocks.multiply_cc()
+ self.sampler = digital.ofdm_sampler(fft_length, fft_length+cp_length)
+ self.fft_demod = fft.fft_vcc(fft_length, True, win, True)
+ self.ofdm_frame_acq = digital.ofdm_frame_acquisition(occupied_tones,
fft_length,
cp_length, ks[0])
@@ -141,11 +144,11 @@ class ofdm_receiver(gr.hier_block2):
self.connect((self.ofdm_frame_acq,1), (self,1)) # frame and symbol timing, and equalization
if logging:
- self.connect(self.chan_filt, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-chan_filt_c.dat"))
- self.connect(self.fft_demod, gr.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-fft_out_c.dat"))
+ self.connect(self.chan_filt, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-chan_filt_c.dat"))
+ self.connect(self.fft_demod, blocks.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-fft_out_c.dat"))
self.connect(self.ofdm_frame_acq,
- gr.file_sink(gr.sizeof_gr_complex*occupied_tones, "ofdm_receiver-frame_acq_c.dat"))
- self.connect((self.ofdm_frame_acq,1), gr.file_sink(1, "ofdm_receiver-found_corr_b.dat"))
- self.connect(self.sampler, gr.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-sampler_c.dat"))
- self.connect(self.sigmix, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-sigmix_c.dat"))
- self.connect(self.nco, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-nco_c.dat"))
+ blocks.file_sink(gr.sizeof_gr_complex*occupied_tones, "ofdm_receiver-frame_acq_c.dat"))
+ self.connect((self.ofdm_frame_acq,1), blocks.file_sink(1, "ofdm_receiver-found_corr_b.dat"))
+ self.connect(self.sampler, blocks.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-sampler_c.dat"))
+ self.connect(self.sigmix, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-sigmix_c.dat"))
+ self.connect(self.nco, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-nco_c.dat"))
diff --git a/gr-digital/python/ofdm_sync_fixed.py b/gr-digital/python/ofdm_sync_fixed.py
index 9bac789bf6..bd64964651 100644
--- a/gr-digital/python/ofdm_sync_fixed.py
+++ b/gr-digital/python/ofdm_sync_fixed.py
@@ -22,6 +22,7 @@
import math
from gnuradio import gr
+from gnuradio import blocks
class ofdm_sync_fixed(gr.hier_block2):
def __init__(self, fft_length, cp_length, nsymbols, freq_offset, logging=False):
@@ -46,5 +47,5 @@ class ofdm_sync_fixed(gr.hier_block2):
self.connect(self.peak_trigger, (self,1))
if logging:
- self.connect(self.peak_trigger, gr.file_sink(gr.sizeof_char, "ofdm_sync_fixed-peaks_b.dat"))
+ self.connect(self.peak_trigger, blocks.file_sink(gr.sizeof_char, "ofdm_sync_fixed-peaks_b.dat"))
diff --git a/gr-digital/python/ofdm_sync_ml.py b/gr-digital/python/ofdm_sync_ml.py
index 7c75d7f1d4..3afd647098 100644
--- a/gr-digital/python/ofdm_sync_ml.py
+++ b/gr-digital/python/ofdm_sync_ml.py
@@ -23,6 +23,16 @@
import math
from gnuradio import gr
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
+try:
+ from gnuradio import blocks
+except ImportError:
+ import blocks_swig as blocks
+
class ofdm_sync_ml(gr.hier_block2):
def __init__(self, fft_length, cp_length, snr, kstime, logging):
''' Maximum Likelihood OFDM synchronizer:
@@ -35,7 +45,7 @@ class ofdm_sync_ml(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char)) # Output signature
- self.input = gr.add_const_cc(0)
+ self.input = blocks.add_const_cc(0)
SNR = 10.0**(snr/10.0)
rho = SNR / (SNR + 1.0)
@@ -48,16 +58,16 @@ class ofdm_sync_ml(gr.hier_block2):
self.connect(self, self.input)
# Create a delay line
- self.delay = gr.delay(gr.sizeof_gr_complex, fft_length)
+ self.delay = blocks.delay(gr.sizeof_gr_complex, fft_length)
self.connect(self.input, self.delay)
# magnitude squared blocks
- self.magsqrd1 = gr.complex_to_mag_squared()
- self.magsqrd2 = gr.complex_to_mag_squared()
- self.adder = gr.add_ff()
+ self.magsqrd1 = blocks.complex_to_mag_squared()
+ self.magsqrd2 = blocks.complex_to_mag_squared()
+ self.adder = blocks.add_ff()
moving_sum_taps = [rho/2 for i in range(cp_length)]
- self.moving_sum_filter = gr.fir_filter_fff(1,moving_sum_taps)
+ self.moving_sum_filter = filter.fir_filter_fff(1,moving_sum_taps)
self.connect(self.input,self.magsqrd1)
self.connect(self.delay,self.magsqrd2)
@@ -67,29 +77,29 @@ class ofdm_sync_ml(gr.hier_block2):
# Correlation from ML Sync
- self.conjg = gr.conjugate_cc();
- self.mixer = gr.multiply_cc();
+ self.conjg = blocks.conjugate_cc();
+ self.mixer = blocks.multiply_cc();
movingsum2_taps = [1.0 for i in range(cp_length)]
- self.movingsum2 = gr.fir_filter_ccf(1,movingsum2_taps)
+ self.movingsum2 = filter.fir_filter_ccf(1,movingsum2_taps)
# Correlator data handler
- self.c2mag = gr.complex_to_mag()
- self.angle = gr.complex_to_arg()
+ self.c2mag = blocks.complex_to_mag()
+ self.angle = blocks.complex_to_arg()
self.connect(self.input,(self.mixer,1))
self.connect(self.delay,self.conjg,(self.mixer,0))
self.connect(self.mixer,self.movingsum2,self.c2mag)
self.connect(self.movingsum2,self.angle)
# ML Sync output arg, need to find maximum point of this
- self.diff = gr.sub_ff()
+ self.diff = blocks.sub_ff()
self.connect(self.c2mag,(self.diff,0))
self.connect(self.moving_sum_filter,(self.diff,1))
#ML measurements input to sampler block and detect
- self.f2c = gr.float_to_complex()
- self.pk_detect = gr.peak_detector_fb(0.2, 0.25, 30, 0.0005)
- self.sample_and_hold = gr.sample_and_hold_ff()
+ self.f2c = blocks.float_to_complex()
+ self.pk_detect = blocks.peak_detector_fb(0.2, 0.25, 30, 0.0005)
+ self.sample_and_hold = blocks.sample_and_hold_ff()
# use the sync loop values to set the sampler and the NCO
# self.diff = theta
@@ -115,9 +125,9 @@ class ofdm_sync_ml(gr.hier_block2):
# to readjust the timing in the middle of the packet or we ruin the equalizer settings.
kstime = [k.conjugate() for k in kstime]
kstime.reverse()
- self.kscorr = gr.fir_filter_ccc(1, kstime)
- self.corrmag = gr.complex_to_mag_squared()
- self.div = gr.divide_ff()
+ self.kscorr = filter.fir_filter_ccc(1, kstime)
+ self.corrmag = blocks.complex_to_mag_squared()
+ self.div = blocks.divide_ff()
# The output signature of the correlation has a few spikes because the rest of the
# system uses the repeated preamble symbol. It needs to work that generically if
@@ -125,10 +135,10 @@ class ofdm_sync_ml(gr.hier_block2):
# The output theta of the correlator above is multiplied with this correlation to
# identify the proper peak and remove other products in this cross-correlation
self.threshold_factor = 0.1
- self.slice = gr.threshold_ff(self.threshold_factor, self.threshold_factor, 0)
- self.f2b = gr.float_to_char()
- self.b2f = gr.char_to_float()
- self.mul = gr.multiply_ff()
+ self.slice = blocks.threshold_ff(self.threshold_factor, self.threshold_factor, 0)
+ self.f2b = blocks.float_to_char()
+ self.b2f = blocks.char_to_float()
+ self.mul = blocks.multiply_ff()
# Normalize the power of the corr output by the energy. This is not really needed
# and could be removed for performance, but it makes for a cleaner signal.
@@ -148,18 +158,18 @@ class ofdm_sync_ml(gr.hier_block2):
if logging:
- self.connect(self.moving_sum_filter, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-energy_f.dat"))
- self.connect(self.diff, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-theta_f.dat"))
- self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-epsilon_f.dat"))
- self.connect(self.corrmag, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-corrmag_f.dat"))
- self.connect(self.kscorr, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-kscorr_c.dat"))
- self.connect(self.div, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-div_f.dat"))
- self.connect(self.mul, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-mul_f.dat"))
- self.connect(self.slice, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-slice_f.dat"))
- self.connect(self.pk_detect, gr.file_sink(gr.sizeof_char, "ofdm_sync_ml-peaks_b.dat"))
+ self.connect(self.moving_sum_filter, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-energy_f.dat"))
+ self.connect(self.diff, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-theta_f.dat"))
+ self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-epsilon_f.dat"))
+ self.connect(self.corrmag, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-corrmag_f.dat"))
+ self.connect(self.kscorr, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-kscorr_c.dat"))
+ self.connect(self.div, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-div_f.dat"))
+ self.connect(self.mul, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-mul_f.dat"))
+ self.connect(self.slice, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-slice_f.dat"))
+ self.connect(self.pk_detect, blocks.file_sink(gr.sizeof_char, "ofdm_sync_ml-peaks_b.dat"))
if use_dpll:
- self.connect(self.dpll, gr.file_sink(gr.sizeof_char, "ofdm_sync_ml-dpll_b.dat"))
+ self.connect(self.dpll, blocks.file_sink(gr.sizeof_char, "ofdm_sync_ml-dpll_b.dat"))
- self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-sample_and_hold_f.dat"))
- self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-input_c.dat"))
+ self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-sample_and_hold_f.dat"))
+ self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-input_c.dat"))
diff --git a/gr-digital/python/ofdm_sync_pn.py b/gr-digital/python/ofdm_sync_pn.py
index 05b1de2e19..4c6a30f802 100644
--- a/gr-digital/python/ofdm_sync_pn.py
+++ b/gr-digital/python/ofdm_sync_pn.py
@@ -24,6 +24,16 @@ import math
from numpy import fft
from gnuradio import gr
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
+try:
+ from gnuradio import blocks
+except ImportError:
+ import blocks_swig as blocks
+
class ofdm_sync_pn(gr.hier_block2):
def __init__(self, fft_length, cp_length, logging=False):
"""
@@ -37,47 +47,47 @@ class ofdm_sync_pn(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char)) # Output signature
- self.input = gr.add_const_cc(0)
+ self.input = blocks.add_const_cc(0)
# PN Sync
# Create a delay line
- self.delay = gr.delay(gr.sizeof_gr_complex, fft_length/2)
+ self.delay = blocks.delay(gr.sizeof_gr_complex, fft_length/2)
# Correlation from ML Sync
- self.conjg = gr.conjugate_cc();
- self.corr = gr.multiply_cc();
+ self.conjg = blocks.conjugate_cc();
+ self.corr = blocks.multiply_cc();
# Create a moving sum filter for the corr output
if 1:
moving_sum_taps = [1.0 for i in range(fft_length//2)]
- self.moving_sum_filter = gr.fir_filter_ccf(1,moving_sum_taps)
+ self.moving_sum_filter = filter.fir_filter_ccf(1,moving_sum_taps)
else:
moving_sum_taps = [complex(1.0,0.0) for i in range(fft_length//2)]
- self.moving_sum_filter = gr.fft_filter_ccc(1,moving_sum_taps)
+ self.moving_sum_filter = filter.fft_filter_ccc(1,moving_sum_taps)
# Create a moving sum filter for the input
- self.inputmag2 = gr.complex_to_mag_squared()
+ self.inputmag2 = blocks.complex_to_mag_squared()
movingsum2_taps = [1.0 for i in range(fft_length//2)]
if 1:
- self.inputmovingsum = gr.fir_filter_fff(1,movingsum2_taps)
+ self.inputmovingsum = filter.fir_filter_fff(1,movingsum2_taps)
else:
- self.inputmovingsum = gr.fft_filter_fff(1,movingsum2_taps)
+ self.inputmovingsum = filter.fft_filter_fff(1,movingsum2_taps)
- self.square = gr.multiply_ff()
- self.normalize = gr.divide_ff()
+ self.square = blocks.multiply_ff()
+ self.normalize = blocks.divide_ff()
# Get magnitude (peaks) and angle (phase/freq error)
- self.c2mag = gr.complex_to_mag_squared()
- self.angle = gr.complex_to_arg()
+ self.c2mag = blocks.complex_to_mag_squared()
+ self.angle = blocks.complex_to_arg()
- self.sample_and_hold = gr.sample_and_hold_ff()
+ self.sample_and_hold = blocks.sample_and_hold_ff()
#ML measurements input to sampler block and detect
- self.sub1 = gr.add_const_ff(-1)
- self.pk_detect = gr.peak_detector_fb(0.20, 0.20, 30, 0.001)
- #self.pk_detect = gr.peak_detector2_fb(9)
+ self.sub1 = blocks.add_const_ff(-1)
+ self.pk_detect = blocks.peak_detector_fb(0.20, 0.20, 30, 0.001)
+ #self.pk_detect = blocks.peak_detector2_fb(9)
self.connect(self, self.input)
@@ -100,7 +110,7 @@ class ofdm_sync_pn(gr.hier_block2):
# Create a moving sum filter for the corr output
matched_filter_taps = [1.0/cp_length for i in range(cp_length)]
- self.matched_filter = gr.fir_filter_fff(1,matched_filter_taps)
+ self.matched_filter = filter.fir_filter_fff(1,matched_filter_taps)
self.connect(self.normalize, self.matched_filter)
self.connect(self.matched_filter, self.sub1, self.pk_detect)
@@ -114,10 +124,10 @@ class ofdm_sync_pn(gr.hier_block2):
self.connect(self.pk_detect, (self,1))
if logging:
- self.connect(self.matched_filter, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-mf_f.dat"))
- self.connect(self.normalize, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-theta_f.dat"))
- self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat"))
- self.connect(self.pk_detect, gr.file_sink(gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat"))
- self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat"))
- self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat"))
+ self.connect(self.matched_filter, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-mf_f.dat"))
+ self.connect(self.normalize, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-theta_f.dat"))
+ self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat"))
+ self.connect(self.pk_detect, blocks.file_sink(gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat"))
+ self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat"))
+ self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat"))
diff --git a/gr-digital/python/ofdm_sync_pnac.py b/gr-digital/python/ofdm_sync_pnac.py
index 10a1259641..ee7c82927a 100644
--- a/gr-digital/python/ofdm_sync_pnac.py
+++ b/gr-digital/python/ofdm_sync_pnac.py
@@ -24,6 +24,16 @@ import math
from numpy import fft
from gnuradio import gr
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
+try:
+ from gnuradio import blocks
+except ImportError:
+ import blocks_swig as blocks
+
class ofdm_sync_pnac(gr.hier_block2):
def __init__(self, fft_length, cp_length, kstime, logging=False):
"""
@@ -50,7 +60,7 @@ class ofdm_sync_pnac(gr.hier_block2):
gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char)) # Output signature
- self.input = gr.add_const_cc(0)
+ self.input = blocks.add_const_cc(0)
symbol_length = fft_length + cp_length
@@ -59,30 +69,30 @@ class ofdm_sync_pnac(gr.hier_block2):
# cross-correlate with the known symbol
kstime = [k.conjugate() for k in kstime[0:fft_length//2]]
kstime.reverse()
- self.crosscorr_filter = gr.fir_filter_ccc(1, kstime)
+ self.crosscorr_filter = filter.fir_filter_ccc(1, kstime)
# Create a delay line
- self.delay = gr.delay(gr.sizeof_gr_complex, fft_length/2)
+ self.delay = blocks.delay(gr.sizeof_gr_complex, fft_length/2)
# Correlation from ML Sync
- self.conjg = gr.conjugate_cc();
- self.corr = gr.multiply_cc();
+ self.conjg = blocks.conjugate_cc();
+ self.corr = blocks.multiply_cc();
# Create a moving sum filter for the input
- self.mag = gr.complex_to_mag_squared()
+ self.mag = blocks.complex_to_mag_squared()
movingsum_taps = (fft_length//1)*[1.0,]
- self.power = gr.fir_filter_fff(1,movingsum_taps)
+ self.power = filter.fir_filter_fff(1,movingsum_taps)
# Get magnitude (peaks) and angle (phase/freq error)
- self.c2mag = gr.complex_to_mag_squared()
- self.angle = gr.complex_to_arg()
- self.compare = gr.sub_ff()
+ self.c2mag = blocks.complex_to_mag_squared()
+ self.angle = blocks.complex_to_arg()
+ self.compare = blocks.sub_ff()
- self.sample_and_hold = gr.sample_and_hold_ff()
+ self.sample_and_hold = blocks.sample_and_hold_ff()
#ML measurements input to sampler block and detect
- self.threshold = gr.threshold_ff(0,0,0) # threshold detection might need to be tweaked
- self.peaks = gr.float_to_char()
+ self.threshold = blocks.threshold_ff(0,0,0) # threshold detection might need to be tweaked
+ self.peaks = blocksx.float_to_char()
self.connect(self, self.input)
@@ -115,11 +125,11 @@ class ofdm_sync_pnac(gr.hier_block2):
self.connect(self.peaks, (self,1))
if logging:
- self.connect(self.compare, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-compare_f.dat"))
- self.connect(self.c2mag, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-theta_f.dat"))
- self.connect(self.power, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-inputpower_f.dat"))
- self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-epsilon_f.dat"))
- self.connect(self.threshold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-threshold_f.dat"))
- self.connect(self.peaks, gr.file_sink(gr.sizeof_char, "ofdm_sync_pnac-peaks_b.dat"))
- self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-sample_and_hold_f.dat"))
- self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pnac-input_c.dat"))
+ self.connect(self.compare, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-compare_f.dat"))
+ self.connect(self.c2mag, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-theta_f.dat"))
+ self.connect(self.power, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-inputpower_f.dat"))
+ self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-epsilon_f.dat"))
+ self.connect(self.threshold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-threshold_f.dat"))
+ self.connect(self.peaks, blocks.file_sink(gr.sizeof_char, "ofdm_sync_pnac-peaks_b.dat"))
+ self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-sample_and_hold_f.dat"))
+ self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pnac-input_c.dat"))
diff --git a/gr-digital/python/packet_utils.py b/gr-digital/python/packet_utils.py
index 2e216ff50e..2929758ef0 100644
--- a/gr-digital/python/packet_utils.py
+++ b/gr-digital/python/packet_utils.py
@@ -107,13 +107,12 @@ def make_packet(payload, samples_per_symbol, bits_per_symbol,
"""
Build a packet, given access code, payload, and whitener offset
- @param payload: packet payload, len [0, 4096]
- @param samples_per_symbol: samples per symbol (needed for padding calculation)
- @type samples_per_symbol: int
- @param bits_per_symbol: (needed for padding calculation)
- @type bits_per_symbol: int
- @param access_code: string of ascii 0's and 1's
- @param whitener_offset offset into whitener string to use [0-16)
+ Args:
+ payload: packet payload, len [0, 4096]
+ samples_per_symbol: samples per symbol (needed for padding calculation) (int)
+ bits_per_symbol: (needed for padding calculation) (int)
+ access_code: string of ascii 0's and 1's
+ whitener_offset: offset into whitener string to use [0-16)
Packet will have access code at the beginning, followed by length, payload
and finally CRC-32.
@@ -156,13 +155,13 @@ def _npadding_bytes(pkt_byte_len, samples_per_symbol, bits_per_symbol):
we want to pad so that after modulation the resulting packet
is a multiple of 128 samples.
- @param ptk_byte_len: len in bytes of packet, not including padding.
- @param samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK)
- @type samples_per_symbol: int
- @param bits_per_symbol: bits per symbol (log2(modulation order))
- @type bits_per_symbol: int
+ Args:
+ ptk_byte_len: len in bytes of packet, not including padding.
+ samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK) (int)
+ bits_per_symbol: bits per symbol (log2(modulation order)) (int)
- @returns number of bytes of padding to append.
+ Returns:
+ number of bytes of padding to append.
"""
modulus = 128
byte_modulus = gru.lcm(modulus/8, samples_per_symbol) * bits_per_symbol / samples_per_symbol
@@ -176,7 +175,8 @@ def unmake_packet(whitened_payload_with_crc, whitener_offset=0, dewhitening=True
"""
Return (ok, payload)
- @param whitened_payload_with_crc: string
+ Args:
+ whitened_payload_with_crc: string
"""
if dewhitening:
diff --git a/gr-digital/python/pkt.py b/gr-digital/python/pkt.py
index 8650bdbb02..434548906e 100644
--- a/gr-digital/python/pkt.py
+++ b/gr-digital/python/pkt.py
@@ -23,8 +23,12 @@ from math import pi
from gnuradio import gr
import gnuradio.gr.gr_threading as _threading
import packet_utils
-import digital_swig
+import digital_swig as digital
+try:
+ from gnuradio import blocks
+except ImportError:
+ import blocks_swig as blocks
# /////////////////////////////////////////////////////////////////////////////
# mod/demod with packets as i/o
@@ -44,14 +48,12 @@ class mod_pkts(gr.hier_block2):
Packets to be sent are enqueued by calling send_pkt.
The output is the complex modulated signal at baseband.
- @param modulator: instance of modulator class (gr_block or hier_block2)
- @type modulator: complex baseband out
- @param access_code: AKA sync vector
- @type access_code: string of 1's and 0's between 1 and 64 long
- @param msgq_limit: maximum number of messages in message queue
- @type msgq_limit: int
- @param pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples
- @param use_whitener_offset: If true, start of whitener XOR string is incremented each packet
+ Args:
+ modulator: instance of modulator class (gr_block or hier_block2) (complex baseband out)
+ access_code: AKA sync vector (string of 1's and 0's between 1 and 64 long)
+ msgq_limit: maximum number of messages in message queue (int)
+ pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples
+ use_whitener_offset: If true, start of whitener XOR string is incremented each packet
See gmsk_mod for remaining parameters
"""
@@ -72,15 +74,15 @@ class mod_pkts(gr.hier_block2):
self._access_code = access_code
# accepts messages from the outside world
- self._pkt_input = gr.message_source(gr.sizeof_char, msgq_limit)
+ self._pkt_input = blocks.message_source(gr.sizeof_char, msgq_limit)
self.connect(self._pkt_input, self._modulator, self)
def send_pkt(self, payload='', eof=False):
"""
Send the payload.
- @param payload: data to send
- @type payload: string
+ Args:
+ payload: data to send (string)
"""
if eof:
msg = gr.message(1) # tell self._pkt_input we're not sending any more packets
@@ -116,14 +118,11 @@ class demod_pkts(gr.hier_block2):
The input is the complex modulated signal at baseband.
Demodulated packets are sent to the handler.
- @param demodulator: instance of demodulator class (gr_block or hier_block2)
- @type demodulator: complex baseband in
- @param access_code: AKA sync vector
- @type access_code: string of 1's and 0's
- @param callback: function of two args: ok, payload
- @type callback: ok: bool; payload: string
- @param threshold: detect access_code with up to threshold bits wrong (-1 -> use default)
- @type threshold: int
+ Args:
+ demodulator: instance of demodulator class (gr_block or hier_block2) (complex baseband in)
+ access_code: AKA sync vector (string of 1's and 0's)
+ callback: function of two args: ok, payload (ok: bool; payload: string)
+ threshold: detect access_code with up to threshold bits wrong (-1 -> use default) (int)
"""
gr.hier_block2.__init__(self, "demod_pkts",
@@ -141,9 +140,9 @@ class demod_pkts(gr.hier_block2):
threshold = 12 # FIXME raise exception
self._rcvd_pktq = gr.msg_queue() # holds packets from the PHY
- self.correlator = digital_swig.correlate_access_code_bb(access_code, threshold)
+ self.correlator = digital.correlate_access_code_bb(access_code, threshold)
- self.framer_sink = gr.framer_sink_1(self._rcvd_pktq)
+ self.framer_sink = digital.framer_sink_1(self._rcvd_pktq)
self.connect(self, self._demodulator, self.correlator, self.framer_sink)
self._watcher = _queue_watcher_thread(self._rcvd_pktq, callback)
diff --git a/gr-digital/python/psk.py b/gr-digital/python/psk.py
index 58f6787f0c..1816ffb4ba 100644
--- a/gr-digital/python/psk.py
+++ b/gr-digital/python/psk.py
@@ -30,22 +30,29 @@ import digital_swig
import modulation_utils
from utils import mod_codes, gray_code
from generic_mod_demod import generic_mod, generic_demod
+from generic_mod_demod import shared_mod_args, shared_demod_args
# Default number of points in constellation.
_def_constellation_points = 4
# The default encoding (e.g. gray-code, set-partition)
_def_mod_code = mod_codes.GRAY_CODE
+# Default use of differential encoding
+_def_differential = True
-def create_encodings(mod_code, arity):
+def create_encodings(mod_code, arity, differential):
post_diff_code = None
if mod_code not in mod_codes.codes:
raise ValueError('That modulation code does not exist.')
if mod_code == mod_codes.GRAY_CODE:
- pre_diff_code = gray_code.gray_code(arity)
- elif mod_code == mod_codes.SET_PARTITION_CODE:
- pre_diff_code = set_partition_code.set_partition_code(arity)
+ if differential:
+ pre_diff_code = gray_code.gray_code(arity)
+ post_diff_code = None
+ else:
+ pre_diff_code = []
+ post_diff_code = gray_code.gray_code(arity)
elif mod_code == mod_codes.NO_CODE:
pre_diff_code = []
+ post_diff_code = None
else:
raise ValueError('That modulation code is not implemented for this constellation.')
return (pre_diff_code, post_diff_code)
@@ -54,7 +61,8 @@ def create_encodings(mod_code, arity):
# PSK constellation
# /////////////////////////////////////////////////////////////////////////////
-def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code):
+def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
+ differential=_def_differential):
"""
Creates a PSK constellation object.
"""
@@ -62,7 +70,7 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code):
if (k != int(k)):
raise StandardError('Number of constellation points must be a power of two.')
points = [exp(2*pi*(0+1j)*i/m) for i in range(0,m)]
- pre_diff_code, post_diff_code = create_encodings(mod_code, m)
+ pre_diff_code, post_diff_code = create_encodings(mod_code, m, differential)
if post_diff_code is not None:
inverse_post_diff_code = mod_codes.invert_code(post_diff_code)
points = [points[x] for x in inverse_post_diff_code]
@@ -74,22 +82,26 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code):
# /////////////////////////////////////////////////////////////////////////////
class psk_mod(generic_mod):
+ """
+ Hierarchical block for RRC-filtered PSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ constellation_points: Number of constellation points (must be a power of two) (integer).
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
def __init__(self, constellation_points=_def_constellation_points,
mod_code=_def_mod_code,
+ differential=_def_differential,
*args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered PSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_mod block for list of parameters.
- """
-
- constellation = psk_constellation(constellation_points, mod_code)
- super(psk_mod, self).__init__(constellation, *args, **kwargs)
+ constellation = psk_constellation(constellation_points, mod_code, differential)
+ super(psk_mod, self).__init__(constellation, differential, *args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
# PSK demodulator
@@ -98,21 +110,25 @@ class psk_mod(generic_mod):
class psk_demod(generic_demod):
+ """
+ Hierarchical block for RRC-filtered PSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ constellation_points: Number of constellation points (must be a power of two) (integer).
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
def __init__(self, constellation_points=_def_constellation_points,
mod_code=_def_mod_code,
+ differential=_def_differential,
*args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered PSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_demod block for list of parameters.
- """
-
- constellation = psk_constellation(constellation_points, mod_code)
- super(psk_demod, self).__init__(constellation, *args, **kwargs)
+ constellation = psk_constellation(constellation_points, mod_code, differential)
+ super(psk_demod, self).__init__(constellation, differential, *args, **kwargs)
#
# Add these to the mod/demod registry
diff --git a/gr-digital/python/qa_binary_slicer_fb.py b/gr-digital/python/qa_binary_slicer_fb.py
index 60d92c5d19..22f7da73ff 100755
--- a/gr-digital/python/qa_binary_slicer_fb.py
+++ b/gr-digital/python/qa_binary_slicer_fb.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,33 +21,33 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import math, random
-class test_binary_slicer_fb (gr_unittest.TestCase):
+class test_binary_slicer_fb(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_binary_slicer_fb (self):
+ def test_binary_slicer_fb(self):
expected_result = ( 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1)
src_data = (-1, 1, -1, -1, 1, 1, -1, -1, -1, 1, 1, 1, -1, 1, 1, 1, 1)
src_data = [s + (1 - random.random()) for s in src_data] # add some noise
- src = gr.vector_source_f (src_data)
- op = digital_swig.binary_slicer_fb ()
- dst = gr.vector_sink_b ()
+ src = gr.vector_source_f(src_data)
+ op = digital.binary_slicer_fb()
+ dst = gr.vector_sink_b()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
+ actual_result = dst.data() # fetch the contents of the sink
#print "actual result", actual_result
#print "expected result", expected_result
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_chunks_to_symbols.py b/gr-digital/python/qa_chunks_to_symbols.py
index 63af10d8ff..5ffe425132 100755
--- a/gr-digital/python/qa_chunks_to_symbols.py
+++ b/gr-digital/python/qa_chunks_to_symbols.py
@@ -25,10 +25,10 @@ import digital_swig as digital
class test_chunks_to_symbols(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def test_bc_001(self):
diff --git a/gr-digital/python/qa_clock_recovery_mm.py b/gr-digital/python/qa_clock_recovery_mm.py
index f4c345b034..e904cf4c21 100755
--- a/gr-digital/python/qa_clock_recovery_mm.py
+++ b/gr-digital/python/qa_clock_recovery_mm.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,18 +21,18 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import random, cmath
class test_clock_recovery_mm(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# Test complex/complex version
omega = 2
gain_omega = 0.001
@@ -40,9 +40,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.01
omega_rel_lim = 0.001
- self.test = digital_swig.clock_recovery_mm_cc(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_cc(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 100*[complex(1, 1),]
self.src = gr.vector_source_c(data, False)
@@ -64,10 +64,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5)
- def test02 (self):
+ def test02(self):
# Test float/float version
omega = 2
gain_omega = 0.01
@@ -75,9 +75,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.01
omega_rel_lim = 0.001
- self.test = digital_swig.clock_recovery_mm_ff(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_ff(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 100*[1,]
self.src = gr.vector_source_f(data, False)
@@ -99,10 +99,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5)
- def test03 (self):
+ def test03(self):
# Test complex/complex version with varying input
omega = 2
gain_omega = 0.01
@@ -110,9 +110,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.1
omega_rel_lim = 0.0001
- self.test = digital_swig.clock_recovery_mm_cc(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_cc(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 1000*[complex(1, 1), complex(1, 1), complex(-1, -1), complex(-1, -1)]
self.src = gr.vector_source_c(data, False)
@@ -134,10 +134,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
- def test04 (self):
+ def test04(self):
# Test float/float version
omega = 2
gain_omega = 0.01
@@ -145,9 +145,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.1
omega_rel_lim = 0.001
- self.test = digital_swig.clock_recovery_mm_ff(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_ff(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 1000*[1, 1, -1, -1]
self.src = gr.vector_source_f(data, False)
@@ -169,7 +169,7 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 1)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_cma_equalizer.py b/gr-digital/python/qa_cma_equalizer.py
index 75fb0f05ed..f71e199189 100755
--- a/gr-digital/python/qa_cma_equalizer.py
+++ b/gr-digital/python/qa_cma_equalizer.py
@@ -21,7 +21,7 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
class test_cma_equalizer_fir(gr_unittest.TestCase):
@@ -33,7 +33,7 @@ class test_cma_equalizer_fir(gr_unittest.TestCase):
def transform(self, src_data):
SRC = gr.vector_source_c(src_data, False)
- EQU = digital_swig.cma_equalizer_cc(4, 1.0, .001, 1)
+ EQU = digital.cma_equalizer_cc(4, 1.0, .001, 1)
DST = gr.vector_sink_c()
self.tb.connect(SRC, EQU, DST)
self.tb.run()
@@ -44,7 +44,9 @@ class test_cma_equalizer_fir(gr_unittest.TestCase):
src_data = (1+0j, 0+1j, -1+0j, 0-1j)*1000
expected_data = src_data
result = self.transform(src_data)
- self.assertComplexTuplesAlmostEqual(expected_data, result)
+
+ N = -500
+ self.assertComplexTuplesAlmostEqual(expected_data[N:], result[N:])
if __name__ == "__main__":
gr_unittest.run(test_cma_equalizer_fir, "test_cma_equalizer_fir.xml")
diff --git a/gr-digital/python/qa_constellation.py b/gr-digital/python/qa_constellation.py
index ddd8c71e64..a593c3ea3e 100755
--- a/gr-digital/python/qa_constellation.py
+++ b/gr-digital/python/qa_constellation.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -23,9 +23,10 @@
import random
from cmath import exp, pi, log
-from gnuradio import gr, gr_unittest, blks2
+from gnuradio import gr, gr_unittest
from utils import mod_codes
-import digital_swig
+import digital_swig as digital
+import blocks_swig as blocks
# import from local folder
import psk
@@ -51,7 +52,7 @@ def twod_constell():
(-1+0j), (0-1j))
rot_sym = 2
dim = 2
- return digital_swig.constellation_calcdist(points, [], rot_sym, dim)
+ return digital.constellation_calcdist(points, [], rot_sym, dim)
def threed_constell():
oned_points = ((1+0j), (0+1j), (-1+0j), (0-1j))
@@ -63,7 +64,7 @@ def threed_constell():
points += [oned_points[ia], oned_points[ib], oned_points[ic]]
rot_sym = 4
dim = 3
- return digital_swig.constellation_calcdist(points, [], rot_sym, dim)
+ return digital.constellation_calcdist(points, [], rot_sym, dim)
# A list of tuples for constellation testing. The contents of the
# tuples are (constructor, poss_args, differential, diff_argname).
@@ -74,15 +75,25 @@ easy_constellation_info = (
{'m': (2, 4, 8, 16, ),
'mod_code': tested_mod_codes, },
True, None),
+ (psk.psk_constellation,
+ {'m': (2, 4, 8, 16, 32, 64),
+ 'mod_code': tested_mod_codes,
+ 'differential': (False,)},
+ False, None),
(qam.qam_constellation,
{'constellation_points': (4,),
'mod_code': tested_mod_codes,
'large_ampls_to_corners': [False],},
True, None),
- (digital_swig.constellation_bpsk, {}, True, None),
- (digital_swig.constellation_qpsk, {}, False, None),
- (digital_swig.constellation_dqpsk, {}, True, None),
- (digital_swig.constellation_8psk, {}, False, None),
+ (qam.qam_constellation,
+ {'constellation_points': (4, 16, 64),
+ 'mod_code': tested_mod_codes,
+ 'differential': (False,)},
+ False, None),
+ (digital.constellation_bpsk, {}, True, None),
+ (digital.constellation_qpsk, {}, False, None),
+ (digital.constellation_dqpsk, {}, True, None),
+ (digital.constellation_8psk, {}, False, None),
(twod_constell, {}, True, None),
(threed_constell, {}, True, None),
)
@@ -150,7 +161,7 @@ def tested_constellations(easy=True, medium=True, difficult=True):
break
-class test_constellation (gr_unittest.TestCase):
+class test_constellation(gr_unittest.TestCase):
src_length = 256
@@ -178,7 +189,7 @@ class test_constellation (gr_unittest.TestCase):
data = dst.data()
# Don't worry about cut off data for now.
first = constellation.bits_per_symbol()
- self.assertEqual (self.src_data[first:len(data)], data[first:])
+ self.assertEqual(self.src_data[first:len(data)], data[first:])
class mod_demod(gr.hier_block2):
@@ -200,40 +211,39 @@ class mod_demod(gr.hier_block2):
self.blocks = [self]
# We expect a stream of unpacked bits.
# First step is to pack them.
- self.blocks.append(
- gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST))
+ self.blocks.append(blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST))
# Second step we unpack them such that we have k bits in each byte where
# each constellation symbol hold k bits.
self.blocks.append(
- gr.packed_to_unpacked_bb(self.constellation.bits_per_symbol(),
- gr.GR_MSB_FIRST))
+ blocks.packed_to_unpacked_bb(self.constellation.bits_per_symbol(),
+ gr.GR_MSB_FIRST))
# Apply any pre-differential coding
# Gray-coding is done here if we're also using differential coding.
if self.constellation.apply_pre_diff_code():
- self.blocks.append(gr.map_bb(self.constellation.pre_diff_code()))
+ self.blocks.append(digital.map_bb(self.constellation.pre_diff_code()))
# Differential encoding.
if self.differential:
- self.blocks.append(gr.diff_encoder_bb(arity))
+ self.blocks.append(digital.diff_encoder_bb(arity))
# Convert to constellation symbols.
- self.blocks.append(gr.chunks_to_symbols_bc(self.constellation.points(),
- self.constellation.dimensionality()))
+ self.blocks.append(digital.chunks_to_symbols_bc(self.constellation.points(),
+ self.constellation.dimensionality()))
# CHANNEL
# Channel just consists of a rotation to check differential coding.
if rotation is not None:
- self.blocks.append(gr.multiply_const_cc(rotation))
+ self.blocks.append(blocks.multiply_const_cc(rotation))
# RX
# Convert the constellation symbols back to binary values.
- self.blocks.append(digital_swig.constellation_decoder_cb(self.constellation.base()))
+ self.blocks.append(digital.constellation_decoder_cb(self.constellation.base()))
# Differential decoding.
if self.differential:
- self.blocks.append(gr.diff_decoder_bb(arity))
+ self.blocks.append(digital.diff_decoder_bb(arity))
# Decode any pre-differential coding.
if self.constellation.apply_pre_diff_code():
- self.blocks.append(gr.map_bb(
+ self.blocks.append(digital.map_bb(
mod_codes.invert_code(self.constellation.pre_diff_code())))
# unpack the k bit vector into a stream of bits
- self.blocks.append(gr.unpack_k_bits_bb(
+ self.blocks.append(blocks.unpack_k_bits_bb(
self.constellation.bits_per_symbol()))
# connect to block output
check_index = len(self.blocks)
@@ -241,7 +251,6 @@ class mod_demod(gr.hier_block2):
self.blocks.append(self)
self.connect(*self.blocks)
-
if __name__ == '__main__':
gr_unittest.run(test_constellation, "test_constellation.xml")
diff --git a/gr-digital/python/qa_constellation_decoder_cb.py b/gr-digital/python/qa_constellation_decoder_cb.py
index 5401a07fc0..6a93b6e743 100755
--- a/gr-digital/python/qa_constellation_decoder_cb.py
+++ b/gr-digital/python/qa_constellation_decoder_cb.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2004,2007,2010,2011 Free Software Foundation, Inc.
+# Copyright 2004,2007,2010-2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,54 +21,54 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import math
-class test_constellation_decoder (gr_unittest.TestCase):
+class test_constellation_decoder(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_constellation_decoder_cb_bpsk (self):
- cnst = digital_swig.constellation_bpsk()
+ def test_constellation_decoder_cb_bpsk(self):
+ cnst = digital.constellation_bpsk()
src_data = (0.5 + 0.5j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j,
0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j)
expected_result = ( 1, 1, 0, 0,
1, 0, 1)
- src = gr.vector_source_c (src_data)
- op = digital_swig.constellation_decoder_cb (cnst.base())
- dst = gr.vector_sink_b ()
+ src = gr.vector_source_c(src_data)
+ op = digital.constellation_decoder_cb(cnst.base())
+ dst = gr.vector_sink_b()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
+ actual_result = dst.data() # fetch the contents of the sink
#print "actual result", actual_result
#print "expected result", expected_result
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
- def test_constellation_decoder_cb_qpsk (self):
- cnst = digital_swig.constellation_qpsk()
+ def _test_constellation_decoder_cb_qpsk(self):
+ cnst = digital.constellation_qpsk()
src_data = (0.5 + 0.5j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j,
0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j)
expected_result = ( 3, 1, 0, 2,
3, 2, 1)
- src = gr.vector_source_c (src_data)
- op = digital_swig.constellation_decoder_cb (cnst.base())
- dst = gr.vector_sink_b ()
+ src = gr.vector_source_c(src_data)
+ op = digital_swig.constellation_decoder_cb(cnst.base())
+ dst = gr.vector_sink_b()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
+ actual_result = dst.data() # fetch the contents of the sink
#print "actual result", actual_result
#print "expected result", expected_result
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_constellation_receiver.py b/gr-digital/python/qa_constellation_receiver.py
index 37e56b4cf7..bc44220ea9 100755
--- a/gr-digital/python/qa_constellation_receiver.py
+++ b/gr-digital/python/qa_constellation_receiver.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -22,9 +22,12 @@
import random
-from gnuradio import gr, blks2, gr_unittest
+from gnuradio import gr, gr_unittest
from utils import mod_codes, alignment
-import digital_swig, packet_utils
+import packet_utils
+import filter_swig as filter
+import analog_swig as analog
+import blocks_swig as blocks
from generic_mod_demod import generic_mod, generic_demod
from qa_constellation import tested_constellations, twod_constell
@@ -37,7 +40,7 @@ SEED = 1239
# TESTING PARAMETERS
# The number of symbols to test with.
# We need this many to let the frequency recovery block converge.
-DATA_LENGTH = 2000
+DATA_LENGTH = 1000
# Test fails if fraction of output that is correct is less than this.
EASY_REQ_CORRECT = 0.9
# For constellations that aren't expected to work so well.
@@ -52,8 +55,30 @@ TIMING_OFFSET = 1.0
FREQ_BW = 2*math.pi/100.0
PHASE_BW = 2*math.pi/100.0
+class channel_model(gr.hier_block2):
+ def __init__(self, noise_voltage, freq, timing):
+ gr.hier_block2.__init__(self, "channel_model",
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
+
-class test_constellation_receiver (gr_unittest.TestCase):
+ timing_offset = filter.fractional_interpolator_cc(0, timing)
+ noise_adder = blocks.add_cc()
+ noise = analog.noise_source_c(analog.GR_GAUSSIAN,
+ noise_voltage, 0)
+ freq_offset = analog.sig_source_c(1, analog.GR_SIN_WAVE,
+ freq, 1.0, 0.0)
+ mixer_offset = blocks.multiply_cc();
+
+ self.connect(self, timing_offset)
+ self.connect(timing_offset, (mixer_offset,0))
+ self.connect(freq_offset, (mixer_offset,1))
+ self.connect(mixer_offset, (noise_adder,1))
+ self.connect(noise, (noise_adder,0))
+ self.connect(noise_adder, self)
+
+
+class test_constellation_receiver(gr_unittest.TestCase):
# We ignore the first half of the output data since often it takes
# a while for the receiver to lock on.
@@ -109,18 +134,20 @@ class test_constellation_receiver (gr_unittest.TestCase):
self.assertTrue(correct > req_correct)
-class rec_test_tb (gr.top_block):
+class rec_test_tb(gr.top_block):
"""
Takes a constellation an runs a generic modulation, channel,
and generic demodulation.
"""
def __init__(self, constellation, differential,
- data_length=None, src_data=None):
+ data_length=None, src_data=None, freq_offset=True):
"""
- constellation -- a constellation object
- differential -- whether differential encoding is used
- data_length -- the number of bits of data to use
- src_data -- a list of the bits to use
+ Args:
+ constellation: a constellation object
+ differential: whether differential encoding is used
+ data_length: the number of bits of data to use
+ src_data: a list of the bits to use
+ freq_offset: whether to use a frequency offset in the channel
"""
super(rec_test_tb, self).__init__()
# Transmission Blocks
@@ -128,15 +155,22 @@ class rec_test_tb (gr.top_block):
self.src_data = tuple([rndm.randint(0,1) for i in range(0, data_length)])
else:
self.src_data = src_data
- packer = gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)
+ packer = blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)
src = gr.vector_source_b(self.src_data)
mod = generic_mod(constellation, differential=differential)
# Channel
- channel = gr.channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET)
+ if freq_offset:
+ channel = channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET)
+ else:
+ channel = channel_model(NOISE_VOLTAGE, 0, TIMING_OFFSET)
# Receiver Blocks
- demod = generic_demod(constellation, differential=differential,
- freq_bw=FREQ_BW,
- phase_bw=PHASE_BW)
+ if freq_offset:
+ demod = generic_demod(constellation, differential=differential,
+ freq_bw=FREQ_BW,
+ phase_bw=PHASE_BW)
+ else:
+ demod = generic_demod(constellation, differential=differential,
+ freq_bw=0, phase_bw=0)
self.dst = gr.vector_sink_b()
self.connect(src, packer, mod, channel, demod, self.dst)
diff --git a/gr-digital/python/qa_correlate_access_code.py b/gr-digital/python/qa_correlate_access_code.py
index 96246dcfb9..5a5f2209f7 100755
--- a/gr-digital/python/qa_correlate_access_code.py
+++ b/gr-digital/python/qa_correlate_access_code.py
@@ -52,13 +52,13 @@ class test_correlate_access_code(gr_unittest.TestCase):
# 0 0 0 1 0 0 0 1
src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
expected_result = pad + (1, 0, 1, 1, 3, 1, 0, 1, 1, 2) + (0,) * 6
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.correlate_access_code_bb("1011", 0)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
def test_002(self):
@@ -69,13 +69,13 @@ class test_correlate_access_code(gr_unittest.TestCase):
#print access_code
src_data = code + (1, 0, 1, 1) + pad
expected_result = pad + code + (3, 0, 1, 1)
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.correlate_access_code_bb(access_code, 0)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
def test_003(self):
code = tuple(string_to_1_0_list(default_access_code))
@@ -85,14 +85,13 @@ class test_correlate_access_code(gr_unittest.TestCase):
#print access_code
src_data = code + (1, 0, 1, 1) + pad
expected_result = code + (1, 0, 1, 1) + pad
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.correlate_access_code_tag_bb(access_code, 0, "test")
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
-
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code, "test_correlate_access_code.xml")
diff --git a/gr-digital/python/qa_costas_loop_cc.py b/gr-digital/python/qa_costas_loop_cc.py
index 75fdbc2f84..365eda736a 100755
--- a/gr-digital/python/qa_costas_loop_cc.py
+++ b/gr-digital/python/qa_costas_loop_cc.py
@@ -21,22 +21,23 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig, psk
+import digital_swig as digital
+import psk
import random, cmath
class test_costas_loop_cc(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# test basic functionality by setting all gains to 0
natfreq = 0.0
order = 2
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
data = 100*[complex(1,0),]
self.src = gr.vector_source_c(data, False)
@@ -47,13 +48,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
expected_result = data
dst_data = self.snk.data()
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5)
- def test02 (self):
+ def test02(self):
# Make sure it doesn't diverge given perfect data
natfreq = 0.25
order = 2
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)]
self.src = gr.vector_source_c(data, False)
@@ -65,13 +66,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
expected_result = data
dst_data = self.snk.data()
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5)
- def test03 (self):
+ def test03(self):
# BPSK Convergence test with static rotation
natfreq = 0.25
order = 2
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
rot = cmath.exp(0.2j) # some small rotation
data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)]
@@ -90,13 +91,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
# generously compare results; the loop will converge near to, but
# not exactly on, the target data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2)
- def test04 (self):
+ def test04(self):
# QPSK Convergence test with static rotation
natfreq = 0.25
order = 4
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
rot = cmath.exp(0.2j) # some small rotation
data = [complex(2*random.randint(0,1)-1, 2*random.randint(0,1)-1)
@@ -116,13 +117,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
# generously compare results; the loop will converge near to, but
# not exactly on, the target data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2)
- def test05 (self):
+ def test05(self):
# 8PSK Convergence test with static rotation
natfreq = 0.25
order = 8
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
rot = cmath.exp(-cmath.pi/8.0j) # rotate to match Costas rotation
const = psk.psk_constellation(order)
@@ -145,7 +146,7 @@ class test_costas_loop_cc(gr_unittest.TestCase):
# generously compare results; the loop will converge near to, but
# not exactly on, the target data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2)
if __name__ == '__main__':
gr_unittest.run(test_costas_loop_cc, "test_costas_loop_cc.xml")
diff --git a/gr-digital/python/qa_cpm.py b/gr-digital/python/qa_cpm.py
index 12a84c76c2..070e69a982 100755
--- a/gr-digital/python/qa_cpm.py
+++ b/gr-digital/python/qa_cpm.py
@@ -21,15 +21,17 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
+import analog_swig as analog
+import blocks_swig as blocks
import numpy
class test_cpm(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def do_check_phase_shift(self, type, name):
@@ -37,8 +39,8 @@ class test_cpm(gr_unittest.TestCase):
L = 1
in_bits = (1,) * 20
src = gr.vector_source_b(in_bits, False)
- cpm = digital_swig.cpmmod_bc(type, 0.5, sps, L)
- arg = gr.complex_to_arg()
+ cpm = digital.cpmmod_bc(type, 0.5, sps, L)
+ arg = blocks.complex_to_arg()
sink = gr.vector_sink_f()
self.tb.connect(src, cpm, arg, sink)
@@ -51,16 +53,16 @@ class test_cpm(gr_unittest.TestCase):
msg="Phase shift was not correct for CPM method " + name)
def test_001_lrec(self):
- self.do_check_phase_shift(gr.cpm.LRC, 'LREC')
+ self.do_check_phase_shift(analog.cpm.LRC, 'LREC')
def test_001_lrc(self):
- self.do_check_phase_shift(gr.cpm.LRC, 'LRC')
+ self.do_check_phase_shift(analog.cpm.LRC, 'LRC')
def test_001_lsrc(self):
- self.do_check_phase_shift(gr.cpm.LSRC, 'LSRC')
+ self.do_check_phase_shift(analog.cpm.LSRC, 'LSRC')
def test_001_ltfm(self):
- self.do_check_phase_shift(gr.cpm.TFM, 'TFM')
+ self.do_check_phase_shift(analog.cpm.TFM, 'TFM')
def test_001_lgmsk(self):
sps = 2
@@ -68,8 +70,8 @@ class test_cpm(gr_unittest.TestCase):
bt = 0.3
in_bits = (1,) * 20
src = gr.vector_source_b(in_bits, False)
- gmsk = digital_swig.gmskmod_bc(sps, bt, L)
- arg = gr.complex_to_arg()
+ gmsk = digital.gmskmod_bc(sps, L, bt)
+ arg = blocks.complex_to_arg()
sink = gr.vector_sink_f()
self.tb.connect(src, gmsk, arg, sink)
@@ -82,7 +84,7 @@ class test_cpm(gr_unittest.TestCase):
msg="Phase shift was not correct for GMSK")
def test_phase_response(self):
- phase_response = gr.cpm.phase_response(gr.cpm.LREC, 2, 4)
+ phase_response = analog.cpm.phase_response(analog.cpm.LREC, 2, 4)
self.assertAlmostEqual(numpy.sum(phase_response), 1)
diff --git a/gr-digital/python/qa_crc32.py b/gr-digital/python/qa_crc32.py
index f86813f3f3..cd4006b1d3 100755
--- a/gr-digital/python/qa_crc32.py
+++ b/gr-digital/python/qa_crc32.py
@@ -21,40 +21,40 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import random, cmath
class test_crc32(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
data = 100*"0"
expected_result = 2943744955
- result = digital_swig.crc32(data)
+ result = digital.crc32(data)
#print hex(result)
- self.assertEqual (expected_result, result)
+ self.assertEqual(expected_result, result)
- def test02 (self):
+ def test02(self):
data = 100*"1"
expected_result = 2326594156
- result = digital_swig.crc32(data)
+ result = digital.crc32(data)
#print hex(result)
- self.assertEqual (expected_result, result)
+ self.assertEqual(expected_result, result)
- def test03 (self):
+ def test03(self):
data = 10*"0123456789"
expected_result = 3774345973
- result = digital_swig.crc32(data)
+ result = digital.crc32(data)
#print hex(result)
- self.assertEqual (expected_result, result)
+ self.assertEqual(expected_result, result)
if __name__ == '__main__':
gr_unittest.run(test_crc32, "test_crc32.xml")
diff --git a/gr-digital/python/qa_crc32_bb.py b/gr-digital/python/qa_crc32_bb.py
index d63b5e1222..4574b9dca7 100755
--- a/gr-digital/python/qa_crc32_bb.py
+++ b/gr-digital/python/qa_crc32_bb.py
@@ -20,6 +20,7 @@
#
from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
import digital_swig as digital
try: import pmt
except: from gruel import pmt
@@ -38,8 +39,8 @@ class qa_crc32_bb (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(data))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
src = gr.vector_source_b(data, False, 1, (tag,))
crc = digital.crc32_bb(False, tag_name)
sink = gr.vector_sink_b()
@@ -55,8 +56,8 @@ class qa_crc32_bb (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(data))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
src = gr.vector_source_b(data, False, 1, (tag,))
crc = digital.crc32_bb(False, tag_name)
crc_check = digital.crc32_bb(True, tag_name)
@@ -72,24 +73,24 @@ class qa_crc32_bb (gr_unittest.TestCase):
packets = range(pack_len*2)
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol(tag_name)
- tag1.value = pmt.pmt_from_long(pack_len)
+ tag1.key = pmt.string_to_symbol(tag_name)
+ tag1.value = pmt.from_long(pack_len)
tag2 = gr.gr_tag_t()
tag2.offset = pack_len
- tag2.key = pmt.pmt_string_to_symbol(tag_name)
- tag2.value = pmt.pmt_from_long(pack_len)
+ tag2.key = pmt.string_to_symbol(tag_name)
+ tag2.value = pmt.from_long(pack_len)
testtag1 = gr.gr_tag_t()
testtag1.offset = 1
- testtag1.key = pmt.pmt_string_to_symbol("tag1")
- testtag1.value = pmt.pmt_from_long(0)
+ testtag1.key = pmt.string_to_symbol("tag1")
+ testtag1.value = pmt.from_long(0)
testtag2 = gr.gr_tag_t()
testtag2.offset = pack_len
- testtag2.key = pmt.pmt_string_to_symbol("tag2")
- testtag2.value = pmt.pmt_from_long(0)
+ testtag2.key = pmt.string_to_symbol("tag2")
+ testtag2.value = pmt.from_long(0)
testtag3 = gr.gr_tag_t()
testtag3.offset = len(packets)-1
- testtag3.key = pmt.pmt_string_to_symbol("tag3")
- testtag3.value = pmt.pmt_from_long(0)
+ testtag3.key = pmt.string_to_symbol("tag3")
+ testtag3.value = pmt.from_long(0)
src = gr.vector_source_b(packets, False, 1, (tag1, tag2, testtag1, testtag2, testtag3))
crc = digital.crc32_bb(False, tag_name)
sink = gr.vector_sink_b()
@@ -99,7 +100,7 @@ class qa_crc32_bb (gr_unittest.TestCase):
correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19}
tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
for tag in sink.tags():
- key = pmt.pmt_symbol_to_string(tag.key)
+ key = pmt.symbol_to_string(tag.key)
if key in correct_offsets.keys():
tags_found[key] = True
self.assertEqual(correct_offsets[key], tag.offset)
@@ -114,12 +115,12 @@ class qa_crc32_bb (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(data))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
src = gr.vector_source_b(data, False, 1, (tag,))
crc = digital.crc32_bb(False, tag_name)
crc_check = digital.crc32_bb(True, tag_name)
- corruptor = gr.add_const_bb(1)
+ corruptor = blocks.add_const_bb(1)
sink = gr.vector_sink_b()
self.tb.connect(src, crc, corruptor, crc_check, sink)
self.tb.run()
@@ -132,18 +133,18 @@ class qa_crc32_bb (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(data))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(data))
testtag = gr.gr_tag_t()
testtag.offset = len(data)-1
- testtag.key = pmt.pmt_string_to_symbol('tag1')
- testtag.value = pmt.pmt_from_long(0)
+ testtag.key = pmt.string_to_symbol('tag1')
+ testtag.value = pmt.from_long(0)
src = gr.vector_source_b(data, False, 1, (tag, testtag))
crc_check = digital.crc32_bb(True, tag_name)
sink = gr.vector_sink_b()
self.tb.connect(src, crc_check, sink)
self.tb.run()
- self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.pmt_symbol_to_string(tag.key) == 'tag1'])
+ self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1'])
if __name__ == '__main__':
gr_unittest.run(qa_crc32_bb, "qa_crc32_bb.xml")
diff --git a/gr-digital/python/qa_diff_encoder.py b/gr-digital/python/qa_diff_encoder.py
index e4f5470af5..c28f4dbdf8 100755
--- a/gr-digital/python/qa_diff_encoder.py
+++ b/gr-digital/python/qa_diff_encoder.py
@@ -32,12 +32,12 @@ def make_random_int_tuple(L, min, max):
return tuple(result)
-class test_diff_encoder (gr_unittest.TestCase):
+class test_diff_encoder(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def test_diff_encdec_000(self):
diff --git a/gr-digital/python/qa_diff_phasor_cc.py b/gr-digital/python/qa_diff_phasor_cc.py
index 3e7617fe47..833158d0a8 100755
--- a/gr-digital/python/qa_diff_phasor_cc.py
+++ b/gr-digital/python/qa_diff_phasor_cc.py
@@ -24,25 +24,25 @@ from gnuradio import gr, gr_unittest
import digital_swig as digital
import math
-class test_diff_phasor (gr_unittest.TestCase):
+class test_diff_phasor(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_diff_phasor_cc (self):
+ def test_diff_phasor_cc(self):
src_data = (0+0j, 1+0j, -1+0j, 3+4j, -3-4j, -3+4j)
expected_result = (0+0j, 0+0j, -1+0j, -3-4j, -25+0j, -7-24j)
- src = gr.vector_source_c (src_data)
- op = digital.diff_phasor_cc ()
- dst = gr.vector_sink_c ()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
- self.assertComplexTuplesAlmostEqual (expected_result, actual_result)
+ src = gr.vector_source_c(src_data)
+ op = digital.diff_phasor_cc()
+ dst = gr.vector_sink_c()
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
+ actual_result = dst.data() # fetch the contents of the sink
+ self.assertComplexTuplesAlmostEqual(expected_result, actual_result)
if __name__ == '__main__':
gr_unittest.run(test_diff_phasor, "test_diff_phasor.xml")
diff --git a/gr-digital/python/qa_digital.py b/gr-digital/python/qa_digital.py
index 97e35da568..6f54f14208 100755
--- a/gr-digital/python/qa_digital.py
+++ b/gr-digital/python/qa_digital.py
@@ -21,14 +21,14 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
class test_digital(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_fll_band_edge.py b/gr-digital/python/qa_fll_band_edge.py
index 9e4ca079b7..0f6bad984e 100755
--- a/gr-digital/python/qa_fll_band_edge.py
+++ b/gr-digital/python/qa_fll_band_edge.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,25 +21,28 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
+import filter_swig as filter
+import blocks_swig as blocks
+import analog_swig as analog
import random, math
class test_fll_band_edge_cc(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
sps = 4
rolloff = 0.35
bw = 2*math.pi/100.0
ntaps = 45
# Create pulse shape filter
- rrc_taps = gr.firdes.root_raised_cosine(
+ rrc_taps = filter.firdes.root_raised_cosine(
sps, sps, 1.0, rolloff, ntaps)
# The frequency offset to correct
@@ -49,14 +52,14 @@ class test_fll_band_edge_cc(gr_unittest.TestCase):
random.seed(0)
data = [2.0*random.randint(0, 2) - 1.0 for i in xrange(200)]
self.src = gr.vector_source_c(data, False)
- self.rrc = gr.interp_fir_filter_ccf(sps, rrc_taps)
+ self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps)
# Mix symbols with a complex sinusoid to spin them
- self.nco = gr.sig_source_c(1, gr.GR_SIN_WAVE, foffset, 1)
- self.mix = gr.multiply_cc()
+ self.nco = analog.sig_source_c(1, analog.GR_SIN_WAVE, foffset, 1)
+ self.mix = blocks.multiply_cc()
# FLL will despin the symbols to an arbitrary phase
- self.fll = digital_swig.fll_band_edge_cc(sps, rolloff, ntaps, bw)
+ self.fll = digital.fll_band_edge_cc(sps, rolloff, ntaps, bw)
# Create sinks for all outputs of the FLL
# we will only care about the freq and error outputs
@@ -78,7 +81,7 @@ class test_fll_band_edge_cc(gr_unittest.TestCase):
dst_data = self.vsnk_frq.data()[N:]
expected_result = len(dst_data)* [-0.20,]
- self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 4)
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 4)
if __name__ == '__main__':
gr_unittest.run(test_fll_band_edge_cc, "test_fll_band_edge_cc.xml")
diff --git a/gr-digital/python/qa_framer_sink.py b/gr-digital/python/qa_framer_sink.py
index bccc86dc78..e717e6ae05 100755
--- a/gr-digital/python/qa_framer_sink.py
+++ b/gr-digital/python/qa_framer_sink.py
@@ -63,11 +63,11 @@ class test_framker_sink(gr_unittest.TestCase):
self.tb.connect(src, correlator, framer_sink)
self.tb.connect(correlator, vsnk)
- self.tb.run ()
+ self.tb.run()
result_data = rcvd_pktq.delete_head()
result_data = result_data.to_string()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_002(self):
@@ -87,11 +87,11 @@ class test_framker_sink(gr_unittest.TestCase):
self.tb.connect(src, correlator, framer_sink)
self.tb.connect(correlator, vsnk)
- self.tb.run ()
+ self.tb.run()
result_data = rcvd_pktq.delete_head()
result_data = result_data.to_string()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
if __name__ == '__main__':
gr_unittest.run(test_framker_sink, "test_framker_sink.xml")
diff --git a/gr-digital/python/qa_glfsr_source.py b/gr-digital/python/qa_glfsr_source.py
index 157520d7f8..c5adab3023 100755
--- a/gr-digital/python/qa_glfsr_source.py
+++ b/gr-digital/python/qa_glfsr_source.py
@@ -25,10 +25,10 @@ import digital_swig as digital
class test_glfsr_source(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def test_000_make_b(self):
@@ -38,9 +38,9 @@ class test_glfsr_source(gr_unittest.TestCase):
def test_001_degree_b(self):
self.assertRaises(RuntimeError,
- lambda: gr.glfsr_source_b(0))
+ lambda: digital.glfsr_source_b(0))
self.assertRaises(RuntimeError,
- lambda: gr.glfsr_source_b(33))
+ lambda: digital.glfsr_source_b(33))
def test_002_correlation_b(self):
for degree in range(1,11): # Higher degrees take too long to correlate
@@ -65,9 +65,9 @@ class test_glfsr_source(gr_unittest.TestCase):
def test_004_degree_f(self):
self.assertRaises(RuntimeError,
- lambda: gr.glfsr_source_f(0))
+ lambda: digital.glfsr_source_f(0))
self.assertRaises(RuntimeError,
- lambda: gr.glfsr_source_f(33))
+ lambda: digital.glfsr_source_f(33))
def test_005_correlation_f(self):
for degree in range(1,11): # Higher degrees take too long to correlate
src = digital.glfsr_source_f(degree, False)
diff --git a/gr-digital/python/qa_header_payload_demux.py b/gr-digital/python/qa_header_payload_demux.py
index 15b3eadcf8..4073f24ace 100755
--- a/gr-digital/python/qa_header_payload_demux.py
+++ b/gr-digital/python/qa_header_payload_demux.py
@@ -50,7 +50,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
)
- self.assertEqual(pmt.pmt_length(hpd.message_ports_in()), 1)
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
header_sink = gr.vector_sink_f()
payload_sink = gr.vector_sink_f()
@@ -61,8 +61,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.tb.start()
time.sleep(.2) # Need this, otherwise, the next message is ignored
hpd.to_basic_block()._post(
- pmt.pmt_intern('header_data'),
- pmt.pmt_from_long(len(payload))
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload))
)
while len(payload_sink.data()) < len(payload):
time.sleep(.2)
diff --git a/gr-digital/python/qa_bytes_to_syms.py b/gr-digital/python/qa_lfsr.py
index 75475a95b2..d70c466ca7 100755
--- a/gr-digital/python/qa_bytes_to_syms.py
+++ b/gr-digital/python/qa_lfsr.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc.
+# Copyright 2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -24,28 +24,26 @@ from gnuradio import gr, gr_unittest
import digital_swig as digital
import math
-class test_bytes_to_syms (gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+class test_lfsr(gr_unittest.TestCase):
- def tearDown (self):
- self.tb = None
+ def setUp(self):
+ pass
- def test_bytes_to_syms_001 (self):
- src_data = (0x01, 0x80, 0x03)
- expected_result = (-1, -1, -1, -1, -1, -1, -1, +1,
- +1, -1, -1, -1, -1, -1, -1, -1,
- -1, -1, -1, -1, -1, -1, +1, +1)
- src = gr.vector_source_b (src_data)
- op = digital.bytes_to_syms ()
- dst = gr.vector_sink_f ()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
+ def tearDown(self):
+ pass
+
+ def test_lfsr_001(self):
+ reglen = 8
+ l = digital.lfsr(1, 1, reglen)
+
+ result_data = []
+ for i in xrange(4*(reglen+1)):
+ result_data.append(l.next_bit())
+
+ expected_result = 4*([1,] + reglen*[0,])
+ self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5)
if __name__ == '__main__':
- gr_unittest.run(test_bytes_to_syms, "test_bytes_to_syms.xml")
+ gr_unittest.run(test_lfsr, "test_lfsr.xml")
diff --git a/gr-digital/python/qa_lms_equalizer.py b/gr-digital/python/qa_lms_equalizer.py
index 025c785aa4..9ba90a89ab 100755
--- a/gr-digital/python/qa_lms_equalizer.py
+++ b/gr-digital/python/qa_lms_equalizer.py
@@ -21,7 +21,7 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
class test_lms_dd_equalizer(gr_unittest.TestCase):
@@ -33,7 +33,7 @@ class test_lms_dd_equalizer(gr_unittest.TestCase):
def transform(self, src_data, gain, const):
SRC = gr.vector_source_c(src_data, False)
- EQU = digital_swig.lms_dd_equalizer_cc(4, gain, 1, const.base())
+ EQU = digital.lms_dd_equalizer_cc(4, gain, 1, const.base())
DST = gr.vector_sink_c()
self.tb.connect(SRC, EQU, DST)
self.tb.run()
@@ -41,13 +41,15 @@ class test_lms_dd_equalizer(gr_unittest.TestCase):
def test_001_identity(self):
# Constant modulus signal so no adjustments
- const = digital_swig.constellation_qpsk()
+ const = digital.constellation_qpsk()
src_data = const.points()*1000
N = 100 # settling time
expected_data = src_data[N:]
result = self.transform(src_data, 0.1, const)[N:]
- self.assertComplexTuplesAlmostEqual(expected_data, result, 5)
+
+ N = -500
+ self.assertComplexTuplesAlmostEqual(expected_data[N:], result[N:], 5)
if __name__ == "__main__":
gr_unittest.run(test_lms_dd_equalizer, "test_lms_dd_equalizer.xml")
diff --git a/gr-digital/python/qa_map.py b/gr-digital/python/qa_map.py
index 3ad99a2c12..0fd7c479a1 100755
--- a/gr-digital/python/qa_map.py
+++ b/gr-digital/python/qa_map.py
@@ -34,14 +34,14 @@ class test_map(gr_unittest.TestCase):
def helper(self, symbols):
src_data = [0, 1, 2, 3, 0, 1, 2, 3]
expected_data = map(lambda x: symbols[x], src_data)
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.map_bb(symbols)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
result_data = list(dst.data())
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_001(self):
symbols = [0, 0, 0, 0]
diff --git a/gr-digital/python/qa_mpsk_receiver.py b/gr-digital/python/qa_mpsk_receiver.py
index e1f16ee671..bde8895e76 100755
--- a/gr-digital/python/qa_mpsk_receiver.py
+++ b/gr-digital/python/qa_mpsk_receiver.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,101 +21,128 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
-import random, cmath
+import digital_swig as digital
+import filter_swig as filter
+import random, cmath, time
class test_mpsk_receiver(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# Test BPSK sync
M = 2
theta = 0
loop_bw = cmath.pi/100.0
fmin = -0.5
fmax = 0.5
- mu = 0.25
+ mu = 0.5
gain_mu = 0.01
omega = 2
gain_omega = 0.001
omega_rel = 0.001
- self.test = digital_swig.mpsk_receiver_cc(M, theta, loop_bw,
- fmin, fmax, mu, gain_mu,
- omega, gain_omega,
- omega_rel)
+ self.test = digital.mpsk_receiver_cc(M, theta, loop_bw,
+ fmin, fmax, mu, gain_mu,
+ omega, gain_omega,
+ omega_rel)
- data = 1000*[complex(1,0), complex(1,0), complex(-1,0), complex(-1,0)]
+ data = 10000*[complex(1,0), complex(-1,0)]
+ #data = [2*random.randint(0,1)-1 for x in xrange(10000)]
self.src = gr.vector_source_c(data, False)
self.snk = gr.vector_sink_c()
- self.tb.connect(self.src, self.test, self.snk)
+ # pulse shaping interpolation filter
+ nfilts = 32
+ excess_bw = 0.35
+ ntaps = 11 * int(omega*nfilts)
+ rrc_taps0 = filter.firdes.root_raised_cosine(
+ nfilts, nfilts, 1.0, excess_bw, ntaps)
+ rrc_taps1 = filter.firdes.root_raised_cosine(
+ 1, omega, 1.0, excess_bw, 11*omega)
+ self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0)
+ self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1)
+
+ self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[complex(-0.5,0), complex(0.5,0)]
+ expected_result = [0.5*d for d in data]
dst_data = self.snk.data()
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
- expected_result = expected_result[len_e - Ncmp:]
+ expected_result = expected_result[len_e - Ncmp-1:-1]
dst_data = dst_data[len_d - Ncmp:]
-
+
#for e,d in zip(expected_result, dst_data):
- # print e, d
+ # print "{0:+.02f} {1:+.02f}".format(e, d)
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
- def test02 (self):
+ def test02(self):
# Test QPSK sync
M = 4
theta = 0
- loop_bw = 2*cmath.pi/100.0
+ loop_bw = cmath.pi/100.0
fmin = -0.5
fmax = 0.5
- mu = 0.25
+ mu = 0.5
gain_mu = 0.01
omega = 2
gain_omega = 0.001
omega_rel = 0.001
- self.test = digital_swig.mpsk_receiver_cc(M, theta, loop_bw,
- fmin, fmax, mu, gain_mu,
- omega, gain_omega,
- omega_rel)
+ self.test = digital.mpsk_receiver_cc(M, theta, loop_bw,
+ fmin, fmax, mu, gain_mu,
+ omega, gain_omega,
+ omega_rel)
- data = 1000*[complex( 0.707, 0.707), complex( 0.707, 0.707),
- complex(-0.707, 0.707), complex(-0.707, 0.707),
- complex(-0.707, -0.707), complex(-0.707, -0.707),
- complex( 0.707, -0.707), complex( 0.707, -0.707)]
+ data = 10000*[complex( 0.707, 0.707),
+ complex(-0.707, 0.707),
+ complex(-0.707, -0.707),
+ complex( 0.707, -0.707)]
+ data = [0.5*d for d in data]
self.src = gr.vector_source_c(data, False)
self.snk = gr.vector_sink_c()
- self.tb.connect(self.src, self.test, self.snk)
+ # pulse shaping interpolation filter
+ nfilts = 32
+ excess_bw = 0.35
+ ntaps = 11 * int(omega*nfilts)
+ rrc_taps0 = filter.firdes.root_raised_cosine(
+ nfilts, nfilts, 1.0, excess_bw, ntaps)
+ rrc_taps1 = filter.firdes.root_raised_cosine(
+ 1, omega, 1.0, excess_bw, 11*omega)
+ self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0)
+ self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1)
+
+ self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[complex(0, -1.0), complex(1.0, 0),
- complex(0, 1.0), complex(-1.0, 0)]
- dst_data = self.snk.data()
+ expected_result = 10000*[complex(-0.5, +0.0), complex(+0.0, -0.5),
+ complex(+0.5, +0.0), complex(+0.0, +0.5)]
+
+ # get data after a settling period
+ dst_data = self.snk.data()[200:]
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
- expected_result = expected_result[len_e - Ncmp:]
+ expected_result = expected_result[len_e - Ncmp - 1:-1]
dst_data = dst_data[len_d - Ncmp:]
#for e,d in zip(expected_result, dst_data):
- # print e, d
+ # print "{0:+.02f} {1:+.02f}".format(e, d)
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
if __name__ == '__main__':
gr_unittest.run(test_mpsk_receiver, "test_mpsk_receiver.xml")
diff --git a/gr-digital/python/qa_mpsk_snr_est.py b/gr-digital/python/qa_mpsk_snr_est.py
index d392567bfd..c976bf21a8 100755
--- a/gr-digital/python/qa_mpsk_snr_est.py
+++ b/gr-digital/python/qa_mpsk_snr_est.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -29,94 +29,93 @@ def get_cplx():
def get_n_cplx():
return complex(random.random()-0.5, random.random()-0.5)
-class test_mpsk_snr_est (gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+class test_mpsk_snr_est(gr_unittest.TestCase):
+ def setUp(self):
+ self.tb = gr.top_block()
random.seed(0) # make repeatable
N = 10000
self._noise = [get_n_cplx() for i in xrange(N)]
self._bits = [get_cplx() for i in xrange(N)]
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def mpsk_snr_est_setup (self, op):
+ def mpsk_snr_est_setup(self, op):
result = []
for i in xrange(1,6):
src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)]
- src = gr.vector_source_c (src_data)
- dst = gr.null_sink (gr.sizeof_gr_complex)
+ src = gr.vector_source_c(src_data)
+ dst = gr.null_sink(gr.sizeof_gr_complex)
- tb = gr.top_block ()
- tb.connect (src, op)
- tb.connect (op, dst)
- tb.run () # run the graph and wait for it to finish
+ tb = gr.top_block()
+ tb.connect(src, op)
+ tb.connect(op, dst)
+ tb.run() # run the graph and wait for it to finish
result.append(op.snr())
return result
- def test_mpsk_snr_est_simple (self):
+ def test_mpsk_snr_est_simple(self):
expected_result = [11.48, 5.91, 3.30, 2.08, 1.46]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_SIMPLE, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_SIMPLE, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_mpsk_snr_est_skew (self):
+ def test_mpsk_snr_est_skew(self):
expected_result = [11.48, 5.91, 3.30, 2.08, 1.46]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_SKEW, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_SKEW, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_mpsk_snr_est_m2m4 (self):
+ def test_mpsk_snr_est_m2m4(self):
expected_result = [11.02, 6.20, 4.98, 5.16, 5.66]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_M2M4, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_M2M4, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_mpsk_snr_est_svn (self):
+ def test_mpsk_snr_est_svn(self):
expected_result = [10.90, 6.00, 4.76, 4.97, 5.49]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_SVR, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_SVR, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_probe_mpsk_snr_est_m2m4 (self):
+ def test_probe_mpsk_snr_est_m2m4(self):
expected_result = [11.02, 6.20, 4.98, 5.16, 5.66]
actual_result = []
for i in xrange(1,6):
src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)]
- src = gr.vector_source_c (src_data)
+ src = gr.vector_source_c(src_data)
N = 10000
alpha = 0.001
- op = digital.probe_mpsk_snr_est_c (digital.SNR_EST_M2M4, N, alpha)
+ op = digital.probe_mpsk_snr_est_c(digital.SNR_EST_M2M4, N, alpha)
- tb = gr.top_block ()
- tb.connect (src, op)
- tb.run () # run the graph and wait for it to finish
+ tb = gr.top_block()
+ tb.connect(src, op)
+ tb.run() # run the graph and wait for it to finish
actual_result.append(op.snr())
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
-
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
if __name__ == '__main__':
# Test various SNR estimators; we're not using a Gaussian
diff --git a/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py b/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py
index 1b013865f4..2105727e04 100755
--- a/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py
+++ b/gr-digital/python/qa_ofdm_carrier_allocator_cvc.py
@@ -45,8 +45,8 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(tx_symbols))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_symbols))
src = gr.vector_source_c(tx_symbols, False, 1, (tag,))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
@@ -71,8 +71,8 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(tx_symbols))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_symbols))
src = gr.vector_source_c(tx_symbols, False, 1, (tag,))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
@@ -104,28 +104,28 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase):
tag_name = "len"
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol(tag_name)
- tag1.value = pmt.pmt_from_long(len(tx_symbols))
+ tag1.key = pmt.string_to_symbol(tag_name)
+ tag1.value = pmt.from_long(len(tx_symbols))
tag2 = gr.gr_tag_t()
tag2.offset = len(tx_symbols)
- tag2.key = pmt.pmt_string_to_symbol(tag_name)
- tag2.value = pmt.pmt_from_long(len(tx_symbols))
+ tag2.key = pmt.string_to_symbol(tag_name)
+ tag2.value = pmt.from_long(len(tx_symbols))
testtag1 = gr.gr_tag_t()
testtag1.offset = 0
- testtag1.key = pmt.pmt_string_to_symbol('tag1')
- testtag1.value = pmt.pmt_from_long(0)
+ testtag1.key = pmt.string_to_symbol('tag1')
+ testtag1.value = pmt.from_long(0)
testtag2 = gr.gr_tag_t()
testtag2.offset = 7 # On the 2nd OFDM symbol
- testtag2.key = pmt.pmt_string_to_symbol('tag2')
- testtag2.value = pmt.pmt_from_long(0)
+ testtag2.key = pmt.string_to_symbol('tag2')
+ testtag2.value = pmt.from_long(0)
testtag3 = gr.gr_tag_t()
testtag3.offset = len(tx_symbols)+1 # First OFDM symbol of packet 2
- testtag3.key = pmt.pmt_string_to_symbol('tag3')
- testtag3.value = pmt.pmt_from_long(0)
+ testtag3.key = pmt.string_to_symbol('tag3')
+ testtag3.value = pmt.from_long(0)
testtag4 = gr.gr_tag_t()
testtag4.offset = 2*len(tx_symbols)-1 # Last OFDM symbol of packet 2
- testtag4.key = pmt.pmt_string_to_symbol('tag4')
- testtag4.value = pmt.pmt_from_long(0)
+ testtag4.key = pmt.string_to_symbol('tag4')
+ testtag4.value = pmt.from_long(0)
src = gr.vector_source_c(tx_symbols * 2, False, 1, (tag1, tag2, testtag1, testtag2, testtag3, testtag4))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
@@ -139,13 +139,13 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase):
tags_found = {'tag1': False, 'tag2': False, 'tag3': False, 'tag4': False}
correct_offsets = {'tag1': 0, 'tag2': 1, 'tag3': 3, 'tag4': 5}
for tag in sink.tags():
- key = pmt.pmt_symbol_to_string(tag.key)
+ key = pmt.symbol_to_string(tag.key)
if key in tags_found.keys():
tags_found[key] = True
self.assertEqual(correct_offsets[key], tag.offset)
if key == tag_name:
self.assertTrue(tag.offset == 0 or tag.offset == 3)
- self.assertTrue(pmt.pmt_to_long(tag.value) == 3)
+ self.assertTrue(pmt.to_long(tag.value) == 3)
self.assertTrue(all(tags_found.values()))
diff --git a/gr-digital/python/qa_ofdm_chanest_vcvc.py b/gr-digital/python/qa_ofdm_chanest_vcvc.py
index c78f8fccfc..c7c0d83a84 100755
--- a/gr-digital/python/qa_ofdm_chanest_vcvc.py
+++ b/gr-digital/python/qa_ofdm_chanest_vcvc.py
@@ -22,6 +22,8 @@
from gnuradio import gr, gr_unittest
try: import pmt
except: from gruel import pmt
+import blocks_swig as blocks
+import analog_swig as analog
import digital_swig as digital
import sys
import numpy
@@ -62,12 +64,12 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
shift_tuple(data_symbol, carr_offset)
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol("test_tag_1")
- tag1.value = pmt.pmt_from_long(23)
+ tag1.key = pmt.string_to_symbol("test_tag_1")
+ tag1.value = pmt.from_long(23)
tag2 = gr.gr_tag_t()
tag2.offset = 2
- tag2.key = pmt.pmt_string_to_symbol("test_tag_2")
- tag2.value = pmt.pmt_from_long(42)
+ tag2.key = pmt.string_to_symbol("test_tag_2")
+ tag2.value = pmt.from_long(42)
src = gr.vector_source_c(tx_data, False, fft_len, (tag1, tag2))
chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
sink = gr.vector_sink_c(fft_len)
@@ -81,14 +83,14 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
'test_tag_2': False
}
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- carr_offset_hat = pmt.pmt_to_long(tag.value)
- self.assertEqual(pmt.pmt_to_long(tag.value), carr_offset)
- if pmt.pmt_symbol_to_string(tag.key) == 'test_tag_1':
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ carr_offset_hat = pmt.to_long(tag.value)
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'test_tag_1':
self.assertEqual(tag.offset, 0)
- if pmt.pmt_symbol_to_string(tag.key) == 'test_tag_2':
+ if pmt.symbol_to_string(tag.key) == 'test_tag_2':
self.assertEqual(tag.offset, 0)
- detected_tags[pmt.pmt_symbol_to_string(tag.key)] = True
+ detected_tags[pmt.symbol_to_string(tag.key)] = True
self.assertTrue(all(detected_tags.values()))
def test_002_offset_1sym (self):
@@ -111,9 +113,9 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
tags = sink.tags()
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- carr_offset_hat = pmt.pmt_to_long(tag.value)
- self.assertEqual(pmt.pmt_to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ carr_offset_hat = pmt.to_long(tag.value)
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
def test_003_channel_no_carroffset (self):
""" Add a channel, check if it's correctly estimated """
@@ -125,7 +127,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
tx_data = sync_symbol1 + sync_symbol2 + data_symbol
channel = (0, 0, 0, 2, -2, 2, 3j, 2, 0, 2, 2, 2, 2, 3, 0, 0)
src = gr.vector_source_c(tx_data, False, fft_len)
- chan = gr.multiply_const_vcc(channel)
+ chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
sink = gr.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
@@ -133,10 +135,10 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
tags = sink.tags()
self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel)))
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- self.assertEqual(pmt.pmt_to_long(tag.value), carr_offset)
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- self.assertEqual(pmt.pmt_c32vector_elements(tag.value), channel)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ self.assertEqual(pmt.c32vector_elements(tag.value), channel)
def test_004_channel_no_carroffset_1sym (self):
""" Add a channel, check if it's correctly estimated.
@@ -148,17 +150,17 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
tx_data = sync_symbol + data_symbol
channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
src = gr.vector_source_c(tx_data, False, fft_len)
- chan = gr.multiply_const_vcc(channel)
+ chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1)
sink = gr.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
self.tb.run()
tags = sink.tags()
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- self.assertEqual(pmt.pmt_to_long(tag.value), carr_offset)
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- self.assertEqual(pmt.pmt_c32vector_elements(tag.value), channel)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ self.assertEqual(pmt.c32vector_elements(tag.value), channel)
def test_005_both_1sym_force (self):
""" Add a channel, check if it's correctly estimated.
@@ -171,17 +173,17 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
tx_data = sync_symbol + data_symbol
channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
src = gr.vector_source_c(tx_data, False, fft_len)
- chan = gr.multiply_const_vcc(channel)
+ chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol, ref_symbol, 1)
sink = gr.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
self.tb.run()
tags = sink.tags()
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- self.assertEqual(pmt.pmt_to_long(tag.value), carr_offset)
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- self.assertEqual(pmt.pmt_c32vector_elements(tag.value), channel)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ self.assertEqual(pmt.c32vector_elements(tag.value), channel)
def test_006_channel_and_carroffset (self):
""" Add a channel, check if it's correctly estimated """
@@ -198,7 +200,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
shift_tuple(data_symbol, carr_offset)
channel = range(fft_len)
src = gr.vector_source_c(tx_data, False, fft_len)
- chan = gr.multiply_const_vcc(channel)
+ chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
sink = gr.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
@@ -206,10 +208,10 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
tags = sink.tags()
chan_est = None
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- self.assertEqual(pmt.pmt_to_long(tag.value), carr_offset)
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- chan_est = pmt.pmt_c32vector_elements(tag.value)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ self.assertEqual(pmt.to_long(tag.value), carr_offset)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ chan_est = pmt.c32vector_elements(tag.value)
for i in range(fft_len):
if shift_tuple(sync_symbol2, carr_offset)[i]: # Only here the channel can be estimated
self.assertEqual(chan_est[i], channel[i])
@@ -235,24 +237,24 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
shift_tuple(data_sym, carr_offset)
channel = [rand_range(min_chan_ampl, max_chan_ampl) * numpy.exp(1j * rand_range(0, 2 * numpy.pi)) for x in range(fft_len)]
src = gr.vector_source_c(tx_data, False, fft_len)
- chan = gr.multiply_const_vcc(channel)
- noise = gr.noise_source_c(gr.GR_GAUSSIAN, wgn_amplitude)
- add = gr.add_cc(fft_len)
+ chan = blocks.multiply_const_vcc(channel)
+ noise = analog.noise_source_c(analog.GR_GAUSSIAN, wgn_amplitude)
+ add = blocks.add_cc(fft_len)
chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1)
sink = gr.vector_sink_c(fft_len)
top_block.connect(src, chan, (add, 0), chanest, sink)
- top_block.connect(noise, gr.stream_to_vector(gr.sizeof_gr_complex, fft_len), (add, 1))
+ top_block.connect(noise, blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), (add, 1))
top_block.run()
channel_est = None
carr_offset_hat = 0
rx_sym_est = [0,] * fft_len
tags = sink.tags()
for tag in tags:
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
- carr_offset_hat = pmt.pmt_to_long(tag.value)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
+ carr_offset_hat = pmt.to_long(tag.value)
self.assertEqual(carr_offset, carr_offset_hat)
- if pmt.pmt_symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- channel_est = pmt.pmt_c32vector_elements(tag.value)
+ if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
+ channel_est = pmt.c32vector_elements(tag.value)
shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
for i in range(fft_len):
if shifted_carrier_mask[i] and channel_est[i]:
diff --git a/gr-digital/python/qa_ofdm_cyclic_prefixer.py b/gr-digital/python/qa_ofdm_cyclic_prefixer.py
index 838e992d02..003e987e95 100755
--- a/gr-digital/python/qa_ofdm_cyclic_prefixer.py
+++ b/gr-digital/python/qa_ofdm_cyclic_prefixer.py
@@ -69,12 +69,12 @@ class test_ofdm_cyclic_prefixer (gr_unittest.TestCase):
7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0/2)
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(2)
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(2)
tag2 = gr.gr_tag_t()
tag2.offset = 1
- tag2.key = pmt.pmt_string_to_symbol("random_tag")
- tag2.value = pmt.pmt_from_long(42)
+ tag2.key = pmt.string_to_symbol("random_tag")
+ tag2.value = pmt.from_long(42)
src = gr.vector_source_c(range(1, fft_len+1) * 2, False, fft_len, (tag, tag2))
cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, 2, tag_name)
sink = gr.vector_sink_c()
diff --git a/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
index 6f44fd00fb..9faface03f 100755
--- a/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
+++ b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
@@ -42,12 +42,12 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
tx_data = (1,) * fft_len * n_syms
len_tag = gr.gr_tag_t()
len_tag.offset = 0
- len_tag.key = pmt.pmt_string_to_symbol(len_tag_key)
- len_tag.value = pmt.pmt_from_long(n_syms)
+ len_tag.key = pmt.string_to_symbol(len_tag_key)
+ len_tag.value = pmt.from_long(n_syms)
chan_tag = gr.gr_tag_t()
chan_tag.offset = 0
- chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps")
- chan_tag.value = pmt.pmt_init_c32vector(fft_len, (1,) * fft_len)
+ chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
+ chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len)
src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag))
eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key)
sink = gr.vector_sink_c(fft_len)
@@ -56,8 +56,8 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
# Check data
self.assertEqual(tx_data, sink.data())
for tag in sink.tags():
- self.assertEqual(pmt.pmt_symbol_to_string(tag.key), len_tag_key)
- self.assertEqual(pmt.pmt_to_long(tag.value), n_syms)
+ self.assertEqual(pmt.symbol_to_string(tag.key), len_tag_key)
+ self.assertEqual(pmt.to_long(tag.value), n_syms)
def test_002_static (self):
fft_len = 8
@@ -87,12 +87,12 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
len_tag_key = "frame_len"
len_tag = gr.gr_tag_t()
len_tag.offset = 0
- len_tag.key = pmt.pmt_string_to_symbol(len_tag_key)
- len_tag.value = pmt.pmt_from_long(4)
+ len_tag.key = pmt.string_to_symbol(len_tag_key)
+ len_tag.value = pmt.from_long(4)
chan_tag = gr.gr_tag_t()
chan_tag.offset = 0
- chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps")
- chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len])
+ chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
+ chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len])
src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag))
eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True)
sink = gr.vector_sink_c(fft_len)
@@ -101,10 +101,10 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()]
self.assertEqual(tx_data, rx_data)
for tag in sink.tags():
- if pmt.pmt_symbol_to_string(tag.key) == len_tag_key:
- self.assertEqual(pmt.pmt_to_long(tag.value), 4)
- if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
- self.assertEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:])
+ if pmt.symbol_to_string(tag.key) == len_tag_key:
+ self.assertEqual(pmt.to_long(tag.value), 4)
+ if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
+ self.assertEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:])
def test_002_simpledfe (self):
fft_len = 8
@@ -136,12 +136,12 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
len_tag_key = "frame_len"
len_tag = gr.gr_tag_t()
len_tag.offset = 0
- len_tag.key = pmt.pmt_string_to_symbol(len_tag_key)
- len_tag.value = pmt.pmt_from_long(4)
+ len_tag.key = pmt.string_to_symbol(len_tag_key)
+ len_tag.value = pmt.from_long(4)
chan_tag = gr.gr_tag_t()
chan_tag.offset = 0
- chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps")
- chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len])
+ chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
+ chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len])
src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag))
eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True)
sink = gr.vector_sink_c(fft_len)
@@ -150,10 +150,10 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()]
self.assertEqual(tx_data, rx_data)
for tag in sink.tags():
- if pmt.pmt_symbol_to_string(tag.key) == len_tag_key:
- self.assertEqual(pmt.pmt_to_long(tag.value), 4)
- if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
- self.assertComplexTuplesAlmostEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:], places=1)
+ if pmt.symbol_to_string(tag.key) == len_tag_key:
+ self.assertEqual(pmt.to_long(tag.value), 4)
+ if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
+ self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:], places=1)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_ofdm_insert_preamble.py b/gr-digital/python/qa_ofdm_insert_preamble.py
index c45893fa38..60902edc14 100755
--- a/gr-digital/python/qa_ofdm_insert_preamble.py
+++ b/gr-digital/python/qa_ofdm_insert_preamble.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2007,2010,2011 Free Software Foundation, Inc.
+# Copyright 2007,2010-2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -22,14 +22,15 @@
from gnuradio import gr, gr_unittest
from pprint import pprint
-import digital_swig
+import digital_swig as digital
+import blocks_swig as blocks
-class test_ofdm_insert_preamble (gr_unittest.TestCase):
+class test_ofdm_insert_preamble(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def helper(self, v0, v1, fft_length, preamble):
@@ -37,13 +38,13 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase):
src0 = gr.vector_source_c(v0)
src1 = gr.vector_source_b(v1)
- s2v = gr.stream_to_vector(gr.sizeof_gr_complex, fft_length)
+ s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_length)
# print "len(v) = %d" % (len(v))
- op = digital_swig.ofdm_insert_preamble(fft_length, preamble)
+ op = digital.ofdm_insert_preamble(fft_length, preamble)
- v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_length)
+ v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, fft_length)
dst0 = gr.vector_sink_c()
dst1 = gr.vector_sink_b()
@@ -105,7 +106,6 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase):
p.append(tuple(t))
v += t
-
r = self.helper(v, npayloads*[1], fft_length, preamble)
self.assertEqual(r[0], tuple(npayloads*[1, 0]))
@@ -175,6 +175,5 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase):
p0, p1, p[12], p[13],
p0, p1, p[14], p[15]))
-
if __name__ == '__main__':
gr_unittest.run(test_ofdm_insert_preamble, "test_ofdm_insert_preamble.xml")
diff --git a/gr-digital/python/qa_ofdm_serializer_vcc.py b/gr-digital/python/qa_ofdm_serializer_vcc.py
index 537e19b593..107d6076c5 100755
--- a/gr-digital/python/qa_ofdm_serializer_vcc.py
+++ b/gr-digital/python/qa_ofdm_serializer_vcc.py
@@ -21,7 +21,9 @@
#
from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
import fft_swig as fft
+import analog_swig as analog
import digital_swig as digital
try: import pmt
except: from gruel import pmt
@@ -48,8 +50,8 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(n_syms)
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
src = gr.vector_source_c(tx_symbols, False, fft_len, (tag,))
serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False)
sink = gr.vector_sink_c()
@@ -58,8 +60,8 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
self.assertEqual(sink.data(), expected_result)
self.assertEqual(len(sink.tags()), 1)
result_tag = sink.tags()[0]
- self.assertEqual(pmt.pmt_symbol_to_string(result_tag.key), tag_name)
- self.assertEqual(pmt.pmt_to_long(result_tag.value), n_syms * len(occupied_carriers[0]))
+ self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
+ self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0]))
def test_002_with_offset (self):
""" Standard test, carrier offset """
@@ -75,12 +77,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(n_syms)
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
offsettag = gr.gr_tag_t()
offsettag.offset = 0
- offsettag.key = pmt.pmt_string_to_symbol("ofdm_sync_carr_offset")
- offsettag.value = pmt.pmt_from_long(carr_offset)
+ offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
+ offsettag.value = pmt.from_long(carr_offset)
src = gr.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag))
serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False)
sink = gr.vector_sink_c()
@@ -89,8 +91,8 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
self.assertEqual(sink.data(), expected_result)
self.assertEqual(len(sink.tags()), 2)
for tag in sink.tags():
- if pmt.pmt_symbol_to_string(tag.key) == tag_name:
- self.assertEqual(pmt.pmt_to_long(tag.value), n_syms * len(occupied_carriers[0]))
+ if pmt.symbol_to_string(tag.key) == tag_name:
+ self.assertEqual(pmt.to_long(tag.value), n_syms * len(occupied_carriers[0]))
def test_003_connect (self):
""" Connect carrier_allocator to ofdm_serializer,
@@ -104,8 +106,8 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(tx_data))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_data))
src = gr.vector_source_c(tx_data, False, 1, (tag,))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
@@ -138,12 +140,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(len(tx_data))
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(len(tx_data))
offsettag = gr.gr_tag_t()
offsettag.offset = 0
- offsettag.key = pmt.pmt_string_to_symbol("ofdm_sync_carr_offset")
- offsettag.value = pmt.pmt_from_long(carr_offset)
+ offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
+ offsettag.value = pmt.from_long(carr_offset)
src = gr.vector_source_c(tx_data, False, 1, (tag, offsettag))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
@@ -151,16 +153,16 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
pilot_symbols,
tag_name)
tx_ifft = fft.fft_vcc(fft_len, False, ())
- offset_sig = gr.sig_source_c(1.0, gr.GR_COS_WAVE, freq_offset, 1.0)
- mixer = gr.multiply_cc()
+ offset_sig = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0)
+ mixer = blocks.multiply_cc()
rx_fft = fft.fft_vcc(fft_len, True, (), True)
serializer = digital.ofdm_serializer_vcc(alloc)
sink = gr.vector_sink_c()
self.tb.connect(
src, alloc, tx_ifft,
- gr.vector_to_stream(gr.sizeof_gr_complex, fft_len),
+ blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len),
(mixer, 0),
- gr.stream_to_vector(gr.sizeof_gr_complex, fft_len),
+ blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len),
rx_fft, serializer, sink
)
self.tb.connect(offset_sig, (mixer, 1))
@@ -181,12 +183,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
tag_name = "len"
tag = gr.gr_tag_t()
tag.offset = 0
- tag.key = pmt.pmt_string_to_symbol(tag_name)
- tag.value = pmt.pmt_from_long(n_syms)
+ tag.key = pmt.string_to_symbol(tag_name)
+ tag.value = pmt.from_long(n_syms)
tag2 = gr.gr_tag_t()
tag2.offset = 0
- tag2.key = pmt.pmt_string_to_symbol("packet_len")
- tag2.value = pmt.pmt_from_long(len(expected_result))
+ tag2.key = pmt.string_to_symbol("packet_len")
+ tag2.value = pmt.from_long(len(expected_result))
src = gr.vector_source_c(tx_symbols, False, fft_len, (tag, tag2))
serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, False)
sink = gr.vector_sink_c()
@@ -195,8 +197,8 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
self.assertEqual(sink.data(), expected_result)
self.assertEqual(len(sink.tags()), 1)
result_tag = sink.tags()[0]
- self.assertEqual(pmt.pmt_symbol_to_string(result_tag.key), "packet_len")
- self.assertEqual(pmt.pmt_to_long(result_tag.value), len(expected_result))
+ self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len")
+ self.assertEqual(pmt.to_long(result_tag.value), len(expected_result))
def test_099 (self):
""" Make sure it fails if it should """
diff --git a/gr-digital/python/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/qa_ofdm_sync_sc_cfb.py
index bed8e9c3ae..e71cd5cce3 100755
--- a/gr-digital/python/qa_ofdm_sync_sc_cfb.py
+++ b/gr-digital/python/qa_ofdm_sync_sc_cfb.py
@@ -24,6 +24,9 @@ import numpy
import random
from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import analog_swig as analog
+
try:
# This will work when feature #505 is added.
from gnuradio import digital
@@ -35,6 +38,7 @@ except ImportError:
from utils import tagged_streams
from ofdm_txrx import ofdm_tx
+
class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
def setUp (self):
@@ -57,12 +61,12 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
sync_symbol + \
[(random.randint(0, 1)*2)-1 for x in range(sig_len)]
tx_signal = tx_signal * 2
- add = gr.add_cc()
+ add = blocks.add_cc()
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
sink_freq = gr.vector_sink_f()
sink_detect = gr.vector_sink_b()
self.tb.connect(gr.vector_source_c(tx_signal), (add, 0))
- self.tb.connect(gr.noise_source_c(gr.GR_GAUSSIAN, .01), (add, 1))
+ self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1))
self.tb.connect(add, sync)
self.tb.connect((sync, 0), sink_freq)
self.tb.connect((sync, 1), sink_detect)
@@ -85,14 +89,14 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
tx_signal = sync_symbol[-cp_len:] + \
sync_symbol + \
[(random.randint(0, 1)*2)-1 for x in range(sig_len)]
- mult = gr.multiply_cc()
- add = gr.add_cc()
+ mult = blocks.multiply_cc()
+ add = blocks.add_cc()
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
sink_freq = gr.vector_sink_f()
sink_detect = gr.vector_sink_b()
self.tb.connect(gr.vector_source_c(tx_signal), (mult, 0), (add, 0))
- self.tb.connect(gr.sig_source_c(2 * numpy.pi, gr.GR_SIN_WAVE, freq_offset, 1.0), (mult, 1))
- self.tb.connect(gr.noise_source_c(gr.GR_GAUSSIAN, .01), (add, 1))
+ self.tb.connect(analog.sig_source_c(2 * numpy.pi, analog.GR_SIN_WAVE, freq_offset, 1.0), (mult, 1))
+ self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1))
self.tb.connect(add, sync)
self.tb.connect((sync, 0), sink_freq)
self.tb.connect((sync, 1), sink_detect)
@@ -116,12 +120,12 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
sync_symbol[-cp_len:] + \
sync_symbol + \
[(random.randint(0, 1)*2)-1 for x in range(fft_len * random.randint(5,23))]
- add = gr.add_cc()
+ add = blocks.add_cc()
sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
sink_freq = gr.vector_sink_f()
sink_detect = gr.vector_sink_b()
self.tb.connect(gr.vector_source_c(tx_signal), (add, 0))
- self.tb.connect(gr.noise_source_c(gr.GR_GAUSSIAN, .005), (add, 1))
+ self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .005), (add, 1))
self.tb.connect(add, sync)
self.tb.connect((sync, 0), sink_freq)
self.tb.connect((sync, 1), sink_detect)
diff --git a/gr-digital/python/qa_packet_headergenerator_bb.py b/gr-digital/python/qa_packet_headergenerator_bb.py
index 1aeaa86063..2e6e401566 100755
--- a/gr-digital/python/qa_packet_headergenerator_bb.py
+++ b/gr-digital/python/qa_packet_headergenerator_bb.py
@@ -38,16 +38,16 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
tagname = "packet_len"
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol(tagname)
- tag1.value = pmt.pmt_from_long(4)
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
tag2 = gr.gr_tag_t()
tag2.offset = 4
- tag2.key = pmt.pmt_string_to_symbol(tagname)
- tag2.value = pmt.pmt_from_long(2)
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
tag3 = gr.gr_tag_t()
tag3.offset = 6
- tag3.key = pmt.pmt_string_to_symbol(tagname)
- tag3.value = pmt.pmt_from_long(4)
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
header = digital.packet_headergenerator_bb(12, tagname)
sink = gr.vector_sink_b()
@@ -67,16 +67,16 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
tagname = "packet_len"
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol(tagname)
- tag1.value = pmt.pmt_from_long(4)
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
tag2 = gr.gr_tag_t()
tag2.offset = 4
- tag2.key = pmt.pmt_string_to_symbol(tagname)
- tag2.value = pmt.pmt_from_long(2)
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
tag3 = gr.gr_tag_t()
tag3.offset = 6
- tag3.key = pmt.pmt_string_to_symbol(tagname)
- tag3.value = pmt.pmt_from_long(4)
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
header = digital.packet_headergenerator_bb(32, tagname)
sink = gr.vector_sink_b()
@@ -97,16 +97,16 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
tagname = "packet_len"
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol(tagname)
- tag1.value = pmt.pmt_from_long(4)
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
tag2 = gr.gr_tag_t()
tag2.offset = 4
- tag2.key = pmt.pmt_string_to_symbol(tagname)
- tag2.value = pmt.pmt_from_long(2)
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
tag3 = gr.gr_tag_t()
tag3.offset = 6
- tag3.key = pmt.pmt_string_to_symbol(tagname)
- tag3.value = pmt.pmt_from_long(4)
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
formatter_object = digital.packet_header_default(12, tagname)
header = digital.packet_headergenerator_bb(formatter_object.formatter())
@@ -127,20 +127,20 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
tagname = "packet_len"
tag1 = gr.gr_tag_t()
tag1.offset = 0
- tag1.key = pmt.pmt_string_to_symbol(tagname)
- tag1.value = pmt.pmt_from_long(4)
+ tag1.key = pmt.string_to_symbol(tagname)
+ tag1.value = pmt.from_long(4)
tag2 = gr.gr_tag_t()
tag2.offset = 4
- tag2.key = pmt.pmt_string_to_symbol(tagname)
- tag2.value = pmt.pmt_from_long(2)
+ tag2.key = pmt.string_to_symbol(tagname)
+ tag2.value = pmt.from_long(2)
tag3 = gr.gr_tag_t()
tag3.offset = 6
- tag3.key = pmt.pmt_string_to_symbol(tagname)
- tag3.value = pmt.pmt_from_long(4)
+ tag3.key = pmt.string_to_symbol(tagname)
+ tag3.value = pmt.from_long(4)
src = gr.vector_source_b(data, False, 1, (tag1, tag2, tag3))
formatter_object = digital.packet_header_ofdm(occupied_carriers, 1, tagname)
self.assertEqual(formatter_object.header_len(), 6)
- self.assertEqual(pmt.pmt_symbol_to_string(formatter_object.len_tag_key()), tagname)
+ self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), tagname)
header = digital.packet_headergenerator_bb(formatter_object.formatter())
sink = gr.vector_sink_b()
self.tb.connect(src, header, sink)
diff --git a/gr-digital/python/qa_packet_headerparser_b.py b/gr-digital/python/qa_packet_headerparser_b.py
index 13dc46f9e3..aec2f96b57 100755
--- a/gr-digital/python/qa_packet_headerparser_b.py
+++ b/gr-digital/python/qa_packet_headerparser_b.py
@@ -23,6 +23,7 @@ import time
from gnuradio import gr, gr_unittest
try: import pmt
except: from gruel import pmt
+import blocks_swig as blocks
import digital_swig as digital
class qa_packet_headerparser_b (gr_unittest.TestCase):
@@ -44,7 +45,7 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
src = gr.vector_source_b(expected_data)
parser = digital.packet_headerparser_b(32, tagname)
- sink = gr.message_debug()
+ sink = blocks.message_debug()
self.tb.connect(src, parser)
self.tb.msg_connect(parser, "header_data", sink, "store")
diff --git a/gr-digital/python/qa_pfb_clock_sync.py b/gr-digital/python/qa_pfb_clock_sync.py
index 06c8a60ba7..4d0276bcd6 100755
--- a/gr-digital/python/qa_pfb_clock_sync.py
+++ b/gr-digital/python/qa_pfb_clock_sync.py
@@ -21,18 +21,19 @@
#
from gnuradio import gr, gr_unittest
+import filter_swig as filter
import digital_swig as digital
import random, cmath
class test_pfb_clock_sync(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# Test BPSK sync
excess_bw = 0.35
@@ -44,36 +45,36 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
osps = 1
ntaps = 11 * int(sps*nfilts)
- taps = gr.firdes.root_raised_cosine(nfilts, nfilts*sps,
- 1.0, excess_bw, ntaps)
+ taps = filter.firdes.root_raised_cosine(nfilts, nfilts*sps,
+ 1.0, excess_bw, ntaps)
self.test = digital.pfb_clock_sync_ccf(sps, loop_bw, taps,
nfilts, init_phase,
max_rate_deviation,
osps)
- data = 1000*[complex(1,0), complex(-1,0)]
+ data = 10000*[complex(1,0), complex(-1,0)]
self.src = gr.vector_source_c(data, False)
# pulse shaping interpolation filter
- rrc_taps = gr.firdes.root_raised_cosine(
+ rrc_taps = filter.firdes.root_raised_cosine(
nfilts, # gain
nfilts, # sampling rate based on 32 filters in resampler
1.0, # symbol rate
excess_bw, # excess bandwidth (roll-off factor)
ntaps)
- self.rrc_filter = gr.pfb_arb_resampler_ccf(sps, rrc_taps)
+ self.rrc_filter = filter.pfb_arb_resampler_ccf(sps, rrc_taps)
self.snk = gr.vector_sink_c()
self.tb.connect(self.src, self.rrc_filter, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[complex(-1,0), complex(1,0)]
+ expected_result = 10000*[complex(-1,0), complex(1,0)]
dst_data = self.snk.data()
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
expected_result = expected_result[len_e - Ncmp:]
@@ -82,10 +83,10 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
#for e,d in zip(expected_result, dst_data):
# print e, d
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
- def test02 (self):
+ def test02(self):
# Test real BPSK sync
excess_bw = 0.35
@@ -97,36 +98,36 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
osps = 1
ntaps = 11 * int(sps*nfilts)
- taps = gr.firdes.root_raised_cosine(nfilts, nfilts*sps,
- 1.0, excess_bw, ntaps)
+ taps = filter.firdes.root_raised_cosine(nfilts, nfilts*sps,
+ 1.0, excess_bw, ntaps)
self.test = digital.pfb_clock_sync_fff(sps, loop_bw, taps,
nfilts, init_phase,
max_rate_deviation,
osps)
- data = 1000*[1, -1]
+ data = 10000*[1, -1]
self.src = gr.vector_source_f(data, False)
# pulse shaping interpolation filter
- rrc_taps = gr.firdes.root_raised_cosine(
+ rrc_taps = filter.firdes.root_raised_cosine(
nfilts, # gain
nfilts, # sampling rate based on 32 filters in resampler
1.0, # symbol rate
excess_bw, # excess bandwidth (roll-off factor)
ntaps)
- self.rrc_filter = gr.pfb_arb_resampler_fff(sps, rrc_taps)
+ self.rrc_filter = filter.pfb_arb_resampler_fff(sps, rrc_taps)
self.snk = gr.vector_sink_f()
self.tb.connect(self.src, self.rrc_filter, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[-1, 1]
+ expected_result = 10000*[-1, 1]
dst_data = self.snk.data()
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
expected_result = expected_result[len_e - Ncmp:]
@@ -135,7 +136,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
#for e,d in zip(expected_result, dst_data):
# print e, d
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_pn_correlator_cc.py b/gr-digital/python/qa_pn_correlator_cc.py
index 377bef5feb..53633d04fa 100755
--- a/gr-digital/python/qa_pn_correlator_cc.py
+++ b/gr-digital/python/qa_pn_correlator_cc.py
@@ -22,11 +22,12 @@
from gnuradio import gr, gr_unittest
import digital_swig as digital
+import blocks_swig as blocks
class test_pn_correlator_cc(gr_unittest.TestCase):
def setUp(self):
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
def tearDown(self):
self.tb = None
@@ -39,7 +40,7 @@ class test_pn_correlator_cc(gr_unittest.TestCase):
length = 2**degree-1
src = digital.glfsr_source_f(degree)
head = gr.head(gr.sizeof_float, length*length)
- f2c = gr.float_to_complex()
+ f2c = blocks.float_to_complex()
corr = digital.pn_correlator_cc(degree)
dst = gr.vector_sink_c()
self.tb.connect(src, head, f2c, corr, dst)
diff --git a/gr-digital/python/qa_probe_density.py b/gr-digital/python/qa_probe_density.py
index c5b7e0e7c2..f42f00a7f7 100755
--- a/gr-digital/python/qa_probe_density.py
+++ b/gr-digital/python/qa_probe_density.py
@@ -34,37 +34,37 @@ class test_probe_density(gr_unittest.TestCase):
def test_001(self):
src_data = [0, 1, 0, 1]
expected_data = 1
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.probe_density_b(1)
- self.tb.connect (src, op)
- self.tb.run ()
+ self.tb.connect(src, op)
+ self.tb.run()
result_data = op.density()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_002(self):
src_data = [1, 1, 1, 1]
expected_data = 1
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.probe_density_b(0.01)
- self.tb.connect (src, op)
- self.tb.run ()
+ self.tb.connect(src, op)
+ self.tb.run()
result_data = op.density()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_003(self):
src_data = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
expected_data = 0.95243
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.probe_density_b(0.01)
- self.tb.connect (src, op)
- self.tb.run ()
+ self.tb.connect(src, op)
+ self.tb.run()
result_data = op.density()
print result_data
- self.assertAlmostEqual (expected_data, result_data, 5)
+ self.assertAlmostEqual(expected_data, result_data, 5)
if __name__ == '__main__':
gr_unittest.run(test_probe_density, "test_probe_density.xml")
diff --git a/gr-digital/python/qa_scrambler.py b/gr-digital/python/qa_scrambler.py
index f5bd612429..3127a7c1e6 100755
--- a/gr-digital/python/qa_scrambler.py
+++ b/gr-digital/python/qa_scrambler.py
@@ -25,7 +25,7 @@ import digital_swig as digital
class test_scrambler(gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
self.tb = gr.top_block()
def tearDown(self):
diff --git a/gr-digital/python/qa_simple_correlator.py b/gr-digital/python/qa_simple_correlator.py
index 124201a556..ff0faeb415 100755
--- a/gr-digital/python/qa_simple_correlator.py
+++ b/gr-digital/python/qa_simple_correlator.py
@@ -21,6 +21,8 @@
#
from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import filter_swig as filter
import digital_swig as digital
class test_simple_correlator(gr_unittest.TestCase):
@@ -40,15 +42,15 @@ class test_simple_correlator(gr_unittest.TestCase):
# Filter taps to expand the data to oversample by 8
# Just using a RRC for some basic filter shape
- taps = gr.firdes.root_raised_cosine(8, 8, 1.0, 0.5, 21)
+ taps = filter.firdes.root_raised_cosine(8, 8, 1.0, 0.5, 21)
src = gr.vector_source_b(expected_result)
frame = digital.simple_framer(4)
- unpack = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
- expand = gr.interp_fir_filter_fff(8, taps)
- b2f = gr.char_to_float()
- mult2 = gr.multiply_const_ff(2)
- sub1 = gr.add_const_ff(-1)
+ unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ expand = filter.interp_fir_filter_fff(8, taps)
+ b2f = blocks.char_to_float()
+ mult2 = blocks.multiply_const_ff(2)
+ sub1 = blocks.add_const_ff(-1)
op = digital.simple_correlator(4)
dst = gr.vector_sink_b()
self.tb.connect(src, frame, unpack, b2f, mult2, sub1, expand)
diff --git a/gr-digital/python/qa_simple_framer.py b/gr-digital/python/qa_simple_framer.py
index 09b2d329b2..f8c894da28 100755
--- a/gr-digital/python/qa_simple_framer.py
+++ b/gr-digital/python/qa_simple_framer.py
@@ -24,15 +24,15 @@ from gnuradio import gr, gr_unittest
import digital_swig as digital
import math
-class test_simple_framer (gr_unittest.TestCase):
+class test_simple_framer(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_simple_framer_001 (self):
+ def test_simple_framer_001(self):
src_data = (0x00, 0x11, 0x22, 0x33,
0x44, 0x55, 0x66, 0x77,
0x88, 0x99, 0xaa, 0xbb,
@@ -44,15 +44,14 @@ class test_simple_framer (gr_unittest.TestCase):
0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x02, 0x88, 0x99, 0xaa, 0xbb, 0x55,
0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x03, 0xcc, 0xdd, 0xee, 0xff, 0x55)
- src = gr.vector_source_b (src_data)
- op = digital.simple_framer (4)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
-
+ src = gr.vector_source_b(src_data)
+ op = digital.simple_framer(4)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
if __name__ == '__main__':
gr_unittest.run(test_simple_framer, "test_simple_framer.xml")
diff --git a/gr-digital/python/qam.py b/gr-digital/python/qam.py
index 6834e1945a..518be78941 100644
--- a/gr-digital/python/qam.py
+++ b/gr-digital/python/qam.py
@@ -1,5 +1,5 @@
#
-# Copyright 2005,2006,2011 Free Software Foundation, Inc.
+# Copyright 2005,2006,2011,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -27,10 +27,11 @@ from math import pi, sqrt, log
from gnuradio import gr
from generic_mod_demod import generic_mod, generic_demod
+from generic_mod_demod import shared_mod_args, shared_demod_args
from utils.gray_code import gray_code
from utils import mod_codes
import modulation_utils
-import digital_swig
+import digital_swig as digital
# Default number of points in constellation.
_def_constellation_points = 16
@@ -165,21 +166,23 @@ def qam_constellation(constellation_points=_def_constellation_points,
else:
raise ValueError("Mod code is not implemented for QAM")
if differential:
- points = make_differential_constellation(constellation_points, gray_coded)
+ points = make_differential_constellation(constellation_points, gray_coded=False)
else:
points = make_non_differential_constellation(constellation_points, gray_coded)
side = int(sqrt(constellation_points))
width = 2.0/(side-1)
+
# No pre-diff code
# Should add one so that we can gray-code the quadrant bits too.
pre_diff_code = []
if not large_ampls_to_corners:
- constellation = digital_swig.constellation_rect(points, pre_diff_code, 4,
+ constellation = digital.constellation_rect(points, pre_diff_code, 4,
side, side, width, width)
else:
sector_values = large_ampls_to_corners_mapping(side, points, width)
- constellation = digital_swig.constellation_expl_rect(
+ constellation = digital.constellation_expl_rect(
points, pre_diff_code, 4, side, side, width, width, sector_values)
+
return constellation
def find_closest_point(p, qs):
@@ -257,6 +260,19 @@ def large_ampls_to_corners_mapping(side, points, width):
# /////////////////////////////////////////////////////////////////////////////
class qam_mod(generic_mod):
+ """
+ Hierarchical block for RRC-filtered QAM modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ constellation_points: Number of constellation points (must be a power of four) (integer).
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
def __init__(self, constellation_points=_def_constellation_points,
differential=_def_differential,
@@ -291,13 +307,25 @@ class qam_mod(generic_mod):
# /////////////////////////////////////////////////////////////////////////////
class qam_demod(generic_demod):
+ """
+ Hierarchical block for RRC-filtered QAM modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ constellation_points: Number of constellation points (must be a power of four) (integer).
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_demod for additional arguments
+ __doc__ += shared_mod_args
def __init__(self, constellation_points=_def_constellation_points,
differential=_def_differential,
mod_code=_def_mod_code,
large_ampls_to_corner = False,
*args, **kwargs):
-
"""
Hierarchical block for RRC-filtered QAM modulation.
diff --git a/gr-digital/python/qpsk.py b/gr-digital/python/qpsk.py
index be21fd76f1..859d981367 100644
--- a/gr-digital/python/qpsk.py
+++ b/gr-digital/python/qpsk.py
@@ -27,50 +27,59 @@ Demodulation is not included since the generic_mod_demod
from gnuradio import gr
from gnuradio.digital.generic_mod_demod import generic_mod, generic_demod
-import digital_swig
+from gnuradio.digital.generic_mod_demod import shared_mod_args, shared_demod_args
+from utils import mod_codes
+import digital_swig as digital
import modulation_utils
-# Default number of points in constellation.
-_def_constellation_points = 4
-# Whether gray coding is used.
-_def_gray_coded = True
+# The default encoding (e.g. gray-code, set-partition)
+_def_mod_code = mod_codes.GRAY_CODE
# /////////////////////////////////////////////////////////////////////////////
# QPSK constellation
# /////////////////////////////////////////////////////////////////////////////
-def qpsk_constellation(m=_def_constellation_points):
- if m != _def_constellation_points:
- raise ValueError("QPSK can only have 4 constellation points.")
- return digital_swig.constellation_qpsk()
+def qpsk_constellation(mod_code=_def_mod_code):
+ """
+ Creates a QPSK constellation.
+ """
+ if mod_code != mod_codes.GRAY_CODE:
+ raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
+ return digital.constellation_qpsk()
# /////////////////////////////////////////////////////////////////////////////
# QPSK modulator
# /////////////////////////////////////////////////////////////////////////////
class qpsk_mod(generic_mod):
-
- def __init__(self, constellation_points=_def_constellation_points,
- gray_coded=_def_gray_coded,
- *args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered QPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_mod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
- constellation = digital_swig.constellation_qpsk()
- if constellation_points != 4:
- raise ValueError("QPSK can only have 4 constellation points.")
- if not gray_coded:
- raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
+ """
+ Hierarchical block for RRC-filtered QPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
+
+ def __init__(self, mod_code=_def_mod_code, differential=False, *args, **kwargs):
+ pre_diff_code = True
+ if not differential:
+ constellation = digital.constellation_qpsk()
+ if mod_code != mod_codes.GRAY_CODE:
+ raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
+ else:
+ constellation = digital.constellation_dqpsk()
+ if mod_code not in set([mod_codes.GRAY_CODE, mod_codes.NO_CODE]):
+ raise ValueError("That mod_code is not supported for DQPSK mod/demod.")
+ if mod_code == mod_codes.NO_CODE:
+ pre_diff_code = False
+
super(qpsk_mod, self).__init__(constellation=constellation,
- gray_coded=gray_coded,
+ pre_diff_code=pre_diff_code,
*args, **kwargs)
@@ -80,24 +89,35 @@ class qpsk_mod(generic_mod):
# /////////////////////////////////////////////////////////////////////////////
class qpsk_demod(generic_demod):
-
- def __init__(self, constellation_points=_def_constellation_points,
+ """
+ Hierarchical block for RRC-filtered QPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ differential: Whether to use differential encoding (boolean).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_demod_args
+
+ def __init__(self, mod_code=_def_mod_code, differential=False,
*args, **kwargs):
+ pre_diff_code = True
+ if not differential:
+ constellation = digital.constellation_qpsk()
+ if mod_code != mod_codes.GRAY_CODE:
+ raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
+ else:
+ constellation = digital.constellation_dqpsk()
+ if mod_code not in set([mod_codes.GRAY_CODE, mod_codes.NO_CODE]):
+ raise ValueError("That mod_code is not supported for DQPSK mod/demod.")
+ if mod_code == mod_codes.NO_CODE:
+ pre_diff_code = False
- """
- Hierarchical block for RRC-filtered QPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_demod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
- constellation = digital_swig.constellation_qpsk()
- if constellation_points != 4:
- raise ValueError('Number of constellation points must be 4 for QPSK.')
super(qpsk_demod, self).__init__(constellation=constellation,
+ pre_diff_code=pre_diff_code,
*args, **kwargs)
@@ -106,36 +126,30 @@ class qpsk_demod(generic_demod):
# DQPSK constellation
# /////////////////////////////////////////////////////////////////////////////
-def dqpsk_constellation(m=_def_constellation_points):
- if m != _def_constellation_points:
- raise ValueError("DQPSK can only have 4 constellation points.")
- return digital_swig.constellation_dqpsk()
+def dqpsk_constellation(mod_code=_def_mod_code):
+ if mod_code != mod_codes.GRAY_CODE:
+ raise ValueError("The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.")
+ return digital.constellation_dqpsk()
# /////////////////////////////////////////////////////////////////////////////
# DQPSK modulator
# /////////////////////////////////////////////////////////////////////////////
-class dqpsk_mod(generic_mod):
-
- def __init__(self, constellation_points=_def_constellation_points,
- gray_coded=_def_gray_coded,
- differential=True, *args, **kwargs):
- """
- Hierarchical block for RRC-filtered DQPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_mod block for list of parameters.
- """
-
- constellation_points = _def_constellation_points
- constellation = digital_swig.constellation_dqpsk()
- if constellation_points != 4:
- raise ValueError('Number of constellation points must be 4 for DQPSK.')
- super(dqpsk_mod, self).__init__(constellation=constellation,
- gray_coded=gray_coded,
- differential=True,
+class dqpsk_mod(qpsk_mod):
+ """
+ Hierarchical block for RRC-filtered DQPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_mod_args
+
+ def __init__(self, mod_code=_def_mod_code, *args, **kwargs):
+ super(dqpsk_mod, self).__init__(mod_code,
*args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
@@ -143,25 +157,21 @@ class dqpsk_mod(generic_mod):
#
# /////////////////////////////////////////////////////////////////////////////
-class dqpsk_demod(generic_demod):
-
- def __init__(self, constellation_points=_def_constellation_points,
- differential=True, *args, **kwargs):
-
- """
- Hierarchical block for RRC-filtered DQPSK modulation.
-
- The input is a byte stream (unsigned char) and the
- output is the complex modulated signal at baseband.
-
- See generic_demod block for list of parameters.
- """
- constellation_points = _def_constellation_points
- constellation = digital_swig.constellation_dqpsk()
- if constellation_points != 4:
- raise ValueError('Number of constellation points must be 4 for DQPSK.')
- super(dqpsk_demod, self).__init__(constellation=constellation,
- differential=True,
+class dqpsk_demod(qpsk_demod):
+ """
+ Hierarchical block for RRC-filtered DQPSK modulation.
+
+ The input is a byte stream (unsigned char) and the
+ output is the complex modulated signal at baseband.
+
+ Args:
+ mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE).
+ """
+ # See generic_mod for additional arguments
+ __doc__ += shared_demod_args
+
+ def __init__(self, mod_code=_def_mod_code, *args, **kwargs):
+ super(dqpsk_demod, self).__init__(mod_code,
*args, **kwargs)
#
@@ -173,4 +183,3 @@ modulation_utils.add_type_1_constellation('qpsk', qpsk_constellation)
modulation_utils.add_type_1_mod('dqpsk', dqpsk_mod)
modulation_utils.add_type_1_demod('dqpsk', dqpsk_demod)
modulation_utils.add_type_1_constellation('dqpsk', dqpsk_constellation)
-
diff --git a/gr-digital/python/utils/mod_codes.py b/gr-digital/python/utils/mod_codes.py
index caacda5cc6..f55fe41b8b 100644
--- a/gr-digital/python/utils/mod_codes.py
+++ b/gr-digital/python/utils/mod_codes.py
@@ -20,6 +20,7 @@
# Boston, MA 02110-1301, USA.
#
+# Constants used to represent what coding to use.
GRAY_CODE = 'gray'
SET_PARTITION_CODE = 'set-partition'
NO_CODE = 'none'
diff --git a/gr-digital/python/utils/tagged_streams.py b/gr-digital/python/utils/tagged_streams.py
index 7e69aa20be..6a956aa642 100644
--- a/gr-digital/python/utils/tagged_streams.py
+++ b/gr-digital/python/utils/tagged_streams.py
@@ -8,8 +8,8 @@ def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
for offset, length in zip(offsets, lengths):
tag = gr.gr_tag_t()
tag.offset = offset/vlen
- tag.key = pmt.pmt_string_to_symbol(tagname)
- tag.value = pmt.pmt_from_long(length/vlen)
+ tag.key = pmt.string_to_symbol(tagname)
+ tag.value = pmt.from_long(length/vlen)
tags.append(tag)
return tags
@@ -35,14 +35,14 @@ def vectors_to_strings(data, tags, lengthtagname):
def count_bursts(data, tags, lengthtagname, vlen=1):
lengthtags = [t for t in tags
- if pmt.pmt_symbol_to_string(t.key) == lengthtagname]
+ if pmt.symbol_to_string(t.key) == lengthtagname]
lengths = {}
for tag in lengthtags:
if tag.offset in lengths:
raise ValueError(
"More than one tags with key {0} with the same offset={1}."
.format(lengthtagname, tag.offset))
- lengths[tag.offset] = pmt.pmt_to_long(tag.value)*vlen
+ lengths[tag.offset] = pmt.to_long(tag.value)*vlen
in_burst = False
in_packet = False
packet_length = None
@@ -70,14 +70,14 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
def vectors_to_packets(data, tags, lengthtagname, vlen=1):
lengthtags = [t for t in tags
- if pmt.pmt_symbol_to_string(t.key) == lengthtagname]
+ if pmt.symbol_to_string(t.key) == lengthtagname]
lengths = {}
for tag in lengthtags:
if tag.offset in lengths:
raise ValueError(
"More than one tags with key {0} with the same offset={1}."
.format(lengthtagname, tag.offset))
- lengths[tag.offset] = pmt.pmt_to_long(tag.value)*vlen
+ lengths[tag.offset] = pmt.to_long(tag.value)*vlen
if 0 not in lengths:
raise ValueError("There is no tag with key {0} and an offset of 0"
.format(lengthtagname))
@@ -105,8 +105,8 @@ def packets_to_vectors(packets, lengthtagname, vlen=1):
data.extend(packet)
tag = gr.gr_tag_t()
tag.offset = offset/vlen
- tag.key = pmt.pmt_string_to_symbol(lengthtagname)
- tag.value = pmt.pmt_from_long(len(packet)/vlen)
+ tag.key = pmt.string_to_symbol(lengthtagname)
+ tag.value = pmt.from_long(len(packet)/vlen)
tags.append(tag)
offset = offset + len(packet)
return data, tags