diff options
-rw-r--r-- | gr-filter/lib/pfb_channelizer_ccf_impl.cc | 3 | ||||
-rw-r--r-- | gr-filter/lib/pfb_decimator_ccf_impl.cc | 4 | ||||
-rw-r--r-- | gr-filter/lib/pfb_interpolator_ccf_impl.cc | 4 | ||||
-rwxr-xr-x | gr-filter/python/filter/qa_pfb_channelizer.py | 25 | ||||
-rwxr-xr-x | gr-filter/python/filter/qa_pfb_decimator.py | 111 | ||||
-rwxr-xr-x | gr-filter/python/filter/qa_pfb_interpolator.py | 18 |
6 files changed, 87 insertions, 78 deletions
diff --git a/gr-filter/lib/pfb_channelizer_ccf_impl.cc b/gr-filter/lib/pfb_channelizer_ccf_impl.cc index bb2aa07a54..c28434b6cb 100644 --- a/gr-filter/lib/pfb_channelizer_ccf_impl.cc +++ b/gr-filter/lib/pfb_channelizer_ccf_impl.cc @@ -81,6 +81,9 @@ namespace gr { d_output_multiple++; set_output_multiple(d_output_multiple); + // History is the length of each filter arm plus 1. + // The +1 comes from the channel mapping in the work function + // where we start n=1 so that we can look at in[n-1] set_history(d_taps_per_filter+1); } diff --git a/gr-filter/lib/pfb_decimator_ccf_impl.cc b/gr-filter/lib/pfb_decimator_ccf_impl.cc index 0010d6366c..a9e5138d18 100644 --- a/gr-filter/lib/pfb_decimator_ccf_impl.cc +++ b/gr-filter/lib/pfb_decimator_ccf_impl.cc @@ -53,7 +53,7 @@ namespace gr { d_rotator = new gr_complex[d_rate]; set_relative_rate(1.0/(float)decim); - set_history(d_taps_per_filter+1); + set_history(d_taps_per_filter); } pfb_decimator_ccf_impl::~pfb_decimator_ccf_impl() @@ -66,7 +66,7 @@ namespace gr { gr::thread::scoped_lock guard(d_mutex); polyphase_filterbank::set_taps(taps); - set_history(d_taps_per_filter+1); + set_history(d_taps_per_filter); d_updated = true; } diff --git a/gr-filter/lib/pfb_interpolator_ccf_impl.cc b/gr-filter/lib/pfb_interpolator_ccf_impl.cc index fce5fb2e68..a0ed73fd4e 100644 --- a/gr-filter/lib/pfb_interpolator_ccf_impl.cc +++ b/gr-filter/lib/pfb_interpolator_ccf_impl.cc @@ -48,7 +48,7 @@ namespace gr { polyphase_filterbank(interp, taps), d_updated (false), d_rate(interp) { - set_history(d_taps_per_filter+1); + set_history(d_taps_per_filter); } pfb_interpolator_ccf_impl::~pfb_interpolator_ccf_impl() @@ -61,7 +61,7 @@ namespace gr { gr::thread::scoped_lock guard(d_mutex); polyphase_filterbank::set_taps(taps); - set_history(d_taps_per_filter+1); + set_history(d_taps_per_filter); d_updated = true; } diff --git a/gr-filter/python/filter/qa_pfb_channelizer.py b/gr-filter/python/filter/qa_pfb_channelizer.py index fcabbaefcc..4c108e8443 100755 --- a/gr-filter/python/filter/qa_pfb_channelizer.py +++ b/gr-filter/python/filter/qa_pfb_channelizer.py @@ -40,16 +40,16 @@ class test_pfb_channelizer(gr_unittest.TestCase): def test_000(self): N = 1000 # number of samples to use M = 5 # Number of channels to channelize - fs = 1000 # baseband sampling rate + fs = 5000 # baseband sampling rate ifs = M*fs # input samp rate to channelizer - taps = filter.firdes.low_pass_2(1, ifs, 500, 50, + taps = filter.firdes.low_pass_2(1, ifs, fs/2, fs/10, attenuation_dB=80, window=filter.firdes.WIN_BLACKMAN_hARRIS) signals = list() add = blocks.add_cc() - freqs = [-200, -100, 0, 100, 200] + freqs = [-230., 121., 110., -513., 203.] for i in xrange(len(freqs)): f = freqs[i] + (M/2-M+i+1)*fs data = sig_source_c(ifs, f, 1, N) @@ -71,14 +71,21 @@ class test_pfb_channelizer(gr_unittest.TestCase): Ntest = 50 L = len(snks[0].data()) - t = map(lambda x: float(x)/fs, xrange(L)) # Adjusted phase rotations for data - p0 = 0 - p1 = math.pi*0.51998885 - p2 = -math.pi*0.96002233 - p3 = math.pi*0.96002233 - p4 = -math.pi*0.51998885 + p0 = -2*math.pi * 0 / M + p1 = -2*math.pi * 1 / M + p2 = -2*math.pi * 2 / M + p3 = -2*math.pi * 3 / M + p4 = -2*math.pi * 4 / M + + # Filter delay is the normal delay of each arm + tpf = math.ceil(len(taps) / float(M)) + delay = -(tpf - 1.0) / 2.0 + delay = int(delay) + + # Create a time scale that's delayed to match the filter delay + t = map(lambda x: float(x)/fs, xrange(delay, L+delay)) # Create known data as complex sinusoids at the different baseband freqs # the different channel numbering is due to channelizer output order. diff --git a/gr-filter/python/filter/qa_pfb_decimator.py b/gr-filter/python/filter/qa_pfb_decimator.py index 8cc068e79e..ea7a15570c 100755 --- a/gr-filter/python/filter/qa_pfb_decimator.py +++ b/gr-filter/python/filter/qa_pfb_decimator.py @@ -29,20 +29,11 @@ def sig_source_c(samp_rate, freq, amp, N): 1j*math.sin(2.*math.pi*freq*x), t) return y -class test_pfb_decimator(gr_unittest.TestCase): - - def setUp(self): - self.tb = gr.top_block() - - def tearDown(self): - self.tb = None - - def test_000(self): +def run_test(tb, channel): N = 1000 # number of samples to use M = 5 # Number of channels - fs = 1000 # baseband sampling rate + fs = 5000.0 # baseband sampling rate ifs = M*fs # input samp rate to decimator - channel = 0 # Extract channel 0 taps = filter.firdes.low_pass_2(1, ifs, fs/2, fs/10, attenuation_dB=80, @@ -50,81 +41,81 @@ class test_pfb_decimator(gr_unittest.TestCase): signals = list() add = blocks.add_cc() - freqs = [-200, -100, 0, 100, 200] + freqs = [-230., 121., 110., -513., 203.] + Mch = ((len(freqs)-1)/2 + channel) % len(freqs) for i in xrange(len(freqs)): f = freqs[i] + (M/2-M+i+1)*fs data = sig_source_c(ifs, f, 1, N) signals.append(blocks.vector_source_c(data)) - self.tb.connect(signals[i], (add,i)) + tb.connect(signals[i], (add,i)) - head = blocks.head(gr.sizeof_gr_complex, N) s2ss = blocks.stream_to_streams(gr.sizeof_gr_complex, M) pfb = filter.pfb_decimator_ccf(M, taps, channel) snk = blocks.vector_sink_c() - self.tb.connect(add, head, s2ss) + tb.connect(add, s2ss) for i in xrange(M): - self.tb.connect((s2ss,i), (pfb,i)) - self.tb.connect(pfb, snk) + tb.connect((s2ss,i), (pfb,i)) + tb.connect(pfb, snk) + tb.run() - self.tb.run() - - Ntest = 50 L = len(snk.data()) - t = map(lambda x: float(x)/fs, xrange(L)) + + # Each channel is rotated by 2pi/M + phase = -2*math.pi * channel / M + + # Filter delay is the normal delay of each arm + tpf = math.ceil(len(taps) / float(M)) + delay = -(tpf - 1.0) / 2.0 + delay = int(delay) + + # Create a time scale that's delayed to match the filter delay + t = map(lambda x: float(x)/fs, xrange(delay, L+delay)) # Create known data as complex sinusoids for the baseband freq # of the extracted channel is due to decimator output order. - phase = 0 - expected_data = map(lambda x: math.cos(2.*math.pi*freqs[2]*x+phase) + \ - 1j*math.sin(2.*math.pi*freqs[2]*x+phase), t) - + expected_data = map(lambda x: math.cos(2.*math.pi*freqs[Mch]*x+phase) + \ + 1j*math.sin(2.*math.pi*freqs[Mch]*x+phase), t) dst_data = snk.data() - self.assertComplexTuplesAlmostEqual(expected_data[-Ntest:], dst_data[-Ntest:], 4) - - def test_001(self): - N = 1000 # number of samples to use - M = 5 # Number of channels - fs = 1000 # baseband sampling rate - ifs = M*fs # input samp rate to decimator - channel = 1 # Extract channel 0 + return (dst_data, expected_data) - taps = filter.firdes.low_pass_2(1, ifs, fs/2, fs/10, - attenuation_dB=80, - window=filter.firdes.WIN_BLACKMAN_hARRIS) +class test_pfb_decimator(gr_unittest.TestCase): - signals = list() - add = blocks.add_cc() - freqs = [-200, -100, 0, 100, 200] - for i in xrange(len(freqs)): - f = freqs[i] + (M/2-M+i+1)*fs - data = sig_source_c(ifs, f, 1, N) - signals.append(blocks.vector_source_c(data)) - self.tb.connect(signals[i], (add,i)) + def setUp(self): + self.tb = gr.top_block() - s2ss = blocks.stream_to_streams(gr.sizeof_gr_complex, M) - pfb = filter.pfb_decimator_ccf(M, taps, channel) - snk = blocks.vector_sink_c() + def tearDown(self): + self.tb = None - self.tb.connect(add, s2ss) - for i in xrange(M): - self.tb.connect((s2ss,i), (pfb,i)) - self.tb.connect(pfb, snk) + def test_000(self): + Ntest = 50 + dst_data, expected_data = run_test(self.tb, 0) + + self.assertComplexTuplesAlmostEqual(expected_data[-Ntest:], dst_data[-Ntest:], 4) - self.tb.run() + def test_001(self): + Ntest = 50 + dst_data, expected_data = run_test(self.tb, 1) + + self.assertComplexTuplesAlmostEqual(expected_data[-Ntest:], dst_data[-Ntest:], 4) + def test_002(self): Ntest = 50 - L = len(snk.data()) - t = map(lambda x: float(x)/fs, xrange(L)) + dst_data, expected_data = run_test(self.tb, 2) + + self.assertComplexTuplesAlmostEqual(expected_data[-Ntest:], dst_data[-Ntest:], 4) - # Create known data as complex sinusoids for the baseband freq - # of the extracted channel is due to decimator output order. - phase = 6.1575 - expected_data = map(lambda x: math.cos(2.*math.pi*freqs[3]*x+phase) + \ - 1j*math.sin(2.*math.pi*freqs[3]*x+phase), t) - dst_data = snk.data() + def test_003(self): + Ntest = 50 + dst_data, expected_data = run_test(self.tb, 3) + + self.assertComplexTuplesAlmostEqual(expected_data[-Ntest:], dst_data[-Ntest:], 4) + def test_004(self): + Ntest = 50 + dst_data, expected_data = run_test(self.tb, 4) + self.assertComplexTuplesAlmostEqual(expected_data[-Ntest:], dst_data[-Ntest:], 4) if __name__ == '__main__': diff --git a/gr-filter/python/filter/qa_pfb_interpolator.py b/gr-filter/python/filter/qa_pfb_interpolator.py index 72856c6984..33c22675af 100755 --- a/gr-filter/python/filter/qa_pfb_interpolator.py +++ b/gr-filter/python/filter/qa_pfb_interpolator.py @@ -48,7 +48,7 @@ class test_pfb_interpolator(gr_unittest.TestCase): attenuation_dB=80, window=filter.firdes.WIN_BLACKMAN_hARRIS) - freq = 100 + freq = 123.456 data = sig_source_c(fs, freq, 1, N) signal = blocks.vector_source_c(data) pfb = filter.pfb_interpolator_ccf(M, taps) @@ -61,11 +61,19 @@ class test_pfb_interpolator(gr_unittest.TestCase): Ntest = 50 L = len(snk.data()) - t = map(lambda x: float(x)/ifs, xrange(L)) - # Create known data as complex sinusoids at freq - # of the channel at the interpolated rate. - phase = 0.62833 + # Can only get channel 0 out; no phase rotation + phase = 0 + + # Calculate the filter delay + delay = -(len(taps) - 1) / 2.0 - (M-1) + delay = int(delay) + + # Create a time scale that's delayed to match the filter delay + t = map(lambda x: float(x)/ifs, xrange(delay, L+delay)) + + # Create known data as complex sinusoids for the baseband freq + # of the extracted channel is due to decimator output order. expected_data = map(lambda x: math.cos(2.*math.pi*freq*x+phase) + \ 1j*math.sin(2.*math.pi*freq*x+phase), t) |