diff options
author | Terry May <terrydmay@gmail.com> | 2019-11-11 14:19:36 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2020-04-11 15:15:13 -0700 |
commit | 572428babb6db488cc17bfe2ada63ffc23a50d63 (patch) | |
tree | 258eabfdc6b65491df636507260343d3c974c06f /gr-filter | |
parent | bad1040058004cd193bfea31c544f41a7e6d12e3 (diff) |
filter: Fix YAML files for C++ generation
- This enables C++ support for some filter blocks.
- Re-factored taps generation in pfb.py to allow c++ code to leverage
default taps values.
Diffstat (limited to 'gr-filter')
-rw-r--r-- | gr-filter/grc/filter_low_pass_filter.block.yml | 10 | ||||
-rw-r--r-- | gr-filter/grc/filter_pfb_arb_resampler.block.yml | 35 | ||||
-rw-r--r-- | gr-filter/grc/filter_pfb_synthesizer.block.yml | 10 | ||||
-rw-r--r-- | gr-filter/grc/filter_rational_resampler_xxx.block.yml | 46 | ||||
-rw-r--r-- | gr-filter/python/filter/pfb.py | 306 | ||||
-rw-r--r-- | gr-filter/python/filter/rational_resampler.py | 11 |
6 files changed, 257 insertions, 161 deletions
diff --git a/gr-filter/grc/filter_low_pass_filter.block.yml b/gr-filter/grc/filter_low_pass_filter.block.yml index 0313727f1..d9393b40c 100644 --- a/gr-filter/grc/filter_low_pass_filter.block.yml +++ b/gr-filter/grc/filter_low_pass_filter.block.yml @@ -76,21 +76,21 @@ templates: ${beta})) cpp_templates: - includes: ['#include <gnuradio/filter/firdes.h>'] - declarations: 'filter::firdes::sptr ${id};' + includes: ['#include <gnuradio/filter/firdes.h>', '#include <gnuradio/filter/interp_fir_filter.h>', '#include <gnuradio/filter/fir_filter_blk.h>'] + declarations: 'gr::filter::${type}::sptr ${id};' make: |- this->${id} = filter::${type}::make( ${ interp if str(type).startswith('interp') else decim }, - firdes.low_pass( + gr::filter::firdes::low_pass( ${gain}, ${samp_rate}, ${cutoff_freq}, ${width}, - ${win}, + ${win.replace('firdes.', 'gr::filter::firdes::')}, ${beta})); link: ['gnuradio-filter'] callbacks: - - set_taps(firdes::low_pass(${gain}, ${samp_rate}, ${cutoff_freq}, ${width}, ${win}, + - set_taps(gr::filter::firdes::low_pass(${gain}, ${samp_rate}, ${cutoff_freq}, ${width}, ${win.replace('firdes.', 'gr::filter::firdes::')}, ${beta})) documentation: |- diff --git a/gr-filter/grc/filter_pfb_arb_resampler.block.yml b/gr-filter/grc/filter_pfb_arb_resampler.block.yml index ca5412415..add43df14 100644 --- a/gr-filter/grc/filter_pfb_arb_resampler.block.yml +++ b/gr-filter/grc/filter_pfb_arb_resampler.block.yml @@ -34,6 +34,9 @@ parameters: default: '0' hide: part +asserts: +- ${ rrate > 0 } + inputs: - domain: stream dtype: ${ type.input } @@ -57,19 +60,39 @@ templates: - set_rate(${rrate}) cpp_templates: - includes: ['#include <gnuradio/filter/pfb.arb_resampler_${type}.h>'] - declarations: 'pfb::arb_resampler_${type}::sptr ${id};' + includes: ['#include <gnuradio/filter/pfb_arb_resampler_${type}.h>'] + declarations: 'gr::filter::pfb_arb_resampler_${type}::sptr ${id};' make: |- + <% + taps = self.context.get('taps') + try: taps + except NameError: taps = None + taps = None if (len(taps) == 0) else taps + %> + % if taps == None or len(taps) == 0: + <% + # Leverage python module to generate values for taps + from gnuradio import filter + %> + % if type == 'fff': + <% taps = filter.pfb.arb_resampler_fff.create_taps(float(self.context.get('rrate')), int(self.context.get('nfilts')), float(self.context.get('atten'))) %> + % elif type == 'ccf': + <% taps = filter.pfb.arb_resampler_ccf.create_taps(float(self.context.get('rrate')), int(self.context.get('nfilts')), float(self.context.get('atten'))) %> + % elif type == 'ccc': + <% taps = filter.pfb.arb_resampler_ccc.create_taps(float(self.context.get('rrate')), int(self.context.get('nfilts')), float(self.context.get('atten'))) %> + % endif + % endif + % if str(type.taps) == "complex_vector": std::vector<gr_complex> taps = {${str(taps)[1:-1]}}; % else: std::vector<float> taps = {${str(taps)[1:-1]}}; % endif - this->${id} =pfb::arb_resampler_${type}::make( + this->${id} = gr::filter::pfb_arb_resampler_${type}::make( ${rrate}, - taps=${ taps if taps else 'None' }, - flt_size=${nfilts}); - this->${id}.declare_sample_delay(${samp_delay}); + taps, + ${nfilts}); + this->${id}->declare_sample_delay(${samp_delay}); link: ['gnuradio-filter'] callbacks: - set_taps(taps) diff --git a/gr-filter/grc/filter_pfb_synthesizer.block.yml b/gr-filter/grc/filter_pfb_synthesizer.block.yml index 5b384ef87..e6cbbc8bb 100644 --- a/gr-filter/grc/filter_pfb_synthesizer.block.yml +++ b/gr-filter/grc/filter_pfb_synthesizer.block.yml @@ -60,16 +60,16 @@ templates: cpp_templates: includes: ['#include <gnuradio/filter/pfb_synthesizer_ccf.h>'] - declarations: 'pfb::synthesizer_ccf::sptr ${id};' + declarations: 'gr::filter::pfb_synthesizer_ccf::sptr ${id};' make: |- - std::vector<float> taps = {${str(taps)[1:-1]}}; + std::vector<float> taps = {${str(eval(taps))[1:-1]}}; std::vector<int> ch_map = {${str(ch_map)[1:-1]}}; - this->${id} = pfb::synthesizer_ccf::make( + this->${id} = gr::filter::pfb_synthesizer_ccf::make( ${numchans}, taps, ${twox}); - this->${id}.set_channel_map(${ch_map}); - this->${id}.declare_sample_delay(${samp_delay}); + this->${id}->set_channel_map(ch_map); + this->${id}->declare_sample_delay(${samp_delay}); link: ['gnuradio-filter'] callbacks: - set_taps(taps) diff --git a/gr-filter/grc/filter_rational_resampler_xxx.block.yml b/gr-filter/grc/filter_rational_resampler_xxx.block.yml index bf11b7a25..1ac8238b7 100644 --- a/gr-filter/grc/filter_rational_resampler_xxx.block.yml +++ b/gr-filter/grc/filter_rational_resampler_xxx.block.yml @@ -1,6 +1,6 @@ id: rational_resampler_xxx label: Rational Resampler -flags: [ python ] +flags: [ python, cpp ] parameters: - id: type @@ -59,6 +59,50 @@ templates: callbacks: - set_taps(${taps}) +cpp_templates: + includes: ['#include <gnuradio/filter/rational_resampler_base.h>'] + declarations: 'gr::filter::rational_resampler_base_${type}::sptr ${id};' + make: |- + <% + taps = self.context.get('taps') + try: taps + except (NameError): taps = None + taps = None if (len(taps) == 0) else taps + %> + % if taps == None or len(taps) == 0: + <% + # Leverage python module to generate default values for taps + from gnuradio import filter + %> + <% + fbw = self.context.get('fbw') + fbw = float(fbw) if (len(fbw) > 0 and float(fbw) != 0) else None + iotype = self.context.get('type') + ftype = eval('filter.rational_resampler.rational_resampler_' + str(iotype)) + fltr = ftype(int(self.context.get('interp')), int(self.context.get('decim')), taps, fbw) + taps = fltr.taps() + # Format complex taps values for C++ + if iotype == 'fcc' or iotype == 'ccc': + cmplx_taps = [] + for cmplx in taps: + cmplx_taps.append({cmplx.real, cmplx.imag}) + taps = cmplx_taps + %> + % endif + + % if str(type.taps) == "complex_vector": + std::vector<gr_complex> taps = {${str(taps)[1:-1]}}; + % else: + std::vector<float> taps = {${str(taps)[1:-1]}}; + % endif + this->${id} = gr::filter::rational_resampler_base_${type}::make( + ${interp}, + ${decim}, + taps); + link: ['gnuradio-filter'] + callbacks: + - set_taps(${taps}) + documentation: |- This is a rational resampling polyphase FIR filter. diff --git a/gr-filter/python/filter/pfb.py b/gr-filter/python/filter/pfb.py index 284c9d7d6..c26b7d5b0 100644 --- a/gr-filter/python/filter/pfb.py +++ b/gr-filter/python/filter/pfb.py @@ -38,23 +38,7 @@ class channelizer_ccf(gr.hier_block2): if (taps is not None) and (len(taps) > 0): self._taps = taps else: - # Create a filter that covers the full bandwidth of the input signal - bw = 0.4 - tb = 0.2 - ripple = 0.1 - made = False - while not made: - try: - self._taps = optfir.low_pass(1, self._nchans, bw, bw+tb, ripple, atten) - made = True - except RuntimeError: - ripple += 0.01 - made = False - print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) - - # Build in an exit strategy; if we've come this far, it ain't working. - if(ripple >= 1.0): - raise RuntimeError("optfir could not generate an appropriate filter.") + self._taps = self.create_taps(self._nchans, atten) self.s2ss = blocks.stream_to_streams(gr.sizeof_gr_complex, self._nchans) self.pfb = filter.pfb_channelizer_ccf(self._nchans, self._taps, @@ -76,6 +60,24 @@ class channelizer_ccf(gr.hier_block2): def declare_sample_delay(self, delay): self.pfb.declare_sample_delay(delay) + + @staticmethod + def create_taps(numchans, atten=100): + # Create a filter that covers the full bandwidth of the input signal + bw = 0.4 + tb = 0.2 + ripple = 0.1 + while True: + try: + taps = optfir.low_pass(1, self._nchans, bw, bw+tb, ripple, atten) + return taps + except RuntimeError: + ripple += 0.01 + print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) + + # Build in an exit strategy; if we've come this far, it ain't working. + if(ripple >= 1.0): + raise RuntimeError("optfir could not generate an appropriate filter.") class interpolator_ccf(gr.hier_block2): @@ -98,23 +100,7 @@ class interpolator_ccf(gr.hier_block2): if (taps is not None) and (len(taps) > 0): self._taps = taps else: - # Create a filter that covers the full bandwidth of the input signal - bw = 0.4 - tb = 0.2 - ripple = 0.99 - made = False - while not made: - try: - self._taps = optfir.low_pass(self._interp, self._interp, bw, bw+tb, ripple, atten) - made = True - except RuntimeError: - ripple += 0.01 - made = False - print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) - - # Build in an exit strategy; if we've come this far, it ain't working. - if(ripple >= 1.0): - raise RuntimeError("optfir could not generate an appropriate filter.") + self._taps = self.create_taps(self._interp, atten) self.pfb = filter.pfb_interpolator_ccf(self._interp, self._taps) @@ -127,6 +113,25 @@ class interpolator_ccf(gr.hier_block2): def declare_sample_delay(self, delay): self.pfb.declare_sample_delay(delay) + @staticmethod + def create_taps(interp, atten): + # Create a filter that covers the full bandwidth of the input signal + bw = 0.4 + tb = 0.2 + ripple = 0.99 + while True: + try: + taps = optfir.low_pass(interp, interp, bw, bw+tb, ripple, atten) + return taps + except RuntimeError: + ripple += 0.01 + print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) + + # Build in an exit strategy; if we've come this far, it ain't working. + if(ripple >= 1.0): + raise RuntimeError("optfir could not generate an appropriate filter.") + + class decimator_ccf(gr.hier_block2): ''' @@ -147,23 +152,7 @@ class decimator_ccf(gr.hier_block2): if (taps is not None) and (len(taps) > 0): self._taps = taps else: - # Create a filter that covers the full bandwidth of the input signal - bw = 0.4 - tb = 0.2 - ripple = 0.1 - made = False - while not made: - try: - self._taps = optfir.low_pass(1, self._decim, bw, bw+tb, ripple, atten) - made = True - except RuntimeError: - ripple += 0.01 - made = False - print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) - - # Build in an exit strategy; if we've come this far, it ain't working. - if(ripple >= 1.0): - raise RuntimeError("optfir could not generate an appropriate filter.") + self._taps = self.create_taps(self._decim, atten) self.s2ss = blocks.stream_to_streams(gr.sizeof_gr_complex, self._decim) self.pfb = filter.pfb_decimator_ccf(self._decim, self._taps, self._channel, @@ -184,6 +173,24 @@ class decimator_ccf(gr.hier_block2): def declare_sample_delay(self, delay): self.pfb.declare_sample_delay(delay) + + @staticmethod + def create_taps(decim, atten=100): + # Create a filter that covers the full bandwidth of the input signal + bw = 0.4 + tb = 0.2 + ripple = 0.1 + while True: + try: + taps = optfir.low_pass(1, decim, bw, bw+tb, ripple, atten) + return taps + except RuntimeError: + ripple += 0.01 + print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) + + # Build in an exit strategy; if we've come this far, it ain't working. + if(ripple >= 1.0): + raise RuntimeError("optfir could not generate an appropriate filter.") class arb_resampler_ccf(gr.hier_block2): @@ -206,43 +213,7 @@ class arb_resampler_ccf(gr.hier_block2): if (taps is not None) and (len(taps) > 0): self._taps = taps else: - # Create a filter that covers the full bandwidth of the output signal - - # If rate >= 1, we need to prevent images in the output, - # so we have to filter it to less than half the channel - # width of 0.5. If rate < 1, we need to filter to less - # than half the output signal's bw to avoid aliasing, so - # the half-band here is 0.5*rate. - percent = 0.80 - if(self._rate < 1): - halfband = 0.5*self._rate - bw = percent*halfband - tb = (percent / 2.0)*halfband - ripple = 0.1 - - # As we drop the bw factor, the optfir filter has a harder time converging; - # using the firdes method here for better results. - self._taps = filter.firdes.low_pass_2(self._size, self._size, bw, tb, atten, - filter.firdes.WIN_BLACKMAN_HARRIS) - else: - halfband = 0.5 - bw = percent*halfband - tb = (percent / 2.0)*halfband - ripple = 0.1 - - made = False - while not made: - try: - self._taps = optfir.low_pass(self._size, self._size, bw, bw+tb, ripple, atten) - made = True - except RuntimeError: - ripple += 0.01 - made = False - print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) - - # Build in an exit strategy; if we've come this far, it ain't working. - if(ripple >= 1.0): - raise RuntimeError("optfir could not generate an appropriate filter.") + self._taps = self.create_taps(self._rate, self._size, atten); self.pfb = filter.pfb_arb_resampler_ccf(self._rate, self._taps, self._size) #print("PFB has %d taps\n" % (len(self._taps),)) @@ -260,6 +231,46 @@ class arb_resampler_ccf(gr.hier_block2): def declare_sample_delay(self, delay): self.pfb.declare_sample_delay(delay) + @staticmethod + def create_taps(rate, flt_size=32, atten=100): + # Create a filter that covers the full bandwidth of the output signal + + # If rate >= 1, we need to prevent images in the output, + # so we have to filter it to less than half the channel + # width of 0.5. If rate < 1, we need to filter to less + # than half the output signal's bw to avoid aliasing, so + # the half-band here is 0.5*rate. + percent = 0.80 + if(rate < 1): + halfband = 0.5*rate + bw = percent*halfband + tb = (percent / 2.0)*halfband + ripple = 0.1 + + # As we drop the bw factor, the optfir filter has a harder time converging; + # using the firdes method here for better results. + return filter.firdes.low_pass_2(flt_size, flt_size, bw, tb, atten, + filter.firdes.WIN_BLACKMAN_HARRIS) + else: + halfband = 0.5 + bw = percent*halfband + tb = (percent / 2.0)*halfband + ripple = 0.1 + taps = None + + while True: + try: + taps = optfir.low_pass(flt_size, flt_size, bw, bw+tb, ripple, atten) + return taps + except RuntimeError: + ripple += 0.01 + print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) + + # Build in an exit strategy; if we've come this far, it ain't working. + if(ripple >= 1.0): + raise RuntimeError("optfir could not generate an appropriate filter.") + + class arb_resampler_fff(gr.hier_block2): ''' Convenience wrapper for the polyphase filterbank arbitrary resampler. @@ -280,43 +291,7 @@ class arb_resampler_fff(gr.hier_block2): if (taps is not None) and (len(taps) > 0): self._taps = taps else: - # Create a filter that covers the full bandwidth of the input signal - - # If rate >= 1, we need to prevent images in the output, - # so we have to filter it to less than half the channel - # width of 0.5. If rate < 1, we need to filter to less - # than half the output signal's bw to avoid aliasing, so - # the half-band here is 0.5*rate. - percent = 0.80 - if(self._rate < 1): - halfband = 0.5*self._rate - bw = percent*halfband - tb = (percent / 2.0)*halfband - ripple = 0.1 - - # As we drop the bw factor, the optfir filter has a harder time converging; - # using the firdes method here for better results. - self._taps = filter.firdes.low_pass_2(self._size, self._size, bw, tb, atten, - filter.firdes.WIN_BLACKMAN_HARRIS) - else: - halfband = 0.5 - bw = percent*halfband - tb = (percent / 2.0)*halfband - ripple = 0.1 - - made = False - while not made: - try: - self._taps = optfir.low_pass(self._size, self._size, bw, bw+tb, ripple, atten) - made = True - except RuntimeError: - ripple += 0.01 - made = False - print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) - - # Build in an exit strategy; if we've come this far, it ain't working. - if(ripple >= 1.0): - raise RuntimeError("optfir could not generate an appropriate filter.") + self._taps = self.create_taps(self._rate, self._size, atten) self.pfb = filter.pfb_arb_resampler_fff(self._rate, self._taps, self._size) #print "PFB has %d taps\n" % (len(self._taps),) @@ -334,6 +309,44 @@ class arb_resampler_fff(gr.hier_block2): def declare_sample_delay(self, delay): self.pfb.declare_sample_delay(delay) + @staticmethod + def create_taps(rate, flt_size=32, atten=100): + # Create a filter that covers the full bandwidth of the input signal + + # If rate >= 1, we need to prevent images in the output, + # so we have to filter it to less than half the channel + # width of 0.5. If rate < 1, we need to filter to less + # than half the output signal's bw to avoid aliasing, so + # the half-band here is 0.5*rate. + percent = 0.80 + if(rate < 1): + halfband = 0.5*rate + bw = percent*halfband + tb = (percent / 2.0)*halfband + ripple = 0.1 + + # As we drop the bw factor, the optfir filter has a harder time converging; + # using the firdes method here for better results. + return filter.firdes.low_pass_2(flt_size, flt_size, bw, tb, atten, + filter.firdes.WIN_BLACKMAN_HARRIS) + else: + halfband = 0.5 + bw = percent*halfband + tb = (percent / 2.0)*halfband + ripple = 0.1 + + while True: + try: + taps = optfir.low_pass(flt_size, flt_size, bw, bw+tb, ripple, atten) + return taps + except RuntimeError: + ripple += 0.01 + print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) + + # Build in an exit strategy; if we've come this far, it ain't working. + if(ripple >= 1.0): + raise RuntimeError("optfir could not generate an appropriate filter.") + class arb_resampler_ccc(gr.hier_block2): ''' Convenience wrapper for the polyphase filterbank arbitrary resampler. @@ -354,24 +367,7 @@ class arb_resampler_ccc(gr.hier_block2): if (taps is not None) and (len(taps) > 0): self._taps = taps else: - # Create a filter that covers the full bandwidth of the input signal - bw = 0.4 - tb = 0.2 - ripple = 0.1 - #self._taps = filter.firdes.low_pass_2(self._size, self._size, bw, tb, atten) - made = False - while not made: - try: - self._taps = optfir.low_pass(self._size, self._size, bw, bw+tb, ripple, atten) - made = True - except RuntimeError: - ripple += 0.01 - made = False - print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) - - # Build in an exit strategy; if we've come this far, it ain't working. - if(ripple >= 1.0): - raise RuntimeError("optfir could not generate an appropriate filter.") + self._taps = self.create_taps(self._rate, self._size, atten) self.pfb = filter.pfb_arb_resampler_ccc(self._rate, self._taps, self._size) #print "PFB has %d taps\n" % (len(self._taps),) @@ -389,6 +385,23 @@ class arb_resampler_ccc(gr.hier_block2): def declare_sample_delay(self, delay): self.pfb.declare_sample_delay(delay) + @staticmethod + def create_taps(rate, flt_size=32, atten=100): + # Create a filter that covers the full bandwidth of the input signal + bw = 0.4 + tb = 0.2 + ripple = 0.1 + while True: + try: + taps = optfir.low_pass(flt_size, flt_size, bw, bw+tb, ripple, atten) + return taps + except RuntimeError: + ripple += 0.01 + print("Warning: set ripple to %.4f dB. If this is a problem, adjust the attenuation or create your own filter taps." % (ripple)) + + # Build in an exit strategy; if we've come this far, it ain't working. + if(ripple >= 1.0): + raise RuntimeError("optfir could not generate an appropriate filter.") class channelizer_hier_ccf(gr.hier_block2): """ @@ -416,7 +429,7 @@ class channelizer_hier_ccf(gr.hier_block2): gr.io_signature(1, 1, gr.sizeof_gr_complex), gr.io_signature(len(outchans), len(outchans), gr.sizeof_gr_complex)) if taps is None: - taps = optfir.low_pass(1, n_chans, bw, bw+tb, ripple, atten) + taps = self.create_taps(n_chans, atten=100, bw=1.0, tb=0.2, ripple=0.1) taps = list(taps) extra_taps = int(math.ceil(1.0*len(taps)/n_chans)*n_chans - len(taps)) taps = taps + [0] * extra_taps @@ -465,3 +478,8 @@ class channelizer_hier_ccf(gr.hier_block2): self.connect(self.fft, self.v2ss) for i in range(0, len(outchans)): self.connect((self.v2ss, i), (self, i)) + + @staticmethod + def create_taps(n_chans, atten=100, bw=1.0, tb=0.2, ripple=0.1): + return optfir.low_pass(1, n_chans, bw, bw+tb, ripple, atten) + diff --git a/gr-filter/python/filter/rational_resampler.py b/gr-filter/python/filter/rational_resampler.py index acf7d0507..17b811cc1 100644 --- a/gr-filter/python/filter/rational_resampler.py +++ b/gr-filter/python/filter/rational_resampler.py @@ -136,3 +136,14 @@ class rational_resampler_ccc(_rational_resampler_base): """ _rational_resampler_base.__init__(self, filter.rational_resampler_base_ccc, interpolation, decimation, taps, fractional_bw) + +class rational_resampler_fcc(_rational_resampler_base): + def __init__(self, interpolation, decimation, taps=None, fractional_bw=None): + """ + Rational resampling polyphase FIR filter with + float input, complex output and complex taps. + """ + _rational_resampler_base.__init__(self, filter.rational_resampler_base_fcc, + interpolation, decimation, taps, fractional_bw) + + |