diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2016-05-01 21:46:05 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2016-05-01 21:46:05 -0700 |
commit | 6aa995a5477feea1b6f9ef1f49c3045c25d147b1 (patch) | |
tree | c23532a95c3cc0cae788fcbec2c60cec3343d153 | |
parent | 5052b5735124bcae0568ec410df2785784e2541b (diff) | |
parent | 5283e459f6d83aea2d5133fecb0a11a873f0c3ad (diff) |
Merge remote-tracking branch 'mbr0wn/digital/hpd_padding'
-rw-r--r-- | gr-digital/grc/digital_header_payload_demux.xml | 7 | ||||
-rw-r--r-- | gr-digital/include/gnuradio/digital/header_payload_demux.h | 91 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 670 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.h | 75 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_header_payload_demux.py | 451 |
5 files changed, 865 insertions, 429 deletions
diff --git a/gr-digital/grc/digital_header_payload_demux.xml b/gr-digital/grc/digital_header_payload_demux.xml index 24c6c5b216..a2fe80e621 100644 --- a/gr-digital/grc/digital_header_payload_demux.xml +++ b/gr-digital/grc/digital_header_payload_demux.xml @@ -13,6 +13,7 @@ $timing_tag_key, $samp_rate, $special_tags, + $header_padding, )</make> <param> <name>Header Length (Symbols)</name> @@ -20,6 +21,12 @@ <type>int</type> </param> <param> + <name>Header Padding (Uncertainty / Symbols)</name> + <key>header_padding</key> + <value>0</value> + <type>int</type> + </param> + <param> <name>Items per symbol</name> <key>items_per_symbol</key> <type>int</type> diff --git a/gr-digital/include/gnuradio/digital/header_payload_demux.h b/gr-digital/include/gnuradio/digital/header_payload_demux.h index 303bebbf32..bcd6bd108a 100644 --- a/gr-digital/include/gnuradio/digital/header_payload_demux.h +++ b/gr-digital/include/gnuradio/digital/header_payload_demux.h @@ -1,5 +1,5 @@ /* -*- c++ -*- */ -/* Copyright 2012 Free Software Foundation, Inc. +/* Copyright 2012-2016 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -29,7 +29,7 @@ namespace gr { namespace digital { /*! - * \brief Header/Payload demuxer. + * \brief Header/Payload demuxer (HPD). * \ingroup packet_operators_blk * * \details @@ -58,6 +58,9 @@ namespace gr { * and taken as the payload length. The payload, together with the header data * as tags, is then copied to output 1. * + * If the header demodulation fails, the header must send a PMT with value + * pmt::PMT_F. The state gets reset and the header is ignored. + * * \section hpd_item_sizes Symbols, Items and Item Sizes * * To generically and transparently handle different kinds of modulations, @@ -68,20 +71,66 @@ namespace gr { * grouping items. In OFDM, we usually don't care about individual samples, but * we do care about full OFDM symbols, so we set \p items_per_symbol to the * IFFT / FFT length of the OFDM modulator / demodulator. - * For most single-carrier modulations, this value can be set to 1 (the default - * value). + * For single-carrier modulations, this value can be set to the number of + * samples per symbol, to handle data in number of symbols, or to 1 to + * handle data in number of samples. * If specified, \p guard_interval items are discarded before every symbol. * This is useful for demuxing bursts of OFDM signals. * * On the output, we can deal with symbols directly by setting \p output_symbols * to true. In that case, the output item size is the <em>symbol size</em>. * - * \b Example: OFDM with 48 sub-carriers, using a length-64 IFFT on the modulator, - * and a cyclic-prefix length of 16 samples. In this case, the itemsize is - * `sizeof(gr_complex)`, because we're receiving complex samples. One OFDM symbol - * has 64 samples, hence \p items_per_symbol is set to 64, and \p guard_interval to - * 16. The header length is specified in number of OFDM symbols. Because we want to - * deal with full OFDM symbols, we set \p output_symbols to true. + * \b Example: OFDM with 48 sub-carriers, using a length-64 IFFT on the + * modulator, and a cyclic-prefix length of 16 samples. In this case, + * \p itemsize is `sizeof(gr_complex)`, because we're receiving complex + * samples. One OFDM symbol has 64 samples, hence \p items_per_symbol is + * set to 64, and \p guard_interval to 16. The header length is specified + * in number of OFDM symbols. Because we want to deal with full OFDM + * symbols, we set \p output_symbols to true. + * + * \b Example: PSK-modulated signals, with 4 samples per symbol. Again, + * \p itemsize is `sizeof(gr_complex)` because we're still dealing with + * complex samples. \p items_per_symbol is 4, because one item is one + * sample. \p guard_interval must be set to 0. The header length is + * given in number of PSK symbols. + * + * \section hpd_uncertainty Handling timing uncertainty on the trigger + * + * By default, the assumption is made that the trigger arrives on *exactly* + * the sample that the header starts. These triggers typically come from + * timing synchronization algorithms which may be suboptimal, and have a + * known timing uncertainty (e.g., we know the trigger might be a sample + * too early or too late). + * + * The demuxer has an option for this case, the \p header_padding. If this + * value is non-zero, it specifies the number of items that are prepended + * and appended to the header before copying it to the header output. + * + * Example: Say our synchronization algorithm can be off by up to two + * samples, and the header length is 20 samples. So we set \p header_len + * to 20, and \p header_padding to 2. + * Now assume a trigger arrives on sample index 100. We copy a total of + * 24 samples to the header port, starting at sample index 98. + * + * The payload is *not* padded. Let's say the header demod reports a + * payload length of 100. In the previous examples, we would copy 100 + * samples to the payload port, starting at sample index 120 (this means + * the padded samples appended to the header are copied to both ports!). + * However, the header demodulator has the option to specify a payload + * offset, which cannot exceed the padding value. To do this, include + * a key `payload_offset` in the message sent back to the HPD. A negative + * value means the payload starts earlier than otherwise. + * (If you wanted to always pad the payload, you could set `payload_offset` + * to `-header_padding` and increase the reported length of the payload). + * + * Because the padding is specified in number of items, and not symbols, + * this value can only be multiples of the number of items per symbol *if* + * either \p output_symbols is true, or a guard interval is specified (or + * both). Note that in practice, it is rare that both a guard interval is + * specified *and* a padding value is required. The difference between the + * padding value and a guard interval is that a guard interval is part of + * the signal, and comes with *every* symbol, whereas the header padding + * is added to only the header, and is not by design. * * \section hpd_tag_handling Tag Handling * @@ -95,12 +144,14 @@ namespace gr { * it belongs to this packet or the following. In this case, it is possible that the * tag might be propagated twice. * - * Tags outside of packets are generally discarded. If this information is important, - * there are two additional mechanisms to preserve the tags: + * Tags outside of packets are generally discarded. If there are tags that + * carry important information that must not be list, there are two + * additional mechanisms to preserve the tags: * - Timing tags might be relevant to know \b when a packet was received. By * specifying the name of a timestamp tag and the sample rate at this block, it * keeps track of the time and will add the time to the first item of every packet. - * The name of the timestamp tag is usually 'rx_time' (see gr::uhd::usrp_source::make()). + * The name of the timestamp tag is usually 'rx_time' (see, e.g., + * gr::uhd::usrp_source::make()). * The time value must be specified in the UHD time format. * - Other tags are simply stored and updated. As an example, the user might want to know the * rx frequency, which UHD stores in the rx_freq tag. In this case, add the tag name 'rx_freq' @@ -124,18 +175,20 @@ namespace gr { * \param timing_tag_key The name of the tag with timing information, usually 'rx_time' or empty (this means timing info is discarded) * \param samp_rate Sampling rate at the input. Necessary to calculate the rx time of packets. * \param special_tags A vector of strings denoting tags which shall be preserved (see \ref hpd_tag_handling) + * \param header_padding A number of items that is appended and prepended to the header. */ static sptr make( - int header_len, - int items_per_symbol=1, - int guard_interval=0, + const int header_len, + const int items_per_symbol=1, + const int guard_interval=0, const std::string &length_tag_key="frame_len", const std::string &trigger_tag_key="", - bool output_symbols=false, - size_t itemsize=sizeof(gr_complex), + const bool output_symbols=false, + const size_t itemsize=sizeof(gr_complex), const std::string &timing_tag_key="", const double samp_rate=1.0, - const std::vector<std::string> &special_tags=std::vector<std::string>() + const std::vector<std::string> &special_tags=std::vector<std::string>(), + const size_t header_padding=0 ); }; diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index 89428fa86e..f887ea1eb3 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -1,5 +1,5 @@ /* -*- c++ -*- */ -/* Copyright 2012-2014 Free Software Foundation, Inc. +/* Copyright 2012-2016 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -23,10 +23,10 @@ #include "config.h" #endif -#include <climits> -#include <boost/format.hpp> -#include <gnuradio/io_signature.h> #include "header_payload_demux_impl.h" +#include <gnuradio/io_signature.h> +#include <boost/format.hpp> +#include <climits> namespace gr { namespace digital { @@ -55,55 +55,63 @@ namespace gr { enum out_port_indexes_t { PORT_HEADER = 0, - PORT_PAYLOAD = 1 + PORT_PAYLOAD = 1, + PORT_INPUTDATA = 0, + PORT_TRIGGER = 1 }; #define msg_port_id pmt::mp("header_data") header_payload_demux::sptr header_payload_demux::make( - int header_len, - int items_per_symbol, - int guard_interval, - const std::string &length_tag_key, - const std::string &trigger_tag_key, - bool output_symbols, - size_t itemsize, - const std::string &timing_tag_key, - const double samp_rate, - const std::vector<std::string> &special_tags + int header_len, + int items_per_symbol, + int guard_interval, + const std::string &length_tag_key, + const std::string &trigger_tag_key, + bool output_symbols, + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags, + const size_t header_padding ){ return gnuradio::get_initial_sptr ( - new header_payload_demux_impl( - header_len, - items_per_symbol, - guard_interval, - length_tag_key, - trigger_tag_key, - output_symbols, - itemsize, - timing_tag_key, - samp_rate, - special_tags - ) + new header_payload_demux_impl( + header_len, + items_per_symbol, + guard_interval, + length_tag_key, + trigger_tag_key, + output_symbols, + itemsize, + timing_tag_key, + samp_rate, + special_tags, + header_padding + ) ); } header_payload_demux_impl::header_payload_demux_impl( - int header_len, - int items_per_symbol, - int guard_interval, - const std::string &length_tag_key, - const std::string &trigger_tag_key, - bool output_symbols, - size_t itemsize, - const std::string &timing_tag_key, - const double samp_rate, - const std::vector<std::string> &special_tags + int header_len, + int items_per_symbol, + int guard_interval, + const std::string &length_tag_key, + const std::string &trigger_tag_key, + bool output_symbols, + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags, + const size_t header_padding ) : block("header_payload_demux", - io_signature::make2(1, 2, itemsize, sizeof(char)), - io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))), + io_signature::make2(1, 2, itemsize, sizeof(char)), + io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))), d_header_len(header_len), + d_header_padding_symbols(header_padding / items_per_symbol), + d_header_padding_items(header_padding % items_per_symbol), + d_header_padding_total_items(header_padding), d_items_per_symbol(items_per_symbol), d_gi(guard_interval), d_len_tag_key(pmt::string_to_symbol(length_tag_key)), @@ -113,32 +121,42 @@ namespace gr { d_uses_trigger_tag(!trigger_tag_key.empty()), d_state(STATE_FIND_TRIGGER), d_curr_payload_len(0), + d_curr_payload_offset(0), d_payload_tag_keys(0), d_payload_tag_values(0), d_track_time(!timing_tag_key.empty()), d_timing_key(pmt::intern(timing_tag_key)), + d_payload_offset_key(pmt::intern("payload_offset")), d_last_time_offset(0), d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))), d_sampling_time(1.0/samp_rate) { if (d_header_len < 1) { - throw std::invalid_argument("Header length must be at least 1 symbol."); + throw std::invalid_argument("Header length must be at least 1 symbol."); + } + if (header_padding < 0) { + throw std::invalid_argument("Header padding must be non-negative."); } if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) { - throw std::invalid_argument("Items and symbol sizes must be at least 1."); + throw std::invalid_argument("Items and symbol sizes must be at least 1."); } if (d_output_symbols) { - set_relative_rate(1.0 / (d_items_per_symbol + d_gi)); + set_relative_rate(1.0 / (d_items_per_symbol + d_gi)); } else { - set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi)); - set_output_multiple(d_items_per_symbol); + set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi)); + set_output_multiple(d_items_per_symbol); + } + if ((d_output_symbols || d_gi) && d_header_padding_items) { + throw std::invalid_argument( + "If output_symbols is true or a guard interval is given, padding must be a multiple of items_per_symbol!" + ); } set_tag_propagation_policy(TPP_DONT); message_port_register_in(msg_port_id); set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1)); - for (unsigned i = 0; i < special_tags.size(); i++) { - d_special_tags.push_back(pmt::string_to_symbol(special_tags[i])); - d_special_tags_last_value.push_back(pmt::PMT_NIL); + for (size_t i = 0; i < special_tags.size(); i++) { + d_special_tags.push_back(pmt::string_to_symbol(special_tags[i])); + d_special_tags_last_value.push_back(pmt::PMT_NIL); } } @@ -146,144 +164,219 @@ namespace gr { { } + // forecast() depends on state: + // - When waiting for a header, we require at least the header length + // - when waiting for a payload, we require at least the payload length + // - Otherwise, pretend this is a sync block with a decimation/interpolation + // depending on symbol size and if we output symbols or items void - header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required) - { + header_payload_demux_impl::forecast( + int noutput_items, + gr_vector_int &ninput_items_required + ) { int n_items_reqd = 0; if (d_state == STATE_HEADER) { - n_items_reqd = d_header_len * (d_items_per_symbol + d_gi); + n_items_reqd = d_header_len * (d_items_per_symbol + d_gi) + + 2*d_header_padding_total_items; } else if (d_state == STATE_PAYLOAD) { - n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi); + n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi); } else { - n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); - if (!d_output_symbols) { - // here, noutput_items is an integer multiple of d_items_per_symbol! - n_items_reqd /= d_items_per_symbol; - } + n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); + if (!d_output_symbols) { + // Here, noutput_items is an integer multiple of d_items_per_symbol! + n_items_reqd /= d_items_per_symbol; + } } for (unsigned i = 0; i < ninput_items_required.size(); i++) { - ninput_items_required[i] = n_items_reqd; + ninput_items_required[i] = n_items_reqd; } } - inline bool - header_payload_demux_impl::check_items_available( - int n_symbols, - gr_vector_int &ninput_items, - int noutput_items, - int nread - ) - { - // Check there's enough items on the input - if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread) - || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) { - return false; - } - + bool header_payload_demux_impl::check_buffers_ready( + int output_symbols_reqd, + int extra_output_items_reqd, + int noutput_items, + int input_items_reqd, + gr_vector_int &ninput_items, + int n_items_read + ) { // Check there's enough space on the output buffer if (d_output_symbols) { - if (noutput_items < n_symbols) { - return false; - } + if (noutput_items < output_symbols_reqd + extra_output_items_reqd) { + return false; + } } else { - if (noutput_items < n_symbols * d_items_per_symbol) { - return false; - } + if (noutput_items < (output_symbols_reqd * d_items_per_symbol) + extra_output_items_reqd) { + return false; + } + } + + // Check there's enough items on the input + if (input_items_reqd > (ninput_items[0]-n_items_read) + || (ninput_items.size() == 2 && (input_items_reqd > (ninput_items[1]-n_items_read)))) { + return false; } + // All good return true; } int - header_payload_demux_impl::general_work (int noutput_items, - gr_vector_int &ninput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - const unsigned char *in = (const unsigned char *) input_items[0]; + header_payload_demux_impl::general_work( + int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items + ) { + const unsigned char *in = (const unsigned char *) input_items[PORT_INPUTDATA]; unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER]; unsigned char *out_payload = (unsigned char *) output_items[PORT_PAYLOAD]; - int nread = 0; - int trigger_offset = 0; - + const int n_input_items = (ninput_items.size() == 2) ? + std::min(ninput_items[0], ninput_items[1]) : + ninput_items[0]; + // Items read going into general_work() + const uint64_t n_items_read_base = nitems_read(PORT_INPUTDATA); + // Items read during this call to general_work() + int n_items_read = 0; + + #define CONSUME_ITEMS(items_to_consume) \ + update_special_tags( \ + n_items_read_base + n_items_read, \ + n_items_read_base + n_items_read + (items_to_consume) \ + ); \ + consume_each(items_to_consume); \ + n_items_read += (items_to_consume); \ + in += (items_to_consume) * d_itemsize; switch (d_state) { - case STATE_WAIT_FOR_MSG: - // In an ideal world, this would never be called - return 0; - - case STATE_HEADER_RX_FAIL: - update_special_tags(0, 1); - consume_each (1); - in += d_itemsize; - nread++; - d_state = STATE_FIND_TRIGGER; - // The following break was added to this state as well as STATE_FIND_TRIGGER - // and STATE_HEADER. There appears to be a bug somewhere in this code without - // the breaks that can lead to failure of this block. With the breaks in the code - // testing has shown more stable performance with various block paramters. - // If an offset calculation bug is found and fixed, it should be possible to - // remove these breaks for some performance increase. - break; - - case STATE_FIND_TRIGGER: - trigger_offset = find_trigger_signal(nread, noutput_items, input_items); - if (trigger_offset == -1) { - update_special_tags(0, noutput_items - nread); - consume_each(noutput_items - nread); - break; - } - update_special_tags(0, trigger_offset); - consume_each (trigger_offset); - in += trigger_offset * d_itemsize; - d_state = STATE_HEADER; - break; - - case STATE_HEADER: - if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) { - copy_n_symbols(in, out_header, PORT_HEADER, d_header_len); - d_state = STATE_WAIT_FOR_MSG; - add_special_tags(); - produce( - PORT_HEADER, - d_header_len * (d_output_symbols ? 1 : d_items_per_symbol) - ); - } - break; - - case STATE_HEADER_RX_SUCCESS: - for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { - add_item_tag( - PORT_PAYLOAD, - nitems_written(PORT_PAYLOAD), - d_payload_tag_keys[i], - d_payload_tag_values[i] - ); - } - nread += d_header_len * (d_items_per_symbol + d_gi); - update_special_tags(0, nread); - consume_each (nread); - in += nread * d_itemsize; - d_state = STATE_PAYLOAD; - break; - - case STATE_PAYLOAD: - if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) { - // The -1 because we won't consume the last item, it might hold the next trigger. - update_special_tags(0, (d_curr_payload_len - 1) * (d_items_per_symbol + d_gi)); - copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len); - produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); - consume_each ((d_curr_payload_len - 1) * (d_items_per_symbol + d_gi)); // Same here - set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); - d_state = STATE_FIND_TRIGGER; - } - break; - - default: - throw std::runtime_error("invalid state"); + case STATE_WAIT_FOR_MSG: + // In an ideal world, this would never be called + // parse_header_data_msg() is the only place that can kick us out + // of this state. + return 0; + + case STATE_HEADER_RX_FAIL: + // Actions: + // - Consume a single item to make sure we're not deleting any other + // info + CONSUME_ITEMS(1); + d_state = STATE_FIND_TRIGGER; + break; + + case STATE_FIND_TRIGGER: { + // Assumptions going into this state: + // - No other state was active for this call to general_work() + // - i.e. n_items_read == 0 + // Start looking for a trigger after any header padding. + // The trigger offset is relative to 'in'. + // => The absolute trigger offset is on n_items_read_base + n_items_read_base + trigger_offset + const int max_rel_offset = n_input_items - n_items_read; + const int trigger_offset = find_trigger_signal( + d_header_padding_total_items, + max_rel_offset, + n_items_read_base + n_items_read, + (input_items.size() == 2) ? + ((const unsigned char *) input_items[PORT_TRIGGER]) + n_items_read : NULL + ); + if (trigger_offset < max_rel_offset) { + d_state = STATE_HEADER; + } + // If we're using padding, don't consume everything, or we might + // end up with not enough items before the trigger + const int items_to_consume = trigger_offset - d_header_padding_total_items; + CONSUME_ITEMS(items_to_consume); + break; + } /* case STATE_FIND_TRIGGER */ + + case STATE_HEADER: + // Assumptions going into this state: + // - The first items on `in' are the header samples (including padding) + // - So we can just copy from the beginning of `in' + // - The trigger is on item index `d_header_padding * d_items_per_symbol' + // Actions: + // - Copy the entire header (including padding) to the header port + // - Special tags are added to the header port + if (check_buffers_ready( + d_header_len + 2*d_header_padding_symbols, + d_header_padding_items, + noutput_items, + d_header_len * (d_items_per_symbol + d_gi) + 2*d_header_padding_total_items, + ninput_items, + n_items_read)) { + add_special_tags(); + copy_n_symbols( + in, + out_header, + PORT_HEADER, + n_items_read_base + n_items_read, + d_header_len+2*d_header_padding_symbols, // Number of symbols to copy + 2*d_header_padding_items + ); + d_state = STATE_WAIT_FOR_MSG; + } + break; + + case STATE_HEADER_RX_SUCCESS: + // Copy tags from header to payload + for (size_t i = 0; i < d_payload_tag_keys.size(); i++) { + add_item_tag( + PORT_PAYLOAD, + nitems_written(PORT_PAYLOAD), + d_payload_tag_keys[i], + d_payload_tag_values[i] + ); + } + // Consume header from input + { + // Consume the padding only once, we leave the second + // part in there because it might be part of the payload + const int items_to_consume = + d_header_len * (d_items_per_symbol + d_gi) + + d_header_padding_total_items + + d_curr_payload_offset; + CONSUME_ITEMS(items_to_consume); + d_curr_payload_offset = 0; + d_state = STATE_PAYLOAD; + } + break; + + case STATE_PAYLOAD: + // Assumptions: + // - Input buffer is in the right spot to just start copying + if (check_buffers_ready( + d_curr_payload_len, + 0, + noutput_items, + d_curr_payload_len * (d_items_per_symbol + d_gi), + ninput_items, + n_items_read)) { + // Write payload + copy_n_symbols( + in, + out_payload, + PORT_PAYLOAD, + n_items_read_base + n_items_read, + d_curr_payload_len + ); + // Consume payload + // We can't consume the full payload, because we need to hold off + // at least the padding value. We'll use a minimum padding of 1 + // item here. + const int items_padding = std::max(d_header_padding_total_items, 1); + const int items_to_consume = + d_curr_payload_len * (d_items_per_symbol + d_gi) + - items_padding; + CONSUME_ITEMS(items_to_consume); + set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); + d_state = STATE_FIND_TRIGGER; + } + break; + + default: + throw std::runtime_error("invalid state"); } /* switch */ return WORK_CALLED_PRODUCE; @@ -292,35 +385,41 @@ namespace gr { int header_payload_demux_impl::find_trigger_signal( - int nread, - int noutput_items, - gr_vector_const_void_star &input_items) - { - if (input_items.size() == 2) { - unsigned char *in_trigger = (unsigned char *) input_items[1]; - in_trigger += nread; - for (int i = 0; i < noutput_items-nread; i++) { - if (in_trigger[i]) { - return i; - } - } + int skip_items, + int max_rel_offset, + uint64_t base_offset, + const unsigned char *in_trigger + ) { + int rel_offset = max_rel_offset; + if (max_rel_offset < skip_items) { + return rel_offset; + } + if (in_trigger) { + for (int i = skip_items; i < max_rel_offset; i++) { + if (in_trigger[i]) { + rel_offset = i; + break; + } + } } if (d_uses_trigger_tag) { std::vector<tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items, d_trigger_tag_key); - uint64_t min_offset = ULLONG_MAX; - int tag_index = -1; - for (unsigned i = 0; i < tags.size(); i++) { - if (tags[i].offset < min_offset) { - tag_index = (int) i; - min_offset = tags[i].offset; + get_tags_in_range( + tags, + PORT_INPUTDATA, + base_offset + skip_items, + base_offset + max_rel_offset, + d_trigger_tag_key + ); + if (!tags.empty()) { + std::sort(tags.begin(), tags.end(), tag_t::offset_compare); + const int tag_rel_offset = tags[0].offset - base_offset; + if (tag_rel_offset < rel_offset) { + rel_offset = tag_rel_offset; } } - if (tag_index != -1) { - return min_offset - nitems_read(0); - } } - return -1; + return rel_offset; } /* find_trigger_signal() */ @@ -332,77 +431,100 @@ namespace gr { d_state = STATE_HEADER_RX_FAIL; if (pmt::is_integer(header_data)) { - d_curr_payload_len = pmt::to_long(header_data); - d_payload_tag_keys.push_back(d_len_tag_key); - d_payload_tag_values.push_back(header_data); - d_state = STATE_HEADER_RX_SUCCESS; + d_curr_payload_len = pmt::to_long(header_data); + d_payload_tag_keys.push_back(d_len_tag_key); + d_payload_tag_values.push_back(header_data); + d_state = STATE_HEADER_RX_SUCCESS; } else if (pmt::is_dict(header_data)) { - pmt::pmt_t dict_items(pmt::dict_items(header_data)); - while (!pmt::is_null(dict_items)) { - pmt::pmt_t this_item(pmt::car(dict_items)); - d_payload_tag_keys.push_back(pmt::car(this_item)); - d_payload_tag_values.push_back(pmt::cdr(this_item)); - if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { - d_curr_payload_len = pmt::to_long(pmt::cdr(this_item)); - d_state = STATE_HEADER_RX_SUCCESS; - } - dict_items = pmt::cdr(dict_items); - } - if (d_state == STATE_HEADER_RX_FAIL) { - GR_LOG_CRIT(d_logger, "no length tag passed from header data"); - } + pmt::pmt_t dict_items(pmt::dict_items(header_data)); + while (!pmt::is_null(dict_items)) { + pmt::pmt_t this_item(pmt::car(dict_items)); + d_payload_tag_keys.push_back(pmt::car(this_item)); + d_payload_tag_values.push_back(pmt::cdr(this_item)); + if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { + d_curr_payload_len = pmt::to_long(pmt::cdr(this_item)); + d_state = STATE_HEADER_RX_SUCCESS; + } + if (pmt::equal(pmt::car(this_item), d_payload_offset_key)) { + d_curr_payload_offset = pmt::to_long(pmt::cdr(this_item)); + if (std::abs(d_curr_payload_offset) > d_header_padding_total_items) { + GR_LOG_CRIT(d_logger, "Payload offset exceeds padding"); + d_state = STATE_HEADER_RX_FAIL; + return; + } + } + dict_items = pmt::cdr(dict_items); + } + if (d_state == STATE_HEADER_RX_FAIL) { + GR_LOG_CRIT(d_logger, "no payload length passed from header data"); + } } else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) { - GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data)); + GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data)); } else { - GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); + GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); } if (d_state == STATE_HEADER_RX_SUCCESS) { - if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) { - d_state = STATE_HEADER_RX_FAIL; - GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); - } else { - set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); - } + if (d_curr_payload_len < 0) { + GR_LOG_WARN(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); + d_curr_payload_len = 0; + d_state = STATE_HEADER_RX_FAIL; + } + if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) { + d_state = STATE_HEADER_RX_FAIL; + GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); + } else { + set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + } } } /* parse_header_data_msg() */ void header_payload_demux_impl::copy_n_symbols( - const unsigned char *in, - unsigned char *out, - int port, - int n_symbols - ) - { + const unsigned char *in, + unsigned char *out, + int port, + const uint64_t n_items_read_base, + int n_symbols, + int n_padding_items + ) { // Copy samples if (d_gi) { - for (int i = 0; i < n_symbols; i++) { - memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize); - in += d_itemsize * (d_items_per_symbol + d_gi); - out += d_itemsize * d_items_per_symbol; - } + // Here we know n_padding_items must be 0 (see contract), + // because all padding items will be part of n_symbols + for (int i = 0; i < n_symbols; i++) { + memcpy( + (void *) out, + (void *) (in + d_gi * d_itemsize), + d_items_per_symbol * d_itemsize + ); + in += d_itemsize * (d_items_per_symbol + d_gi); + out += d_itemsize * d_items_per_symbol; + } } else { - memcpy( - (void *) out, - (void *) in, - n_symbols * d_items_per_symbol * d_itemsize - ); + memcpy( + (void *) out, + (void *) in, + (n_symbols * d_items_per_symbol + n_padding_items) * d_itemsize + ); } // Copy tags std::vector<tag_t> tags; get_tags_in_range( - tags, 0, - nitems_read(0), - nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi) + tags, + PORT_INPUTDATA, + n_items_read_base, + n_items_read_base + + n_symbols * (d_items_per_symbol + d_gi) + + n_padding_items ); for (size_t t = 0; t < tags.size(); t++) { // The trigger tag is *not* propagated if (tags[t].key == d_trigger_tag_key) { continue; } - int new_offset = tags[t].offset - nitems_read(0); + int new_offset = tags[t].offset - n_items_read_base; if (d_output_symbols) { new_offset /= (d_items_per_symbol + d_gi); } else if (d_gi) { @@ -418,43 +540,49 @@ namespace gr { tags[t].value ); } + // Advance write pointers + // Items to produce might actually be symbols + const int items_to_produce = d_output_symbols ? + n_symbols : + (n_symbols * d_items_per_symbol + n_padding_items); + produce(port, items_to_produce); } /* copy_n_symbols() */ void header_payload_demux_impl::update_special_tags( - int range_start, - int range_end + uint64_t range_start, + uint64_t range_end ){ if (d_track_time) { - std::vector<tag_t> tags; - get_tags_in_range(tags, 0, - nitems_read(0) + range_start, - nitems_read(0) + range_end, - d_timing_key - ); - for (unsigned t = 0; t < tags.size(); t++) { - if(tags[t].offset >= d_last_time_offset) { - d_last_time = tags[t].value; - d_last_time_offset = tags[t].offset; - } - } + std::vector<tag_t> tags; + get_tags_in_range( + tags, + PORT_INPUTDATA, + range_start, + range_end, + d_timing_key + ); + if (!tags.empty()) { + std::sort(tags.begin(), tags.end(), tag_t::offset_compare); + d_last_time = tags.back().value; + d_last_time_offset = tags.back().offset; + } } std::vector<tag_t> tags; - for (unsigned i = 0; i < d_special_tags.size(); i++) { - uint64_t offset = 0; - // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually - get_tags_in_range(tags, 0, - nitems_read(0) + range_start, - nitems_read(0) + range_end, - d_special_tags[i] - ); - for (unsigned t = 0; t < tags.size(); t++) { - if(tags[t].offset >= offset) { - d_special_tags_last_value[i] = tags[t].value; - offset = tags[t].offset; - } - } + for (size_t i = 0; i < d_special_tags.size(); i++) { + // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually + get_tags_in_range( + tags, + PORT_INPUTDATA, // Read from port 0 + range_start, + range_end, + d_special_tags[i] + ); + std::sort(tags.begin(), tags.end(), tag_t::offset_compare); + for (size_t t = 0; t < tags.size(); t++) { + d_special_tags_last_value[i] = tags[t].value; + } } } /* update_special_tags() */ @@ -462,24 +590,24 @@ namespace gr { header_payload_demux_impl::add_special_tags( ){ if (d_track_time) { - add_item_tag( - PORT_HEADER, - nitems_written(PORT_HEADER), - d_timing_key, - _update_pmt_time( - d_last_time, - d_sampling_time * (nitems_read(0) - d_last_time_offset) - ) - ); + add_item_tag( + PORT_HEADER, + nitems_written(PORT_HEADER), + d_timing_key, + _update_pmt_time( + d_last_time, + d_sampling_time * (nitems_read(PORT_INPUTDATA) - d_last_time_offset) + ) + ); } for (unsigned i = 0; i < d_special_tags.size(); i++) { - add_item_tag( - PORT_HEADER, - nitems_written(PORT_HEADER), - d_special_tags[i], - d_special_tags_last_value[i] - ); + add_item_tag( + PORT_HEADER, + nitems_written(PORT_HEADER), + d_special_tags[i], + d_special_tags_last_value[i] + ); } } /* add_special_tags() */ diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h index 1d45dc7ce1..0a70e7da3e 100644 --- a/gr-digital/lib/header_payload_demux_impl.h +++ b/gr-digital/lib/header_payload_demux_impl.h @@ -1,18 +1,18 @@ /* -*- c++ -*- */ -/* Copyright 2012 Free Software Foundation, Inc. - * +/* Copyright 2012-2016 Free Software Foundation, Inc. + * * This file is part of GNU Radio - * + * * GNU Radio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3, or (at your option) * any later version. - * + * * GNU Radio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. - * + * * You should have received a copy of the GNU General Public License * along with GNU Radio; see the file COPYING. If not, write to * the Free Software Foundation, Inc., 51 Franklin Street, @@ -31,6 +31,9 @@ namespace gr { { private: int d_header_len; //!< Number of bytes per header + const int d_header_padding_symbols; //!< Symbols header padding + const int d_header_padding_items; //!< Items header padding + const int d_header_padding_total_items; //!< Items header padding int d_items_per_symbol; //!< Bytes per symbol int d_gi; //!< Bytes per guard interval pmt::pmt_t d_len_tag_key; //!< Key of length tag @@ -40,10 +43,12 @@ namespace gr { bool d_uses_trigger_tag; //!< If a trigger tag is used int d_state; //!< Current read state int d_curr_payload_len; //!< Length of the next payload (symbols) + int d_curr_payload_offset; //!< Offset of the next payload (symbols) std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys) std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values) bool d_track_time; //!< Whether or not to keep track of the rx time pmt::pmt_t d_timing_key; //!< Key of the timing tag (usually 'rx_time') + pmt::pmt_t d_payload_offset_key; //!< Key of payload offset (usually 'payload_offset') uint64_t d_last_time_offset; //!< Item number of the last time tag pmt::pmt_t d_last_time; //!< The actual time that was indicated double d_sampling_time; //!< Inverse sampling rate @@ -53,7 +58,14 @@ namespace gr { // Helper functions to make the state machine more readable //! Checks if there are enough items on the inputs and enough space on the output buffers to copy \p n_symbols symbols - inline bool check_items_available(int n_symbols, gr_vector_int &ninput_items, int noutput_items, int nread); + bool check_buffers_ready( + int output_symbols_reqd, + int extra_output_items_reqd, + int noutput_items, + int input_items_reqd, + gr_vector_int &ninput_items, + int n_items_read + ); //! Message handler: Reads the result from the header demod and sets length tag (and other tags) void parse_header_data_msg(pmt::pmt_t header_data); @@ -62,49 +74,54 @@ namespace gr { // Searches input 1 (if active), then the tags. Returns the offset in the input buffer // (or -1 if none is found) int find_trigger_signal( - int nread, - int noutput_items, - gr_vector_const_void_star &input_items); + int skip_items, + int noutput_items, + uint64_t base_offset, + const unsigned char *in_trigger + ); //! Copies n symbols from in to out, makes sure tags are propagated properly. Does neither consume nor produce. void copy_n_symbols( - const unsigned char *in, - unsigned char *out, - int port, - int n_symbols + const unsigned char *in, + unsigned char *out, + int port, + const uint64_t n_items_read_base, + int n_symbols, + int n_padding_items=0 ); //! Scans a given range for tags in d_special_tags void update_special_tags( - int range_start, - int range_end + uint64_t range_start, + uint64_t range_end ); //! Adds all tags in d_special_tags and timing info to the first item of the header. void add_special_tags(); - public: header_payload_demux_impl( - int header_len, - int items_per_symbol, - int guard_interval, - const std::string &length_tag_key, - const std::string &trigger_tag_key, - bool output_symbols, - size_t itemsize, - const std::string &timing_tag_key, - const double samp_rate, - const std::vector<std::string> &special_tags + const int header_len, + const int items_per_symbol, + const int guard_interval, + const std::string &length_tag_key, + const std::string &trigger_tag_key, + const bool output_symbols, + const size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags, + const size_t header_padding ); ~header_payload_demux_impl(); void forecast (int noutput_items, gr_vector_int &ninput_items_required); int general_work(int noutput_items, - gr_vector_int &ninput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items + ); }; } // namespace digital diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index 8006d4442e..f36d71067c 100755 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -1,29 +1,69 @@ #!/usr/bin/env python -# Copyright 2012 Free Software Foundation, Inc. -# +# Copyright 2012-2016 Free Software Foundation, Inc. +# # This file is part of GNU Radio -# +# # GNU Radio is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. -# +# # GNU Radio is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. -# +# +from __future__ import print_function import time - -from gnuradio import gr, gr_unittest, digital, blocks +import random +import numpy +from gnuradio import gr +from gnuradio import gr_unittest +from gnuradio import digital +from gnuradio import blocks import pmt +def make_tag(key, value, offset): + tag = gr.tag_t() + tag.offset = offset + tag.key = pmt.string_to_symbol(key) + tag.value = pmt.to_pmt(value) + return tag + + +class HeaderToMessageBlock(gr.sync_block): + """ + Helps with testing the HPD. Receives a header, stores it, posts + a predetermined message. + """ + def __init__(self, itemsize, header_len, messages, header_is_symbol=False): + gr.sync_block.__init__( + self, + name="HeaderToMessageBlock", + in_sig=[itemsize], + out_sig=[itemsize], + ) + self.header_len = header_len + self.message_port_register_out(pmt.intern('header_data')) + self.messages = messages + self.msg_count = 0 + + def work(self, input_items, output_items): + for i in xrange(len(input_items[0])/self.header_len): + msg = self.messages[self.msg_count] or False + #print("Sending message: {0}".format(msg)) + self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg)) + self.msg_count += 1 + output_items[0][:] = input_items[0][:] + return len(input_items[0]) + + class qa_header_payload_demux (gr_unittest.TestCase): def setUp (self): @@ -32,6 +72,36 @@ class qa_header_payload_demux (gr_unittest.TestCase): def tearDown (self): self.tb = None + def connect_all_blocks(self, + data_src, trigger_src, + hpd, + mock_header_demod, + payload_sink, header_sink + ): + """ + Connect the standard HPD test flowgraph + """ + self.tb.connect(data_src, (hpd, 0)) + if trigger_src is not None: + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), mock_header_demod) + self.tb.connect(mock_header_demod, header_sink) + self.tb.msg_connect( + mock_header_demod, 'header_data', + hpd, 'header_data' + ) + self.tb.connect((hpd, 1), payload_sink) + + def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30): + stop_time = time.time() + timeout + self.tb.start() + while len(payload_sink.data()) < payload_len and \ + len(header_sink.data()) < header_len and \ + time.time() < stop_time: + time.sleep(.2) + self.tb.stop() + self.tb.wait() + def test_001_t (self): """ Simplest possible test: put in zeros, then header, then payload, trigger signal, try to demux. @@ -45,25 +115,13 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_signal = [0,] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: - testtag1 = gr.tag_t() - testtag1.offset = 0 - testtag1.key = pmt.string_to_symbol('tag1') - testtag1.value = pmt.from_long(0) + testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: - testtag2 = gr.tag_t() - testtag2.offset = n_zeros - testtag2.key = pmt.string_to_symbol('tag2') - testtag2.value = pmt.from_long(23) + testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: - testtag3 = gr.tag_t() - testtag3.offset = n_zeros + len(header) - 1 - testtag3.key = pmt.string_to_symbol('tag3') - testtag3.value = pmt.from_long(42) + testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: - testtag4 = gr.tag_t() - testtag4.offset = n_zeros + len(header) + 3 - testtag4.key = pmt.string_to_symbol('tag4') - testtag4.value = pmt.from_long(314) + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, @@ -73,26 +131,17 @@ class qa_header_payload_demux (gr_unittest.TestCase): hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float ) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + len(header), + [len(payload)] + ) self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you - header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - - self.tb.connect(data_src, (hpd, 0)) - self.tb.connect(trigger_src, (hpd, 1)) - self.tb.connect((hpd, 0), header_sink) - self.tb.connect((hpd, 1), payload_sink) - self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored - hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(len(payload)) - ) - while len(payload_sink.data()) < len(payload): - time.sleep(.2) - self.tb.stop() - self.tb.wait() - - self.assertEqual(header_sink.data(), header) + header_sink = blocks.vector_sink_f() + self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header)) + self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) ptags_header = [] for tag in header_sink.tags(): @@ -122,30 +171,15 @@ class qa_header_payload_demux (gr_unittest.TestCase): payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload # Trigger tag - trigger_tag = gr.tag_t() - trigger_tag.offset = n_zeros - trigger_tag.key = pmt.string_to_symbol('detect') - trigger_tag.value = pmt.PMT_T + trigger_tag = make_tag('detect', True, n_zeros) # This is dropped: - testtag1 = gr.tag_t() - testtag1.offset = 0 - testtag1.key = pmt.string_to_symbol('tag1') - testtag1.value = pmt.from_long(0) + testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: - testtag2 = gr.tag_t() - testtag2.offset = n_zeros - testtag2.key = pmt.string_to_symbol('tag2') - testtag2.value = pmt.from_long(23) + testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: - testtag3 = gr.tag_t() - testtag3.offset = n_zeros + len(header) - 1 - testtag3.key = pmt.string_to_symbol('tag3') - testtag3.value = pmt.from_long(42) + testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: - testtag4 = gr.tag_t() - testtag4.offset = n_zeros + len(header) + 3 - testtag4.key = pmt.string_to_symbol('tag4') - testtag4.value = pmt.from_long(314) + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, @@ -157,21 +191,14 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - - self.tb.connect(data_src, (hpd, 0)) - self.tb.connect((hpd, 0), header_sink) - self.tb.connect((hpd, 1), payload_sink) - self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored - hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(len(payload)) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + len(header), + [len(payload)] ) - while len(payload_sink.data()) < len(payload): - time.sleep(.2) - self.tb.stop() - self.tb.wait() - + self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header)) + # Check results self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) ptags_header = [] @@ -193,8 +220,143 @@ class qa_header_payload_demux (gr_unittest.TestCase): ] self.assertEqual(expected_tags_payload, ptags_payload) + def test_001_headerpadding (self): + """ Like test 1, but with header padding. """ + n_zeros = 3 + header = (1, 2, 3) + header_padding = 1 + payload = tuple(range(5, 20)) + data_signal = (0,) * n_zeros + header + payload + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + # This is dropped: + testtag1 = make_tag('tag1', 0, 0) + # This goes on output 0, item 0: + testtag2 = make_tag('tag2', 23, n_zeros) + # This goes on output 0, item 2: + testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) + # This goes on output 1, item 3: + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) + data_src = blocks.vector_source_f( + data_signal, + False, + tags=(testtag1, testtag2, testtag3, testtag4) + ) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), + 1, # Items per symbol + 0, # Guard interval + "frame_len", # TSB tag key + "detect", # Trigger tag key + False, # No symbols please + gr.sizeof_float, # Item size + "", # Timing tag key + 1.0, # Samp rate + (), # No special tags + header_padding + ) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + len(header), + [len(payload)] + ) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header)+2) + # Check values + # Header now is padded: + self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) + self.assertEqual(payload_sink.data(), payload) + ptags_header = [] + for tag in header_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_header = [ + {'key': 'tag2', 'offset': 1}, + {'key': 'tag3', 'offset': 3}, + ] + self.assertEqual(expected_tags_header, ptags_header) + ptags_payload = [] + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_payload = [ + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, + ] + self.assertEqual(expected_tags_payload, ptags_payload) + + def test_001_headerpadding_payload_offset (self): + """ Like test 1, but with header padding + payload offset. """ + n_zeros = 3 + header = (1, 2, 3) + header_padding = 1 + payload_offset = -1 + payload = tuple(range(5, 20)) + data_signal = (0,) * n_zeros + header + payload + (0,) * 100 + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + # This goes on output 1, item 3 + 1 (for payload offset) + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) + data_src = blocks.vector_source_f( + data_signal, + False, + tags=(testtag4,) + ) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), + 1, # Items per symbol + 0, # Guard interval + "frame_len", # TSB tag key + "detect", # Trigger tag key + False, # No symbols please + gr.sizeof_float, # Item size + "", # Timing tag key + 1.0, # Samp rate + (), # No special tags + header_padding + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset}) + ) + while len(payload_sink.data()) < len(payload): + time.sleep(.2) + self.tb.stop() + self.tb.wait() + # Header is now padded: + self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) + # Payload is now offset: + self.assertEqual( + payload_sink.data(), + data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)] + ) + ptags_payload = {} + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload[ptag.key] = ptag.offset + expected_tags_payload = { + 'frame_len': 0, + 'payload_offset': 0, + 'tag4': 3 - payload_offset, + } + self.assertEqual(expected_tags_payload, ptags_payload) + + def test_002_symbols (self): - """ + """ Same as before, but operate on symbols """ n_zeros = 1 @@ -207,25 +369,13 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_signal = [0,] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: - testtag1 = gr.tag_t() - testtag1.offset = 0 - testtag1.key = pmt.string_to_symbol('tag1') - testtag1.value = pmt.from_long(0) + testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0 (from the GI) - testtag2 = gr.tag_t() - testtag2.offset = n_zeros - testtag2.key = pmt.string_to_symbol('tag2') - testtag2.value = pmt.from_long(23) + testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 0 (middle of the header symbol) - testtag3 = gr.tag_t() - testtag3.offset = n_zeros + gi + 1 - testtag3.key = pmt.string_to_symbol('tag3') - testtag3.value = pmt.from_long(42) + testtag3 = make_tag('tag3', 42, n_zeros + gi + 1) # This goes on output 1, item 1 (middle of the first payload symbol) - testtag4 = gr.tag_t() - testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1 - testtag4.key = pmt.string_to_symbol('tag4') - testtag4.value = pmt.from_long(314) + testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1) data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( @@ -291,25 +441,20 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_signal[n_zeros] = 1 trigger_signal[len(data_signal)] = 1 trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 - tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000 + print("Triggers at: {0} {1} {2}".format( + n_zeros, + len(data_signal), + len(data_signal)+len(header_fail)+n_zeros) + ) + tx_signal = data_signal + \ + header_fail + (0,) * n_zeros + \ + header + payload2 + (0,) * 1000 # Timing tag: This is preserved and updated: - timing_tag = gr.tag_t() - timing_tag.offset = 0 - timing_tag.key = pmt.string_to_symbol('rx_time') - timing_tag.value = pmt.to_pmt((0, 0)) + timing_tag = make_tag('rx_time', (0, 0), 0) # Rx freq tags: - rx_freq_tag1 = gr.tag_t() - rx_freq_tag1.offset = 0 - rx_freq_tag1.key = pmt.string_to_symbol('rx_freq') - rx_freq_tag1.value = pmt.from_double(1.0) - rx_freq_tag2 = gr.tag_t() - rx_freq_tag2.offset = 29 - rx_freq_tag2.key = pmt.string_to_symbol('rx_freq') - rx_freq_tag2.value = pmt.from_double(1.5) - rx_freq_tag3 = gr.tag_t() - rx_freq_tag3.offset = 30 - rx_freq_tag3.key = pmt.string_to_symbol('rx_freq') - rx_freq_tag3.value = pmt.from_double(2.0) + rx_freq_tag1 = make_tag('rx_freq', 1.0, 0) + rx_freq_tag2 = make_tag('rx_freq', 1.5, 29) + rx_freq_tag3 = make_tag('rx_freq', 2.0, 30) ### Flow graph data_src = blocks.vector_source_f( tx_signal, False, @@ -388,6 +533,92 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(tags_header, tags_expected_header) self.assertEqual(tags_payload, tags_expected_payload) + def test_004_fuzz(self): + """ + Long random test + """ + def create_signal( + n_bursts, + header_len, + max_gap, + max_burstsize, + fail_rate, + ): + signal = [] + indexes = [] + burst_sizes = [] + total_payload_len = 0 + for burst_count in xrange(n_bursts): + gap_size = random.randint(0, max_gap) + signal += [0] * gap_size + is_failure = random.random() < fail_rate + if not is_failure: + burst_size = random.randint(0, max_burstsize) + else: + burst_size = 0 + total_payload_len += burst_size + indexes += [len(signal)] + signal += [1] * header_len + signal += [2] * burst_size + burst_sizes += [burst_size] + return (signal, indexes, total_payload_len, burst_sizes) + def indexes_to_triggers(indexes, signal_len): + """ + Convert indexes to a mix of trigger signals and tags + """ + trigger_signal = [0] * signal_len + trigger_tags = [] + for index in indexes: + if random.random() > 0.5: + trigger_signal[index] = 1 + else: + trigger_tags += [make_tag('detect', True, index)] + return (trigger_signal, trigger_tags) + ### Go, go, go + # The divide-by-20 means we'll usually get the same random seed + # between the first run and the XML run. + random_seed = int(time.time()/20) + random.seed(random_seed) + print("Random seed: {0}".format(random_seed)) + n_bursts = 400 + header_len = 5 + max_gap = 50 + max_burstsize = 100 + fail_rate = 0.05 + signal, indexes, total_payload_len, burst_sizes = create_signal( + n_bursts, header_len, max_gap, max_burstsize, fail_rate + ) + trigger_signal, trigger_tags = indexes_to_triggers(indexes, len(signal)) + # Flow graph + data_src = blocks.vector_source_f( + signal, False, + tags=trigger_tags + ) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + header_len=header_len, + items_per_symbol=1, + guard_interval=0, + length_tag_key="frame_len", + trigger_tag_key="detect", + output_symbols=False, + itemsize=gr.sizeof_float, + timing_tag_key='rx_time', + samp_rate=1.0, + special_tags=('rx_freq',), + ) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + header_len, + burst_sizes + ) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, total_payload_len, header_sink, header_len*n_bursts) + self.assertEqual(header_sink.data(), tuple([1]*header_len*n_bursts)) + self.assertEqual(payload_sink.data(), tuple([2]*total_payload_len)) + if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") |