diff options
Diffstat (limited to 'gr-network/lib/udp_source_impl.cc')
-rw-r--r-- | gr-network/lib/udp_source_impl.cc | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/gr-network/lib/udp_source_impl.cc b/gr-network/lib/udp_source_impl.cc new file mode 100644 index 0000000000..4758b9e219 --- /dev/null +++ b/gr-network/lib/udp_source_impl.cc @@ -0,0 +1,375 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "udp_source_impl.h" +#include <gnuradio/io_signature.h> +#include <sstream> + +namespace gr { +namespace network { + +udp_source::sptr udp_source::make(size_t itemsize, + size_t veclen, + int port, + int header_type, + int payloadsize, + bool notify_missed, + bool source_zeros, + bool ipv6) +{ + return gnuradio::get_initial_sptr(new udp_source_impl(itemsize, + veclen, + port, + header_type, + payloadsize, + notify_missed, + source_zeros, + ipv6)); +} + +/* + * The private constructor + */ +udp_source_impl::udp_source_impl(size_t itemsize, + size_t veclen, + int port, + int header_type, + int payloadsize, + bool notify_missed, + bool source_zeros, + bool ipv6) + : gr::sync_block("udp_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(1, 1, itemsize * veclen)), + is_ipv6(ipv6), + d_itemsize(itemsize), + d_veclen(veclen), + d_port(port), + d_notify_missed(notify_missed), + d_source_zeros(source_zeros), + d_header_type(header_type), + d_payloadsize(payloadsize), + d_seq_num(0), + d_header_size(0), + d_partial_frame_counter(0) +{ + d_block_size = d_itemsize * d_veclen; + + switch (d_header_type) { + case HEADERTYPE_SEQNUM: + d_header_size = sizeof(header_seq_num); + break; + + case HEADERTYPE_SEQPLUSSIZE: + d_header_size = sizeof(header_seq_plus_size); + break; + + case HEADERTYPE_OLDATA: + d_header_size = sizeof(ata_header); + break; + + case HEADERTYPE_NONE: + d_header_size = 0; + break; + + default: + GR_LOG_ERROR(d_logger, "Unknown UDP header type."); + throw std::invalid_argument("Unknown UDP header type."); + break; + } + + if (d_payloadsize < 8) { + GR_LOG_ERROR(d_logger, + "Payload size is too small. Must be at " + "least 8 bytes once header/trailer adjustments are made."); + + throw std::invalid_argument( + "Payload size is too small. Must be at " + "least 8 bytes once header/trailer adjustments are made."); + } + + d_precomp_data_size = d_payloadsize - d_header_size; + d_precomp_data_over_item_size = d_precomp_data_size / d_itemsize; + + d_local_buffer = new char[d_payloadsize]; + long max_circ_buffer; + + // Let's keep it from getting too big + if (d_payloadsize < 2000) { + max_circ_buffer = d_payloadsize * 4000; + } else { + if (d_payloadsize < 5000) + max_circ_buffer = d_payloadsize * 2000; + else + max_circ_buffer = d_payloadsize * 1500; + } + + d_localqueue = new boost::circular_buffer<char>(max_circ_buffer); + + if (is_ipv6) + d_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v6(), port); + else + d_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port); + + try { + d_udpsocket = new boost::asio::ip::udp::socket(d_io_service, d_endpoint); + } catch (const std::exception& ex) { + throw std::runtime_error(std::string("[UDP Source] Error occurred: ") + + ex.what()); + } + + int out_multiple = (d_payloadsize - d_header_size) / d_block_size; + + if (out_multiple == 1) + out_multiple = 2; // Ensure we get pairs, for instance complex -> ichar pairs + + gr::block::set_output_multiple(out_multiple); + + std::stringstream msg_stream; + msg_stream << "Listening for data on UDP port " << port << "."; + GR_LOG_INFO(d_logger, msg_stream.str()); +} + +/* + * Our virtual destructor. + */ +udp_source_impl::~udp_source_impl() { stop(); } + +bool udp_source_impl::stop() +{ + if (d_udpsocket) { + d_udpsocket->close(); + + d_udpsocket = NULL; + + d_io_service.reset(); + d_io_service.stop(); + } + + if (d_local_buffer) { + delete[] d_local_buffer; + d_local_buffer = NULL; + } + + if (d_localqueue) { + delete d_localqueue; + d_localqueue = NULL; + } + return true; +} + +size_t udp_source_impl::data_available() +{ + // Get amount of data available + boost::asio::socket_base::bytes_readable command(true); + d_udpsocket->io_control(command); + size_t bytes_readable = command.get(); + + return (bytes_readable + d_localqueue->size()); +} + +size_t udp_source_impl::netdata_available() +{ + // Get amount of data available + boost::asio::socket_base::bytes_readable command(true); + d_udpsocket->io_control(command); + size_t bytes_readable = command.get(); + + return bytes_readable; +} + +uint64_t udp_source_impl::get_header_seqnum() +{ + uint64_t retVal = 0; + + switch (d_header_type) { + case HEADERTYPE_SEQNUM: { + retVal = ((header_seq_num*)d_local_buffer)->seqnum; + } break; + + case HEADERTYPE_SEQPLUSSIZE: { + retVal = ((header_seq_plus_size*)d_local_buffer)->seqnum; + } break; + + case HEADERTYPE_OLDATA: { + retVal = ((ata_header*)d_local_buffer)->seq; + } break; + } + + return retVal; +} + +int udp_source_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + gr::thread::scoped_lock guard(d_setlock); + + static bool first_time = true; + static int underrun_counter = 0; + + int bytes_available = netdata_available(); + char* out = (char*)output_items[0]; + unsigned int num_requested = noutput_items * d_block_size; + + // quick exit if nothing to do + if ((bytes_available == 0) && (d_localqueue->size() == 0)) { + underrun_counter++; + d_partial_frame_counter = 0; + if (d_source_zeros) { + // Just return 0's + memset((void*)out, 0x00, num_requested); // num_requested will be in bytes + return noutput_items; + } else { + if (underrun_counter == 0) { + if (!first_time) { + std::cout << "nU"; + } else + first_time = false; + } else { + if (underrun_counter > 100) + underrun_counter = 0; + } + + return 0; + } + } + + int bytes_read; + + // we could get here even if no data was received but there's still data in + // the queue. however read blocks so we want to make sure we have data before + // we call it. + if (bytes_available > 0) { + boost::asio::streambuf::mutable_buffers_type buf = + d_read_buffer.prepare(bytes_available); + // http://stackoverflow.com/questions/28929699/boostasio-read-n-bytes-from-socket-to-streambuf + bytes_read = d_udpsocket->receive_from(buf, d_endpoint); + + if (bytes_read > 0) { + d_read_buffer.commit(bytes_read); + + // Get the data and add it to our local queue. We have to maintain a + // local queue in case we read more bytes than noutput_items is asking + // for. In that case we'll only return noutput_items bytes + const char* read_data = + boost::asio::buffer_cast<const char*>(d_read_buffer.data()); + for (int i = 0; i < bytes_read; i++) { + d_localqueue->push_back(read_data[i]); + } + d_read_buffer.consume(bytes_read); + } + } + + if (d_localqueue->size() < d_payloadsize) { + // since we should be getting these in UDP packet blocks matched on the + // sender/receiver, this should be a fringe case, or a case where another + // app is sourcing the packets. + d_partial_frame_counter++; + + if (d_partial_frame_counter >= 100) { + std::stringstream msg_stream; + msg_stream << "Insufficient block data. Check your sending " + "app is using " + << d_payloadsize << " send blocks."; + GR_LOG_WARN(d_logger, msg_stream.str()); + + // This is just a safety to clear in the case there's a hanging partial + // packet. If we've lingered through a number of calls and we still don't + // have any data, clear the stale data. + while (d_localqueue->size() > 0) + d_localqueue->pop_front(); + + d_partial_frame_counter = 0; + } + return 0; // Don't memset 0x00 since we're starting to get data. In this + // case we'll hold for the rest. + } + + // If we're here, it's not a partial hanging frame + d_partial_frame_counter = 0; + + // Now if we're here we should have at least 1 block. + + // let's figure out how much we have in relation to noutput_items, accounting + // for headers + + // Number of data-only blocks requested (set_output_multiple() should make + // sure this is an integer multiple) + long blocks_requested = noutput_items / d_precomp_data_over_item_size; + // Number of blocks available accounting for the header as well. + long blocks_available = d_localqueue->size() / (d_payloadsize); + long blocks_retrieved; + int itemsreturned; + + if (blocks_requested <= blocks_available) + blocks_retrieved = blocks_requested; + else + blocks_retrieved = blocks_available; + + // items returned is going to match the payload (actual data) of the number of + // blocks. + itemsreturned = blocks_retrieved * d_precomp_data_over_item_size; + + // We're going to have to read the data out in blocks, account for the header, + // then just move the data part into the out[] array. + + char* data_ptr; + data_ptr = &d_local_buffer[d_header_size]; + int out_index = 0; + int skipped_packets = 0; + + for (int cur_pkt = 0; cur_pkt < blocks_retrieved; cur_pkt++) { + // Move a packet to our local buffer + for (int cur_byte = 0; cur_byte < d_payloadsize; cur_byte++) { + d_local_buffer[cur_byte] = d_localqueue->at(0); + d_localqueue->pop_front(); + } + + // Interpret the header if present + if (d_header_type != HEADERTYPE_NONE) { + uint64_t pkt_seq_num = get_header_seqnum(); + + if (d_seq_num > 0) { // d_seq_num will be 0 when this block starts + if (pkt_seq_num > d_seq_num) { + // Ideally pkt_seq_num = d_seq_num + 1. Therefore this should do += 0 + // when no packets are dropped. + skipped_packets += pkt_seq_num - d_seq_num - 1; + } + + // Store as current for next pass. + d_seq_num = pkt_seq_num; + } else { + // just starting. Prime it for no loss on the first packet. + d_seq_num = pkt_seq_num; + } + } + + // Move the data to the output buffer and increment the out index + memcpy(&out[out_index], data_ptr, d_precomp_data_size); + out_index = out_index + d_precomp_data_size; + } + + if (skipped_packets > 0 && d_notify_missed) { + std::stringstream msg_stream; + msg_stream << "[UDP source:" << d_port + << "] missed packets: " << skipped_packets; + GR_LOG_WARN(d_logger, msg_stream.str()); + } + + // If we had less data than requested, it'll be reflected in the return value. + return itemsreturned; +} +} /* namespace network */ +} /* namespace gr */ |