1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
#!/usr/bin/env python
#
# Copyright 2007,2008 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
import math
from numpy import fft
from gnuradio import gr
try:
from gnuradio import filter
except ImportError:
import filter_swig as filter
try:
from gnuradio import blocks
except ImportError:
import blocks_swig as blocks
class ofdm_sync_pn(gr.hier_block2):
    def __init__(self, fft_length, cp_length, logging=False):
        """
        OFDM synchronization using PN Correlation:
        T. M. Schmidl and D. C. Cox, "Robust Frequency and Timing
        Synchronization for OFDM," IEEE Trans. Communications, vol. 45,
        no. 12, 1997.

        The block takes one complex input stream and produces two outputs:
          Output 0 (float): fine frequency correction value (the angle of
                            the windowed correlation, held by a
                            sample-and-hold that is triggered by output 1).
          Output 1 (char):  timing signal from the peak detector.

        Args:
            fft_length: FFT size of the OFDM symbol. Half of it
                (fft_length // 2) is used as the correlation delay and as
                the length of both moving-sum windows.
            cp_length: cyclic prefix length; sets the number of taps of the
                averaging filter applied to the normalized correlation.
            logging: when true, intermediate streams are also written to
                "ofdm_sync_pn-*.dat" files in the current directory.
        """
        gr.hier_block2.__init__(self, "ofdm_sync_pn",
                                gr.io_signature(1, 1, gr.sizeof_gr_complex),  # Input signature
                                gr.io_signature2(2, 2, gr.sizeof_float, gr.sizeof_char))  # Output signature

        # Every half-symbol quantity below must be an integer: it is used
        # both as a sample delay and as a tap count.  Floor division keeps
        # it integral under Python 3 as well (plain "/" would give a float).
        half_fft_length = fft_length // 2

        # Adding 0 leaves the samples unchanged; this block only serves as a
        # single internal node that all the branches below fan out from.
        self.input = blocks.add_const_cc(0)

        # PN Sync

        # Create a delay line of half an FFT length
        self.delay = gr.delay(gr.sizeof_gr_complex, half_fft_length)

        # Correlation from ML Sync: input multiplied by the conjugate of
        # the delayed input
        self.conjg = blocks.conjugate_cc()
        self.corr = blocks.multiply_cc()

        # Create a moving sum filter for the corr output
        moving_sum_taps = [1.0 for _ in range(half_fft_length)]
        self.moving_sum_filter = filter.fir_filter_ccf(1, moving_sum_taps)

        # Create a moving sum filter for the input power
        self.inputmag2 = gr.complex_to_mag_squared()
        movingsum2_taps = [1.0 for _ in range(half_fft_length)]
        self.inputmovingsum = filter.fir_filter_fff(1, movingsum2_taps)

        # The summed power is squared (multiplied by itself) and used as the
        # divisor when normalizing the squared correlation magnitude.
        self.square = blocks.multiply_ff()
        self.normalize = blocks.divide_ff()

        # Get magnitude (peaks) and angle (phase/freq error)
        self.c2mag = gr.complex_to_mag_squared()
        self.angle = gr.complex_to_arg()

        self.sample_and_hold = gr.sample_and_hold_ff()

        # ML measurements input to sampler block and detect.
        # The averaged metric is shifted down by 1 before peak detection.
        self.sub1 = blocks.add_const_ff(-1)
        # NOTE(review): the four arguments are presumably the rise/fall
        # threshold factors, look-ahead and averaging alpha -- confirm
        # against the peak_detector_fb documentation.
        self.pk_detect = gr.peak_detector_fb(0.20, 0.20, 30, 0.001)

        self.connect(self, self.input)

        # Calculate the frequency offset from the correlation of the preamble
        self.connect(self.input, self.delay)
        self.connect(self.input, (self.corr, 0))
        self.connect(self.delay, self.conjg)
        self.connect(self.conjg, (self.corr, 1))
        self.connect(self.corr, self.moving_sum_filter)
        self.connect(self.moving_sum_filter, self.c2mag)
        self.connect(self.moving_sum_filter, self.angle)
        self.connect(self.angle, (self.sample_and_hold, 0))

        # Get the power of the input signal to normalize the output of the
        # correlation: normalize = |corr sum|^2 / (power sum)^2
        self.connect(self.input, self.inputmag2, self.inputmovingsum)
        self.connect(self.inputmovingsum, (self.square, 0))
        self.connect(self.inputmovingsum, (self.square, 1))
        self.connect(self.square, (self.normalize, 1))
        self.connect(self.c2mag, (self.normalize, 0))

        # Average the normalized metric over one cyclic prefix length
        # (cp_length equal taps of 1/cp_length each)
        matched_filter_taps = [1.0 / cp_length for _ in range(cp_length)]
        self.matched_filter = filter.fir_filter_fff(1, matched_filter_taps)
        self.connect(self.normalize, self.matched_filter)

        self.connect(self.matched_filter, self.sub1, self.pk_detect)

        # The detected peak triggers the sample-and-hold on the angle
        self.connect(self.pk_detect, (self.sample_and_hold, 1))

        # Set output signals
        #    Output 0: fine frequency correction value
        #    Output 1: timing signal
        self.connect(self.sample_and_hold, (self, 0))
        self.connect(self.pk_detect, (self, 1))

        if logging:
            # Dump the intermediate streams to files for offline inspection
            log_taps = (
                (self.matched_filter, gr.sizeof_float, "ofdm_sync_pn-mf_f.dat"),
                (self.normalize, gr.sizeof_float, "ofdm_sync_pn-theta_f.dat"),
                (self.angle, gr.sizeof_float, "ofdm_sync_pn-epsilon_f.dat"),
                (self.pk_detect, gr.sizeof_char, "ofdm_sync_pn-peaks_b.dat"),
                (self.sample_and_hold, gr.sizeof_float, "ofdm_sync_pn-sample_and_hold_f.dat"),
                (self.input, gr.sizeof_gr_complex, "ofdm_sync_pn-input_c.dat"),
            )
            for source_block, item_size, file_name in log_taps:
                self.connect(source_block, gr.file_sink(item_size, file_name))
|