1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
#!/usr/bin/env python
#
# Copyright 2004,2010,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
from __future__ import division
import math
import os
from gnuradio import gr, gr_unittest, trellis, digital, analog, blocks
fsm_args = {"awgn1o2_4": [2, 4, 4,
[0, 2, 0, 2, 1, 3, 1, 3],
[0, 3, 3, 0, 1, 2, 2, 1],
],
"rep2": [2, 1, 4, [0, 0], [0, 3]],
"nothing": [2, 1, 2, [0, 0], [0, 1]],
}
constells = {2: digital.constellation_bpsk(),
4: digital.constellation_qpsk(),
}
class test_trellis (gr_unittest.TestCase):
def test_001_fsm (self):
f = trellis.fsm(*fsm_args["awgn1o2_4"])
self.assertEqual(fsm_args["awgn1o2_4"],[f.I(),f.S(),f.O(),f.NS(),f.OS()])
def test_002_fsm (self):
f = trellis.fsm(*fsm_args["awgn1o2_4"])
g = trellis.fsm(f)
self.assertEqual((g.I(),g.S(),g.O(),g.NS(),g.OS()),(f.I(),f.S(),f.O(),f.NS(),f.OS()))
def test_003_fsm (self):
# FIXME: no file "awgn1o2_4.fsm"
#f = trellis.fsm("awgn1o2_4.fsm")
#self.assertEqual(fsm_args["awgn1o2_4"],(f.I(),f.S(),f.O(),f.NS(),f.OS()))
pass
def test_004_fsm(self):
""" Test to make sure fsm works with a single state fsm."""
# Just checking that it initializes properly.
f = trellis.fsm(*fsm_args["rep2"])
def test_001_interleaver (self):
K = 5
IN = [1,2,3,4,0]
DIN = [4,0,1,2,3]
i = trellis.interleaver(K,IN)
self.assertEqual((K,IN,DIN),(i.K(),i.INTER(),i.DEINTER()))
def test_001_viterbi(self):
"""
Runs some coding/decoding tests with a few different FSM
specs.
"""
for name, args in list(fsm_args.items()):
constellation = constells[args[2]]
fsms = trellis.fsm(*args)
noise = 0.1
tb = trellis_tb(constellation, fsms, noise)
tb.run()
# Make sure all packets successfully transmitted.
self.assertEqual(tb.dst.ntotal(), tb.dst.nright())
class trellis_tb(gr.top_block):
"""
A simple top block for use testing gr-trellis.
"""
def __init__(self, constellation, f, N0=0.25, seed=-666):
"""
constellation - a constellation object used for modulation.
f - a finite state machine specification used for coding.
N0 - noise level
seed - random seed
"""
super(trellis_tb, self).__init__()
# packet size in bits (make it multiple of 16 so it can be packed in a short)
packet_size = 1024*16
# bits per FSM input symbol
bitspersymbol = int(round(math.log(f.I()) / math.log(2))) # bits per FSM input symbol
# packet size in trellis steps
K = packet_size // bitspersymbol
# TX
src = blocks.lfsr_32k_source_s()
# packet size in shorts
src_head = blocks.head(gr.sizeof_short, packet_size // 16)
# unpack shorts to symbols compatible with the FSM input cardinality
s2fsmi = blocks.packed_to_unpacked_ss(bitspersymbol, gr.GR_MSB_FIRST)
# initial FSM state = 0
enc = trellis.encoder_ss(f, 0)
mod = digital.chunks_to_symbols_sc(constellation.points(), 1)
# CHANNEL
add = blocks.add_cc()
noise = analog.noise_source_c(analog.GR_GAUSSIAN,math.sqrt(N0 / 2),seed)
# RX
# data preprocessing to generate metrics for Viterbi
metrics = trellis.constellation_metrics_cf(constellation.base(), digital.TRELLIS_EUCLIDEAN)
# Put -1 if the Initial/Final states are not set.
va = trellis.viterbi_s(f, K, 0, -1)
# pack FSM input symbols to shorts
fsmi2s = blocks.unpacked_to_packed_ss(bitspersymbol, gr.GR_MSB_FIRST)
# check the output
self.dst = blocks.check_lfsr_32k_s()
self.connect (src, src_head, s2fsmi, enc, mod)
self.connect (mod, (add, 0))
self.connect (noise, (add, 1))
self.connect (add, metrics, va, fsmi2s, self.dst)
if __name__ == '__main__':
gr_unittest.run(test_trellis, "test_trellis.xml")
|