GNU Radio 3.6.5 C++ API
GNU Radio was originally a streaming system with no other mechanism to pass data between blocks. Streams of data are a model that works well for samples, bits, etc., but they are not really the right mechanism for control data, metadata, and, often, packet structures (at least at some point in the processing chain).
We solved part of this problem a few years ago by introducing the tag stream. This is a stream that runs parallel to the data stream. The difference is that tags are designed to hold metadata and control information. Each tag is associated with a particular sample in the data stream and flows downstream alongside the data. This model allows other blocks to identify that an event or action has occurred or should occur on a particular item. The major limitation is that the tag stream is really only accessible inside a work function and only flows in one direction. Its benefit is that it is isosynchronous with the data.
We want a more general message passing system for a couple of reasons. The first is to allow blocks downstream to communicate back to blocks upstream. The second is to allow an easier way for us to communicate back and forth between external applications and GNU Radio. The new message passing interface handles these cases, although it does so on an asynchronous basis.
The message passing interface heavily relies on Polymorphic Types (PMTs) in GNU Radio. For further information about these data structures, see the page Polymorphic Types.
The message passing interface is designed into the gr_basic_block, which is the parent class for all blocks in GNU Radio. Each block has a set of message queues to hold incoming messages and can post messages to the message queues of other blocks. The blocks also distinguish between input and output ports.
A block has to declare its input and output message ports in its constructor. The message ports are described by a name, which is in practice a PMT symbol (i.e., an interned string). The API calls to register a new port are:
void message_port_register_in(pmt::pmt_t port_id);
void message_port_register_out(pmt::pmt_t port_id);
The ports are now identifiable by that port name. Other blocks that want to post or receive messages on a port must subscribe to it. When a block has a message to send, it publishes the message on a particular port. The subscribe and publish API looks like:
void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
Any block that has a subscription to another block's output message port will receive the message when it is published. Internally, when a block publishes a message, it simply iterates through all blocks that have subscribed and uses the gr_basic_block::_post method to send the message to that block's message queue.
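The publish and subscribe flow described above can be sketched with a small toy model. This is not GNU Radio code: the class ToyBlock, the use of plain Python strings for port names and messages, and the free function msg_connect are assumptions made for illustration, and only the method names mirror the API listed above.

```python
from collections import deque


class ToyBlock:
    """Toy stand-in for a block with message ports (not the GNU Radio class)."""

    def __init__(self, name):
        self.name = name
        self.in_queues = {}     # input port name -> queue of pending messages
        self.subscribers = {}   # output port name -> list of (block, input port)

    def message_port_register_in(self, port_id):
        self.in_queues[port_id] = deque()

    def message_port_register_out(self, port_id):
        self.subscribers[port_id] = []

    def message_port_sub(self, port_id, target):
        # In this toy model a target is a (block, input port name) pair.
        if port_id not in self.subscribers:
            raise KeyError("unknown output port: %s" % port_id)
        self.subscribers[port_id].append(target)

    def message_port_unsub(self, port_id, target):
        self.subscribers[port_id].remove(target)

    def message_port_pub(self, port_id, msg):
        # Publishing iterates over the subscribers and posts to each one.
        for block, in_port in self.subscribers[port_id]:
            block._post(in_port, msg)

    def _post(self, port_id, msg):
        # Posting places the message on the queue of the named input port.
        if port_id not in self.in_queues:
            raise KeyError("unknown input port: %s" % port_id)
        self.in_queues[port_id].append(msg)


def msg_connect(src, src_port, dst, dst_port):
    """Hypothetical helper: subscribe dst's input port to src's output port."""
    src.message_port_sub(src_port, (dst, dst_port))


src = ToyBlock("src")
src.message_port_register_out("pdus")
dbg = ToyBlock("dbg")
dbg.message_port_register_in("print")

msg_connect(src, "pdus", dbg, "print")
src.message_port_pub("pdus", "hello")
received = list(dbg.in_queues["print"])
```

In the toy model the publisher never touches a subscriber's queue directly. It only calls the subscriber's _post method, which is the same separation the text describes for gr_basic_block.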
At the flowgraph level, we have provided a gr_hier_block2::msg_connect method to make it easy to subscribe blocks to other blocks' messages. A call to the message connection method looks like the following code. Assume that the block src has an output message port named pdus and the block dbg has an input port named print.
self.tb.msg_connect(src, "pdus", dbg, "print")
All messages published by the src block on port pdus will be received by dbg on port print. Note that we are using plain strings to define the ports here, not PMT symbols. This is a convenience so that the user can type in the port names more easily (for reference, you can create a PMT symbol in Python using the pmt::pmt_intern function as pmt.pmt_intern("string")).
Users can also query blocks for the names of their input and output ports using the following API calls:
pmt::pmt_t message_ports_in();
pmt::pmt_t message_ports_out();
Each of these calls returns a PMT vector filled with PMT symbols, so PMT operators must be used to manipulate the result.
Each block has internal methods to handle posting and receiving of messages. The gr_basic_block::_post method takes in a message and places it into the block's queue. The publishing model uses the gr_basic_block::_post method of the subscribed blocks as the way to access their message queues, so the message queue of the right name will have a new message. Posting messages also has the benefit of waking up the block's thread if it is in a wait state. So if the block is idle, it will wake up as soon as a message is posted and call the message handler.
The other side of the action in a block is in the message handler. When a block has an input message port, it needs a callback function to handle messages received on that port. We use a Boost bind operator to bind the message port to the message handling function. When a new message is pushed onto a port's message queue, it is this function that is used to process the message.
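The interaction between posting, waking the block's thread, and dispatching to a bound handler can be sketched as follows. This is a toy model under stated assumptions, not the GNU Radio scheduler: the class ToyHandlerBlock, its single shared queue, and the stop method are hypothetical, and a Python bound method stands in for the Boost bind expression.

```python
import queue
import threading


class ToyHandlerBlock:
    """Toy block whose thread sleeps until a message is posted (illustrative)."""

    def __init__(self):
        self._queue = queue.Queue()
        self._handlers = {}   # port name -> callback
        self.printed = []
        self.stored = []
        # A bound method plays the role of boost::bind(&Class::method, this, _1).
        self.set_msg_handler("print", self.print_msg)
        self.set_msg_handler("store", self.store_msg)
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def set_msg_handler(self, port, handler):
        self._handlers[port] = handler

    def _post(self, port, msg):
        if port not in self._handlers:
            raise KeyError("no handler for port: %s" % port)
        # put() wakes the block thread if it is blocked in get().
        self._queue.put((port, msg))

    def _run(self):
        while True:
            port, msg = self._queue.get()   # blocks while the block is idle
            if port is None:                # sentinel used only by this sketch
                break
            self._handlers[port](msg)

    def stop(self):
        self._queue.put((None, None))
        self._thread.join()

    def print_msg(self, msg):
        self.printed.append(msg)

    def store_msg(self, msg):
        self.stored.append(msg)


blk = ToyHandlerBlock()
blk._post("print", "first")
blk._post("store", "second")
blk.stop()   # returns after both queued messages have been handled
```

Because the queue is first in, first out, the stop sentinel is processed only after every message posted before it, so both handlers have run by the time stop returns.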
The following are snippets of code from blocks currently in GNU Radio that take advantage of message passing. We will be using gr_message_debug and gr_tagged_stream_to_pdu below to show setting up both input and output message passing capabilities.
The gr_message_debug block is used for debugging the message passing system. It describes two input message ports: print and store. The print port simply prints out all messages to standard out while the store port keeps a list of all messages posted to it. This latter port works in conjunction with a gr_message_debug::get_message(int i) call that allows us to retrieve message i afterward.
The constructor of this block looks like this:
{
  message_port_register_in(pmt::mp("print"));
  set_msg_handler(pmt::mp("print"),
                  boost::bind(&gr_message_debug::print, this, _1));
  message_port_register_in(pmt::mp("store"));
  set_msg_handler(pmt::mp("store"),
                  boost::bind(&gr_message_debug::store, this, _1));
}
So the two ports are registered by their respective names. We then use the gr_basic_block::set_msg_handler function to associate each port name with a callback function. The Boost bind function (boost::bind) here binds the callback to a member function of this block's class. So now the block's gr_message_debug::print and gr_message_debug::store functions are assigned to handle messages passed to them. Below is the print function for reference.
void gr_message_debug::print(pmt::pmt_t msg)
{
  std::cout << "***** MESSAGE DEBUG PRINT ********\n";
  pmt::pmt_print(msg);
  std::cout << "**********************************\n";
}
The function simply takes in the PMT message and prints it. pmt::pmt_print is a function in the PMT library that prints the PMT in a friendly, (mostly) pretty manner.
The gr_tagged_stream_to_pdu block only defines a single output message port. In this case, its constructor looks like:
{
  message_port_register_out(pdu_port_id);
}
So we are only creating a single output port where pdu_port_id is defined in the file gr_pdu.h as pdus.
This block's purpose is to take in a stream of samples along with stream tags and construct a predefined PDU message from them. In GNU Radio, we define a PDU as a PMT pair of (metadata, data). The metadata describes the samples found in the data portion of the pair. Specifically, the metadata can contain the length of the data segment and any other information (sample rate, etc.). The PMT vectors know their own length, so the length value is not actually necessary unless it is useful for purposes down the line. The metadata is a PMT dictionary while the data segment is a PMT uniform vector of either bytes, floats, or complex values.
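The shape of a PDU can be illustrated without the PMT library. The sketch below is an analogy only: a Python tuple stands in for the PMT pair, a dict for the PMT dictionary, and an array of unsigned bytes for the PMT uniform vector. The helper names make_pdu and pdu_length and the metadata key sample_rate are hypothetical.

```python
from array import array


def make_pdu(meta, payload):
    """Build a toy PDU: a (metadata, data) pair (illustrative, not PMT)."""
    if not isinstance(meta, dict):
        raise TypeError("metadata must be a dictionary")
    data = array("B", payload)   # uniform vector of unsigned bytes
    return (meta, data)


def pdu_length(pdu):
    """The data vector knows its own length, so no length field is required."""
    return len(pdu[1])


pdu = make_pdu({"sample_rate": 32000}, [0xFF] * 16)
meta, data = pdu
```

Storing a length entry in the metadata is therefore optional in this model, which matches the remark above that the length value is only needed if something downstream finds it useful.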
In the end, when a PDU message is ready, the block calls its gr_tagged_stream_to_pdu::send_message function that is shown below.
void gr_tagged_stream_to_pdu::send_message()
{
  // Sanity check: the accumulated vector must match the expected PDU length.
  if(pmt::pmt_length(d_pdu_vector) != d_pdu_length) {
    throw std::runtime_error("msg length not correct");
  }
  // Build the (metadata, data) pair and publish it to all subscribers.
  pmt::pmt_t msg = pmt::pmt_cons(d_pdu_meta, d_pdu_vector);
  message_port_pub(pdu_port_id, msg);
  // Reset the state for the next PDU.
  d_pdu_meta = pmt::PMT_NIL;
  d_pdu_vector = pmt::PMT_NIL;
  d_pdu_length = 0;
  d_pdu_remain = 0;
  d_inpdu = false;
}
This function does a bit of checking to make sure the PDU is OK, as well as some cleanup at the end. But it is the line where the message is published that is important to this discussion. Here, the block posts the PDU message to any subscribers by calling the gr_basic_block::message_port_pub publishing method.
There is similarly a gr_pdu_to_tagged_stream block that essentially does the opposite. It acts as a source to a flowgraph and waits for PDU messages to be posted to it on its input port pdus. It extracts the metadata and data and processes them. The metadata dictionary is split up into key:value pairs and stream tags are created out of them. The data is then converted into an output stream of items and passed along. The next section describes how PDUs can be passed into a flowgraph using the gr_pdu_to_tagged_stream block.
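The conversion in the opposite direction, from a PDU to stream items plus tags, can be sketched in the same toy style. This is a minimal illustration and not the block's implementation: the function name, the tag layout as a dict with offset, key, and value fields, and the choice to attach every tag to the first item of the PDU are assumptions.

```python
def pdu_to_tagged_stream(pdu, start_offset=0):
    """Split a toy (metadata, data) pair into tags and stream items.

    Assumption of this sketch: every metadata entry becomes one tag placed
    on the first item of the PDU, at absolute offset start_offset.
    """
    meta, data = pdu
    meta = meta or {}   # None plays the role of an empty (nil) metadata field
    tags = [
        {"offset": start_offset, "key": key, "value": value}
        for key, value in sorted(meta.items())
    ]
    items = list(data)  # the payload becomes ordinary stream items
    return tags, items


pdu = ({"sample_rate": 32000, "channel": 3}, bytes([0xFF] * 16))
tags, items = pdu_to_tagged_stream(pdu, start_offset=100)
```

Sorting the keys only makes the output order deterministic for the example. It is not meant to suggest anything about the order in which the real block emits tags.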
The last feature of the message passing architecture to discuss here is how it can be used to take in messages from an external source. We can call a block's gr_basic_block::_post method directly and pass it a message. So any block with an input message port can receive messages from the outside in this way.
The following example uses a gr_pdu_to_tagged_stream block as the source block to a flowgraph. Its purpose is to wait for messages as PDUs posted to it and convert them to a normal stream. The payload will be sent on as a normal stream while the metadata will be decoded into tags and sent on the tagged stream.
So if we have created a src block as a PDU-to-tagged-stream block, it has a pdus input port, which is how we will inject PDU messages into the flowgraph. These PDUs could come from another block or flowgraph, but here, we will create and insert them by hand.
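# Name of the input message port on the source block.
port = pmt.pmt_intern("pdus")
# PDU with empty metadata and a payload of 16 bytes, each set to 0xFF.
msg = pmt.pmt_cons(pmt.PMT_NIL,
                   pmt.pmt_make_u8vector(16, 0xFF))
# Post the PDU directly to the block's message queue.
src.to_basic_block()._post(port, msg)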
The PDU's metadata section is empty, hence the pmt::PMT_NIL object. The payload is just a simple vector of 16 bytes, each with all bits set (0xFF). To post the message, we have to access the block's gr_basic_block class, which we do using the gr_basic_block::to_basic_block method, and then call the gr_basic_block::_post method to pass the PDU to the right port.
All of these mechanisms are explored and tested in the QA code of the file qa_pdu.py.
There are some examples of using the message passing infrastructure through GRC in gnuradio-core/src/examples/msg_passing.