#!/usr/bin/env python # # Copyright 2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # # SPDX-License-Identifier: GPL-3.0-or-later # # import random from gnuradio import gr, gr_unittest, blocks import pmt class qa_repack_bits_bb (gr_unittest.TestCase): def setUp(self): random.seed(0) self.tb = gr.top_block() self.tsb_key = "length" def tearDown(self): self.tb = None def test_001_simple(self): """ Very simple test, 2 bits -> 1 """ src_data = [0b11, 0b01, 0b10] expected_data = [0b1, 0b1, 0b1, 0b0, 0b0, 0b1] k = 2 l = 1 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) self.tb.run() self.assertEqual(sink.data(), expected_data) def test_001_simple_msb(self): """ Very simple test, 2 bits -> 1 with MSB set """ src_data = [0b11, 0b01, 0b10] expected_data = [0b1, 0b1, 0b0, 0b1, 0b1, 0b0] k = 2 l = 1 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l, "", False, gr.GR_MSB_FIRST) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) self.tb.run() self.assertEqual(sink.data(), expected_data) def test_002_three(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111, 0b11111111] expected_data = [0b101, ] + [0b111, ] * 7 k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) self.tb.run() self.assertEqual(sink.data(), expected_data) def test_002_three(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111, 0b11111111] expected_data = [0b101, ] + [0b111, ] * 7 k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) self.tb.run() self.assertEqual(sink.data(), expected_data) def test_002_three_msb(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111, 0b11111111] expected_data = [0b111, ] + [0b111, ] + [0b011, ] + [0b111, ] * 5 k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l, "", False, gr.GR_MSB_FIRST) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) self.tb.run() self.assertEqual(sink.data(), expected_data) def test_003_lots_of_bytes(self): """ Lots and lots of bytes, multiple packer stages """ src_data = [random.randint(0, 255) for x in range(3 * 5 * 7 * 8 * 10)] src = blocks.vector_source_b(src_data, False, 1) repack1 = blocks.repack_bits_bb(8, 3) repack2 = blocks.repack_bits_bb(3, 5) repack3 = blocks.repack_bits_bb(5, 7) repack4 = blocks.repack_bits_bb(7, 8) sink = blocks.vector_sink_b() self.tb.connect(src, repack1, repack2, repack3, repack4, sink) self.tb.run() self.assertEqual(sink.data(), src_data) def test_003_lots_of_bytes_msb(self): """ Lots and lots of bytes, multiple packer stages """ src_data = [random.randint(0, 255) for x in range(3 * 5 * 7 * 8 * 10)] src = blocks.vector_source_b(src_data, False, 1) repack1 = blocks.repack_bits_bb(8, 3, "", False, gr.GR_MSB_FIRST) repack2 = blocks.repack_bits_bb(3, 5, "", False, gr.GR_MSB_FIRST) repack3 = blocks.repack_bits_bb(5, 7, "", False, gr.GR_MSB_FIRST) repack4 = blocks.repack_bits_bb(7, 8, "", False, gr.GR_MSB_FIRST) sink = blocks.vector_sink_b() self.tb.connect(src, repack1, repack2, repack3, repack4, sink) self.tb.run() self.assertEqual(sink.data(), src_data) def test_004_three_with_tags(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111] expected_data = [0b101, ] + [0b111, ] * 4 + [0b001, ] k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l, self.tsb_key) sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) self.tb.connect( src, blocks.stream_to_tagged_stream( gr.sizeof_char, 1, len(src_data), self.tsb_key), repack, sink) self.tb.run() self.assertEqual(len(sink.data()), 1) self.assertEqual(sink.data()[0], expected_data) def test_005_three_with_tags_trailing(self): """ 3 -> 8, trailing bits """ src_data = [0b101, ] + [0b111, ] * 4 + [0b001, ] expected_data = [0b11111101, 0b11111111] k = 3 l = 8 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l, self.tsb_key, True) sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) self.tb.connect( src, blocks.stream_to_tagged_stream( gr.sizeof_char, 1, len(src_data), self.tsb_key), repack, sink) self.tb.run() self.assertEqual(len(sink.data()), 1) self.assertEqual(sink.data()[0], expected_data) if __name__ == '__main__': gr_unittest.run(qa_repack_bits_bb)