/* -*- c++ -*- */ /* * Copyright 2020 Free Software Foundation, Inc. * * This file is part of GNU Radio * * SPDX-License-Identifier: GPL-3.0-or-later * */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "udp_source_impl.h" #include <gnuradio/io_signature.h> #include <sstream> namespace gr { namespace network { udp_source::sptr udp_source::make(size_t itemsize, size_t veclen, int port, int header_type, int payloadsize, bool notify_missed, bool source_zeros, bool ipv6) { return gnuradio::get_initial_sptr(new udp_source_impl(itemsize, veclen, port, header_type, payloadsize, notify_missed, source_zeros, ipv6)); } /* * The private constructor */ udp_source_impl::udp_source_impl(size_t itemsize, size_t veclen, int port, int header_type, int payloadsize, bool notify_missed, bool source_zeros, bool ipv6) : gr::sync_block("udp_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * veclen)), is_ipv6(ipv6), d_itemsize(itemsize), d_veclen(veclen), d_port(port), d_notify_missed(notify_missed), d_source_zeros(source_zeros), d_header_type(header_type), d_payloadsize(payloadsize), d_seq_num(0), d_header_size(0), d_partial_frame_counter(0) { d_block_size = d_itemsize * d_veclen; switch (d_header_type) { case HEADERTYPE_SEQNUM: d_header_size = sizeof(header_seq_num); break; case HEADERTYPE_SEQPLUSSIZE: d_header_size = sizeof(header_seq_plus_size); break; case HEADERTYPE_OLDATA: d_header_size = sizeof(ata_header); break; case HEADERTYPE_NONE: d_header_size = 0; break; default: GR_LOG_ERROR(d_logger, "Unknown UDP header type."); throw std::invalid_argument("Unknown UDP header type."); break; } if (d_payloadsize < 8) { GR_LOG_ERROR(d_logger, "Payload size is too small. Must be at " "least 8 bytes once header/trailer adjustments are made."); throw std::invalid_argument( "Payload size is too small. Must be at " "least 8 bytes once header/trailer adjustments are made."); } d_precomp_data_size = d_payloadsize - d_header_size; d_precomp_data_over_item_size = d_precomp_data_size / d_itemsize; d_local_buffer = new char[d_payloadsize]; long max_circ_buffer; // Let's keep it from getting too big if (d_payloadsize < 2000) { max_circ_buffer = d_payloadsize * 4000; } else { if (d_payloadsize < 5000) max_circ_buffer = d_payloadsize * 2000; else max_circ_buffer = d_payloadsize * 1500; } d_localqueue = new boost::circular_buffer<char>(max_circ_buffer); if (is_ipv6) d_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v6(), port); else d_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port); try { d_udpsocket = new boost::asio::ip::udp::socket(d_io_service, d_endpoint); } catch (const std::exception& ex) { throw std::runtime_error(std::string("[UDP Source] Error occurred: ") + ex.what()); } int out_multiple = (d_payloadsize - d_header_size) / d_block_size; if (out_multiple == 1) out_multiple = 2; // Ensure we get pairs, for instance complex -> ichar pairs gr::block::set_output_multiple(out_multiple); std::stringstream msg_stream; msg_stream << "Listening for data on UDP port " << port << "."; GR_LOG_INFO(d_logger, msg_stream.str()); } /* * Our virtual destructor. */ udp_source_impl::~udp_source_impl() { stop(); } bool udp_source_impl::stop() { if (d_udpsocket) { d_udpsocket->close(); d_udpsocket = NULL; d_io_service.reset(); d_io_service.stop(); } if (d_local_buffer) { delete[] d_local_buffer; d_local_buffer = NULL; } if (d_localqueue) { delete d_localqueue; d_localqueue = NULL; } return true; } size_t udp_source_impl::data_available() { // Get amount of data available boost::asio::socket_base::bytes_readable command(true); d_udpsocket->io_control(command); size_t bytes_readable = command.get(); return (bytes_readable + d_localqueue->size()); } size_t udp_source_impl::netdata_available() { // Get amount of data available boost::asio::socket_base::bytes_readable command(true); d_udpsocket->io_control(command); size_t bytes_readable = command.get(); return bytes_readable; } uint64_t udp_source_impl::get_header_seqnum() { uint64_t retVal = 0; switch (d_header_type) { case HEADERTYPE_SEQNUM: { retVal = ((header_seq_num*)d_local_buffer)->seqnum; } break; case HEADERTYPE_SEQPLUSSIZE: { retVal = ((header_seq_plus_size*)d_local_buffer)->seqnum; } break; case HEADERTYPE_OLDATA: { retVal = ((ata_header*)d_local_buffer)->seq; } break; } return retVal; } int udp_source_impl::work(int noutput_items, gr_vector_const_void_star& input_items, gr_vector_void_star& output_items) { gr::thread::scoped_lock guard(d_setlock); static bool first_time = true; static int underrun_counter = 0; int bytes_available = netdata_available(); char* out = (char*)output_items[0]; unsigned int num_requested = noutput_items * d_block_size; // quick exit if nothing to do if ((bytes_available == 0) && (d_localqueue->size() == 0)) { underrun_counter++; d_partial_frame_counter = 0; if (d_source_zeros) { // Just return 0's memset((void*)out, 0x00, num_requested); // num_requested will be in bytes return noutput_items; } else { if (underrun_counter == 0) { if (!first_time) { std::cout << "nU"; } else first_time = false; } else { if (underrun_counter > 100) underrun_counter = 0; } return 0; } } int bytes_read; // we could get here even if no data was received but there's still data in // the queue. however read blocks so we want to make sure we have data before // we call it. if (bytes_available > 0) { boost::asio::streambuf::mutable_buffers_type buf = d_read_buffer.prepare(bytes_available); // http://stackoverflow.com/questions/28929699/boostasio-read-n-bytes-from-socket-to-streambuf bytes_read = d_udpsocket->receive_from(buf, d_endpoint); if (bytes_read > 0) { d_read_buffer.commit(bytes_read); // Get the data and add it to our local queue. We have to maintain a // local queue in case we read more bytes than noutput_items is asking // for. In that case we'll only return noutput_items bytes const char* read_data = boost::asio::buffer_cast<const char*>(d_read_buffer.data()); for (int i = 0; i < bytes_read; i++) { d_localqueue->push_back(read_data[i]); } d_read_buffer.consume(bytes_read); } } if (d_localqueue->size() < d_payloadsize) { // since we should be getting these in UDP packet blocks matched on the // sender/receiver, this should be a fringe case, or a case where another // app is sourcing the packets. d_partial_frame_counter++; if (d_partial_frame_counter >= 100) { std::stringstream msg_stream; msg_stream << "Insufficient block data. Check your sending " "app is using " << d_payloadsize << " send blocks."; GR_LOG_WARN(d_logger, msg_stream.str()); // This is just a safety to clear in the case there's a hanging partial // packet. If we've lingered through a number of calls and we still don't // have any data, clear the stale data. while (d_localqueue->size() > 0) d_localqueue->pop_front(); d_partial_frame_counter = 0; } return 0; // Don't memset 0x00 since we're starting to get data. In this // case we'll hold for the rest. } // If we're here, it's not a partial hanging frame d_partial_frame_counter = 0; // Now if we're here we should have at least 1 block. // let's figure out how much we have in relation to noutput_items, accounting // for headers // Number of data-only blocks requested (set_output_multiple() should make // sure this is an integer multiple) long blocks_requested = noutput_items / d_precomp_data_over_item_size; // Number of blocks available accounting for the header as well. long blocks_available = d_localqueue->size() / (d_payloadsize); long blocks_retrieved; int itemsreturned; if (blocks_requested <= blocks_available) blocks_retrieved = blocks_requested; else blocks_retrieved = blocks_available; // items returned is going to match the payload (actual data) of the number of // blocks. itemsreturned = blocks_retrieved * d_precomp_data_over_item_size; // We're going to have to read the data out in blocks, account for the header, // then just move the data part into the out[] array. char* data_ptr; data_ptr = &d_local_buffer[d_header_size]; int out_index = 0; int skipped_packets = 0; for (int cur_pkt = 0; cur_pkt < blocks_retrieved; cur_pkt++) { // Move a packet to our local buffer for (int cur_byte = 0; cur_byte < d_payloadsize; cur_byte++) { d_local_buffer[cur_byte] = d_localqueue->at(0); d_localqueue->pop_front(); } // Interpret the header if present if (d_header_type != HEADERTYPE_NONE) { uint64_t pkt_seq_num = get_header_seqnum(); if (d_seq_num > 0) { // d_seq_num will be 0 when this block starts if (pkt_seq_num > d_seq_num) { // Ideally pkt_seq_num = d_seq_num + 1. Therefore this should do += 0 // when no packets are dropped. skipped_packets += pkt_seq_num - d_seq_num - 1; } // Store as current for next pass. d_seq_num = pkt_seq_num; } else { // just starting. Prime it for no loss on the first packet. d_seq_num = pkt_seq_num; } } // Move the data to the output buffer and increment the out index memcpy(&out[out_index], data_ptr, d_precomp_data_size); out_index = out_index + d_precomp_data_size; } if (skipped_packets > 0 && d_notify_missed) { std::stringstream msg_stream; msg_stream << "[UDP source:" << d_port << "] missed packets: " << skipped_packets; GR_LOG_WARN(d_logger, msg_stream.str()); } // If we had less data than requested, it'll be reflected in the return value. return itemsreturned; } } /* namespace network */ } /* namespace gr */