diff options
Diffstat (limited to 'gr-network/lib/udp_sink_impl.cc')
-rw-r--r-- | gr-network/lib/udp_sink_impl.cc | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/gr-network/lib/udp_sink_impl.cc b/gr-network/lib/udp_sink_impl.cc new file mode 100644 index 0000000000..5c2f3d1a75 --- /dev/null +++ b/gr-network/lib/udp_sink_impl.cc @@ -0,0 +1,270 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "udp_sink_impl.h" +#include <gnuradio/io_signature.h> +#include <boost/array.hpp> + +namespace gr { +namespace network { + +udp_sink::sptr udp_sink::make(size_t itemsize, + size_t veclen, + const std::string& host, + int port, + int header_type, + int payloadsize, + bool send_eof) +{ + return gnuradio::get_initial_sptr(new udp_sink_impl( + itemsize, veclen, host, port, header_type, payloadsize, send_eof)); +} + +/* + * The private constructor + */ +udp_sink_impl::udp_sink_impl(size_t itemsize, + size_t veclen, + const std::string& host, + int port, + int header_type, + int payloadsize, + bool send_eof) + : gr::sync_block("udp_sink", + gr::io_signature::make(1, 1, itemsize * veclen), + gr::io_signature::make(0, 0, 0)), + d_itemsize(itemsize), + d_veclen(veclen), + d_header_type(header_type), + d_header_size(0), + d_seq_num(0), + d_payloadsize(payloadsize), + b_send_eof(send_eof) +{ + // Lets set up the max payload size for the UDP packet based on the requested + // payload size. Some important notes: For a standard IP/UDP packet, say + // crossing the Internet with a standard MTU, 1472 is the max UDP payload + // size. Larger values can be sent, however the IP stack will fragment the + // packet. This can cause additional network overhead as the packet gets + // reassembled. Now for local nets that support jumbo frames, the max payload + // size is 8972 (9000-the UDP 28-byte header) Same rules apply with + // fragmentation. + + d_port = port; + + d_header_size = 0; + + switch (d_header_type) { + case HEADERTYPE_SEQNUM: + d_header_size = sizeof(header_seq_num); + break; + + case HEADERTYPE_SEQPLUSSIZE: + d_header_size = sizeof(header_seq_plus_size); + break; + + case HEADERTYPE_NONE: + d_header_size = 0; + break; + + default: + GR_LOG_ERROR(d_logger, "Unknown header type."); + throw std::invalid_argument("Unknown UDP header type."); + break; + } + + if (d_payloadsize < 8) { + GR_LOG_ERROR(d_logger, + "Payload size is too small. Must be at " + "least 8 bytes once header/trailer adjustments are made."); + throw std::invalid_argument( + "Payload size is too small. Must be at " + "least 8 bytes once header/trailer adjustments are made."); + } + + d_seq_num = 0; + + d_block_size = d_itemsize * d_veclen; + + d_precomp_datasize = d_payloadsize - d_header_size; + d_precomp_data_overitemsize = d_precomp_datasize / d_itemsize; + + d_localbuffer = new char[d_payloadsize]; + + long max_circ_buffer; + + // Let's keep it from getting too big + if (d_payloadsize < 2000) { + max_circ_buffer = d_payloadsize * 4000; + } else { + if (d_payloadsize < 5000) + max_circ_buffer = d_payloadsize * 2000; + else + max_circ_buffer = d_payloadsize * 1500; + } + + d_localqueue = new boost::circular_buffer<char>(max_circ_buffer); + + d_udpsocket = new boost::asio::ip::udp::socket(d_io_service); + + std::string s_port = (boost::format("%d") % port).str(); + std::string s_host = host.empty() ? std::string("localhost") : host; + boost::asio::ip::udp::resolver resolver(d_io_service); + boost::asio::ip::udp::resolver::query query( + s_host, s_port, boost::asio::ip::resolver_query_base::passive); + + boost::system::error_code err; + d_endpoint = *resolver.resolve(query, err); + + if (err) { + throw std::runtime_error(std::string("[UDP Sink] Unable to resolve host/IP: ") + + err.message()); + } + + if (host.find(":") != std::string::npos) + is_ipv6 = true; + else { + // This block supports a check that a name rather than an IP is provided. + // the endpoint is then checked after the resolver is done. + if (d_endpoint.address().is_v6()) + is_ipv6 = true; + else + is_ipv6 = false; + } + + if (is_ipv6) { + d_udpsocket->open(boost::asio::ip::udp::v6()); + } else { + d_udpsocket->open(boost::asio::ip::udp::v4()); + } + + int out_multiple = (d_payloadsize - d_header_size) / d_block_size; + + if (out_multiple == 1) + out_multiple = 2; // Ensure we get pairs, for instance complex -> ichar pairs + + gr::block::set_output_multiple(out_multiple); +} + +/* + * Our virtual destructor. + */ +udp_sink_impl::~udp_sink_impl() { stop(); } + +bool udp_sink_impl::stop() +{ + if (d_udpsocket) { + gr::thread::scoped_lock guard(d_setlock); + + if (b_send_eof) { + // Send a few zero-length packets to signal receiver we are done + boost::array<char, 0> send_buf; + for (int i = 0; i < 3; i++) + d_udpsocket->send_to(boost::asio::buffer(send_buf), d_endpoint); + } + + d_udpsocket->close(); + d_udpsocket = NULL; + + d_io_service.reset(); + d_io_service.stop(); + } + + if (d_localbuffer) { + delete[] d_localbuffer; + d_localbuffer = NULL; + } + + if (d_localqueue) { + delete d_localqueue; + d_localqueue = NULL; + } + + return true; +} + +void udp_sink_impl::build_header() +{ + switch (d_header_type) { + case HEADERTYPE_SEQNUM: { + d_seq_num++; + header_seq_num seq_header; + seq_header.seqnum = d_seq_num; + memcpy((void*)d_tmpheaderbuff, (void*)&seq_header, d_header_size); + } break; + + case HEADERTYPE_SEQPLUSSIZE: { + d_seq_num++; + header_seq_plus_size seq_header_plus_size; + seq_header_plus_size.seqnum = d_seq_num; + seq_header_plus_size.length = d_payloadsize; + memcpy((void*)d_tmpheaderbuff, (void*)&seq_header_plus_size, d_header_size); + } break; + } +} + +int udp_sink_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + gr::thread::scoped_lock guard(d_setlock); + + long num_bytes_to_transmit = noutput_items * d_block_size; + const char* in = (const char*)input_items[0]; + + // Build a long local queue to pull from so we can break it up easier + for (int i = 0; i < num_bytes_to_transmit; i++) { + d_localqueue->push_back(in[i]); + } + + // Local boost buffer for transmitting + std::vector<boost::asio::const_buffer> transmitbuffer; + + // Let's see how many blocks are in the buffer + int bytes_available = d_localqueue->size(); + long blocks_available = bytes_available / d_precomp_datasize; + + for (int cur_block = 0; cur_block < blocks_available; cur_block++) { + // Clear the next transmit buffer + transmitbuffer.clear(); + + // build our next header if we need it + if (d_header_type != HEADERTYPE_NONE) { + build_header(); + + transmitbuffer.push_back( + boost::asio::buffer((const void*)d_tmpheaderbuff, d_header_size)); + } + + // Fill the data buffer + for (int i = 0; i < d_precomp_datasize; i++) { + d_localbuffer[i] = d_localqueue->at(0); + d_localqueue->pop_front(); + } + + // Set up for transmit + transmitbuffer.push_back( + boost::asio::buffer((const void*)d_localbuffer, d_precomp_datasize)); + + // Send + d_udpsocket->send_to(transmitbuffer, d_endpoint); + } + + int itemsreturned = blocks_available * d_precomp_data_overitemsize; + + return itemsreturned; +} + +} /* namespace network */ +} /* namespace gr */ |