#!/usr/bin/env python
#
# Copyright 2004,2010,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

from gnuradio import gr, gr_unittest
import blocks_swig as blocks
import pmt
import time

def all_counts():
    """Return a snapshot of gr's "currently allocated" counters.

    The result is a 5-tuple holding, in order, the values returned by the
    block, block_detail, buffer, buffer_reader and message
    ``*_ncurrently_allocated()`` functions of ``gr``.  Two snapshots can be
    compared with ``==`` to see whether any of the counters changed.
    """
    n_blocks = gr.block_ncurrently_allocated()
    n_details = gr.block_detail_ncurrently_allocated()
    n_buffers = gr.buffer_ncurrently_allocated()
    n_readers = gr.buffer_reader_ncurrently_allocated()
    n_messages = gr.message_ncurrently_allocated()
    return (n_blocks, n_details, n_buffers, n_readers, n_messages)


class test_message(gr_unittest.TestCase):
    """QA for gr.message, gr.msg_queue and the message source/strobe blocks."""

    def setUp(self):
        # Every test gets its own queue instance.
        self.msgq = gr.msg_queue()

    def tearDown(self):
        # Drop the test case's reference to the queue.
        self.msgq = None

    def leak_check(self, fct):
        """Run ``fct`` and assert that it left the allocation counters unchanged.

        ``fct`` is called with no arguments.  The counters from all_counts()
        are sampled before the call and again after tearDown() has released
        ``self.msgq``; the two snapshots must be equal.
        """
        begin = all_counts()
        fct()
        # tear down early so we can check for leaks
        self.tearDown()
        end = all_counts()
        self.assertEqual(begin, end)

    def _send_digits_then_eof(self, src):
        """Queue '0123456789' on ``src.msgq()`` in pieces, then a type-1 message.

        The payload is split into four string messages, one of them empty,
        followed by ``gr.message(1)`` which the tests use as the EOF marker.
        """
        for payload in ('01234', '5', '', '6789'):
            src.msgq().insert_tail(gr.message_from_string(payload))
        src.msgq().insert_tail(gr.message(1))                  # send EOF

    def test_100(self):
        # A message built from (type, arg1, arg2) reports those values back
        # and has a length of zero.
        msg = gr.message(0, 1.5, 2.3)
        self.assertEqual(0, msg.type())
        self.assertAlmostEqual(1.5, msg.arg1())
        self.assertAlmostEqual(2.3, msg.arg2())
        self.assertEqual(0, msg.length())

    def test_101(self):
        # A string survives the message_from_string / to_string round trip.
        s = 'This is a test'
        msg = gr.message_from_string(s)
        self.assertEqual(s, msg.to_string())

    def test_200(self):
        self.leak_check(self.body_200)

    def body_200(self):
        # Insert two messages, then remove both; they come back in
        # insertion order and the queue ends up with a count of zero.
        self.msgq.insert_tail(gr.message(0))
        self.assertEqual(1, self.msgq.count())
        self.msgq.insert_tail(gr.message(1))
        self.assertEqual(2, self.msgq.count())
        msg0 = self.msgq.delete_head()
        self.assertEqual(0, msg0.type())
        msg1 = self.msgq.delete_head()
        self.assertEqual(1, msg1.type())
        self.assertEqual(0, self.msgq.count())

    def test_201(self):
        self.leak_check(self.body_201)

    def body_201(self):
        # Leave two messages in the queue; leak_check() then verifies the
        # counters after the queue itself has been dropped.
        self.msgq.insert_tail(gr.message(0))
        self.assertEqual(1, self.msgq.count())
        self.msgq.insert_tail(gr.message(1))
        self.assertEqual(2, self.msgq.count())

    def test_202(self):
        self.leak_check(self.body_202)

    def body_202(self):
        # Create a message that is only referenced by a local variable, so
        # the reference goes away when this method returns.
        # global msg
        msg = gr.message(666)

    def test_300(self):
        # Plain vector source -> vector sink flowgraph: the sink must end up
        # holding exactly the source data.
        input_data = (0,1,2,3,4,5,6,7,8,9)
        src = gr.vector_source_b(input_data)
        dst = gr.vector_sink_b()
        tb = gr.top_block()
        tb.connect(src, dst)
        tb.run()
        self.assertEqual(input_data, dst.data())

    def test_301(self):
        # Use itemsize, limit constructor
        src = blocks.message_source(gr.sizeof_char)
        dst = gr.vector_sink_b()
        tb = gr.top_block()
        tb.connect(src, dst)
        self._send_digits_then_eof(src)
        tb.run()
        self.assertEqual(tuple(map(ord, '0123456789')), dst.data())

    def test_302(self):
        # Use itemsize, msgq constructor
        msgq = gr.msg_queue()
        src = blocks.message_source(gr.sizeof_char, msgq)
        dst = gr.vector_sink_b()
        tb = gr.top_block()
        tb.connect(src, dst)
        self._send_digits_then_eof(src)
        tb.run()
        self.assertEqual(tuple(map(ord, '0123456789')), dst.data())

    def test_debug_401(self):
        msg = pmt.pmt_intern("TESTING")
        # NOTE(review): 500 is presumably the strobe period in milliseconds,
        # so the 1 s sleep below should allow at least one message through --
        # confirm against the message_strobe documentation.
        src = blocks.message_strobe(msg, 500)
        snk = blocks.message_debug()

        tb = gr.top_block()
        tb.msg_connect(src, "strobe", snk, "store")
        tb.start()
        time.sleep(1)
        tb.stop()
        tb.wait()

        # The first message stored by the debug sink must match what was sent.
        rec_msg = snk.get_message(0)
        self.assertTrue(pmt.pmt_eqv(rec_msg, msg))
        

if __name__ == '__main__':
    # When executed as a script, run the test_message cases through
    # gr_unittest's runner.  The second argument is presumably the name of
    # the XML report file -- confirm against gr_unittest.run.
    gr_unittest.run(test_message, "test_message.xml")