diff options
author | Ben Reynwar <ben@reynwar.net> | 2013-04-02 23:04:08 -0700 |
---|---|---|
committer | Ben Reynwar <ben@reynwar.net> | 2013-04-02 23:04:08 -0700 |
commit | c6dbde23b256a41b3d92cb4ad6b63287095d53c7 (patch) | |
tree | 71db12ea2e1667770c22568dcdf5e0857d5f1e59 /gr-digital/python/digital | |
parent | 22b70d0889ef3c51e27a31ee18d153093a55cbb8 (diff) | |
parent | 98758cbfa9a2aff714952d19e773bc370dfa2185 (diff) |
Merged next into uninstalled import branch.
Diffstat (limited to 'gr-digital/python/digital')
52 files changed, 2274 insertions, 240 deletions
diff --git a/gr-digital/python/digital/CMakeLists.txt b/gr-digital/python/digital/CMakeLists.txt index 8162c6fe20..919509f434 100644 --- a/gr-digital/python/digital/CMakeLists.txt +++ b/gr-digital/python/digital/CMakeLists.txt @@ -39,6 +39,7 @@ GR_PYTHON_INSTALL( ofdm_sync_ml.py ofdm_sync_pnac.py ofdm_sync_pn.py + ofdm_txrx.py packet_utils.py pkt.py psk.py @@ -55,6 +56,7 @@ GR_PYTHON_INSTALL( utils/gray_code.py utils/mod_codes.py utils/alignment.py + utils/tagged_streams.py DESTINATION ${GR_PYTHON_DIR}/gnuradio/digital/utils COMPONENT "digital_python" ) diff --git a/gr-digital/python/digital/__init__.py b/gr-digital/python/digital/__init__.py index d37c900929..5059e4eec8 100644 --- a/gr-digital/python/digital/__init__.py +++ b/gr-digital/python/digital/__init__.py @@ -49,6 +49,7 @@ from ofdm_sync_fixed import * from ofdm_sync_ml import * from ofdm_sync_pnac import * from ofdm_sync_pn import * +from ofdm_txrx import ofdm_tx, ofdm_rx import packet_utils import ofdm_packet_utils diff --git a/gr-digital/python/digital/cpm.py b/gr-digital/python/digital/cpm.py index a9915ba91a..b27fb098f5 100644 --- a/gr-digital/python/digital/cpm.py +++ b/gr-digital/python/digital/cpm.py @@ -26,6 +26,7 @@ from gnuradio import gr, filter from gnuradio import analog +from gnuradio import blocks from math import pi import numpy @@ -195,13 +196,13 @@ class cpm_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.B2s, - gr.file_sink(gr.sizeof_float, "symbols.dat")) + blocks.file_sink(gr.sizeof_float, "symbols.dat")) self.connect(self.pam, - gr.file_sink(gr.sizeof_float, "pam.dat")) + blocks.file_sink(gr.sizeof_float, "pam.dat")) self.connect(self.filter, - gr.file_sink(gr.sizeof_float, "filter.dat")) + blocks.file_sink(gr.sizeof_float, "filter.dat")) self.connect(self.fmmod, - gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) + blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) def add_options(parser): diff --git a/gr-digital/python/digital/digital_voice.py.real b/gr-digital/python/digital/digital_voice.py.real index 4a2ef7721f..241a4a3dc2 100644 --- a/gr-digital/python/digital/digital_voice.py.real +++ b/gr-digital/python/digital/digital_voice.py.real @@ -23,8 +23,7 @@ """ Digital voice Tx and Rx using GSM 13kbit vocoder and GMSK. -Runs channel at 32kbit/sec. Currently uses fake channel coding, -but there's room for a rate 1/2 coder. +Runs channel at 32kbit/sec. """ from gnuradio import gr, gru @@ -66,7 +65,7 @@ class digital_voice_tx(gr.hier_block): f2s = blocks.float_to_short() voice_coder = gsm_full_rate.encode_sp() - channel_coder = gr.fake_channel_encoder_pp(GSM_FRAME_SIZE, AIR_FRAME_SIZE) + channel_coder = gr.multiply_const_b(1) p2s = gr.parallel_to_serial(gr.sizeof_char, AIR_FRAME_SIZE) mod = gmsk_mod(fg, sps=samples_per_symbol, @@ -93,7 +92,7 @@ class digital_voice_rx(gr.hier_block): p_size=AIR_FRAME_SIZE) s2p = gr.serial_to_parallel(gr.sizeof_char, AIR_FRAME_SIZE) - channel_decoder = gr.fake_channel_decoder_pp(AIR_FRAME_SIZE, GSM_FRAME_SIZE) + channel_decoder = gr.multiply_const_b(1) voice_decoder = gsm_full_rate.decode_ps() s2f = blocks.short_to_float () diff --git a/gr-digital/python/digital/generic_mod_demod.py b/gr-digital/python/digital/generic_mod_demod.py index f5b2084766..b812fe1c37 100644 --- a/gr-digital/python/digital/generic_mod_demod.py +++ b/gr-digital/python/digital/generic_mod_demod.py @@ -198,17 +198,17 @@ class generic_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.bytes2chunks, - gr.file_sink(gr.sizeof_char, "tx_bytes2chunks.8b")) + blocks.file_sink(gr.sizeof_char, "tx_bytes2chunks.8b")) if self.pre_diff_code: self.connect(self.symbol_mapper, - gr.file_sink(gr.sizeof_char, "tx_symbol_mapper.8b")) + blocks.file_sink(gr.sizeof_char, "tx_symbol_mapper.8b")) if self._differential: self.connect(self.diffenc, - gr.file_sink(gr.sizeof_char, "tx_diffenc.8b")) + blocks.file_sink(gr.sizeof_char, "tx_diffenc.8b")) self.connect(self.chunks2symbols, - gr.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.32fc")) self.connect(self.rrc_filter, - gr.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.32fc")) # ///////////////////////////////////////////////////////////////////////////// @@ -338,39 +338,39 @@ class generic_demod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.agc, - gr.file_sink(gr.sizeof_gr_complex, "rx_agc.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "rx_agc.32fc")) self.connect((self.freq_recov, 0), - gr.file_sink(gr.sizeof_gr_complex, "rx_freq_recov.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "rx_freq_recov.32fc")) self.connect((self.freq_recov, 1), - gr.file_sink(gr.sizeof_float, "rx_freq_recov_freq.32f")) + blocks.file_sink(gr.sizeof_float, "rx_freq_recov_freq.32f")) self.connect((self.freq_recov, 2), - gr.file_sink(gr.sizeof_float, "rx_freq_recov_phase.32f")) + blocks.file_sink(gr.sizeof_float, "rx_freq_recov_phase.32f")) self.connect((self.freq_recov, 3), - gr.file_sink(gr.sizeof_float, "rx_freq_recov_error.32f")) + blocks.file_sink(gr.sizeof_float, "rx_freq_recov_error.32f")) self.connect((self.time_recov, 0), - gr.file_sink(gr.sizeof_gr_complex, "rx_time_recov.32fc")) + blocks.file_sink(gr.sizeof_gr_complex, "rx_time_recov.32fc")) self.connect((self.time_recov, 1), - gr.file_sink(gr.sizeof_float, "rx_time_recov_error.32f")) + blocks.file_sink(gr.sizeof_float, "rx_time_recov_error.32f")) self.connect((self.time_recov, 2), - gr.file_sink(gr.sizeof_float, "rx_time_recov_rate.32f")) + blocks.file_sink(gr.sizeof_float, "rx_time_recov_rate.32f")) self.connect((self.time_recov, 3), - gr.file_sink(gr.sizeof_float, "rx_time_recov_phase.32f")) + blocks.file_sink(gr.sizeof_float, "rx_time_recov_phase.32f")) self.connect((self.receiver, 0), - gr.file_sink(gr.sizeof_char, "rx_receiver.8b")) + blocks.file_sink(gr.sizeof_char, "rx_receiver.8b")) self.connect((self.receiver, 1), - gr.file_sink(gr.sizeof_float, "rx_receiver_error.32f")) + blocks.file_sink(gr.sizeof_float, "rx_receiver_error.32f")) self.connect((self.receiver, 2), - gr.file_sink(gr.sizeof_float, "rx_receiver_phase.32f")) + blocks.file_sink(gr.sizeof_float, "rx_receiver_phase.32f")) self.connect((self.receiver, 3), - gr.file_sink(gr.sizeof_float, "rx_receiver_freq.32f")) + blocks.file_sink(gr.sizeof_float, "rx_receiver_freq.32f")) if self._differential: self.connect(self.diffdec, - gr.file_sink(gr.sizeof_char, "rx_diffdec.8b")) + blocks.file_sink(gr.sizeof_char, "rx_diffdec.8b")) if self.pre_diff_code: self.connect(self.symbol_mapper, - gr.file_sink(gr.sizeof_char, "rx_symbol_mapper.8b")) + blocks.file_sink(gr.sizeof_char, "rx_symbol_mapper.8b")) self.connect(self.unpack, - gr.file_sink(gr.sizeof_char, "rx_unpack.8b")) + blocks.file_sink(gr.sizeof_char, "rx_unpack.8b")) def add_options(parser): """ diff --git a/gr-digital/python/digital/gfsk.py b/gr-digital/python/digital/gfsk.py index e4aeef8ef9..6ba007ca0f 100644 --- a/gr-digital/python/digital/gfsk.py +++ b/gr-digital/python/digital/gfsk.py @@ -116,7 +116,7 @@ class gfsk_mod(gr.hier_block2): self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps) # FM modulation - self.fmmod = frequency.frequency_modulator_fc(sensitivity) + self.fmmod = analog.frequency_modulator_fc(sensitivity) # small amount of output attenuation to prevent clipping USRP sink self.amp = blocks.multiply_const_cc(0.999) @@ -146,11 +146,11 @@ class gfsk_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.nrz, - gr.file_sink(gr.sizeof_float, "nrz.dat")) + blocks.file_sink(gr.sizeof_float, "nrz.dat")) self.connect(self.gaussian_filter, - gr.file_sink(gr.sizeof_float, "gaussian_filter.dat")) + blocks.file_sink(gr.sizeof_float, "gaussian_filter.dat")) self.connect(self.fmmod, - gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) + blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) def add_options(parser): @@ -272,11 +272,11 @@ class gfsk_demod(gr.hier_block2): def _setup_logging(self): print "Demodulation logging turned on." self.connect(self.fmdemod, - gr.file_sink(gr.sizeof_float, "fmdemod.dat")) + blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) self.connect(self.clock_recovery, - gr.file_sink(gr.sizeof_float, "clock_recovery.dat")) + blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) self.connect(self.slicer, - gr.file_sink(gr.sizeof_char, "slicer.dat")) + blocks.file_sink(gr.sizeof_char, "slicer.dat")) def add_options(parser): """ diff --git a/gr-digital/python/digital/gmsk.py b/gr-digital/python/digital/gmsk.py index 53a5640259..9a44837002 100644 --- a/gr-digital/python/digital/gmsk.py +++ b/gr-digital/python/digital/gmsk.py @@ -136,11 +136,11 @@ class gmsk_mod(gr.hier_block2): def _setup_logging(self): print "Modulation logging turned on." self.connect(self.nrz, - gr.file_sink(gr.sizeof_float, "nrz.dat")) + blocks.file_sink(gr.sizeof_float, "nrz.dat")) self.connect(self.gaussian_filter, - gr.file_sink(gr.sizeof_float, "gaussian_filter.dat")) + blocks.file_sink(gr.sizeof_float, "gaussian_filter.dat")) self.connect(self.fmmod, - gr.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) + blocks.file_sink(gr.sizeof_gr_complex, "fmmod.dat")) def add_options(parser): @@ -256,11 +256,11 @@ class gmsk_demod(gr.hier_block2): def _setup_logging(self): print "Demodulation logging turned on." self.connect(self.fmdemod, - gr.file_sink(gr.sizeof_float, "fmdemod.dat")) + blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) self.connect(self.clock_recovery, - gr.file_sink(gr.sizeof_float, "clock_recovery.dat")) + blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) self.connect(self.slicer, - gr.file_sink(gr.sizeof_char, "slicer.dat")) + blocks.file_sink(gr.sizeof_char, "slicer.dat")) def add_options(parser): """ diff --git a/gr-digital/python/digital/ofdm.py b/gr-digital/python/digital/ofdm.py index 5bbe111f31..fdb23703f9 100644 --- a/gr-digital/python/digital/ofdm.py +++ b/gr-digital/python/digital/ofdm.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2008 Free Software Foundation, Inc. +# Copyright 2006-2008,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -118,13 +118,13 @@ class ofdm_mod(gr.hier_block2): self._print_verbage() if options.log: - self.connect(self._pkt_input, gr.file_sink(gr.sizeof_gr_complex*options.fft_length, + self.connect(self._pkt_input, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length, "ofdm_mapper_c.dat")) - self.connect(self.preambles, gr.file_sink(gr.sizeof_gr_complex*options.fft_length, + self.connect(self.preambles, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length, "ofdm_preambles.dat")) - self.connect(self.ifft, gr.file_sink(gr.sizeof_gr_complex*options.fft_length, + self.connect(self.ifft, blocks.file_sink(gr.sizeof_gr_complex*options.fft_length, "ofdm_ifft_c.dat")) - self.connect(self.cp_adder, gr.file_sink(gr.sizeof_gr_complex, + self.connect(self.cp_adder, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_cp_adder_c.dat")) def send_pkt(self, payload='', eof=False): @@ -256,11 +256,11 @@ class ofdm_demod(gr.hier_block2): if options.log: self.connect(self.ofdm_demod, - gr.file_sink(gr.sizeof_gr_complex*self._occupied_tones, + blocks.file_sink(gr.sizeof_gr_complex*self._occupied_tones, "ofdm_frame_sink_c.dat")) else: self.connect(self.ofdm_demod, - gr.null_sink(gr.sizeof_gr_complex*self._occupied_tones)) + blocks.null_sink(gr.sizeof_gr_complex*self._occupied_tones)) if options.verbose: self._print_verbage() diff --git a/gr-digital/python/digital/ofdm_receiver.py b/gr-digital/python/digital/ofdm_receiver.py index f4fc5f5ba3..4fbf76251a 100644 --- a/gr-digital/python/digital/ofdm_receiver.py +++ b/gr-digital/python/digital/ofdm_receiver.py @@ -144,11 +144,11 @@ class ofdm_receiver(gr.hier_block2): self.connect((self.ofdm_frame_acq,1), (self,1)) # frame and symbol timing, and equalization if logging: - self.connect(self.chan_filt, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-chan_filt_c.dat")) - self.connect(self.fft_demod, gr.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-fft_out_c.dat")) + self.connect(self.chan_filt, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-chan_filt_c.dat")) + self.connect(self.fft_demod, blocks.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-fft_out_c.dat")) self.connect(self.ofdm_frame_acq, - gr.file_sink(gr.sizeof_gr_complex*occupied_tones, "ofdm_receiver-frame_acq_c.dat")) - self.connect((self.ofdm_frame_acq,1), gr.file_sink(1, "ofdm_receiver-found_corr_b.dat")) - self.connect(self.sampler, gr.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-sampler_c.dat")) - self.connect(self.sigmix, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-sigmix_c.dat")) - self.connect(self.nco, gr.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-nco_c.dat")) + blocks.file_sink(gr.sizeof_gr_complex*occupied_tones, "ofdm_receiver-frame_acq_c.dat")) + self.connect((self.ofdm_frame_acq,1), blocks.file_sink(1, "ofdm_receiver-found_corr_b.dat")) + self.connect(self.sampler, blocks.file_sink(gr.sizeof_gr_complex*fft_length, "ofdm_receiver-sampler_c.dat")) + self.connect(self.sigmix, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-sigmix_c.dat")) + self.connect(self.nco, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_receiver-nco_c.dat")) diff --git a/gr-digital/python/digital/ofdm_sync_fixed.py b/gr-digital/python/digital/ofdm_sync_fixed.py index 9bac789bf6..9cbd59b943 100644 --- a/gr-digital/python/digital/ofdm_sync_fixed.py +++ b/gr-digital/python/digital/ofdm_sync_fixed.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2007 Free Software Foundation, Inc. +# Copyright 2007,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -22,6 +22,7 @@ import math from gnuradio import gr +from gnuradio import blocks class ofdm_sync_fixed(gr.hier_block2): def __init__(self, fft_length, cp_length, nsymbols, freq_offset, logging=False): @@ -35,16 +36,16 @@ class ofdm_sync_fixed(gr.hier_block2): pkt_length = nsymbols*symbol_length data = (pkt_length)*[0,] data[(symbol_length)-1] = 1 - self.peak_trigger = gr.vector_source_b(data, True) + self.peak_trigger = blocks.vector_source_b(data, True) # Use a pre-defined frequency offset foffset = (pkt_length)*[math.pi*freq_offset,] - self.frequency_offset = gr.vector_source_f(foffset, True) + self.frequency_offset = blocks.vector_source_f(foffset, True) - self.connect(self, gr.null_sink(gr.sizeof_gr_complex)) + self.connect(self, blocks.null_sink(gr.sizeof_gr_complex)) self.connect(self.frequency_offset, (self,0)) self.connect(self.peak_trigger, (self,1)) if logging: - self.connect(self.peak_trigger, gr.file_sink(gr.sizeof_char, "ofdm_sync_fixed-peaks_b.dat")) + self.connect(self.peak_trigger, blocks.file_sink(gr.sizeof_char, "ofdm_sync_fixed-peaks_b.dat")) diff --git a/gr-digital/python/digital/ofdm_sync_ml.py b/gr-digital/python/digital/ofdm_sync_ml.py index 76c00f3a54..3afd647098 100644 --- a/gr-digital/python/digital/ofdm_sync_ml.py +++ b/gr-digital/python/digital/ofdm_sync_ml.py @@ -158,18 +158,18 @@ class ofdm_sync_ml(gr.hier_block2): if logging: - self.connect(self.moving_sum_filter, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-energy_f.dat")) - self.connect(self.diff, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-theta_f.dat")) - self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-epsilon_f.dat")) - self.connect(self.corrmag, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-corrmag_f.dat")) - self.connect(self.kscorr, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-kscorr_c.dat")) - self.connect(self.div, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-div_f.dat")) - self.connect(self.mul, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-mul_f.dat")) - self.connect(self.slice, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-slice_f.dat")) - self.connect(self.pk_detect, gr.file_sink(gr.sizeof_char, "ofdm_sync_ml-peaks_b.dat")) + self.connect(self.moving_sum_filter, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-energy_f.dat")) + self.connect(self.diff, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-theta_f.dat")) + self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-epsilon_f.dat")) + self.connect(self.corrmag, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-corrmag_f.dat")) + self.connect(self.kscorr, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-kscorr_c.dat")) + self.connect(self.div, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-div_f.dat")) + self.connect(self.mul, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-mul_f.dat")) + self.connect(self.slice, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-slice_f.dat")) + self.connect(self.pk_detect, blocks.file_sink(gr.sizeof_char, "ofdm_sync_ml-peaks_b.dat")) if use_dpll: - self.connect(self.dpll, gr.file_sink(gr.sizeof_char, "ofdm_sync_ml-dpll_b.dat")) + self.connect(self.dpll, blocks.file_sink(gr.sizeof_char, "ofdm_sync_ml-dpll_b.dat")) - self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_ml-sample_and_hold_f.dat")) - self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-input_c.dat")) + self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_ml-sample_and_hold_f.dat")) + self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_ml-input_c.dat")) diff --git a/gr-digital/python/digital/ofdm_sync_pn.py b/gr-digital/python/digital/ofdm_sync_pn.py index 63e85135bc..4c6a30f802 100644 --- a/gr-digital/python/digital/ofdm_sync_pn.py +++ b/gr-digital/python/digital/ofdm_sync_pn.py @@ -124,10 +124,10 @@ class ofdm_sync_pn(gr.hier_block2): self.connect(self.pk_detect, (self,1)) if logging: - self.connect(self.matched_filter, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-mf_f.dat")) - self.connect(self.normalize, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-theta_f.dat")) - self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat")) - self.connect(self.pk_detect, gr.file_sink(gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat")) - self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat")) - self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat")) + self.connect(self.matched_filter, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-mf_f.dat")) + self.connect(self.normalize, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-theta_f.dat")) + self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat")) + self.connect(self.pk_detect, blocks.file_sink(gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat")) + self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat")) + self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat")) diff --git a/gr-digital/python/digital/ofdm_sync_pnac.py b/gr-digital/python/digital/ofdm_sync_pnac.py index 547b1e93c0..ee7c82927a 100644 --- a/gr-digital/python/digital/ofdm_sync_pnac.py +++ b/gr-digital/python/digital/ofdm_sync_pnac.py @@ -125,11 +125,11 @@ class ofdm_sync_pnac(gr.hier_block2): self.connect(self.peaks, (self,1)) if logging: - self.connect(self.compare, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-compare_f.dat")) - self.connect(self.c2mag, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-theta_f.dat")) - self.connect(self.power, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-inputpower_f.dat")) - self.connect(self.angle, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-epsilon_f.dat")) - self.connect(self.threshold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-threshold_f.dat")) - self.connect(self.peaks, gr.file_sink(gr.sizeof_char, "ofdm_sync_pnac-peaks_b.dat")) - self.connect(self.sample_and_hold, gr.file_sink(gr.sizeof_float, "ofdm_sync_pnac-sample_and_hold_f.dat")) - self.connect(self.input, gr.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pnac-input_c.dat")) + self.connect(self.compare, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-compare_f.dat")) + self.connect(self.c2mag, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-theta_f.dat")) + self.connect(self.power, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-inputpower_f.dat")) + self.connect(self.angle, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-epsilon_f.dat")) + self.connect(self.threshold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-threshold_f.dat")) + self.connect(self.peaks, blocks.file_sink(gr.sizeof_char, "ofdm_sync_pnac-peaks_b.dat")) + self.connect(self.sample_and_hold, blocks.file_sink(gr.sizeof_float, "ofdm_sync_pnac-sample_and_hold_f.dat")) + self.connect(self.input, blocks.file_sink(gr.sizeof_gr_complex, "ofdm_sync_pnac-input_c.dat")) diff --git a/gr-digital/python/digital/ofdm_txrx.py b/gr-digital/python/digital/ofdm_txrx.py new file mode 100644 index 0000000000..37c4086cc3 --- /dev/null +++ b/gr-digital/python/digital/ofdm_txrx.py @@ -0,0 +1,261 @@ +# +# Copyright 2005-2007,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# +""" +OFDM Transmitter / Receiver hier blocks. + +For simple configurations, no need to connect all the relevant OFDM blocks +to form an OFDM Tx/Rx--simply use these. +""" + +import numpy +from gnuradio import gr +import digital_swig as digital +from utils import tagged_streams + +try: + # This will work when feature #505 is added. + from gnuradio import fft + from gnuradio import blocks + from gnuradio import analog +except ImportError: + # Until then this will work. + import fft_swig as fft + import blocks_swig as blocks + import analog_swig as analog + +_def_fft_len = 64 +_def_cp_len = 16 +_def_frame_length_tag_key = "frame_length" +_def_packet_length_tag_key = "frame_length" +_def_packet_num_tag_key = "" +_def_occupied_carriers=(range(1, 27) + range(38, 64),) +_def_pilot_carriers=((0,),) +_def_pilot_symbols=((100,),) +_seq_seed = 42 + +def _make_sync_word(fft_len, occupied_carriers, constellation): + """ Makes a random sync sequence """ + occupied_carriers = list(occupied_carriers[0]) + occupied_carriers = [occupied_carriers[x] + fft_len if occupied_carriers[x] < 0 else occupied_carriers[x] for x in range(len(occupied_carriers))] + numpy.random.seed(_seq_seed) + sync_sequence = [constellation.map_to_points_v(numpy.random.randint(constellation.arity()))[0] * numpy.sqrt(2) if x in occupied_carriers and x % 3 else 0 for x in range(fft_len)] + return sync_sequence + +def _get_constellation(bps): + """ Returns a modulator block for a given number of bits per symbol """ + constellation = { + 1: digital.constellation_bpsk(), + 2: digital.constellation_qpsk(), + 3: digital.constellation_8psk() + } + try: + return constellation[bps] + except KeyError: + print 'Modulation not supported.' + exit(1) + +class ofdm_tx(gr.hier_block2): + """ + Hierarchical block for OFDM modulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + fft_len: The length of FFT (integer). + cp_len: The length of cyclic prefix (integer). + occupied_carriers: ?? + pilot_carriers: ?? + pilot_symbols: ?? + length_tag_key: The name of the tag giving packet length. + """ + def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len, + frame_length_tag_key=_def_frame_length_tag_key, + occupied_carriers=_def_occupied_carriers, + pilot_carriers=_def_pilot_carriers, + pilot_symbols=_def_pilot_symbols, + bps_header=1, + bps_payload=1, + sync_word1=None, + sync_word2=None, + rolloff=0 + ): + gr.hier_block2.__init__(self, "ofdm_tx", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) + self.fft_len = fft_len + self.cp_len = cp_len + self.frame_length_tag_key = frame_length_tag_key + self.occupied_carriers = occupied_carriers + self.pilot_carriers = pilot_carriers + self.pilot_symbols = pilot_symbols + self.bps_header = bps_header + self.bps_payload = bps_payload + n_sync_words = 1 + header_constellation = _get_constellation(bps_header) + header_mod = digital.chunks_to_symbols_bc(header_constellation.points()) + self.sync_word1 = sync_word1 + if sync_word1 is None: + self.sync_word1 = _make_sync_word(fft_len, occupied_carriers, header_constellation) + else: + if len(sync_word1) != self.fft_len: + raise ValueError("Length of sync sequence(s) must be FFT length.") + self.sync_words = [sync_word1,] + self.sync_word2 = () + if sync_word2 is not None: + if len(sync_word2) != fft_len: + raise ValueError("Length of sync sequence(s) must be FFT length.") + self.sync_word2 = sync_word2 + n_sync_words = 2 + self.sync_words.append(self.sync_word2) + crc = digital.crc32_bb(False, self.frame_length_tag_key) + formatter_object = digital.packet_header_ofdm( + occupied_carriers=occupied_carriers, n_syms=1, + bits_per_sym=self.bps_header + ) + header_gen = digital.packet_headergenerator_bb(formatter_object.base(), self.frame_length_tag_key) + header_payload_mux = blocks.tagged_stream_mux(gr.sizeof_gr_complex*1, self.frame_length_tag_key) + self.connect(self, crc, header_gen, header_mod, (header_payload_mux, 0)) + payload_constellation = _get_constellation(bps_payload) + payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points()) + self.connect( + crc, + blocks.repack_bits_bb(8, bps_payload, frame_length_tag_key), + payload_mod, + (header_payload_mux, 1) + ) + allocator = digital.ofdm_carrier_allocator_cvc( + self.fft_len, + occupied_carriers=self.occupied_carriers, + pilot_carriers=self.pilot_carriers, + pilot_symbols=self.pilot_symbols, + sync_words=self.sync_words, + len_tag_key=self.frame_length_tag_key + ) + ffter = fft.fft_vcc(self.fft_len, False, (), True) + cyclic_prefixer = digital.ofdm_cyclic_prefixer( + self.fft_len, + self.fft_len+self.cp_len, + rolloff, + self.frame_length_tag_key + ) + self.connect(header_payload_mux, allocator, ffter, cyclic_prefixer, self) + + +class ofdm_rx(gr.hier_block2): + """ + Hierarchical block for OFDM demodulation. + + The input is a byte stream (unsigned char) and the + output is the complex modulated signal at baseband. + + Args: + fft_len: The length of FFT (integer). + cp_len: The length of cyclic prefix (integer). + occupied_carriers: ?? + pilot_carriers: ?? + pilot_symbols: ?? + length_tag_key: The name of the tag giving packet length. + """ + def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len, + frame_length_tag_key=_def_frame_length_tag_key, + packet_length_tag_key=_def_packet_length_tag_key, + packet_num_tag_key=_def_packet_num_tag_key, + occupied_carriers=_def_occupied_carriers, + pilot_carriers=_def_pilot_carriers, + pilot_symbols=_def_pilot_symbols, + bps_header=1, + bps_payload=1, + sync_word1=None, + sync_word2=None + ): + gr.hier_block2.__init__(self, "ofdm_rx", + gr.io_signature(1, 1, gr.sizeof_gr_complex), + gr.io_signature(1, 1, gr.sizeof_char)) + self.fft_len = fft_len + self.cp_len = cp_len + self.frame_length_tag_key = frame_length_tag_key + self.packet_length_tag_key = packet_length_tag_key + self.occupied_carriers = occupied_carriers + self.bps_header = bps_header + self.bps_payload = bps_payload + n_sync_words = 1 + header_constellation = _get_constellation(bps_header) + if sync_word1 is None: + self.sync_word1 = _make_sync_word(fft_len, occupied_carriers, header_constellation) + else: + if len(sync_word1) != self.fft_len: + raise ValueError("Length of sync sequence(s) must be FFT length.") + self.sync_word1 = sync_word1 + self.sync_word2 = () + if sync_word2 is not None: + if len(sync_word2) != fft_len: + raise ValueError("Length of sync sequence(s) must be FFT length.") + self.sync_word2 = sync_word2 + n_sync_words = 2 + else: + sync_word2 = () + # Receiver path + sync_detect = digital.ofdm_sync_sc_cfb(fft_len, cp_len) + oscillator = analog.frequency_modulator_fc(-2.0 / fft_len) + delay = gr.delay(gr.sizeof_gr_complex, fft_len+cp_len) + mixer = gr.multiply_cc() + hpd = digital.header_payload_demux(n_sync_words, fft_len, cp_len, + frame_length_tag_key, "", True) + self.connect(self, sync_detect) + self.connect((sync_detect, 0), oscillator, (mixer, 0)) + self.connect(self, delay, (mixer, 1)) + self.connect(mixer, (hpd, 0)) + self.connect((sync_detect, 1), (hpd, 1)) + # Header demodulation + header_fft = fft.fft_vcc(self.fft_len, True, (), True) + chanest = digital.ofdm_chanest_vcvc(self.sync_word1, self.sync_word2, 1) + header_equalizer = digital.ofdm_equalizer_simpledfe( + fft_len, header_constellation.base(), + occupied_carriers, pilot_carriers, pilot_symbols + ) + header_eq = digital.ofdm_frame_equalizer_vcvc(header_equalizer.base(), frame_length_tag_key, True) + header_serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers) + header_constellation = _get_constellation(bps_header) + header_demod = digital.constellation_decoder_cb(header_constellation.base()) + header_formatter = digital.packet_header_ofdm( + occupied_carriers, 1, + packet_length_tag_key, + frame_length_tag_key, + packet_num_tag_key, + bps_header + ) + header_parser = digital.packet_headerparser_b(header_formatter.formatter()) + self.connect((hpd, 0), header_fft, chanest, header_eq, header_serializer, header_demod, header_parser) + self.msg_connect(header_parser, "header_data", hpd, "header_data") + # Payload demodulation + payload_fft = fft.fft_vcc(self.fft_len, True, (), True) + payload_equalizer = digital.ofdm_equalizer_simpledfe( + fft_len, header_constellation.base(), + occupied_carriers, pilot_carriers, pilot_symbols, 1 + ) + payload_eq = digital.ofdm_frame_equalizer_vcvc(payload_equalizer.base(), frame_length_tag_key) + payload_serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers) + payload_constellation = _get_constellation(bps_payload) + payload_demod = digital.constellation_decoder_cb(payload_constellation.base()) + bit_packer = blocks.repack_bits_bb(bps_payload, 8, packet_length_tag_key, True) + self.connect((hpd, 1), payload_fft, payload_eq, payload_serializer, payload_demod, bit_packer, self) + diff --git a/gr-digital/python/digital/qa_binary_slicer_fb.py b/gr-digital/python/digital/qa_binary_slicer_fb.py index 162fd73f8e..93e12dbb8d 100755 --- a/gr-digital/python/digital/qa_binary_slicer_fb.py +++ b/gr-digital/python/digital/qa_binary_slicer_fb.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011,2012 Free Software Foundation, Inc. +# Copyright 2011-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,9 +20,9 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +import random -import math, random +from gnuradio import gr, gr_unittest, digital, blocks class test_binary_slicer_fb(gr_unittest.TestCase): @@ -36,9 +36,9 @@ class test_binary_slicer_fb(gr_unittest.TestCase): expected_result = ( 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1) src_data = (-1, 1, -1, -1, 1, 1, -1, -1, -1, 1, 1, 1, -1, 1, 1, 1, 1) src_data = [s + (1 - random.random()) for s in src_data] # add some noise - src = gr.vector_source_f(src_data) + src = blocks.vector_source_f(src_data) op = digital.binary_slicer_fb() - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op) self.tb.connect(op, dst) diff --git a/gr-digital/python/digital/qa_chunks_to_symbols.py b/gr-digital/python/digital/qa_chunks_to_symbols.py index aab5a5addd..25798f33e5 100755 --- a/gr-digital/python/digital/qa_chunks_to_symbols.py +++ b/gr-digital/python/digital/qa_chunks_to_symbols.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2012 Free Software Foundation, Inc. +# Copyright 2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_chunks_to_symbols(gr_unittest.TestCase): @@ -37,10 +37,10 @@ class test_chunks_to_symbols(gr_unittest.TestCase): expected_result = (1+0j, 0+1j, -1+0j, 0-1j, 0-1j, -1+0j, 0+1j, 1+0j) - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.chunks_to_symbols_bc(const) - dst = gr.vector_sink_c() + dst = blocks.vector_sink_c() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() @@ -54,10 +54,10 @@ class test_chunks_to_symbols(gr_unittest.TestCase): expected_result = (-3, -1, 1, 3, 3, 1, -1, -3) - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.chunks_to_symbols_bf(const) - dst = gr.vector_sink_f() + dst = blocks.vector_sink_f() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() @@ -72,10 +72,10 @@ class test_chunks_to_symbols(gr_unittest.TestCase): expected_result = (1+0j, 0+1j, -1+0j, 0-1j, 0-1j, -1+0j, 0+1j, 1+0j) - src = gr.vector_source_i(src_data) + src = blocks.vector_source_i(src_data) op = digital.chunks_to_symbols_ic(const) - dst = gr.vector_sink_c() + dst = blocks.vector_sink_c() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() @@ -89,10 +89,10 @@ class test_chunks_to_symbols(gr_unittest.TestCase): expected_result = (-3, -1, 1, 3, 3, 1, -1, -3) - src = gr.vector_source_i(src_data) + src = blocks.vector_source_i(src_data) op = digital.chunks_to_symbols_if(const) - dst = gr.vector_sink_f() + dst = blocks.vector_sink_f() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() @@ -107,10 +107,10 @@ class test_chunks_to_symbols(gr_unittest.TestCase): expected_result = (1+0j, 0+1j, -1+0j, 0-1j, 0-1j, -1+0j, 0+1j, 1+0j) - src = gr.vector_source_s(src_data) + src = blocks.vector_source_s(src_data) op = digital.chunks_to_symbols_sc(const) - dst = gr.vector_sink_c() + dst = blocks.vector_sink_c() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() @@ -124,10 +124,10 @@ class test_chunks_to_symbols(gr_unittest.TestCase): expected_result = (-3, -1, 1, 3, 3, 1, -1, -3) - src = gr.vector_source_s(src_data) + src = blocks.vector_source_s(src_data) op = digital.chunks_to_symbols_sf(const) - dst = gr.vector_sink_f() + dst = blocks.vector_sink_f() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() diff --git a/gr-digital/python/digital/qa_clock_recovery_mm.py b/gr-digital/python/digital/qa_clock_recovery_mm.py index b9e9121c32..783770d6e9 100755 --- a/gr-digital/python/digital/qa_clock_recovery_mm.py +++ b/gr-digital/python/digital/qa_clock_recovery_mm.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011,2012 Free Software Foundation, Inc. +# Copyright 2011-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -23,7 +23,7 @@ import random import cmath -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_clock_recovery_mm(gr_unittest.TestCase): @@ -46,8 +46,8 @@ class test_clock_recovery_mm(gr_unittest.TestCase): omega_rel_lim) data = 100*[complex(1, 1),] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -81,8 +81,8 @@ class test_clock_recovery_mm(gr_unittest.TestCase): omega_rel_lim) data = 100*[1,] - self.src = gr.vector_source_f(data, False) - self.snk = gr.vector_sink_f() + self.src = blocks.vector_source_f(data, False) + self.snk = blocks.vector_sink_f() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -116,8 +116,8 @@ class test_clock_recovery_mm(gr_unittest.TestCase): omega_rel_lim) data = 1000*[complex(1, 1), complex(1, 1), complex(-1, -1), complex(-1, -1)] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -151,8 +151,8 @@ class test_clock_recovery_mm(gr_unittest.TestCase): omega_rel_lim) data = 1000*[1, 1, -1, -1] - self.src = gr.vector_source_f(data, False) - self.snk = gr.vector_sink_f() + self.src = blocks.vector_source_f(data, False) + self.snk = blocks.vector_sink_f() self.tb.connect(self.src, self.test, self.snk) self.tb.run() diff --git a/gr-digital/python/digital/qa_cma_equalizer.py b/gr-digital/python/digital/qa_cma_equalizer.py index 7335c0f918..6da391f70c 100755 --- a/gr-digital/python/digital/qa_cma_equalizer.py +++ b/gr-digital/python/digital/qa_cma_equalizer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2010,2011 Free Software Foundation, Inc. +# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_cma_equalizer_fir(gr_unittest.TestCase): @@ -31,9 +31,9 @@ class test_cma_equalizer_fir(gr_unittest.TestCase): self.tb = None def transform(self, src_data): - SRC = gr.vector_source_c(src_data, False) + SRC = blocks.vector_source_c(src_data, False) EQU = digital.cma_equalizer_cc(4, 1.0, .001, 1) - DST = gr.vector_sink_c() + DST = blocks.vector_sink_c() self.tb.connect(SRC, EQU, DST) self.tb.run() return DST.data() diff --git a/gr-digital/python/digital/qa_constellation.py b/gr-digital/python/digital/qa_constellation.py index 709debb2b9..9e7e691d5c 100755 --- a/gr-digital/python/digital/qa_constellation.py +++ b/gr-digital/python/digital/qa_constellation.py @@ -174,9 +174,9 @@ class test_constellation(gr_unittest.TestCase): else: rotations = [None] for rotation in rotations: - src = gr.vector_source_b(self.src_data) + src = blocks.vector_source_b(self.src_data) content = mod_demod(constellation, differential, rotation) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb = gr.top_block() self.tb.connect(src, content, dst) self.tb.run() diff --git a/gr-digital/python/digital/qa_constellation_decoder_cb.py b/gr-digital/python/digital/qa_constellation_decoder_cb.py index 1c910feacb..d3fbce91ba 100755 --- a/gr-digital/python/digital/qa_constellation_decoder_cb.py +++ b/gr-digital/python/digital/qa_constellation_decoder_cb.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2004,2007,2010-2012 Free Software Foundation, Inc. +# Copyright 2004,2007,2010-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_constellation_decoder(gr_unittest.TestCase): @@ -36,9 +36,9 @@ class test_constellation_decoder(gr_unittest.TestCase): 0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j) expected_result = ( 1, 1, 0, 0, 1, 0, 1) - src = gr.vector_source_c(src_data) + src = blocks.vector_source_c(src_data) op = digital.constellation_decoder_cb(cnst.base()) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op) self.tb.connect(op, dst) @@ -55,9 +55,9 @@ class test_constellation_decoder(gr_unittest.TestCase): 0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j) expected_result = ( 3, 1, 0, 2, 3, 2, 1) - src = gr.vector_source_c(src_data) + src = blocks.vector_source_c(src_data) op = digital_swig.constellation_decoder_cb(cnst.base()) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op) self.tb.connect(op, dst) diff --git a/gr-digital/python/digital/qa_constellation_receiver.py b/gr-digital/python/digital/qa_constellation_receiver.py index 914e347d88..e595b585ac 100755 --- a/gr-digital/python/digital/qa_constellation_receiver.py +++ b/gr-digital/python/digital/qa_constellation_receiver.py @@ -152,7 +152,7 @@ class rec_test_tb(gr.top_block): else: self.src_data = src_data packer = blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) - src = gr.vector_source_b(self.src_data) + src = blocks.vector_source_b(self.src_data) mod = generic_mod(constellation, differential=differential) # Channel if freq_offset: @@ -167,7 +167,7 @@ class rec_test_tb(gr.top_block): else: demod = generic_demod(constellation, differential=differential, freq_bw=0, phase_bw=0) - self.dst = gr.vector_sink_b() + self.dst = blocks.vector_sink_b() self.connect(src, packer, mod, channel, demod, self.dst) if __name__ == '__main__': diff --git a/gr-digital/python/digital/qa_correlate_access_code.py b/gr-digital/python/digital/qa_correlate_access_code.py index 849ca474a9..198a254da7 100755 --- a/gr-digital/python/digital/qa_correlate_access_code.py +++ b/gr-digital/python/digital/qa_correlate_access_code.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2010,2011 Free Software Foundation, Inc. +# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,9 +20,7 @@ # Boston, MA 02110-1301, USA. # -import math - -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks default_access_code = '\xAC\xDD\xA4\xE2\xF2\x8C\x20\xFC' @@ -52,9 +50,9 @@ class test_correlate_access_code(gr_unittest.TestCase): # 0 0 0 1 0 0 0 1 src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 expected_result = pad + (1, 0, 1, 1, 3, 1, 0, 1, 1, 2) + (0,) * 6 - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.correlate_access_code_bb("1011", 0) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) self.tb.run() result_data = dst.data() @@ -69,9 +67,9 @@ class test_correlate_access_code(gr_unittest.TestCase): #print access_code src_data = code + (1, 0, 1, 1) + pad expected_result = pad + code + (3, 0, 1, 1) - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.correlate_access_code_bb(access_code, 0) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) self.tb.run() result_data = dst.data() @@ -85,9 +83,9 @@ class test_correlate_access_code(gr_unittest.TestCase): #print access_code src_data = code + (1, 0, 1, 1) + pad expected_result = code + (1, 0, 1, 1) + pad - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.correlate_access_code_tag_bb(access_code, 0, "test") - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) self.tb.run() result_data = dst.data() diff --git a/gr-digital/python/digital/qa_costas_loop_cc.py b/gr-digital/python/digital/qa_costas_loop_cc.py index e920a76e05..9ecb017599 100755 --- a/gr-digital/python/digital/qa_costas_loop_cc.py +++ b/gr-digital/python/digital/qa_costas_loop_cc.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -23,7 +23,7 @@ import random import cmath -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks from gnuradio.digital import psk class test_costas_loop_cc(gr_unittest.TestCase): @@ -41,8 +41,8 @@ class test_costas_loop_cc(gr_unittest.TestCase): self.test = digital.costas_loop_cc(natfreq, order) data = 100*[complex(1,0),] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -58,8 +58,8 @@ class test_costas_loop_cc(gr_unittest.TestCase): self.test = digital.costas_loop_cc(natfreq, order) data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -82,8 +82,8 @@ class test_costas_loop_cc(gr_unittest.TestCase): expected_result = data[N:] data = [rot*d for d in data] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -108,8 +108,8 @@ class test_costas_loop_cc(gr_unittest.TestCase): expected_result = data[N:] data = [rot*d for d in data] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() @@ -137,8 +137,8 @@ class test_costas_loop_cc(gr_unittest.TestCase): rot = cmath.exp(0.1j) # some small rotation data = [rot*d for d in data] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.test, self.snk) self.tb.run() diff --git a/gr-digital/python/digital/qa_cpm.py b/gr-digital/python/digital/qa_cpm.py index 5f7253f5f8..6468ed507b 100755 --- a/gr-digital/python/digital/qa_cpm.py +++ b/gr-digital/python/digital/qa_cpm.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2010 Free Software Foundation, Inc. +# Copyright 2010,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -36,10 +36,10 @@ class test_cpm(gr_unittest.TestCase): sps = 2 L = 1 in_bits = (1,) * 20 - src = gr.vector_source_b(in_bits, False) + src = blocks.vector_source_b(in_bits, False) cpm = digital.cpmmod_bc(type, 0.5, sps, L) arg = blocks.complex_to_arg() - sink = gr.vector_sink_f() + sink = blocks.vector_sink_f() self.tb.connect(src, cpm, arg, sink) self.tb.run() @@ -67,10 +67,10 @@ class test_cpm(gr_unittest.TestCase): L = 5 bt = 0.3 in_bits = (1,) * 20 - src = gr.vector_source_b(in_bits, False) + src = blocks.vector_source_b(in_bits, False) gmsk = digital.gmskmod_bc(sps, L, bt) arg = blocks.complex_to_arg() - sink = gr.vector_sink_f() + sink = blocks.vector_sink_f() self.tb.connect(src, gmsk, arg, sink) self.tb.run() diff --git a/gr-digital/python/digital/qa_crc32_bb.py b/gr-digital/python/digital/qa_crc32_bb.py new file mode 100755 index 0000000000..397834ffde --- /dev/null +++ b/gr-digital/python/digital/qa_crc32_bb.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks, digital +import pmt + +class qa_crc32_bb (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_crc_len (self): + """ Make sure the output of a CRC set is 4 bytes longer than the input. """ + data = range(16) + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(data)) + src = blocks.vector_source_b(data, False, 1, (tag,)) + crc = digital.crc32_bb(False, tag_name) + sink = blocks.vector_sink_b() + self.tb.connect(src, crc, sink) + self.tb.run() + # Check that the packets before crc_check are 4 bytes longer that the input. + self.assertEqual(len(data)+4, len(sink.data())) + + def test_002_crc_equal (self): + """ Go through CRC set / CRC check and make sure the output + is the same as the input. """ + data = (0, 1, 2, 3, 4, 5, 6, 7, 8) + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(data)) + src = blocks.vector_source_b(data, False, 1, (tag,)) + crc = digital.crc32_bb(False, tag_name) + crc_check = digital.crc32_bb(True, tag_name) + sink = blocks.vector_sink_b() + self.tb.connect(src, crc, crc_check, sink) + self.tb.run() + # Check that the packets after crc_check are the same as input. + self.assertEqual(data, sink.data()) + + def test_003_crc_correct_lentag (self): + tag_name = "length" + pack_len = 8 + packets = range(pack_len*2) + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tag_name) + tag1.value = pmt.from_long(pack_len) + tag2 = gr.gr_tag_t() + tag2.offset = pack_len + tag2.key = pmt.string_to_symbol(tag_name) + tag2.value = pmt.from_long(pack_len) + testtag1 = gr.gr_tag_t() + testtag1.offset = 1 + testtag1.key = pmt.string_to_symbol("tag1") + testtag1.value = pmt.from_long(0) + testtag2 = gr.gr_tag_t() + testtag2.offset = pack_len + testtag2.key = pmt.string_to_symbol("tag2") + testtag2.value = pmt.from_long(0) + testtag3 = gr.gr_tag_t() + testtag3.offset = len(packets)-1 + testtag3.key = pmt.string_to_symbol("tag3") + testtag3.value = pmt.from_long(0) + src = blocks.vector_source_b(packets, False, 1, (tag1, tag2, testtag1, testtag2, testtag3)) + crc = digital.crc32_bb(False, tag_name) + sink = blocks.vector_sink_b() + self.tb.connect(src, crc, sink) + self.tb.run() + self.assertEqual(len(sink.data()), 2*(pack_len+4)) + correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19} + tags_found = {'tag1': False, 'tag2': False, 'tag3': False} + for tag in sink.tags(): + key = pmt.symbol_to_string(tag.key) + if key in correct_offsets.keys(): + tags_found[key] = True + self.assertEqual(correct_offsets[key], tag.offset) + if key == tag_name: + self.assertTrue(tag.offset == 0 or tag.offset == pack_len+4) + self.assertTrue(all(tags_found.values())) + + + def test_004_fail (self): + """ Corrupt the data and make sure it fails CRC test. """ + data = (0, 1, 2, 3, 4, 5, 6, 7) + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(data)) + src = blocks.vector_source_b(data, False, 1, (tag,)) + crc = digital.crc32_bb(False, tag_name) + crc_check = digital.crc32_bb(True, tag_name) + corruptor = blocks.add_const_bb(1) + sink = blocks.vector_sink_b() + self.tb.connect(src, crc, corruptor, crc_check, sink) + self.tb.run() + # crc_check will drop invalid packets + self.assertEqual(len(sink.data()), 0) + + def test_005_tag_propagation (self): + """ Make sure tags on the CRC aren't lost. """ + data = (0, 1, 2, 3, 4, 5, 6, 7, 8, 230, 166, 39, 8) + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(data)) + testtag = gr.gr_tag_t() + testtag.offset = len(data)-1 + testtag.key = pmt.string_to_symbol('tag1') + testtag.value = pmt.from_long(0) + src = blocks.vector_source_b(data, False, 1, (tag, testtag)) + crc_check = digital.crc32_bb(True, tag_name) + sink = blocks.vector_sink_b() + self.tb.connect(src, crc_check, sink) + self.tb.run() + self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1']) + +if __name__ == '__main__': + gr_unittest.run(qa_crc32_bb, "qa_crc32_bb.xml") diff --git a/gr-digital/python/digital/qa_diff_encoder.py b/gr-digital/python/digital/qa_diff_encoder.py index 34249760c5..410b937fbc 100755 --- a/gr-digital/python/digital/qa_diff_encoder.py +++ b/gr-digital/python/digital/qa_diff_encoder.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2010,2012 Free Software Foundation, Inc. +# Copyright 2006,2007,2010,2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,10 +20,9 @@ # Boston, MA 02110-1301, USA. # -import math import random -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks def make_random_int_tuple(L, min, max): result = [] @@ -45,10 +44,10 @@ class test_diff_encoder(gr_unittest.TestCase): modulus = 2 src_data = make_random_int_tuple(1000, 0, modulus-1) expected_result = src_data - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) enc = digital.diff_encoder_bb(modulus) dec = digital.diff_decoder_bb(modulus) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, enc, dec, dst) self.tb.run() # run the graph and wait for it to finish actual_result = dst.data() # fetch the contents of the sink @@ -59,10 +58,10 @@ class test_diff_encoder(gr_unittest.TestCase): modulus = 4 src_data = make_random_int_tuple(1000, 0, modulus-1) expected_result = src_data - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) enc = digital.diff_encoder_bb(modulus) dec = digital.diff_decoder_bb(modulus) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, enc, dec, dst) self.tb.run() # run the graph and wait for it to finish actual_result = dst.data() # fetch the contents of the sink @@ -73,10 +72,10 @@ class test_diff_encoder(gr_unittest.TestCase): modulus = 8 src_data = make_random_int_tuple(40000, 0, modulus-1) expected_result = src_data - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) enc = digital.diff_encoder_bb(modulus) dec = digital.diff_decoder_bb(modulus) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, enc, dec, dst) self.tb.run() # run the graph and wait for it to finish actual_result = dst.data() # fetch the contents of the sink diff --git a/gr-digital/python/digital/qa_diff_phasor_cc.py b/gr-digital/python/digital/qa_diff_phasor_cc.py index 84e151a630..7cae4870cc 100755 --- a/gr-digital/python/digital/qa_diff_phasor_cc.py +++ b/gr-digital/python/digital/qa_diff_phasor_cc.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2004,2007,2010 Free Software Foundation, Inc. +# Copyright 2004,2007,2010,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,9 +20,7 @@ # Boston, MA 02110-1301, USA. # -import math - -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_diff_phasor(gr_unittest.TestCase): @@ -35,9 +33,9 @@ class test_diff_phasor(gr_unittest.TestCase): def test_diff_phasor_cc(self): src_data = (0+0j, 1+0j, -1+0j, 3+4j, -3-4j, -3+4j) expected_result = (0+0j, 0+0j, -1+0j, -3-4j, -25+0j, -7-24j) - src = gr.vector_source_c(src_data) + src = blocks.vector_source_c(src_data) op = digital.diff_phasor_cc() - dst = gr.vector_sink_c() + dst = blocks.vector_sink_c() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() # run the graph and wait for it to finish diff --git a/gr-digital/python/digital/qa_fll_band_edge.py b/gr-digital/python/digital/qa_fll_band_edge.py index 02ded4e20c..17c5fa85f8 100755 --- a/gr-digital/python/digital/qa_fll_band_edge.py +++ b/gr-digital/python/digital/qa_fll_band_edge.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011,2012 Free Software Foundation, Inc. +# Copyright 2011-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -49,7 +49,7 @@ class test_fll_band_edge_cc(gr_unittest.TestCase): # Create a set of 1's and -1's, pulse shape and interpolate to sps random.seed(0) data = [2.0*random.randint(0, 2) - 1.0 for i in xrange(200)] - self.src = gr.vector_source_c(data, False) + self.src = blocks.vector_source_c(data, False) self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps) # Mix symbols with a complex sinusoid to spin them @@ -61,10 +61,10 @@ class test_fll_band_edge_cc(gr_unittest.TestCase): # Create sinks for all outputs of the FLL # we will only care about the freq and error outputs - self.vsnk_frq = gr.vector_sink_f() - self.nsnk_fll = gr.null_sink(gr.sizeof_gr_complex) - self.nsnk_phs = gr.null_sink(gr.sizeof_float) - self.nsnk_err = gr.null_sink(gr.sizeof_float) + self.vsnk_frq = blocks.vector_sink_f() + self.nsnk_fll = blocks.null_sink(gr.sizeof_gr_complex) + self.nsnk_phs = blocks.null_sink(gr.sizeof_float) + self.nsnk_err = blocks.null_sink(gr.sizeof_float) # Connect the blocks self.tb.connect(self.nco, (self.mix,1)) diff --git a/gr-digital/python/digital/qa_framer_sink.py b/gr-digital/python/digital/qa_framer_sink.py index d5c4df1f1e..4b260c14ec 100755 --- a/gr-digital/python/digital/qa_framer_sink.py +++ b/gr-digital/python/digital/qa_framer_sink.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2012 Free Software Foundation, Inc. +# Copyright 2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks default_access_code = '\xAC\xDD\xA4\xE2\xF2\x8C\x20\xFC' @@ -55,10 +55,10 @@ class test_framker_sink(gr_unittest.TestCase): rcvd_pktq = gr.msg_queue() - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) correlator = digital.correlate_access_code_bb(access_code, 0) framer_sink = digital.framer_sink_1(rcvd_pktq) - vsnk = gr.vector_sink_b() + vsnk = blocks.vector_sink_b() self.tb.connect(src, correlator, framer_sink) self.tb.connect(correlator, vsnk) @@ -79,10 +79,10 @@ class test_framker_sink(gr_unittest.TestCase): rcvd_pktq = gr.msg_queue() - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) correlator = digital.correlate_access_code_bb(access_code, 0) framer_sink = digital.framer_sink_1(rcvd_pktq) - vsnk = gr.vector_sink_b() + vsnk = blocks.vector_sink_b() self.tb.connect(src, correlator, framer_sink) self.tb.connect(correlator, vsnk) diff --git a/gr-digital/python/digital/qa_glfsr_source.py b/gr-digital/python/digital/qa_glfsr_source.py index 155a91c4a0..f39c408198 100755 --- a/gr-digital/python/digital/qa_glfsr_source.py +++ b/gr-digital/python/digital/qa_glfsr_source.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2007,2010,2012 Free Software Foundation, Inc. +# Copyright 2007,2010,2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_glfsr_source(gr_unittest.TestCase): @@ -45,7 +45,7 @@ class test_glfsr_source(gr_unittest.TestCase): for degree in range(1,11): # Higher degrees take too long to correlate src = digital.glfsr_source_b(degree, False) b2f = digital.chunks_to_symbols_bf((-1.0,1.0), 1) - dst = gr.vector_sink_f() + dst = blocks.vector_sink_f() del self.tb # Discard existing top block self.tb = gr.top_block() self.tb.connect(src, b2f, dst) @@ -70,7 +70,7 @@ class test_glfsr_source(gr_unittest.TestCase): def test_005_correlation_f(self): for degree in range(1,11): # Higher degrees take too long to correlate src = digital.glfsr_source_f(degree, False) - dst = gr.vector_sink_f() + dst = blocks.vector_sink_f() del self.tb # Discard existing top block self.tb = gr.top_block() self.tb.connect(src, dst) diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py new file mode 100755 index 0000000000..e0ade4e5fa --- /dev/null +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import time + +from gnuradio import gr, gr_unittest, digital, blocks +import pmt + +class qa_header_payload_demux (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + """ Simplest possible test: put in zeros, then header, + then payload, trigger signal, try to demux. + The return signal from the header parser is faked via _post() + """ + n_zeros = 100 + header = (1, 2, 3) + payload = tuple(range(17)) + data_signal = (0,) * n_zeros + header + payload + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + + data_src = blocks.vector_source_f(data_signal, False) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 1) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(len(payload)) + ) + while len(payload_sink.data()) < len(payload): + time.sleep(.2) + self.tb.stop() + self.tb.wait() + + self.assertEqual(header_sink.data(), header) + self.assertEqual(payload_sink.data(), payload) + + +if __name__ == '__main__': + gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") + diff --git a/gr-digital/python/digital/qa_lms_equalizer.py b/gr-digital/python/digital/qa_lms_equalizer.py index f610eeaaae..7768c1f078 100755 --- a/gr-digital/python/digital/qa_lms_equalizer.py +++ b/gr-digital/python/digital/qa_lms_equalizer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2010,2011 Free Software Foundation, Inc. +# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_lms_dd_equalizer(gr_unittest.TestCase): @@ -31,9 +31,9 @@ class test_lms_dd_equalizer(gr_unittest.TestCase): self.tb = None def transform(self, src_data, gain, const): - SRC = gr.vector_source_c(src_data, False) + SRC = blocks.vector_source_c(src_data, False) EQU = digital.lms_dd_equalizer_cc(4, gain, 1, const.base()) - DST = gr.vector_sink_c() + DST = blocks.vector_sink_c() self.tb.connect(SRC, EQU, DST) self.tb.run() return DST.data() diff --git a/gr-digital/python/digital/qa_map.py b/gr-digital/python/digital/qa_map.py index df906d5e08..604fa084d9 100755 --- a/gr-digital/python/digital/qa_map.py +++ b/gr-digital/python/digital/qa_map.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2012 Free Software Foundation, Inc. +# Copyright 2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_map(gr_unittest.TestCase): @@ -33,9 +33,9 @@ class test_map(gr_unittest.TestCase): def helper(self, symbols): src_data = [0, 1, 2, 3, 0, 1, 2, 3] expected_data = map(lambda x: symbols[x], src_data) - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.map_bb(symbols) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) self.tb.run() diff --git a/gr-digital/python/digital/qa_mpsk_receiver.py b/gr-digital/python/digital/qa_mpsk_receiver.py index d1ae81cef5..1379b52e61 100755 --- a/gr-digital/python/digital/qa_mpsk_receiver.py +++ b/gr-digital/python/digital/qa_mpsk_receiver.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011,2012 Free Software Foundation, Inc. +# Copyright 2011-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -24,7 +24,7 @@ import random import cmath import time -from gnuradio import gr, gr_unittest, digital, filter +from gnuradio import gr, gr_unittest, digital, filter, blocks class test_mpsk_receiver(gr_unittest.TestCase): @@ -54,8 +54,8 @@ class test_mpsk_receiver(gr_unittest.TestCase): data = 10000*[complex(1,0), complex(-1,0)] #data = [2*random.randint(0,1)-1 for x in xrange(10000)] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() # pulse shaping interpolation filter nfilts = 32 @@ -110,8 +110,8 @@ class test_mpsk_receiver(gr_unittest.TestCase): complex(-0.707, -0.707), complex( 0.707, -0.707)] data = [0.5*d for d in data] - self.src = gr.vector_source_c(data, False) - self.snk = gr.vector_sink_c() + self.src = blocks.vector_source_c(data, False) + self.snk = blocks.vector_sink_c() # pulse shaping interpolation filter nfilts = 32 diff --git a/gr-digital/python/digital/qa_mpsk_snr_est.py b/gr-digital/python/digital/qa_mpsk_snr_est.py index 7991f26a5b..cb3e954141 100755 --- a/gr-digital/python/digital/qa_mpsk_snr_est.py +++ b/gr-digital/python/digital/qa_mpsk_snr_est.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011,2012 Free Software Foundation, Inc. +# Copyright 2011-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -21,7 +21,7 @@ # import random -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks def get_cplx(): return complex(2*random.randint(0,1) - 1, 0) @@ -45,8 +45,8 @@ class test_mpsk_snr_est(gr_unittest.TestCase): for i in xrange(1,6): src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)] - src = gr.vector_source_c(src_data) - dst = gr.null_sink(gr.sizeof_gr_complex) + src = blocks.vector_source_c(src_data) + dst = blocks.null_sink(gr.sizeof_gr_complex) tb = gr.top_block() tb.connect(src, op) @@ -103,7 +103,7 @@ class test_mpsk_snr_est(gr_unittest.TestCase): for i in xrange(1,6): src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)] - src = gr.vector_source_c(src_data) + src = blocks.vector_source_c(src_data) N = 10000 alpha = 0.001 diff --git a/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py b/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py new file mode 100755 index 0000000000..56cd7a617e --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, digital, blocks +import pmt + +class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + """ + pretty simple (the carrier allocation is not a practical OFDM configuration!) + """ + fft_len = 6 + tx_symbols = (1, 2, 3) + # ^ this gets mapped to the DC carrier because occupied_carriers[0][0] == 0 + pilot_symbols = ((1j,),) + occupied_carriers = ((0, 1, 2),) + pilot_carriers = ((3,),) + sync_word = (range(fft_len),) + expected_result = tuple(sync_word[0] + [1j, 0, 0, 1, 2, 3]) + # ^ DC carrier + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(tx_symbols)) + src = blocks.vector_source_c(tx_symbols, False, 1, (tag,)) + alloc = digital.ofdm_carrier_allocator_cvc(fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, sync_word, + tag_name) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, alloc, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + + def test_001_t2 (self): + """ + pretty simple (same as before, but odd fft len) + """ + fft_len = 5 + tx_symbols = (1, 2, 3) + # ^ this gets mapped to the DC carrier because occupied_carriers[0][0] == 0 + occupied_carriers = ((0, 1, 2),) + pilot_carriers = ((-2,),) + pilot_symbols = ((1j,),) + expected_result = (1j, 0, 1, 2, 3) + # ^ DC carrier + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(tx_symbols)) + src = blocks.vector_source_c(tx_symbols, False, 1, (tag,)) + alloc = digital.ofdm_carrier_allocator_cvc(fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, (), + tag_name) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, alloc, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + + def test_002_t (self): + """ + same, but using negative carrier indices + """ + fft_len = 6 + tx_symbols = (1, 2, 3) + pilot_symbols = ((1j,),) + occupied_carriers = ((-1, 1, 2),) + pilot_carriers = ((3,),) + expected_result = (1j, 0, 1, 0, 2, 3) + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(tx_symbols)) + src = blocks.vector_source_c(tx_symbols, False, 1, (tag,)) + alloc = digital.ofdm_carrier_allocator_cvc(fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, (), + tag_name) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, alloc, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + + def test_003_t (self): + """ + more advanced: + - 6 symbols per carrier + - 2 pilots per carrier + - have enough data for nearly 3 OFDM symbols + - send that twice + - add some random tags + - don't shift + """ + tx_symbols = range(1, 16); # 15 symbols + pilot_symbols = ((1j, 2j), (3j, 4j)) + occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) + pilot_carriers = ((2, 13), (3, 12)) + expected_result = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0, + 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0, + 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0) + fft_len = 16 + tag_name = "len" + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tag_name) + tag1.value = pmt.from_long(len(tx_symbols)) + tag2 = gr.gr_tag_t() + tag2.offset = len(tx_symbols) + tag2.key = pmt.string_to_symbol(tag_name) + tag2.value = pmt.from_long(len(tx_symbols)) + testtag1 = gr.gr_tag_t() + testtag1.offset = 0 + testtag1.key = pmt.string_to_symbol('tag1') + testtag1.value = pmt.from_long(0) + testtag2 = gr.gr_tag_t() + testtag2.offset = 7 # On the 2nd OFDM symbol + testtag2.key = pmt.string_to_symbol('tag2') + testtag2.value = pmt.from_long(0) + testtag3 = gr.gr_tag_t() + testtag3.offset = len(tx_symbols)+1 # First OFDM symbol of packet 2 + testtag3.key = pmt.string_to_symbol('tag3') + testtag3.value = pmt.from_long(0) + testtag4 = gr.gr_tag_t() + testtag4.offset = 2*len(tx_symbols)-1 # Last OFDM symbol of packet 2 + testtag4.key = pmt.string_to_symbol('tag4') + testtag4.value = pmt.from_long(0) + src = blocks.vector_source_c(tx_symbols * 2, False, 1, + (tag1, tag2, testtag1, testtag2, testtag3, testtag4)) + alloc = digital.ofdm_carrier_allocator_cvc(fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, (), + tag_name, + False) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, alloc, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result * 2) + tags_found = {'tag1': False, 'tag2': False, 'tag3': False, 'tag4': False} + correct_offsets = {'tag1': 0, 'tag2': 1, 'tag3': 3, 'tag4': 5} + for tag in sink.tags(): + key = pmt.symbol_to_string(tag.key) + if key in tags_found.keys(): + tags_found[key] = True + self.assertEqual(correct_offsets[key], tag.offset) + if key == tag_name: + self.assertTrue(tag.offset == 0 or tag.offset == 3) + self.assertTrue(pmt.to_long(tag.value) == 3) + self.assertTrue(all(tags_found.values())) + + +if __name__ == '__main__': + gr_unittest.run(qa_digital_carrier_allocator_cvc, "qa_digital_carrier_allocator_cvc.xml") + diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py new file mode 100755 index 0000000000..c9e2bacd0d --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import sys +import random + +import numpy + +from gnuradio import gr, gr_unittest, blocks, analog, digital +import pmt + +def shift_tuple(vec, N): + """ Shifts a vector by N elements. Fills up with zeros. """ + if N > 0: + return (0,) * N + tuple(vec[0:-N]) + else: + N = -N + return tuple(vec[N:]) + (0,) * N + +def rand_range(min_val, max_val): + """ Returns a random value (uniform) from the interval min_val, max_val """ + return random.random() * (max_val - min_val) + min_val + + +class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_offset_2sym (self): + """ Add a frequency offset, check if it's correctly detected. + Also add some random tags and see if they come out at the correct + position. """ + fft_len = 16 + carr_offset = -2 + sync_symbol1 = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) + sync_symbol2 = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + tx_data = shift_tuple(sync_symbol1, carr_offset) + \ + shift_tuple(sync_symbol2, carr_offset) + \ + shift_tuple(data_symbol, carr_offset) + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol("test_tag_1") + tag1.value = pmt.from_long(23) + tag2 = gr.gr_tag_t() + tag2.offset = 2 + tag2.key = pmt.string_to_symbol("test_tag_2") + tag2.value = pmt.from_long(42) + src = blocks.vector_source_c(tx_data, False, fft_len, (tag1, tag2)) + chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, chanest, sink) + self.tb.run() + self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol) + tags = sink.tags() + detected_tags = { + 'ofdm_sync_carr_offset': False, + 'test_tag_1': False, + 'test_tag_2': False + } + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + carr_offset_hat = pmt.to_long(tag.value) + self.assertEqual(pmt.to_long(tag.value), carr_offset) + if pmt.symbol_to_string(tag.key) == 'test_tag_1': + self.assertEqual(tag.offset, 0) + if pmt.symbol_to_string(tag.key) == 'test_tag_2': + self.assertEqual(tag.offset, 0) + detected_tags[pmt.symbol_to_string(tag.key)] = True + self.assertTrue(all(detected_tags.values())) + + def test_002_offset_1sym (self): + """ Add a frequency offset, check if it's correctly detected. + Difference to previous test is, it only uses one synchronisation symbol. """ + fft_len = 16 + carr_offset = -2 + # This will not correct for +2 because it thinks carrier 14 is used + # (because of interpolation) + sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) + data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + tx_data = shift_tuple(sync_symbol, carr_offset) + \ + shift_tuple(data_symbol, carr_offset) + src = blocks.vector_source_c(tx_data, False, fft_len) + # 17 is out of bounds! + chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1, 0, 17) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, chanest, sink) + self.tb.run() + self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol) + tags = sink.tags() + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + carr_offset_hat = pmt.to_long(tag.value) + self.assertEqual(pmt.to_long(tag.value), carr_offset) + + def test_003_channel_no_carroffset (self): + """ Add a channel, check if it's correctly estimated """ + fft_len = 16 + carr_offset = 0 + sync_symbol1 = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) + sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1, 0, 0) + data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + tx_data = sync_symbol1 + sync_symbol2 + data_symbol + channel = (0, 0, 0, 2, -2, 2, 3j, 2, 0, 2, 2, 2, 2, 3, 0, 0) + src = blocks.vector_source_c(tx_data, False, fft_len) + chan = blocks.multiply_const_vcc(channel) + chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, chan, chanest, sink) + self.tb.run() + tags = sink.tags() + self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel))) + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + self.assertEqual(pmt.to_long(tag.value), carr_offset) + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': + self.assertEqual(pmt.c32vector_elements(tag.value), channel) + + def test_004_channel_no_carroffset_1sym (self): + """ Add a channel, check if it's correctly estimated. + Only uses 1 synchronisation symbol. """ + fft_len = 16 + carr_offset = 0 + sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) + data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + tx_data = sync_symbol + data_symbol + channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0) + src = blocks.vector_source_c(tx_data, False, fft_len) + chan = blocks.multiply_const_vcc(channel) + chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, chan, chanest, sink) + self.tb.run() + tags = sink.tags() + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + self.assertEqual(pmt.to_long(tag.value), carr_offset) + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': + self.assertEqual(pmt.c32vector_elements(tag.value), channel) + + def test_005_both_1sym_force (self): + """ Add a channel, check if it's correctly estimated. + Only uses 1 synchronisation symbol. """ + fft_len = 16 + carr_offset = 0 + sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) + ref_symbol = (0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0) + data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + tx_data = sync_symbol + data_symbol + channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0) + src = blocks.vector_source_c(tx_data, False, fft_len) + chan = blocks.multiply_const_vcc(channel) + chanest = digital.ofdm_chanest_vcvc(sync_symbol, ref_symbol, 1) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, chan, chanest, sink) + self.tb.run() + tags = sink.tags() + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + self.assertEqual(pmt.to_long(tag.value), carr_offset) + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': + self.assertEqual(pmt.c32vector_elements(tag.value), channel) + + def test_006_channel_and_carroffset (self): + """ Add a channel, check if it's correctly estimated """ + fft_len = 16 + carr_offset = 2 + # Index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + sync_symbol1 = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) + sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1, 0, 0) + data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) + # Channel 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + # Shifted (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1) + tx_data = shift_tuple(sync_symbol1, carr_offset) + \ + shift_tuple(sync_symbol2, carr_offset) + \ + shift_tuple(data_symbol, carr_offset) + channel = range(fft_len) + src = blocks.vector_source_c(tx_data, False, fft_len) + chan = blocks.multiply_const_vcc(channel) + chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, chan, chanest, sink) + self.tb.run() + tags = sink.tags() + chan_est = None + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + self.assertEqual(pmt.to_long(tag.value), carr_offset) + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': + chan_est = pmt.c32vector_elements(tag.value) + for i in range(fft_len): + if shift_tuple(sync_symbol2, carr_offset)[i]: # Only here the channel can be estimated + self.assertEqual(chan_est[i], channel[i]) + self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel))) + + + def test_999_all_at_once(self): + """docstring for test_999_all_at_once""" + fft_len = 32 + # 6 carriers empty, 10 carriers full, 1 DC carrier, 10 carriers full, 5 carriers empty + syncsym_mask = (0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0) + carrier_mask = (0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0) + max_offset = 4 + wgn_amplitude = 0.05 + min_chan_ampl = 0.1 + max_chan_ampl = 5 + n_iter = 20 + def run_flow_graph(sync_sym1, sync_sym2, data_sym): + top_block = gr.top_block() + carr_offset = random.randint(-max_offset/2, max_offset/2) * 2 + tx_data = shift_tuple(sync_sym1, carr_offset) + \ + shift_tuple(sync_sym2, carr_offset) + \ + shift_tuple(data_sym, carr_offset) + channel = [rand_range(min_chan_ampl, max_chan_ampl) * numpy.exp(1j * rand_range(0, 2 * numpy.pi)) for x in range(fft_len)] + src = blocks.vector_source_c(tx_data, False, fft_len) + chan = blocks.multiply_const_vcc(channel) + noise = analog.noise_source_c(analog.GR_GAUSSIAN, wgn_amplitude) + add = blocks.add_cc(fft_len) + chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1) + sink = blocks.vector_sink_c(fft_len) + top_block.connect(src, chan, (add, 0), chanest, sink) + top_block.connect(noise, blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), (add, 1)) + top_block.run() + channel_est = None + carr_offset_hat = 0 + rx_sym_est = [0,] * fft_len + tags = sink.tags() + for tag in tags: + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': + carr_offset_hat = pmt.to_long(tag.value) + self.assertEqual(carr_offset, carr_offset_hat) + if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': + channel_est = pmt.c32vector_elements(tag.value) + shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset) + for i in range(fft_len): + if shifted_carrier_mask[i] and channel_est[i]: + self.assertAlmostEqual(channel[i], channel_est[i], places=0) + rx_sym_est[i] = (sink.data()[i] / channel_est[i]).real + return (carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat))) + bit_errors = 0 + for k in xrange(n_iter): + sync_sym = [(random.randint(0, 1) * 2 - 1) * syncsym_mask[i] for i in range(fft_len)] + ref_sym = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)] + data_sym = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)] + data_sym[26] = 1 + (carr_offset, rx_sym) = run_flow_graph(sync_sym, ref_sym, data_sym) + rx_sym_est = [0,] * fft_len + for i in xrange(fft_len): + if carrier_mask[i] == 0: + continue + rx_sym_est[i] = {True: 1, False: -1}[rx_sym[i] > 0] + if rx_sym_est[i] != data_sym[i]: + bit_errors += 1 + # This is much more than we could allow + self.assertTrue(bit_errors < n_iter) + + +if __name__ == '__main__': + gr_unittest.run(qa_ofdm_sync_eqinit_vcvc, "qa_ofdm_sync_eqinit_vcvc.xml") + diff --git a/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py b/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py new file mode 100755 index 0000000000..82d4950883 --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010,2011,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, digital, blocks +import pmt + +class test_ofdm_cyclic_prefixer (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_wo_tags_no_rolloff(self): + " The easiest test: make sure the CP is added correctly. " + fft_len = 8 + cp_len = 2 + expected_result = (6, 7, 0, 1, 2, 3, 4, 5, 6, 7, + 6, 7, 0, 1, 2, 3, 4, 5, 6, 7) + src = blocks.vector_source_c(range(fft_len) * 2, False, fft_len) + cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len) + sink = blocks.vector_sink_c() + self.tb.connect(src, cp, sink) + self.tb.run() + self.assertEqual(sink.data(), expected_result) + + def test_wo_tags_2s_rolloff(self): + " No tags, but have a 2-sample rolloff " + fft_len = 8 + cp_len = 2 + rolloff = 2 + expected_result = (7.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, # 1.0/2 + 7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8) + src = blocks.vector_source_c(range(1, fft_len+1) * 2, False, fft_len) + cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, rolloff) + sink = blocks.vector_sink_c() + self.tb.connect(src, cp, sink) + self.tb.run() + self.assertEqual(sink.data(), expected_result) + + def test_with_tags_2s_rolloff(self): + " With tags and a 2-sample rolloff " + fft_len = 8 + cp_len = 2 + tag_name = "length" + expected_result = (7.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, # 1.0/2 + 7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0/2) + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(2) + tag2 = gr.gr_tag_t() + tag2.offset = 1 + tag2.key = pmt.string_to_symbol("random_tag") + tag2.value = pmt.from_long(42) + src = blocks.vector_source_c(range(1, fft_len+1) * 2, False, fft_len, (tag, tag2)) + cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, 2, tag_name) + sink = blocks.vector_sink_c() + self.tb.connect(src, cp, sink) + self.tb.run() + self.assertEqual(sink.data(), expected_result) + tags = [gr.tag_to_python(x) for x in sink.tags()] + tags = sorted([(x.offset, x.key, x.value) for x in tags]) + expected_tags = [ + (0, tag_name, len(expected_result)), + (fft_len+cp_len, "random_tag", 42) + ] + self.assertEqual(tags, expected_tags) + + +if __name__ == '__main__': + gr_unittest.run(test_ofdm_cyclic_prefixer, "test_ofdm_cyclic_prefixer.xml") + diff --git a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py new file mode 100755 index 0000000000..ee1e849d92 --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import numpy + +from gnuradio import gr, gr_unittest, digital, blocks +import pmt + +class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_simple (self): + """ Very simple functionality testing """ + fft_len = 8 + equalizer = digital.ofdm_equalizer_static(fft_len) + n_syms = 3 + len_tag_key = "frame_len" + tx_data = (1,) * fft_len * n_syms + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.string_to_symbol(len_tag_key) + len_tag.value = pmt.from_long(n_syms) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len) + src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertEqual(tx_data, sink.data()) + for tag in sink.tags(): + self.assertEqual(pmt.symbol_to_string(tag.key), len_tag_key) + self.assertEqual(pmt.to_long(tag.value), n_syms) + + def test_002_static (self): + fft_len = 8 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.string_to_symbol(len_tag_key) + len_tag.value = pmt.from_long(4) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len]) + src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + for tag in sink.tags(): + if pmt.symbol_to_string(tag.key) == len_tag_key: + self.assertEqual(pmt.to_long(tag.value), 4) + if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps": + self.assertEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:]) + + def test_002_simpledfe (self): + fft_len = 8 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_simpledfe( + fft_len, cnst.base(), occupied_carriers, pilot_carriers, pilot_symbols, 0, 0.01 + ) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.string_to_symbol(len_tag_key) + len_tag.value = pmt.from_long(4) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len]) + src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + for tag in sink.tags(): + if pmt.symbol_to_string(tag.key) == len_tag_key: + self.assertEqual(pmt.to_long(tag.value), 4) + if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps": + self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:], places=1) + + +if __name__ == '__main__': + gr_unittest.run(qa_ofdm_frame_equalizer_vcvc, "qa_ofdm_frame_equalizer_vcvc.xml") + diff --git a/gr-digital/python/digital/qa_ofdm_insert_preamble.py b/gr-digital/python/digital/qa_ofdm_insert_preamble.py index a9664d6fe6..4edd54c8c6 100755 --- a/gr-digital/python/digital/qa_ofdm_insert_preamble.py +++ b/gr-digital/python/digital/qa_ofdm_insert_preamble.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2007,2010-2012 Free Software Foundation, Inc. +# Copyright 2007,2010-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -34,8 +34,8 @@ class test_ofdm_insert_preamble(gr_unittest.TestCase): def helper(self, v0, v1, fft_length, preamble): tb = self.tb - src0 = gr.vector_source_c(v0) - src1 = gr.vector_source_b(v1) + src0 = blocks.vector_source_c(v0) + src1 = blocks.vector_source_b(v1) s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_length) @@ -44,8 +44,8 @@ class test_ofdm_insert_preamble(gr_unittest.TestCase): op = digital.ofdm_insert_preamble(fft_length, preamble) v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, fft_length) - dst0 = gr.vector_sink_c() - dst1 = gr.vector_sink_b() + dst0 = blocks.vector_sink_c() + dst1 = blocks.vector_sink_b() tb.connect(src0, s2v, (op, 0)) tb.connect(src1, (op, 1)) diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py new file mode 100755 index 0000000000..fb1f47f2fd --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python +# +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import numpy + +from gnuradio import gr, gr_unittest, blocks. fft, analog, digital +import pmt + +class qa_ofdm_serializer_vcc (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_simple (self): + """ Standard test """ + fft_len = 16 + tx_symbols = range(1, 16); + tx_symbols = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0, + 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0, + 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0) + expected_result = tuple(range(1, 16)) + (0, 0, 0) + occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) + n_syms = len(tx_symbols)/fft_len + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(n_syms) + src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False) + sink = blocks.vector_sink_c() + self.tb.connect(src, serializer, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + self.assertEqual(len(sink.tags()), 1) + result_tag = sink.tags()[0] + self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name) + self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0])) + + def test_002_with_offset (self): + """ Standard test, carrier offset """ + fft_len = 16 + tx_symbols = range(1, 16); + tx_symbols = (0, 0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, + 0, 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, + 0, 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0) + carr_offset = 1 # Compare this with tx_symbols from the previous test + expected_result = tuple(range(1, 16)) + (0, 0, 0) + occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) + n_syms = len(tx_symbols)/fft_len + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(n_syms) + offsettag = gr.gr_tag_t() + offsettag.offset = 0 + offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") + offsettag.value = pmt.from_long(carr_offset) + src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag)) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False) + sink = blocks.vector_sink_c() + self.tb.connect(src, serializer, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + self.assertEqual(len(sink.tags()), 2) + for tag in sink.tags(): + if pmt.symbol_to_string(tag.key) == tag_name: + self.assertEqual(pmt.to_long(tag.value), n_syms * len(occupied_carriers[0])) + + def test_003_connect (self): + """ Connect carrier_allocator to ofdm_serializer, + make sure output==input """ + fft_len = 32 + n_syms = 10 + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((3,),(5,)) + pilot_symbols = ((1j,),(-1j,)) + sync_word = (range(fft_len),) + tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(tx_data)) + src = blocks.vector_source_c(tx_data, False, 1, (tag,)) + alloc = digital.ofdm_carrier_allocator_cvc(fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, sync_word, + tag_name) + serializer = digital.ofdm_serializer_vcc(alloc) + sink = blocks.vector_sink_c() + self.tb.connect(src, alloc, serializer, sink) + self.tb.run () + self.assertEqual(sink.data()[4:], tx_data) + + def test_004_connect (self): + """ + Advanced test: + - Allocator -> IFFT -> Frequency offset -> FFT -> Serializer + - FFT does shift (moves DC to middle) + - Make sure input == output + - Frequency offset is -2 carriers + """ + fft_len = 8 + n_syms = 2 + carr_offset = -2 + freq_offset = 2 * numpy.pi * carr_offset / fft_len # If the sampling rate == 1 + occupied_carriers = ((1, 2, -2, -1),) + pilot_carriers = ((3,),(5,)) + pilot_symbols = ((1j,),(-1j,)) + sync_word = (range(fft_len),) + tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) + #tx_data = (1,) * occupied_carriers[0] * n_syms + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(tx_data)) + offsettag = gr.gr_tag_t() + offsettag.offset = 0 + offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") + offsettag.value = pmt.from_long(carr_offset) + src = blocks.vector_source_c(tx_data, False, 1, (tag, offsettag)) + alloc = digital.ofdm_carrier_allocator_cvc(fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, sync_word, + tag_name) + tx_ifft = fft.fft_vcc(fft_len, False, ()) + offset_sig = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0) + mixer = blocks.multiply_cc() + rx_fft = fft.fft_vcc(fft_len, True, (), True) + serializer = digital.ofdm_serializer_vcc(alloc) + sink = blocks.vector_sink_c() + self.tb.connect( + src, alloc, tx_ifft, + blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len), + (mixer, 0), + blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), + rx_fft, serializer, sink + ) + self.tb.connect(offset_sig, (mixer, 1)) + self.tb.run () + # FIXME check this + #self.assertEqual(sink.data(), tx_data) + + def test_005_packet_len_tag (self): + """ Standard test """ + fft_len = 16 + tx_symbols = range(1, 16); + tx_symbols = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0, + 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0, + 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0) + expected_result = tuple(range(1, 16)) + occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) + n_syms = len(tx_symbols)/fft_len + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(n_syms) + tag2 = gr.gr_tag_t() + tag2.offset = 0 + tag2.key = pmt.string_to_symbol("packet_len") + tag2.value = pmt.from_long(len(expected_result)) + src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2)) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, False) + sink = blocks.vector_sink_c() + self.tb.connect(src, serializer, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + self.assertEqual(len(sink.tags()), 1) + result_tag = sink.tags()[0] + self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len") + self.assertEqual(pmt.to_long(result_tag.value), len(expected_result)) + + def test_099 (self): + """ Make sure it fails if it should """ + fft_len = 16 + occupied_carriers = ((1, 3, 4, 11, 12, 17),) + tag_name = "len" + self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name) + + +if __name__ == '__main__': + #gr_unittest.run(qa_ofdm_serializer_vcc, "qa_ofdm_serializer_vcc.xml") + gr_unittest.run(qa_ofdm_serializer_vcc) + diff --git a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py new file mode 100755 index 0000000000..ba50240916 --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python +# +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import numpy +import random + +from gnuradio import gr, gr_unittest, blocks, analog + +try: + # This will work when feature #505 is added. + from gnuradio import digital + from gnuradio.digital.utils import tagged_streams + from gnuradio.digital.ofdm_txrx import ofdm_tx +except ImportError: + # Until then this will work. + import digital_swig as digital + from utils import tagged_streams + from ofdm_txrx import ofdm_tx + + +class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_detect (self): + """ Send two bursts, with zeros in between, and check + they are both detected at the correct position and no + false alarms occur """ + n_zeros = 15 + fft_len = 32 + cp_len = 4 + sig_len = (fft_len + cp_len) * 10 + sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len/2)] * 2 + tx_signal = [0,] * n_zeros + \ + sync_symbol[-cp_len:] + \ + sync_symbol + \ + [(random.randint(0, 1)*2)-1 for x in range(sig_len)] + tx_signal = tx_signal * 2 + add = blocks.add_cc() + sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len) + sink_freq = blocks.vector_sink_f() + sink_detect = blocks.vector_sink_b() + self.tb.connect(blocks.vector_source_c(tx_signal), (add, 0)) + self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1)) + self.tb.connect(add, sync) + self.tb.connect((sync, 0), sink_freq) + self.tb.connect((sync, 1), sink_detect) + self.tb.run() + sig1_detect = sink_detect.data()[0:len(tx_signal)/2] + sig2_detect = sink_detect.data()[len(tx_signal)/2:] + self.assertTrue(abs(sig1_detect.index(1) - (n_zeros + fft_len + cp_len)) < cp_len) + self.assertTrue(abs(sig2_detect.index(1) - (n_zeros + fft_len + cp_len)) < cp_len) + self.assertEqual(numpy.sum(sig1_detect), 1) + self.assertEqual(numpy.sum(sig2_detect), 1) + + + def test_002_freq (self): + """ Add a fine frequency offset and see if that get's detected properly """ + fft_len = 32 + cp_len = 4 + freq_offset = 0.1 # Must stay < 2*pi/fft_len = 0.196 (otherwise, it's coarse) + sig_len = (fft_len + cp_len) * 10 + sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len/2)] * 2 + tx_signal = sync_symbol[-cp_len:] + \ + sync_symbol + \ + [(random.randint(0, 1)*2)-1 for x in range(sig_len)] + mult = blocks.multiply_cc() + add = blocks.add_cc() + sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len) + sink_freq = blocks.vector_sink_f() + sink_detect = blocks.vector_sink_b() + self.tb.connect(blocks.vector_source_c(tx_signal), (mult, 0), (add, 0)) + self.tb.connect(analog.sig_source_c(2 * numpy.pi, analog.GR_SIN_WAVE, freq_offset, 1.0), (mult, 1)) + self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .01), (add, 1)) + self.tb.connect(add, sync) + self.tb.connect((sync, 0), sink_freq) + self.tb.connect((sync, 1), sink_detect) + self.tb.run() + phi_hat = sink_freq.data()[sink_detect.data().index(1)] + est_freq_offset = 2 * phi_hat / fft_len + self.assertAlmostEqual(est_freq_offset, freq_offset, places=2) + + + def test_003_multiburst (self): + """ Send several bursts, see if the number of detects is correct. + Burst lengths and content are random. + """ + n_bursts = 42 + fft_len = 32 + cp_len = 4 + tx_signal = [] + for i in xrange(n_bursts): + sync_symbol = [(random.randint(0, 1)*2)-1 for x in range(fft_len/2)] * 2 + tx_signal += [0,] * random.randint(0, 2*fft_len) + \ + sync_symbol[-cp_len:] + \ + sync_symbol + \ + [(random.randint(0, 1)*2)-1 for x in range(fft_len * random.randint(5,23))] + add = blocks.add_cc() + sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len) + sink_freq = blocks.vector_sink_f() + sink_detect = blocks.vector_sink_b() + self.tb.connect(blocks.vector_source_c(tx_signal), (add, 0)) + self.tb.connect(analog.noise_source_c(analog.GR_GAUSSIAN, .005), (add, 1)) + self.tb.connect(add, sync) + self.tb.connect((sync, 0), sink_freq) + self.tb.connect((sync, 1), sink_detect) + self.tb.run() + self.assertEqual(numpy.sum(sink_detect.data()), n_bursts, + msg="""Because of statistics, it is possible (though unlikely) +that the number of detected bursts differs slightly. If the number of detects is +off by one or two, run the test again and see what happen. +Detection error was: %d """ % (numpy.sum(sink_detect.data()) - n_bursts) + ) + + # FIXME ofdm_mod is currently not working + #def test_004_ofdm_packets (self): + #""" + #Send several bursts, see if the number of detects is correct. + #Burst lengths and content are random. + #""" + #n_bursts = 42 + #fft_len = 64 + #cp_len = 12 + #tx_signal = [] + #packets = [] + #tagname = "length" + #min_packet_length = 100 + #max_packet_length = 100 + #sync_sequence = [random.randint(0, 1)*2-1 for x in range(fft_len/2)] + #for i in xrange(n_bursts): + #packet_length = random.randint(min_packet_length, + #max_packet_length+1) + #packet = [random.randint(0, 255) for i in range(packet_length)] + #packets.append(packet) + #data, tags = tagged_streams.packets_to_vectors( + #packets, tagname, vlen=1) + #total_length = len(data) + + #src = blocks.vector_source_b(data, False, 1, tags) + #mod = ofdm_tx( + #fft_len=fft_len, + #cp_len=cp_len, + #length_tag_name=tagname, + #occupied_carriers=(range(1, 27) + range(38, 64),), + #pilot_carriers=((0,),), + #pilot_symbols=((100,),), + #) + #rate_in = 16000 + #rate_out = 48000 + #ratio = float(rate_out) / rate_in + #throttle1 = gr.throttle(gr.sizeof_gr_complex, rate_in) + #sink_countbursts = gr.vector_sink_c() + #head = gr.head(gr.sizeof_gr_complex, int(total_length * ratio*2)) + #add = gr.add_cc() + #sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len) + #sink_freq = blocks.vector_sink_f() + #sink_detect = blocks.vector_sink_b() + #noise_level = 0.01 + #noise = gr.noise_source_c(gr.GR_GAUSSIAN, noise_level) + #self.tb.connect(src, mod, blocks.null_sink(gr.sizeof_gr_complex)) + #self.tb.connect(insert_zeros, sink_countbursts) + #self.tb.connect(noise, (add, 1)) + #self.tb.connect(add, sync) + #self.tb.connect((sync, 0), sink_freq) + #self.tb.connect((sync, 1), sink_detect) + #self.tb.run() + #count_data = sink_countbursts.data() + #count_tags = sink_countbursts.tags() + #burstcount = tagged_streams.count_bursts(count_data, count_tags, tagname) + #self.assertEqual(numpy.sum(sink_detect.data()), burstcount) + + +if __name__ == '__main__': + #gr_unittest.run(qa_ofdm_sync_sc_cfb, "qa_ofdm_sync_sc_cfb.xml") + gr_unittest.run(qa_ofdm_sync_sc_cfb) + diff --git a/gr-digital/python/digital/qa_ofdm_txrx.py b/gr-digital/python/digital/qa_ofdm_txrx.py new file mode 100755 index 0000000000..681041d47d --- /dev/null +++ b/gr-digital/python/digital/qa_ofdm_txrx.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import random + +import numpy +from gnuradio import gr, gr_unittest, digital, blocks +from gnuradio.digital.ofdm_txrx import ofdm_tx, ofdm_rx +from gnuradio.digital.utils import tagged_streams + +class test_ofdm_txrx (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001 (self): + pass + #len_tag_key = 'frame_len' + #n_bytes = 100 + #test_data = [random.randint(0, 255) for x in range(n_bytes)] + #tx_data, tags = tagged_streams.packets_to_vectors((test_data,), len_tag_key) + #src = blocks.vector_source_b(test_data, False, 1, tags) + #tx = ofdm_tx(frame_length_tag_key=len_tag_key) + #rx = ofdm_rx(frame_length_tag_key=len_tag_key) + #self.assertEqual(tx.sync_word1, rx.sync_word1) + #self.assertEqual(tx.sync_word2, rx.sync_word2) + #delay = blocks.delay(gr.sizeof_gr_complex, 100) + #noise = analog.noise_source_c(analog.GR_GAUSSIAN, 0.05) + #add = blocks.add_cc() + #sink = blocks.vector_sink_b() + ##self.tb.connect(src, tx, add, rx, sink) + ##self.tb.connect(noise, (add, 1)) + #self.tb.connect(src, tx, blocks.null_sink(gr.sizeof_gr_complex)) + #self.tb.run() + + +if __name__ == '__main__': + gr_unittest.run(test_ofdm_txrx, "test_ofdm_txrx.xml") + diff --git a/gr-digital/python/digital/qa_packet_headergenerator_bb.py b/gr-digital/python/digital/qa_packet_headergenerator_bb.py new file mode 100755 index 0000000000..aaf7f89918 --- /dev/null +++ b/gr-digital/python/digital/qa_packet_headergenerator_bb.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, digital, blocks +import pmt + +class qa_packet_headergenerator_bb (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_12bits (self): + # 3 PDUs: | | | | + data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) + tagname = "packet_len" + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tagname) + tag1.value = pmt.from_long(4) + tag2 = gr.gr_tag_t() + tag2.offset = 4 + tag2.key = pmt.string_to_symbol(tagname) + tag2.value = pmt.from_long(2) + tag3 = gr.gr_tag_t() + tag3.offset = 6 + tag3.key = pmt.string_to_symbol(tagname) + tag3.value = pmt.from_long(4) + src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) + header = digital.packet_headergenerator_bb(12, tagname) + sink = blocks.vector_sink_b() + self.tb.connect(src, header, sink) + self.tb.run() + expected_data = ( + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) + self.assertEqual(sink.data(), expected_data) + + + def test_002_32bits (self): + # 3 PDUs: | | | | + data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) + tagname = "packet_len" + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tagname) + tag1.value = pmt.from_long(4) + tag2 = gr.gr_tag_t() + tag2.offset = 4 + tag2.key = pmt.string_to_symbol(tagname) + tag2.value = pmt.from_long(2) + tag3 = gr.gr_tag_t() + tag3.offset = 6 + tag3.key = pmt.string_to_symbol(tagname) + tag3.value = pmt.from_long(4) + src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) + header = digital.packet_headergenerator_bb(32, tagname) + sink = blocks.vector_sink_b() + self.tb.connect(src, header, sink) + self.tb.run() + expected_data = ( + # | Number of symbols | Packet number | Parity + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) + self.assertEqual(sink.data(), expected_data) + + + def test_003_12bits_formatter_object (self): + # 3 PDUs: | | | | + data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) + tagname = "packet_len" + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tagname) + tag1.value = pmt.from_long(4) + tag2 = gr.gr_tag_t() + tag2.offset = 4 + tag2.key = pmt.string_to_symbol(tagname) + tag2.value = pmt.from_long(2) + tag3 = gr.gr_tag_t() + tag3.offset = 6 + tag3.key = pmt.string_to_symbol(tagname) + tag3.value = pmt.from_long(4) + src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) + formatter_object = digital.packet_header_default(12, tagname) + header = digital.packet_headergenerator_bb(formatter_object.formatter(), tagname) + sink = blocks.vector_sink_b() + self.tb.connect(src, header, sink) + self.tb.run() + expected_data = ( + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) + self.assertEqual(sink.data(), expected_data) + + def test_004_8bits_formatter_ofdm (self): + occupied_carriers = ((1, 2, 3, 5, 6, 7),) + # 3 PDUs: | | | | + data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) + tagname = "packet_len" + tag1 = gr.gr_tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tagname) + tag1.value = pmt.from_long(4) + tag2 = gr.gr_tag_t() + tag2.offset = 4 + tag2.key = pmt.string_to_symbol(tagname) + tag2.value = pmt.from_long(2) + tag3 = gr.gr_tag_t() + tag3.offset = 6 + tag3.key = pmt.string_to_symbol(tagname) + tag3.value = pmt.from_long(4) + src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) + formatter_object = digital.packet_header_ofdm(occupied_carriers, 1, tagname) + self.assertEqual(formatter_object.header_len(), 6) + self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), tagname) + header = digital.packet_headergenerator_bb(formatter_object.formatter(), tagname) + sink = blocks.vector_sink_b() + self.tb.connect(src, header, sink) + self.tb.run() + expected_data = ( + 0, 0, 1, 0, 0, 0, + 0, 1, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0 + ) + self.assertEqual(sink.data(), expected_data) + +if __name__ == '__main__': + gr_unittest.run(qa_packet_headergenerator_bb, "qa_packet_headergenerator_bb.xml") + diff --git a/gr-digital/python/digital/qa_packet_headerparser_b.py b/gr-digital/python/digital/qa_packet_headerparser_b.py new file mode 100755 index 0000000000..b127c86fb1 --- /dev/null +++ b/gr-digital/python/digital/qa_packet_headerparser_b.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import time +from gnuradio import gr, gr_unittest, blocks, digital +import pmt + +class qa_packet_headerparser_b (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + expected_data = ( + # | Number of symbols | Packet number | Parity + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0 + ) + tagname = "packet_len" + + src = blocks.vector_source_b(expected_data) + parser = digital.packet_headerparser_b(32, tagname) + sink = blocks.message_debug() + + self.tb.connect(src, parser) + self.tb.msg_connect(parser, "header_data", sink, "store") + self.tb.start () + time.sleep(1) + self.tb.stop() + self.tb.wait() + + self.assertEqual(sink.num_messages(), 3) + msg = sink.get_message(0) + #try: + #self.assertEqual(4, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol(tagname), pmt.PMT_F))) + #self.assertEqual(0, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol("packet_num"), pmt.PMT_F))) + + #except: + #self.fail() + # msg1: length 4, number 0 + # msg2: length 2, number 1 + # msg3: PMT_F because parity fail + + + +if __name__ == '__main__': + gr_unittest.run(qa_packet_headerparser_b, "qa_packet_headerparser_b.xml") diff --git a/gr-digital/python/digital/qa_pfb_clock_sync.py b/gr-digital/python/digital/qa_pfb_clock_sync.py index 77a0c9eadf..286953ab34 100755 --- a/gr-digital/python/digital/qa_pfb_clock_sync.py +++ b/gr-digital/python/digital/qa_pfb_clock_sync.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2011 Free Software Foundation, Inc. +# Copyright 2011,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -23,7 +23,7 @@ import random import cmath -from gnuradio import gr, gr_unittest, filter, digital +from gnuradio import gr, gr_unittest, filter, digital, blocks class test_pfb_clock_sync(gr_unittest.TestCase): @@ -54,7 +54,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase): osps) data = 10000*[complex(1,0), complex(-1,0)] - self.src = gr.vector_source_c(data, False) + self.src = blocks.vector_source_c(data, False) # pulse shaping interpolation filter rrc_taps = filter.firdes.root_raised_cosine( @@ -65,7 +65,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase): ntaps) self.rrc_filter = filter.pfb_arb_resampler_ccf(sps, rrc_taps) - self.snk = gr.vector_sink_c() + self.snk = blocks.vector_sink_c() self.tb.connect(self.src, self.rrc_filter, self.test, self.snk) self.tb.run() @@ -107,7 +107,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase): osps) data = 10000*[1, -1] - self.src = gr.vector_source_f(data, False) + self.src = blocks.vector_source_f(data, False) # pulse shaping interpolation filter rrc_taps = filter.firdes.root_raised_cosine( @@ -118,7 +118,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase): ntaps) self.rrc_filter = filter.pfb_arb_resampler_fff(sps, rrc_taps) - self.snk = gr.vector_sink_f() + self.snk = blocks.vector_sink_f() self.tb.connect(self.src, self.rrc_filter, self.test, self.snk) self.tb.run() diff --git a/gr-digital/python/digital/qa_pn_correlator_cc.py b/gr-digital/python/digital/qa_pn_correlator_cc.py index f637ee7bd3..92041d9eda 100755 --- a/gr-digital/python/digital/qa_pn_correlator_cc.py +++ b/gr-digital/python/digital/qa_pn_correlator_cc.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2007,2010,2012 Free Software Foundation, Inc. +# Copyright 2007,2010,2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -37,10 +37,10 @@ class test_pn_correlator_cc(gr_unittest.TestCase): degree = 10 length = 2**degree-1 src = digital.glfsr_source_f(degree) - head = gr.head(gr.sizeof_float, length*length) + head = blocks.head(gr.sizeof_float, length*length) f2c = blocks.float_to_complex() corr = digital.pn_correlator_cc(degree) - dst = gr.vector_sink_c() + dst = blocks.vector_sink_c() self.tb.connect(src, head, f2c, corr, dst) self.tb.run() data = dst.data() diff --git a/gr-digital/python/digital/qa_probe_density.py b/gr-digital/python/digital/qa_probe_density.py index bcf4eeeaf0..752d95da3e 100755 --- a/gr-digital/python/digital/qa_probe_density.py +++ b/gr-digital/python/digital/qa_probe_density.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2012 Free Software Foundation, Inc. +# Copyright 2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_probe_density(gr_unittest.TestCase): @@ -33,7 +33,7 @@ class test_probe_density(gr_unittest.TestCase): def test_001(self): src_data = [0, 1, 0, 1] expected_data = 1 - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.probe_density_b(1) self.tb.connect(src, op) self.tb.run() @@ -45,7 +45,7 @@ class test_probe_density(gr_unittest.TestCase): def test_002(self): src_data = [1, 1, 1, 1] expected_data = 1 - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.probe_density_b(0.01) self.tb.connect(src, op) self.tb.run() @@ -56,7 +56,7 @@ class test_probe_density(gr_unittest.TestCase): def test_003(self): src_data = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1] expected_data = 0.95243 - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.probe_density_b(0.01) self.tb.connect(src, op) self.tb.run() diff --git a/gr-digital/python/digital/qa_scrambler.py b/gr-digital/python/digital/qa_scrambler.py index e11958c650..05daebd389 100755 --- a/gr-digital/python/digital/qa_scrambler.py +++ b/gr-digital/python/digital/qa_scrambler.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2008,2010,2012 Free Software Foundation, Inc. +# Copyright 2008,2010,2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_scrambler(gr_unittest.TestCase): @@ -32,30 +32,30 @@ class test_scrambler(gr_unittest.TestCase): def test_scrambler_descrambler(self): src_data = (1,)*1000 - src = gr.vector_source_b(src_data, False) + src = blocks.vector_source_b(src_data, False) scrambler = digital.scrambler_bb(0x8a, 0x7F, 7) # CCSDS 7-bit scrambler descrambler = digital.descrambler_bb(0x8a, 0x7F, 7) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, scrambler, descrambler, dst) self.tb.run() self.assertEqual(tuple(src_data[:-8]), dst.data()[8:]) # skip garbage during synchronization def test_additive_scrambler(self): src_data = (1,)*1000 - src = gr.vector_source_b(src_data, False) + src = blocks.vector_source_b(src_data, False) scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7) descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, scrambler, descrambler, dst) self.tb.run() self.assertEqual(src_data, dst.data()) def test_additive_scrambler_reset(self): src_data = (1,)*1000 - src = gr.vector_source_b(src_data, False) + src = blocks.vector_source_b(src_data, False) scrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100) descrambler = digital.additive_scrambler_bb(0x8a, 0x7f, 7, 100) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, scrambler, descrambler, dst) self.tb.run() self.assertEqual(src_data, dst.data()) diff --git a/gr-digital/python/digital/qa_simple_correlator.py b/gr-digital/python/digital/qa_simple_correlator.py index 4af1d542c8..f39fb62dda 100755 --- a/gr-digital/python/digital/qa_simple_correlator.py +++ b/gr-digital/python/digital/qa_simple_correlator.py @@ -41,7 +41,7 @@ class test_simple_correlator(gr_unittest.TestCase): # Just using a RRC for some basic filter shape taps = filter.firdes.root_raised_cosine(8, 8, 1.0, 0.5, 21) - src = gr.vector_source_b(expected_result) + src = blocks.vector_source_b(expected_result) frame = digital.simple_framer(4) unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) expand = filter.interp_fir_filter_fff(8, taps) @@ -49,7 +49,7 @@ class test_simple_correlator(gr_unittest.TestCase): mult2 = blocks.multiply_const_ff(2) sub1 = blocks.add_const_ff(-1) op = digital.simple_correlator(4) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, frame, unpack, b2f, mult2, sub1, expand) self.tb.connect(expand, op, dst) self.tb.run() diff --git a/gr-digital/python/digital/qa_simple_framer.py b/gr-digital/python/digital/qa_simple_framer.py index 095d61c5e5..cf9934648b 100755 --- a/gr-digital/python/digital/qa_simple_framer.py +++ b/gr-digital/python/digital/qa_simple_framer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc. +# Copyright 2004,2007,2010,2012,2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, digital +from gnuradio import gr, gr_unittest, digital, blocks class test_simple_framer(gr_unittest.TestCase): @@ -42,9 +42,9 @@ class test_simple_framer(gr_unittest.TestCase): 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x02, 0x88, 0x99, 0xaa, 0xbb, 0x55, 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x03, 0xcc, 0xdd, 0xee, 0xff, 0x55) - src = gr.vector_source_b(src_data) + src = blocks.vector_source_b(src_data) op = digital.simple_framer(4) - dst = gr.vector_sink_b() + dst = blocks.vector_sink_b() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() diff --git a/gr-digital/python/digital/utils/tagged_streams.py b/gr-digital/python/digital/utils/tagged_streams.py new file mode 100644 index 0000000000..f2a58ffe1e --- /dev/null +++ b/gr-digital/python/digital/utils/tagged_streams.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr +import pmt + +def make_lengthtags(lengths, offsets, tagname='length', vlen=1): + tags = [] + assert(len(offsets) == len(lengths)) + for offset, length in zip(offsets, lengths): + tag = gr.gr_tag_t() + tag.offset = offset/vlen + tag.key = pmt.string_to_symbol(tagname) + tag.value = pmt.from_long(length/vlen) + tags.append(tag) + return tags + +def string_to_vector(string): + v = [] + for s in string: + v.append(ord(s)) + return v + +def strings_to_vectors(strings, lengthtagname): + vs = [string_to_vector(string) for string in strings] + return packets_to_vectors(vs, lengthtagname) + +def vector_to_string(v): + s = [] + for d in v: + s.append(chr(d)) + return ''.join(s) + +def vectors_to_strings(data, tags, lengthtagname): + packets = vectors_to_packets(data, tags, lengthtagname) + return [vector_to_string(packet) for packet in packets] + +def count_bursts(data, tags, lengthtagname, vlen=1): + lengthtags = [t for t in tags + if pmt.symbol_to_string(t.key) == lengthtagname] + lengths = {} + for tag in lengthtags: + if tag.offset in lengths: + raise ValueError( + "More than one tags with key {0} with the same offset={1}." + .format(lengthtagname, tag.offset)) + lengths[tag.offset] = pmt.to_long(tag.value)*vlen + in_burst = False + in_packet = False + packet_length = None + packet_pos = None + burst_count = 0 + for pos in range(len(data)): + if pos in lengths: + if in_packet: + print("Got tag at pos {0} current packet_pos is {1}".format(pos, packet_pos)) + raise StandardError("Received packet tag while in packet.") + packet_pos = -1 + packet_length = lengths[pos] + in_packet = True + if not in_burst: + burst_count += 1 + in_burst = True + elif not in_packet: + in_burst = False + if in_packet: + packet_pos += 1 + if packet_pos == packet_length-1: + in_packet = False + packet_pos = None + return burst_count + +def vectors_to_packets(data, tags, lengthtagname, vlen=1): + lengthtags = [t for t in tags + if pmt.symbol_to_string(t.key) == lengthtagname] + lengths = {} + for tag in lengthtags: + if tag.offset in lengths: + raise ValueError( + "More than one tags with key {0} with the same offset={1}." + .format(lengthtagname, tag.offset)) + lengths[tag.offset] = pmt.to_long(tag.value)*vlen + if 0 not in lengths: + raise ValueError("There is no tag with key {0} and an offset of 0" + .format(lengthtagname)) + pos = 0 + packets = [] + while pos < len(data): + if pos not in lengths: + raise ValueError("There is no tag with key {0} and an offset of {1}." + "We were expecting one." + .format(lengthtagname, pos)) + length = lengths[pos] + if length == 0: + raise ValueError("Packets cannot have zero length.") + if pos+length > len(data): + raise ValueError("The final packet is incomplete.") + packets.append(data[pos: pos+length]) + pos += length + return packets + +def packets_to_vectors(packets, lengthtagname, vlen=1): + tags = [] + data = [] + offset = 0 + for packet in packets: + data.extend(packet) + tag = gr.gr_tag_t() + tag.offset = offset/vlen + tag.key = pmt.string_to_symbol(lengthtagname) + tag.value = pmt.from_long(len(packet)/vlen) + tags.append(tag) + offset = offset + len(packet) + return data, tags |