#
# Copyright 2011-2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#

import numpy

import pmt

from gnuradio import gr, gr_unittest, blocks


class non_sync_block(gr.basic_block):
    def __init__(self):
        gr.basic_block.__init__(self,
                                name="non_sync_block",
                                in_sig=[numpy.float32],
                                out_sig=[numpy.float32, numpy.float32])

    def general_work(self, input_items, output_items):
        self.consume(0, len(input_items[0]))
        self.produce(0, 2)
        self.produce(1, 1)
        return gr.WORK_CALLED_PRODUCE


class add_2_f32_1_f32(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(
            self,
            name="add 2 f32",
            in_sig=[numpy.float32, numpy.float32],
            out_sig=[numpy.float32],
        )

    def work(self, input_items, output_items):
        output_items[0][:] = input_items[0] + input_items[1]
        return len(output_items[0])


class add_2_fc32_1_fc32(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(
            self,
            name="add 2 fc32",
            in_sig=[numpy.complex64, numpy.complex64],
            out_sig=[numpy.complex64],
        )

    def work(self, input_items, output_items):
        output_items[0][:] = input_items[0] + input_items[1]
        return len(output_items[0])


class convolve(gr.sync_block):
    """
    A demonstration using block history to properly perform a convolution.
    """

    def __init__(self):
        gr.sync_block.__init__(
            self,
            name="convolve",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32]
        )
        self._taps = [1, 0, 0, 0]
        self.set_history(len(self._taps))

    def work(self, input_items, output_items):
        output_items[0][:] = numpy.convolve(
            input_items[0], self._taps, mode='valid')
        return len(output_items[0])


class decim2x(gr.decim_block):
    def __init__(self):
        gr.decim_block.__init__(
            self,
            name="decim2x",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32],
            decim=2
        )

    def work(self, input_items, output_items):
        output_items[0][:] = input_items[0][::2]
        return len(output_items[0])


class interp2x(gr.interp_block):
    def __init__(self):
        gr.interp_block.__init__(
            self,
            name="interp2x",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32],
            interp=2
        )

    def work(self, input_items, output_items):
        output_items[0][1::2] = input_items[0]
        output_items[0][::2] = input_items[0]
        return len(output_items[0])


class tag_source(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(
            self,
            name="tag source",
            in_sig=None,
            out_sig=[numpy.float32],
        )

    def work(self, input_items, output_items):
        num_output_items = len(output_items[0])

        # put code here to fill the output items...

        # make a new tag on the middle element every time work is called
        count = self.nitems_written(0) + num_output_items // 2
        key = pmt.string_to_symbol("example_key")
        value = pmt.string_to_symbol("example_value")
        self.add_item_tag(0, count, key, value)

        return num_output_items


class tag_sink(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(
            self,
            name="tag sink",
            in_sig=[numpy.float32],
            out_sig=None,
        )
        self.key = None

    def work(self, input_items, output_items):
        num_input_items = len(input_items[0])

        # put code here to process the input items...

        # print all the tags received in this work call
        nread = self.nitems_read(0)
        tags = self.get_tags_in_range(0, nread, nread + num_input_items)
        for tag in tags:
            # print tag.offset
            # print pmt.symbol_to_string(tag.key)
            # print pmt.symbol_to_string(tag.value)
            self.key = pmt.symbol_to_string(tag.key)

        return num_input_items


class tag_sink_win(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(self, name="tag sink",
                               in_sig=[numpy.float32],
                               out_sig=None)
        self.key = None

    def work(self, input_items, output_items):
        num_input_items = len(input_items[0])
        tags = self.get_tags_in_window(0, 0, num_input_items)
        for tag in tags:
            self.key = pmt.symbol_to_string(tag.key)
        return num_input_items


class fc32_to_f32_2(gr.sync_block):
    def __init__(self):
        gr.sync_block.__init__(
            self,
            name="fc32_to_f32_2",
            in_sig=[numpy.complex64],
            out_sig=[(numpy.float32, 2)],
        )

    def work(self, input_items, output_items):
        output_items[0][::, 0] = numpy.real(input_items[0])
        output_items[0][::, 1] = numpy.imag(input_items[0])
        return len(output_items[0])


class vector_to_stream(gr.interp_block):
    def __init__(self, itemsize, nitems_per_block):
        gr.interp_block.__init__(
            self,
            name="vector_to_stream",
            in_sig=[(itemsize, nitems_per_block)],
            out_sig=[itemsize],
            interp=nitems_per_block
        )
        self.block_size = nitems_per_block

    def work(self, input_items, output_items):
        n = 0
        for i in range(len(input_items[0])):
            for j in range(self.block_size):
                output_items[0][n] = input_items[0][i][j]
                n += 1

        return len(output_items[0])


class test_block_gateway(gr_unittest.TestCase):

    def test_add_f32(self):
        tb = gr.top_block()
        src0 = blocks.vector_source_f([1, 3, 5, 7, 9], False)
        src1 = blocks.vector_source_f([0, 2, 4, 6, 8], False)
        adder = add_2_f32_1_f32()
        adder.name()
        sink = blocks.vector_sink_f()
        tb.connect((src0, 0), (adder, 0))
        tb.connect((src1, 0), (adder, 1))
        tb.connect(adder, sink)
        tb.run()
        self.assertEqual(sink.data(), [1, 5, 9, 13, 17])

    def test_add_fc32(self):
        tb = gr.top_block()
        src0 = blocks.vector_source_c([1, 3j, 5, 7j, 9], False)
        src1 = blocks.vector_source_c([0, 2j, 4, 6j, 8], False)
        adder = add_2_fc32_1_fc32()
        sink = blocks.vector_sink_c()
        tb.connect((src0, 0), (adder, 0))
        tb.connect((src1, 0), (adder, 1))
        tb.connect(adder, sink)
        tb.run()
        self.assertEqual(sink.data(), [1, 5j, 9, 13j, 17])

    def test_convolve(self):
        tb = gr.top_block()
        src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False)
        cv = convolve()
        sink = blocks.vector_sink_f()
        tb.connect(src, cv, sink)
        tb.run()
        self.assertEqual(sink.data(), [1, 2, 3, 4, 5, 6, 7, 8])

    def test_decim2x(self):
        tb = gr.top_block()
        src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False)
        d2x = decim2x()
        sink = blocks.vector_sink_f()
        tb.connect(src, d2x, sink)
        tb.run()
        self.assertEqual(sink.data(), [1, 3, 5, 7])

    def test_interp2x(self):
        tb = gr.top_block()
        src = blocks.vector_source_f([1, 3, 5, 7, 9], False)
        i2x = interp2x()
        sink = blocks.vector_sink_f()
        tb.connect(src, i2x, sink)
        tb.run()
        self.assertEqual(sink.data(), [1, 1, 3, 3, 5, 5, 7, 7, 9, 9])

    def test_tags(self):
        src = tag_source()
        sink = tag_sink()
        # should be enough items to get a tag through
        head = blocks.head(gr.sizeof_float, 50000)
        tb = gr.top_block()
        tb.connect(src, head, sink)
        tb.run()
        self.assertEqual(sink.key, "example_key")

    def test_tags_win(self):
        src = tag_source()
        sink = tag_sink_win()
        # should be enough items to get a tag through
        head = blocks.head(gr.sizeof_float, 50000)
        tb = gr.top_block()
        tb.connect(src, head, sink)
        tb.run()
        self.assertEqual(sink.key, "example_key")

    def test_fc32_to_f32_2(self):
        tb = gr.top_block()
        src = blocks.vector_source_c(
            [1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j], False)
        convert = fc32_to_f32_2()
        v2s = vector_to_stream(numpy.float32, 2)
        sink = blocks.vector_sink_f()
        tb.connect(src, convert, v2s, sink)
        tb.run()
        self.assertEqual(sink.data(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

    def test_non_sync_block(self):
        tb = gr.top_block()
        src = blocks.vector_source_f(range(1000000))
        sinks = [blocks.vector_sink_f(), blocks.vector_sink_f()]
        dut = non_sync_block()
        tb.connect(src, dut)
        tb.connect((dut, 0), sinks[0])
        tb.connect((dut, 1), sinks[1])
        tb.run()
        self.assertEqual(len(sinks[0].data()), 2 * len(sinks[1].data()))


if __name__ == '__main__':
    gr_unittest.run(test_block_gateway)