diff options
author | Josh Morman <jmorman@perspectalabs.com> | 2021-03-25 11:25:46 -0400 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-04-21 14:02:57 -0400 |
commit | 63d38839b44d9fac8929b3f7b9966a0ceed95d41 (patch) | |
tree | ec37c9eefa7b32c254e3945bc68b8d365b17d55b /gr-blocks | |
parent | 65bd288a2804ef10c5ccac322cb5209907bb5d96 (diff) |
blocks: remove tcp/udp qa
gr-network needs qa functionality on the tcp/udp blocks instead
Signed-off-by: Josh Morman <jmorman@perspectalabs.com>
Diffstat (limited to 'gr-blocks')
-rw-r--r-- | gr-blocks/python/blocks/qa_tcp_server_sink.py | 81 | ||||
-rw-r--r-- | gr-blocks/python/blocks/qa_udp_source_sink.py | 151 |
2 files changed, 0 insertions, 232 deletions
diff --git a/gr-blocks/python/blocks/qa_tcp_server_sink.py b/gr-blocks/python/blocks/qa_tcp_server_sink.py deleted file mode 100644 index 64bcbf0c2e..0000000000 --- a/gr-blocks/python/blocks/qa_tcp_server_sink.py +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2014 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -# - - -from gnuradio import gr, gr_unittest, blocks -import os -import socket -from time import sleep - -from threading import Timer -from multiprocessing import Process - - -class test_tcp_sink(gr_unittest.TestCase): - - def setUp(self): - os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb_snd = gr.top_block() - self.tb_rcv = gr.top_block() - - def tearDown(self): - self.tb_rcv = None - self.tb_snd = None - - def _tcp_client(self): - dst = blocks.vector_sink_s() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - for t in (0, 0.2): - # wait until server listens - sleep(t) - try: - sock.connect((self.addr, self.port)) - except socket.error as e: - if e.errno != 111: - raise - continue - break - fd = os.dup(sock.fileno()) - self.tb_rcv.connect( - blocks.file_descriptor_source( - self.itemsize, fd), dst) - self.tb_rcv.run() - self.assertEqual(self.data, dst.data()) - - def test_001(self): - self.addr = '127.0.0.1' - self.port = 65510 - self.itemsize = gr.sizeof_short - n_data = 16 - self.data = tuple([x for x in range(n_data)]) - -# tcp_server_sink blocks until client does not connect, start client -# process first - p = Process(target=self._tcp_client) - p.start() - - src = blocks.vector_source_s(self.data, False) - tcp_snd = blocks.tcp_server_sink( - self.itemsize, self.addr, self.port, False) - self.tb_snd.connect(src, tcp_snd) - - self.tb_snd.run() - del tcp_snd - self.tb_snd = None - p.join() - - def stop_rcv(self): - self.timeout = True - self.tb_rcv.stop() - # print "tb_rcv stopped by Timer" - - -if __name__ == '__main__': - gr_unittest.run(test_tcp_sink) diff --git a/gr-blocks/python/blocks/qa_udp_source_sink.py b/gr-blocks/python/blocks/qa_udp_source_sink.py deleted file mode 100644 index 2e48d0bfae..0000000000 --- a/gr-blocks/python/blocks/qa_udp_source_sink.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2008,2010,2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -# - - -from gnuradio import gr, gr_unittest, blocks -import numpy -import os -import socket -import time - -from threading import Timer, Thread - - -def recv_data(sock, result): - while True: - data = sock.recv(4 * 1000) - if len(data) == 0: - break - real_data = numpy.frombuffer(data, dtype=numpy.float32) - result.extend(list(real_data)) - - -class test_udp_sink_source(gr_unittest.TestCase): - - def setUp(self): - os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb_snd = gr.top_block() - self.tb_rcv = gr.top_block() - - def tearDown(self): - self.tb_rcv = None - self.tb_snd = None - - def test_001(self): - # Tests calling disconnect/reconnect. - - port = 65510 - - n_data = 16 - src_data = [x for x in range(n_data)] - expected_result = src_data - src = blocks.vector_source_s(src_data, False) - udp_snd = blocks.udp_sink(gr.sizeof_short, 'localhost', port) - self.tb_snd.connect(src, udp_snd) - - self.tb_snd.run() - udp_snd.disconnect() - - udp_snd.connect('localhost', port + 1) - src.rewind() - self.tb_snd.run() - - def test_sink_001(self): - port = 65520 - - n_data = 100 - src_data = [float(x) for x in range(n_data)] - expected_result = src_data - - recvsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - recvsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - recvsock.bind(('127.0.0.1', port)) - - result = [] - t = Thread(target=recv_data, args=(recvsock, result)) - t.start() - - src = blocks.vector_source_f(src_data, False) - udp_snd = blocks.udp_sink(gr.sizeof_float, '127.0.0.1', port) - self.tb_snd.connect(src, udp_snd) - - self.tb_snd.run() - udp_snd.disconnect() - t.join() - recvsock.close() - - self.assertEqual(expected_result, result) - - def test_source_001(self): - port = 65520 - - n_data = 100 - src_data = [float(x) for x in range(n_data)] - expected_result = src_data - send_data = numpy.array(src_data, dtype=numpy.float32) - send_data = send_data.tobytes() - - udp_rcv = blocks.udp_source(gr.sizeof_float, '127.0.0.1', port) - dst = blocks.vector_sink_f() - self.tb_rcv.connect(udp_rcv, dst) - self.tb_rcv.start() - time.sleep(1.0) - sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sendsock.sendto(send_data, ('127.0.0.1', port)) - time.sleep(1.0) - sendsock.sendto(b'', ('127.0.0.1', port)) - sendsock.sendto(b'', ('127.0.0.1', port)) - sendsock.sendto(b'', ('127.0.0.1', port)) - self.tb_rcv.wait() - sendsock.close() - recv_data = dst.data() - - self.assertEqual(expected_result, recv_data) - - def test_003(self): - port = 65530 - - udp_rcv = blocks.udp_source(gr.sizeof_float, '0.0.0.0', 0, eof=False) - rcv_port = udp_rcv.get_port() - - udp_snd = blocks.udp_sink(gr.sizeof_float, '127.0.0.1', port) - udp_snd.connect('127.0.0.1', rcv_port) - - n_data = 16 - src_data = [float(x) for x in range(n_data)] - expected_result = src_data - src = blocks.vector_source_f(src_data) - dst = blocks.vector_sink_f() - - self.tb_snd.connect(src, udp_snd) - self.tb_rcv.connect(udp_rcv, dst) - - self.tb_rcv.start() - time.sleep(0.1) - self.tb_snd.run() - udp_snd.disconnect() - self.timeout = False - q = Timer(2.0, self.stop_rcv) - q.start() - self.tb_rcv.wait() - q.cancel() - - result_data = dst.data() - self.assertEqual(expected_result, result_data) - self.assertTrue(self.timeout) # source ignores EOF? - - def stop_rcv(self): - self.timeout = True - self.tb_rcv.stop() - # print "tb_rcv stopped by Timer" - - -if __name__ == '__main__': - gr_unittest.run(test_udp_sink_source) |