summaryrefslogtreecommitdiff
path: root/gr-blocks
diff options
context:
space:
mode:
authorJosh Morman <jmorman@perspectalabs.com>2021-03-25 11:25:46 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-04-21 14:02:57 -0400
commit63d38839b44d9fac8929b3f7b9966a0ceed95d41 (patch)
treeec37c9eefa7b32c254e3945bc68b8d365b17d55b /gr-blocks
parent65bd288a2804ef10c5ccac322cb5209907bb5d96 (diff)
blocks: remove tcp/udp qa
gr-network needs qa functionality on the tcp/udp blocks instead Signed-off-by: Josh Morman <jmorman@perspectalabs.com>
Diffstat (limited to 'gr-blocks')
-rw-r--r--gr-blocks/python/blocks/qa_tcp_server_sink.py81
-rw-r--r--gr-blocks/python/blocks/qa_udp_source_sink.py151
2 files changed, 0 insertions, 232 deletions
diff --git a/gr-blocks/python/blocks/qa_tcp_server_sink.py b/gr-blocks/python/blocks/qa_tcp_server_sink.py
deleted file mode 100644
index 64bcbf0c2e..0000000000
--- a/gr-blocks/python/blocks/qa_tcp_server_sink.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2014 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-#
-
-
-from gnuradio import gr, gr_unittest, blocks
-import os
-import socket
-from time import sleep
-
-from threading import Timer
-from multiprocessing import Process
-
-
-class test_tcp_sink(gr_unittest.TestCase):
-
- def setUp(self):
- os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
- self.tb_snd = gr.top_block()
- self.tb_rcv = gr.top_block()
-
- def tearDown(self):
- self.tb_rcv = None
- self.tb_snd = None
-
- def _tcp_client(self):
- dst = blocks.vector_sink_s()
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- for t in (0, 0.2):
- # wait until server listens
- sleep(t)
- try:
- sock.connect((self.addr, self.port))
- except socket.error as e:
- if e.errno != 111:
- raise
- continue
- break
- fd = os.dup(sock.fileno())
- self.tb_rcv.connect(
- blocks.file_descriptor_source(
- self.itemsize, fd), dst)
- self.tb_rcv.run()
- self.assertEqual(self.data, dst.data())
-
- def test_001(self):
- self.addr = '127.0.0.1'
- self.port = 65510
- self.itemsize = gr.sizeof_short
- n_data = 16
- self.data = tuple([x for x in range(n_data)])
-
-# tcp_server_sink blocks until client does not connect, start client
-# process first
- p = Process(target=self._tcp_client)
- p.start()
-
- src = blocks.vector_source_s(self.data, False)
- tcp_snd = blocks.tcp_server_sink(
- self.itemsize, self.addr, self.port, False)
- self.tb_snd.connect(src, tcp_snd)
-
- self.tb_snd.run()
- del tcp_snd
- self.tb_snd = None
- p.join()
-
- def stop_rcv(self):
- self.timeout = True
- self.tb_rcv.stop()
- # print "tb_rcv stopped by Timer"
-
-
-if __name__ == '__main__':
- gr_unittest.run(test_tcp_sink)
diff --git a/gr-blocks/python/blocks/qa_udp_source_sink.py b/gr-blocks/python/blocks/qa_udp_source_sink.py
deleted file mode 100644
index 2e48d0bfae..0000000000
--- a/gr-blocks/python/blocks/qa_udp_source_sink.py
+++ /dev/null
@@ -1,151 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2008,2010,2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-#
-
-
-from gnuradio import gr, gr_unittest, blocks
-import numpy
-import os
-import socket
-import time
-
-from threading import Timer, Thread
-
-
-def recv_data(sock, result):
- while True:
- data = sock.recv(4 * 1000)
- if len(data) == 0:
- break
- real_data = numpy.frombuffer(data, dtype=numpy.float32)
- result.extend(list(real_data))
-
-
-class test_udp_sink_source(gr_unittest.TestCase):
-
- def setUp(self):
- os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
- self.tb_snd = gr.top_block()
- self.tb_rcv = gr.top_block()
-
- def tearDown(self):
- self.tb_rcv = None
- self.tb_snd = None
-
- def test_001(self):
- # Tests calling disconnect/reconnect.
-
- port = 65510
-
- n_data = 16
- src_data = [x for x in range(n_data)]
- expected_result = src_data
- src = blocks.vector_source_s(src_data, False)
- udp_snd = blocks.udp_sink(gr.sizeof_short, 'localhost', port)
- self.tb_snd.connect(src, udp_snd)
-
- self.tb_snd.run()
- udp_snd.disconnect()
-
- udp_snd.connect('localhost', port + 1)
- src.rewind()
- self.tb_snd.run()
-
- def test_sink_001(self):
- port = 65520
-
- n_data = 100
- src_data = [float(x) for x in range(n_data)]
- expected_result = src_data
-
- recvsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- recvsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- recvsock.bind(('127.0.0.1', port))
-
- result = []
- t = Thread(target=recv_data, args=(recvsock, result))
- t.start()
-
- src = blocks.vector_source_f(src_data, False)
- udp_snd = blocks.udp_sink(gr.sizeof_float, '127.0.0.1', port)
- self.tb_snd.connect(src, udp_snd)
-
- self.tb_snd.run()
- udp_snd.disconnect()
- t.join()
- recvsock.close()
-
- self.assertEqual(expected_result, result)
-
- def test_source_001(self):
- port = 65520
-
- n_data = 100
- src_data = [float(x) for x in range(n_data)]
- expected_result = src_data
- send_data = numpy.array(src_data, dtype=numpy.float32)
- send_data = send_data.tobytes()
-
- udp_rcv = blocks.udp_source(gr.sizeof_float, '127.0.0.1', port)
- dst = blocks.vector_sink_f()
- self.tb_rcv.connect(udp_rcv, dst)
- self.tb_rcv.start()
- time.sleep(1.0)
- sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sendsock.sendto(send_data, ('127.0.0.1', port))
- time.sleep(1.0)
- sendsock.sendto(b'', ('127.0.0.1', port))
- sendsock.sendto(b'', ('127.0.0.1', port))
- sendsock.sendto(b'', ('127.0.0.1', port))
- self.tb_rcv.wait()
- sendsock.close()
- recv_data = dst.data()
-
- self.assertEqual(expected_result, recv_data)
-
- def test_003(self):
- port = 65530
-
- udp_rcv = blocks.udp_source(gr.sizeof_float, '0.0.0.0', 0, eof=False)
- rcv_port = udp_rcv.get_port()
-
- udp_snd = blocks.udp_sink(gr.sizeof_float, '127.0.0.1', port)
- udp_snd.connect('127.0.0.1', rcv_port)
-
- n_data = 16
- src_data = [float(x) for x in range(n_data)]
- expected_result = src_data
- src = blocks.vector_source_f(src_data)
- dst = blocks.vector_sink_f()
-
- self.tb_snd.connect(src, udp_snd)
- self.tb_rcv.connect(udp_rcv, dst)
-
- self.tb_rcv.start()
- time.sleep(0.1)
- self.tb_snd.run()
- udp_snd.disconnect()
- self.timeout = False
- q = Timer(2.0, self.stop_rcv)
- q.start()
- self.tb_rcv.wait()
- q.cancel()
-
- result_data = dst.data()
- self.assertEqual(expected_result, result_data)
- self.assertTrue(self.timeout) # source ignores EOF?
-
- def stop_rcv(self):
- self.timeout = True
- self.tb_rcv.stop()
- # print "tb_rcv stopped by Timer"
-
-
-if __name__ == '__main__':
- gr_unittest.run(test_udp_sink_source)