#!/usr/bin/env python # # Copyright 2004, 2007, 2010, 2012, 2013, 2020 Free Software Foundation, Inc. # # This file is part of GNU Radio # # SPDX-License-Identifier: GPL-3.0-or-later # # import math import pmt from gnuradio import gr, gr_unittest, analog, blocks class test_sig_source(gr_unittest.TestCase): def setUp(self): self.tb = gr.top_block() def tearDown(self): self.tb = None def test_const_f(self): tb = self.tb expected_result = [1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5] src1 = analog.sig_source_f(1e6, analog.GR_CONST_WAVE, 0, 1.5) op = blocks.head(gr.sizeof_float, 10) dst1 = blocks.vector_sink_f() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertEqual(expected_result, dst_data) def test_const_i(self): tb = self.tb expected_result = [1, 1, 1, 1] src1 = analog.sig_source_i(1e6, analog.GR_CONST_WAVE, 0, 1) op = blocks.head(gr.sizeof_int, 4) dst1 = blocks.vector_sink_i() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertEqual(expected_result, dst_data) def test_const_b(self): tb = self.tb expected_result = [1, 1, 1, 1] src1 = analog.sig_source_b(1e6, analog.GR_CONST_WAVE, 0, 1) op = blocks.head(gr.sizeof_char, 4) dst1 = blocks.vector_sink_b() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertEqual(expected_result, dst_data) def test_sine_f(self): tb = self.tb sqrt2 = math.sqrt(2) / 2 expected_result = [0, sqrt2, 1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0] src1 = analog.sig_source_f(8, analog.GR_SIN_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_float, 9) dst1 = blocks.vector_sink_f() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) def test_sine_b(self): tb = self.tb sqrt2 = math.sqrt(2) / 2 temp_result = [0, sqrt2, 1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0] amp = 8 expected_result = tuple([int(z * amp) for z in temp_result]) src1 = analog.sig_source_b(8, analog.GR_SIN_WAVE, 1.0, amp) op = blocks.head(gr.sizeof_char, 9) dst1 = blocks.vector_sink_b() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() # Let the python know we are dealing with signed int behind scenes dst_data_signed = [b if b < 127 else (256 - b) * -1 for b in dst_data] self.assertFloatTuplesAlmostEqual(expected_result, dst_data_signed) def test_cosine_f(self): tb = self.tb sqrt2 = math.sqrt(2) / 2 expected_result = [1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0, sqrt2, 1] src1 = analog.sig_source_f(8, analog.GR_COS_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_float, 9) dst1 = blocks.vector_sink_f() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) def test_cosine_c(self): tb = self.tb sqrt2 = math.sqrt(2) / 2 sqrt2j = 1j * math.sqrt(2) / 2 expected_result = [ 1, sqrt2 + sqrt2j, 1j, -sqrt2 + sqrt2j, -1, -sqrt2 - sqrt2j, -1j, sqrt2 - sqrt2j, 1 ] src1 = analog.sig_source_c(8, analog.GR_COS_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_gr_complex, 9) dst1 = blocks.vector_sink_c() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) def test_sqr_c(self): tb = self.tb # arg6 is a bit before -PI/2 expected_result = [1j, 1j, 0, 0, 1, 1, 1 + 0j, 1 + 1j, 1j] src1 = analog.sig_source_c(8, analog.GR_SQR_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_gr_complex, 9) dst1 = blocks.vector_sink_c() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertEqual(expected_result, dst_data) def test_tri_c(self): tb = self.tb expected_result = [ 1 + .5j, .75 + .75j, .5 + 1j, .25 + .75j, 0 + .5j, .25 + .25j, .5 + 0j, .75 + .25j, 1 + .5j ] src1 = analog.sig_source_c(8, analog.GR_TRI_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_gr_complex, 9) dst1 = blocks.vector_sink_c() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5) def test_saw_c(self): tb = self.tb expected_result = [ .5 + .25j, .625 + .375j, .75 + .5j, .875 + .625j, 0 + .75j, .125 + .875j, .25 + 1j, .375 + .125j, .5 + .25j ] src1 = analog.sig_source_c(8, analog.GR_SAW_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_gr_complex, 9) dst1 = blocks.vector_sink_c() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5) def test_sqr_f(self): tb = self.tb expected_result = [0, 0, 0, 0, 1, 1, 1, 1, 0] src1 = analog.sig_source_f(8, analog.GR_SQR_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_float, 9) dst1 = blocks.vector_sink_f() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertEqual(expected_result, dst_data) def test_tri_f(self): tb = self.tb expected_result = [1, .75, .5, .25, 0, .25, .5, .75, 1] src1 = analog.sig_source_f(8, analog.GR_TRI_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_float, 9) dst1 = blocks.vector_sink_f() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) def test_saw_f(self): tb = self.tb expected_result = [.5, .625, .75, .875, 0, .125, .25, .375, .5] src1 = analog.sig_source_f(8, analog.GR_SAW_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_float, 9) dst1 = blocks.vector_sink_f() tb.connect(src1, op) tb.connect(op, dst1) tb.run() dst_data = dst1.data() self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) def test_freq_msg(self): # deprecated but still tested src = analog.sig_source_c(8, analog.GR_SIN_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_gr_complex, 9) snk = blocks.vector_sink_c() self.tb.connect(src, op, snk) self.assertAlmostEqual(src.frequency(), 1.0) frequency = 3.0 src._post(pmt.to_pmt('freq'), pmt.from_double(frequency)) self.tb.run() self.assertAlmostEqual(src.frequency(), frequency) def test_cmd_msg(self): src = analog.sig_source_c(8, analog.GR_SIN_WAVE, 1.0, 1.0) op = blocks.head(gr.sizeof_gr_complex, 9) snk = blocks.vector_sink_c() self.tb.connect(src, op, snk) self.assertAlmostEqual(src.frequency(), 1.0) frequency = 3.0 amplitude = 10 offset = -1.0 src._post( pmt.to_pmt('freq'), pmt.to_pmt({ "freq": frequency, "ampl": amplitude, "offset": offset })) self.tb.run() self.assertAlmostEqual(src.frequency(), frequency) self.assertAlmostEqual(src.amplitude(), amplitude) self.assertAlmostEqual(src.offset(), offset) if __name__ == '__main__': gr_unittest.run(test_sig_source)