#!/usr/bin/env python
# Copyright 2012,2013 Free Software Foundation, Inc.
# This file is part of GNU Radio
# SPDX-License-Identifier: GPL-3.0-or-later

import random
import numpy

from gnuradio import gr, gr_unittest, blocks, analog, channels
from gnuradio import digital
from gnuradio.digital.utils import tagged_streams
from gnuradio.digital.ofdm_txrx import ofdm_tx

def make_bpsk_burst(fft_len, cp_len, num_bits):
    """ Create a burst of a sync symbol and some BPSK bits """
    sync_symbol = [
        (random.randint(0, 1) * 2) - 1
        for x in range(fft_len // 2)
    ] * 2
    sync_symbols = sync_symbol[-cp_len:] + sync_symbol
    mod_symbols = [
        (random.randint(0, 1) * 2) - 1
        for x in range(num_bits)
    return sync_symbols + mod_symbols

class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):

    def setUp(self):
        self.tb = gr.top_block()

    def tearDown(self):
        self.tb = None

    def test_001_detect(self):
        """ Send two bursts, with zeros in between, and check
        they are both detected at the correct position and no
        false alarms occur """
        n_zeros = 15
        fft_len = 32
        cp_len = 4
        sig_len = (fft_len + cp_len) * 10
        tx_signal = [0, ] * n_zeros + make_bpsk_burst(fft_len, cp_len, sig_len)
        tx_signal = tx_signal * 2
        add = blocks.add_cc()
        sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
        sink_freq = blocks.vector_sink_f()
        sink_detect = blocks.vector_sink_b()
        self.tb.connect(blocks.vector_source_c(tx_signal), (add, 0))
                analog.GR_GAUSSIAN, .01), (add, 1))
        self.tb.connect(add, sync)
        self.tb.connect((sync, 0), sink_freq)
        self.tb.connect((sync, 1), sink_detect)
        sig1_detect = sink_detect.data()[0:len(tx_signal) // 2]
        sig2_detect = sink_detect.data()[len(tx_signal) // 2:]
            1) - (n_zeros + fft_len + cp_len)) < cp_len)
            1) - (n_zeros + fft_len + cp_len)) < cp_len)
        self.assertEqual(numpy.sum(sig1_detect), 1)
        self.assertEqual(numpy.sum(sig2_detect), 1)

    def test_002_freq(self):
        """ Add a fine frequency offset and see if that gets detected properly """
        fft_len = 32
        cp_len = 4
        # This frequency offset is normalized to rads, i.e. \pi == f_s/2
        max_freq_offset = 2 * numpy.pi / fft_len  # Otherwise, it's coarse
        freq_offset = ((2 * random.random()) - 1) * max_freq_offset
        sig_len = (fft_len + cp_len) * 10
        tx_signal = make_bpsk_burst(fft_len, cp_len, sig_len)
        sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len, True)
        sink_freq = blocks.vector_sink_f()
        sink_detect = blocks.vector_sink_b()
        channel = channels.channel_model(0.005, freq_offset / 2.0 / numpy.pi)
        self.tb.connect(blocks.vector_source_c(tx_signal), channel, sync)
        self.tb.connect((sync, 0), sink_freq)
        self.tb.connect((sync, 1), sink_detect)
        phi_hat = sink_freq.data()[sink_detect.data().index(1)]
        est_freq_offset = 2 * phi_hat / fft_len
        self.assertAlmostEqual(est_freq_offset, freq_offset, places=2)

    def test_003_multiburst(self):
        """ Send several bursts, see if the number of detects is correct.
        Burst lengths and content are random.

        The channel is assumed AWGN for this test.
        n_bursts = 42
        fft_len = 32
        cp_len = 4
        tx_signal = []
        for _ in range(n_bursts):
            gap = [0, ] * random.randint(0, 2 * fft_len)
            tx_signal += gap + \
                make_bpsk_burst(fft_len, cp_len, fft_len * random.randint(5, 23))
        # Very loose definition of SNR here
        snr = 20  # dB
        sigma = 10**(-snr / 10)
        # Add noise -- we don't use the channel model blocks, we want to keep
        # this test as self-contained as possible, and all randomness should
        # derive from random.seed() above
        def complex_randn(N): return (numpy.random.randn(
            N) + 1j * numpy.random.randn(N)) * sigma / numpy.sqrt(2)
        tx_signal += complex_randn(len(tx_signal))
        sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
        sink_freq = blocks.vector_sink_f()
        sink_detect = blocks.vector_sink_b()
        self.tb.connect(blocks.vector_source_c(tx_signal), sync)
        self.tb.connect((sync, 0), sink_freq)
        self.tb.connect((sync, 1), sink_detect)
        n_bursts_detected = numpy.sum(sink_detect.data())
            n_bursts_detected, n_bursts,
            msg="Detection error (missed bursts): {}".format(
                (numpy.sum(sink_detect.data()) - n_bursts))

    def test_004_ofdm_packets(self):
        Send several bursts using ofdm_tx, see if the number of detects is correct.
        Burst lengths and content are random.
        n_bursts = 42
        fft_len = 64
        cp_len = 16
        # Here, coarse freq offset is allowed
        max_freq_offset = 2 * numpy.pi / fft_len * 4
        freq_offset = ((2 * random.random()) - 1) * max_freq_offset
        packets = []
        tagname = "packet_length"
        min_packet_length = 10
        max_packet_length = 50
        for _ in range(n_bursts):
            packet_length = random.randint(min_packet_length,
                                           max_packet_length + 1)
            packet = [random.randint(0, 255) for i in range(packet_length)]
        data, tags = tagged_streams.packets_to_vectors(
            packets, tagname, vlen=1)
        src = blocks.vector_source_b(data, False, 1, tags)
        mod = ofdm_tx(packet_length_tag_key=tagname)
        sync = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
        sink_freq = blocks.vector_sink_f()
        sink_detect = blocks.vector_sink_b()
        noise_level = 0.005
        channel = channels.channel_model(
            noise_level, freq_offset / 2 / numpy.pi)
        self.tb.connect(src, mod, channel, sync, sink_freq)
        self.tb.connect((sync, 1), sink_detect)
        self.assertEqual(numpy.sum(sink_detect.data()), n_bursts)

if __name__ == '__main__':