diff options
Diffstat (limited to 'gr-blocks/lib')
-rw-r--r-- | gr-blocks/lib/CMakeLists.txt | 3 | ||||
-rw-r--r-- | gr-blocks/lib/tcp_server_sink_impl.cc | 142 | ||||
-rw-r--r-- | gr-blocks/lib/tcp_server_sink_impl.h | 63 | ||||
-rw-r--r-- | gr-blocks/lib/udp_sink_impl.cc | 132 | ||||
-rw-r--r-- | gr-blocks/lib/udp_sink_impl.h | 53 | ||||
-rw-r--r-- | gr-blocks/lib/udp_source_impl.cc | 200 | ||||
-rw-r--r-- | gr-blocks/lib/udp_source_impl.h | 75 |
7 files changed, 0 insertions, 668 deletions
diff --git a/gr-blocks/lib/CMakeLists.txt b/gr-blocks/lib/CMakeLists.txt index 7957a0443f..cdffc6131c 100644 --- a/gr-blocks/lib/CMakeLists.txt +++ b/gr-blocks/lib/CMakeLists.txt @@ -145,14 +145,11 @@ set(BLOCKS_SOURCES threshold_ff_impl.cc throttle_impl.cc transcendental_impl.cc - tcp_server_sink_impl.cc tag_gate_impl.cc tagged_stream_align_impl.cc tagged_stream_mux_impl.cc uchar_array_to_float.cc uchar_to_float_impl.cc - udp_sink_impl.cc - udp_source_impl.cc unpack_k_bits_bb_impl.cc vco_f_impl.cc vco_c_impl.cc diff --git a/gr-blocks/lib/tcp_server_sink_impl.cc b/gr-blocks/lib/tcp_server_sink_impl.cc deleted file mode 100644 index 1b32f881a9..0000000000 --- a/gr-blocks/lib/tcp_server_sink_impl.cc +++ /dev/null @@ -1,142 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "tcp_server_sink_impl.h" -#include <gnuradio/io_signature.h> -#include <gnuradio/thread/thread.h> -#include <boost/array.hpp> -#include <boost/asio.hpp> -#include <boost/format.hpp> -#include <algorithm> -#include <cstdio> -#include <cstring> -#include <memory> -#include <stdexcept> - -namespace gr { -namespace blocks { - -tcp_server_sink::sptr -tcp_server_sink::make(size_t itemsize, const std::string& host, int port, bool noblock) -{ - return gnuradio::make_block_sptr<tcp_server_sink_impl>(itemsize, host, port, noblock); -} - -tcp_server_sink_impl::tcp_server_sink_impl(size_t itemsize, - const std::string& host, - int port, - bool noblock) - : sync_block("tcp_server_sink", - io_signature::make(1, 1, itemsize), - io_signature::make(0, 0, 0)), - d_itemsize(itemsize), - d_acceptor(d_io_service), - d_writing(0) -{ - std::string s__port = (boost::format("%d") % port).str(); - std::string s__host = host.empty() ? std::string("localhost") : host; - boost::asio::ip::tcp::resolver resolver(d_io_service); - boost::asio::ip::tcp::resolver::query query( - s__host, s__port, boost::asio::ip::resolver_query_base::passive); - d_endpoint = *resolver.resolve(query); - - d_acceptor.open(d_endpoint.protocol()); - d_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); - d_acceptor.bind(d_endpoint); - d_acceptor.listen(); - - if (!noblock) { - auto sock = std::make_unique<boost::asio::ip::tcp::socket>(d_io_service); - d_acceptor.accept(*sock, d_endpoint); - d_sockets.insert(std::move(sock)); - } - - d_socket = std::make_unique<boost::asio::ip::tcp::socket>(d_io_service); - d_acceptor.async_accept(*d_socket, - boost::bind(&tcp_server_sink_impl::do_accept, - this, - boost::asio::placeholders::error)); - d_io_serv_thread = - boost::thread(boost::bind(&boost::asio::io_service::run, &d_io_service)); -} - -void tcp_server_sink_impl::do_accept(const boost::system::error_code& error) -{ - if (!error) { - gr::thread::scoped_lock guard(d_writing_mut); - d_sockets.insert(std::move(d_socket)); - d_socket = std::make_unique<boost::asio::ip::tcp::socket>(d_io_service); - d_acceptor.async_accept(*d_socket, - boost::bind(&tcp_server_sink_impl::do_accept, - this, - boost::asio::placeholders::error)); - } -} - -void tcp_server_sink_impl::do_write( - const boost::system::error_code& error, - size_t len, - std::set<std::unique_ptr<boost::asio::ip::tcp::socket>>::iterator i) -{ - { - gr::thread::scoped_lock guard(d_writing_mut); - --d_writing; - if (error) { - d_sockets.erase(i); - } - } - d_writing_cond.notify_one(); -} - -tcp_server_sink_impl::~tcp_server_sink_impl() -{ - gr::thread::scoped_lock guard(d_writing_mut); - while (d_writing) { - d_writing_cond.wait(guard); - } - d_io_service.reset(); - d_io_service.stop(); - d_io_serv_thread.join(); -} - -int tcp_server_sink_impl::work(int noutput_items, - gr_vector_const_void_star& input_items, - gr_vector_void_star& output_items) -{ - const char* in = (const char*)input_items[0]; - - gr::thread::scoped_lock guard(d_writing_mut); - while (d_writing) { - d_writing_cond.wait(guard); - } - - size_t data_len = std::min(size_t(BUF_SIZE), noutput_items * d_itemsize); - data_len -= data_len % d_itemsize; - memcpy(d_buf.data(), in, data_len); - for (auto i = std::begin(d_sockets); i != std::end(d_sockets); ++i) { - boost::asio::async_write(**i, - boost::asio::buffer(d_buf.data(), data_len), - boost::bind(&tcp_server_sink_impl::do_write, - this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred, - i)); - } - d_writing = d_sockets.size(); - - return data_len / d_itemsize; -} - -} /* namespace blocks */ -} /* namespace gr */ diff --git a/gr-blocks/lib/tcp_server_sink_impl.h b/gr-blocks/lib/tcp_server_sink_impl.h deleted file mode 100644 index b7ff7af174..0000000000 --- a/gr-blocks/lib/tcp_server_sink_impl.h +++ /dev/null @@ -1,63 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2014 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifndef INCLUDED_GR_TCP_SERVER_SINK_IMPL_H -#define INCLUDED_GR_TCP_SERVER_SINK_IMPL_H - -#include <gnuradio/blocks/tcp_server_sink.h> -#include <boost/asio.hpp> -#include <boost/ptr_container/ptr_vector.hpp> -#include <set> - -namespace gr { -namespace blocks { - -class tcp_server_sink_impl : public tcp_server_sink -{ -private: - const size_t d_itemsize; - - boost::asio::io_service d_io_service; - gr::thread::thread d_io_serv_thread; - boost::asio::ip::tcp::endpoint d_endpoint; - std::unique_ptr<boost::asio::ip::tcp::socket> d_socket; - std::set<std::unique_ptr<boost::asio::ip::tcp::socket>> d_sockets; - boost::asio::ip::tcp::acceptor d_acceptor; - - enum { - BUF_SIZE = 256 * 1024, - }; - std::array<uint8_t, BUF_SIZE> d_buf; - - int d_writing; - boost::condition_variable d_writing_cond; - boost::mutex d_writing_mut; - - void do_accept(const boost::system::error_code& error); - void do_write(const boost::system::error_code& error, - std::size_t len, - std::set<std::unique_ptr<boost::asio::ip::tcp::socket>>::iterator); - -public: - tcp_server_sink_impl(size_t itemsize, - const std::string& host, - int port, - bool noblock); - ~tcp_server_sink_impl() override; - - int work(int noutput_items, - gr_vector_const_void_star& input_items, - gr_vector_void_star& output_items) override; -}; - -} /* namespace blocks */ -} /* namespace gr */ - -#endif /* INCLUDED_GR_TCP_SERVER_SINK_IMPL_H */ diff --git a/gr-blocks/lib/udp_sink_impl.cc b/gr-blocks/lib/udp_sink_impl.cc deleted file mode 100644 index 4391461dac..0000000000 --- a/gr-blocks/lib/udp_sink_impl.cc +++ /dev/null @@ -1,132 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "udp_sink_impl.h" -#include <gnuradio/io_signature.h> -#include <gnuradio/thread/thread.h> -#include <boost/array.hpp> -#include <boost/asio.hpp> -#include <boost/format.hpp> -#include <cstddef> -#include <cstring> -#include <memory> -#include <stdexcept> - -namespace gr { -namespace blocks { - -udp_sink::sptr udp_sink::make( - size_t itemsize, const std::string& host, int port, int payload_size, bool eof) -{ - return gnuradio::make_block_sptr<udp_sink_impl>( - itemsize, host, port, payload_size, eof); -} - -udp_sink_impl::udp_sink_impl( - size_t itemsize, const std::string& host, int port, int payload_size, bool eof) - : sync_block( - "udp_sink", io_signature::make(1, 1, itemsize), io_signature::make(0, 0, 0)), - d_itemsize(itemsize), - d_payload_size(payload_size), - d_eof(eof), - d_connected(false) -{ - // Get the destination address - connect(host, port); -} - -// public constructor that returns a shared_ptr -udp_sink_impl::~udp_sink_impl() -{ - if (d_connected) - disconnect(); -} - -void udp_sink_impl::connect(const std::string& host, int port) -{ - if (d_connected) - disconnect(); - - std::string s_port = (boost::format("%d") % port).str(); - if (!host.empty()) { - boost::asio::ip::udp::resolver resolver(d_io_service); - boost::asio::ip::udp::resolver::query query( - host, s_port, boost::asio::ip::resolver_query_base::passive); - d_endpoint = *resolver.resolve(query); - - d_socket = std::make_unique<boost::asio::ip::udp::socket>(d_io_service); - d_socket->open(d_endpoint.protocol()); - - boost::asio::socket_base::reuse_address roption(true); - d_socket->set_option(roption); - - d_connected = true; - } -} - -void udp_sink_impl::disconnect() -{ - if (!d_connected) - return; - - gr::thread::scoped_lock guard(d_mutex); // protect d_socket from work() - - // Send a few zero-length packets to signal receiver we are done - boost::array<char, 0> send_buf; - if (d_eof) { - int i; - for (i = 0; i < 3; i++) - d_socket->send_to(boost::asio::buffer(send_buf), d_endpoint); - } - - d_socket->close(); - d_socket.reset(); - - d_connected = false; -} - -int udp_sink_impl::work(int noutput_items, - gr_vector_const_void_star& input_items, - gr_vector_void_star& output_items) -{ - const char* in = (const char*)input_items[0]; - size_t r = 0; - std::ptrdiff_t bytes_sent = 0, bytes_to_send = 0; - const size_t total_size = noutput_items * d_itemsize; - - gr::thread::scoped_lock guard(d_mutex); // protect d_socket - - while (bytes_sent < static_cast<std::ptrdiff_t>(total_size)) { - bytes_to_send = std::min(static_cast<std::ptrdiff_t>(d_payload_size), - static_cast<std::ptrdiff_t>(total_size - bytes_sent)); - - if (d_connected) { - try { - r = d_socket->send_to( - boost::asio::buffer((void*)(in + bytes_sent), bytes_to_send), - d_endpoint); - } catch (std::exception& e) { - GR_LOG_ERROR(d_logger, boost::format("send error: %s") % e.what()); - return -1; - } - } else - r = bytes_to_send; // discarded for lack of connection - bytes_sent += r; - } - - return noutput_items; -} - -} /* namespace blocks */ -} /* namespace gr */ diff --git a/gr-blocks/lib/udp_sink_impl.h b/gr-blocks/lib/udp_sink_impl.h deleted file mode 100644 index 0912a8a6dc..0000000000 --- a/gr-blocks/lib/udp_sink_impl.h +++ /dev/null @@ -1,53 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifndef INCLUDED_GR_UDP_SINK_IMPL_H -#define INCLUDED_GR_UDP_SINK_IMPL_H - -#include <gnuradio/blocks/udp_sink.h> -#include <boost/asio.hpp> -#include <memory> - -namespace gr { -namespace blocks { - -class udp_sink_impl : public udp_sink -{ -private: - const size_t d_itemsize; - - const int d_payload_size; // maximum transmission unit (packet length) - const bool d_eof; // send zero-length packet on disconnect - bool d_connected; // are we connected? - gr::thread::mutex d_mutex; // protects d_socket and d_connected - - std::unique_ptr<boost::asio::ip::udp::socket> d_socket; // handle to socket - boost::asio::ip::udp::endpoint d_endpoint; - boost::asio::io_service d_io_service; - -public: - udp_sink_impl( - size_t itemsize, const std::string& host, int port, int payload_size, bool eof); - ~udp_sink_impl() override; - - int payload_size() override { return d_payload_size; } - - void connect(const std::string& host, int port) override; - void disconnect() override; - - int work(int noutput_items, - gr_vector_const_void_star& input_items, - gr_vector_void_star& output_items) override; -}; - -} /* namespace blocks */ -} /* namespace gr */ - -#endif /* INCLUDED_GR_UDP_SINK_IMPL_H */ diff --git a/gr-blocks/lib/udp_source_impl.cc b/gr-blocks/lib/udp_source_impl.cc deleted file mode 100644 index 11765de504..0000000000 --- a/gr-blocks/lib/udp_source_impl.cc +++ /dev/null @@ -1,200 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "udp_source_impl.h" -#include <gnuradio/io_signature.h> -#include <gnuradio/math.h> -#include <gnuradio/prefs.h> -#include <cstddef> -#include <cstring> -#include <memory> - -namespace gr { -namespace blocks { - -const int udp_source_impl::BUF_SIZE_PAYLOADS = - gr::prefs::singleton()->get_long("udp_blocks", "buf_size_payloads", 50); - -udp_source::sptr udp_source::make( - size_t itemsize, const std::string& ipaddr, int port, int payload_size, bool eof) -{ - return gnuradio::make_block_sptr<udp_source_impl>( - itemsize, ipaddr, port, payload_size, eof); -} - -udp_source_impl::udp_source_impl( - size_t itemsize, const std::string& host, int port, int payload_size, bool eof) - : sync_block( - "udp_source", io_signature::make(0, 0, 0), io_signature::make(1, 1, itemsize)), - d_itemsize(itemsize), - d_payload_size(payload_size), - d_eof(eof), - d_connected(false), - d_rxbuf(4 * payload_size), - d_residbuf(BUF_SIZE_PAYLOADS * payload_size), - d_residual(0), - d_sent(0) -{ - connect(host, port); -} - -udp_source_impl::~udp_source_impl() -{ - if (d_connected) - disconnect(); -} - -void udp_source_impl::connect(const std::string& host, int port) -{ - if (d_connected) - disconnect(); - - d_host = host; - d_port = static_cast<unsigned short>(port); - - std::string s_port; - s_port = (boost::format("%d") % d_port).str(); - - if (!host.empty()) { - boost::asio::ip::udp::resolver resolver(d_io_service); - boost::asio::ip::udp::resolver::query query( - d_host, s_port, boost::asio::ip::resolver_query_base::passive); - d_endpoint = *resolver.resolve(query); - - d_socket = std::make_unique<boost::asio::ip::udp::socket>(d_io_service); - d_socket->open(d_endpoint.protocol()); - - boost::asio::socket_base::reuse_address roption(true); - d_socket->set_option(roption); - - d_socket->bind(d_endpoint); - - start_receive(); - d_udp_thread = - gr::thread::thread(boost::bind(&udp_source_impl::run_io_service, this)); - d_connected = true; - } -} - -void udp_source_impl::disconnect() -{ - gr::thread::scoped_lock lock(d_setlock); - - if (!d_connected) - return; - - d_io_service.reset(); - d_io_service.stop(); - d_udp_thread.join(); - - d_socket->close(); - d_socket.reset(); - - d_connected = false; -} - -// Return port number of d_socket -int udp_source_impl::get_port(void) -{ - // return d_endpoint.port(); - return d_socket->local_endpoint().port(); -} - -void udp_source_impl::start_receive() -{ - d_socket->async_receive_from( - boost::asio::buffer((void*)d_rxbuf.data(), d_payload_size), - d_endpoint_rcvd, - boost::bind(&udp_source_impl::handle_read, - this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); -} - -void udp_source_impl::handle_read(const boost::system::error_code& error, - size_t bytes_transferred) -{ - if (!error) { - { - boost::lock_guard<gr::thread::mutex> lock(d_udp_mutex); - if (d_eof && (bytes_transferred == 0)) { - // If we are using EOF notification, test for it and don't - // add anything to the output. - d_residual = WORK_DONE; - d_cond_wait.notify_one(); - return; - } else { - // Make sure we never go beyond the boundary of the - // residual buffer. This will just drop the last bit of - // data in the buffer if we've run out of room. - if ((int)(d_residual + bytes_transferred) >= - (BUF_SIZE_PAYLOADS * d_payload_size)) { - GR_LOG_WARN(d_logger, "Too much data; dropping packet."); - } else { - // otherwise, copy received data into local buffer for - // copying later. - memcpy(d_residbuf.data() + d_residual, - d_rxbuf.data(), - bytes_transferred); - d_residual += bytes_transferred; - } - } - d_cond_wait.notify_one(); - } - } - start_receive(); -} - -int udp_source_impl::work(int noutput_items, - gr_vector_const_void_star& input_items, - gr_vector_void_star& output_items) -{ - gr::thread::scoped_lock l(d_setlock); - - char* out = (char*)output_items[0]; - - // Use async receive_from to get data from UDP buffer and wait - // on a conditional signal before proceeding. We use this - // because the conditional wait is interruptible while a - // synchronous receive_from is not. - boost::unique_lock<boost::mutex> lock(d_udp_mutex); - - // use timed_wait to avoid permanent blocking in the work function - d_cond_wait.timed_wait(lock, boost::posix_time::milliseconds(10)); - - if (d_residual < 0) { - return d_residual; - } - - const std::ptrdiff_t bytes_left_in_buffer = d_residual - d_sent; - auto bytes_to_send = - std::min<std::ptrdiff_t>(d_itemsize * noutput_items, bytes_left_in_buffer); - - // Copy the received data in the residual buffer to the output stream - memcpy(out, d_residbuf.data() + d_sent, bytes_to_send); - - // Keep track of where we are if we don't have enough output - // space to send all the data in the residbuf. - if (bytes_to_send == bytes_left_in_buffer) { - d_residual = 0; - d_sent = 0; - } else { - d_sent += bytes_to_send; - } - - return bytes_to_send / d_itemsize; -} - -} /* namespace blocks */ -} /* namespace gr */ diff --git a/gr-blocks/lib/udp_source_impl.h b/gr-blocks/lib/udp_source_impl.h deleted file mode 100644 index a2dc2dc9dc..0000000000 --- a/gr-blocks/lib/udp_source_impl.h +++ /dev/null @@ -1,75 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2007-2010,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifndef INCLUDED_GR_UDP_SOURCE_IMPL_H -#define INCLUDED_GR_UDP_SOURCE_IMPL_H - -#include <gnuradio/blocks/udp_source.h> -#include <gnuradio/thread/thread.h> -#include <boost/asio.hpp> -#include <boost/format.hpp> -#include <cstddef> -#include <memory> - -namespace gr { -namespace blocks { - -class udp_source_impl : public udp_source -{ -private: - const size_t d_itemsize; - int d_payload_size; // maximum transmission unit (packet length) - const bool d_eof; // look for an EOF signal - bool d_connected; // are we connected? - std::vector<char> d_rxbuf; // get UDP buffer items - std::vector<char> d_residbuf; // hold buffer between calls - std::ptrdiff_t - d_residual; // hold information about number of bytes stored in residbuf - size_t d_sent; // track how much of d_residbuf we've outputted - - static const int - BUF_SIZE_PAYLOADS; //!< The d_residbuf size in multiples of d_payload_size - - std::string d_host; - unsigned short d_port; - - std::unique_ptr<boost::asio::ip::udp::socket> d_socket; - boost::asio::ip::udp::endpoint d_endpoint; - boost::asio::ip::udp::endpoint d_endpoint_rcvd; - boost::asio::io_service d_io_service; - - gr::thread::condition_variable d_cond_wait; - gr::thread::mutex d_udp_mutex; - gr::thread::thread d_udp_thread; - - void start_receive(); - void handle_read(const boost::system::error_code& error, size_t bytes_transferred); - void run_io_service() { d_io_service.run(); } - -public: - udp_source_impl( - size_t itemsize, const std::string& host, int port, int payload_size, bool eof); - ~udp_source_impl() override; - - void connect(const std::string& host, int port) override; - void disconnect() override; - - int payload_size() override { return d_payload_size; } - int get_port() override; - - int work(int noutput_items, - gr_vector_const_void_star& input_items, - gr_vector_void_star& output_items) override; -}; - -} /* namespace blocks */ -} /* namespace gr */ - -#endif /* INCLUDED_GR_UDP_SOURCE_H */ |