#!/usr/bin/env python

from gnuradio import gr, gr_unittest
import blocks_swig as blocks
import numpy

class add_ff(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(
            self,
            name = "add_ff",
            in_sig = [numpy.float32, numpy.float32],
            out_sig = [numpy.float32],
        )

    def work(self, input_items, output_items):
        output_items[0][:] = input_items[0] + input_items[1]
        return len(output_items[0])

class multiply_const_ff(gr.sync_block):
    def __init__(self, k):
        gr.sync_block.__init__(
            self,
            name = "multiply_ff",
            in_sig = [numpy.float32],
            out_sig = [numpy.float32],
        )
        self.k = k

    def work(self, input_items, output_items):
        output_items[0][:] = map(lambda x: self.k*x, input_items[0])
        return len(output_items[0])

class test_hier_block2(gr_unittest.TestCase):

    def setUp(self):
	pass

    def tearDown(self):
    	pass

    def test_001_make(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	self.assertEqual("test_block", hblock.name())
	self.assertEqual(1, hblock.input_signature().max_streams())
	self.assertEqual(1, hblock.output_signature().min_streams())
	self.assertEqual(1, hblock.output_signature().max_streams())
	self.assertEqual(gr.sizeof_int, hblock.output_signature().sizeof_stream_item(0))

    def test_002_connect_input(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(hblock, nop1)

    def test_004_connect_output(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(nop1, hblock)

    def test_005_connect_output_in_use(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	nop2 = blocks.nop(gr.sizeof_int)
	hblock.connect(nop1, hblock)
	self.assertRaises(ValueError,
	    lambda: hblock.connect(nop2, hblock))

    def test_006_connect_invalid_src_port_neg(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	self.assertRaises(ValueError,
	    lambda: hblock.connect((hblock, -1), nop1))

    def test_005_connect_invalid_src_port_exceeds(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	self.assertRaises(ValueError,
	    lambda: hblock.connect((hblock, 1), nop1))

    def test_007_connect_invalid_dst_port_neg(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	nop2 = blocks.nop(gr.sizeof_int)
	self.assertRaises(ValueError,
	    lambda: hblock.connect(nop1, (nop2, -1)))

    def test_008_connect_invalid_dst_port_exceeds(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.null_sink(gr.sizeof_int)
	nop2 = blocks.null_sink(gr.sizeof_int)
	self.assertRaises(ValueError,
	    lambda: hblock.connect(nop1, (nop2, 1)))

    def test_009_check_topology(self):
	hblock = gr.top_block("test_block")
	hblock.check_topology(0, 0)

    def test_010_run(self):
        expected = (1.0, 2.0, 3.0, 4.0)
        hblock = gr.top_block("test_block")
        src = blocks.vector_source_f(expected, False)
        sink1 = blocks.vector_sink_f()
        sink2 = blocks.vector_sink_f()
        hblock.connect(src, sink1)
        hblock.connect(src, sink2)
        hblock.run()
        actual1 = sink1.data()
        actual2 = sink2.data()
        self.assertEquals(expected, actual1)
        self.assertEquals(expected, actual2)

    def test_012_disconnect_input(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(hblock, nop1)
        hblock.disconnect(hblock, nop1)

    def test_013_disconnect_input_not_connected(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
        nop2 = blocks.nop(gr.sizeof_int)
	hblock.connect(hblock, nop1)
        self.assertRaises(ValueError,
            lambda: hblock.disconnect(hblock, nop2))

    def test_014_disconnect_input_neg(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(hblock, nop1)
        self.assertRaises(ValueError,
            lambda: hblock.disconnect((hblock, -1), nop1))

    def test_015_disconnect_input_exceeds(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(hblock, nop1)
        self.assertRaises(ValueError,
            lambda: hblock.disconnect((hblock, 1), nop1))

    def test_016_disconnect_output(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(nop1, hblock)
        hblock.disconnect(nop1, hblock)

    def test_017_disconnect_output_not_connected(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
        nop2 = blocks.nop(gr.sizeof_int)
	hblock.connect(nop1, hblock)
        self.assertRaises(ValueError,
            lambda: hblock.disconnect(nop2, hblock))

    def test_018_disconnect_output_neg(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(hblock, nop1)
        self.assertRaises(ValueError,
            lambda: hblock.disconnect(nop1, (hblock, -1)))

    def test_019_disconnect_output_exceeds(self):
	hblock = gr.hier_block2("test_block",
				gr.io_signature(1,1,gr.sizeof_int),
				gr.io_signature(1,1,gr.sizeof_int))
	nop1 = blocks.nop(gr.sizeof_int)
	hblock.connect(nop1, hblock)
        self.assertRaises(ValueError,
            lambda: hblock.disconnect(nop1, (hblock, 1)))

    def test_020_run(self):
	hblock = gr.top_block("test_block")
	data = (1.0, 2.0, 3.0, 4.0)
	src = blocks.vector_source_f(data, False)
	dst = blocks.vector_sink_f()
	hblock.connect(src, dst)
	hblock.run()
	self.assertEquals(data, dst.data())

    def test_021_connect_single(self):
        hblock = gr.top_block("test_block")
        blk = gr.hier_block2("block",
                             gr.io_signature(0, 0, 0),
                             gr.io_signature(0, 0, 0))
        hblock.connect(blk)

    def test_022_connect_single_with_ports(self):
        hblock = gr.top_block("test_block")
        blk = gr.hier_block2("block",
                             gr.io_signature(1, 1, 1),
                             gr.io_signature(1, 1, 1))
        self.assertRaises(ValueError,
                          lambda: hblock.connect(blk))

    def test_023_connect_single_twice(self):
        hblock = gr.top_block("test_block")
        blk = gr.hier_block2("block",
                             gr.io_signature(0, 0, 0),
                             gr.io_signature(0, 0, 0))
        hblock.connect(blk)
        self.assertRaises(ValueError,
                          lambda: hblock.connect(blk))

    def test_024_disconnect_single(self):
        hblock = gr.top_block("test_block")
        blk = gr.hier_block2("block",
                             gr.io_signature(0, 0, 0),
                             gr.io_signature(0, 0, 0))
        hblock.connect(blk)
        hblock.disconnect(blk)

    def test_025_disconnect_single_not_connected(self):
        hblock = gr.top_block("test_block")
        blk = gr.hier_block2("block",
                             gr.io_signature(0, 0, 0),
                             gr.io_signature(0, 0, 0))
        self.assertRaises(ValueError,
                          lambda: hblock.disconnect(blk))

    def test_026_run_single(self):
        expected_data = (1.0,)
        tb = gr.top_block("top_block")
        hb = gr.hier_block2("block",
                            gr.io_signature(0, 0, 0),
                            gr.io_signature(0, 0, 0))
        src = blocks.vector_source_f(expected_data)
        dst = blocks.vector_sink_f()
        hb.connect(src, dst)
        tb.connect(hb)
        tb.run()
        self.assertEquals(expected_data, dst.data())

    def test_027a_internally_unconnected_input(self):
        tb = gr.top_block()
        hb = gr.hier_block2("block",
                            gr.io_signature(1, 1, 1),
                            gr.io_signature(1, 1, 1))
        hsrc = blocks.vector_source_b([1,])
        hb.connect(hsrc, hb) # wire output internally
        src = blocks.vector_source_b([1, ])
        dst = blocks.vector_sink_b()
        tb.connect(src, hb, dst) # hb's input is not connected internally
        self.assertRaises(RuntimeError,
                          lambda: tb.run())

    def test_027b_internally_unconnected_output(self):
        tb = gr.top_block()

        hb = gr.hier_block2("block",
                            gr.io_signature(1, 1, 1),
                            gr.io_signature(1, 1, 1))
        hdst = blocks.vector_sink_b()
        hb.connect(hb, hdst) # wire input internally
        src = blocks.vector_source_b([1, ])
        dst = blocks.vector_sink_b()
        tb.connect(src, hb, dst) # hb's output is not connected internally
        self.assertRaises(RuntimeError,
                          lambda: tb.run())

    def test_027c_fully_unconnected_output(self):
        tb = gr.top_block()
        hb = gr.hier_block2("block",
                            gr.io_signature(1, 1, 1),
                            gr.io_signature(1, 1, 1))
        hsrc = blocks.vector_sink_b()
        hb.connect(hb, hsrc) # wire input internally
        src = blocks.vector_source_b([1, ])
        dst = blocks.vector_sink_b()
        tb.connect(src, hb) # hb's output is not connected internally or externally
        self.assertRaises(RuntimeError,
                          lambda: tb.run())

    def test_027d_fully_unconnected_input(self):
        tb = gr.top_block()
        hb = gr.hier_block2("block",
                            gr.io_signature(1, 1, 1),
                            gr.io_signature(1, 1, 1))
        hdst = blocks.vector_source_b([1,])
        hb.connect(hdst, hb) # wire output internally
        dst = blocks.vector_sink_b()
        tb.connect(hb, dst) # hb's input is not connected internally or externally
        self.assertRaises(RuntimeError,
                          lambda: tb.run())

    def test_028_singleton_reconfigure(self):
        tb = gr.top_block()
        hb = gr.hier_block2("block",
                            gr.io_signature(0, 0, 0), gr.io_signature(0, 0, 0))
        src = blocks.vector_source_b([1, ])
        dst = blocks.vector_sink_b()
        hb.connect(src, dst)
        tb.connect(hb) # Singleton connect
        tb.lock()
        tb.disconnect_all()
        tb.connect(src, dst)
        tb.unlock()

    def test_029_singleton_disconnect(self):
        tb = gr.top_block()
        src = blocks.vector_source_b([1, ])
        dst = blocks.vector_sink_b()
        tb.connect(src, dst)
        tb.disconnect(src)   # Singleton disconnect
        tb.connect(src, dst)
        tb.run()
        self.assertEquals(dst.data(), (1,))

    def test_030_nested_input(self):
        tb = gr.top_block()
        src = blocks.vector_source_b([1,])
        hb1 = gr.hier_block2("hb1",
                             gr.io_signature(1, 1, gr.sizeof_char),
                             gr.io_signature(0, 0, 0))
        hb2 = gr.hier_block2("hb2",
                             gr.io_signature(1, 1, gr.sizeof_char),
                             gr.io_signature(0, 0, 0))
        dst = blocks.vector_sink_b()
        tb.connect(src, hb1)
        hb1.connect(hb1, hb2)
        hb2.connect(hb2, blocks.copy(gr.sizeof_char), dst)
        tb.run()
        self.assertEquals(dst.data(), (1,))

    def test_031_multiple_internal_inputs(self):
        tb = gr.top_block()
        src = blocks.vector_source_f([1.0,])
        hb = gr.hier_block2("hb",
                            gr.io_signature(1, 1, gr.sizeof_float),
                            gr.io_signature(1, 1, gr.sizeof_float))
        m1 = multiply_const_ff(1.0)
        m2 = multiply_const_ff(2.0)
        add = add_ff()
        hb.connect(hb, m1)       # m1 is connected to hb external input #0
        hb.connect(hb, m2)       # m2 is also connected to hb external input #0
        hb.connect(m1, (add, 0))
        hb.connect(m2, (add, 1))
        hb.connect(add, hb)      # add is connected to hb external output #0
        dst = blocks.vector_sink_f()
        tb.connect(src, hb, dst)
        tb.run()
        self.assertEquals(dst.data(), (3.0,))

    def test_032_nested_multiple_internal_inputs(self):
        tb = gr.top_block()
        src = blocks.vector_source_f([1.0,])
        hb = gr.hier_block2("hb",
                            gr.io_signature(1, 1, gr.sizeof_float),
                            gr.io_signature(1, 1, gr.sizeof_float))
        hb2 = gr.hier_block2("hb",
                            gr.io_signature(1, 1, gr.sizeof_float),
                            gr.io_signature(1, 1, gr.sizeof_float))

        m1 = multiply_const_ff(1.0)
        m2 = multiply_const_ff(2.0)
        add = add_ff()
        hb2.connect(hb2, m1)       # m1 is connected to hb2 external input #0
        hb2.connect(hb2, m2)       # m2 is also connected to hb2 external input #0
        hb2.connect(m1, (add, 0))
        hb2.connect(m2, (add, 1))
        hb2.connect(add, hb2)      # add is connected to hb2 external output #0
        hb.connect(hb, hb2, hb)   # hb as hb2 as nested internal block
        dst = blocks.vector_sink_f()
        tb.connect(src, hb, dst)
        tb.run()
        self.assertEquals(dst.data(), (3.0,))


if __name__ == "__main__":
    gr_unittest.run(test_hier_block2, "test_hier_block2.xml")