Josh Blum, 11/27/2011 11:47 pm
Blocks Coding Guide¶
Disclaimer: Any documentation related to python or message passing is experimental work that is not part of mainline gnuradio. The supporting code can be found here: http://gnuradio.org/cgit/jblum.git/log/?h=next
Terminology¶
| Block | A functional processing unit with inputs and outputs |
| Port | A single input or output of a block |
| Source | A producer of data |
| Sink | A consumer of data |
| Connection | A flow of data from output port to input port |
| Flow graph | A collection of blocks and connections |
| Item | A unit of data. Ex: baseband sample, fft vector, matrix... |
| Stream | A continuous flow of consecutive items |
| IO signature | A description of a block's input and output ports |
The work function¶
To implement processing, the user must write a "work" routine that reads inputs, processes, and writes outputs.
An example work function implementing an adder in c++
int work(
    int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items
){
    //cast buffers
    const float* in0 = reinterpret_cast<const float *>(input_items[0]);
    const float* in1 = reinterpret_cast<const float *>(input_items[1]);
    float* out = reinterpret_cast<float *>(output_items[0]);
    //process data (int index avoids a signed/unsigned comparison with noutput_items)
    for (int i = 0; i < noutput_items; i++){
        out[i] = in0[i] + in1[i];
    }
    //return produced
    return noutput_items;
}
Parameter definitions:
- noutput_items: total number of items in each output buffer
- input_items: vector of input buffers, where each element corresponds to an input port
- output_items: vector of output buffers, where each element corresponds to an output port
- Each buffer must be cast from a void* pointer into a usable data type.
- The number of items in each input buffer is implied by noutput_items
- More information on this in later sections
- The number of items produced is returned; this can be less than noutput_items
An example work function implementing an adder in python
def work(self, input_items, output_items):
    #buffer references
    in0 = input_items[0]
    in1 = input_items[1]
    out = output_items[0]
    #process data
    out[:] = in0 + in1
    #return produced
    return len(out)
Parameter definitions:
- input_items: vector of numpy arrays, where each element corresponds to an input port
- output_items: vector of numpy arrays, where each element corresponds to an output port
- Casting is not necessary thanks to python's duck typing system
- The addition of numpy arrays implies an element by element addition
- The [:] slice is needed to write into the existing out buffer, rather than rebind the name out to a new array
- The length of each array of items can be retrieved with len(...)
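The role of the [:] assignment can be illustrated without gnuradio. The sketch below is only an illustration: it uses plain Python lists as stand-ins for the numpy buffers, and the function names work_rebind and work_in_place are hypothetical.

```python
def work_rebind(input_items, output_items):
    """Buggy variant: rebinds the local name, so the caller's buffer is untouched."""
    in0, in1 = input_items
    out = output_items[0]
    out = [a + b for a, b in zip(in0, in1)]  # creates a new list instead of filling the buffer
    return len(out)


def work_in_place(input_items, output_items):
    """Correct variant: slice assignment writes into the existing buffer."""
    in0, in1 = input_items
    out = output_items[0]
    out[:] = [a + b for a, b in zip(in0, in1)]
    return len(out)


inputs = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
buf_a = [0.0, 0.0, 0.0]  # buffer handed to the buggy variant
buf_b = [0.0, 0.0, 0.0]  # buffer handed to the correct variant
work_rebind(inputs, [buf_a])
work_in_place(inputs, [buf_b])
```

After both calls, buf_a still holds zeros while buf_b holds the sums. The same distinction between rebinding a name and writing into a buffer applies to numpy arrays.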
IO signatures¶
When creating a block, the user must communicate the following to the block:
- The number of input ports
- The number of output ports
- The item size of each port
An IO signature describes the number of ports a block may have and the size of each item in bytes. Each block has 2 IO signatures: an input signature, and an output signature.
Some example signatures in c++
-- A block with 2 inputs and 1 output --
gr_sync_block("my adder", gr_make_io_signature(2, 2, sizeof(float)), gr_make_io_signature(1, 1, sizeof(float)))
-- A block with no inputs and 1 output --
gr_sync_block("my source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, sizeof(float)))
-- A block with 2 inputs (float and double) and 1 output --
std::vector<int> input_sizes;
input_sizes.push_back(sizeof(float));
input_sizes.push_back(sizeof(double));
gr_sync_block("my block", gr_make_io_signaturev(2, 2, input_sizes), gr_make_io_signature(1, 1, sizeof(float)))
Some observations:
- Use gr_make_io_signature for blocks where all ports are homogeneous in size
- Use gr_make_io_signaturev for blocks that have heterogeneous port sizes
- The first two parameters are the min and max number of ports; this allows blocks to have a selectable number of ports at runtime
Some example signatures in python
-- A block with 2 inputs and 1 output --
gr.sync_block.__init__(self, name="my adder", in_sig=[numpy.float32, numpy.float32], out_sig=[numpy.float32])
-- A block with no inputs and 1 output --
gr.sync_block.__init__(self, name="my source", in_sig=None, out_sig=[numpy.float32])
-- A block with 2 inputs (float and double) and 1 output --
gr.sync_block.__init__(self, name="my block", in_sig=[numpy.float32, numpy.float64], out_sig=[numpy.float32])
Some observations:
- An IO signature is a list of numpy dtypes
- dtypes describe both the size and type of the item
- None may be used when the signature has zero ports
- The number of ports is fixed at construction time
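What an IO signature records (min and max port counts plus a per-port item size in bytes) can be modelled in a few lines. This is a toy sketch, not the gnuradio implementation: the class ToyIOSignature is hypothetical, and letting ports beyond the end of the size list reuse the last size is a modelling choice made here for illustration.

```python
import struct

FLOAT_SIZE = struct.calcsize("=f")   # 4 bytes (standard size)
DOUBLE_SIZE = struct.calcsize("=d")  # 8 bytes (standard size)


class ToyIOSignature:
    """Hypothetical model: port count limits plus item sizes in bytes."""

    def __init__(self, min_ports, max_ports, item_sizes):
        if min_ports < 0 or max_ports < min_ports:
            raise ValueError("invalid port limits")
        self.min_ports = min_ports
        self.max_ports = max_ports
        self.item_sizes = list(item_sizes)

    def accepts(self, nports):
        """True if a block with this signature may have nports ports."""
        return self.min_ports <= nports <= self.max_ports

    def item_size(self, port):
        """Item size in bytes for the given port index."""
        if not 0 <= port < self.max_ports:
            raise IndexError("no such port")
        # Modelling choice: ports past the end of the list reuse the last size.
        return self.item_sizes[min(port, len(self.item_sizes) - 1)]


adder_in = ToyIOSignature(2, 2, [FLOAT_SIZE])               # homogeneous ports
mixed_in = ToyIOSignature(2, 2, [FLOAT_SIZE, DOUBLE_SIZE])  # heterogeneous ports
source_in = ToyIOSignature(0, 0, [])                        # no input ports
```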
Block types¶
To take advantage of the gnuradio framework, users will create various blocks to implement the desired data processing. There are several types of blocks to choose from:
Synchronous Block¶
The sync block allows users to write blocks that consume and produce an equal number of items per port. A sync block may have any number of inputs or outputs. When a sync block has zero inputs, it's called a source. When a sync block has zero outputs, it's called a sink.
An example sync block in c++
#include <gr_sync_block.h>
class my_sync_block : public gr_sync_block{
public:
    my_sync_block(...):
        gr_sync_block("my block", gr_make_io_signature(1, 1, sizeof(int32_t)), gr_make_io_signature(1, 1, sizeof(int32_t)))
    {
        //constructor stuff
    }
    int work(
        int noutput_items,
        gr_vector_const_void_star &input_items,
        gr_vector_void_star &output_items
    ){
        //work stuff...
        return noutput_items;
    }
};
Some observations:
- noutput_items is the length in items of all input and output buffers
- an input signature of gr_make_io_signature(0, 0, 0) makes this a source block
- an output signature of gr_make_io_signature(0, 0, 0) makes this a sink block
An example sync block in python
import numpy
from gnuradio import gr
class my_sync_block(gr.sync_block):
    def __init__(self, args):
        gr.sync_block.__init__(self, name="my block", in_sig=[numpy.int32], out_sig=[numpy.int32])
    def work(self, input_items, output_items):
        #work stuff...
        return len(output_items[0])
Some observations:
- All input vectors and all output vectors have the same length
- in_sig=None would turn this into a source block
- out_sig=None would turn this into a sink block
- in the sink case, return len(input_items[0]) since output_items is empty!
Decimation Block¶
The decimation block is another type of fixed rate block where the number of input items is a fixed multiple of the number of output items.
An example decimation block in c++
#include <gr_sync_decimator.h>
class my_decim_block : public gr_sync_decimator{
public:
    my_decim_block(...):
        gr_sync_decimator("my decim block", in_sig, out_sig, decimation)
    {
        //constructor stuff
    }
    //work function here...
};
Some observations:
- The gr_sync_decimator constructor takes a 4th parameter, the decimation factor
- The user should assume that the number of input items = noutput_items*decimation
An example decimation block in python
from gnuradio import gr
class my_decim_block(gr.decim_block):
    def __init__(self, args):
        gr.decim_block.__init__(self, name="my block", in_sig=[...], out_sig=[...], decim=decimation)
    #work function here...
Some observations:
- The decim_block constructor has a new kwarg, decim, which takes the decimation factor
- The following will be true: len(input_items[i]) == len(output_items[j])*decimation
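The length relation for a decimator can be sketched with a keep-one-in-N work function. This is a stand-alone illustration with plain lists in place of numpy arrays; decim_work and its decim argument are hypothetical names, not part of the block API.

```python
def decim_work(input_items, output_items, decim):
    """Keep every decim-th input item; returns the number of items produced."""
    in0 = input_items[0]
    out = output_items[0]
    # The fixed-rate relation the scheduler is expected to guarantee.
    if len(in0) != len(out) * decim:
        raise ValueError("input length must equal output length * decim")
    out[:] = in0[::decim]
    return len(out)


samples = list(range(12))  # 12 input items
decimated = [0] * 4        # 4 output items for a decimation factor of 3
produced = decim_work([samples], [decimated], 3)
```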
Interpolation Block¶
The interpolation block is another type of fixed rate block where the number of output items is a fixed multiple of the number of input items.
An example interpolation block in c++
#include <gr_sync_interpolator.h>
class my_interp_block : public gr_sync_interpolator{
public:
    my_interp_block(...):
        gr_sync_interpolator("my interp block", in_sig, out_sig, interpolation)
    {
        //constructor stuff
    }
    //work function here...
};
Some observations:
- The gr_sync_interpolator constructor takes a 4th parameter, the interpolation factor
- The user should assume that the number of input items = noutput_items/interpolation
An example interpolation block in python
from gnuradio import gr
class my_interp_block(gr.interp_block):
    def __init__(self, args):
        gr.interp_block.__init__(self, name="my block", in_sig=[...], out_sig=[...], interp=interpolation)
    #work function here...
Some observations:
- The interp_block constructor has a new kwarg, interp, which takes the interpolation factor
- The following will be true: len(input_items[i]) == len(output_items[j])/interpolation
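The interpolator's length relation can be sketched the same way with a zero-stuffing work function. Plain lists stand in for numpy arrays, and interp_work and its interp argument are hypothetical names used only for this illustration.

```python
def interp_work(input_items, output_items, interp):
    """Zero-stuffing interpolator: each input item is followed by interp-1 zeros."""
    in0 = input_items[0]
    out = output_items[0]
    # The fixed-rate relation the scheduler is expected to guarantee.
    if len(out) != len(in0) * interp:
        raise ValueError("output length must equal input length * interp")
    out[:] = [0.0] * len(out)  # clear the output buffer
    out[::interp] = in0        # place one input item every interp outputs
    return len(out)


coarse = [1.0, 2.0, 3.0]
fine = [None] * 6  # 6 output items for an interpolation factor of 2
produced = interp_work([coarse], [fine], 2)
```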
Hierarchical Block¶
TODO
Top Block¶
TODO
Basic Block¶
The basic block provides no relation between the number of input items and the number of output items. All other blocks are just simplifications of the basic block. Users should choose to inherit from basic block when the other blocks are not suitable.
The adder revisited as a basic block in c++
#include <gr_block.h>
class my_basic_adder_block : public gr_block{
public:
    my_basic_adder_block(...):
        gr_block("another adder block", in_sig, out_sig)
    {
        //constructor stuff
    }
    int general_work(
        int noutput_items,
        gr_vector_int &ninput_items,
        gr_vector_const_void_star &input_items,
        gr_vector_void_star &output_items
    ){
        //cast buffers
        const float* in0 = reinterpret_cast<const float *>(input_items[0]);
        const float* in1 = reinterpret_cast<const float *>(input_items[1]);
        float* out = reinterpret_cast<float *>(output_items[0]);
        //process data
        for (int i = 0; i < noutput_items; i++){
            out[i] = in0[i] + in1[i];
        }
        //consume the inputs
        this->consume(0, noutput_items); //consume port 0 input
        this->consume(1, noutput_items); //consume port 1 input
        //this->consume_each(noutput_items); //or shortcut to consume on all inputs
        //return produced
        return noutput_items;
    }
};
Some observations:
- This class overrides the general_work() method, not work()
- general_work() has an extra parameter: ninput_items
- ninput_items is a vector describing the length of each input buffer
- Before returning, general_work() must manually consume the used inputs
- The number of items in the input buffers is assumed to be noutput_items
- Users may alter this behaviour by overriding the forecast() method
The adder revisited as a basic block in python
from gnuradio import gr
class my_basic_adder_block(gr.basic_block):
    def __init__(self, args):
        gr.basic_block.__init__(self, name="another_adder_block", in_sig=[...], out_sig=[...])
    def general_work(self, input_items, output_items):
        #buffer references
        in0 = input_items[0]
        in1 = input_items[1]
        out = output_items[0]
        #process data
        out[:] = in0 + in1
        #consume the inputs
        self.consume(0, len(out)) #consume port 0 input
        self.consume(1, len(out)) #consume port 1 input
        #self.consume_each(len(out)) #or shortcut to consume on all inputs
        #return produced
        return len(out)
Some observations:
- Same basic assumptions about length and use of consume as c++ example
- No ninput_items parameter, the lengths are part of input_items
- No noutput_items parameter, the lengths are part of output_items
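The bookkeeping behind consume can be sketched with a toy model in which the input lengths differ from the output length. Everything here is hypothetical and independent of gnuradio: ToyInput stands in for an input stream with a read pointer, and the free function general_work processes only as many items as every buffer can support.

```python
class ToyInput:
    """Hypothetical stand-in for one input stream with a read pointer."""

    def __init__(self, items):
        self.items = list(items)
        self.nread = 0  # absolute count of items consumed so far

    def available(self):
        """Items not yet consumed."""
        return self.items[self.nread:]

    def consume(self, n):
        """Advance the read pointer by n items."""
        if n > len(self.items) - self.nread:
            raise ValueError("cannot consume more items than are available")
        self.nread += n


def general_work(inputs, out):
    """Add two inputs, limited by the shortest buffer; returns items produced."""
    bufs = [port.available() for port in inputs]
    n = min([len(out)] + [len(b) for b in bufs])
    out[:n] = [a + b for a, b in zip(bufs[0][:n], bufs[1][:n])]
    for port in inputs:
        port.consume(n)  # tell the (toy) scheduler what was used
    return n


port0 = ToyInput([1, 2, 3, 4, 5])
port1 = ToyInput([10, 20, 30])
out_buf = [0] * 4
produced = general_work([port0, port1], out_buf)
```

Here only three items are produced because port1 has three items available. The two unconsumed items on port0 remain available for the next call.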
Stream Tags¶
A tag decorates a stream with metadata. A tag is associated with a particular item in a stream. An item may have more than one tag associated with it. The association of an item and tag is made through an absolute count. Every item in a stream has an absolute count. Tags use this count to identify the item in the stream with which they are associated.
A tag has the following members:
- offset: the unique item count
- key: a PMT key unique to the type of contents
- value: a PMT holding the contents of this tag
- srcid: a PMT id unique to the producer of the tag (optional)
A PMT is a special data type in gnuradio to serialize arbitrary data. To learn more about PMTs see gruel/pmt.h
Reading stream tags¶
Tags can be read from the work function using get_tags_in_range. Each input port/stream can have associated tags.
Example reading tags in c++
int work(
    int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items
){
    std::vector<gr_tag_t> tags;
    const uint64_t nread = this->nitems_read(0); //number of items read on port 0
    const size_t ninput_items = noutput_items; //assumption for sync block, this can change
    //read all tags associated with port 0 for items in this work function
    this->get_tags_in_range(tags, 0, nread, nread+ninput_items);
    //work stuff here...
}
Example reading tags in python
def work(self, input_items, output_items):
    nread = self.nitems_read(0) #number of items read on port 0
    ninput_items = len(input_items[0])
    #read all tags associated with port 0 for items in this work function
    tags = self.get_tags_in_range(0, nread, nread+ninput_items)
    #work stuff here...
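Because tag offsets are absolute counts, a work function usually has to convert them to indexes into the current input buffer. The sketch below models that arithmetic without gnuradio. The Tag tuple and both helper functions are hypothetical, and treating the range as half-open (start included, end excluded) is an assumption of this sketch.

```python
from collections import namedtuple

# Hypothetical stand-in for a stream tag (srcid omitted for brevity).
Tag = namedtuple("Tag", ["offset", "key", "value"])


def tags_in_range(all_tags, start, end):
    """Tags whose absolute offset lies in [start, end) (assumed half-open)."""
    return [t for t in all_tags if start <= t.offset < end]


def relative_index(tag, nread):
    """Index of the tagged item within the current input buffer."""
    return tag.offset - nread


stream_tags = [
    Tag(3, "k", "a"),
    Tag(10, "k", "b"),
    Tag(14, "k", "c"),
    Tag(20, "k", "d"),
]
nread = 10         # items already read on this port before this work call
ninput_items = 10  # items visible in this work call
visible = tags_in_range(stream_tags, nread, nread + ninput_items)
indexes = [relative_index(t, nread) for t in visible]
```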
Writing stream tags¶
Tags can be written from the work function using add_item_tag. Each output port/stream can have associated tags.
Example writing tags in c++
int work(
    int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items
){
    const size_t item_index = ?; //which output item gets the tag?
    const uint64_t offset = this->nitems_written(0) + item_index;
    pmt::pmt_t key = pmt::pmt_string_to_symbol("example_key");
    pmt::pmt_t value = pmt::pmt_string_to_symbol("example_value");
    //write a tag to output port 0 with given absolute item offset
    this->add_item_tag(0, offset, key, value);
    //work stuff here...
}
Example writing tags in python
def work(self, input_items, output_items):
    item_index = ? #which output item gets the tag?
    offset = self.nitems_written(0) + item_index
    key = pmt.pmt_string_to_symbol("example_key")
    value = pmt.pmt_string_to_symbol("example_value")
    #write a tag to output port 0 with given absolute item offset
    self.add_item_tag(0, offset, key, value)
    #work stuff here...
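One way to pick item_index is to tag at a fixed period in absolute item counts, so that tag spacing does not depend on how the stream is split across work calls. The sketch below only shows the offset arithmetic. The function names and the period parameter are hypothetical, and no gnuradio calls are made.

```python
def periodic_tag_offsets(nitems_written, noutput_items, period):
    """Absolute offsets in this work call that are multiples of period."""
    # Round nitems_written up to the next multiple of period.
    first = -(-nitems_written // period) * period
    return list(range(first, nitems_written + noutput_items, period))


def item_index(offset, nitems_written):
    """Index within the current output buffer for an absolute offset."""
    return offset - nitems_written


first_call = periodic_tag_offsets(0, 10, 4)    # items 0..9 of the stream
second_call = periodic_tag_offsets(10, 10, 4)  # items 10..19 of the stream
```

Each returned offset could then be passed to add_item_tag, with item_index giving the matching position in the output buffer.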
Messages¶
Messages are a lot like tags. They have a key, value, and optional srcid. The only difference is that messages are not associated with a stream or item. You can think of messages as standalone tags. Blocks can accept incoming messages and post messages to downstream blocks.
Reading messages¶
Messages can be read from the work function with pop_msg_queue().
Example reading messages in c++
int work(
    int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items
){
    const gr_tag_t msg = this->pop_msg_queue();
    //work stuff here...
    //perhaps the message determines what we produce...
}
Some observations:
- Every block has exactly 1 message queue
- Messages from all upstream producers will go into this queue
- The pop_msg_queue() call blocks until a message arrives
- The user may call check_msg_queue() to check if a message is available
- The pop_msg_queue() call will interrupt if the flow graph is stopped
Example reading messages in python
def work(self, input_items, output_items):
    try:
        msg = self.pop_msg_queue()
    except:
        return -1
    #work stuff here...
    #perhaps the message determines what we produce...
Some observations:
- Same basic principles as the c++ example
- The try/except around pop is to safely handle a thread interruption
- See the section on starting/stopping flow graphs for more
Writing messages¶
Messages can be written from the work function with post_msg().
Example writing messages in c++
int work(
    int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items
){
    //perhaps the input determines what messages we produce...
    pmt::pmt_t key = pmt::pmt_string_to_symbol("example_key");
    pmt::pmt_t value = pmt::pmt_string_to_symbol("example_value");
    this->post_msg("a message group", key, value);
    //work stuff here...
}
Some observations:
- The destination of the message is determined by the group name
- A group name can be any string
- More on groups in the following section
- The post_msg() call returns immediately (non-blocking)
Example writing messages in python
def work(self, input_items, output_items):
    #perhaps the input determines what messages we produce...
    key = pmt.pmt_string_to_symbol("example_key")
    value = pmt.pmt_string_to_symbol("example_value")
    self.post_msg("a message group", key, value)
    #work stuff here...
Connecting message ports¶
For message passing to work, the user must connect upstream message providers with downstream message subscribers. This is done through the msg_connect() call.
Connecting message ports in c++
msg_src_block::sptr my_msg_src_block = msg_src_block::make();
msg_sink_block::sptr my_msg_sink_block = msg_sink_block::make();
gr_top_block_sptr tb = gr_make_top_block("some message flow graph");
tb->msg_connect(my_msg_src_block, "a message group", my_msg_sink_block);
tb->start(); //the flow graph is now running...
Some observations:
- The second parameter of msg_connect() is a group name
- The message source must post messages to this group in order for the message sink to receive the message
Connecting message ports in python
my_msg_src_block = msg_src_block()
my_msg_sink_block = msg_sink_block()
tb = gr.top_block("some message flow graph")
tb.msg_connect(my_msg_src_block, "a message group", my_msg_sink_block)
tb.start() #the flow graph is now running...
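The effect of group names on delivery can be sketched with a toy router. This is not the gnuradio implementation: ToyMsgBlock and ToyTopBlock are hypothetical classes that mimic the method names used above, messages are plain tuples rather than PMTs, and pop_msg_queue here does not block.

```python
from collections import defaultdict, deque


class ToyMsgBlock:
    """Hypothetical block with exactly one incoming message queue."""

    def __init__(self, name):
        self.name = name
        self.msg_queue = deque()
        self.top_block = None  # set when the block is connected

    def post_msg(self, group, key, value):
        """Hand the message to the router; dropped if nothing is connected."""
        if self.top_block is not None:
            self.top_block.deliver(self, group, key, value)

    def check_msg_queue(self):
        return len(self.msg_queue) > 0

    def pop_msg_queue(self):
        # Unlike the real call, this toy version raises IndexError when empty.
        return self.msg_queue.popleft()


class ToyTopBlock:
    """Hypothetical router keyed on (source block, group name)."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def msg_connect(self, src, group, sink):
        src.top_block = self
        self.subscribers[(src, group)].append(sink)

    def deliver(self, src, group, key, value):
        for sink in self.subscribers.get((src, group), []):
            sink.msg_queue.append((key, value, src.name))


tb = ToyTopBlock()
src = ToyMsgBlock("src")
sink = ToyMsgBlock("sink")
tb.msg_connect(src, "a message group", sink)
src.post_msg("a message group", "example_key", "example_value")
src.post_msg("some other group", "ignored_key", "ignored_value")
```

Only the message posted to the connected group reaches the sink's queue. The message posted to the unconnected group is dropped.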
Robust blocks¶
This is the part of the guide where we give tips and tricks for making blocks that work robustly with the scheduler.
Blocking calls¶
If a work function contains a blocking call, it must be written in such a way that it can be interrupted by boost threads. When the flow graph is stopped, all worker threads will be interrupted. Thread interruption occurs when the user calls unlock() or stop() on the flow graph. Therefore, it is only acceptable to block indefinitely on a boost thread call such as a sleep or condition variable wait, or on something that uses these boost thread calls internally, such as pop_msg_queue(). If you need to block on a resource such as a file descriptor or socket, the work routine should always call into the blocking routine with a timeout. When the operation times out, the work routine should call a boost thread interruption point, or check whether the boost thread has been interrupted and exit if true.
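The "block with a timeout, then check for interruption" pattern can be sketched in plain Python. This is an analogy rather than the boost thread mechanism: a threading.Event named stop_event plays the role of the interruption flag, a queue.Queue stands in for the blocking resource, and wait_for_item is a hypothetical helper.

```python
import queue
import threading


def wait_for_item(source, stop_event, timeout=0.05):
    """Wait on source in short slices so that a stop request is noticed.

    Returns the next item, or None if stop_event was set while waiting.
    """
    while not stop_event.is_set():
        try:
            return source.get(timeout=timeout)  # never blocks longer than timeout
        except queue.Empty:
            continue  # timed out: loop around and re-check the stop flag
    return None


resource = queue.Queue()
stop_event = threading.Event()
resource.put("data")
first = wait_for_item(resource, stop_event)  # an item is ready, returns at once
```

A worker stuck in wait_for_item on an empty queue returns within roughly one timeout period after stop_event.set() is called, which is the property the scheduler needs from a work function.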
Saving state¶
Because work functions can be interrupted, the block's state variables may be indeterminate the next time the flow graph is run. To make blocks robust against indeterminate state, users should override the block's start() and stop() functions. The start() routine is called when the flow graph is started, before the work() thread is spawned. The stop() routine is called when the flow graph is stopped, after the work thread has been joined and exited. Users should ensure that the state variables of the block are initialized properly in the start() routine.
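The idea of re-initializing state in start() can be sketched with a toy block. ToyAccumulator is hypothetical and does not derive from any gnuradio class. It only shows that state left inconsistent by an interrupted work call is cleaned up on the next start.

```python
class ToyAccumulator:
    """Hypothetical block keeping a running total and an item count."""

    def __init__(self):
        self.total = 0
        self.count = 0

    def start(self):
        """Called before work begins: put all state into a known condition."""
        self.total = 0
        self.count = 0
        return True

    def stop(self):
        """Called after the work thread has exited: nothing to release here."""
        return True

    def work(self, items):
        for x in items:
            self.total += x  # an interruption between these two lines would
            self.count += 1  # leave total and count out of step
        return len(items)


blk = ToyAccumulator()
blk.start()
blk.work([1, 2, 3])
state_after_run = (blk.total, blk.count)

blk.total += 10  # simulate an interruption that updated total but not count
blk.stop()
blk.start()      # the next run begins from a known state
state_after_restart = (blk.total, blk.count)
```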