GNU Radio Manual and C++ API Reference  3.7.13.4
The Free & Open Software Radio Ecosystem
Python Blocks

How to create blocks in Python

Streaming Data Blocks

We create blocks in Python very much like we would in C++, just with more Python. Figure out which type of block you want to create:

The block class inherits from one of these base classes, and then in defining the parent class, we set the I/O signature. However, unlike in C++ where we use the gr::io_signature class, here we can just create a Python list of the I/O data sizes using numpy data types:

  • numpy.int8
  • numpy.int16
  • numpy.int32
  • numpy.float32
  • numpy.float64

Like a normal C++ version of the block, we then create and initialize any variables in the constructor, define any setters and getters, and create the work function. The prototype for the work function is quite simple:

def work(self, input_items, output_items)

The input_items and output_items are lists of lists. The input_items contains a vector of input samples for every input stream, and the output_items is a vector for each output stream where we can place items. Then length of output_items[0] is equivalent to the noutput_items concept we are so familiar with from the C++ blocks.

Following is an example Python block that adds two input streams together. This block is used in the qa_block_gateway.py test code.

class add_2_f32_1_f32(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
name = "add 2 f32",
in_sig = [numpy.float32, numpy.float32],
out_sig = [numpy.float32],
)
def work(self, input_items, output_items):
output_items[0][:] = input_items[0] + input_items[1]
return len(output_items[0])

The block defines two input floating point streams by setting in_sig to "[numpy.float32, numpy.float32]" and a single output float stream in out_sig of "[numpy.float32]."

The work function then just adds the two input streams together. The streams are input_items[0] and input_items[1]. The block still returns the concept of noutput_items like we use in C++, only we get it here by getting len(output_items[0]). Because this is a sync_block, we also know that the size of the input_items for both streams is the same as the size of the output_items vector.

Using Stream Tags

Python blocks have access to the stream tag system like their C++ counterparts. The interface is almost identical except they behave just a bit more like we would expect in Python.

To add tags to the data stream, we use the add_item_tag function:

def work(self, input_items, output_items):
....
add_item_tag(which_output, abs_offset,
key, value, srcid)
....

The abs_offset is an integer of the sample that the tag is attached to, and key and value are both PMTs to set the key:value pair of the tag information, and the srcid is an optional PMT to define the source of the block that generate the tag.

We then can get tags using either the get_tags_in_range or get_tags_in_window. Again, like their C++ counter parts, the get_tags_in_range uses the absolute item offset numbering (using nitems_read) while the get_tags_in_window uses relative offsets within the current window of items available to the work function. The main difference from the C++ function is that instead of having the first argument be a vector where the tags are stored, the Python version just returns a list of tags. We would use it like this:

def work(self, input_items, output_items):
....
tags = get_tags_in_window(which_input, rel_start, rel_end)
....

Using Message Passing

Again, like their C++ counterparts, Python blocks can use the asynchronous message passing interface. We define output message handlers using:

self.message_port_register_out(pmt.intern("<port name>"))

We can then post messages to this using the message_port_pub function:

self.message_port_pub(pmt.intern("<port name>"), <pmt message>)

We then register input messages and handlers in similar ways:

self.message_port_register_in(pmt.intern("<port name>"))
self.set_msg_handler(pmt.intern("<port name>"), <msg handler function>)

Putting this together below is a very simple example:

class msg_block(gr.basic_block):
def __init__(self):
gr.basic_block.__init__(
self,
name="msg_block",
in_sig=None,
out_sig=None)
self.message_port_register_out(pmt.intern('msg_out'))
self.message_port_register_in(pmt.intern('msg_in'))
self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg)
def handle_msg(self, msg):
self.message_port_pub(pmt.intern('msg_out'),
pmt.intern('message received!'))