GNU Radio 3.7.1 C++ API
Stream Tags

Introduction

GNU Radio was originally a streaming system with no other mechanism to pass data between blocks. Streams of data are a model that work well for samples, bits, etc., but can lack for control and meta data.

Part of this is solved using the message passing interface, which allows blocks to subscribe to messages published by any other block in the flowgraph (see Message Passing). The main drawback to the message passing system is that is works asynchronously, meaning that there is no guarantee when a message may arrive relative to the data stream.

Stream tags are an isosynchronous data stream that runs parallel to the main data stream. A stream tag is generated by a block's work function and from there on flows downstream with a particular sample until it reaches a sink or is forced to stop propagating by another block.

Stream tags are defined for a specific item in the data stream and are formed as a key:value pair. The key identifies what the value is while the value holds the data that the tag contains. Both key and value are PMTs (Polymorphic Types) where the key is a PMT symbol while the value any type of PMT and can therefore handle any data we wish to pass. A fourth part of the tag is the srcid, which is a PMT symbol and is used to identify the block that created the tag (which is usually the block's alias()).

API Extensions to the gr::block

To enable the stream tags, we have extended the API of gr::block to understand absolute item numbers. In the data stream model, each block's work function is given a buffer in the data stream that is referenced from 0 to N-1. This is a relative offset into the data stream. The absolute reference starts from the beginning of the flowgraph and continues to count up with ever item. Each input stream is associated with a concept of the 'number of items read' and each output stream has a 'number of items written.' These are programmed using the two API calls:

unsigned long int nitems_read(unsigned int which_input);
unsigned long int nitems_written(unsigned int which_output);

Each tag is associated with some item in this absolute time scale that is calculated using these functions.

Like the rest of the data stream, the number of items read/written are only updated once during the call to work. So in a work function, nitems_read/written will refer to the state of the data stream at the start of the work function. We must therefore add to this value the current relative offset in the data stream. So if we are iterating i over all output items, we would write the stream tag to output ports at nitems_written(0)+i for the 0th output port.

Stream Tags API

The stream tags API consists of four functions, two to add and two to get the stream tags. These functions are:

  • add_item_tag: Adds an item tag to a particular output port using a gr::tag_t data type.
  • add_item_tag: Adds an item tag to a particular output port where each value of the tag is explicitly given.
  • get_tags_in_range: Gets all tags from a particular input port between a certain range of items (in absolute item time).
  • get_tags_in_range: Gets any tag that has a specified key from a particular input port between a certain range of items (in absolute item time).

Adding a Tag to a Stream

The two function calls to add items tags are defined here. We add a tag to a particular output stream of the block. We can output them to multiple output streams if we want, but to do so means calling one of these functions once for each port.

Again, a tag is defined as:

  • offset: The offset, in absolute item time, of the tag in the data stream.
  • key: the PMT symbol identifying the type of tag.
  • value: the PMT holding the data of the tag.
  • srcid: (optional) the PMT symbol identifying the block which created the tag.

We can create a gr::tag_t structure to hold all of the above information of a tag, which is probably the easiest/best way to do it. The gr::tag_t struct is defined as having the same members as in the above list. To add a gr::tag_t tag to a stream, use the function:

  void add_item_tag(unsigned int which_output, const tag_t &tag);

The secondary API allows us to create a tag by explicitly listing all of the tag information in the function call:

  void add_item_tag(unsigned int which_output,
                    uint64_t abs_offset,
                    const pmt::pmt_t &key,
                    const pmt::pmt_t &value,
                    const pmt::pmt_t &srcid=pmt::PMT_F);

Getting tags from a Stream

To get tags from a particular input stream, we again have two functions we can use. Both of these pass back vectors of gr::tag_t. The second function allows us to specify a particular key (as a PMT symbol) that filters out all but the key we are interested in, which reduces the effort inside the work function for getting the right tag's data.

The first call just returns any tags between the given range of items:

  void get_tags_in_range(std::vector<tag_t> &v,
                         unsigned int which_input,
                         uint64_t abs_start,
                         uint64_t abs_end);

Adding a fifth argument to this function allows us to filter on the key key.

  void get_tags_in_range(std::vector<tag_t> &v,
                         unsigned int which_input,
                         uint64_t abs_start,
                         uint64_t abs_end,
                         const pmt::pmt_t &key);

Tag Propagation

Tags are propagated downstream from block to block like the normal data streams. How blocks are actually moved depends on a specific propagation policy. We defined three types of policies:

  • All-to-All: all tags from any input port are replicated to all output ports
  • One-to-One: tags from input port i are only copied to output port i (depends on num inputs = num outputs).
  • Dont: Does not propagate tags. Tags are either stopped here or the work function propagates them itself.

The default behavior of a block is the 'All-to-All' method of propagation.

To set a different propagation policy, use the function:

  void set_tag_propagation_policy(tag_propagation_policy_t p);

See the gr::block::tag_propagation_policy_t documentation for details on this enum type.

Tag Propagation through Rate Changes

When a tag is propagated through a block that has a rate change, the item's offset in the data stream will change. The scheduler uses the block's gr::block::relative_rate concept to perform the update on the tag's offset value. The relative rate of a block determines the relationship between the input rate and output rate. Decimators that decimate by a factor of D have a relative rate of 1/D.

Synchronous blocks (gr::sync_block), decimators (gr::sync_decimator), and interpolators (gr::sync_interpolator) all have pre-defined and well-understood relative rates. A standard gr::block has a default relative rate of 1.0, but this must be set if it does not work this way. Often, we use a gr::block because we have no pre-conceived notion of the number of input to output items. If it is important to pass tags through these blocks that respect the change in item value, we would have to use the TPP_DONT tag propagation policy and handle the propagation internally.

Notes on How to Use Tags

Tags can be very useful to an application, and their use is spreading. USRP sources generate tag information on the time, sample rate, and frequency of the board if anything changes. We have a meta data file source/sink that use tags to store information about the data stream. But there are things to think about when using tags in a block.

First, when tags are not being used, there is almost no effect on the scheduler. However, when we use tags, we add overhead by getting and extracting tags from a data stream. We also use overhead in propagating the tags. For each tag, each block must copy a vector of tags from the output port(s) of one block to the input port(s) of the next block(s). These copy operations can add up.

The key is to minimize the use of tags. Use them when and only when necessary and try to provide some control over how tags are generated to control their frequency. A good example is the USRP source, which generates a time tag. If it generated a tag with every sample, we would have thousands of tags per second, which would add a significant amount of overhead. Conversely, if we started at time t0 at sample rate sr, then after N samples, we know that we are now at time t0 + N/sr. So continuously producing new tags adds no information.

The main issue we need to deal with in the above situation is when there is a discontinuity in the packets received from the USRP. Since we have no way of knowing in the flowgraph how many samples were potentially lost, we have lost track of the timing information. The USRP driver recognizes when packets have been dropped and uses this to queue another tag, which allows us to resync. Likewise, any time the sample rate or frequency changes, a new tag is issued.