diff options
author | Josh Morman <jmorman@gnuradio.org> | 2021-11-24 12:40:24 -0500 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-11-24 14:41:53 -0500 |
commit | de184bd22f98b714bc2f383d59126cd2510374fe (patch) | |
tree | 87dfc815886175fa59cbb3d6f88978616d810d01 | |
parent | 8ab3e5a686f89ae7711cec9496cdc6598a4ad80b (diff) |
trellis: pep8 formatting
Signed-off-by: Josh Morman <jmorman@gnuradio.org>
-rw-r--r-- | gr-trellis/docs/make_numbered_listing.py | 45 | ||||
-rw-r--r-- | gr-trellis/docs/test_tcm.py | 105 | ||||
-rw-r--r-- | gr-trellis/docs/test_viterbi_equalization1.py | 115 | ||||
-rw-r--r-- | gr-trellis/examples/python/test_tcm.py | 134 | ||||
-rw-r--r-- | gr-trellis/python/trellis/fsm_utils.py | 169 |
5 files changed, 304 insertions, 264 deletions
diff --git a/gr-trellis/docs/make_numbered_listing.py b/gr-trellis/docs/make_numbered_listing.py index c295dc8763..cdef9161fb 100644 --- a/gr-trellis/docs/make_numbered_listing.py +++ b/gr-trellis/docs/make_numbered_listing.py @@ -1,45 +1,48 @@ #!/usr/bin/env python import sys -import os, os.path +import os +import os.path from optparse import OptionParser -def quote_line (line): - line = line.replace ('&', '&') - line = line.replace ('<', '<') - line = line.replace ('>', '>') - line = line.replace ("'", ''') - line = line.replace ('"', '"') + +def quote_line(line): + line = line.replace('&', '&') + line = line.replace('<', '<') + line = line.replace('>', '>') + line = line.replace("'", ''') + line = line.replace('"', '"') return line -def generate_listing (input_filename, title=None): - inf = open (input_filename, "r") - output_filename = os.path.basename (input_filename) + '.xml' - outf = open (output_filename, "w") - outf.write ('<?xml version="1.0" encoding="ISO-8859-1"?>\n') + +def generate_listing(input_filename, title=None): + inf = open(input_filename, "r") + output_filename = os.path.basename(input_filename) + '.xml' + outf = open(output_filename, "w") + outf.write('<?xml version="1.0" encoding="ISO-8859-1"?>\n') # outf.write ('<example id="%s">\n' % (input_filename,)) # if not title: # title = input_filename # outf.write ('<title>') # outf.write (title) # outf.write ('</title>\n') - outf.write ('<programlisting>\n'); + outf.write('<programlisting>\n') lineno = 0 for line in inf: - line = line.expandtabs (8) - line = quote_line (line) + line = line.expandtabs(8) + line = quote_line(line) lineno = lineno + 1 - outf.write ('%3d %s' % (lineno, line)) + outf.write('%3d %s' % (lineno, line)) - outf.write ('</programlisting>\n') + outf.write('</programlisting>\n') # outf.write ('</example>\n') -def main (): +def main(): for file in sys.argv[1:]: - generate_listing (file) + generate_listing(file) -if __name__ == '__main__': - main () +if __name__ == '__main__': + main() diff --git a/gr-trellis/docs/test_tcm.py b/gr-trellis/docs/test_tcm.py index f9442dbc9f..65df9a2dbe 100644 --- a/gr-trellis/docs/test_tcm.py +++ b/gr-trellis/docs/test_tcm.py @@ -15,84 +15,97 @@ except ImportError: sys.stderr.write("Error: Program requires gr-analog.\n") sys.exit(1) -def run_test (f,Kb,bitspersymbol,K,dimensionality,constellation,N0,seed): - tb = gr.top_block () + +def run_test(f, Kb, bitspersymbol, K, dimensionality, constellation, N0, seed): + tb = gr.top_block() # TX src = blocks.lfsr_32k_source_s() - src_head = blocks.head (gr.sizeof_short,Kb / 16) # packet size in shorts - s2fsmi = blocks.packed_to_unpacked_ss(bitspersymbol,gr.GR_MSB_FIRST) # unpack shorts to symbols compatible with the FSM input cardinality - enc = trellis.encoder_ss(f,0) # initial state = 0 - mod = digital.chunks_to_symbols_sf(constellation,dimensionality) + src_head = blocks.head(gr.sizeof_short, Kb / 16) # packet size in shorts + # unpack shorts to symbols compatible with the FSM input cardinality + s2fsmi = blocks.packed_to_unpacked_ss(bitspersymbol, gr.GR_MSB_FIRST) + enc = trellis.encoder_ss(f, 0) # initial state = 0 + mod = digital.chunks_to_symbols_sf(constellation, dimensionality) # CHANNEL add = blocks.add_ff() - noise = analog.noise_source_f(analog.GR_GAUSSIAN,math.sqrt(N0 / 2),seed) + noise = analog.noise_source_f(analog.GR_GAUSSIAN, math.sqrt(N0 / 2), seed) # RX - metrics = trellis.metrics_f(f.O(),dimensionality,constellation,digital.TRELLIS_EUCLIDEAN) # data preprocessing to generate metrics for Viterbi - va = trellis.viterbi_s(f,K,0,-1) # Put -1 if the Initial/Final states are not set. - fsmi2s = blocks.unpacked_to_packed_ss(bitspersymbol,gr.GR_MSB_FIRST) # pack FSM input symbols to shorts - dst = blocks.check_lfsr_32k_s(); - - tb.connect (src,src_head,s2fsmi,enc,mod) - tb.connect (mod,(add,0)) - tb.connect (noise,(add,1)) - tb.connect (add,metrics) - tb.connect (metrics,va,fsmi2s,dst) + # data preprocessing to generate metrics for Viterbi + metrics = trellis.metrics_f( + f.O(), dimensionality, constellation, digital.TRELLIS_EUCLIDEAN) + # Put -1 if the Initial/Final states are not set. + va = trellis.viterbi_s(f, K, 0, -1) + fsmi2s = blocks.unpacked_to_packed_ss( + bitspersymbol, gr.GR_MSB_FIRST) # pack FSM input symbols to shorts + dst = blocks.check_lfsr_32k_s() + + tb.connect(src, src_head, s2fsmi, enc, mod) + tb.connect(mod, (add, 0)) + tb.connect(noise, (add, 1)) + tb.connect(add, metrics) + tb.connect(metrics, va, fsmi2s, dst) tb.run() # A bit of cheating: run the program once and print the # final encoder state. # Then put it as the last argument in the viterbi block - #print "final state = " , enc.ST() + # print "final state = " , enc.ST() - ntotal = dst.ntotal () - nright = dst.nright () - runlength = dst.runlength () - return (ntotal,ntotal-nright) + ntotal = dst.ntotal() + nright = dst.nright() + runlength = dst.runlength() + return (ntotal, ntotal - nright) def main(args): - nargs = len (args) + nargs = len(args) if nargs == 3: - fname=args[0] - esn0_db=float(args[1]) # Es/No in dB - rep=int(args[2]) # number of times the experiment is run to collect enough errors + fname = args[0] + esn0_db = float(args[1]) # Es/No in dB + # number of times the experiment is run to collect enough errors + rep = int(args[2]) else: - sys.stderr.write ('usage: test_tcm.py fsm_fname Es/No_db repetitions\n') - sys.exit (1) + sys.stderr.write( + 'usage: test_tcm.py fsm_fname Es/No_db repetitions\n') + sys.exit(1) # system parameters - f=trellis.fsm(fname) # get the FSM specification from a file - Kb=1024*16 # packet size in bits (make it multiple of 16 so it can be packed in a short) - bitspersymbol = int(round(math.log(f.I()) / math.log(2))) # bits per FSM input symbol - K=Kb / bitspersymbol # packet size in trellis steps - modulation = fsm_utils.psk4 # see fsm_utlis.py for available predefined modulations + f = trellis.fsm(fname) # get the FSM specification from a file + # packet size in bits (make it multiple of 16 so it can be packed in a short) + Kb = 1024 * 16 + # bits per FSM input symbol + bitspersymbol = int(round(math.log(f.I()) / math.log(2))) + K = Kb / bitspersymbol # packet size in trellis steps + modulation = fsm_utils.psk4 # see fsm_utlis.py for available predefined modulations dimensionality = modulation[0] constellation = modulation[1] if len(constellation) / dimensionality != f.O(): - sys.stderr.write ('Incompatible FSM output cardinality and modulation size.\n') - sys.exit (1) + sys.stderr.write( + 'Incompatible FSM output cardinality and modulation size.\n') + sys.exit(1) # calculate average symbol energy Es = 0 for i in range(len(constellation)): Es = Es + constellation[i]**2 - Es = Es / (len(constellation)//dimensionality) - N0=Es / pow(10.0,esn0_db/10.0); # noise variance + Es = Es / (len(constellation) // dimensionality) + N0 = Es / pow(10.0, esn0_db / 10.0) # noise variance - tot_s=0 - terr_s=0 + tot_s = 0 + terr_s = 0 for i in range(rep): - (s,e)=run_test(f,Kb,bitspersymbol,K,dimensionality,constellation,N0,-int(666+i)) # run experiment with different seed to get different noise realizations - tot_s=tot_s+s - terr_s=terr_s+e - if (i%100==0): - print(i,s,e,tot_s,terr_s, '%e' % ((1.0*terr_s) / tot_s)) + # run experiment with different seed to get different noise realizations + (s, e) = run_test(f, Kb, bitspersymbol, K, + dimensionality, constellation, N0, -int(666 + i)) + tot_s = tot_s + s + terr_s = terr_s + e + if (i % 100 == 0): + print(i, s, e, tot_s, terr_s, '%e' % ((1.0 * terr_s) / tot_s)) # estimate of the (short) error rate - print(tot_s,terr_s, '%e' % ((1.0*terr_s) / tot_s)) + print(tot_s, terr_s, '%e' % ((1.0 * terr_s) / tot_s)) if __name__ == '__main__': - main (sys.argv[1:]) + main(sys.argv[1:]) diff --git a/gr-trellis/docs/test_viterbi_equalization1.py b/gr-trellis/docs/test_viterbi_equalization1.py index 1302712d98..88b425ac8b 100644 --- a/gr-trellis/docs/test_viterbi_equalization1.py +++ b/gr-trellis/docs/test_viterbi_equalization1.py @@ -15,94 +15,107 @@ except ImportError: sys.stderr.write("Error: Program requires gr-analog.\n") sys.exit(1) -def run_test (f,Kb,bitspersymbol,K,channel,modulation,dimensionality,tot_constellation,N0,seed): - tb = gr.top_block () + +def run_test(f, Kb, bitspersymbol, K, channel, modulation, dimensionality, tot_constellation, N0, seed): + tb = gr.top_block() L = len(channel) # TX # this for loop is TOO slow in python!!! - packet = [0]*(K+2*L) + packet = [0] * (K + 2 * L) random.seed(seed) for i in range(len(packet)): - packet[i] = random.randint(0, 2**bitspersymbol - 1) # random symbols - for i in range(L): # first/last L symbols set to 0 + packet[i] = random.randint(0, 2**bitspersymbol - 1) # random symbols + for i in range(L): # first/last L symbols set to 0 packet[i] = 0 - packet[len(packet)-i-1] = 0 - src = blocks.vector_source_s(packet,False) - mod = digital.chunks_to_symbols_sf(modulation[1],modulation[0]) + packet[len(packet) - i - 1] = 0 + src = blocks.vector_source_s(packet, False) + mod = digital.chunks_to_symbols_sf(modulation[1], modulation[0]) # CHANNEL - isi = filter.fir_filter_fff(1,channel) + isi = filter.fir_filter_fff(1, channel) add = blocks.add_ff() - noise = analog.noise_source_f(analog.GR_GAUSSIAN,math.sqrt(N0 / 2),seed) + noise = analog.noise_source_f(analog.GR_GAUSSIAN, math.sqrt(N0 / 2), seed) # RX - skip = blocks.skiphead(gr.sizeof_float, L) # skip the first L samples since you know they are coming from the L zero symbols - #metrics = trellis.metrics_f(f.O(),dimensionality,tot_constellation,digital.TRELLIS_EUCLIDEAN) # data preprocessing to generate metrics for Viterbi - #va = trellis.viterbi_s(f,K+L,0,0) # Put -1 if the Initial/Final states are not set. - va = trellis.viterbi_combined_s(f,K+L,0,0,dimensionality,tot_constellation,digital.TRELLIS_EUCLIDEAN) # using viterbi_combined_s instead of metrics_f/viterbi_s allows larger packet lengths because metrics_f is complaining for not being able to allocate large buffers. This is due to the large f.O() in this application... + # skip the first L samples since you know they are coming from the L zero symbols + skip = blocks.skiphead(gr.sizeof_float, L) + # metrics = trellis.metrics_f(f.O(),dimensionality,tot_constellation,digital.TRELLIS_EUCLIDEAN) # data preprocessing to generate metrics for Viterbi + # va = trellis.viterbi_s(f,K+L,0,0) # Put -1 if the Initial/Final states are not set. + # using viterbi_combined_s instead of metrics_f/viterbi_s allows larger packet lengths because metrics_f is complaining for not being able to allocate large buffers. This is due to the large f.O() in this application... + va = trellis.viterbi_combined_s( + f, K + L, 0, 0, dimensionality, tot_constellation, digital.TRELLIS_EUCLIDEAN) dst = blocks.vector_sink_s() - tb.connect (src,mod) - tb.connect (mod,isi,(add,0)) - tb.connect (noise,(add,1)) + tb.connect(src, mod) + tb.connect(mod, isi, (add, 0)) + tb.connect(noise, (add, 1)) #tb.connect (add,metrics) #tb.connect (metrics,va,dst) - tb.connect (add,skip,va,dst) + tb.connect(add, skip, va, dst) tb.run() data = dst.data() ntotal = len(data) - L - nright=0 + nright = 0 for i in range(ntotal): - if packet[i+L]==data[i]: - nright=nright+1 - #else: - #print "Error in ", i + if packet[i + L] == data[i]: + nright = nright + 1 + # else: + # print "Error in ", i - return (ntotal,ntotal-nright) + return (ntotal, ntotal - nright) def main(args): - nargs = len (args) + nargs = len(args) if nargs == 2: - esn0_db=float(args[0]) - rep=int(args[1]) + esn0_db = float(args[0]) + rep = int(args[1]) else: - sys.stderr.write ('usage: test_viterbi_equalization1.py Es/No_db repetitions\n') - sys.exit (1) + sys.stderr.write( + 'usage: test_viterbi_equalization1.py Es/No_db repetitions\n') + sys.exit(1) # system parameters - Kb=2048 # packet size in bits - modulation = fsm_utils.pam4 # see fsm_utlis.py for available predefined modulations - channel = fsm_utils.c_channel # see fsm_utlis.py for available predefined test channels - f=trellis.fsm(len(modulation[1]),len(channel)) # generate the FSM automatically - bitspersymbol = int(round(math.log(f.I()) / math.log(2))) # bits per FSM input symbol - K=Kb / bitspersymbol # packet size in trellis steps - - tot_channel = fsm_utils.make_isi_lookup(modulation,channel,True) # generate the lookup table (normalize energy to 1) + Kb = 2048 # packet size in bits + modulation = fsm_utils.pam4 # see fsm_utlis.py for available predefined modulations + # see fsm_utlis.py for available predefined test channels + channel = fsm_utils.c_channel + # generate the FSM automatically + f = trellis.fsm(len(modulation[1]), len(channel)) + # bits per FSM input symbol + bitspersymbol = int(round(math.log(f.I()) / math.log(2))) + K = Kb / bitspersymbol # packet size in trellis steps + + # generate the lookup table (normalize energy to 1) + tot_channel = fsm_utils.make_isi_lookup(modulation, channel, True) dimensionality = tot_channel[0] tot_constellation = tot_channel[1] - N0=pow(10.0,-esn0_db / 10.0); # noise variance + N0 = pow(10.0, -esn0_db / 10.0) # noise variance if len(tot_constellation) / dimensionality != f.O(): - sys.stderr.write ('Incompatible FSM output cardinality and lookup table size.\n') - sys.exit (1) + sys.stderr.write( + 'Incompatible FSM output cardinality and lookup table size.\n') + sys.exit(1) - tot_s=0 # total number of transmitted shorts - terr_s=0 # total number of shorts in error - terr_p=0 # total number of packets in error + tot_s = 0 # total number of transmitted shorts + terr_s = 0 # total number of shorts in error + terr_p = 0 # total number of packets in error for i in range(rep): - (s,e)=run_test(f,Kb,bitspersymbol,K,channel,modulation,dimensionality,tot_constellation,N0,-int(666+i)) # run experiment with different seed to get different data and noise realizations - tot_s=tot_s+s - terr_s=terr_s+e - terr_p=terr_p+(terr_s!=0) - if ((i+1)%100==0) : # display progress - print(i+1,terr_p, '%.2e' % ((1.0*terr_p) / (i+1)),tot_s,terr_s, '%.2e' % ((1.0*terr_s) / tot_s)) + (s, e) = run_test(f, Kb, bitspersymbol, K, channel, modulation, dimensionality, tot_constellation, + N0, -int(666 + i)) # run experiment with different seed to get different data and noise realizations + tot_s = tot_s + s + terr_s = terr_s + e + terr_p = terr_p + (terr_s != 0) + if ((i + 1) % 100 == 0): # display progress + print(i + 1, terr_p, '%.2e' % ((1.0 * terr_p) / (i + 1)), + tot_s, terr_s, '%.2e' % ((1.0 * terr_s) / tot_s)) # estimate of the (short or symbol) error rate - print(rep,terr_p, '%.2e' % ((1.0*terr_p) / (i+1)),tot_s,terr_s, '%.2e' % ((1.0*terr_s) / tot_s)) + print(rep, terr_p, '%.2e' % ((1.0 * terr_p) / (i + 1)), + tot_s, terr_s, '%.2e' % ((1.0 * terr_s) / tot_s)) if __name__ == '__main__': - main (sys.argv[1:]) + main(sys.argv[1:]) diff --git a/gr-trellis/examples/python/test_tcm.py b/gr-trellis/examples/python/test_tcm.py index 958a4b5c64..c747b5b13c 100644 --- a/gr-trellis/examples/python/test_tcm.py +++ b/gr-trellis/examples/python/test_tcm.py @@ -17,108 +17,120 @@ except ImportError: sys.stderr.write("Error: Program requires gr-analog.\n") sys.exit(1) -def run_test (f,Kb,bitspersymbol,K,dimensionality,constellation,N0,seed): - tb = gr.top_block () + +def run_test(f, Kb, bitspersymbol, K, dimensionality, constellation, N0, seed): + tb = gr.top_block() # TX numpy.random.seed(-seed) - packet = numpy.random.randint(0,2,Kb) # create Kb random bits - packet[Kb-10:Kb]=0 - packet[0:Kb]=0 - src = blocks.vector_source_s(packet.tolist(),False) - b2s = blocks.unpacked_to_packed_ss(1,gr.GR_MSB_FIRST) # pack bits in shorts - s2fsmi = blocks.packed_to_unpacked_ss(bitspersymbol,gr.GR_MSB_FIRST) # unpack shorts to symbols compatible with the FSM input cardinality - enc = trellis.encoder_ss(f,0) # initial state = 0 - mod = digital.chunks_to_symbols_sf(constellation,dimensionality) + packet = numpy.random.randint(0, 2, Kb) # create Kb random bits + packet[Kb - 10:Kb] = 0 + packet[0:Kb] = 0 + src = blocks.vector_source_s(packet.tolist(), False) + b2s = blocks.unpacked_to_packed_ss( + 1, gr.GR_MSB_FIRST) # pack bits in shorts + # unpack shorts to symbols compatible with the FSM input cardinality + s2fsmi = blocks.packed_to_unpacked_ss(bitspersymbol, gr.GR_MSB_FIRST) + enc = trellis.encoder_ss(f, 0) # initial state = 0 + mod = digital.chunks_to_symbols_sf(constellation, dimensionality) # CHANNEL add = blocks.add_ff() - noise = analog.noise_source_f(analog.GR_GAUSSIAN,math.sqrt(N0 / 2),int(seed)) + noise = analog.noise_source_f( + analog.GR_GAUSSIAN, math.sqrt(N0 / 2), int(seed)) # RX - va = trellis.viterbi_combined_fs(f,K,0,0,dimensionality,constellation,digital.TRELLIS_EUCLIDEAN) # Put -1 if the Initial/Final states are not set. - fsmi2s = blocks.unpacked_to_packed_ss(bitspersymbol,gr.GR_MSB_FIRST) # pack FSM input symbols to shorts - s2b = blocks.packed_to_unpacked_ss(1,gr.GR_MSB_FIRST) # unpack shorts to bits - dst = blocks.vector_sink_s(); - - - tb.connect (src,b2s,s2fsmi,enc,mod) - tb.connect (mod,(add,0)) - tb.connect (noise,(add,1)) - tb.connect (add,va,fsmi2s,s2b,dst) - + # Put -1 if the Initial/Final states are not set. + va = trellis.viterbi_combined_fs( + f, K, 0, 0, dimensionality, constellation, digital.TRELLIS_EUCLIDEAN) + fsmi2s = blocks.unpacked_to_packed_ss( + bitspersymbol, gr.GR_MSB_FIRST) # pack FSM input symbols to shorts + s2b = blocks.packed_to_unpacked_ss( + 1, gr.GR_MSB_FIRST) # unpack shorts to bits + dst = blocks.vector_sink_s() + + tb.connect(src, b2s, s2fsmi, enc, mod) + tb.connect(mod, (add, 0)) + tb.connect(noise, (add, 1)) + tb.connect(add, va, fsmi2s, s2b, dst) tb.run() # A bit of cheating: run the program once and print the # final encoder state.. # Then put it as the last argument in the viterbi block - #print "final state = " , enc.ST() + # print "final state = " , enc.ST() if len(dst.data()) != len(packet): print("Error: not enough data:", len(dst.data()), len(packet)) - ntotal=len(packet) - nwrong = sum(abs(packet-numpy.array(dst.data()))); - return (ntotal,nwrong,abs(packet-numpy.array(dst.data()))) - - + ntotal = len(packet) + nwrong = sum(abs(packet - numpy.array(dst.data()))) + return (ntotal, nwrong, abs(packet - numpy.array(dst.data()))) def main(): parser = OptionParser(option_class=eng_option) - parser.add_option("-f", "--fsm_file", type="string", default="fsm_files/awgn1o2_4.fsm", help="Filename containing the fsm specification, e.g. -f fsm_files/awgn1o2_4.fsm (default=fsm_files/awgn1o2_4.fsm)") - parser.add_option("-e", "--esn0", type="eng_float", default=10.0, help="Symbol energy to noise PSD level ratio in dB, e.g., -e 10.0 (default=10.0)") - parser.add_option("-r", "--repetitions", type="int", default=100, help="Number of packets to be generated for the simulation, e.g., -r 100 (default=100)") - - (options, args) = parser.parse_args () + parser.add_option("-f", "--fsm_file", type="string", default="fsm_files/awgn1o2_4.fsm", + help="Filename containing the fsm specification, e.g. -f fsm_files/awgn1o2_4.fsm (default=fsm_files/awgn1o2_4.fsm)") + parser.add_option("-e", "--esn0", type="eng_float", default=10.0, + help="Symbol energy to noise PSD level ratio in dB, e.g., -e 10.0 (default=10.0)") + parser.add_option("-r", "--repetitions", type="int", default=100, + help="Number of packets to be generated for the simulation, e.g., -r 100 (default=100)") + + (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() raise SystemExit(1) - fname=options.fsm_file - esn0_db=float(options.esn0) - rep=int(options.repetitions) + fname = options.fsm_file + esn0_db = float(options.esn0) + rep = int(options.repetitions) # system parameters - f=trellis.fsm(fname) # get the FSM specification from a file + f = trellis.fsm(fname) # get the FSM specification from a file # alternatively you can specify the fsm from its generator matrix - #f=trellis.fsm(1,2,[5,7]) - Kb=1024*16 # packet size in bits (make it multiple of 16 so it can be packed in a short) - bitspersymbol = int(round(math.log(f.I()) / math.log(2))) # bits per FSM input symbol - K=Kb / bitspersymbol # packet size in trellis steps - modulation = fsm_utils.psk4 # see fsm_utlis.py for available predefined modulations + # f=trellis.fsm(1,2,[5,7]) + # packet size in bits (make it multiple of 16 so it can be packed in a short) + Kb = 1024 * 16 + # bits per FSM input symbol + bitspersymbol = int(round(math.log(f.I()) / math.log(2))) + K = Kb / bitspersymbol # packet size in trellis steps + modulation = fsm_utils.psk4 # see fsm_utlis.py for available predefined modulations dimensionality = modulation[0] constellation = modulation[1] if len(constellation) / dimensionality != f.O(): - sys.stderr.write ('Incompatible FSM output cardinality and modulation size.\n') - sys.exit (1) + sys.stderr.write( + 'Incompatible FSM output cardinality and modulation size.\n') + sys.exit(1) # calculate average symbol energy Es = 0 for i in range(len(constellation)): Es = Es + constellation[i]**2 - Es = Es / (len(constellation)//dimensionality) - N0=Es / pow(10.0,esn0_db/10.0); # calculate noise variance + Es = Es / (len(constellation) // dimensionality) + N0 = Es / pow(10.0, esn0_db / 10.0) # calculate noise variance - tot_b=0 # total number of transmitted bits - terr_b=0 # total number of bits in error - terr_p=0 # total number of packets in error + tot_b = 0 # total number of transmitted bits + terr_b = 0 # total number of bits in error + terr_p = 0 # total number of packets in error for i in range(rep): - (b,e,pattern)=run_test(f,Kb,bitspersymbol,K,dimensionality,constellation,N0,-(666+i)) # run experiment with different seed to get different noise realizations - tot_b=tot_b+b - terr_b=terr_b+e - terr_p=terr_p+(e!=0) - if ((i+1)%100==0) : # display progress - print(i+1,terr_p, '%.2e' % ((1.0*terr_p) / (i+1)),tot_b,terr_b, '%.2e' % ((1.0*terr_b) / tot_b)) - if e!=0: - print("rep=",i, e) + # run experiment with different seed to get different noise realizations + (b, e, pattern) = run_test(f, Kb, bitspersymbol, + K, dimensionality, constellation, N0, -(666 + i)) + tot_b = tot_b + b + terr_b = terr_b + e + terr_p = terr_p + (e != 0) + if ((i + 1) % 100 == 0): # display progress + print(i + 1, terr_p, '%.2e' % ((1.0 * terr_p) / (i + 1)), + tot_b, terr_b, '%.2e' % ((1.0 * terr_b) / tot_b)) + if e != 0: + print("rep=", i, e) for k in range(Kb): - if pattern[k]!=0: + if pattern[k] != 0: print(k) # estimate of the bit error rate - print(rep,terr_p, '%.2e' % ((1.0*terr_p) / (i+1)),tot_b,terr_b, '%.2e' % ((1.0*terr_b) / tot_b)) - + print(rep, terr_p, '%.2e' % ((1.0 * terr_p) / (i + 1)), + tot_b, terr_b, '%.2e' % ((1.0 * terr_b) / tot_b)) if __name__ == '__main__': main() - diff --git a/gr-trellis/python/trellis/fsm_utils.py b/gr-trellis/python/trellis/fsm_utils.py index 0cbd6b6750..7e78b19fa0 100644 --- a/gr-trellis/python/trellis/fsm_utils.py +++ b/gr-trellis/python/trellis/fsm_utils.py @@ -30,9 +30,9 @@ def dec2base(num, base, l): s = list(range(l)) n = num for i in range(l): - s[l-i-1]=n%base - n=int(n / base) - if n!=0: + s[l - i - 1] = n % base + n = int(n / base) + if n != 0: print('Number ', num, ' requires more than ', l, 'digits.') return s @@ -68,14 +68,14 @@ def make_isi_lookup(mod, channel, normalize): for i in range(len(channel)): channel[i] = channel[i] / math.sqrt(p) - lookup=list(range(len(constellation)**len(channel))) + lookup = list(range(len(constellation)**len(channel))) for o in range(len(constellation)**len(channel)): ss = dec2base(o, len(constellation), len(channel)) ll = 0 for i in range(len(channel)): - ll=ll+constellation[ss[i]]*channel[i] - lookup[o]=ll - return (1,lookup) + ll = ll + constellation[ss[i]] * channel[i] + lookup[o] = ll + return (1, lookup) def make_cpm_signals(K, P, M, L, q, frac): @@ -101,51 +101,50 @@ def make_cpm_signals(K, P, M, L, q, frac): X = (M**L) * P PSI = numpy.empty((X, Q)) for x in range(X): - xv=dec2base(x / P,M,L) - xv=numpy.append(xv, x%P) - qq1=numpy.zeros(Q) - for m in range(L): - qq1=qq1+xv[m]*q[m*Q:m*Q+Q] - psi=2*math.pi*h*xv[-1]+4*math.pi*h*qq1+w - #print(psi) - PSI[x]=psi + xv = dec2base(x / P, M, L) + xv = numpy.append(xv, x % P) + qq1 = numpy.zeros(Q) + for m in range(L): + qq1 = qq1 + xv[m] * q[m * Q:m * Q + Q] + psi = 2 * math.pi * h * xv[-1] + 4 * math.pi * h * qq1 + w + # print(psi) + PSI[x] = psi PSI = numpy.transpose(PSI) - SS=numpy.exp(1j*PSI) # contains all signals as columns - #print(SS) - + SS = numpy.exp(1j * PSI) # contains all signals as columns + # print(SS) # Now we need to orthogonalize the signals - F = scipy.linalg.orth(SS) # find an orthonormal basis for SS - #print(numpy.dot(numpy.transpose(F.conjugate()),F) # check for orthonormality) - S = numpy.dot(numpy.transpose(F.conjugate()),SS) - #print(F) - #print(S) + F = scipy.linalg.orth(SS) # find an orthonormal basis for SS + # print(numpy.dot(numpy.transpose(F.conjugate()),F) # check for orthonormality) + S = numpy.dot(numpy.transpose(F.conjugate()), SS) + # print(F) + # print(S) # We only want to keep those dimensions that contain most # of the energy of the overall constellation (eg, frac=0.9 ==> 90%) # evaluate mean energy in each dimension - E=numpy.sum(numpy.absolute(S)**2, axis=1) / Q - E=E / numpy.sum(E) - #print(E) + E = numpy.sum(numpy.absolute(S)**2, axis=1) / Q + E = E / numpy.sum(E) + # print(E) Es = -numpy.sort(-E) Esi = numpy.argsort(-E) - #print(Es) - #print(Esi) - Ecum=numpy.cumsum(Es) - #print(Ecum) - v0=numpy.searchsorted(Ecum,frac) - N = v0+1 - #print(v0) - #print(Esi[0:v0+1]) - Ff=numpy.transpose(numpy.transpose(F)[Esi[0:v0+1]]) - #print(Ff) - Sf = S[Esi[0:v0+1]] - #print(Sf) + # print(Es) + # print(Esi) + Ecum = numpy.cumsum(Es) + # print(Ecum) + v0 = numpy.searchsorted(Ecum, frac) + N = v0 + 1 + # print(v0) + # print(Esi[0:v0+1]) + Ff = numpy.transpose(numpy.transpose(F)[Esi[0:v0 + 1]]) + # print(Ff) + Sf = S[Esi[0:v0 + 1]] + # print(Sf) return (f0, SS, S, F, Sf, Ff, N) -#return f0 +# return f0 ###################################################################### # A list of common modulations. @@ -155,52 +154,52 @@ pam2 = (1, [-1, 1]) pam4 = (1, [-3, -1, 3, 1]) # includes Gray mapping pam8 = (1, [-7, -5, -3, -1, 1, 3, 5, 7]) -psk4=(2,[1, 0, \ - 0, 1, \ - 0, -1,\ - -1, 0]) # includes Gray mapping - -psk8=(2,[math.cos(2*math.pi*0/8), math.sin(2*math.pi*0/8), \ - math.cos(2*math.pi*1/8), math.sin(2*math.pi*1/8), \ - math.cos(2*math.pi*2/8), math.sin(2*math.pi*2/8), \ - math.cos(2*math.pi*3/8), math.sin(2*math.pi*3/8), \ - math.cos(2*math.pi*4/8), math.sin(2*math.pi*4/8), \ - math.cos(2*math.pi*5/8), math.sin(2*math.pi*5/8), \ - math.cos(2*math.pi*6/8), math.sin(2*math.pi*6/8), \ - math.cos(2*math.pi*7/8), math.sin(2*math.pi*7/8)]) - -psk2x3 = (3,[-1,-1,-1, \ - -1,-1,1, \ - -1,1,-1, \ - -1,1,1, \ - 1,-1,-1, \ - 1,-1,1, \ - 1,1,-1, \ - 1,1,1]) - -psk2x4 = (4,[-1,-1,-1,-1, \ - -1,-1,-1,1, \ - -1,-1,1,-1, \ - -1,-1,1,1, \ - -1,1,-1,-1, \ - -1,1,-1,1, \ - -1,1,1,-1, \ - -1,1,1,1, \ - 1,-1,-1,-1, \ - 1,-1,-1,1, \ - 1,-1,1,-1, \ - 1,-1,1,1, \ - 1,1,-1,-1, \ - 1,1,-1,1, \ - 1,1,1,-1, \ - 1,1,1,1]) - -orth2 = (2,[1, 0, \ - 0, 1]) -orth4=(4,[1, 0, 0, 0, \ - 0, 1, 0, 0, \ - 0, 0, 1, 0, \ - 0, 0, 0, 1]) +psk4 = (2, [1, 0, + 0, 1, + 0, -1, + -1, 0]) # includes Gray mapping + +psk8 = (2, [math.cos(2 * math.pi * 0 / 8), math.sin(2 * math.pi * 0 / 8), + math.cos(2 * math.pi * 1 / 8), math.sin(2 * math.pi * 1 / 8), + math.cos(2 * math.pi * 2 / 8), math.sin(2 * math.pi * 2 / 8), + math.cos(2 * math.pi * 3 / 8), math.sin(2 * math.pi * 3 / 8), + math.cos(2 * math.pi * 4 / 8), math.sin(2 * math.pi * 4 / 8), + math.cos(2 * math.pi * 5 / 8), math.sin(2 * math.pi * 5 / 8), + math.cos(2 * math.pi * 6 / 8), math.sin(2 * math.pi * 6 / 8), + math.cos(2 * math.pi * 7 / 8), math.sin(2 * math.pi * 7 / 8)]) + +psk2x3 = (3, [-1, -1, -1, + -1, -1, 1, + -1, 1, -1, + -1, 1, 1, + 1, -1, -1, + 1, -1, 1, + 1, 1, -1, + 1, 1, 1]) + +psk2x4 = (4, [-1, -1, -1, -1, + -1, -1, -1, 1, + -1, -1, 1, -1, + -1, -1, 1, 1, + -1, 1, -1, -1, + -1, 1, -1, 1, + -1, 1, 1, -1, + -1, 1, 1, 1, + 1, -1, -1, -1, + 1, -1, -1, 1, + 1, -1, 1, -1, + 1, -1, 1, 1, + 1, 1, -1, -1, + 1, 1, -1, 1, + 1, 1, 1, -1, + 1, 1, 1, 1]) + +orth2 = (2, [1, 0, + 0, 1]) +orth4 = (4, [1, 0, 0, 0, + 0, 1, 0, 0, + 0, 0, 1, 0, + 0, 0, 0, 1]) ###################################################################### # A list of channels to be tested |