summaryrefslogtreecommitdiff
path: root/gr-blocks/python/qa_python_message_passing.py
diff options
context:
space:
mode:
authorBen Reynwar <ben@reynwar.net>2013-06-04 10:59:29 -0700
committerBen Reynwar <ben@reynwar.net>2013-06-04 11:55:00 -0700
commit48f3452530b29627ce57089b65363776dbe9a179 (patch)
treea2005c23b427415b9d86bcf256c49b3480f61427 /gr-blocks/python/qa_python_message_passing.py
parentba0a9afedcd5451f2032004e9d583ed78d74ece2 (diff)
uninstalled import: Updatings blocks, fec, uhd, and filter so that uninstalled import works with recent changes.
Diffstat (limited to 'gr-blocks/python/qa_python_message_passing.py')
-rw-r--r--gr-blocks/python/qa_python_message_passing.py123
1 files changed, 0 insertions, 123 deletions
diff --git a/gr-blocks/python/qa_python_message_passing.py b/gr-blocks/python/qa_python_message_passing.py
deleted file mode 100644
index 58bf9e59bd..0000000000
--- a/gr-blocks/python/qa_python_message_passing.py
+++ /dev/null
@@ -1,123 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-from gnuradio import gr, gr_unittest
-import pmt
-import numpy
-import time
-import blocks_swig as blocks
-
-# Simple block to generate messages
-class message_generator(gr.sync_block):
- def __init__(self, msg_list, msg_interval):
- gr.sync_block.__init__(
- self,
- name = "message generator",
- in_sig = [numpy.float32],
- out_sig = None
- )
- self.msg_list = msg_list
- self.msg_interval = msg_interval
- self.msg_ctr = 0
- self.message_port_register_out(pmt.intern('out_port'))
-
-
- def work(self, input_items, output_items):
- inLen = len(input_items[0])
- while self.msg_ctr < len(self.msg_list) and \
- (self.msg_ctr * self.msg_interval) < \
- (self.nitems_read(0) + inLen):
- self.message_port_pub(pmt.intern('out_port'),
- self.msg_list[self.msg_ctr])
- self.msg_ctr += 1
- return inLen
-
-# Simple block to consume messages
-class message_consumer(gr.sync_block):
- def __init__(self):
- gr.sync_block.__init__(
- self,
- name = "message consumer",
- in_sig = None,
- out_sig = None
- )
- self.msg_list = []
- self.message_port_register_in(pmt.intern('in_port'))
- self.set_msg_handler(pmt.intern('in_port'),
- self.handle_msg)
-
- def handle_msg(self, msg):
- # Create a new PMT from long value and put in list
- self.msg_list.append(pmt.from_long(pmt.to_long(msg)))
-
-class test_python_message_passing(gr_unittest.TestCase):
-
- def setUp(self):
- self.tb = gr.top_block()
-
- def tearDown(self):
- self.tb = None
-
- def test_000(self):
- num_msgs = 10
- msg_interval = 1000
- msg_list = []
- for i in range(num_msgs):
- msg_list.append(pmt.from_long(i))
-
- # Create vector source with dummy data to trigger messages
- src_data = []
- for i in range(num_msgs*msg_interval):
- src_data.append(float(i))
- src = blocks.vector_source_f(src_data, False)
- msg_gen = message_generator(msg_list, msg_interval)
- msg_cons = message_consumer()
-
- # Connect vector source to message gen
- self.tb.connect(src, msg_gen)
-
- # Connect message generator to message consumer
- self.tb.msg_connect(msg_gen, 'out_port', msg_cons, 'in_port')
-
- # Verify that the messgae port query functions work
- self.assertEqual(pmt.symbol_to_string(pmt.vector_ref(
- msg_gen.message_ports_out(), 0)), 'out_port')
- self.assertEqual(pmt.symbol_to_string(pmt.vector_ref(
- msg_cons.message_ports_in(), 0)), 'in_port')
-
- # Run to verify message passing
- self.tb.start()
-
- # Wait for all messages to be sent
- while msg_gen.msg_ctr < num_msgs:
- time.sleep(0.5)
- self.tb.stop()
- self.tb.wait()
-
- # Verify that the message consumer got all the messages
- self.assertEqual(num_msgs, len(msg_cons.msg_list))
- for i in range(num_msgs):
- self.assertTrue(pmt.equal(msg_list[i], msg_cons.msg_list[i]))
-
-if __name__ == '__main__':
- gr_unittest.run(test_python_message_passing,
- 'test_python_message_passing.xml')