summaryrefslogtreecommitdiff
path: root/gr-blocks/python/blocks/qa_block_gateway.py
diff options
context:
space:
mode:
authormormj <mormjb@gmail.com>2020-10-30 10:59:50 -0400
committerMarcus Müller <marcus@hostalia.de>2020-10-30 17:52:53 +0100
commit7a0948ba85758fba1cc3858ef99bfa600dcc7416 (patch)
tree610d7f9d773a193562def6df2d4b50f1bb3b3f86 /gr-blocks/python/blocks/qa_block_gateway.py
parent12192ee7d58de95ddca35a3e93bfc172bdb5c820 (diff)
qa: run autopep8 formatting on qa python files
find ./ -iname qa*.py | xargs autopep8 --in-place -a -a mostly formats whitespace and gets rid of trailing semicolons
Diffstat (limited to 'gr-blocks/python/blocks/qa_block_gateway.py')
-rw-r--r--gr-blocks/python/blocks/qa_block_gateway.py131
1 files changed, 74 insertions, 57 deletions
diff --git a/gr-blocks/python/blocks/qa_block_gateway.py b/gr-blocks/python/blocks/qa_block_gateway.py
index f78ea27076..2f8071dfdd 100644
--- a/gr-blocks/python/blocks/qa_block_gateway.py
+++ b/gr-blocks/python/blocks/qa_block_gateway.py
@@ -18,81 +18,89 @@ from gnuradio import gr, gr_unittest, blocks
class non_sync_block(gr.basic_block):
def __init__(self):
gr.basic_block.__init__(self,
- name="non_sync_block",
- in_sig=[numpy.float32],
- out_sig=[numpy.float32, numpy.float32])
+ name="non_sync_block",
+ in_sig=[numpy.float32],
+ out_sig=[numpy.float32, numpy.float32])
+
def general_work(self, input_items, output_items):
self.consume(0, len(input_items[0]))
- self.produce(0,2)
- self.produce(1,1)
+ self.produce(0, 2)
+ self.produce(1, 1)
return gr.WORK_CALLED_PRODUCE
+
class add_2_f32_1_f32(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
- name = "add 2 f32",
- in_sig = [numpy.float32, numpy.float32],
- out_sig = [numpy.float32],
+ name="add 2 f32",
+ in_sig=[numpy.float32, numpy.float32],
+ out_sig=[numpy.float32],
)
def work(self, input_items, output_items):
output_items[0][:] = input_items[0] + input_items[1]
return len(output_items[0])
+
class add_2_fc32_1_fc32(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
- name = "add 2 fc32",
- in_sig = [numpy.complex64, numpy.complex64],
- out_sig = [numpy.complex64],
+ name="add 2 fc32",
+ in_sig=[numpy.complex64, numpy.complex64],
+ out_sig=[numpy.complex64],
)
def work(self, input_items, output_items):
output_items[0][:] = input_items[0] + input_items[1]
return len(output_items[0])
+
class convolve(gr.sync_block):
"""
A demonstration using block history to properly perform a convolution.
"""
+
def __init__(self):
gr.sync_block.__init__(
self,
- name = "convolve",
- in_sig = [numpy.float32],
- out_sig = [numpy.float32]
+ name="convolve",
+ in_sig=[numpy.float32],
+ out_sig=[numpy.float32]
)
self._taps = [1, 0, 0, 0]
self.set_history(len(self._taps))
def work(self, input_items, output_items):
- output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid')
+ output_items[0][:] = numpy.convolve(
+ input_items[0], self._taps, mode='valid')
return len(output_items[0])
+
class decim2x(gr.decim_block):
def __init__(self):
gr.decim_block.__init__(
self,
- name = "decim2x",
- in_sig = [numpy.float32],
- out_sig = [numpy.float32],
- decim = 2
+ name="decim2x",
+ in_sig=[numpy.float32],
+ out_sig=[numpy.float32],
+ decim=2
)
def work(self, input_items, output_items):
output_items[0][:] = input_items[0][::2]
return len(output_items[0])
+
class interp2x(gr.interp_block):
def __init__(self):
gr.interp_block.__init__(
self,
- name = "interp2x",
- in_sig = [numpy.float32],
- out_sig = [numpy.float32],
- interp = 2
+ name="interp2x",
+ in_sig=[numpy.float32],
+ out_sig=[numpy.float32],
+ interp=2
)
def work(self, input_items, output_items):
@@ -100,21 +108,22 @@ class interp2x(gr.interp_block):
output_items[0][::2] = input_items[0]
return len(output_items[0])
+
class tag_source(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
- name = "tag source",
- in_sig = None,
- out_sig = [numpy.float32],
+ name="tag source",
+ in_sig=None,
+ out_sig=[numpy.float32],
)
def work(self, input_items, output_items):
num_output_items = len(output_items[0])
- #put code here to fill the output items...
+ # put code here to fill the output items...
- #make a new tag on the middle element every time work is called
+ # make a new tag on the middle element every time work is called
count = self.nitems_written(0) + num_output_items // 2
key = pmt.string_to_symbol("example_key")
value = pmt.string_to_symbol("example_value")
@@ -122,37 +131,39 @@ class tag_source(gr.sync_block):
return num_output_items
+
class tag_sink(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
- name = "tag sink",
- in_sig = [numpy.float32],
- out_sig = None,
+ name="tag sink",
+ in_sig=[numpy.float32],
+ out_sig=None,
)
self.key = None
def work(self, input_items, output_items):
num_input_items = len(input_items[0])
- #put code here to process the input items...
+ # put code here to process the input items...
- #print all the tags received in this work call
+ # print all the tags received in this work call
nread = self.nitems_read(0)
- tags = self.get_tags_in_range(0, nread, nread+num_input_items)
+ tags = self.get_tags_in_range(0, nread, nread + num_input_items)
for tag in tags:
- #print tag.offset
- #print pmt.symbol_to_string(tag.key)
- #print pmt.symbol_to_string(tag.value)
+ # print tag.offset
+ # print pmt.symbol_to_string(tag.key)
+ # print pmt.symbol_to_string(tag.value)
self.key = pmt.symbol_to_string(tag.key)
return num_input_items
+
class tag_sink_win(gr.sync_block):
def __init__(self):
- gr.sync_block.__init__(self, name = "tag sink",
- in_sig = [numpy.float32],
- out_sig = None)
+ gr.sync_block.__init__(self, name="tag sink",
+ in_sig=[numpy.float32],
+ out_sig=None)
self.key = None
def work(self, input_items, output_items):
@@ -162,28 +173,30 @@ class tag_sink_win(gr.sync_block):
self.key = pmt.symbol_to_string(tag.key)
return num_input_items
+
class fc32_to_f32_2(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
- name = "fc32_to_f32_2",
- in_sig = [numpy.complex64],
- out_sig = [(numpy.float32, 2)],
+ name="fc32_to_f32_2",
+ in_sig=[numpy.complex64],
+ out_sig=[(numpy.float32, 2)],
)
def work(self, input_items, output_items):
- output_items[0][::,0] = numpy.real(input_items[0])
- output_items[0][::,1] = numpy.imag(input_items[0])
+ output_items[0][::, 0] = numpy.real(input_items[0])
+ output_items[0][::, 1] = numpy.imag(input_items[0])
return len(output_items[0])
+
class vector_to_stream(gr.interp_block):
def __init__(self, itemsize, nitems_per_block):
gr.interp_block.__init__(
self,
- name = "vector_to_stream",
- in_sig = [(itemsize, nitems_per_block)],
- out_sig = [itemsize],
- interp = nitems_per_block
+ name="vector_to_stream",
+ in_sig=[(itemsize, nitems_per_block)],
+ out_sig=[itemsize],
+ interp=nitems_per_block
)
self.block_size = nitems_per_block
@@ -196,6 +209,7 @@ class vector_to_stream(gr.interp_block):
return len(output_items[0])
+
class test_block_gateway(gr_unittest.TestCase):
def test_add_f32(self):
@@ -253,7 +267,8 @@ class test_block_gateway(gr_unittest.TestCase):
def test_tags(self):
src = tag_source()
sink = tag_sink()
- head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through
+ # should be enough items to get a tag through
+ head = blocks.head(gr.sizeof_float, 50000)
tb = gr.top_block()
tb.connect(src, head, sink)
tb.run()
@@ -262,7 +277,8 @@ class test_block_gateway(gr_unittest.TestCase):
def test_tags_win(self):
src = tag_source()
sink = tag_sink_win()
- head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through
+ # should be enough items to get a tag through
+ head = blocks.head(gr.sizeof_float, 50000)
tb = gr.top_block()
tb.connect(src, head, sink)
tb.run()
@@ -270,7 +286,8 @@ class test_block_gateway(gr_unittest.TestCase):
def test_fc32_to_f32_2(self):
tb = gr.top_block()
- src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False)
+ src = blocks.vector_source_c(
+ [1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j], False)
convert = fc32_to_f32_2()
v2s = vector_to_stream(numpy.float32, 2)
sink = blocks.vector_sink_f()
@@ -279,15 +296,15 @@ class test_block_gateway(gr_unittest.TestCase):
self.assertEqual(sink.data(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
def test_non_sync_block(self):
- tb = gr.top_block ()
+ tb = gr.top_block()
src = blocks.vector_source_f(range(1000000))
sinks = [blocks.vector_sink_f(), blocks.vector_sink_f()]
dut = non_sync_block()
tb.connect(src, dut)
- tb.connect((dut,0), sinks[0])
- tb.connect((dut,1), sinks[1])
- tb.run ()
- self.assertEqual(len(sinks[0].data()), 2*len(sinks[1].data()))
+ tb.connect((dut, 0), sinks[0])
+ tb.connect((dut, 1), sinks[1])
+ tb.run()
+ self.assertEqual(len(sinks[0].data()), 2 * len(sinks[1].data()))
if __name__ == '__main__':