summaryrefslogtreecommitdiff
path: root/gr-network/lib/udp_source_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-network/lib/udp_source_impl.cc')
-rw-r--r--gr-network/lib/udp_source_impl.cc375
1 files changed, 375 insertions, 0 deletions
diff --git a/gr-network/lib/udp_source_impl.cc b/gr-network/lib/udp_source_impl.cc
new file mode 100644
index 0000000000..4758b9e219
--- /dev/null
+++ b/gr-network/lib/udp_source_impl.cc
@@ -0,0 +1,375 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "udp_source_impl.h"
+#include <gnuradio/io_signature.h>
+#include <sstream>
+
+namespace gr {
+namespace network {
+
+udp_source::sptr udp_source::make(size_t itemsize,
+ size_t veclen,
+ int port,
+ int header_type,
+ int payloadsize,
+ bool notify_missed,
+ bool source_zeros,
+ bool ipv6)
+{
+ return gnuradio::get_initial_sptr(new udp_source_impl(itemsize,
+ veclen,
+ port,
+ header_type,
+ payloadsize,
+ notify_missed,
+ source_zeros,
+ ipv6));
+}
+
+/*
+ * The private constructor
+ */
+udp_source_impl::udp_source_impl(size_t itemsize,
+ size_t veclen,
+ int port,
+ int header_type,
+ int payloadsize,
+ bool notify_missed,
+ bool source_zeros,
+ bool ipv6)
+ : gr::sync_block("udp_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(1, 1, itemsize * veclen)),
+ is_ipv6(ipv6),
+ d_itemsize(itemsize),
+ d_veclen(veclen),
+ d_port(port),
+ d_notify_missed(notify_missed),
+ d_source_zeros(source_zeros),
+ d_header_type(header_type),
+ d_payloadsize(payloadsize),
+ d_seq_num(0),
+ d_header_size(0),
+ d_partial_frame_counter(0)
+{
+ d_block_size = d_itemsize * d_veclen;
+
+ switch (d_header_type) {
+ case HEADERTYPE_SEQNUM:
+ d_header_size = sizeof(header_seq_num);
+ break;
+
+ case HEADERTYPE_SEQPLUSSIZE:
+ d_header_size = sizeof(header_seq_plus_size);
+ break;
+
+ case HEADERTYPE_OLDATA:
+ d_header_size = sizeof(ata_header);
+ break;
+
+ case HEADERTYPE_NONE:
+ d_header_size = 0;
+ break;
+
+ default:
+ GR_LOG_ERROR(d_logger, "Unknown UDP header type.");
+ throw std::invalid_argument("Unknown UDP header type.");
+ break;
+ }
+
+ if (d_payloadsize < 8) {
+ GR_LOG_ERROR(d_logger,
+ "Payload size is too small. Must be at "
+ "least 8 bytes once header/trailer adjustments are made.");
+
+ throw std::invalid_argument(
+ "Payload size is too small. Must be at "
+ "least 8 bytes once header/trailer adjustments are made.");
+ }
+
+ d_precomp_data_size = d_payloadsize - d_header_size;
+ d_precomp_data_over_item_size = d_precomp_data_size / d_itemsize;
+
+ d_local_buffer = new char[d_payloadsize];
+ long max_circ_buffer;
+
+ // Let's keep it from getting too big
+ if (d_payloadsize < 2000) {
+ max_circ_buffer = d_payloadsize * 4000;
+ } else {
+ if (d_payloadsize < 5000)
+ max_circ_buffer = d_payloadsize * 2000;
+ else
+ max_circ_buffer = d_payloadsize * 1500;
+ }
+
+ d_localqueue = new boost::circular_buffer<char>(max_circ_buffer);
+
+ if (is_ipv6)
+ d_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v6(), port);
+ else
+ d_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port);
+
+ try {
+ d_udpsocket = new boost::asio::ip::udp::socket(d_io_service, d_endpoint);
+ } catch (const std::exception& ex) {
+ throw std::runtime_error(std::string("[UDP Source] Error occurred: ") +
+ ex.what());
+ }
+
+ int out_multiple = (d_payloadsize - d_header_size) / d_block_size;
+
+ if (out_multiple == 1)
+ out_multiple = 2; // Ensure we get pairs, for instance complex -> ichar pairs
+
+ gr::block::set_output_multiple(out_multiple);
+
+ std::stringstream msg_stream;
+ msg_stream << "Listening for data on UDP port " << port << ".";
+ GR_LOG_INFO(d_logger, msg_stream.str());
+}
+
+/*
+ * Our virtual destructor.
+ */
+udp_source_impl::~udp_source_impl() { stop(); }
+
+bool udp_source_impl::stop()
+{
+ if (d_udpsocket) {
+ d_udpsocket->close();
+
+ d_udpsocket = NULL;
+
+ d_io_service.reset();
+ d_io_service.stop();
+ }
+
+ if (d_local_buffer) {
+ delete[] d_local_buffer;
+ d_local_buffer = NULL;
+ }
+
+ if (d_localqueue) {
+ delete d_localqueue;
+ d_localqueue = NULL;
+ }
+ return true;
+}
+
+size_t udp_source_impl::data_available()
+{
+ // Get amount of data available
+ boost::asio::socket_base::bytes_readable command(true);
+ d_udpsocket->io_control(command);
+ size_t bytes_readable = command.get();
+
+ return (bytes_readable + d_localqueue->size());
+}
+
+size_t udp_source_impl::netdata_available()
+{
+ // Get amount of data available
+ boost::asio::socket_base::bytes_readable command(true);
+ d_udpsocket->io_control(command);
+ size_t bytes_readable = command.get();
+
+ return bytes_readable;
+}
+
+uint64_t udp_source_impl::get_header_seqnum()
+{
+ uint64_t retVal = 0;
+
+ switch (d_header_type) {
+ case HEADERTYPE_SEQNUM: {
+ retVal = ((header_seq_num*)d_local_buffer)->seqnum;
+ } break;
+
+ case HEADERTYPE_SEQPLUSSIZE: {
+ retVal = ((header_seq_plus_size*)d_local_buffer)->seqnum;
+ } break;
+
+ case HEADERTYPE_OLDATA: {
+ retVal = ((ata_header*)d_local_buffer)->seq;
+ } break;
+ }
+
+ return retVal;
+}
+
+int udp_source_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ gr::thread::scoped_lock guard(d_setlock);
+
+ static bool first_time = true;
+ static int underrun_counter = 0;
+
+ int bytes_available = netdata_available();
+ char* out = (char*)output_items[0];
+ unsigned int num_requested = noutput_items * d_block_size;
+
+ // quick exit if nothing to do
+ if ((bytes_available == 0) && (d_localqueue->size() == 0)) {
+ underrun_counter++;
+ d_partial_frame_counter = 0;
+ if (d_source_zeros) {
+ // Just return 0's
+ memset((void*)out, 0x00, num_requested); // num_requested will be in bytes
+ return noutput_items;
+ } else {
+ if (underrun_counter == 0) {
+ if (!first_time) {
+ std::cout << "nU";
+ } else
+ first_time = false;
+ } else {
+ if (underrun_counter > 100)
+ underrun_counter = 0;
+ }
+
+ return 0;
+ }
+ }
+
+ int bytes_read;
+
+ // we could get here even if no data was received but there's still data in
+ // the queue. however read blocks so we want to make sure we have data before
+ // we call it.
+ if (bytes_available > 0) {
+ boost::asio::streambuf::mutable_buffers_type buf =
+ d_read_buffer.prepare(bytes_available);
+ // http://stackoverflow.com/questions/28929699/boostasio-read-n-bytes-from-socket-to-streambuf
+ bytes_read = d_udpsocket->receive_from(buf, d_endpoint);
+
+ if (bytes_read > 0) {
+ d_read_buffer.commit(bytes_read);
+
+ // Get the data and add it to our local queue. We have to maintain a
+ // local queue in case we read more bytes than noutput_items is asking
+ // for. In that case we'll only return noutput_items bytes
+ const char* read_data =
+ boost::asio::buffer_cast<const char*>(d_read_buffer.data());
+ for (int i = 0; i < bytes_read; i++) {
+ d_localqueue->push_back(read_data[i]);
+ }
+ d_read_buffer.consume(bytes_read);
+ }
+ }
+
+ if (d_localqueue->size() < d_payloadsize) {
+ // since we should be getting these in UDP packet blocks matched on the
+ // sender/receiver, this should be a fringe case, or a case where another
+ // app is sourcing the packets.
+ d_partial_frame_counter++;
+
+ if (d_partial_frame_counter >= 100) {
+ std::stringstream msg_stream;
+ msg_stream << "Insufficient block data. Check your sending "
+ "app is using "
+ << d_payloadsize << " send blocks.";
+ GR_LOG_WARN(d_logger, msg_stream.str());
+
+ // This is just a safety to clear in the case there's a hanging partial
+ // packet. If we've lingered through a number of calls and we still don't
+ // have any data, clear the stale data.
+ while (d_localqueue->size() > 0)
+ d_localqueue->pop_front();
+
+ d_partial_frame_counter = 0;
+ }
+ return 0; // Don't memset 0x00 since we're starting to get data. In this
+ // case we'll hold for the rest.
+ }
+
+ // If we're here, it's not a partial hanging frame
+ d_partial_frame_counter = 0;
+
+ // Now if we're here we should have at least 1 block.
+
+ // let's figure out how much we have in relation to noutput_items, accounting
+ // for headers
+
+ // Number of data-only blocks requested (set_output_multiple() should make
+ // sure this is an integer multiple)
+ long blocks_requested = noutput_items / d_precomp_data_over_item_size;
+ // Number of blocks available accounting for the header as well.
+ long blocks_available = d_localqueue->size() / (d_payloadsize);
+ long blocks_retrieved;
+ int itemsreturned;
+
+ if (blocks_requested <= blocks_available)
+ blocks_retrieved = blocks_requested;
+ else
+ blocks_retrieved = blocks_available;
+
+ // items returned is going to match the payload (actual data) of the number of
+ // blocks.
+ itemsreturned = blocks_retrieved * d_precomp_data_over_item_size;
+
+ // We're going to have to read the data out in blocks, account for the header,
+ // then just move the data part into the out[] array.
+
+ char* data_ptr;
+ data_ptr = &d_local_buffer[d_header_size];
+ int out_index = 0;
+ int skipped_packets = 0;
+
+ for (int cur_pkt = 0; cur_pkt < blocks_retrieved; cur_pkt++) {
+ // Move a packet to our local buffer
+ for (int cur_byte = 0; cur_byte < d_payloadsize; cur_byte++) {
+ d_local_buffer[cur_byte] = d_localqueue->at(0);
+ d_localqueue->pop_front();
+ }
+
+ // Interpret the header if present
+ if (d_header_type != HEADERTYPE_NONE) {
+ uint64_t pkt_seq_num = get_header_seqnum();
+
+ if (d_seq_num > 0) { // d_seq_num will be 0 when this block starts
+ if (pkt_seq_num > d_seq_num) {
+ // Ideally pkt_seq_num = d_seq_num + 1. Therefore this should do += 0
+ // when no packets are dropped.
+ skipped_packets += pkt_seq_num - d_seq_num - 1;
+ }
+
+ // Store as current for next pass.
+ d_seq_num = pkt_seq_num;
+ } else {
+ // just starting. Prime it for no loss on the first packet.
+ d_seq_num = pkt_seq_num;
+ }
+ }
+
+ // Move the data to the output buffer and increment the out index
+ memcpy(&out[out_index], data_ptr, d_precomp_data_size);
+ out_index = out_index + d_precomp_data_size;
+ }
+
+ if (skipped_packets > 0 && d_notify_missed) {
+ std::stringstream msg_stream;
+ msg_stream << "[UDP source:" << d_port
+ << "] missed packets: " << skipped_packets;
+ GR_LOG_WARN(d_logger, msg_stream.str());
+ }
+
+ // If we had less data than requested, it'll be reflected in the return value.
+ return itemsreturned;
+}
+} /* namespace network */
+} /* namespace gr */