diff options
Diffstat (limited to 'gr-digital/python')
52 files changed, 1307 insertions, 1040 deletions
diff --git a/gr-digital/python/CMakeLists.txt b/gr-digital/python/CMakeLists.txt index 9be5e1ae75..7f35e3c2be 100644 --- a/gr-digital/python/CMakeLists.txt +++ b/gr-digital/python/CMakeLists.txt @@ -67,8 +67,14 @@ if(ENABLE_TESTING) list(APPEND GR_TEST_PYTHON_DIRS ${CMAKE_BINARY_DIR}/gr-digital/python ${CMAKE_BINARY_DIR}/gr-digital/swig + ${CMAKE_BINARY_DIR}/gr-filter/python + ${CMAKE_BINARY_DIR}/gr-filter/swig + ${CMAKE_BINARY_DIR}/gr-analog/python + ${CMAKE_BINARY_DIR}/gr-analog/swig + ${CMAKE_BINARY_DIR}/gr-blocks/python + ${CMAKE_BINARY_DIR}/gr-blocks/swig ) -list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft) +list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft gnuradio-analog) include(GrTest) file(GLOB py_qa_test_files "qa_*.py") diff --git a/gr-digital/python/__init__.py b/gr-digital/python/__init__.py index 3fdbca769a..962f210324 100644 --- a/gr-digital/python/__init__.py +++ b/gr-digital/python/__init__.py @@ -19,8 +19,7 @@ # ''' -This is the gr-digital package. It contains all of the blocks, -utilities, and examples for doing digital modulation and demodulation. +Blocks and utilities for digital modulation and demodulation. ''' # The presence of this file turns this directory into a Python package @@ -28,6 +27,7 @@ utilities, and examples for doing digital modulation and demodulation. from digital_swig import * from psk import * from qam import * +from qamlike import * from bpsk import * from qpsk import * from gmsk import * diff --git a/gr-digital/python/bpsk.py b/gr-digital/python/bpsk.py index 0d8f05c4c1..57cf2534f4 100644 --- a/gr-digital/python/bpsk.py +++ b/gr-digital/python/bpsk.py @@ -28,21 +28,15 @@ from cmath import exp from gnuradio import gr from gnuradio.digital.generic_mod_demod import generic_mod, generic_demod +from gnuradio.digital.generic_mod_demod import shared_mod_args, shared_demod_args import digital_swig import modulation_utils -# Default number of points in constellation. -_def_constellation_points = 2 -# Whether differential coding is used. -_def_differential = False - # ///////////////////////////////////////////////////////////////////////////// # BPSK constellation # ///////////////////////////////////////////////////////////////////////////// -def bpsk_constellation(m=_def_constellation_points): - if m != _def_constellation_points: - raise ValueError("BPSK can only have 2 constellation points.") +def bpsk_constellation(): return digital_swig.constellation_bpsk() # ///////////////////////////////////////////////////////////////////////////// @@ -50,115 +44,102 @@ def bpsk_constellation(m=_def_constellation_points): # ///////////////////////////////////////////////////////////////////////////// class bpsk_mod(generic_mod): + """ + Hierarchical block for RRC-filtered BPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + + Args: + mod_code: Argument is not used. It exists purely to simplify generation of the block in grc. + differential: Whether to use differential encoding (boolean). + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args + + def __init__(self, mod_code=None, differential=False, *args, **kwargs): - def __init__(self, constellation_points=_def_constellation_points, - differential=False, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered BPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_mod block for list of parameters. - """ - - constellation_points = _def_constellation_points constellation = digital_swig.constellation_bpsk() - if constellation_points != 2: - raise ValueError('Number of constellation points must be 2 for BPSK.') super(bpsk_mod, self).__init__(constellation=constellation, differential=differential, *args, **kwargs) - + + # ///////////////////////////////////////////////////////////////////////////// # BPSK demodulator # # ///////////////////////////////////////////////////////////////////////////// class bpsk_demod(generic_demod): - - def __init__(self, constellation_points=_def_constellation_points, - differential=False, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered BPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_demod block for list of parameters. - """ - - constellation_points = _def_constellation_points + """ + Hierarchical block for RRC-filtered BPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Argument is not used. It exists purely to simplify generation of the block in grc. + differential: whether to use differential encoding (boolean) + """ + # See generic_demod for additional arguments + __doc__ += shared_demod_args + + def __init__(self, mod_code=None, differential=False, *args, **kwargs): constellation = digital_swig.constellation_bpsk() - if constellation_points != 2: - raise ValueError('Number of constellation points must be 2 for BPSK.') super(bpsk_demod, self).__init__(constellation=constellation, differential=differential, *args, **kwargs) - +#bpsk_demod.__doc__ += shared_demod_args # ///////////////////////////////////////////////////////////////////////////// # DBPSK constellation # ///////////////////////////////////////////////////////////////////////////// -def dbpsk_constellation(m=_def_constellation_points): - if m != _def_constellation_points: - raise ValueError("DBPSK can only have 2 constellation points.") +def dbpsk_constellation(): return digital_swig.constellation_dbpsk() # ///////////////////////////////////////////////////////////////////////////// # DBPSK modulator # ///////////////////////////////////////////////////////////////////////////// -class dbpsk_mod(generic_mod): - - def __init__(self, constellation_points=_def_constellation_points, - differential=True, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered DBPSK modulation. +class dbpsk_mod(bpsk_mod): + """ + Hierarchical block for RRC-filtered DBPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Argument is not used. It exists purely to simplify generation of the block in grc. + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. + def __init__(self, mod_code=None, *args, **kwargs): - See generic_mod block for list of parameters. - """ - - constellation_points = _def_constellation_points - constellation = digital_swig.constellation_bpsk() - if constellation_points != 2: - raise ValueError('Number of constellation points must be 2 for DBPSK.') - super(dbpsk_mod, self).__init__(constellation=constellation, - differential=True, - *args, **kwargs) + super(dbpsk_mod, self).__init__(*args, **kwargs) # ///////////////////////////////////////////////////////////////////////////// # DBPSK demodulator # # ///////////////////////////////////////////////////////////////////////////// -class dbpsk_demod(generic_demod): - - def __init__(self, constellation_points=_def_constellation_points, - differential=True, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered DBPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_demod block for list of parameters. - """ - - constellation_points = _def_constellation_points - constellation = digital_swig.constellation_bpsk() - if constellation_points != 2: - raise ValueError('Number of constellation points must be 2 for DBPSK.') - super(dbpsk_demod, self).__init__(constellation=constellation, - differential=True, - *args, **kwargs) +class dbpsk_demod(bpsk_demod): + """ + Hierarchical block for RRC-filtered DBPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Argument is not used. It exists purely to simplify generation of the block in grc. + """ + # See generic_demod for additional arguments + __doc__ += shared_demod_args + + def __init__(self, mod_code=None, *args, **kwargs): + + super(dbpsk_demod, self).__init__(*args, **kwargs) # # Add these to the mod/demod registry diff --git a/gr-digital/python/cpm.py b/gr-digital/python/cpm.py index 05032336d4..b27fb098f5 100644 --- a/gr-digital/python/cpm.py +++ b/gr-digital/python/cpm.py @@ -24,7 +24,9 @@ # See gnuradio-examples/python/digital for examples -from gnuradio import gr, blks2 +from gnuradio import gr, filter +from gnuradio import analog +from gnuradio import blocks from math import pi import numpy @@ -49,6 +51,31 @@ _def_log = False # ///////////////////////////////////////////////////////////////////////////// class cpm_mod(gr.hier_block2): + """ + Hierarchical block for Continuous Phase modulation. + + The input is a byte stream (unsigned char) representing packed + bits and the output is the complex modulated signal at baseband. + + See Proakis for definition of generic CPM signals: + s(t)=exp(j phi(t)) + phi(t)= 2 pi h int_0^t f(t') dt' + f(t)=sum_k a_k g(t-kT) + (normalizing assumption: int_0^infty g(t) dt = 1/2) + + Args: + samples_per_symbol: samples per baud >= 2 (integer) + bits_per_symbol: bits per symbol (integer) + h_numerator: numerator of modulation index (integer) + h_denominator: denominator of modulation index (numerator and denominator must be relative primes) (integer) + cpm_type: supported types are: 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL (integer) + bt: bandwidth symbol time product for GMSK (float) + symbols_per_pulse: shaping pulse duration in symbols (integer) + generic_taps: define a generic CPM pulse shape (sum = samples_per_symbol/2) (list/array of floats) + verbose: Print information about modulator? (boolean) + debug: Print modulation data to files? (boolean) + """ + def __init__(self, samples_per_symbol=_def_samples_per_symbol, bits_per_symbol=_def_bits_per_symbol, @@ -60,42 +87,6 @@ class cpm_mod(gr.hier_block2): generic_taps=_def_generic_taps, verbose=_def_verbose, log=_def_log): - """ - Hierarchical block for Continuous Phase - modulation. - - The input is a byte stream (unsigned char) - representing packed bits and the - output is the complex modulated signal at baseband. - - See Proakis for definition of generic CPM signals: - s(t)=exp(j phi(t)) - phi(t)= 2 pi h int_0^t f(t') dt' - f(t)=sum_k a_k g(t-kT) - (normalizing assumption: int_0^infty g(t) dt = 1/2) - - @param samples_per_symbol: samples per baud >= 2 - @type samples_per_symbol: integer - @param bits_per_symbol: bits per symbol - @type bits_per_symbol: integer - @param h_numerator: numerator of modulation index - @type h_numerator: integer - @param h_denominator: denominator of modulation index (numerator and denominator must be relative primes) - @type h_denominator: integer - @param cpm_type: supported types are: 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL - @type cpm_type: integer - @param bt: bandwidth symbol time product for GMSK - @type bt: float - @param symbols_per_pulse: shaping pulse duration in symbols - @type symbols_per_pulse: integer - @param generic_taps: define a generic CPM pulse shape (sum = samples_per_symbol/2) - @type generic_taps: array of floats - - @param verbose: Print information about modulator? - @type verbose: bool - @param debug: Print modulation data to files? - @type debug: bool - """ gr.hier_block2.__init__(self, "cpm_mod", gr.io_signature(1, 1, gr.sizeof_char), # Input signature @@ -127,17 +118,17 @@ class cpm_mod(gr.hier_block2): sensitivity = 2 * pi * h_numerator / h_denominator / samples_per_symbol # Unpack Bytes into bits_per_symbol groups - self.B2s = gr.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST) + self.B2s = blocks.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST) # Turn it into symmetric PAM data. - self.pam = gr.chunks_to_symbols_bf(self.sym_alphabet,1) + self.pam = digital_swig.chunks_to_symbols_bf(self.sym_alphabet,1) # Generate pulse (sum of taps = samples_per_symbol/2) if cpm_type == 0: # CPFSK self.taps= (1.0/self._symbols_per_pulse/2,) * self.ntaps elif cpm_type == 1: # GMSK - gaussian_taps = gr.firdes.gaussian( + gaussian_taps = filter.firdes.gaussian( 1.0/2, # gain samples_per_symbol, # symbol_rate bt, # bandwidth * symbol time @@ -153,10 +144,10 @@ class cpm_mod(gr.hier_block2): else: raise TypeError, ("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,)) - self.filter = blks2.pfb_arb_resampler_fff(samples_per_symbol, self.taps) + self.filter = filter.pfb.arb_resampler_fff(samples_per_symbol, self.taps) # FM modulation - self.fmmod = gr.frequency_modulator_fc(sensitivity) + self.fmmod = analog.frequency_modulator_fc(sensitivity) if verbose: self._print_verbage() @@ -205,13 +196,13 @@ class cpm_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.B2s, - gr.file_sink(gr.sizeof_float, "symbols.dat")) + blocks.file_sink(gr.sizeof_float, "symbols.dat")) self.connect(self.pam, - gr.file_sink(gr.sizeof_float, "pam.dat")) + blocks.file_sink(gr.sizeof_float, "pam.dat")) self.connect(self.filter, - gr.file_sink(gr.sizeof_float, "filter.dat")) + blocks.file_sink(gr.sizeof_float, "filter.dat")) self.connect(self.fmmod, - gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) + blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) def add_options(parser): diff --git a/gr-digital/python/crc.py b/gr-digital/python/crc.py index 198ab059f5..e228faaa98 100644 --- a/gr-digital/python/crc.py +++ b/gr-digital/python/crc.py @@ -20,11 +20,11 @@ # from gnuradio import gru -import digital_swig +import digital_swig as digital import struct def gen_and_append_crc32(s): - crc = digital_swig.crc32(s) + crc = digital.crc32(s) return s + struct.pack(">I", gru.hexint(crc) & 0xFFFFFFFF) def check_crc32(s): @@ -32,7 +32,7 @@ def check_crc32(s): return (False, '') msg = s[:-4] #print "msg = '%s'" % (msg,) - actual = digital_swig.crc32(msg) + actual = digital.crc32(msg) (expected,) = struct.unpack(">I", s[-4:]) # print "actual =", hex(actual), "expected =", hex(expected) return (actual == expected, msg) diff --git a/gr-digital/python/digital_voice.py.real b/gr-digital/python/digital_voice.py.real new file mode 100644 index 0000000000..4a2ef7721f --- /dev/null +++ b/gr-digital/python/digital_voice.py.real @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +""" +Digital voice Tx and Rx using GSM 13kbit vocoder and GMSK. + +Runs channel at 32kbit/sec. Currently uses fake channel coding, +but there's room for a rate 1/2 coder. +""" + +from gnuradio import gr, gru +from gnuradio import blocks +from gnuradio.blksimpl.gmsk import gmsk_mod, gmsk_demod + +from gnuradio.vocoder import gsm_full_rate + +# Size of gsm full rate speech encoder output packet in bytes + +GSM_FRAME_SIZE = 33 + +# Size of packet in bytes that we send to GMSK modulator: +# +# Target: 256kS/sec air rate. +# +# 256kS 1 sym 1 bit 1 byte 0.020 sec 80 bytes +# ---- * ----- * ----- * ------ * --------- = -------- +# sec 8 S 1 sym 8 bits frame frame +# +# gr_simple_framer add 10 bytes of overhead. + +AIR_FRAME_SIZE = 70 + + +class digital_voice_tx(gr.hier_block): + """ + Hierarchical block for digital voice tranmission. + + The input is 8kS/sec floating point audio in the range [-1,+1] + The output is 256kS/sec GMSK modulated complex baseband signal in the range [-1,+1]. + """ + def __init__(self, fg): + samples_per_symbol = 8 + symbol_rate = 32000 + bt = 0.3 # Gaussian filter bandwidth * symbol time + + src_scale = blocks.multiply_const_ff(32767) + f2s = blocks.float_to_short() + voice_coder = gsm_full_rate.encode_sp() + + channel_coder = gr.fake_channel_encoder_pp(GSM_FRAME_SIZE, AIR_FRAME_SIZE) + p2s = gr.parallel_to_serial(gr.sizeof_char, AIR_FRAME_SIZE) + + mod = gmsk_mod(fg, sps=samples_per_symbol, + symbol_rate=symbol_rate, bt=bt, + p_size=AIR_FRAME_SIZE) + + fg.connect(src_scale, f2s, voice_coder, channel_coder, p2s, mod) + gr.hier_block.__init__(self, fg, src_scale, mod) + + +class digital_voice_rx(gr.hier_block): + """ + Hierarchical block for digital voice reception. + + The input is 256kS/sec GMSK modulated complex baseband signal. + The output is 8kS/sec floating point audio in the range [-1,+1] + """ + def __init__(self, fg): + samples_per_symbol = 8 + symbol_rate = 32000 + + demod = gmsk_demod(fg, sps=samples_per_symbol, + symbol_rate=symbol_rate, + p_size=AIR_FRAME_SIZE) + + s2p = gr.serial_to_parallel(gr.sizeof_char, AIR_FRAME_SIZE) + channel_decoder = gr.fake_channel_decoder_pp(AIR_FRAME_SIZE, GSM_FRAME_SIZE) + + voice_decoder = gsm_full_rate.decode_ps() + s2f = blocks.short_to_float () + sink_scale = blocks.multiply_const_ff(1.0/32767.) + + fg.connect(demod, s2p, channel_decoder, voice_decoder, s2f, sink_scale) + gr.hier_block.__init__(self, fg, demod, sink_scale) diff --git a/gr-digital/python/generic_mod_demod.py b/gr-digital/python/generic_mod_demod.py index a6c4f3445a..b812fe1c37 100644 --- a/gr-digital/python/generic_mod_demod.py +++ b/gr-digital/python/generic_mod_demod.py @@ -31,6 +31,21 @@ from utils import mod_codes import digital_swig as digital import math +try: + from gnuradio import blocks +except ImportError: + import blocks_swig as blocks + +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + +try: + from gnuradio import analog +except ImportError: + import analog_swig as analog + # default values (used in __init__ and add_options) _def_samples_per_symbol = 2 _def_excess_bw = 0.35 @@ -74,42 +89,40 @@ def add_common_options(parser): # ///////////////////////////////////////////////////////////////////////////// class generic_mod(gr.hier_block2): + """ + Hierarchical block for RRC-filtered differential generic modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + constellation: determines the modulation type (gnuradio.digital.digital_constellation) + samples_per_symbol: samples per baud >= 2 (float) + differential: whether to use differential encoding (boolean) + pre_diff_code: whether to use apply a pre-differential mapping (boolean) + excess_bw: Root-raised cosine filter excess bandwidth (float) + verbose: Print information about modulator? (boolean) + log: Log modulation data to files? (boolean) + """ def __init__(self, constellation, - samples_per_symbol=_def_samples_per_symbol, differential=_def_differential, + samples_per_symbol=_def_samples_per_symbol, + pre_diff_code=True, excess_bw=_def_excess_bw, - gray_coded=True, verbose=_def_verbose, log=_def_log): - """ - Hierarchical block for RRC-filtered differential generic modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - @param constellation: determines the modulation type - @type constellation: gnuradio.digital.gr_constellation - @param samples_per_symbol: samples per baud >= 2 - @type samples_per_symbol: float - @param excess_bw: Root-raised cosine filter excess bandwidth - @type excess_bw: float - @param gray_coded: turn gray coding on/off - @type gray_coded: bool - @param verbose: Print information about modulator? - @type verbose: bool - @param log: Log modulation data to files? - @type log: bool - """ gr.hier_block2.__init__(self, "generic_mod", gr.io_signature(1, 1, gr.sizeof_char), # Input signature gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature - self._constellation = constellation.base() + self._constellation = constellation self._samples_per_symbol = samples_per_symbol self._excess_bw = excess_bw self._differential = differential + # Only apply a predifferential coding if the constellation also supports it. + self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code() if self._samples_per_symbol < 2: raise TypeError, ("sbp must be >= 2, is %f" % self._samples_per_symbol) @@ -118,9 +131,9 @@ class generic_mod(gr.hier_block2): # turn bytes into k-bit vectors self.bytes2chunks = \ - gr.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST) + blocks.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST) - if gray_coded == True: + if self.pre_diff_code: self.symbol_mapper = digital.map_bb(self._constellation.pre_diff_code()) if differential: @@ -131,23 +144,23 @@ class generic_mod(gr.hier_block2): # pulse shaping filter nfilts = 32 ntaps = nfilts * 11 * int(self._samples_per_symbol) # make nfilts filters of ntaps each - self.rrc_taps = gr.firdes.root_raised_cosine( + self.rrc_taps = filter.firdes.root_raised_cosine( nfilts, # gain nfilts, # sampling rate based on 32 filters in resampler 1.0, # symbol rate self._excess_bw, # excess bandwidth (roll-off factor) ntaps) - self.rrc_filter = gr.pfb_arb_resampler_ccf(self._samples_per_symbol, - self.rrc_taps) + self.rrc_filter = filter.pfb_arb_resampler_ccf(self._samples_per_symbol, + self.rrc_taps) # Connect - blocks = [self, self.bytes2chunks] - if gray_coded == True: - blocks.append(self.symbol_mapper) + self._blocks = [self, self.bytes2chunks] + if self.pre_diff_code: + self._blocks.append(self.symbol_mapper) if differential: - blocks.append(self.diffenc) - blocks += [self.chunks2symbols, self.rrc_filter, self] - self.connect(*blocks) + self._blocks.append(self.diffenc) + self._blocks += [self.chunks2symbols, self.rrc_filter, self] + self.connect(*self._blocks) if verbose: self._print_verbage() @@ -185,17 +198,17 @@ class generic_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.bytes2chunks, - gr.file_sink(gr.sizeof_char, "tx_bytes2chunks.8b")) - if self._constellation.apply_pre_diff_code(): + blocks.file_sink(gr.sizeof_char, "tx_bytes2chunks.8b")) + if self.pre_diff_code: self.connect(self.symbol_mapper, - gr.file_sink(gr.sizeof_char, "tx_symbol_mapper.8b")) + blocks.file_sink(gr.sizeof_char, "tx_symbol_mapper.8b")) if self._differential: self.connect(self.diffenc, - gr.file_sink(gr.sizeof_char, "tx_diffenc.8b")) + blocks.file_sink(gr.sizeof_char, "tx_diffenc.8b")) self.connect(self.chunks2symbols, - gr.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.32fc")) self.connect(self.rrc_filter, - gr.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.32fc")) # ///////////////////////////////////////////////////////////////////////////// @@ -206,48 +219,41 @@ class generic_mod(gr.hier_block2): # ///////////////////////////////////////////////////////////////////////////// class generic_demod(gr.hier_block2): + """ + Hierarchical block for RRC-filtered differential generic demodulation. + + The input is the complex modulated signal at baseband. + The output is a stream of bits packed 1 bit per byte (LSB) + + Args: + constellation: determines the modulation type (gnuradio.digital.digital_constellation) + samples_per_symbol: samples per baud >= 2 (float) + differential: whether to use differential encoding (boolean) + pre_diff_code: whether to use apply a pre-differential mapping (boolean) + excess_bw: Root-raised cosine filter excess bandwidth (float) + freq_bw: loop filter lock-in bandwidth (float) + timing_bw: timing recovery loop lock-in bandwidth (float) + phase_bw: phase recovery loop bandwidth (float) + verbose: Print information about modulator? (boolean) + log: Log modulation data to files? (boolean) + """ def __init__(self, constellation, - samples_per_symbol=_def_samples_per_symbol, differential=_def_differential, + samples_per_symbol=_def_samples_per_symbol, + pre_diff_code=True, excess_bw=_def_excess_bw, - gray_coded=True, freq_bw=_def_freq_bw, timing_bw=_def_timing_bw, phase_bw=_def_phase_bw, verbose=_def_verbose, log=_def_log): - """ - Hierarchical block for RRC-filtered differential generic demodulation. - - The input is the complex modulated signal at baseband. - The output is a stream of bits packed 1 bit per byte (LSB) - - @param constellation: determines the modulation type - @type constellation: gnuradio.digital.gr_constellation - @param samples_per_symbol: samples per symbol >= 2 - @type samples_per_symbol: float - @param excess_bw: Root-raised cosine filter excess bandwidth - @type excess_bw: float - @param gray_coded: turn gray coding on/off - @type gray_coded: bool - @param freq_bw: loop filter lock-in bandwidth - @type freq_bw: float - @param timing_bw: timing recovery loop lock-in bandwidth - @type timing_bw: float - @param phase_bw: phase recovery loop bandwidth - @type phase_bw: float - @param verbose: Print information about modulator? - @type verbose: bool - @param debug: Print modualtion data to files? - @type debug: bool - """ gr.hier_block2.__init__(self, "generic_demod", gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature gr.io_signature(1, 1, gr.sizeof_char)) # Output signature - self._constellation = constellation.base() + self._constellation = constellation self._samples_per_symbol = samples_per_symbol self._excess_bw = excess_bw self._phase_bw = phase_bw @@ -259,13 +265,16 @@ class generic_demod(gr.hier_block2): if self._samples_per_symbol < 2: raise TypeError, ("sbp must be >= 2, is %d" % self._samples_per_symbol) + # Only apply a predifferential coding if the constellation also supports it. + self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code() + arity = pow(2,self.bits_per_symbol()) nfilts = 32 ntaps = 11 * int(self._samples_per_symbol*nfilts) # Automatic gain control - self.agc = gr.agc2_cc(0.6e-1, 1e-3, 1, 1, 100) + self.agc = analog.agc2_cc(0.6e-1, 1e-3, 1, 1, 100) # Frequency correction fll_ntaps = 55 @@ -273,8 +282,8 @@ class generic_demod(gr.hier_block2): fll_ntaps, self._freq_bw) # symbol timing recovery with RRC data filter - taps = gr.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol, - 1.0, self._excess_bw, ntaps) + taps = filter.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol, + 1.0, self._excess_bw, ntaps) self.time_recov = digital.pfb_clock_sync_ccf(self._samples_per_symbol, self._timing_bw, taps, nfilts, nfilts//2, self._timing_max_dev) @@ -282,19 +291,19 @@ class generic_demod(gr.hier_block2): fmin = -0.25 fmax = 0.25 self.receiver = digital.constellation_receiver_cb( - self._constellation, self._phase_bw, + self._constellation.base(), self._phase_bw, fmin, fmax) # Do differential decoding based on phase change of symbols if differential: self.diffdec = digital.diff_decoder_bb(arity) - if gray_coded: + if self.pre_diff_code: self.symbol_mapper = digital.map_bb( mod_codes.invert_code(self._constellation.pre_diff_code())) # unpack the k bit vector into a stream of bits - self.unpack = gr.unpack_k_bits_bb(self.bits_per_symbol()) + self.unpack = blocks.unpack_k_bits_bb(self.bits_per_symbol()) if verbose: self._print_verbage() @@ -303,14 +312,14 @@ class generic_demod(gr.hier_block2): self._setup_logging() # Connect and Initialize base class - blocks = [self, self.agc, self.freq_recov, - self.time_recov, self.receiver] + self._blocks = [self, self.agc, self.freq_recov, + self.time_recov, self.receiver] if differential: - blocks.append(self.diffdec) - if self._constellation.apply_pre_diff_code(): - blocks.append(self.symbol_mapper) - blocks += [self.unpack, self] - self.connect(*blocks) + self._blocks.append(self.diffdec) + if self.pre_diff_code: + self._blocks.append(self.symbol_mapper) + self._blocks += [self.unpack, self] + self.connect(*self._blocks) def samples_per_symbol(self): return self._samples_per_symbol @@ -329,39 +338,39 @@ class generic_demod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.agc, - gr.file_sink(gr.sizeof_gr_complex, "rx_agc.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "rx_agc.32fc")) self.connect((self.freq_recov, 0), - gr.file_sink(gr.sizeof_gr_complex, "rx_freq_recov.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "rx_freq_recov.32fc")) self.connect((self.freq_recov, 1), - gr.file_sink(gr.sizeof_float, "rx_freq_recov_freq.32f")) + blocks.file_sink(gr.sizeof_float, "rx_freq_recov_freq.32f")) self.connect((self.freq_recov, 2), - gr.file_sink(gr.sizeof_float, "rx_freq_recov_phase.32f")) + blocks.file_sink(gr.sizeof_float, "rx_freq_recov_phase.32f")) self.connect((self.freq_recov, 3), - gr.file_sink(gr.sizeof_float, "rx_freq_recov_error.32f")) + blocks.file_sink(gr.sizeof_float, "rx_freq_recov_error.32f")) self.connect((self.time_recov, 0), - gr.file_sink(gr.sizeof_gr_complex, "rx_time_recov.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "rx_time_recov.32fc")) self.connect((self.time_recov, 1), - gr.file_sink(gr.sizeof_float, "rx_time_recov_error.32f")) + blocks.file_sink(gr.sizeof_float, "rx_time_recov_error.32f")) self.connect((self.time_recov, 2), - gr.file_sink(gr.sizeof_float, "rx_time_recov_rate.32f")) + blocks.file_sink(gr.sizeof_float, "rx_time_recov_rate.32f")) self.connect((self.time_recov, 3), - gr.file_sink(gr.sizeof_float, "rx_time_recov_phase.32f")) + blocks.file_sink(gr.sizeof_float, "rx_time_recov_phase.32f")) self.connect((self.receiver, 0), - gr.file_sink(gr.sizeof_char, "rx_receiver.8b")) + blocks.file_sink(gr.sizeof_char, "rx_receiver.8b")) self.connect((self.receiver, 1), - gr.file_sink(gr.sizeof_float, "rx_receiver_error.32f")) + blocks.file_sink(gr.sizeof_float, "rx_receiver_error.32f")) self.connect((self.receiver, 2), - gr.file_sink(gr.sizeof_float, "rx_receiver_phase.32f")) + blocks.file_sink(gr.sizeof_float, "rx_receiver_phase.32f")) self.connect((self.receiver, 3), - gr.file_sink(gr.sizeof_float, "rx_receiver_freq.32f")) + blocks.file_sink(gr.sizeof_float, "rx_receiver_freq.32f")) if self._differential: self.connect(self.diffdec, - gr.file_sink(gr.sizeof_char, "rx_diffdec.8b")) - if self._constellation.apply_pre_diff_code(): + blocks.file_sink(gr.sizeof_char, "rx_diffdec.8b")) + if self.pre_diff_code: self.connect(self.symbol_mapper, - gr.file_sink(gr.sizeof_char, "rx_symbol_mapper.8b")) + blocks.file_sink(gr.sizeof_char, "rx_symbol_mapper.8b")) self.connect(self.unpack, - gr.file_sink(gr.sizeof_char, "rx_unpack.8b")) + blocks.file_sink(gr.sizeof_char, "rx_unpack.8b")) def add_options(parser): """ @@ -385,3 +394,17 @@ class generic_demod(gr.hier_block2): return extract_kwargs_from_options_for_class(cls, options) extract_kwargs_from_options=classmethod(extract_kwargs_from_options) +shared_demod_args = """ samples_per_symbol: samples per baud >= 2 (float) + excess_bw: Root-raised cosine filter excess bandwidth (float) + freq_bw: loop filter lock-in bandwidth (float) + timing_bw: timing recovery loop lock-in bandwidth (float) + phase_bw: phase recovery loop bandwidth (float) + verbose: Print information about modulator? (boolean) + log: Log modulation data to files? (boolean) +""" + +shared_mod_args = """ samples_per_symbol: samples per baud >= 2 (float) + excess_bw: Root-raised cosine filter excess bandwidth (float) + verbose: Print information about modulator? (boolean) + log: Log modulation data to files? (boolean) +""" diff --git a/gr-digital/python/gfsk.py b/gr-digital/python/gfsk.py index c85fdf0e00..54c94b88fd 100644 --- a/gr-digital/python/gfsk.py +++ b/gr-digital/python/gfsk.py @@ -2,7 +2,7 @@ # GFSK modulation and demodulation. # # -# Copyright 2005,2006,2007 Free Software Foundation, Inc. +# Copyright 2005-2007,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,6 +25,8 @@ # See gnuradio-examples/python/digital for examples from gnuradio import gr +from gnuradio import analog +from gnuradio import blocks import modulation_utils import digital_swig as digital from math import pi @@ -32,6 +34,11 @@ import numpy from pprint import pprint import inspect +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + # default values (used in __init__ and add_options) _def_samples_per_symbol = 2 _def_sensitivity = 1 @@ -68,14 +75,11 @@ class gfsk_mod(gr.hier_block2): The input is a byte stream (unsigned char) and the output is the complex modulated signal at baseband. - @param samples_per_symbol: samples per baud >= 2 - @type samples_per_symbol: integer - @param bt: Gaussian filter bandwidth * symbol time - @type bt: float - @param verbose: Print information about modulator? - @type verbose: bool - @param debug: Print modualtion data to files? - @type debug: bool + Args: + samples_per_symbol: samples per baud >= 2 (integer) + bt: Gaussian filter bandwidth * symbol time (float) + verbose: Print information about modulator? (bool) + debug: Print modualtion data to files? (bool) """ gr.hier_block2.__init__(self, "gfsk_mod", @@ -94,11 +98,13 @@ class gfsk_mod(gr.hier_block2): #sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2 # Turn it into NRZ data. - self.nrz = gr.bytes_to_syms() + #self.nrz = digital.bytes_to_syms() + self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) + self.nrz = digital.chunks_to_symbols_bf([-1, 1]) # Form Gaussian filter # Generate Gaussian response (Needs to be convolved with window below). - self.gaussian_taps = gr.firdes.gaussian( + self.gaussian_taps = filter.firdes.gaussian( 1.0, # gain samples_per_symbol, # symbol_rate bt, # bandwidth * symbol time @@ -107,13 +113,13 @@ class gfsk_mod(gr.hier_block2): self.sqwave = (1,) * samples_per_symbol # rectangular window self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave)) - self.gaussian_filter = gr.interp_fir_filter_fff(samples_per_symbol, self.taps) + self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps) # FM modulation - self.fmmod = gr.frequency_modulator_fc(sensitivity) + self.fmmod = frequency.frequency_modulator_fc(sensitivity) # small amount of output attenuation to prevent clipping USRP sink - self.amp = gr.multiply_const_cc(0.999) + self.amp = blocks.multiply_const_cc(0.999) if verbose: self._print_verbage() @@ -122,7 +128,7 @@ class gfsk_mod(gr.hier_block2): self._setup_logging() # Connect & Initialize base class - self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self) + self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self) def samples_per_symbol(self): return self._samples_per_symbol @@ -140,11 +146,11 @@ class gfsk_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.nrz, - gr.file_sink(gr.sizeof_float, "nrz.dat")) + blocks.file_sink(gr.sizeof_float, "nrz.dat")) self.connect(self.gaussian_filter, - gr.file_sink(gr.sizeof_float, "gaussian_filter.dat")) + blocks.file_sink(gr.sizeof_float, "gaussian_filter.dat")) self.connect(self.fmmod, - gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) + blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) def add_options(parser): @@ -188,23 +194,19 @@ class gfsk_demod(gr.hier_block2): The input is the complex modulated signal at baseband. The output is a stream of bits packed 1 bit per byte (the LSB) - @param samples_per_symbol: samples per baud - @type samples_per_symbol: integer - @param verbose: Print information about modulator? - @type verbose: bool - @param log: Print modualtion data to files? - @type log: bool + Args: + samples_per_symbol: samples per baud (integer) + verbose: Print information about modulator? (bool) + log: Print modualtion data to files? (bool) Clock recovery parameters. These all have reasonble defaults. - @param gain_mu: controls rate of mu adjustment - @type gain_mu: float - @param mu: fractional delay [0.0, 1.0] - @type mu: float - @param omega_relative_limit: sets max variation in omega - @type omega_relative_limit: float, typically 0.000200 (200 ppm) - @param freq_error: bit rate error as a fraction - @param float + Args: + gain_mu: controls rate of mu adjustment (float) + mu: fractional delay [0.0, 1.0] (float) + omega_relative_limit: sets max variation in omega (float, typically 0.000200 (200 ppm)) + freq_error: bit rate error as a fraction + float: """ gr.hier_block2.__init__(self, "gfsk_demod", @@ -230,7 +232,7 @@ class gfsk_demod(gr.hier_block2): # Demodulate FM #sensitivity = (pi / 2) / samples_per_symbol - self.fmdemod = gr.quadrature_demod_cf(1.0 / sensitivity) + self.fmdemod = analog.quadrature_demod_cf(1.0 / sensitivity) # the clock recovery block tracks the symbol clock and resamples as needed. # the output of the block is a stream of soft symbols (float) @@ -270,11 +272,11 @@ class gfsk_demod(gr.hier_block2): def _setup_logging(self): print "Demodulation logging turned on." self.connect(self.fmdemod, - gr.file_sink(gr.sizeof_float, "fmdemod.dat")) + blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) self.connect(self.clock_recovery, - gr.file_sink(gr.sizeof_float, "clock_recovery.dat")) + blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) self.connect(self.slicer, - gr.file_sink(gr.sizeof_char, "slicer.dat")) + blocks.file_sink(gr.sizeof_char, "slicer.dat")) def add_options(parser): """ diff --git a/gr-digital/python/gmsk.py b/gr-digital/python/gmsk.py index 2c9be056c2..055fc6002b 100644 --- a/gr-digital/python/gmsk.py +++ b/gr-digital/python/gmsk.py @@ -2,7 +2,7 @@ # GMSK modulation and demodulation. # # -# Copyright 2005,2006,2007 Free Software Foundation, Inc. +# Copyright 2005-2007,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,6 +25,8 @@ # See gnuradio-examples/python/digital for examples from gnuradio import gr +from gnuradio import blocks +from gnuradio import analog import modulation_utils import digital_swig as digital from math import pi @@ -32,6 +34,12 @@ import numpy from pprint import pprint import inspect +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + + # default values (used in __init__ and add_options) _def_samples_per_symbol = 2 _def_bt = 0.35 @@ -53,28 +61,25 @@ _def_omega_relative_limit = 0.005 # ///////////////////////////////////////////////////////////////////////////// class gmsk_mod(gr.hier_block2): + """ + Hierarchical block for Gaussian Minimum Shift Key (GMSK) + modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + samples_per_symbol: samples per baud >= 2 (integer) + bt: Gaussian filter bandwidth * symbol time (float) + verbose: Print information about modulator? (boolean) + debug: Print modulation data to files? (boolean) + """ def __init__(self, samples_per_symbol=_def_samples_per_symbol, bt=_def_bt, verbose=_def_verbose, log=_def_log): - """ - Hierarchical block for Gaussian Minimum Shift Key (GMSK) - modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - @param samples_per_symbol: samples per baud >= 2 - @type samples_per_symbol: integer - @param bt: Gaussian filter bandwidth * symbol time - @type bt: float - @param verbose: Print information about modulator? - @type verbose: bool - @param debug: Print modualtion data to files? - @type debug: bool - """ gr.hier_block2.__init__(self, "gmsk_mod", gr.io_signature(1, 1, gr.sizeof_char), # Input signature @@ -92,11 +97,13 @@ class gmsk_mod(gr.hier_block2): sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2 # Turn it into NRZ data. - self.nrz = gr.bytes_to_syms() + #self.nrz = digital.bytes_to_syms() + self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) + self.nrz = digital.chunks_to_symbols_bf([-1, 1], 1) # Form Gaussian filter # Generate Gaussian response (Needs to be convolved with window below). - self.gaussian_taps = gr.firdes.gaussian( + self.gaussian_taps = filter.firdes.gaussian( 1, # gain samples_per_symbol, # symbol_rate bt, # bandwidth * symbol time @@ -105,10 +112,10 @@ class gmsk_mod(gr.hier_block2): self.sqwave = (1,) * samples_per_symbol # rectangular window self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave)) - self.gaussian_filter = gr.interp_fir_filter_fff(samples_per_symbol, self.taps) + self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps) # FM modulation - self.fmmod = gr.frequency_modulator_fc(sensitivity) + self.fmmod = analog.frequency_modulator_fc(sensitivity) if verbose: self._print_verbage() @@ -117,7 +124,7 @@ class gmsk_mod(gr.hier_block2): self._setup_logging() # Connect & Initialize base class - self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self) + self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self) def samples_per_symbol(self): return self._samples_per_symbol @@ -135,11 +142,11 @@ class gmsk_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.nrz, - gr.file_sink(gr.sizeof_float, "nrz.dat")) + blocks.file_sink(gr.sizeof_float, "nrz.dat")) self.connect(self.gaussian_filter, - gr.file_sink(gr.sizeof_float, "gaussian_filter.dat")) + blocks.file_sink(gr.sizeof_float, "gaussian_filter.dat")) self.connect(self.fmmod, - gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) + blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) def add_options(parser): @@ -166,7 +173,23 @@ class gmsk_mod(gr.hier_block2): # ///////////////////////////////////////////////////////////////////////////// class gmsk_demod(gr.hier_block2): - + """ + Hierarchical block for Gaussian Minimum Shift Key (GMSK) + demodulation. + + The input is the complex modulated signal at baseband. + The output is a stream of bits packed 1 bit per byte (the LSB) + + Args: + samples_per_symbol: samples per baud (integer) + verbose: Print information about modulator? (boolean) + log: Print modualtion data to files? (boolean) + gain_mu: controls rate of mu adjustment (float) + mu: fractional delay [0.0, 1.0] (float) + omega_relative_limit: sets max variation in omega (float) + freq_error: bit rate error as a fraction (float) + """ + def __init__(self, samples_per_symbol=_def_samples_per_symbol, gain_mu=_def_gain_mu, @@ -175,31 +198,6 @@ class gmsk_demod(gr.hier_block2): freq_error=_def_freq_error, verbose=_def_verbose, log=_def_log): - """ - Hierarchical block for Gaussian Minimum Shift Key (GMSK) - demodulation. - - The input is the complex modulated signal at baseband. - The output is a stream of bits packed 1 bit per byte (the LSB) - - @param samples_per_symbol: samples per baud - @type samples_per_symbol: integer - @param verbose: Print information about modulator? - @type verbose: bool - @param log: Print modualtion data to files? - @type log: bool - - Clock recovery parameters. These all have reasonble defaults. - - @param gain_mu: controls rate of mu adjustment - @type gain_mu: float - @param mu: fractional delay [0.0, 1.0] - @type mu: float - @param omega_relative_limit: sets max variation in omega - @type omega_relative_limit: float, typically 0.000200 (200 ppm) - @param freq_error: bit rate error as a fraction - @param float - """ gr.hier_block2.__init__(self, "gmsk_demod", gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature @@ -224,7 +222,7 @@ class gmsk_demod(gr.hier_block2): # Demodulate FM sensitivity = (pi / 2) / samples_per_symbol - self.fmdemod = gr.quadrature_demod_cf(1.0 / sensitivity) + self.fmdemod = analog.quadrature_demod_cf(1.0 / sensitivity) # the clock recovery block tracks the symbol clock and resamples as needed. # the output of the block is a stream of soft symbols (float) @@ -264,11 +262,11 @@ class gmsk_demod(gr.hier_block2): def _setup_logging(self): print "Demodulation logging turned on." self.connect(self.fmdemod, - gr.file_sink(gr.sizeof_float, "fmdemod.dat")) + blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) self.connect(self.clock_recovery, - gr.file_sink(gr.sizeof_float, "clock_recovery.dat")) + blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) self.connect(self.slicer, - gr.file_sink(gr.sizeof_char, "slicer.dat")) + blocks.file_sink(gr.sizeof_char, "slicer.dat")) def add_options(parser): """ diff --git a/gr-digital/python/modulation_utils.py b/gr-digital/python/modulation_utils.py index cb3a9812d4..d499094d05 100644 --- a/gr-digital/python/modulation_utils.py +++ b/gr-digital/python/modulation_utils.py @@ -74,11 +74,10 @@ def extract_kwargs_from_options(function, excluded_args, options): but in that case the default provided in the __init__ argument list will be used since there is no kwargs entry.) - @param function: the function whose parameter list will be examined - @param excluded_args: function arguments that are NOT to be added to the dictionary - @type excluded_args: sequence of strings - @param options: result of command argument parsing - @type options: optparse.Values + Args: + function: the function whose parameter list will be examined + excluded_args: function arguments that are NOT to be added to the dictionary (sequence of strings) + options: result of command argument parsing (optparse.Values) """ # Try this in C++ ;) diff --git a/gr-digital/python/ofdm.py b/gr-digital/python/ofdm.py index 9f57920efc..bf129675ab 100644 --- a/gr-digital/python/ofdm.py +++ b/gr-digital/python/ofdm.py @@ -21,8 +21,9 @@ # import math -from gnuradio import gr -import digital_swig +from gnuradio import gr, fft +from gnuradio import blocks +import digital_swig as digital import ofdm_packet_utils from ofdm_receiver import ofdm_receiver import gnuradio.gr.gr_threading as _threading @@ -46,10 +47,10 @@ class ofdm_mod(gr.hier_block2): Packets to be sent are enqueued by calling send_pkt. The output is the complex modulated signal at baseband. - @param options: pass modulation options from higher layers (fft length, occupied tones, etc.) - @param msgq_limit: maximum number of messages in message queue - @type msgq_limit: int - @param pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples + Args: + options: pass modulation options from higher layers (fft length, occupied tones, etc.) + msgq_limit: maximum number of messages in message queue (int) + pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples """ gr.hier_block2.__init__(self, "ofdm_mod", @@ -97,17 +98,17 @@ class ofdm_mod(gr.hier_block2): constel = qam.qam_constellation(arity) rotated_const = map(lambda pt: pt * rot, constel.points()) #print rotated_const - self._pkt_input = digital_swig.ofdm_mapper_bcv(rotated_const, - msgq_limit, - options.occupied_tones, - options.fft_length) + self._pkt_input = digital.ofdm_mapper_bcv(rotated_const, + msgq_limit, + options.occupied_tones, + options.fft_length) - self.preambles = digital_swig.ofdm_insert_preamble(self._fft_length, - padded_preambles) - self.ifft = gr.fft_vcc(self._fft_length, False, win, True) - self.cp_adder = digital_swig.ofdm_cyclic_prefixer(self._fft_length, - symbol_length) - self.scale = gr.multiply_const_cc(1.0 / math.sqrt(self._fft_length)) + self.preambles = digital.ofdm_insert_preamble(self._fft_length, + padded_preambles) + self.ifft = fft.fft_vcc(self._fft_length, False, win, True) + self.cp_adder = digital.ofdm_cyclic_prefixer(self._fft_length, + symbol_length) + self.scale = blocks.multiply_const_cc(1.0 / math.sqrt(self._fft_length)) self.connect((self._pkt_input, 0), (self.preambles, 0)) self.connect((self._pkt_input, 1), (self.preambles, 1)) @@ -117,21 +118,21 @@ class ofdm_mod(gr.hier_block2): self._print_verbage() if options.log: - self.connect(self._pkt_input, gr.file_sink(gr.sizeof_gr_complex*options.fft_length, + self.connect(self._pkt_input, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length, "ofdm_mapper_c.dat")) - self.connect(self.preambles, gr.file_sink(gr.sizeof_gr_complex*options.fft_length, + self.connect(self.preambles, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length, "ofdm_preambles.dat")) - self.connect(self.ifft, gr.file_sink(gr.sizeof_gr_complex*options.fft_length, + self.connect(self.ifft, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length, "ofdm_ifft_c.dat")) - self.connect(self.cp_adder, gr.file_sink(gr.sizeof_gr_complex, + self.connect(self.cp_adder, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_cp_adder_c.dat")) def send_pkt(self, payload='', eof=False): """ Send the payload. - @param payload: data to send - @type payload: string + Args: + payload: data to send (string) """ if eof: msg = gr.message(1) # tell self._pkt_input we're not sending any more packets @@ -188,9 +189,9 @@ class ofdm_demod(gr.hier_block2): The input is the complex modulated signal at baseband. Demodulated packets are sent to the handler. - @param options: pass modulation options from higher layers (fft length, occupied tones, etc.) - @param callback: function of two args: ok, payload - @type callback: ok: bool; payload: string + Args: + options: pass modulation options from higher layers (fft length, occupied tones, etc.) + callback: function of two args: ok, payload (ok: bool; payload: string) """ gr.hier_block2.__init__(self, "ofdm_demod", gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature @@ -240,10 +241,10 @@ class ofdm_demod(gr.hier_block2): phgain = 0.25 frgain = phgain*phgain / 4.0 - self.ofdm_demod = digital_swig.ofdm_frame_sink(rotated_const, range(arity), - self._rcvd_pktq, - self._occupied_tones, - phgain, frgain) + self.ofdm_demod = digital.ofdm_frame_sink(rotated_const, range(arity), + self._rcvd_pktq, + self._occupied_tones, + phgain, frgain) self.connect(self, self.ofdm_recv) self.connect((self.ofdm_recv, 0), (self.ofdm_demod, 0)) @@ -255,7 +256,7 @@ class ofdm_demod(gr.hier_block2): if options.log: self.connect(self.ofdm_demod, - gr.file_sink(gr.sizeof_gr_complex*self._occupied_tones, + blocks.file_sink(gr.sizeof_gr_complex*self._occupied_tones, "ofdm_frame_sink_c.dat")) else: self.connect(self.ofdm_demod, diff --git a/gr-digital/python/ofdm_packet_utils.py b/gr-digital/python/ofdm_packet_utils.py index d0000e6db5..c49dfe4f8e 100644 --- a/gr-digital/python/ofdm_packet_utils.py +++ b/gr-digital/python/ofdm_packet_utils.py @@ -101,14 +101,12 @@ def make_packet(payload, samples_per_symbol, bits_per_symbol, """ Build a packet, given access code, payload, and whitener offset - @param payload: packet payload, len [0, 4096] - @param samples_per_symbol: samples per symbol (needed for padding calculation) - @type samples_per_symbol: int - @param bits_per_symbol: (needed for padding calculation) - @type bits_per_symbol: int - @param whitener_offset offset into whitener string to use [0-16) - @param whitening: Turn whitener on or off - @type whitening: bool + Args: + payload: packet payload, len [0, 4096] + samples_per_symbol: samples per symbol (needed for padding calculation) (int) + bits_per_symbol: (needed for padding calculation) (int) + whitener_offset: offset into whitener string to use [0-16) + whitening: Turn whitener on or off (bool) Packet will have access code at the beginning, followed by length, payload and finally CRC-32. @@ -150,13 +148,13 @@ def _npadding_bytes(pkt_byte_len, samples_per_symbol, bits_per_symbol): we want to pad so that after modulation the resulting packet is a multiple of 128 samples. - @param ptk_byte_len: len in bytes of packet, not including padding. - @param samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK) - @type samples_per_symbol: int - @param bits_per_symbol: bits per symbol (log2(modulation order)) - @type bits_per_symbol: int + Args: + ptk_byte_len: len in bytes of packet, not including padding. + samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK) (int) + bits_per_symbol: bits per symbol (log2(modulation order)) (int) - @returns number of bytes of padding to append. + Returns: + number of bytes of padding to append. """ modulus = 128 byte_modulus = gru.lcm(modulus/8, samples_per_symbol) * bits_per_symbol / samples_per_symbol @@ -170,10 +168,10 @@ def unmake_packet(whitened_payload_with_crc, whitener_offset=0, dewhitening=1): """ Return (ok, payload) - @param whitened_payload_with_crc: string - @param whitener_offset offset into whitener string to use [0-16) - @param dewhitening: Turn whitener on or off - @type dewhitening: bool + Args: + whitened_payload_with_crc: string + whitener_offset: offset into whitener string to use [0-16) + dewhitening: Turn whitener on or off (bool) """ if dewhitening: diff --git a/gr-digital/python/ofdm_receiver.py b/gr-digital/python/ofdm_receiver.py index 9d4d6e559d..4fbf76251a 100644 --- a/gr-digital/python/ofdm_receiver.py +++ b/gr-digital/python/ofdm_receiver.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006, 2007, 2008 Free Software Foundation, Inc. +# Copyright 2006-2008 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -23,13 +23,21 @@ import math from numpy import fft from gnuradio import gr +from gnuradio import analog +from gnuradio import blocks +from gnuradio import filter -import digital_swig +import digital_swig as digital from ofdm_sync_pn import ofdm_sync_pn from ofdm_sync_fixed import ofdm_sync_fixed from ofdm_sync_pnac import ofdm_sync_pnac from ofdm_sync_ml import ofdm_sync_ml +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + class ofdm_receiver(gr.hier_block2): """ Performs receiver synchronization on OFDM symbols. @@ -47,18 +55,13 @@ class ofdm_receiver(gr.hier_block2): The input is the complex modulated signal at baseband. Synchronized packets are sent back to the demodulator. - @param fft_length: total number of subcarriers - @type fft_length: int - @param cp_length: length of cyclic prefix as specified in subcarriers (<= fft_length) - @type cp_length: int - @param occupied_tones: number of subcarriers used for data - @type occupied_tones: int - @param snr: estimated signal to noise ratio used to guide cyclic prefix synchronizer - @type snr: float - @param ks: known symbols used as preambles to each packet - @type ks: list of lists - @param logging: turn file logging on or off - @type logging: bool + Args: + fft_length: total number of subcarriers (int) + cp_length: length of cyclic prefix as specified in subcarriers (<= fft_length) (int) + occupied_tones: number of subcarriers used for data (int) + snr: estimated signal to noise ratio used to guide cyclic prefix synchronizer (float) + ks: known symbols used as preambles to each packet (list of lists) + logging: turn file logging on or off (bool) """ gr.hier_block2.__init__(self, "ofdm_receiver", @@ -67,12 +70,12 @@ class ofdm_receiver(gr.hier_block2): bw = (float(occupied_tones) / float(fft_length)) / 2.0 tb = bw*0.08 - chan_coeffs = gr.firdes.low_pass (1.0, # gain - 1.0, # sampling rate - bw+tb, # midpoint of trans. band - tb, # width of trans. band - gr.firdes.WIN_HAMMING) # filter type - self.chan_filt = gr.fft_filter_ccc(1, chan_coeffs) + chan_coeffs = filter.firdes.low_pass (1.0, # gain + 1.0, # sampling rate + bw+tb, # midpoint of trans. band + tb, # width of trans. band + filter.firdes.WIN_HAMMING) # filter type + self.chan_filt = filter.fft_filter_ccc(1, chan_coeffs) win = [1 for i in range(fft_length)] @@ -107,7 +110,7 @@ class ofdm_receiver(gr.hier_block2): # for testing only; do not user over the air # remove filter and filter delay for this elif SYNC == "fixed": - self.chan_filt = gr.multiply_const_cc(1.0) + self.chan_filt = blocks.multiply_const_cc(1.0) nsymbols = 18 # enter the number of symbols per packet freq_offset = 0.0 # if you use a frequency offset, enter it here nco_sensitivity = -2.0/fft_length # correct for fine frequency @@ -119,11 +122,11 @@ class ofdm_receiver(gr.hier_block2): # Set up blocks - self.nco = gr.frequency_modulator_fc(nco_sensitivity) # generate a signal proportional to frequency error of sync block - self.sigmix = gr.multiply_cc() - self.sampler = digital_swig.ofdm_sampler(fft_length, fft_length+cp_length) - self.fft_demod = gr.fft_vcc(fft_length, True, win, True) - self.ofdm_frame_acq = digital_swig.ofdm_frame_acquisition(occupied_tones, + self.nco = analog.frequency_modulator_fc(nco_sensitivity) # generate a signal proportional to frequency error of sync block + self.sigmix = blocks.multiply_cc() + self.sampler = digital.ofdm_sampler(fft_length, fft_length+cp_length) + self.fft_demod = fft.fft_vcc(fft_length, True, win, True) + self.ofdm_frame_acq = digital.ofdm_frame_acquisition(occupied_tones, fft_length, cp_length, ks[0]) @@ -141,11 +144,11 @@ class ofdm_receiver(gr.hier_block2): self.connect((self.ofdm_frame_acq,1), (self,1)) # frame and symbol timing, and equalization if logging: - self.connect(self.chan_filt, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-chan_filt_c.dat")) - self.connect(self.fft_demod, gr.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-fft_out_c.dat")) + self.connect(self.chan_filt, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-chan_filt_c.dat")) + self.connect(self.fft_demod, blocks.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-fft_out_c.dat")) self.connect(self.ofdm_frame_acq, - gr.file_sink(gr.sizeof_gr_complex*occupied_tones, "ofdm_receiver-frame_acq_c.dat")) - self.connect((self.ofdm_frame_acq,1), gr.file_sink(1, "ofdm_receiver-found_corr_b.dat")) - self.connect(self.sampler, gr.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-sampler_c.dat")) - self.connect(self.sigmix, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-sigmix_c.dat")) - self.connect(self.nco, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-nco_c.dat")) + blocks.file_sink(gr.sizeof_gr_complex*occupied_tones, "ofdm_receiver-frame_acq_c.dat")) + self.connect((self.ofdm_frame_acq,1), blocks.file_sink(1, "ofdm_receiver-found_corr_b.dat")) + self.connect(self.sampler, blocks.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-sampler_c.dat")) + self.connect(self.sigmix, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-sigmix_c.dat")) + self.connect(self.nco, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-nco_c.dat")) diff --git a/gr-digital/python/ofdm_sync_fixed.py b/gr-digital/python/ofdm_sync_fixed.py index 9bac789bf6..bd64964651 100644 --- a/gr-digital/python/ofdm_sync_fixed.py +++ b/gr-digital/python/ofdm_sync_fixed.py @@ -22,6 +22,7 @@ import math from gnuradio import gr +from gnuradio import blocks class ofdm_sync_fixed(gr.hier_block2): def __init__(self, fft_length, cp_length, nsymbols, freq_offset, logging=False): @@ -46,5 +47,5 @@ class ofdm_sync_fixed(gr.hier_block2): self.connect(self.peak_trigger, (self,1)) if logging: - self.connect(self.peak_trigger, gr.file_sink(gr.sizeof_char, "ofdm_sync_fixed-peaks_b.dat")) + self.connect(self.peak_trigger, blocks.file_sink(gr.sizeof_char, "ofdm_sync_fixed-peaks_b.dat")) diff --git a/gr-digital/python/ofdm_sync_ml.py b/gr-digital/python/ofdm_sync_ml.py index 7c75d7f1d4..3afd647098 100644 --- a/gr-digital/python/ofdm_sync_ml.py +++ b/gr-digital/python/ofdm_sync_ml.py @@ -23,6 +23,16 @@ import math from gnuradio import gr +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + +try: + from gnuradio import blocks +except ImportError: + import blocks_swig as blocks + class ofdm_sync_ml(gr.hier_block2): def __init__(self, fft_length, cp_length, snr, kstime, logging): ''' Maximum Likelihood OFDM synchronizer: @@ -35,7 +45,7 @@ class ofdm_sync_ml(gr.hier_block2): gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char)) # Output signature - self.input = gr.add_const_cc(0) + self.input = blocks.add_const_cc(0) SNR = 10.0**(snr/10.0) rho = SNR / (SNR + 1.0) @@ -48,16 +58,16 @@ class ofdm_sync_ml(gr.hier_block2): self.connect(self, self.input) # Create a delay line - self.delay = gr.delay(gr.sizeof_gr_complex, fft_length) + self.delay = blocks.delay(gr.sizeof_gr_complex, fft_length) self.connect(self.input, self.delay) # magnitude squared blocks - self.magsqrd1 = gr.complex_to_mag_squared() - self.magsqrd2 = gr.complex_to_mag_squared() - self.adder = gr.add_ff() + self.magsqrd1 = blocks.complex_to_mag_squared() + self.magsqrd2 = blocks.complex_to_mag_squared() + self.adder = blocks.add_ff() moving_sum_taps = [rho/2 for i in range(cp_length)] - self.moving_sum_filter = gr.fir_filter_fff(1,moving_sum_taps) + self.moving_sum_filter = filter.fir_filter_fff(1,moving_sum_taps) self.connect(self.input,self.magsqrd1) self.connect(self.delay,self.magsqrd2) @@ -67,29 +77,29 @@ class ofdm_sync_ml(gr.hier_block2): # Correlation from ML Sync - self.conjg = gr.conjugate_cc(); - self.mixer = gr.multiply_cc(); + self.conjg = blocks.conjugate_cc(); + self.mixer = blocks.multiply_cc(); movingsum2_taps = [1.0 for i in range(cp_length)] - self.movingsum2 = gr.fir_filter_ccf(1,movingsum2_taps) + self.movingsum2 = filter.fir_filter_ccf(1,movingsum2_taps) # Correlator data handler - self.c2mag = gr.complex_to_mag() - self.angle = gr.complex_to_arg() + self.c2mag = blocks.complex_to_mag() + self.angle = blocks.complex_to_arg() self.connect(self.input,(self.mixer,1)) self.connect(self.delay,self.conjg,(self.mixer,0)) self.connect(self.mixer,self.movingsum2,self.c2mag) self.connect(self.movingsum2,self.angle) # ML Sync output arg, need to find maximum point of this - self.diff = gr.sub_ff() + self.diff = blocks.sub_ff() self.connect(self.c2mag,(self.diff,0)) self.connect(self.moving_sum_filter,(self.diff,1)) #ML measurements input to sampler block and detect - self.f2c = gr.float_to_complex() - self.pk_detect = gr.peak_detector_fb(0.2, 0.25, 30, 0.0005) - self.sample_and_hold = gr.sample_and_hold_ff() + self.f2c = blocks.float_to_complex() + self.pk_detect = blocks.peak_detector_fb(0.2, 0.25, 30, 0.0005) + self.sample_and_hold = blocks.sample_and_hold_ff() # use the sync loop values to set the sampler and the NCO # self.diff = theta @@ -115,9 +125,9 @@ class ofdm_sync_ml(gr.hier_block2): # to readjust the timing in the middle of the packet or we ruin the equalizer settings. kstime = [k.conjugate() for k in kstime] kstime.reverse() - self.kscorr = gr.fir_filter_ccc(1, kstime) - self.corrmag = gr.complex_to_mag_squared() - self.div = gr.divide_ff() + self.kscorr = filter.fir_filter_ccc(1, kstime) + self.corrmag = blocks.complex_to_mag_squared() + self.div = blocks.divide_ff() # The output signature of the correlation has a few spikes because the rest of the # system uses the repeated preamble symbol. It needs to work that generically if @@ -125,10 +135,10 @@ class ofdm_sync_ml(gr.hier_block2): # The output theta of the correlator above is multiplied with this correlation to # identify the proper peak and remove other products in this cross-correlation self.threshold_factor = 0.1 - self.slice = gr.threshold_ff(self.threshold_factor, self.threshold_factor, 0) - self.f2b = gr.float_to_char() - self.b2f = gr.char_to_float() - self.mul = gr.multiply_ff() + self.slice = blocks.threshold_ff(self.threshold_factor, self.threshold_factor, 0) + self.f2b = blocks.float_to_char() + self.b2f = blocks.char_to_float() + self.mul = blocks.multiply_ff() # Normalize the power of the corr output by the energy. This is not really needed # and could be removed for performance, but it makes for a cleaner signal. @@ -148,18 +158,18 @@ class ofdm_sync_ml(gr.hier_block2): if logging: - self.connect(self.moving_sum_filter, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-energy_f.dat")) - self.connect(self.diff, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-theta_f.dat")) - self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-epsilon_f.dat")) - self.connect(self.corrmag, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-corrmag_f.dat")) - self.connect(self.kscorr, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-kscorr_c.dat")) - self.connect(self.div, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-div_f.dat")) - self.connect(self.mul, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-mul_f.dat")) - self.connect(self.slice, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-slice_f.dat")) - self.connect(self.pk_detect, gr.file_sink(gr.sizeof_char, "ofdm_sync_ml-peaks_b.dat")) + self.connect(self.moving_sum_filter, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-energy_f.dat")) + self.connect(self.diff, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-theta_f.dat")) + self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-epsilon_f.dat")) + self.connect(self.corrmag, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-corrmag_f.dat")) + self.connect(self.kscorr, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-kscorr_c.dat")) + self.connect(self.div, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-div_f.dat")) + self.connect(self.mul, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-mul_f.dat")) + self.connect(self.slice, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-slice_f.dat")) + self.connect(self.pk_detect, blocks.file_sink(gr.sizeof_char, "ofdm_sync_ml-peaks_b.dat")) if use_dpll: - self.connect(self.dpll, gr.file_sink(gr.sizeof_char, "ofdm_sync_ml-dpll_b.dat")) + self.connect(self.dpll, blocks.file_sink(gr.sizeof_char, "ofdm_sync_ml-dpll_b.dat")) - self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-sample_and_hold_f.dat")) - self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-input_c.dat")) + self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-sample_and_hold_f.dat")) + self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-input_c.dat")) diff --git a/gr-digital/python/ofdm_sync_pn.py b/gr-digital/python/ofdm_sync_pn.py index 05b1de2e19..4c6a30f802 100644 --- a/gr-digital/python/ofdm_sync_pn.py +++ b/gr-digital/python/ofdm_sync_pn.py @@ -24,6 +24,16 @@ import math from numpy import fft from gnuradio import gr +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + +try: + from gnuradio import blocks +except ImportError: + import blocks_swig as blocks + class ofdm_sync_pn(gr.hier_block2): def __init__(self, fft_length, cp_length, logging=False): """ @@ -37,47 +47,47 @@ class ofdm_sync_pn(gr.hier_block2): gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char)) # Output signature - self.input = gr.add_const_cc(0) + self.input = blocks.add_const_cc(0) # PN Sync # Create a delay line - self.delay = gr.delay(gr.sizeof_gr_complex, fft_length/2) + self.delay = blocks.delay(gr.sizeof_gr_complex, fft_length/2) # Correlation from ML Sync - self.conjg = gr.conjugate_cc(); - self.corr = gr.multiply_cc(); + self.conjg = blocks.conjugate_cc(); + self.corr = blocks.multiply_cc(); # Create a moving sum filter for the corr output if 1: moving_sum_taps = [1.0 for i in range(fft_length//2)] - self.moving_sum_filter = gr.fir_filter_ccf(1,moving_sum_taps) + self.moving_sum_filter = filter.fir_filter_ccf(1,moving_sum_taps) else: moving_sum_taps = [complex(1.0,0.0) for i in range(fft_length//2)] - self.moving_sum_filter = gr.fft_filter_ccc(1,moving_sum_taps) + self.moving_sum_filter = filter.fft_filter_ccc(1,moving_sum_taps) # Create a moving sum filter for the input - self.inputmag2 = gr.complex_to_mag_squared() + self.inputmag2 = blocks.complex_to_mag_squared() movingsum2_taps = [1.0 for i in range(fft_length//2)] if 1: - self.inputmovingsum = gr.fir_filter_fff(1,movingsum2_taps) + self.inputmovingsum = filter.fir_filter_fff(1,movingsum2_taps) else: - self.inputmovingsum = gr.fft_filter_fff(1,movingsum2_taps) + self.inputmovingsum = filter.fft_filter_fff(1,movingsum2_taps) - self.square = gr.multiply_ff() - self.normalize = gr.divide_ff() + self.square = blocks.multiply_ff() + self.normalize = blocks.divide_ff() # Get magnitude (peaks) and angle (phase/freq error) - self.c2mag = gr.complex_to_mag_squared() - self.angle = gr.complex_to_arg() + self.c2mag = blocks.complex_to_mag_squared() + self.angle = blocks.complex_to_arg() - self.sample_and_hold = gr.sample_and_hold_ff() + self.sample_and_hold = blocks.sample_and_hold_ff() #ML measurements input to sampler block and detect - self.sub1 = gr.add_const_ff(-1) - self.pk_detect = gr.peak_detector_fb(0.20, 0.20, 30, 0.001) - #self.pk_detect = gr.peak_detector2_fb(9) + self.sub1 = blocks.add_const_ff(-1) + self.pk_detect = blocks.peak_detector_fb(0.20, 0.20, 30, 0.001) + #self.pk_detect = blocks.peak_detector2_fb(9) self.connect(self, self.input) @@ -100,7 +110,7 @@ class ofdm_sync_pn(gr.hier_block2): # Create a moving sum filter for the corr output matched_filter_taps = [1.0/cp_length for i in range(cp_length)] - self.matched_filter = gr.fir_filter_fff(1,matched_filter_taps) + self.matched_filter = filter.fir_filter_fff(1,matched_filter_taps) self.connect(self.normalize, self.matched_filter) self.connect(self.matched_filter, self.sub1, self.pk_detect) @@ -114,10 +124,10 @@ class ofdm_sync_pn(gr.hier_block2): self.connect(self.pk_detect, (self,1)) if logging: - self.connect(self.matched_filter, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-mf_f.dat")) - self.connect(self.normalize, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-theta_f.dat")) - self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat")) - self.connect(self.pk_detect, gr.file_sink(gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat")) - self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat")) - self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat")) + self.connect(self.matched_filter, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-mf_f.dat")) + self.connect(self.normalize, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-theta_f.dat")) + self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat")) + self.connect(self.pk_detect, blocks.file_sink(gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat")) + self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat")) + self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat")) diff --git a/gr-digital/python/ofdm_sync_pnac.py b/gr-digital/python/ofdm_sync_pnac.py index 10a1259641..ee7c82927a 100644 --- a/gr-digital/python/ofdm_sync_pnac.py +++ b/gr-digital/python/ofdm_sync_pnac.py @@ -24,6 +24,16 @@ import math from numpy import fft from gnuradio import gr +try: + from gnuradio import filter +except ImportError: + import filter_swig as filter + +try: + from gnuradio import blocks +except ImportError: + import blocks_swig as blocks + class ofdm_sync_pnac(gr.hier_block2): def __init__(self, fft_length, cp_length, kstime, logging=False): """ @@ -50,7 +60,7 @@ class ofdm_sync_pnac(gr.hier_block2): gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char)) # Output signature - self.input = gr.add_const_cc(0) + self.input = blocks.add_const_cc(0) symbol_length = fft_length + cp_length @@ -59,30 +69,30 @@ class ofdm_sync_pnac(gr.hier_block2): # cross-correlate with the known symbol kstime = [k.conjugate() for k in kstime[0:fft_length//2]] kstime.reverse() - self.crosscorr_filter = gr.fir_filter_ccc(1, kstime) + self.crosscorr_filter = filter.fir_filter_ccc(1, kstime) # Create a delay line - self.delay = gr.delay(gr.sizeof_gr_complex, fft_length/2) + self.delay = blocks.delay(gr.sizeof_gr_complex, fft_length/2) # Correlation from ML Sync - self.conjg = gr.conjugate_cc(); - self.corr = gr.multiply_cc(); + self.conjg = blocks.conjugate_cc(); + self.corr = blocks.multiply_cc(); # Create a moving sum filter for the input - self.mag = gr.complex_to_mag_squared() + self.mag = blocks.complex_to_mag_squared() movingsum_taps = (fft_length//1)*[1.0,] - self.power = gr.fir_filter_fff(1,movingsum_taps) + self.power = filter.fir_filter_fff(1,movingsum_taps) # Get magnitude (peaks) and angle (phase/freq error) - self.c2mag = gr.complex_to_mag_squared() - self.angle = gr.complex_to_arg() - self.compare = gr.sub_ff() + self.c2mag = blocks.complex_to_mag_squared() + self.angle = blocks.complex_to_arg() + self.compare = blocks.sub_ff() - self.sample_and_hold = gr.sample_and_hold_ff() + self.sample_and_hold = blocks.sample_and_hold_ff() #ML measurements input to sampler block and detect - self.threshold = gr.threshold_ff(0,0,0) # threshold detection might need to be tweaked - self.peaks = gr.float_to_char() + self.threshold = blocks.threshold_ff(0,0,0) # threshold detection might need to be tweaked + self.peaks = blocksx.float_to_char() self.connect(self, self.input) @@ -115,11 +125,11 @@ class ofdm_sync_pnac(gr.hier_block2): self.connect(self.peaks, (self,1)) if logging: - self.connect(self.compare, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-compare_f.dat")) - self.connect(self.c2mag, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-theta_f.dat")) - self.connect(self.power, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-inputpower_f.dat")) - self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-epsilon_f.dat")) - self.connect(self.threshold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-threshold_f.dat")) - self.connect(self.peaks, gr.file_sink(gr.sizeof_char, "ofdm_sync_pnac-peaks_b.dat")) - self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-sample_and_hold_f.dat")) - self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pnac-input_c.dat")) + self.connect(self.compare, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-compare_f.dat")) + self.connect(self.c2mag, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-theta_f.dat")) + self.connect(self.power, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-inputpower_f.dat")) + self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-epsilon_f.dat")) + self.connect(self.threshold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-threshold_f.dat")) + self.connect(self.peaks, blocks.file_sink(gr.sizeof_char, "ofdm_sync_pnac-peaks_b.dat")) + self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-sample_and_hold_f.dat")) + self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pnac-input_c.dat")) diff --git a/gr-digital/python/packet_utils.py b/gr-digital/python/packet_utils.py index 2e216ff50e..2929758ef0 100644 --- a/gr-digital/python/packet_utils.py +++ b/gr-digital/python/packet_utils.py @@ -107,13 +107,12 @@ def make_packet(payload, samples_per_symbol, bits_per_symbol, """ Build a packet, given access code, payload, and whitener offset - @param payload: packet payload, len [0, 4096] - @param samples_per_symbol: samples per symbol (needed for padding calculation) - @type samples_per_symbol: int - @param bits_per_symbol: (needed for padding calculation) - @type bits_per_symbol: int - @param access_code: string of ascii 0's and 1's - @param whitener_offset offset into whitener string to use [0-16) + Args: + payload: packet payload, len [0, 4096] + samples_per_symbol: samples per symbol (needed for padding calculation) (int) + bits_per_symbol: (needed for padding calculation) (int) + access_code: string of ascii 0's and 1's + whitener_offset: offset into whitener string to use [0-16) Packet will have access code at the beginning, followed by length, payload and finally CRC-32. @@ -156,13 +155,13 @@ def _npadding_bytes(pkt_byte_len, samples_per_symbol, bits_per_symbol): we want to pad so that after modulation the resulting packet is a multiple of 128 samples. - @param ptk_byte_len: len in bytes of packet, not including padding. - @param samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK) - @type samples_per_symbol: int - @param bits_per_symbol: bits per symbol (log2(modulation order)) - @type bits_per_symbol: int + Args: + ptk_byte_len: len in bytes of packet, not including padding. + samples_per_symbol: samples per bit (1 bit / symbolwidth GMSK) (int) + bits_per_symbol: bits per symbol (log2(modulation order)) (int) - @returns number of bytes of padding to append. + Returns: + number of bytes of padding to append. """ modulus = 128 byte_modulus = gru.lcm(modulus/8, samples_per_symbol) * bits_per_symbol / samples_per_symbol @@ -176,7 +175,8 @@ def unmake_packet(whitened_payload_with_crc, whitener_offset=0, dewhitening=True """ Return (ok, payload) - @param whitened_payload_with_crc: string + Args: + whitened_payload_with_crc: string """ if dewhitening: diff --git a/gr-digital/python/pkt.py b/gr-digital/python/pkt.py index 8650bdbb02..434548906e 100644 --- a/gr-digital/python/pkt.py +++ b/gr-digital/python/pkt.py @@ -23,8 +23,12 @@ from math import pi from gnuradio import gr import gnuradio.gr.gr_threading as _threading import packet_utils -import digital_swig +import digital_swig as digital +try: + from gnuradio import blocks +except ImportError: + import blocks_swig as blocks # ///////////////////////////////////////////////////////////////////////////// # mod/demod with packets as i/o @@ -44,14 +48,12 @@ class mod_pkts(gr.hier_block2): Packets to be sent are enqueued by calling send_pkt. The output is the complex modulated signal at baseband. - @param modulator: instance of modulator class (gr_block or hier_block2) - @type modulator: complex baseband out - @param access_code: AKA sync vector - @type access_code: string of 1's and 0's between 1 and 64 long - @param msgq_limit: maximum number of messages in message queue - @type msgq_limit: int - @param pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples - @param use_whitener_offset: If true, start of whitener XOR string is incremented each packet + Args: + modulator: instance of modulator class (gr_block or hier_block2) (complex baseband out) + access_code: AKA sync vector (string of 1's and 0's between 1 and 64 long) + msgq_limit: maximum number of messages in message queue (int) + pad_for_usrp: If true, packets are padded such that they end up a multiple of 128 samples + use_whitener_offset: If true, start of whitener XOR string is incremented each packet See gmsk_mod for remaining parameters """ @@ -72,15 +74,15 @@ class mod_pkts(gr.hier_block2): self._access_code = access_code # accepts messages from the outside world - self._pkt_input = gr.message_source(gr.sizeof_char, msgq_limit) + self._pkt_input = blocks.message_source(gr.sizeof_char, msgq_limit) self.connect(self._pkt_input, self._modulator, self) def send_pkt(self, payload='', eof=False): """ Send the payload. - @param payload: data to send - @type payload: string + Args: + payload: data to send (string) """ if eof: msg = gr.message(1) # tell self._pkt_input we're not sending any more packets @@ -116,14 +118,11 @@ class demod_pkts(gr.hier_block2): The input is the complex modulated signal at baseband. Demodulated packets are sent to the handler. - @param demodulator: instance of demodulator class (gr_block or hier_block2) - @type demodulator: complex baseband in - @param access_code: AKA sync vector - @type access_code: string of 1's and 0's - @param callback: function of two args: ok, payload - @type callback: ok: bool; payload: string - @param threshold: detect access_code with up to threshold bits wrong (-1 -> use default) - @type threshold: int + Args: + demodulator: instance of demodulator class (gr_block or hier_block2) (complex baseband in) + access_code: AKA sync vector (string of 1's and 0's) + callback: function of two args: ok, payload (ok: bool; payload: string) + threshold: detect access_code with up to threshold bits wrong (-1 -> use default) (int) """ gr.hier_block2.__init__(self, "demod_pkts", @@ -141,9 +140,9 @@ class demod_pkts(gr.hier_block2): threshold = 12 # FIXME raise exception self._rcvd_pktq = gr.msg_queue() # holds packets from the PHY - self.correlator = digital_swig.correlate_access_code_bb(access_code, threshold) + self.correlator = digital.correlate_access_code_bb(access_code, threshold) - self.framer_sink = gr.framer_sink_1(self._rcvd_pktq) + self.framer_sink = digital.framer_sink_1(self._rcvd_pktq) self.connect(self, self._demodulator, self.correlator, self.framer_sink) self._watcher = _queue_watcher_thread(self._rcvd_pktq, callback) diff --git a/gr-digital/python/psk.py b/gr-digital/python/psk.py index 58f6787f0c..1816ffb4ba 100644 --- a/gr-digital/python/psk.py +++ b/gr-digital/python/psk.py @@ -30,22 +30,29 @@ import digital_swig import modulation_utils from utils import mod_codes, gray_code from generic_mod_demod import generic_mod, generic_demod +from generic_mod_demod import shared_mod_args, shared_demod_args # Default number of points in constellation. _def_constellation_points = 4 # The default encoding (e.g. gray-code, set-partition) _def_mod_code = mod_codes.GRAY_CODE +# Default use of differential encoding +_def_differential = True -def create_encodings(mod_code, arity): +def create_encodings(mod_code, arity, differential): post_diff_code = None if mod_code not in mod_codes.codes: raise ValueError('That modulation code does not exist.') if mod_code == mod_codes.GRAY_CODE: - pre_diff_code = gray_code.gray_code(arity) - elif mod_code == mod_codes.SET_PARTITION_CODE: - pre_diff_code = set_partition_code.set_partition_code(arity) + if differential: + pre_diff_code = gray_code.gray_code(arity) + post_diff_code = None + else: + pre_diff_code = [] + post_diff_code = gray_code.gray_code(arity) elif mod_code == mod_codes.NO_CODE: pre_diff_code = [] + post_diff_code = None else: raise ValueError('That modulation code is not implemented for this constellation.') return (pre_diff_code, post_diff_code) @@ -54,7 +61,8 @@ def create_encodings(mod_code, arity): # PSK constellation # ///////////////////////////////////////////////////////////////////////////// -def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code): +def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code, + differential=_def_differential): """ Creates a PSK constellation object. """ @@ -62,7 +70,7 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code): if (k != int(k)): raise StandardError('Number of constellation points must be a power of two.') points = [exp(2*pi*(0+1j)*i/m) for i in range(0,m)] - pre_diff_code, post_diff_code = create_encodings(mod_code, m) + pre_diff_code, post_diff_code = create_encodings(mod_code, m, differential) if post_diff_code is not None: inverse_post_diff_code = mod_codes.invert_code(post_diff_code) points = [points[x] for x in inverse_post_diff_code] @@ -74,22 +82,26 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code): # ///////////////////////////////////////////////////////////////////////////// class psk_mod(generic_mod): + """ + Hierarchical block for RRC-filtered PSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + constellation_points: Number of constellation points (must be a power of two) (integer). + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + differential: Whether to use differential encoding (boolean). + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args def __init__(self, constellation_points=_def_constellation_points, mod_code=_def_mod_code, + differential=_def_differential, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered PSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_mod block for list of parameters. - """ - - constellation = psk_constellation(constellation_points, mod_code) - super(psk_mod, self).__init__(constellation, *args, **kwargs) + constellation = psk_constellation(constellation_points, mod_code, differential) + super(psk_mod, self).__init__(constellation, differential, *args, **kwargs) # ///////////////////////////////////////////////////////////////////////////// # PSK demodulator @@ -98,21 +110,25 @@ class psk_mod(generic_mod): class psk_demod(generic_demod): + """ + Hierarchical block for RRC-filtered PSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + constellation_points: Number of constellation points (must be a power of two) (integer). + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + differential: Whether to use differential encoding (boolean). + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args def __init__(self, constellation_points=_def_constellation_points, mod_code=_def_mod_code, + differential=_def_differential, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered PSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_demod block for list of parameters. - """ - - constellation = psk_constellation(constellation_points, mod_code) - super(psk_demod, self).__init__(constellation, *args, **kwargs) + constellation = psk_constellation(constellation_points, mod_code, differential) + super(psk_demod, self).__init__(constellation, differential, *args, **kwargs) # # Add these to the mod/demod registry diff --git a/gr-digital/python/qa_binary_slicer_fb.py b/gr-digital/python/qa_binary_slicer_fb.py index 60d92c5d19..22f7da73ff 100755 --- a/gr-digital/python/qa_binary_slicer_fb.py +++ b/gr-digital/python/qa_binary_slicer_fb.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -21,33 +21,33 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital import math, random -class test_binary_slicer_fb (gr_unittest.TestCase): +class test_binary_slicer_fb(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_binary_slicer_fb (self): + def test_binary_slicer_fb(self): expected_result = ( 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1) src_data = (-1, 1, -1, -1, 1, 1, -1, -1, -1, 1, 1, 1, -1, 1, 1, 1, 1) src_data = [s + (1 - random.random()) for s in src_data] # add some noise - src = gr.vector_source_f (src_data) - op = digital_swig.binary_slicer_fb () - dst = gr.vector_sink_b () + src = gr.vector_source_f(src_data) + op = digital.binary_slicer_fb() + dst = gr.vector_sink_b() - self.tb.connect (src, op) - self.tb.connect (op, dst) - self.tb.run () # run the graph and wait for it to finish + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() # run the graph and wait for it to finish - actual_result = dst.data () # fetch the contents of the sink + actual_result = dst.data() # fetch the contents of the sink #print "actual result", actual_result #print "expected result", expected_result - self.assertFloatTuplesAlmostEqual (expected_result, actual_result) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result) if __name__ == '__main__': diff --git a/gr-digital/python/qa_chunks_to_symbols.py b/gr-digital/python/qa_chunks_to_symbols.py index 63af10d8ff..5ffe425132 100755 --- a/gr-digital/python/qa_chunks_to_symbols.py +++ b/gr-digital/python/qa_chunks_to_symbols.py @@ -25,10 +25,10 @@ import digital_swig as digital class test_chunks_to_symbols(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_bc_001(self): diff --git a/gr-digital/python/qa_clock_recovery_mm.py b/gr-digital/python/qa_clock_recovery_mm.py index f4c345b034..e904cf4c21 100755 --- a/gr-digital/python/qa_clock_recovery_mm.py +++ b/gr-digital/python/qa_clock_recovery_mm.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -21,18 +21,18 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital import random, cmath class test_clock_recovery_mm(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test01 (self): + def test01(self): # Test complex/complex version omega = 2 gain_omega = 0.001 @@ -40,9 +40,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase): gain_mu = 0.01 omega_rel_lim = 0.001 - self.test = digital_swig.clock_recovery_mm_cc(omega, gain_omega, - mu, gain_mu, - omega_rel_lim) + self.test = digital.clock_recovery_mm_cc(omega, gain_omega, + mu, gain_mu, + omega_rel_lim) data = 100*[complex(1, 1),] self.src = gr.vector_source_c(data, False) @@ -64,10 +64,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase): #print expected_result #print dst_data - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5) - def test02 (self): + def test02(self): # Test float/float version omega = 2 gain_omega = 0.01 @@ -75,9 +75,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase): gain_mu = 0.01 omega_rel_lim = 0.001 - self.test = digital_swig.clock_recovery_mm_ff(omega, gain_omega, - mu, gain_mu, - omega_rel_lim) + self.test = digital.clock_recovery_mm_ff(omega, gain_omega, + mu, gain_mu, + omega_rel_lim) data = 100*[1,] self.src = gr.vector_source_f(data, False) @@ -99,10 +99,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase): #print expected_result #print dst_data - self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5) + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) - def test03 (self): + def test03(self): # Test complex/complex version with varying input omega = 2 gain_omega = 0.01 @@ -110,9 +110,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase): gain_mu = 0.1 omega_rel_lim = 0.0001 - self.test = digital_swig.clock_recovery_mm_cc(omega, gain_omega, - mu, gain_mu, - omega_rel_lim) + self.test = digital.clock_recovery_mm_cc(omega, gain_omega, + mu, gain_mu, + omega_rel_lim) data = 1000*[complex(1, 1), complex(1, 1), complex(-1, -1), complex(-1, -1)] self.src = gr.vector_source_c(data, False) @@ -134,10 +134,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase): #print expected_result #print dst_data - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) - def test04 (self): + def test04(self): # Test float/float version omega = 2 gain_omega = 0.01 @@ -145,9 +145,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase): gain_mu = 0.1 omega_rel_lim = 0.001 - self.test = digital_swig.clock_recovery_mm_ff(omega, gain_omega, - mu, gain_mu, - omega_rel_lim) + self.test = digital.clock_recovery_mm_ff(omega, gain_omega, + mu, gain_mu, + omega_rel_lim) data = 1000*[1, 1, -1, -1] self.src = gr.vector_source_f(data, False) @@ -169,7 +169,7 @@ class test_clock_recovery_mm(gr_unittest.TestCase): #print expected_result #print dst_data - self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 1) + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 1) if __name__ == '__main__': diff --git a/gr-digital/python/qa_cma_equalizer.py b/gr-digital/python/qa_cma_equalizer.py index 75fb0f05ed..f71e199189 100755 --- a/gr-digital/python/qa_cma_equalizer.py +++ b/gr-digital/python/qa_cma_equalizer.py @@ -21,7 +21,7 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital class test_cma_equalizer_fir(gr_unittest.TestCase): @@ -33,7 +33,7 @@ class test_cma_equalizer_fir(gr_unittest.TestCase): def transform(self, src_data): SRC = gr.vector_source_c(src_data, False) - EQU = digital_swig.cma_equalizer_cc(4, 1.0, .001, 1) + EQU = digital.cma_equalizer_cc(4, 1.0, .001, 1) DST = gr.vector_sink_c() self.tb.connect(SRC, EQU, DST) self.tb.run() @@ -44,7 +44,9 @@ class test_cma_equalizer_fir(gr_unittest.TestCase): src_data = (1+0j, 0+1j, -1+0j, 0-1j)*1000 expected_data = src_data result = self.transform(src_data) - self.assertComplexTuplesAlmostEqual(expected_data, result) + + N = -500 + self.assertComplexTuplesAlmostEqual(expected_data[N:], result[N:]) if __name__ == "__main__": gr_unittest.run(test_cma_equalizer_fir, "test_cma_equalizer_fir.xml") diff --git a/gr-digital/python/qa_constellation.py b/gr-digital/python/qa_constellation.py index ddd8c71e64..a593c3ea3e 100755 --- a/gr-digital/python/qa_constellation.py +++ b/gr-digital/python/qa_constellation.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -23,9 +23,10 @@ import random from cmath import exp, pi, log -from gnuradio import gr, gr_unittest, blks2 +from gnuradio import gr, gr_unittest from utils import mod_codes -import digital_swig +import digital_swig as digital +import blocks_swig as blocks # import from local folder import psk @@ -51,7 +52,7 @@ def twod_constell(): (-1+0j), (0-1j)) rot_sym = 2 dim = 2 - return digital_swig.constellation_calcdist(points, [], rot_sym, dim) + return digital.constellation_calcdist(points, [], rot_sym, dim) def threed_constell(): oned_points = ((1+0j), (0+1j), (-1+0j), (0-1j)) @@ -63,7 +64,7 @@ def threed_constell(): points += [oned_points[ia], oned_points[ib], oned_points[ic]] rot_sym = 4 dim = 3 - return digital_swig.constellation_calcdist(points, [], rot_sym, dim) + return digital.constellation_calcdist(points, [], rot_sym, dim) # A list of tuples for constellation testing. The contents of the # tuples are (constructor, poss_args, differential, diff_argname). @@ -74,15 +75,25 @@ easy_constellation_info = ( {'m': (2, 4, 8, 16, ), 'mod_code': tested_mod_codes, }, True, None), + (psk.psk_constellation, + {'m': (2, 4, 8, 16, 32, 64), + 'mod_code': tested_mod_codes, + 'differential': (False,)}, + False, None), (qam.qam_constellation, {'constellation_points': (4,), 'mod_code': tested_mod_codes, 'large_ampls_to_corners': [False],}, True, None), - (digital_swig.constellation_bpsk, {}, True, None), - (digital_swig.constellation_qpsk, {}, False, None), - (digital_swig.constellation_dqpsk, {}, True, None), - (digital_swig.constellation_8psk, {}, False, None), + (qam.qam_constellation, + {'constellation_points': (4, 16, 64), + 'mod_code': tested_mod_codes, + 'differential': (False,)}, + False, None), + (digital.constellation_bpsk, {}, True, None), + (digital.constellation_qpsk, {}, False, None), + (digital.constellation_dqpsk, {}, True, None), + (digital.constellation_8psk, {}, False, None), (twod_constell, {}, True, None), (threed_constell, {}, True, None), ) @@ -150,7 +161,7 @@ def tested_constellations(easy=True, medium=True, difficult=True): break -class test_constellation (gr_unittest.TestCase): +class test_constellation(gr_unittest.TestCase): src_length = 256 @@ -178,7 +189,7 @@ class test_constellation (gr_unittest.TestCase): data = dst.data() # Don't worry about cut off data for now. first = constellation.bits_per_symbol() - self.assertEqual (self.src_data[first:len(data)], data[first:]) + self.assertEqual(self.src_data[first:len(data)], data[first:]) class mod_demod(gr.hier_block2): @@ -200,40 +211,39 @@ class mod_demod(gr.hier_block2): self.blocks = [self] # We expect a stream of unpacked bits. # First step is to pack them. - self.blocks.append( - gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)) + self.blocks.append(blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)) # Second step we unpack them such that we have k bits in each byte where # each constellation symbol hold k bits. self.blocks.append( - gr.packed_to_unpacked_bb(self.constellation.bits_per_symbol(), - gr.GR_MSB_FIRST)) + blocks.packed_to_unpacked_bb(self.constellation.bits_per_symbol(), + gr.GR_MSB_FIRST)) # Apply any pre-differential coding # Gray-coding is done here if we're also using differential coding. if self.constellation.apply_pre_diff_code(): - self.blocks.append(gr.map_bb(self.constellation.pre_diff_code())) + self.blocks.append(digital.map_bb(self.constellation.pre_diff_code())) # Differential encoding. if self.differential: - self.blocks.append(gr.diff_encoder_bb(arity)) + self.blocks.append(digital.diff_encoder_bb(arity)) # Convert to constellation symbols. - self.blocks.append(gr.chunks_to_symbols_bc(self.constellation.points(), - self.constellation.dimensionality())) + self.blocks.append(digital.chunks_to_symbols_bc(self.constellation.points(), + self.constellation.dimensionality())) # CHANNEL # Channel just consists of a rotation to check differential coding. if rotation is not None: - self.blocks.append(gr.multiply_const_cc(rotation)) + self.blocks.append(blocks.multiply_const_cc(rotation)) # RX # Convert the constellation symbols back to binary values. - self.blocks.append(digital_swig.constellation_decoder_cb(self.constellation.base())) + self.blocks.append(digital.constellation_decoder_cb(self.constellation.base())) # Differential decoding. if self.differential: - self.blocks.append(gr.diff_decoder_bb(arity)) + self.blocks.append(digital.diff_decoder_bb(arity)) # Decode any pre-differential coding. if self.constellation.apply_pre_diff_code(): - self.blocks.append(gr.map_bb( + self.blocks.append(digital.map_bb( mod_codes.invert_code(self.constellation.pre_diff_code()))) # unpack the k bit vector into a stream of bits - self.blocks.append(gr.unpack_k_bits_bb( + self.blocks.append(blocks.unpack_k_bits_bb( self.constellation.bits_per_symbol())) # connect to block output check_index = len(self.blocks) @@ -241,7 +251,6 @@ class mod_demod(gr.hier_block2): self.blocks.append(self) self.connect(*self.blocks) - if __name__ == '__main__': gr_unittest.run(test_constellation, "test_constellation.xml") diff --git a/gr-digital/python/qa_constellation_decoder_cb.py b/gr-digital/python/qa_constellation_decoder_cb.py index 5401a07fc0..6a93b6e743 100755 --- a/gr-digital/python/qa_constellation_decoder_cb.py +++ b/gr-digital/python/qa_constellation_decoder_cb.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2004,2007,2010,2011 Free Software Foundation, Inc. +# Copyright 2004,2007,2010-2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -21,54 +21,54 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital import math -class test_constellation_decoder (gr_unittest.TestCase): +class test_constellation_decoder(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_constellation_decoder_cb_bpsk (self): - cnst = digital_swig.constellation_bpsk() + def test_constellation_decoder_cb_bpsk(self): + cnst = digital.constellation_bpsk() src_data = (0.5 + 0.5j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j, 0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j) expected_result = ( 1, 1, 0, 0, 1, 0, 1) - src = gr.vector_source_c (src_data) - op = digital_swig.constellation_decoder_cb (cnst.base()) - dst = gr.vector_sink_b () + src = gr.vector_source_c(src_data) + op = digital.constellation_decoder_cb(cnst.base()) + dst = gr.vector_sink_b() - self.tb.connect (src, op) - self.tb.connect (op, dst) - self.tb.run () # run the graph and wait for it to finish + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() # run the graph and wait for it to finish - actual_result = dst.data () # fetch the contents of the sink + actual_result = dst.data() # fetch the contents of the sink #print "actual result", actual_result #print "expected result", expected_result - self.assertFloatTuplesAlmostEqual (expected_result, actual_result) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result) - def test_constellation_decoder_cb_qpsk (self): - cnst = digital_swig.constellation_qpsk() + def _test_constellation_decoder_cb_qpsk(self): + cnst = digital.constellation_qpsk() src_data = (0.5 + 0.5j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j, 0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j) expected_result = ( 3, 1, 0, 2, 3, 2, 1) - src = gr.vector_source_c (src_data) - op = digital_swig.constellation_decoder_cb (cnst.base()) - dst = gr.vector_sink_b () + src = gr.vector_source_c(src_data) + op = digital_swig.constellation_decoder_cb(cnst.base()) + dst = gr.vector_sink_b() - self.tb.connect (src, op) - self.tb.connect (op, dst) - self.tb.run () # run the graph and wait for it to finish + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() # run the graph and wait for it to finish - actual_result = dst.data () # fetch the contents of the sink + actual_result = dst.data() # fetch the contents of the sink #print "actual result", actual_result #print "expected result", expected_result - self.assertFloatTuplesAlmostEqual (expected_result, actual_result) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result) if __name__ == '__main__': diff --git a/gr-digital/python/qa_constellation_receiver.py b/gr-digital/python/qa_constellation_receiver.py index 37e56b4cf7..bc44220ea9 100755 --- a/gr-digital/python/qa_constellation_receiver.py +++ b/gr-digital/python/qa_constellation_receiver.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -22,9 +22,12 @@ import random -from gnuradio import gr, blks2, gr_unittest +from gnuradio import gr, gr_unittest from utils import mod_codes, alignment -import digital_swig, packet_utils +import packet_utils +import filter_swig as filter +import analog_swig as analog +import blocks_swig as blocks from generic_mod_demod import generic_mod, generic_demod from qa_constellation import tested_constellations, twod_constell @@ -37,7 +40,7 @@ SEED = 1239 # TESTING PARAMETERS # The number of symbols to test with. # We need this many to let the frequency recovery block converge. -DATA_LENGTH = 2000 +DATA_LENGTH = 1000 # Test fails if fraction of output that is correct is less than this. EASY_REQ_CORRECT = 0.9 # For constellations that aren't expected to work so well. @@ -52,8 +55,30 @@ TIMING_OFFSET = 1.0 FREQ_BW = 2*math.pi/100.0 PHASE_BW = 2*math.pi/100.0 +class channel_model(gr.hier_block2): + def __init__(self, noise_voltage, freq, timing): + gr.hier_block2.__init__(self, "channel_model", + gr.io_signature(1, 1, gr.sizeof_gr_complex), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) + -class test_constellation_receiver (gr_unittest.TestCase): + timing_offset = filter.fractional_interpolator_cc(0, timing) + noise_adder = blocks.add_cc() + noise = analog.noise_source_c(analog.GR_GAUSSIAN, + noise_voltage, 0) + freq_offset = analog.sig_source_c(1, analog.GR_SIN_WAVE, + freq, 1.0, 0.0) + mixer_offset = blocks.multiply_cc(); + + self.connect(self, timing_offset) + self.connect(timing_offset, (mixer_offset,0)) + self.connect(freq_offset, (mixer_offset,1)) + self.connect(mixer_offset, (noise_adder,1)) + self.connect(noise, (noise_adder,0)) + self.connect(noise_adder, self) + + +class test_constellation_receiver(gr_unittest.TestCase): # We ignore the first half of the output data since often it takes # a while for the receiver to lock on. @@ -109,18 +134,20 @@ class test_constellation_receiver (gr_unittest.TestCase): self.assertTrue(correct > req_correct) -class rec_test_tb (gr.top_block): +class rec_test_tb(gr.top_block): """ Takes a constellation an runs a generic modulation, channel, and generic demodulation. """ def __init__(self, constellation, differential, - data_length=None, src_data=None): + data_length=None, src_data=None, freq_offset=True): """ - constellation -- a constellation object - differential -- whether differential encoding is used - data_length -- the number of bits of data to use - src_data -- a list of the bits to use + Args: + constellation: a constellation object + differential: whether differential encoding is used + data_length: the number of bits of data to use + src_data: a list of the bits to use + freq_offset: whether to use a frequency offset in the channel """ super(rec_test_tb, self).__init__() # Transmission Blocks @@ -128,15 +155,22 @@ class rec_test_tb (gr.top_block): self.src_data = tuple([rndm.randint(0,1) for i in range(0, data_length)]) else: self.src_data = src_data - packer = gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) + packer = blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) src = gr.vector_source_b(self.src_data) mod = generic_mod(constellation, differential=differential) # Channel - channel = gr.channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET) + if freq_offset: + channel = channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET) + else: + channel = channel_model(NOISE_VOLTAGE, 0, TIMING_OFFSET) # Receiver Blocks - demod = generic_demod(constellation, differential=differential, - freq_bw=FREQ_BW, - phase_bw=PHASE_BW) + if freq_offset: + demod = generic_demod(constellation, differential=differential, + freq_bw=FREQ_BW, + phase_bw=PHASE_BW) + else: + demod = generic_demod(constellation, differential=differential, + freq_bw=0, phase_bw=0) self.dst = gr.vector_sink_b() self.connect(src, packer, mod, channel, demod, self.dst) diff --git a/gr-digital/python/qa_correlate_access_code.py b/gr-digital/python/qa_correlate_access_code.py index 96246dcfb9..5a5f2209f7 100755 --- a/gr-digital/python/qa_correlate_access_code.py +++ b/gr-digital/python/qa_correlate_access_code.py @@ -52,13 +52,13 @@ class test_correlate_access_code(gr_unittest.TestCase): # 0 0 0 1 0 0 0 1 src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 expected_result = pad + (1, 0, 1, 1, 3, 1, 0, 1, 1, 2) + (0,) * 6 - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.correlate_access_code_bb("1011", 0) - dst = gr.vector_sink_b () - self.tb.connect (src, op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (expected_result, result_data) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) def test_002(self): @@ -69,13 +69,13 @@ class test_correlate_access_code(gr_unittest.TestCase): #print access_code src_data = code + (1, 0, 1, 1) + pad expected_result = pad + code + (3, 0, 1, 1) - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.correlate_access_code_bb(access_code, 0) - dst = gr.vector_sink_b () - self.tb.connect (src, op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (expected_result, result_data) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) def test_003(self): code = tuple(string_to_1_0_list(default_access_code)) @@ -85,14 +85,13 @@ class test_correlate_access_code(gr_unittest.TestCase): #print access_code src_data = code + (1, 0, 1, 1) + pad expected_result = code + (1, 0, 1, 1) + pad - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.correlate_access_code_tag_bb(access_code, 0, "test") - dst = gr.vector_sink_b () - self.tb.connect (src, op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (expected_result, result_data) - + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) if __name__ == '__main__': gr_unittest.run(test_correlate_access_code, "test_correlate_access_code.xml") diff --git a/gr-digital/python/qa_costas_loop_cc.py b/gr-digital/python/qa_costas_loop_cc.py index 75fdbc2f84..365eda736a 100755 --- a/gr-digital/python/qa_costas_loop_cc.py +++ b/gr-digital/python/qa_costas_loop_cc.py @@ -21,22 +21,23 @@ # from gnuradio import gr, gr_unittest -import digital_swig, psk +import digital_swig as digital +import psk import random, cmath class test_costas_loop_cc(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test01 (self): + def test01(self): # test basic functionality by setting all gains to 0 natfreq = 0.0 order = 2 - self.test = digital_swig.costas_loop_cc(natfreq, order) + self.test = digital.costas_loop_cc(natfreq, order) data = 100*[complex(1,0),] self.src = gr.vector_source_c(data, False) @@ -47,13 +48,13 @@ class test_costas_loop_cc(gr_unittest.TestCase): expected_result = data dst_data = self.snk.data() - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5) - def test02 (self): + def test02(self): # Make sure it doesn't diverge given perfect data natfreq = 0.25 order = 2 - self.test = digital_swig.costas_loop_cc(natfreq, order) + self.test = digital.costas_loop_cc(natfreq, order) data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)] self.src = gr.vector_source_c(data, False) @@ -65,13 +66,13 @@ class test_costas_loop_cc(gr_unittest.TestCase): expected_result = data dst_data = self.snk.data() - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5) - def test03 (self): + def test03(self): # BPSK Convergence test with static rotation natfreq = 0.25 order = 2 - self.test = digital_swig.costas_loop_cc(natfreq, order) + self.test = digital.costas_loop_cc(natfreq, order) rot = cmath.exp(0.2j) # some small rotation data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)] @@ -90,13 +91,13 @@ class test_costas_loop_cc(gr_unittest.TestCase): # generously compare results; the loop will converge near to, but # not exactly on, the target data - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2) - def test04 (self): + def test04(self): # QPSK Convergence test with static rotation natfreq = 0.25 order = 4 - self.test = digital_swig.costas_loop_cc(natfreq, order) + self.test = digital.costas_loop_cc(natfreq, order) rot = cmath.exp(0.2j) # some small rotation data = [complex(2*random.randint(0,1)-1, 2*random.randint(0,1)-1) @@ -116,13 +117,13 @@ class test_costas_loop_cc(gr_unittest.TestCase): # generously compare results; the loop will converge near to, but # not exactly on, the target data - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2) - def test05 (self): + def test05(self): # 8PSK Convergence test with static rotation natfreq = 0.25 order = 8 - self.test = digital_swig.costas_loop_cc(natfreq, order) + self.test = digital.costas_loop_cc(natfreq, order) rot = cmath.exp(-cmath.pi/8.0j) # rotate to match Costas rotation const = psk.psk_constellation(order) @@ -145,7 +146,7 @@ class test_costas_loop_cc(gr_unittest.TestCase): # generously compare results; the loop will converge near to, but # not exactly on, the target data - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2) if __name__ == '__main__': gr_unittest.run(test_costas_loop_cc, "test_costas_loop_cc.xml") diff --git a/gr-digital/python/qa_cpm.py b/gr-digital/python/qa_cpm.py index 12a84c76c2..070e69a982 100755 --- a/gr-digital/python/qa_cpm.py +++ b/gr-digital/python/qa_cpm.py @@ -21,15 +21,17 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital +import analog_swig as analog +import blocks_swig as blocks import numpy class test_cpm(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def do_check_phase_shift(self, type, name): @@ -37,8 +39,8 @@ class test_cpm(gr_unittest.TestCase): L = 1 in_bits = (1,) * 20 src = gr.vector_source_b(in_bits, False) - cpm = digital_swig.cpmmod_bc(type, 0.5, sps, L) - arg = gr.complex_to_arg() + cpm = digital.cpmmod_bc(type, 0.5, sps, L) + arg = blocks.complex_to_arg() sink = gr.vector_sink_f() self.tb.connect(src, cpm, arg, sink) @@ -51,16 +53,16 @@ class test_cpm(gr_unittest.TestCase): msg="Phase shift was not correct for CPM method " + name) def test_001_lrec(self): - self.do_check_phase_shift(gr.cpm.LRC, 'LREC') + self.do_check_phase_shift(analog.cpm.LRC, 'LREC') def test_001_lrc(self): - self.do_check_phase_shift(gr.cpm.LRC, 'LRC') + self.do_check_phase_shift(analog.cpm.LRC, 'LRC') def test_001_lsrc(self): - self.do_check_phase_shift(gr.cpm.LSRC, 'LSRC') + self.do_check_phase_shift(analog.cpm.LSRC, 'LSRC') def test_001_ltfm(self): - self.do_check_phase_shift(gr.cpm.TFM, 'TFM') + self.do_check_phase_shift(analog.cpm.TFM, 'TFM') def test_001_lgmsk(self): sps = 2 @@ -68,8 +70,8 @@ class test_cpm(gr_unittest.TestCase): bt = 0.3 in_bits = (1,) * 20 src = gr.vector_source_b(in_bits, False) - gmsk = digital_swig.gmskmod_bc(sps, bt, L) - arg = gr.complex_to_arg() + gmsk = digital.gmskmod_bc(sps, L, bt) + arg = blocks.complex_to_arg() sink = gr.vector_sink_f() self.tb.connect(src, gmsk, arg, sink) @@ -82,7 +84,7 @@ class test_cpm(gr_unittest.TestCase): msg="Phase shift was not correct for GMSK") def test_phase_response(self): - phase_response = gr.cpm.phase_response(gr.cpm.LREC, 2, 4) + phase_response = analog.cpm.phase_response(analog.cpm.LREC, 2, 4) self.assertAlmostEqual(numpy.sum(phase_response), 1) diff --git a/gr-digital/python/qa_crc32.py b/gr-digital/python/qa_crc32.py index f86813f3f3..cd4006b1d3 100755 --- a/gr-digital/python/qa_crc32.py +++ b/gr-digital/python/qa_crc32.py @@ -21,40 +21,40 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital import random, cmath class test_crc32(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test01 (self): + def test01(self): data = 100*"0" expected_result = 2943744955 - result = digital_swig.crc32(data) + result = digital.crc32(data) #print hex(result) - self.assertEqual (expected_result, result) + self.assertEqual(expected_result, result) - def test02 (self): + def test02(self): data = 100*"1" expected_result = 2326594156 - result = digital_swig.crc32(data) + result = digital.crc32(data) #print hex(result) - self.assertEqual (expected_result, result) + self.assertEqual(expected_result, result) - def test03 (self): + def test03(self): data = 10*"0123456789" expected_result = 3774345973 - result = digital_swig.crc32(data) + result = digital.crc32(data) #print hex(result) - self.assertEqual (expected_result, result) + self.assertEqual(expected_result, result) if __name__ == '__main__': gr_unittest.run(test_crc32, "test_crc32.xml") diff --git a/gr-digital/python/qa_diff_encoder.py b/gr-digital/python/qa_diff_encoder.py index e4f5470af5..c28f4dbdf8 100755 --- a/gr-digital/python/qa_diff_encoder.py +++ b/gr-digital/python/qa_diff_encoder.py @@ -32,12 +32,12 @@ def make_random_int_tuple(L, min, max): return tuple(result) -class test_diff_encoder (gr_unittest.TestCase): +class test_diff_encoder(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_diff_encdec_000(self): diff --git a/gr-digital/python/qa_diff_phasor_cc.py b/gr-digital/python/qa_diff_phasor_cc.py index 3e7617fe47..833158d0a8 100755 --- a/gr-digital/python/qa_diff_phasor_cc.py +++ b/gr-digital/python/qa_diff_phasor_cc.py @@ -24,25 +24,25 @@ from gnuradio import gr, gr_unittest import digital_swig as digital import math -class test_diff_phasor (gr_unittest.TestCase): +class test_diff_phasor(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_diff_phasor_cc (self): + def test_diff_phasor_cc(self): src_data = (0+0j, 1+0j, -1+0j, 3+4j, -3-4j, -3+4j) expected_result = (0+0j, 0+0j, -1+0j, -3-4j, -25+0j, -7-24j) - src = gr.vector_source_c (src_data) - op = digital.diff_phasor_cc () - dst = gr.vector_sink_c () - self.tb.connect (src, op) - self.tb.connect (op, dst) - self.tb.run () # run the graph and wait for it to finish - actual_result = dst.data () # fetch the contents of the sink - self.assertComplexTuplesAlmostEqual (expected_result, actual_result) + src = gr.vector_source_c(src_data) + op = digital.diff_phasor_cc() + dst = gr.vector_sink_c() + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() # run the graph and wait for it to finish + actual_result = dst.data() # fetch the contents of the sink + self.assertComplexTuplesAlmostEqual(expected_result, actual_result) if __name__ == '__main__': gr_unittest.run(test_diff_phasor, "test_diff_phasor.xml") diff --git a/gr-digital/python/qa_digital.py b/gr-digital/python/qa_digital.py index 97e35da568..6f54f14208 100755 --- a/gr-digital/python/qa_digital.py +++ b/gr-digital/python/qa_digital.py @@ -21,14 +21,14 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital class test_digital(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None if __name__ == '__main__': diff --git a/gr-digital/python/qa_fll_band_edge.py b/gr-digital/python/qa_fll_band_edge.py index 9e4ca079b7..0f6bad984e 100755 --- a/gr-digital/python/qa_fll_band_edge.py +++ b/gr-digital/python/qa_fll_band_edge.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -21,25 +21,28 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital +import filter_swig as filter +import blocks_swig as blocks +import analog_swig as analog import random, math class test_fll_band_edge_cc(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test01 (self): + def test01(self): sps = 4 rolloff = 0.35 bw = 2*math.pi/100.0 ntaps = 45 # Create pulse shape filter - rrc_taps = gr.firdes.root_raised_cosine( + rrc_taps = filter.firdes.root_raised_cosine( sps, sps, 1.0, rolloff, ntaps) # The frequency offset to correct @@ -49,14 +52,14 @@ class test_fll_band_edge_cc(gr_unittest.TestCase): random.seed(0) data = [2.0*random.randint(0, 2) - 1.0 for i in xrange(200)] self.src = gr.vector_source_c(data, False) - self.rrc = gr.interp_fir_filter_ccf(sps, rrc_taps) + self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps) # Mix symbols with a complex sinusoid to spin them - self.nco = gr.sig_source_c(1, gr.GR_SIN_WAVE, foffset, 1) - self.mix = gr.multiply_cc() + self.nco = analog.sig_source_c(1, analog.GR_SIN_WAVE, foffset, 1) + self.mix = blocks.multiply_cc() # FLL will despin the symbols to an arbitrary phase - self.fll = digital_swig.fll_band_edge_cc(sps, rolloff, ntaps, bw) + self.fll = digital.fll_band_edge_cc(sps, rolloff, ntaps, bw) # Create sinks for all outputs of the FLL # we will only care about the freq and error outputs @@ -78,7 +81,7 @@ class test_fll_band_edge_cc(gr_unittest.TestCase): dst_data = self.vsnk_frq.data()[N:] expected_result = len(dst_data)* [-0.20,] - self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 4) + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 4) if __name__ == '__main__': gr_unittest.run(test_fll_band_edge_cc, "test_fll_band_edge_cc.xml") diff --git a/gr-digital/python/qa_framer_sink.py b/gr-digital/python/qa_framer_sink.py index bccc86dc78..e717e6ae05 100755 --- a/gr-digital/python/qa_framer_sink.py +++ b/gr-digital/python/qa_framer_sink.py @@ -63,11 +63,11 @@ class test_framker_sink(gr_unittest.TestCase): self.tb.connect(src, correlator, framer_sink) self.tb.connect(correlator, vsnk) - self.tb.run () + self.tb.run() result_data = rcvd_pktq.delete_head() result_data = result_data.to_string() - self.assertEqual (expected_data, result_data) + self.assertEqual(expected_data, result_data) def test_002(self): @@ -87,11 +87,11 @@ class test_framker_sink(gr_unittest.TestCase): self.tb.connect(src, correlator, framer_sink) self.tb.connect(correlator, vsnk) - self.tb.run () + self.tb.run() result_data = rcvd_pktq.delete_head() result_data = result_data.to_string() - self.assertEqual (expected_data, result_data) + self.assertEqual(expected_data, result_data) if __name__ == '__main__': gr_unittest.run(test_framker_sink, "test_framker_sink.xml") diff --git a/gr-digital/python/qa_glfsr_source.py b/gr-digital/python/qa_glfsr_source.py index 157520d7f8..c5adab3023 100755 --- a/gr-digital/python/qa_glfsr_source.py +++ b/gr-digital/python/qa_glfsr_source.py @@ -25,10 +25,10 @@ import digital_swig as digital class test_glfsr_source(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_000_make_b(self): @@ -38,9 +38,9 @@ class test_glfsr_source(gr_unittest.TestCase): def test_001_degree_b(self): self.assertRaises(RuntimeError, - lambda: gr.glfsr_source_b(0)) + lambda: digital.glfsr_source_b(0)) self.assertRaises(RuntimeError, - lambda: gr.glfsr_source_b(33)) + lambda: digital.glfsr_source_b(33)) def test_002_correlation_b(self): for degree in range(1,11): # Higher degrees take too long to correlate @@ -65,9 +65,9 @@ class test_glfsr_source(gr_unittest.TestCase): def test_004_degree_f(self): self.assertRaises(RuntimeError, - lambda: gr.glfsr_source_f(0)) + lambda: digital.glfsr_source_f(0)) self.assertRaises(RuntimeError, - lambda: gr.glfsr_source_f(33)) + lambda: digital.glfsr_source_f(33)) def test_005_correlation_f(self): for degree in range(1,11): # Higher degrees take too long to correlate src = digital.glfsr_source_f(degree, False) diff --git a/gr-digital/python/qa_bytes_to_syms.py b/gr-digital/python/qa_lfsr.py index 75475a95b2..d70c466ca7 100755 --- a/gr-digital/python/qa_bytes_to_syms.py +++ b/gr-digital/python/qa_lfsr.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc. +# Copyright 2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -24,28 +24,26 @@ from gnuradio import gr, gr_unittest import digital_swig as digital import math -class test_bytes_to_syms (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () +class test_lfsr(gr_unittest.TestCase): - def tearDown (self): - self.tb = None + def setUp(self): + pass - def test_bytes_to_syms_001 (self): - src_data = (0x01, 0x80, 0x03) - expected_result = (-1, -1, -1, -1, -1, -1, -1, +1, - +1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, +1, +1) - src = gr.vector_source_b (src_data) - op = digital.bytes_to_syms () - dst = gr.vector_sink_f () - self.tb.connect (src, op) - self.tb.connect (op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (expected_result, result_data) + def tearDown(self): + pass + + def test_lfsr_001(self): + reglen = 8 + l = digital.lfsr(1, 1, reglen) + + result_data = [] + for i in xrange(4*(reglen+1)): + result_data.append(l.next_bit()) + + expected_result = 4*([1,] + reglen*[0,]) + self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5) if __name__ == '__main__': - gr_unittest.run(test_bytes_to_syms, "test_bytes_to_syms.xml") + gr_unittest.run(test_lfsr, "test_lfsr.xml") diff --git a/gr-digital/python/qa_lms_equalizer.py b/gr-digital/python/qa_lms_equalizer.py index 025c785aa4..9ba90a89ab 100755 --- a/gr-digital/python/qa_lms_equalizer.py +++ b/gr-digital/python/qa_lms_equalizer.py @@ -21,7 +21,7 @@ # from gnuradio import gr, gr_unittest -import digital_swig +import digital_swig as digital class test_lms_dd_equalizer(gr_unittest.TestCase): @@ -33,7 +33,7 @@ class test_lms_dd_equalizer(gr_unittest.TestCase): def transform(self, src_data, gain, const): SRC = gr.vector_source_c(src_data, False) - EQU = digital_swig.lms_dd_equalizer_cc(4, gain, 1, const.base()) + EQU = digital.lms_dd_equalizer_cc(4, gain, 1, const.base()) DST = gr.vector_sink_c() self.tb.connect(SRC, EQU, DST) self.tb.run() @@ -41,13 +41,15 @@ class test_lms_dd_equalizer(gr_unittest.TestCase): def test_001_identity(self): # Constant modulus signal so no adjustments - const = digital_swig.constellation_qpsk() + const = digital.constellation_qpsk() src_data = const.points()*1000 N = 100 # settling time expected_data = src_data[N:] result = self.transform(src_data, 0.1, const)[N:] - self.assertComplexTuplesAlmostEqual(expected_data, result, 5) + + N = -500 + self.assertComplexTuplesAlmostEqual(expected_data[N:], result[N:], 5) if __name__ == "__main__": gr_unittest.run(test_lms_dd_equalizer, "test_lms_dd_equalizer.xml") diff --git a/gr-digital/python/qa_map.py b/gr-digital/python/qa_map.py index 3ad99a2c12..0fd7c479a1 100755 --- a/gr-digital/python/qa_map.py +++ b/gr-digital/python/qa_map.py @@ -34,14 +34,14 @@ class test_map(gr_unittest.TestCase): def helper(self, symbols): src_data = [0, 1, 2, 3, 0, 1, 2, 3] expected_data = map(lambda x: symbols[x], src_data) - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.map_bb(symbols) - dst = gr.vector_sink_b () - self.tb.connect (src, op, dst) - self.tb.run () + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() result_data = list(dst.data()) - self.assertEqual (expected_data, result_data) + self.assertEqual(expected_data, result_data) def test_001(self): symbols = [0, 0, 0, 0] diff --git a/gr-digital/python/qa_mpsk_receiver.py b/gr-digital/python/qa_mpsk_receiver.py index e1f16ee671..bde8895e76 100755 --- a/gr-digital/python/qa_mpsk_receiver.py +++ b/gr-digital/python/qa_mpsk_receiver.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -21,101 +21,128 @@ # from gnuradio import gr, gr_unittest -import digital_swig -import random, cmath +import digital_swig as digital +import filter_swig as filter +import random, cmath, time class test_mpsk_receiver(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test01 (self): + def test01(self): # Test BPSK sync M = 2 theta = 0 loop_bw = cmath.pi/100.0 fmin = -0.5 fmax = 0.5 - mu = 0.25 + mu = 0.5 gain_mu = 0.01 omega = 2 gain_omega = 0.001 omega_rel = 0.001 - self.test = digital_swig.mpsk_receiver_cc(M, theta, loop_bw, - fmin, fmax, mu, gain_mu, - omega, gain_omega, - omega_rel) + self.test = digital.mpsk_receiver_cc(M, theta, loop_bw, + fmin, fmax, mu, gain_mu, + omega, gain_omega, + omega_rel) - data = 1000*[complex(1,0), complex(1,0), complex(-1,0), complex(-1,0)] + data = 10000*[complex(1,0), complex(-1,0)] + #data = [2*random.randint(0,1)-1 for x in xrange(10000)] self.src = gr.vector_source_c(data, False) self.snk = gr.vector_sink_c() - self.tb.connect(self.src, self.test, self.snk) + # pulse shaping interpolation filter + nfilts = 32 + excess_bw = 0.35 + ntaps = 11 * int(omega*nfilts) + rrc_taps0 = filter.firdes.root_raised_cosine( + nfilts, nfilts, 1.0, excess_bw, ntaps) + rrc_taps1 = filter.firdes.root_raised_cosine( + 1, omega, 1.0, excess_bw, 11*omega) + self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0) + self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1) + + self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk) self.tb.run() - expected_result = 1000*[complex(-0.5,0), complex(0.5,0)] + expected_result = [0.5*d for d in data] dst_data = self.snk.data() # Only compare last Ncmp samples - Ncmp = 100 + Ncmp = 1000 len_e = len(expected_result) len_d = len(dst_data) - expected_result = expected_result[len_e - Ncmp:] + expected_result = expected_result[len_e - Ncmp-1:-1] dst_data = dst_data[len_d - Ncmp:] - + #for e,d in zip(expected_result, dst_data): - # print e, d + # print "{0:+.02f} {1:+.02f}".format(e, d) - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) - def test02 (self): + def test02(self): # Test QPSK sync M = 4 theta = 0 - loop_bw = 2*cmath.pi/100.0 + loop_bw = cmath.pi/100.0 fmin = -0.5 fmax = 0.5 - mu = 0.25 + mu = 0.5 gain_mu = 0.01 omega = 2 gain_omega = 0.001 omega_rel = 0.001 - self.test = digital_swig.mpsk_receiver_cc(M, theta, loop_bw, - fmin, fmax, mu, gain_mu, - omega, gain_omega, - omega_rel) + self.test = digital.mpsk_receiver_cc(M, theta, loop_bw, + fmin, fmax, mu, gain_mu, + omega, gain_omega, + omega_rel) - data = 1000*[complex( 0.707, 0.707), complex( 0.707, 0.707), - complex(-0.707, 0.707), complex(-0.707, 0.707), - complex(-0.707, -0.707), complex(-0.707, -0.707), - complex( 0.707, -0.707), complex( 0.707, -0.707)] + data = 10000*[complex( 0.707, 0.707), + complex(-0.707, 0.707), + complex(-0.707, -0.707), + complex( 0.707, -0.707)] + data = [0.5*d for d in data] self.src = gr.vector_source_c(data, False) self.snk = gr.vector_sink_c() - self.tb.connect(self.src, self.test, self.snk) + # pulse shaping interpolation filter + nfilts = 32 + excess_bw = 0.35 + ntaps = 11 * int(omega*nfilts) + rrc_taps0 = filter.firdes.root_raised_cosine( + nfilts, nfilts, 1.0, excess_bw, ntaps) + rrc_taps1 = filter.firdes.root_raised_cosine( + 1, omega, 1.0, excess_bw, 11*omega) + self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0) + self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1) + + self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk) self.tb.run() - expected_result = 1000*[complex(0, -1.0), complex(1.0, 0), - complex(0, 1.0), complex(-1.0, 0)] - dst_data = self.snk.data() + expected_result = 10000*[complex(-0.5, +0.0), complex(+0.0, -0.5), + complex(+0.5, +0.0), complex(+0.0, +0.5)] + + # get data after a settling period + dst_data = self.snk.data()[200:] # Only compare last Ncmp samples - Ncmp = 100 + Ncmp = 1000 len_e = len(expected_result) len_d = len(dst_data) - expected_result = expected_result[len_e - Ncmp:] + expected_result = expected_result[len_e - Ncmp - 1:-1] dst_data = dst_data[len_d - Ncmp:] #for e,d in zip(expected_result, dst_data): - # print e, d + # print "{0:+.02f} {1:+.02f}".format(e, d) - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) if __name__ == '__main__': gr_unittest.run(test_mpsk_receiver, "test_mpsk_receiver.xml") diff --git a/gr-digital/python/qa_mpsk_snr_est.py b/gr-digital/python/qa_mpsk_snr_est.py index d392567bfd..c976bf21a8 100755 --- a/gr-digital/python/qa_mpsk_snr_est.py +++ b/gr-digital/python/qa_mpsk_snr_est.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -29,94 +29,93 @@ def get_cplx(): def get_n_cplx(): return complex(random.random()-0.5, random.random()-0.5) -class test_mpsk_snr_est (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () +class test_mpsk_snr_est(gr_unittest.TestCase): + def setUp(self): + self.tb = gr.top_block() random.seed(0) # make repeatable N = 10000 self._noise = [get_n_cplx() for i in xrange(N)] self._bits = [get_cplx() for i in xrange(N)] - def tearDown (self): + def tearDown(self): self.tb = None - def mpsk_snr_est_setup (self, op): + def mpsk_snr_est_setup(self, op): result = [] for i in xrange(1,6): src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)] - src = gr.vector_source_c (src_data) - dst = gr.null_sink (gr.sizeof_gr_complex) + src = gr.vector_source_c(src_data) + dst = gr.null_sink(gr.sizeof_gr_complex) - tb = gr.top_block () - tb.connect (src, op) - tb.connect (op, dst) - tb.run () # run the graph and wait for it to finish + tb = gr.top_block() + tb.connect(src, op) + tb.connect(op, dst) + tb.run() # run the graph and wait for it to finish result.append(op.snr()) return result - def test_mpsk_snr_est_simple (self): + def test_mpsk_snr_est_simple(self): expected_result = [11.48, 5.91, 3.30, 2.08, 1.46] N = 10000 alpha = 0.001 - op = digital.mpsk_snr_est_cc (digital.SNR_EST_SIMPLE, N, alpha) + op = digital.mpsk_snr_est_cc(digital.SNR_EST_SIMPLE, N, alpha) actual_result = self.mpsk_snr_est_setup(op) - self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2) - def test_mpsk_snr_est_skew (self): + def test_mpsk_snr_est_skew(self): expected_result = [11.48, 5.91, 3.30, 2.08, 1.46] N = 10000 alpha = 0.001 - op = digital.mpsk_snr_est_cc (digital.SNR_EST_SKEW, N, alpha) + op = digital.mpsk_snr_est_cc(digital.SNR_EST_SKEW, N, alpha) actual_result = self.mpsk_snr_est_setup(op) - self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2) - def test_mpsk_snr_est_m2m4 (self): + def test_mpsk_snr_est_m2m4(self): expected_result = [11.02, 6.20, 4.98, 5.16, 5.66] N = 10000 alpha = 0.001 - op = digital.mpsk_snr_est_cc (digital.SNR_EST_M2M4, N, alpha) + op = digital.mpsk_snr_est_cc(digital.SNR_EST_M2M4, N, alpha) actual_result = self.mpsk_snr_est_setup(op) - self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2) - def test_mpsk_snr_est_svn (self): + def test_mpsk_snr_est_svn(self): expected_result = [10.90, 6.00, 4.76, 4.97, 5.49] N = 10000 alpha = 0.001 - op = digital.mpsk_snr_est_cc (digital.SNR_EST_SVR, N, alpha) + op = digital.mpsk_snr_est_cc(digital.SNR_EST_SVR, N, alpha) actual_result = self.mpsk_snr_est_setup(op) - self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2) + self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2) - def test_probe_mpsk_snr_est_m2m4 (self): + def test_probe_mpsk_snr_est_m2m4(self): expected_result = [11.02, 6.20, 4.98, 5.16, 5.66] actual_result = [] for i in xrange(1,6): src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)] - src = gr.vector_source_c (src_data) + src = gr.vector_source_c(src_data) N = 10000 alpha = 0.001 - op = digital.probe_mpsk_snr_est_c (digital.SNR_EST_M2M4, N, alpha) + op = digital.probe_mpsk_snr_est_c(digital.SNR_EST_M2M4, N, alpha) - tb = gr.top_block () - tb.connect (src, op) - tb.run () # run the graph and wait for it to finish + tb = gr.top_block() + tb.connect(src, op) + tb.run() # run the graph and wait for it to finish actual_result.append(op.snr()) - self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2) - + self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2) if __name__ == '__main__': # Test various SNR estimators; we're not using a Gaussian diff --git a/gr-digital/python/qa_ofdm_insert_preamble.py b/gr-digital/python/qa_ofdm_insert_preamble.py index c45893fa38..60902edc14 100755 --- a/gr-digital/python/qa_ofdm_insert_preamble.py +++ b/gr-digital/python/qa_ofdm_insert_preamble.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2007,2010,2011 Free Software Foundation, Inc. +# Copyright 2007,2010-2012 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -22,14 +22,15 @@ from gnuradio import gr, gr_unittest from pprint import pprint -import digital_swig +import digital_swig as digital +import blocks_swig as blocks -class test_ofdm_insert_preamble (gr_unittest.TestCase): +class test_ofdm_insert_preamble(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def helper(self, v0, v1, fft_length, preamble): @@ -37,13 +38,13 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase): src0 = gr.vector_source_c(v0) src1 = gr.vector_source_b(v1) - s2v = gr.stream_to_vector(gr.sizeof_gr_complex, fft_length) + s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_length) # print "len(v) = %d" % (len(v)) - op = digital_swig.ofdm_insert_preamble(fft_length, preamble) + op = digital.ofdm_insert_preamble(fft_length, preamble) - v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_length) + v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, fft_length) dst0 = gr.vector_sink_c() dst1 = gr.vector_sink_b() @@ -105,7 +106,6 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase): p.append(tuple(t)) v += t - r = self.helper(v, npayloads*[1], fft_length, preamble) self.assertEqual(r[0], tuple(npayloads*[1, 0])) @@ -175,6 +175,5 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase): p0, p1, p[12], p[13], p0, p1, p[14], p[15])) - if __name__ == '__main__': gr_unittest.run(test_ofdm_insert_preamble, "test_ofdm_insert_preamble.xml") diff --git a/gr-digital/python/qa_pfb_clock_sync.py b/gr-digital/python/qa_pfb_clock_sync.py index 06c8a60ba7..4d0276bcd6 100755 --- a/gr-digital/python/qa_pfb_clock_sync.py +++ b/gr-digital/python/qa_pfb_clock_sync.py @@ -21,18 +21,19 @@ # from gnuradio import gr, gr_unittest +import filter_swig as filter import digital_swig as digital import random, cmath class test_pfb_clock_sync(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test01 (self): + def test01(self): # Test BPSK sync excess_bw = 0.35 @@ -44,36 +45,36 @@ class test_pfb_clock_sync(gr_unittest.TestCase): osps = 1 ntaps = 11 * int(sps*nfilts) - taps = gr.firdes.root_raised_cosine(nfilts, nfilts*sps, - 1.0, excess_bw, ntaps) + taps = filter.firdes.root_raised_cosine(nfilts, nfilts*sps, + 1.0, excess_bw, ntaps) self.test = digital.pfb_clock_sync_ccf(sps, loop_bw, taps, nfilts, init_phase, max_rate_deviation, osps) - data = 1000*[complex(1,0), complex(-1,0)] + data = 10000*[complex(1,0), complex(-1,0)] self.src = gr.vector_source_c(data, False) # pulse shaping interpolation filter - rrc_taps = gr.firdes.root_raised_cosine( + rrc_taps = filter.firdes.root_raised_cosine( nfilts, # gain nfilts, # sampling rate based on 32 filters in resampler 1.0, # symbol rate excess_bw, # excess bandwidth (roll-off factor) ntaps) - self.rrc_filter = gr.pfb_arb_resampler_ccf(sps, rrc_taps) + self.rrc_filter = filter.pfb_arb_resampler_ccf(sps, rrc_taps) self.snk = gr.vector_sink_c() self.tb.connect(self.src, self.rrc_filter, self.test, self.snk) self.tb.run() - expected_result = 1000*[complex(-1,0), complex(1,0)] + expected_result = 10000*[complex(-1,0), complex(1,0)] dst_data = self.snk.data() # Only compare last Ncmp samples - Ncmp = 100 + Ncmp = 1000 len_e = len(expected_result) len_d = len(dst_data) expected_result = expected_result[len_e - Ncmp:] @@ -82,10 +83,10 @@ class test_pfb_clock_sync(gr_unittest.TestCase): #for e,d in zip(expected_result, dst_data): # print e, d - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) - def test02 (self): + def test02(self): # Test real BPSK sync excess_bw = 0.35 @@ -97,36 +98,36 @@ class test_pfb_clock_sync(gr_unittest.TestCase): osps = 1 ntaps = 11 * int(sps*nfilts) - taps = gr.firdes.root_raised_cosine(nfilts, nfilts*sps, - 1.0, excess_bw, ntaps) + taps = filter.firdes.root_raised_cosine(nfilts, nfilts*sps, + 1.0, excess_bw, ntaps) self.test = digital.pfb_clock_sync_fff(sps, loop_bw, taps, nfilts, init_phase, max_rate_deviation, osps) - data = 1000*[1, -1] + data = 10000*[1, -1] self.src = gr.vector_source_f(data, False) # pulse shaping interpolation filter - rrc_taps = gr.firdes.root_raised_cosine( + rrc_taps = filter.firdes.root_raised_cosine( nfilts, # gain nfilts, # sampling rate based on 32 filters in resampler 1.0, # symbol rate excess_bw, # excess bandwidth (roll-off factor) ntaps) - self.rrc_filter = gr.pfb_arb_resampler_fff(sps, rrc_taps) + self.rrc_filter = filter.pfb_arb_resampler_fff(sps, rrc_taps) self.snk = gr.vector_sink_f() self.tb.connect(self.src, self.rrc_filter, self.test, self.snk) self.tb.run() - expected_result = 1000*[-1, 1] + expected_result = 10000*[-1, 1] dst_data = self.snk.data() # Only compare last Ncmp samples - Ncmp = 100 + Ncmp = 1000 len_e = len(expected_result) len_d = len(dst_data) expected_result = expected_result[len_e - Ncmp:] @@ -135,7 +136,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase): #for e,d in zip(expected_result, dst_data): # print e, d - self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1) + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) if __name__ == '__main__': diff --git a/gr-digital/python/qa_pn_correlator_cc.py b/gr-digital/python/qa_pn_correlator_cc.py index 377bef5feb..53633d04fa 100755 --- a/gr-digital/python/qa_pn_correlator_cc.py +++ b/gr-digital/python/qa_pn_correlator_cc.py @@ -22,11 +22,12 @@ from gnuradio import gr, gr_unittest import digital_swig as digital +import blocks_swig as blocks class test_pn_correlator_cc(gr_unittest.TestCase): def setUp(self): - self.tb = gr.top_block () + self.tb = gr.top_block() def tearDown(self): self.tb = None @@ -39,7 +40,7 @@ class test_pn_correlator_cc(gr_unittest.TestCase): length = 2**degree-1 src = digital.glfsr_source_f(degree) head = gr.head(gr.sizeof_float, length*length) - f2c = gr.float_to_complex() + f2c = blocks.float_to_complex() corr = digital.pn_correlator_cc(degree) dst = gr.vector_sink_c() self.tb.connect(src, head, f2c, corr, dst) diff --git a/gr-digital/python/qa_probe_density.py b/gr-digital/python/qa_probe_density.py index c5b7e0e7c2..f42f00a7f7 100755 --- a/gr-digital/python/qa_probe_density.py +++ b/gr-digital/python/qa_probe_density.py @@ -34,37 +34,37 @@ class test_probe_density(gr_unittest.TestCase): def test_001(self): src_data = [0, 1, 0, 1] expected_data = 1 - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.probe_density_b(1) - self.tb.connect (src, op) - self.tb.run () + self.tb.connect(src, op) + self.tb.run() result_data = op.density() - self.assertEqual (expected_data, result_data) + self.assertEqual(expected_data, result_data) def test_002(self): src_data = [1, 1, 1, 1] expected_data = 1 - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.probe_density_b(0.01) - self.tb.connect (src, op) - self.tb.run () + self.tb.connect(src, op) + self.tb.run() result_data = op.density() - self.assertEqual (expected_data, result_data) + self.assertEqual(expected_data, result_data) def test_003(self): src_data = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1] expected_data = 0.95243 - src = gr.vector_source_b (src_data) + src = gr.vector_source_b(src_data) op = digital.probe_density_b(0.01) - self.tb.connect (src, op) - self.tb.run () + self.tb.connect(src, op) + self.tb.run() result_data = op.density() print result_data - self.assertAlmostEqual (expected_data, result_data, 5) + self.assertAlmostEqual(expected_data, result_data, 5) if __name__ == '__main__': gr_unittest.run(test_probe_density, "test_probe_density.xml") diff --git a/gr-digital/python/qa_scrambler.py b/gr-digital/python/qa_scrambler.py index f5bd612429..3127a7c1e6 100755 --- a/gr-digital/python/qa_scrambler.py +++ b/gr-digital/python/qa_scrambler.py @@ -25,7 +25,7 @@ import digital_swig as digital class test_scrambler(gr_unittest.TestCase): - def setUp (self): + def setUp(self): self.tb = gr.top_block() def tearDown(self): diff --git a/gr-digital/python/qa_simple_correlator.py b/gr-digital/python/qa_simple_correlator.py index 124201a556..ff0faeb415 100755 --- a/gr-digital/python/qa_simple_correlator.py +++ b/gr-digital/python/qa_simple_correlator.py @@ -21,6 +21,8 @@ # from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import filter_swig as filter import digital_swig as digital class test_simple_correlator(gr_unittest.TestCase): @@ -40,15 +42,15 @@ class test_simple_correlator(gr_unittest.TestCase): # Filter taps to expand the data to oversample by 8 # Just using a RRC for some basic filter shape - taps = gr.firdes.root_raised_cosine(8, 8, 1.0, 0.5, 21) + taps = filter.firdes.root_raised_cosine(8, 8, 1.0, 0.5, 21) src = gr.vector_source_b(expected_result) frame = digital.simple_framer(4) - unpack = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) - expand = gr.interp_fir_filter_fff(8, taps) - b2f = gr.char_to_float() - mult2 = gr.multiply_const_ff(2) - sub1 = gr.add_const_ff(-1) + unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) + expand = filter.interp_fir_filter_fff(8, taps) + b2f = blocks.char_to_float() + mult2 = blocks.multiply_const_ff(2) + sub1 = blocks.add_const_ff(-1) op = digital.simple_correlator(4) dst = gr.vector_sink_b() self.tb.connect(src, frame, unpack, b2f, mult2, sub1, expand) diff --git a/gr-digital/python/qa_simple_framer.py b/gr-digital/python/qa_simple_framer.py index 09b2d329b2..f8c894da28 100755 --- a/gr-digital/python/qa_simple_framer.py +++ b/gr-digital/python/qa_simple_framer.py @@ -24,15 +24,15 @@ from gnuradio import gr, gr_unittest import digital_swig as digital import math -class test_simple_framer (gr_unittest.TestCase): +class test_simple_framer(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_simple_framer_001 (self): + def test_simple_framer_001(self): src_data = (0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, @@ -44,15 +44,14 @@ class test_simple_framer (gr_unittest.TestCase): 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x02, 0x88, 0x99, 0xaa, 0xbb, 0x55, 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x03, 0xcc, 0xdd, 0xee, 0xff, 0x55) - src = gr.vector_source_b (src_data) - op = digital.simple_framer (4) - dst = gr.vector_sink_b () - self.tb.connect (src, op) - self.tb.connect (op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (expected_result, result_data) - + src = gr.vector_source_b(src_data) + op = digital.simple_framer(4) + dst = gr.vector_sink_b() + self.tb.connect(src, op) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) if __name__ == '__main__': gr_unittest.run(test_simple_framer, "test_simple_framer.xml") diff --git a/gr-digital/python/qam.py b/gr-digital/python/qam.py index 6834e1945a..518be78941 100644 --- a/gr-digital/python/qam.py +++ b/gr-digital/python/qam.py @@ -1,5 +1,5 @@ # -# Copyright 2005,2006,2011 Free Software Foundation, Inc. +# Copyright 2005,2006,2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -27,10 +27,11 @@ from math import pi, sqrt, log from gnuradio import gr from generic_mod_demod import generic_mod, generic_demod +from generic_mod_demod import shared_mod_args, shared_demod_args from utils.gray_code import gray_code from utils import mod_codes import modulation_utils -import digital_swig +import digital_swig as digital # Default number of points in constellation. _def_constellation_points = 16 @@ -165,21 +166,23 @@ def qam_constellation(constellation_points=_def_constellation_points, else: raise ValueError("Mod code is not implemented for QAM") if differential: - points = make_differential_constellation(constellation_points, gray_coded) + points = make_differential_constellation(constellation_points, gray_coded=False) else: points = make_non_differential_constellation(constellation_points, gray_coded) side = int(sqrt(constellation_points)) width = 2.0/(side-1) + # No pre-diff code # Should add one so that we can gray-code the quadrant bits too. pre_diff_code = [] if not large_ampls_to_corners: - constellation = digital_swig.constellation_rect(points, pre_diff_code, 4, + constellation = digital.constellation_rect(points, pre_diff_code, 4, side, side, width, width) else: sector_values = large_ampls_to_corners_mapping(side, points, width) - constellation = digital_swig.constellation_expl_rect( + constellation = digital.constellation_expl_rect( points, pre_diff_code, 4, side, side, width, width, sector_values) + return constellation def find_closest_point(p, qs): @@ -257,6 +260,19 @@ def large_ampls_to_corners_mapping(side, points, width): # ///////////////////////////////////////////////////////////////////////////// class qam_mod(generic_mod): + """ + Hierarchical block for RRC-filtered QAM modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + constellation_points: Number of constellation points (must be a power of four) (integer). + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + differential: Whether to use differential encoding (boolean). + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args def __init__(self, constellation_points=_def_constellation_points, differential=_def_differential, @@ -291,13 +307,25 @@ class qam_mod(generic_mod): # ///////////////////////////////////////////////////////////////////////////// class qam_demod(generic_demod): + """ + Hierarchical block for RRC-filtered QAM modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + constellation_points: Number of constellation points (must be a power of four) (integer). + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + differential: Whether to use differential encoding (boolean). + """ + # See generic_demod for additional arguments + __doc__ += shared_mod_args def __init__(self, constellation_points=_def_constellation_points, differential=_def_differential, mod_code=_def_mod_code, large_ampls_to_corner = False, *args, **kwargs): - """ Hierarchical block for RRC-filtered QAM modulation. diff --git a/gr-digital/python/qpsk.py b/gr-digital/python/qpsk.py index be21fd76f1..859d981367 100644 --- a/gr-digital/python/qpsk.py +++ b/gr-digital/python/qpsk.py @@ -27,50 +27,59 @@ Demodulation is not included since the generic_mod_demod from gnuradio import gr from gnuradio.digital.generic_mod_demod import generic_mod, generic_demod -import digital_swig +from gnuradio.digital.generic_mod_demod import shared_mod_args, shared_demod_args +from utils import mod_codes +import digital_swig as digital import modulation_utils -# Default number of points in constellation. -_def_constellation_points = 4 -# Whether gray coding is used. -_def_gray_coded = True +# The default encoding (e.g. gray-code, set-partition) +_def_mod_code = mod_codes.GRAY_CODE # ///////////////////////////////////////////////////////////////////////////// # QPSK constellation # ///////////////////////////////////////////////////////////////////////////// -def qpsk_constellation(m=_def_constellation_points): - if m != _def_constellation_points: - raise ValueError("QPSK can only have 4 constellation points.") - return digital_swig.constellation_qpsk() +def qpsk_constellation(mod_code=_def_mod_code): + """ + Creates a QPSK constellation. + """ + if mod_code != mod_codes.GRAY_CODE: + raise ValueError("This QPSK mod/demod works only for gray-coded constellations.") + return digital.constellation_qpsk() # ///////////////////////////////////////////////////////////////////////////// # QPSK modulator # ///////////////////////////////////////////////////////////////////////////// class qpsk_mod(generic_mod): - - def __init__(self, constellation_points=_def_constellation_points, - gray_coded=_def_gray_coded, - *args, **kwargs): - - """ - Hierarchical block for RRC-filtered QPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_mod block for list of parameters. - """ - - constellation_points = _def_constellation_points - constellation = digital_swig.constellation_qpsk() - if constellation_points != 4: - raise ValueError("QPSK can only have 4 constellation points.") - if not gray_coded: - raise ValueError("This QPSK mod/demod works only for gray-coded constellations.") + """ + Hierarchical block for RRC-filtered QPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + differential: Whether to use differential encoding (boolean). + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args + + def __init__(self, mod_code=_def_mod_code, differential=False, *args, **kwargs): + pre_diff_code = True + if not differential: + constellation = digital.constellation_qpsk() + if mod_code != mod_codes.GRAY_CODE: + raise ValueError("This QPSK mod/demod works only for gray-coded constellations.") + else: + constellation = digital.constellation_dqpsk() + if mod_code not in set([mod_codes.GRAY_CODE, mod_codes.NO_CODE]): + raise ValueError("That mod_code is not supported for DQPSK mod/demod.") + if mod_code == mod_codes.NO_CODE: + pre_diff_code = False + super(qpsk_mod, self).__init__(constellation=constellation, - gray_coded=gray_coded, + pre_diff_code=pre_diff_code, *args, **kwargs) @@ -80,24 +89,35 @@ class qpsk_mod(generic_mod): # ///////////////////////////////////////////////////////////////////////////// class qpsk_demod(generic_demod): - - def __init__(self, constellation_points=_def_constellation_points, + """ + Hierarchical block for RRC-filtered QPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + differential: Whether to use differential encoding (boolean). + """ + # See generic_mod for additional arguments + __doc__ += shared_demod_args + + def __init__(self, mod_code=_def_mod_code, differential=False, *args, **kwargs): + pre_diff_code = True + if not differential: + constellation = digital.constellation_qpsk() + if mod_code != mod_codes.GRAY_CODE: + raise ValueError("This QPSK mod/demod works only for gray-coded constellations.") + else: + constellation = digital.constellation_dqpsk() + if mod_code not in set([mod_codes.GRAY_CODE, mod_codes.NO_CODE]): + raise ValueError("That mod_code is not supported for DQPSK mod/demod.") + if mod_code == mod_codes.NO_CODE: + pre_diff_code = False - """ - Hierarchical block for RRC-filtered QPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_demod block for list of parameters. - """ - - constellation_points = _def_constellation_points - constellation = digital_swig.constellation_qpsk() - if constellation_points != 4: - raise ValueError('Number of constellation points must be 4 for QPSK.') super(qpsk_demod, self).__init__(constellation=constellation, + pre_diff_code=pre_diff_code, *args, **kwargs) @@ -106,36 +126,30 @@ class qpsk_demod(generic_demod): # DQPSK constellation # ///////////////////////////////////////////////////////////////////////////// -def dqpsk_constellation(m=_def_constellation_points): - if m != _def_constellation_points: - raise ValueError("DQPSK can only have 4 constellation points.") - return digital_swig.constellation_dqpsk() +def dqpsk_constellation(mod_code=_def_mod_code): + if mod_code != mod_codes.GRAY_CODE: + raise ValueError("The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.") + return digital.constellation_dqpsk() # ///////////////////////////////////////////////////////////////////////////// # DQPSK modulator # ///////////////////////////////////////////////////////////////////////////// -class dqpsk_mod(generic_mod): - - def __init__(self, constellation_points=_def_constellation_points, - gray_coded=_def_gray_coded, - differential=True, *args, **kwargs): - """ - Hierarchical block for RRC-filtered DQPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_mod block for list of parameters. - """ - - constellation_points = _def_constellation_points - constellation = digital_swig.constellation_dqpsk() - if constellation_points != 4: - raise ValueError('Number of constellation points must be 4 for DQPSK.') - super(dqpsk_mod, self).__init__(constellation=constellation, - gray_coded=gray_coded, - differential=True, +class dqpsk_mod(qpsk_mod): + """ + Hierarchical block for RRC-filtered DQPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + """ + # See generic_mod for additional arguments + __doc__ += shared_mod_args + + def __init__(self, mod_code=_def_mod_code, *args, **kwargs): + super(dqpsk_mod, self).__init__(mod_code, *args, **kwargs) # ///////////////////////////////////////////////////////////////////////////// @@ -143,25 +157,21 @@ class dqpsk_mod(generic_mod): # # ///////////////////////////////////////////////////////////////////////////// -class dqpsk_demod(generic_demod): - - def __init__(self, constellation_points=_def_constellation_points, - differential=True, *args, **kwargs): - - """ - Hierarchical block for RRC-filtered DQPSK modulation. - - The input is a byte stream (unsigned char) and the - output is the complex modulated signal at baseband. - - See generic_demod block for list of parameters. - """ - constellation_points = _def_constellation_points - constellation = digital_swig.constellation_dqpsk() - if constellation_points != 4: - raise ValueError('Number of constellation points must be 4 for DQPSK.') - super(dqpsk_demod, self).__init__(constellation=constellation, - differential=True, +class dqpsk_demod(qpsk_demod): + """ + Hierarchical block for RRC-filtered DQPSK modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + mod_code: Whether to use a gray_code (digital.mod_codes.GRAY_CODE) or not (digital.mod_codes.NO_CODE). + """ + # See generic_mod for additional arguments + __doc__ += shared_demod_args + + def __init__(self, mod_code=_def_mod_code, *args, **kwargs): + super(dqpsk_demod, self).__init__(mod_code, *args, **kwargs) # @@ -173,4 +183,3 @@ modulation_utils.add_type_1_constellation('qpsk', qpsk_constellation) modulation_utils.add_type_1_mod('dqpsk', dqpsk_mod) modulation_utils.add_type_1_demod('dqpsk', dqpsk_demod) modulation_utils.add_type_1_constellation('dqpsk', dqpsk_constellation) - diff --git a/gr-digital/python/utils/mod_codes.py b/gr-digital/python/utils/mod_codes.py index caacda5cc6..f55fe41b8b 100644 --- a/gr-digital/python/utils/mod_codes.py +++ b/gr-digital/python/utils/mod_codes.py @@ -20,6 +20,7 @@ # Boston, MA 02110-1301, USA. # +# Constants used to represent what coding to use. GRAY_CODE = 'gray' SET_PARTITION_CODE = 'set-partition' NO_CODE = 'none' |