summaryrefslogtreecommitdiff
path: root/gr-digital/lib/header_payload_demux_impl.cc
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2014-01-25 11:37:44 +0100
committerMartin Braun <martin.braun@ettus.com>2014-01-30 16:49:42 +0100
commit3e054cb93ccd2eb55ff040f9bc72532ea5b0b954 (patch)
treee04316202295ec00524c746b853d6ace1ff1d9f3 /gr-digital/lib/header_payload_demux_impl.cc
parent19d111e2448a58e20ff5c1c80ca69751376b2544 (diff)
digital: HPD now supports time- and other special tags, can mark rx-time of packets
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r--gr-digital/lib/header_payload_demux_impl.cc154
1 files changed, 133 insertions, 21 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index ba346e2ae7..406d59e246 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -31,6 +31,19 @@
namespace gr {
namespace digital {
+ //! Returns a PMT time tuple (uint seconds, double fraction) as the sum of
+ // another PMT time tuple and a time diff in seconds.
+ pmt::pmt_t _update_pmt_time(
+ pmt::pmt_t old_time,
+ double time_difference
+ ){
+ double diff_seconds;
+ double diff_frac = modf(time_difference, &diff_seconds);
+ uint64_t seconds = pmt::to_uint64(pmt::tuple_ref(old_time, 0)) + (uint64_t) diff_seconds;
+ double frac = pmt::to_double(pmt::tuple_ref(old_time, 1)) + diff_frac;
+ return pmt::make_tuple(pmt::from_uint64(seconds), pmt::from_double(frac));
+ }
+
enum demux_states_t {
STATE_FIND_TRIGGER, // "Idle" state (waiting for burst)
STATE_HEADER, // Copy header
@@ -40,6 +53,11 @@ namespace gr {
STATE_PAYLOAD // Copy payload
};
+ enum out_port_indexes_t {
+ PORT_HEADER = 0,
+ PORT_PAYLOAD = 1
+ };
+
#define msg_port_id pmt::mp("header_data")
header_payload_demux::sptr
@@ -50,8 +68,11 @@ namespace gr {
const std::string &length_tag_key,
const std::string &trigger_tag_key,
bool output_symbols,
- size_t itemsize)
- {
+ size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags
+ ){
return gnuradio::get_initial_sptr (
new header_payload_demux_impl(
header_len,
@@ -60,7 +81,10 @@ namespace gr {
length_tag_key,
trigger_tag_key,
output_symbols,
- itemsize
+ itemsize,
+ timing_tag_key,
+ samp_rate,
+ special_tags
)
);
}
@@ -72,7 +96,10 @@ namespace gr {
const std::string &length_tag_key,
const std::string &trigger_tag_key,
bool output_symbols,
- size_t itemsize
+ size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags
) : block("header_payload_demux",
io_signature::make2(1, 2, itemsize, sizeof(char)),
io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
@@ -87,7 +114,12 @@ namespace gr {
d_state(STATE_FIND_TRIGGER),
d_curr_payload_len(0),
d_payload_tag_keys(0),
- d_payload_tag_values(0)
+ d_payload_tag_values(0),
+ d_track_time(!timing_tag_key.empty()),
+ d_timing_key(pmt::intern(timing_tag_key)),
+ d_last_time_offset(0),
+ d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))),
+ d_sampling_time(1.0/samp_rate)
{
if (d_header_len < 1) {
throw std::invalid_argument("Header length must be at least 1 symbol.");
@@ -104,6 +136,10 @@ namespace gr {
set_tag_propagation_policy(TPP_DONT);
message_port_register_in(msg_port_id);
set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
+ for (unsigned i = 0; i < special_tags.size(); i++) {
+ d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
+ d_special_tags_last_value.push_back(pmt::PMT_NIL);
+ }
}
header_payload_demux_impl::~header_payload_demux_impl()
@@ -168,8 +204,8 @@ namespace gr {
gr_vector_void_star &output_items)
{
const unsigned char *in = (const unsigned char *) input_items[0];
- unsigned char *out_header = (unsigned char *) output_items[0];
- unsigned char *out_payload = (unsigned char *) output_items[1];
+ unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER];
+ unsigned char *out_payload = (unsigned char *) output_items[PORT_PAYLOAD];
int nread = 0;
int trigger_offset = 0;
@@ -180,6 +216,7 @@ namespace gr {
return 0;
case STATE_HEADER_RX_FAIL:
+ update_special_tags(0, 1);
consume_each (1);
in += d_itemsize;
nread++;
@@ -189,40 +226,52 @@ namespace gr {
case STATE_FIND_TRIGGER:
trigger_offset = find_trigger_signal(nread, noutput_items, input_items);
if (trigger_offset == -1) {
+ update_special_tags(0, noutput_items - nread);
consume_each(noutput_items - nread);
break;
}
+ update_special_tags(0, trigger_offset);
consume_each (trigger_offset);
in += trigger_offset * d_itemsize;
d_state = STATE_HEADER;
// Fall through
-
case STATE_HEADER:
if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) {
- copy_n_symbols(in, out_header, 0, d_header_len);
+ copy_n_symbols(in, out_header, PORT_HEADER, d_header_len);
d_state = STATE_WAIT_FOR_MSG;
- produce(0, d_header_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ add_special_tags();
+ produce(
+ PORT_HEADER,
+ d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)
+ );
}
break;
case STATE_HEADER_RX_SUCCESS:
for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
- add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]);
+ add_item_tag(
+ PORT_PAYLOAD,
+ nitems_written(PORT_PAYLOAD),
+ d_payload_tag_keys[i],
+ d_payload_tag_values[i]
+ );
}
- consume_each (d_header_len * (d_items_per_symbol + d_gi));
- in += d_header_len * (d_items_per_symbol + d_gi) * d_itemsize;
nread += d_header_len * (d_items_per_symbol + d_gi);
+ update_special_tags(0, nread);
+ consume_each (nread);
+ in += nread * d_itemsize;
d_state = STATE_PAYLOAD;
// Fall through
case STATE_PAYLOAD:
if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) {
- copy_n_symbols(in, out_payload, 1, d_curr_payload_len);
- produce(1, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ update_special_tags(0, d_curr_payload_len * (d_items_per_symbol + d_gi));
+ copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len);
+ produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
consume_each (d_curr_payload_len * (d_items_per_symbol + d_gi));
- d_state = STATE_FIND_TRIGGER;
set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
+ d_state = STATE_FIND_TRIGGER;
}
break;
@@ -251,11 +300,11 @@ namespace gr {
}
if (d_uses_trigger_tag) {
std::vector<tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items, d_trigger_tag_key);
uint64_t min_offset = ULLONG_MAX;
int tag_index = -1;
for (unsigned i = 0; i < tags.size(); i++) {
- if (tags[i].key == d_trigger_tag_key && tags[i].offset < min_offset) {
+ if (tags[i].offset < min_offset) {
tag_index = (int) i;
min_offset = tags[i].offset;
}
@@ -266,7 +315,7 @@ namespace gr {
}
}
return -1;
- }
+ } /* find_trigger_signal() */
void
@@ -310,7 +359,7 @@ namespace gr {
set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
}
}
- }
+ } /* parse_header_data_msg() */
void
@@ -359,7 +408,70 @@ namespace gr {
tags[t].value
);
}
- }
+ } /* copy_n_symbols() */
+
+ void
+ header_payload_demux_impl::update_special_tags(
+ int range_start,
+ int range_end
+ ){
+ if (d_track_time) {
+ std::vector<tag_t> tags;
+ get_tags_in_range(tags, 0,
+ nitems_read(0) + range_start,
+ nitems_read(0) + range_end,
+ d_timing_key
+ );
+ for (unsigned t = 0; t < tags.size(); t++) {
+ if(tags[t].offset >= d_last_time_offset) {
+ d_last_time = tags[t].value;
+ d_last_time_offset = tags[t].offset;
+ }
+ }
+ }
+
+ std::vector<tag_t> tags;
+ for (unsigned i = 0; i < d_special_tags.size(); i++) {
+ uint64_t offset = 0;
+ // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
+ get_tags_in_range(tags, 0,
+ nitems_read(0) + range_start,
+ nitems_read(0) + range_end,
+ d_special_tags[i]
+ );
+ for (unsigned t = 0; t < tags.size(); t++) {
+ if(tags[t].offset >= offset) {
+ d_special_tags_last_value[i] = tags[t].value;
+ offset = tags[t].offset;
+ }
+ }
+ }
+ } /* update_special_tags() */
+
+ void
+ header_payload_demux_impl::add_special_tags(
+ ){
+ if (d_track_time) {
+ add_item_tag(
+ PORT_HEADER,
+ nitems_written(PORT_HEADER),
+ d_timing_key,
+ _update_pmt_time(
+ d_last_time,
+ d_sampling_time * (nitems_read(0) - d_last_time_offset)
+ )
+ );
+ }
+
+ for (unsigned i = 0; i < d_special_tags.size(); i++) {
+ add_item_tag(
+ PORT_HEADER,
+ nitems_written(PORT_HEADER),
+ d_special_tags[i],
+ d_special_tags_last_value[i]
+ );
+ }
+ } /* add_special_tags() */
} /* namespace digital */
} /* namespace gr */