summaryrefslogtreecommitdiff
path: root/gr-fec/python
diff options
context:
space:
mode:
authorJosh Morman <jmorman@gnuradio.org>2021-11-24 12:32:44 -0500
committermormj <34754695+mormj@users.noreply.github.com>2021-11-24 14:41:53 -0500
commitc747e37751546159f30a2a1296dc4099fe132a53 (patch)
tree9b1b9851c9c98d1a0082cac5a65ba9439e5db776 /gr-fec/python
parent3cecf9268b15bb0e9e2fedaeb2528bd3038beee6 (diff)
fec: pep8 formatting
Signed-off-by: Josh Morman <jmorman@gnuradio.org>
Diffstat (limited to 'gr-fec/python')
-rw-r--r--gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py2
-rw-r--r--gr-fec/python/fec/LDPC/Generate_LDPC_matrix_functions.py1276
-rw-r--r--gr-fec/python/fec/__init__.py30
-rw-r--r--gr-fec/python/fec/_qa_helper.py24
-rw-r--r--gr-fec/python/fec/bercurve_generator.py14
-rw-r--r--gr-fec/python/fec/bitflip.py73
-rw-r--r--gr-fec/python/fec/capillary_threaded_decoder.py34
-rw-r--r--gr-fec/python/fec/capillary_threaded_encoder.py69
-rw-r--r--gr-fec/python/fec/extended_async_encoder.py11
-rw-r--r--gr-fec/python/fec/extended_decoder.py22
-rw-r--r--gr-fec/python/fec/extended_encoder.py18
-rw-r--r--gr-fec/python/fec/extended_tagged_decoder.py20
-rw-r--r--gr-fec/python/fec/extended_tagged_encoder.py17
-rw-r--r--gr-fec/python/fec/fec_test.py10
-rw-r--r--gr-fec/python/fec/polar/__init__.py18
-rw-r--r--gr-fec/python/fec/polar/channel_construction.py3
-rw-r--r--gr-fec/python/fec/polar/channel_construction_awgn.py18
-rw-r--r--gr-fec/python/fec/polar/channel_construction_bec.py5
-rw-r--r--gr-fec/python/fec/polar/common.py3
-rw-r--r--gr-fec/python/fec/polar/decoder.py25
-rw-r--r--gr-fec/python/fec/polar/encoder.py8
-rw-r--r--gr-fec/python/fec/polar/helper_functions.py9
-rw-r--r--gr-fec/python/fec/polar/polar_channel_construction12
-rw-r--r--gr-fec/python/fec/polar/testbed.py9
-rw-r--r--gr-fec/python/fec/qa_fecapi_dummy.py8
-rw-r--r--gr-fec/python/fec/threaded_decoder.py11
-rw-r--r--gr-fec/python/fec/threaded_encoder.py14
27 files changed, 911 insertions, 852 deletions
diff --git a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py
index 4e0c622e15..b745815d33 100644
--- a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py
+++ b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py
@@ -41,7 +41,7 @@ n = 200 # number of columns, corresponds to codeword length
p = 3 # column weight
q = 5 # row weight
-parity_check_matrix = LDPC_matrix(n_p_q = [n, p, q])
+parity_check_matrix = LDPC_matrix(n_p_q=[n, p, q])
# Richardson and Urbanke's preprocessing method requires a full rank
# matrix to start. The matrices generated by the
diff --git a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix_functions.py b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix_functions.py
index 9563d66afb..7d5e00536b 100644
--- a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix_functions.py
+++ b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix_functions.py
@@ -21,698 +21,700 @@ from numpy.random import shuffle, randint
# verbose = 1 #######################################################
def read_alist_file(filename):
- """
- This function reads in an alist file and creates the
- corresponding parity check matrix H. The format of alist
- files is described at:
- http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html
- """
-
- with open(filename, 'r') as myfile:
- data = myfile.readlines()
- numCols, numRows = parse_alist_header(data[0])
- H = zeros((numRows, numCols))
- # The locations of 1s starts in the 5th line of the file
- for lineNumber in np.arange(4, 4 + numCols):
- indices = data[lineNumber].split()
- for index in indices:
- H[int(index) - 1, lineNumber - 4] = 1
- # The subsequent lines in the file list the indices for where
- # the 1s are in the rows, but this is redundant
- # information.
-
- return H
+ """
+ This function reads in an alist file and creates the
+ corresponding parity check matrix H. The format of alist
+ files is described at:
+ http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html
+ """
+
+ with open(filename, 'r') as myfile:
+ data = myfile.readlines()
+ numCols, numRows = parse_alist_header(data[0])
+ H = zeros((numRows, numCols))
+ # The locations of 1s starts in the 5th line of the file
+ for lineNumber in np.arange(4, 4 + numCols):
+ indices = data[lineNumber].split()
+ for index in indices:
+ H[int(index) - 1, lineNumber - 4] = 1
+ # The subsequent lines in the file list the indices for where
+ # the 1s are in the rows, but this is redundant
+ # information.
+
+ return H
def parse_alist_header(header):
- size = header.split()
- return int(size[0]), int(size[1])
+ size = header.split()
+ return int(size[0]), int(size[1])
def write_alist_file(filename, H, verbose=0):
- """
- This function writes an alist file for the parity check
- matrix. The format of alist files is described at:
- http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html
- """
- with open(filename, 'w') as myfile:
-
- numRows = H.shape[0]
- numCols = H.shape[1]
-
- tempstring = repr(numCols) + ' ' + repr(numRows) + '\n'
- myfile.write(tempstring)
-
- tempstring1 = ''
- tempstring2 = ''
- maxRowWeight = 0
- for rowNum in np.arange(numRows):
- nonzeros = array(H[rowNum, :].nonzero())
- rowWeight = nonzeros.shape[1]
- if rowWeight > maxRowWeight:
- maxRowWeight = rowWeight
- tempstring1 = tempstring1 + repr(rowWeight) + ' '
- for tempArray in nonzeros:
- for index in tempArray:
- tempstring2 = tempstring2 + repr(index + 1) + ' '
- tempstring2 = tempstring2 + '\n'
- tempstring1 = tempstring1 + '\n'
-
- tempstring3 = ''
- tempstring4 = ''
- maxColWeight = 0
- for colNum in np.arange(numCols):
- nonzeros = array(H[:, colNum].nonzero())
- colWeight = nonzeros.shape[1]
- if colWeight > maxColWeight:
- maxColWeight = colWeight
- tempstring3 = tempstring3 + repr(colWeight) + ' '
- for tempArray in nonzeros:
- for index in tempArray:
- tempstring4 = tempstring4 + repr(index + 1) + ' '
- tempstring4 = tempstring4 + '\n'
- tempstring3 = tempstring3 + '\n'
-
- tempstring = repr(maxColWeight) + ' ' + repr(maxRowWeight) + '\n'
- # write out max column and row weights
- myfile.write(tempstring)
- # write out all of the column weights
- myfile.write(tempstring3)
- # write out all of the row weights
- myfile.write(tempstring1)
- # write out the nonzero indices for each column
- myfile.write(tempstring4)
- # write out the nonzero indices for each row
- myfile.write(tempstring2)
+ """
+ This function writes an alist file for the parity check
+ matrix. The format of alist files is described at:
+ http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html
+ """
+ with open(filename, 'w') as myfile:
+
+ numRows = H.shape[0]
+ numCols = H.shape[1]
+
+ tempstring = repr(numCols) + ' ' + repr(numRows) + '\n'
+ myfile.write(tempstring)
+
+ tempstring1 = ''
+ tempstring2 = ''
+ maxRowWeight = 0
+ for rowNum in np.arange(numRows):
+ nonzeros = array(H[rowNum, :].nonzero())
+ rowWeight = nonzeros.shape[1]
+ if rowWeight > maxRowWeight:
+ maxRowWeight = rowWeight
+ tempstring1 = tempstring1 + repr(rowWeight) + ' '
+ for tempArray in nonzeros:
+ for index in tempArray:
+ tempstring2 = tempstring2 + repr(index + 1) + ' '
+ tempstring2 = tempstring2 + '\n'
+ tempstring1 = tempstring1 + '\n'
+
+ tempstring3 = ''
+ tempstring4 = ''
+ maxColWeight = 0
+ for colNum in np.arange(numCols):
+ nonzeros = array(H[:, colNum].nonzero())
+ colWeight = nonzeros.shape[1]
+ if colWeight > maxColWeight:
+ maxColWeight = colWeight
+ tempstring3 = tempstring3 + repr(colWeight) + ' '
+ for tempArray in nonzeros:
+ for index in tempArray:
+ tempstring4 = tempstring4 + repr(index + 1) + ' '
+ tempstring4 = tempstring4 + '\n'
+ tempstring3 = tempstring3 + '\n'
+
+ tempstring = repr(maxColWeight) + ' ' + repr(maxRowWeight) + '\n'
+ # write out max column and row weights
+ myfile.write(tempstring)
+ # write out all of the column weights
+ myfile.write(tempstring3)
+ # write out all of the row weights
+ myfile.write(tempstring1)
+ # write out the nonzero indices for each column
+ myfile.write(tempstring4)
+ # write out the nonzero indices for each row
+ myfile.write(tempstring2)
class LDPC_matrix(object):
- """ Class for a LDPC parity check matrix """
-
- def __init__(self, alist_filename=None,
- n_p_q=None,
- H_matrix=None):
- if (alist_filename != None):
- self.H = read_alist_file(alist_filename)
- elif (n_p_q != None):
- self.H = self.regular_LDPC_code_contructor(n_p_q)
- elif (H_matrix != None):
- self.H = H_matrix
- else:
- print('Error: provide either an alist filename, ', end='')
- print('parameters for constructing regular LDPC parity, ', end='')
- print('check matrix, or a numpy array.')
+ """ Class for a LDPC parity check matrix """
+
+ def __init__(self, alist_filename=None,
+ n_p_q=None,
+ H_matrix=None):
+ if (alist_filename != None):
+ self.H = read_alist_file(alist_filename)
+ elif (n_p_q != None):
+ self.H = self.regular_LDPC_code_contructor(n_p_q)
+ elif (H_matrix != None):
+ self.H = H_matrix
+ else:
+ print('Error: provide either an alist filename, ', end='')
+ print('parameters for constructing regular LDPC parity, ', end='')
+ print('check matrix, or a numpy array.')
+
+ self.rank = linalg.matrix_rank(self.H)
+ self.numRows = self.H.shape[0]
+ self.n = self.H.shape[1]
+ self.k = self.n - self.numRows
+
+ def regular_LDPC_code_contructor(self, n_p_q):
+ """
+ This function constructs a LDPC parity check matrix
+ H. The algorithm follows Gallager's approach where we create
+ p submatrices and stack them together. Reference: Turbo
+ Coding for Satellite and Wireless Communications, section
+ 9,3.
+
+ Note: the matrices computed from this algorithm will never
+ have full rank. (Reference Gallager's Dissertation.) They
+ will have rank = (number of rows - p + 1). To convert it
+ to full rank, use the function get_full_rank_H_matrix
+ """
+
+ n = n_p_q[0] # codeword length
+ p = n_p_q[1] # column weight
+ q = n_p_q[2] # row weight
+ # TODO: There should probably be other guidelines for n/p/q,
+ # but I have not found any specifics in the literature....
+
+ # For this algorithm, n/p must be an integer, because the
+ # number of rows in each submatrix must be a whole number.
+ ratioTest = (n * 1.0) / q
+ if ratioTest % 1 != 0:
+ print('\nError in regular_LDPC_code_contructor: The ', end='')
+ print('ratio of inputs n/q must be a whole number.\n')
+ return
+
+ # First submatrix first:
+ m = (n * p) / q # number of rows in H matrix
+ submatrix1 = zeros((m / p, n))
+ for row in np.arange(m / p):
+ range1 = row * q
+ range2 = (row + 1) * q
+ submatrix1[row, range1:range2] = 1
+ H = submatrix1
+
+ # Create the other submatrices and vertically stack them on.
+ submatrixNum = 2
+ newColumnOrder = np.arange(n)
+ while submatrixNum <= p:
+ submatrix = zeros((m / p, n))
+ shuffle(newColumnOrder)
+
+ for columnNum in np.arange(n):
+ submatrix[:, columnNum] = \
+ submatrix1[:, newColumnOrder[columnNum]]
+
+ H = vstack((H, submatrix))
+ submatrixNum = submatrixNum + 1
+
+ # Double check the row weight and column weights.
+ size = H.shape
+ rows = size[0]
+ cols = size[1]
+
+ # Check the row weights.
+ for rowNum in np.arange(rows):
+ nonzeros = array(H[rowNum, :].nonzero())
+ if nonzeros.shape[1] != q:
+ print('Row', rowNum, 'has incorrect weight!')
+ return
+
+ # Check the column weights
+ for columnNum in np.arange(cols):
+ nonzeros = array(H[:, columnNum].nonzero())
+ if nonzeros.shape[1] != p:
+ print('Row', columnNum, 'has incorrect weight!')
+ return
+
+ return H
- self.rank = linalg.matrix_rank(self.H)
- self.numRows = self.H.shape[0]
- self.n = self.H.shape[1]
- self.k = self.n - self.numRows
- def regular_LDPC_code_contructor(self, n_p_q):
+def greedy_upper_triangulation(H, verbose=0):
"""
- This function constructs a LDPC parity check matrix
- H. The algorithm follows Gallager's approach where we create
- p submatrices and stack them together. Reference: Turbo
- Coding for Satellite and Wireless Communications, section
- 9,3.
-
- Note: the matrices computed from this algorithm will never
- have full rank. (Reference Gallager's Dissertation.) They
- will have rank = (number of rows - p + 1). To convert it
- to full rank, use the function get_full_rank_H_matrix
+ This function performs row/column permutations to bring
+ H into approximate upper triangular form via greedy
+ upper triangulation method outlined in Modern Coding
+ Theory Appendix 1, Section A.2
"""
-
- n = n_p_q[0] # codeword length
- p = n_p_q[1] # column weight
- q = n_p_q[2] # row weight
- # TODO: There should probably be other guidelines for n/p/q,
- # but I have not found any specifics in the literature....
-
- # For this algorithm, n/p must be an integer, because the
- # number of rows in each submatrix must be a whole number.
- ratioTest = (n * 1.0) / q
- if ratioTest % 1 != 0:
- print('\nError in regular_LDPC_code_contructor: The ', end='')
- print('ratio of inputs n/q must be a whole number.\n')
- return
-
- # First submatrix first:
- m = (n * p) / q # number of rows in H matrix
- submatrix1 = zeros((m / p, n))
- for row in np.arange(m / p):
- range1 = row * q
- range2 = (row + 1) * q
- submatrix1[row, range1:range2] = 1
- H = submatrix1
-
- # Create the other submatrices and vertically stack them on.
- submatrixNum = 2
- newColumnOrder = np.arange(n)
- while submatrixNum <= p:
- submatrix = zeros((m / p, n))
- shuffle(newColumnOrder)
-
- for columnNum in np.arange(n):
- submatrix[:, columnNum] = \
- submatrix1[:, newColumnOrder[columnNum]]
-
- H = vstack((H, submatrix))
- submatrixNum = submatrixNum + 1
-
- # Double check the row weight and column weights.
- size = H.shape
- rows = size[0]
- cols = size[1]
-
- # Check the row weights.
- for rowNum in np.arange(rows):
- nonzeros = array(H[rowNum, :].nonzero())
- if nonzeros.shape[1] != q:
- print('Row', rowNum, 'has incorrect weight!')
- return
-
- # Check the column weights
- for columnNum in np.arange(cols):
- nonzeros = array(H[:, columnNum].nonzero())
- if nonzeros.shape[1] != p:
- print('Row', columnNum, 'has incorrect weight!')
+ H_t = H.copy()
+
+ # Per email from Dr. Urbanke, author of this textbook, this
+ # algorithm requires H to be full rank
+ if linalg.matrix_rank(H_t) != H_t.shape[0]:
+ print('Rank of H:', linalg.matrix_rank(tempArray))
+ print('H has', H_t.shape[0], 'rows')
+ print('Error: H must be full rank.')
return
- return H
-
+ size = H_t.shape
+ n = size[1]
+ k = n - size[0]
+ g = t = 0
+
+ while t != (n - k - g):
+ H_residual = H_t[t:n - k - g, t:n]
+ size = H_residual.shape
+ numRows = size[0]
+ numCols = size[1]
+
+ minResidualDegrees = zeros((1, numCols), dtype=int)
+
+ for colNum in np.arange(numCols):
+ nonZeroElements = array(H_residual[:, colNum].nonzero())
+ minResidualDegrees[0, colNum] = nonZeroElements.shape[1]
+
+ # Find the minimum nonzero residual degree
+ nonZeroElementIndices = minResidualDegrees.nonzero()
+ nonZeroElements = minResidualDegrees[nonZeroElementIndices[0],
+ nonZeroElementIndices[1]]
+ minimumResidualDegree = nonZeroElements.min()
+
+ # Get indices of all of the columns in H_t that have degree
+ # equal to the min positive residual degree, then pick a
+ # random column c.
+ indices = (minResidualDegrees == minimumResidualDegree) \
+ .nonzero()[1]
+ indices = indices + t
+ if indices.shape[0] == 1:
+ columnC = indices[0]
+ else:
+ randomIndex = randint(0, indices.shape[0], (1, 1))[0][0]
+ columnC = indices[randomIndex]
-def greedy_upper_triangulation(H, verbose=0):
- """
- This function performs row/column permutations to bring
- H into approximate upper triangular form via greedy
- upper triangulation method outlined in Modern Coding
- Theory Appendix 1, Section A.2
- """
- H_t = H.copy()
-
- # Per email from Dr. Urbanke, author of this textbook, this
- # algorithm requires H to be full rank
- if linalg.matrix_rank(H_t) != H_t.shape[0]:
- print('Rank of H:', linalg.matrix_rank(tempArray))
- print('H has', H_t.shape[0], 'rows')
- print('Error: H must be full rank.')
- return
-
- size = H_t.shape
- n = size[1]
- k = n - size[0]
- g = t = 0
-
- while t != (n - k - g):
- H_residual = H_t[t:n - k - g, t:n]
- size = H_residual.shape
- numRows = size[0]
- numCols = size[1]
-
- minResidualDegrees = zeros((1, numCols), dtype=int)
-
- for colNum in np.arange(numCols):
- nonZeroElements = array(H_residual[:, colNum].nonzero())
- minResidualDegrees[0, colNum] = nonZeroElements.shape[1]
-
- # Find the minimum nonzero residual degree
- nonZeroElementIndices = minResidualDegrees.nonzero()
- nonZeroElements = minResidualDegrees[nonZeroElementIndices[0],
- nonZeroElementIndices[1]]
- minimumResidualDegree = nonZeroElements.min()
-
- # Get indices of all of the columns in H_t that have degree
- # equal to the min positive residual degree, then pick a
- # random column c.
- indices = (minResidualDegrees == minimumResidualDegree) \
- .nonzero()[1]
- indices = indices + t
- if indices.shape[0] == 1:
- columnC = indices[0]
- else:
- randomIndex = randint(0, indices.shape[0], (1, 1))[0][0]
- columnC = indices[randomIndex]
-
- Htemp = H_t.copy()
-
- if minimumResidualDegree == 1:
- # This is the 'extend' case
- rowThatContainsNonZero = H_residual[:, columnC - t].nonzero()[0][0]
-
- # Swap column c with column t. (Book says t+1 but we
- # index from 0, not 1.)
- Htemp[:, columnC] = H_t[:, t]
- Htemp[:, t] = H_t[:, columnC]
- H_t = Htemp.copy()
- Htemp = H_t.copy()
- # Swap row r with row t. (Book says t+1 but we index from
- # 0, not 1.)
- Htemp[rowThatContainsNonZero + t, :] = H_t[t, :]
- Htemp[t, :] = H_t[rowThatContainsNonZero + t, :]
- H_t = Htemp.copy()
- Htemp = H_t.copy()
- else:
- # This is the 'choose' case.
- rowsThatContainNonZeros = H_residual[:, columnC - t] \
- .nonzero()[0]
-
- # Swap column c with column t. (Book says t+1 but we
- # index from 0, not 1.)
- Htemp[:, columnC] = H_t[:, t]
- Htemp[:, t] = H_t[:, columnC]
- H_t = Htemp.copy()
- Htemp = H_t.copy()
-
- # Swap row r1 with row t
- r1 = rowsThatContainNonZeros[0]
- Htemp[r1 + t, :] = H_t[t, :]
- Htemp[t, :] = H_t[r1 + t, :]
- numRowsLeft = rowsThatContainNonZeros.shape[0] - 1
- H_t = Htemp.copy()
- Htemp = H_t.copy()
-
- # Move the other rows that contain nonZero entries to the
- # bottom of the matrix. We can't just swap them,
- # otherwise we will be pulling up rows that we pushed
- # down before. So, use a rotation method.
- for index in np.arange(1, numRowsLeft + 1):
- rowInH_residual = rowsThatContainNonZeros[index]
- rowInH_t = rowInH_residual + t - index + 1
- m = n - k
- # Move the row with the nonzero element to the
- # bottom; don't update H_t.
- Htemp[m - 1, :] = H_t[rowInH_t, :]
- # Now rotate the bottom rows up.
- sub_index = 1
- while sub_index < (m - rowInH_t):
- Htemp[m - sub_index - 1, :] = H_t[m - sub_index, :]
- sub_index = sub_index + 1
- H_t = Htemp.copy()
Htemp = H_t.copy()
- # Save temp H as new H_t.
- H_t = Htemp.copy()
- Htemp = H_t.copy()
- g = g + (minimumResidualDegree - 1)
-
- t = t + 1
-
- if g == 0:
- if verbose:
- print('Error: gap is 0.')
- return
-
- # We need to ensure phi is nonsingular.
- T = H_t[0:t, 0:t]
- E = H_t[t:t + g, 0:t]
- A = H_t[0:t, t:t + g]
- C = H_t[t:t + g, t:t + g]
- D = H_t[t:t + g, t + g:n]
-
- invTmod2array = inv_mod2(T)
- temp1 = dot(E, invTmod2array) % 2
- temp2 = dot(temp1, A) % 2
- phi = (C - temp2) % 2
- if phi.any():
- try:
- # Try to take the inverse of phi.
- invPhi = inv_mod2(phi)
- except linalg.linalg.LinAlgError:
- # Phi is singular
- if verbose > 1:
- print('Initial phi is singular')
- else:
- # Phi is nonsingular, so we need to use this version of H.
- if verbose > 1:
- print('Initial phi is nonsingular')
- return [H_t, g, t]
- else:
- if verbose:
- print('Initial phi is all zeros:\n', phi)
+ if minimumResidualDegree == 1:
+ # This is the 'extend' case
+ rowThatContainsNonZero = H_residual[:, columnC - t].nonzero()[0][0]
+
+ # Swap column c with column t. (Book says t+1 but we
+ # index from 0, not 1.)
+ Htemp[:, columnC] = H_t[:, t]
+ Htemp[:, t] = H_t[:, columnC]
+ H_t = Htemp.copy()
+ Htemp = H_t.copy()
+ # Swap row r with row t. (Book says t+1 but we index from
+ # 0, not 1.)
+ Htemp[rowThatContainsNonZero + t, :] = H_t[t, :]
+ Htemp[t, :] = H_t[rowThatContainsNonZero + t, :]
+ H_t = Htemp.copy()
+ Htemp = H_t.copy()
+ else:
+ # This is the 'choose' case.
+ rowsThatContainNonZeros = H_residual[:, columnC - t] \
+ .nonzero()[0]
+
+ # Swap column c with column t. (Book says t+1 but we
+ # index from 0, not 1.)
+ Htemp[:, columnC] = H_t[:, t]
+ Htemp[:, t] = H_t[:, columnC]
+ H_t = Htemp.copy()
+ Htemp = H_t.copy()
+
+ # Swap row r1 with row t
+ r1 = rowsThatContainNonZeros[0]
+ Htemp[r1 + t, :] = H_t[t, :]
+ Htemp[t, :] = H_t[r1 + t, :]
+ numRowsLeft = rowsThatContainNonZeros.shape[0] - 1
+ H_t = Htemp.copy()
+ Htemp = H_t.copy()
+
+ # Move the other rows that contain nonZero entries to the
+ # bottom of the matrix. We can't just swap them,
+ # otherwise we will be pulling up rows that we pushed
+ # down before. So, use a rotation method.
+ for index in np.arange(1, numRowsLeft + 1):
+ rowInH_residual = rowsThatContainNonZeros[index]
+ rowInH_t = rowInH_residual + t - index + 1
+ m = n - k
+ # Move the row with the nonzero element to the
+ # bottom; don't update H_t.
+ Htemp[m - 1, :] = H_t[rowInH_t, :]
+ # Now rotate the bottom rows up.
+ sub_index = 1
+ while sub_index < (m - rowInH_t):
+ Htemp[m - sub_index - 1, :] = H_t[m - sub_index, :]
+ sub_index = sub_index + 1
+ H_t = Htemp.copy()
+ Htemp = H_t.copy()
+
+ # Save temp H as new H_t.
+ H_t = Htemp.copy()
+ Htemp = H_t.copy()
+ g = g + (minimumResidualDegree - 1)
+
+ t = t + 1
+
+ if g == 0:
+ if verbose:
+ print('Error: gap is 0.')
+ return
- # If the C and D submatrices are all zeros, there is no point in
- # shuffling them around in an attempt to find a good phi.
- if not (C.any() or D.any()):
- if verbose:
- print('C and D are all zeros. There is no hope in', )
- print('finding a nonsingular phi matrix. ')
- return
-
- # We can't look at every row/column permutation possibility
- # because there would be (n-t)! column shuffles and g! row
- # shuffles. g has gotten up to 12 in tests, so 12! would still
- # take quite some time. Instead, we will just pick an arbitrary
- # number of max iterations to perform, then break.
- maxIterations = 300
- iterationCount = 0
- columnsToShuffle = np.arange(t, n)
- rowsToShuffle = np.arange(t, t + g)
-
- while iterationCount < maxIterations:
- if verbose > 1:
- print('iterationCount:', iterationCount)
- tempH = H_t.copy()
-
- shuffle(columnsToShuffle)
- shuffle(rowsToShuffle)
- index = 0
- for newDestinationColumnNumber in np.arange(t, n):
- oldColumnNumber = columnsToShuffle[index]
- tempH[:, newDestinationColumnNumber] = \
- H_t[:, oldColumnNumber]
- index += 1
-
- tempH2 = tempH.copy()
- index = 0
- for newDesinationRowNumber in np.arange(t, t + g):
- oldRowNumber = rowsToShuffle[index]
- tempH[newDesinationRowNumber, :] = tempH2[oldRowNumber, :]
- index += 1
-
- # Now test this new H matrix.
- H_t = tempH.copy()
+ # We need to ensure phi is nonsingular.
T = H_t[0:t, 0:t]
E = H_t[t:t + g, 0:t]
A = H_t[0:t, t:t + g]
C = H_t[t:t + g, t:t + g]
+ D = H_t[t:t + g, t + g:n]
+
invTmod2array = inv_mod2(T)
temp1 = dot(E, invTmod2array) % 2
temp2 = dot(temp1, A) % 2
phi = (C - temp2) % 2
if phi.any():
- try:
- # Try to take the inverse of phi.
- invPhi = inv_mod2(phi)
- except linalg.linalg.LinAlgError:
- # Phi is singular
- if verbose > 1:
- print('Phi is still singular')
- else:
- # Phi is nonsingular, so we're done.
- if verbose:
- print('Found a nonsingular phi on', )
- print('iterationCount = ', iterationCount)
- return [H_t, g, t]
+ try:
+ # Try to take the inverse of phi.
+ invPhi = inv_mod2(phi)
+ except linalg.linalg.LinAlgError:
+ # Phi is singular
+ if verbose > 1:
+ print('Initial phi is singular')
+ else:
+ # Phi is nonsingular, so we need to use this version of H.
+ if verbose > 1:
+ print('Initial phi is nonsingular')
+ return [H_t, g, t]
else:
- if verbose > 1:
- print('phi is all zeros')
-
- iterationCount += 1
-
- # If we've reached this point, then we haven't found a
- # version of H that has a nonsingular phi.
- if verbose:
- print('--- Error: nonsingular phi matrix not found.')
+ if verbose:
+ print('Initial phi is all zeros:\n', phi)
+ # If the C and D submatrices are all zeros, there is no point in
+ # shuffling them around in an attempt to find a good phi.
+ if not (C.any() or D.any()):
+ if verbose:
+ print('C and D are all zeros. There is no hope in', )
+ print('finding a nonsingular phi matrix. ')
+ return
-def inv_mod2(squareMatrix, verbose=0):
- """
- Calculates the mod 2 inverse of a matrix.
- """
- A = squareMatrix.copy()
- t = A.shape[0]
-
- # Special case for one element array [1]
- if A.size == 1 and A[0] == 1:
- return array([1])
-
- Ainverse = inv(A)
- B = det(A) * Ainverse
- C = B % 2
-
- # Encountered lots of rounding errors with this function.
- # Previously tried floor, C.astype(int), and casting with (int)
- # and none of that works correctly, so doing it the tedious way.
-
- test = dot(A, C) % 2
- tempTest = zeros_like(test)
- for colNum in np.arange(test.shape[1]):
- for rowNum in np.arange(test.shape[0]):
- value = test[rowNum, colNum]
- if (abs(1 - value)) < 0.01:
- # this is a 1
- tempTest[rowNum, colNum] = 1
- elif (abs(2 - value)) < 0.01:
- # there shouldn't be any 2s after B % 2, but I'm
- # seeing them!
- tempTest[rowNum, colNum] = 0
- elif (abs(0 - value)) < 0.01:
- # this is a 0
- tempTest[rowNum, colNum] = 0
- else:
+ # We can't look at every row/column permutation possibility
+ # because there would be (n-t)! column shuffles and g! row
+ # shuffles. g has gotten up to 12 in tests, so 12! would still
+ # take quite some time. Instead, we will just pick an arbitrary
+ # number of max iterations to perform, then break.
+ maxIterations = 300
+ iterationCount = 0
+ columnsToShuffle = np.arange(t, n)
+ rowsToShuffle = np.arange(t, t + g)
+
+ while iterationCount < maxIterations:
if verbose > 1:
- print('In inv_mod2. Rounding error on this', )
- print('value? Mod 2 has already been done.', )
- print('value:', value)
+ print('iterationCount:', iterationCount)
+ tempH = H_t.copy()
+
+ shuffle(columnsToShuffle)
+ shuffle(rowsToShuffle)
+ index = 0
+ for newDestinationColumnNumber in np.arange(t, n):
+ oldColumnNumber = columnsToShuffle[index]
+ tempH[:, newDestinationColumnNumber] = \
+ H_t[:, oldColumnNumber]
+ index += 1
+
+ tempH2 = tempH.copy()
+ index = 0
+ for newDesinationRowNumber in np.arange(t, t + g):
+ oldRowNumber = rowsToShuffle[index]
+ tempH[newDesinationRowNumber, :] = tempH2[oldRowNumber, :]
+ index += 1
+
+ # Now test this new H matrix.
+ H_t = tempH.copy()
+ T = H_t[0:t, 0:t]
+ E = H_t[t:t + g, 0:t]
+ A = H_t[0:t, t:t + g]
+ C = H_t[t:t + g, t:t + g]
+ invTmod2array = inv_mod2(T)
+ temp1 = dot(E, invTmod2array) % 2
+ temp2 = dot(temp1, A) % 2
+ phi = (C - temp2) % 2
+ if phi.any():
+ try:
+ # Try to take the inverse of phi.
+ invPhi = inv_mod2(phi)
+ except linalg.linalg.LinAlgError:
+ # Phi is singular
+ if verbose > 1:
+ print('Phi is still singular')
+ else:
+ # Phi is nonsingular, so we're done.
+ if verbose:
+ print('Found a nonsingular phi on', )
+ print('iterationCount = ', iterationCount)
+ return [H_t, g, t]
+ else:
+ if verbose > 1:
+ print('phi is all zeros')
+
+ iterationCount += 1
+
+ # If we've reached this point, then we haven't found a
+ # version of H that has a nonsingular phi.
+ if verbose:
+ print('--- Error: nonsingular phi matrix not found.')
- test = tempTest.copy()
- if (test - eye(t, t) % 2).any():
- if verbose:
- print('Error in inv_mod2: did not find inverse.')
- # TODO is this the most appropriate error to raise?
- raise linalg.linalg.LinAlgError
- else:
- return C
+def inv_mod2(squareMatrix, verbose=0):
+ """
+ Calculates the mod 2 inverse of a matrix.
+ """
+ A = squareMatrix.copy()
+ t = A.shape[0]
+
+ # Special case for one element array [1]
+ if A.size == 1 and A[0] == 1:
+ return array([1])
+
+ Ainverse = inv(A)
+ B = det(A) * Ainverse
+ C = B % 2
+
+ # Encountered lots of rounding errors with this function.
+ # Previously tried floor, C.astype(int), and casting with (int)
+ # and none of that works correctly, so doing it the tedious way.
+
+ test = dot(A, C) % 2
+ tempTest = zeros_like(test)
+ for colNum in np.arange(test.shape[1]):
+ for rowNum in np.arange(test.shape[0]):
+ value = test[rowNum, colNum]
+ if (abs(1 - value)) < 0.01:
+ # this is a 1
+ tempTest[rowNum, colNum] = 1
+ elif (abs(2 - value)) < 0.01:
+ # there shouldn't be any 2s after B % 2, but I'm
+ # seeing them!
+ tempTest[rowNum, colNum] = 0
+ elif (abs(0 - value)) < 0.01:
+ # this is a 0
+ tempTest[rowNum, colNum] = 0
+ else:
+ if verbose > 1:
+ print('In inv_mod2. Rounding error on this', )
+ print('value? Mod 2 has already been done.', )
+ print('value:', value)
+
+ test = tempTest.copy()
+
+ if (test - eye(t, t) % 2).any():
+ if verbose:
+ print('Error in inv_mod2: did not find inverse.')
+ # TODO is this the most appropriate error to raise?
+ raise linalg.linalg.LinAlgError
+ else:
+ return C
def swap_columns(a, b, arrayIn):
- """
- Swaps two columns in a matrix.
- """
- arrayOut = arrayIn.copy()
- arrayOut[:, a] = arrayIn[:, b]
- arrayOut[:, b] = arrayIn[:, a]
- return arrayOut
+ """
+ Swaps two columns in a matrix.
+ """
+ arrayOut = arrayIn.copy()
+ arrayOut[:, a] = arrayIn[:, b]
+ arrayOut[:, b] = arrayIn[:, a]
+ return arrayOut
def move_row_to_bottom(i, arrayIn):
- """"
- Moves a specified row (just one) to the bottom of the matrix,
- then rotates the rows at the bottom up.
-
- For example, if we had a matrix with 5 rows, and we wanted to
- push row 2 to the bottom, then the resulting row order would be:
- 1,3,4,5,2
- """
- arrayOut = arrayIn.copy()
- numRows = arrayOut.shape[0]
- # Push the specified row to the bottom.
- arrayOut[numRows - 1] = arrayIn[i, :]
- # Now rotate the bottom rows up.
- index = 2
- while (numRows - index) >= i:
- arrayOut[numRows - index, :] = arrayIn[numRows - index + 1]
- index = index + 1
- return arrayOut
+ """"
+ Moves a specified row (just one) to the bottom of the matrix,
+ then rotates the rows at the bottom up.
+
+ For example, if we had a matrix with 5 rows, and we wanted to
+ push row 2 to the bottom, then the resulting row order would be:
+ 1,3,4,5,2
+ """
+ arrayOut = arrayIn.copy()
+ numRows = arrayOut.shape[0]
+ # Push the specified row to the bottom.
+ arrayOut[numRows - 1] = arrayIn[i, :]
+ # Now rotate the bottom rows up.
+ index = 2
+ while (numRows - index) >= i:
+ arrayOut[numRows - index, :] = arrayIn[numRows - index + 1]
+ index = index + 1
+ return arrayOut
def get_full_rank_H_matrix(H, verbose=False):
- """
- This function accepts a parity check matrix H and, if it is not
- already full rank, will determine which rows are dependent and
- remove them. The updated matrix will be returned.
- """
- tempArray = H.copy()
- if linalg.matrix_rank(tempArray) == tempArray.shape[0]:
- if verbose:
- print('Returning H; it is already full rank.')
- return tempArray
+ """
+ This function accepts a parity check matrix H and, if it is not
+ already full rank, will determine which rows are dependent and
+ remove them. The updated matrix will be returned.
+ """
+ tempArray = H.copy()
+ if linalg.matrix_rank(tempArray) == tempArray.shape[0]:
+ if verbose:
+ print('Returning H; it is already full rank.')
+ return tempArray
+
+ numRows = tempArray.shape[0]
+ numColumns = tempArray.shape[1]
+ limit = numRows
+ rank = 0
+ i = 0
- numRows = tempArray.shape[0]
- numColumns = tempArray.shape[1]
- limit = numRows
- rank = 0
- i = 0
+ # Create an array to save the column permutations.
+ columnOrder = np.arange(numColumns).reshape(1, numColumns)
- # Create an array to save the column permutations.
- columnOrder = np.arange(numColumns).reshape(1, numColumns)
+ # Create an array to save the row permutations. We just need
+ # this to know which dependent rows to delete.
+ rowOrder = np.arange(numRows).reshape(numRows, 1)
- # Create an array to save the row permutations. We just need
- # this to know which dependent rows to delete.
- rowOrder = np.arange(numRows).reshape(numRows, 1)
+ while i < limit:
+ if verbose:
+ print('In get_full_rank_H_matrix; i:', i)
+ # Flag indicating that the row contains a non-zero entry
+ found = False
+ for j in np.arange(i, numColumns):
+ if tempArray[i, j] == 1:
+ # Encountered a non-zero entry at (i, j)
+ found = True
+ # Increment rank by 1
+ rank = rank + 1
+ # Make the entry at (i,i) be 1
+ tempArray = swap_columns(j, i, tempArray)
+ # Keep track of the column swapping
+ columnOrder = swap_columns(j, i, columnOrder)
+ break
+ if found == True:
+ for k in np.arange(0, numRows):
+ if k == i:
+ continue
+ # Checking for 1's
+ if tempArray[k, i] == 1:
+ # Add row i to row k
+ tempArray[k, :] = tempArray[k, :] + tempArray[i, :]
+ # Addition is mod2
+ tempArray = tempArray.copy() % 2
+ # All the entries above & below (i, i) are now 0
+ i = i + 1
+ if found == False:
+ # Push the row of 0s to the bottom, and move the bottom
+ # rows up (sort of a rotation thing).
+ tempArray = move_row_to_bottom(i, tempArray)
+ # Decrease limit since we just found a row of 0s
+ limit -= 1
+ # Keep track of row swapping
+ rowOrder = move_row_to_bottom(i, rowOrder)
+
+ # Don't need the dependent rows
+ finalRowOrder = rowOrder[0:i]
+
+ # Reorder H, per the permutations taken above .
+ # First, put rows in order, omitting the dependent rows.
+ newNumberOfRowsForH = finalRowOrder.shape[0]
+ newH = zeros((newNumberOfRowsForH, numColumns))
+ for index in np.arange(newNumberOfRowsForH):
+ newH[index, :] = H[finalRowOrder[index], :]
+
+ # Next, put the columns in order.
+ tempHarray = newH.copy()
+ for index in np.arange(numColumns):
+ newH[:, index] = tempHarray[:, columnOrder[0, index]]
- while i < limit:
if verbose:
- print('In get_full_rank_H_matrix; i:', i)
- # Flag indicating that the row contains a non-zero entry
- found = False
- for j in np.arange(i, numColumns):
- if tempArray[i, j] == 1:
- # Encountered a non-zero entry at (i, j)
- found = True
- # Increment rank by 1
- rank = rank + 1
- # Make the entry at (i,i) be 1
- tempArray = swap_columns(j, i, tempArray)
- # Keep track of the column swapping
- columnOrder = swap_columns(j, i, columnOrder)
- break
- if found == True:
- for k in np.arange(0, numRows):
- if k == i: continue
- # Checking for 1's
- if tempArray[k, i] == 1:
- # Add row i to row k
- tempArray[k, :] = tempArray[k, :] + tempArray[i, :]
- # Addition is mod2
- tempArray = tempArray.copy() % 2
- # All the entries above & below (i, i) are now 0
- i = i + 1
- if found == False:
- # Push the row of 0s to the bottom, and move the bottom
- # rows up (sort of a rotation thing).
- tempArray = move_row_to_bottom(i, tempArray)
- # Decrease limit since we just found a row of 0s
- limit -= 1
- # Keep track of row swapping
- rowOrder = move_row_to_bottom(i, rowOrder)
-
- # Don't need the dependent rows
- finalRowOrder = rowOrder[0:i]
-
- # Reorder H, per the permutations taken above .
- # First, put rows in order, omitting the dependent rows.
- newNumberOfRowsForH = finalRowOrder.shape[0]
- newH = zeros((newNumberOfRowsForH, numColumns))
- for index in np.arange(newNumberOfRowsForH):
- newH[index, :] = H[finalRowOrder[index], :]
-
- # Next, put the columns in order.
- tempHarray = newH.copy()
- for index in np.arange(numColumns):
- newH[:, index] = tempHarray[:, columnOrder[0, index]]
-
- if verbose:
- print('original H.shape:', H.shape)
- print('newH.shape:', newH.shape)
-
- return newH
+ print('original H.shape:', H.shape)
+ print('newH.shape:', newH.shape)
+
+ return newH
def get_best_matrix(H, numIterations=100, verbose=0):
- """
- This function will run the Greedy Upper Triangulation algorithm
- for numIterations times, looking for the lowest possible gap.
- The submatrices returned are those needed for real-time encoding.
- """
-
- hadFirstJoy = 0
- index = 1
- while index <= numIterations:
- if verbose:
- print('--- In get_best_matrix, iteration:', index)
- index += 1
- try:
- ret = greedy_upper_triangulation(H, verbose)
- except ValueError as e:
- if verbose > 1:
- print('greedy_upper_triangulation error: ', e)
+ """
+ This function will run the Greedy Upper Triangulation algorithm
+ for numIterations times, looking for the lowest possible gap.
+ The submatrices returned are those needed for real-time encoding.
+ """
+
+ hadFirstJoy = 0
+ index = 1
+ while index <= numIterations:
+ if verbose:
+ print('--- In get_best_matrix, iteration:', index)
+ index += 1
+ try:
+ ret = greedy_upper_triangulation(H, verbose)
+ except ValueError as e:
+ if verbose > 1:
+ print('greedy_upper_triangulation error: ', e)
+ else:
+ if ret:
+ [betterH, gap, t] = ret
+ else:
+ continue
+
+ if not hadFirstJoy:
+ hadFirstJoy = 1
+ bestGap = gap
+ bestH = betterH.copy()
+ bestT = t
+ elif gap < bestGap:
+ bestGap = gap
+ bestH = betterH.copy()
+ bestT = t
+
+ if hadFirstJoy:
+ return [bestH, bestGap]
else:
- if ret:
- [betterH, gap, t] = ret
- else:
- continue
-
- if not hadFirstJoy:
- hadFirstJoy = 1
- bestGap = gap
- bestH = betterH.copy()
- bestT = t
- elif gap < bestGap:
- bestGap = gap
- bestH = betterH.copy()
- bestT = t
-
- if hadFirstJoy:
- return [bestH, bestGap]
- else:
- if verbose:
- print('Error: Could not find appropriate H form', )
- print('for encoding.')
- return
+ if verbose:
+ print('Error: Could not find appropriate H form', )
+ print('for encoding.')
+ return
def getSystematicGmatrix(GenMatrix):
- """
- This function finds the systematic form of the generator
- matrix GenMatrix. This form is G = [I P] where I is an identity
- matrix and P is the parity submatrix. If the GenMatrix matrix
- provided is not full rank, then dependent rows will be deleted.
-
- This function does not convert parity check (H) matrices to the
- generator matrix format. Use the function getSystematicGmatrixFromH
- for that purpose.
- """
- tempArray = GenMatrix.copy()
- numRows = tempArray.shape[0]
- numColumns = tempArray.shape[1]
- limit = numRows
- rank = 0
- i = 0
- while i < limit:
- # Flag indicating that the row contains a non-zero entry
- found = False
- for j in np.arange(i, numColumns):
- if tempArray[i, j] == 1:
- # Encountered a non-zero entry at (i, j)
- found = True
- # Increment rank by 1
- rank = rank + 1
- # make the entry at (i,i) be 1
- tempArray = swap_columns(j, i, tempArray)
- break
- if found == True:
- for k in np.arange(0, numRows):
- if k == i: continue
- # Checking for 1's
- if tempArray[k, i] == 1:
- # add row i to row k
- tempArray[k, :] = tempArray[k, :] + tempArray[i, :]
- # Addition is mod2
- tempArray = tempArray.copy() % 2
- # All the entries above & below (i, i) are now 0
- i = i + 1
- if found == False:
- # push the row of 0s to the bottom, and move the bottom
- # rows up (sort of a rotation thing)
- tempArray = move_row_to_bottom(i, tempArray)
- # decrease limit since we just found a row of 0s
- limit -= 1
- # the rows below i are the dependent rows, which we discard
- G = tempArray[0:i, :]
- return G
+ """
+ This function finds the systematic form of the generator
+ matrix GenMatrix. This form is G = [I P] where I is an identity
+ matrix and P is the parity submatrix. If the GenMatrix matrix
+ provided is not full rank, then dependent rows will be deleted.
+
+ This function does not convert parity check (H) matrices to the
+ generator matrix format. Use the function getSystematicGmatrixFromH
+ for that purpose.
+ """
+ tempArray = GenMatrix.copy()
+ numRows = tempArray.shape[0]
+ numColumns = tempArray.shape[1]
+ limit = numRows
+ rank = 0
+ i = 0
+ while i < limit:
+ # Flag indicating that the row contains a non-zero entry
+ found = False
+ for j in np.arange(i, numColumns):
+ if tempArray[i, j] == 1:
+ # Encountered a non-zero entry at (i, j)
+ found = True
+ # Increment rank by 1
+ rank = rank + 1
+ # make the entry at (i,i) be 1
+ tempArray = swap_columns(j, i, tempArray)
+ break
+ if found == True:
+ for k in np.arange(0, numRows):
+ if k == i:
+ continue
+ # Checking for 1's
+ if tempArray[k, i] == 1:
+ # add row i to row k
+ tempArray[k, :] = tempArray[k, :] + tempArray[i, :]
+ # Addition is mod2
+ tempArray = tempArray.copy() % 2
+ # All the entries above & below (i, i) are now 0
+ i = i + 1
+ if found == False:
+ # push the row of 0s to the bottom, and move the bottom
+ # rows up (sort of a rotation thing)
+ tempArray = move_row_to_bottom(i, tempArray)
+ # decrease limit since we just found a row of 0s
+ limit -= 1
+ # the rows below i are the dependent rows, which we discard
+ G = tempArray[0:i, :]
+ return G
def getSystematicGmatrixFromH(H, verbose=False):
- """
- If given a parity check matrix H, this function returns a
- generator matrix G in the systematic form: G = [I P]
- where: I is an identity matrix, size k x k
- P is the parity submatrix, size k x (n-k)
- If the H matrix provided is not full rank, then dependent rows
- will be deleted first.
- """
- if verbose:
- print('received H with size: ', H.shape)
-
- # First, put the H matrix into the form H = [I|m] where:
- # I is (n-k) x (n-k) identity matrix
- # m is (n-k) x k
- # This part is just copying the algorithm from getSystematicGmatrix
- tempArray = getSystematicGmatrix(H)
-
- # Next, swap I and m columns so the matrix takes the forms [m|I].
- n = H.shape[1]
- k = n - H.shape[0]
- I_temp = tempArray[:, 0:(n - k)]
- m = tempArray[:, (n - k):n]
- newH = concatenate((m, I_temp), axis=1)
-
- # Now the submatrix m is the transpose of the parity submatrix,
- # i.e. H is in the form H = [P'|I]. So G is just [I|P]
- k = m.shape[1]
- G = concatenate((identity(k), m.T), axis=1)
- if verbose:
- print('returning G with size: ', G.shape)
- return G
+ """
+ If given a parity check matrix H, this function returns a
+ generator matrix G in the systematic form: G = [I P]
+ where: I is an identity matrix, size k x k
+ P is the parity submatrix, size k x (n-k)
+ If the H matrix provided is not full rank, then dependent rows
+ will be deleted first.
+ """
+ if verbose:
+ print('received H with size: ', H.shape)
+
+ # First, put the H matrix into the form H = [I|m] where:
+ # I is (n-k) x (n-k) identity matrix
+ # m is (n-k) x k
+ # This part is just copying the algorithm from getSystematicGmatrix
+ tempArray = getSystematicGmatrix(H)
+
+ # Next, swap I and m columns so the matrix takes the forms [m|I].
+ n = H.shape[1]
+ k = n - H.shape[0]
+ I_temp = tempArray[:, 0:(n - k)]
+ m = tempArray[:, (n - k):n]
+ newH = concatenate((m, I_temp), axis=1)
+
+ # Now the submatrix m is the transpose of the parity submatrix,
+ # i.e. H is in the form H = [P'|I]. So G is just [I|P]
+ k = m.shape[1]
+ G = concatenate((identity(k), m.T), axis=1)
+ if verbose:
+ print('returning G with size: ', G.shape)
+ return G
diff --git a/gr-fec/python/fec/__init__.py b/gr-fec/python/fec/__init__.py
index 83b906ad06..d3788e47f5 100644
--- a/gr-fec/python/fec/__init__.py
+++ b/gr-fec/python/fec/__init__.py
@@ -20,6 +20,19 @@ except ImportError:
__path__.append(os.path.join(dirname, "bindings"))
from .fec_python import *
+from .bercurve_generator import bercurve_generator
+from .fec_test import fec_test
+from .extended_tagged_decoder import extended_tagged_decoder
+from .extended_tagged_encoder import extended_tagged_encoder
+from .extended_async_encoder import extended_async_encoder
+from .capillary_threaded_encoder import capillary_threaded_encoder
+from .capillary_threaded_decoder import capillary_threaded_decoder
+from .threaded_decoder import threaded_decoder
+from .threaded_encoder import threaded_encoder
+from .extended_decoder import extended_decoder
+from .extended_encoder import extended_encoder
+from .bitflip import *
+
# Pybind cannot bind constructors to make functions that return a different type
# Remap make functions to __init__ here
@@ -74,23 +87,8 @@ try:
ldpc_gen_mtrx_encoder_make = code.ldpc_gen_mtrx_encoder.make
ldpc_bit_flip_decoder = code.ldpc_bit_flip_decoder
except AttributeError:
- pass
+ pass
polar_decoder_sc = code.polar_decoder_sc
polar_decoder_sc_list = code.polar_decoder_sc_list
polar_decoder_sc_systematic = code.polar_decoder_sc_systematic
-
-from .bitflip import *
-from .extended_encoder import extended_encoder
-from .extended_decoder import extended_decoder
-from .threaded_encoder import threaded_encoder
-from .threaded_decoder import threaded_decoder
-from .capillary_threaded_decoder import capillary_threaded_decoder
-from .capillary_threaded_encoder import capillary_threaded_encoder
-from .extended_async_encoder import extended_async_encoder
-from .extended_tagged_encoder import extended_tagged_encoder
-from .extended_tagged_decoder import extended_tagged_decoder
-
-
-from .fec_test import fec_test
-from .bercurve_generator import bercurve_generator
diff --git a/gr-fec/python/fec/_qa_helper.py b/gr-fec/python/fec/_qa_helper.py
index 00a6015684..1eef05d976 100644
--- a/gr-fec/python/fec/_qa_helper.py
+++ b/gr-fec/python/fec/_qa_helper.py
@@ -9,7 +9,6 @@
#
-
import numpy
from gnuradio import gr, blocks
@@ -23,9 +22,9 @@ class map_bb(gr.sync_block):
def __init__(self, bitmap):
gr.sync_block.__init__(
self,
- name = "map_bb",
- in_sig = [numpy.int8],
- out_sig = [numpy.int8])
+ name="map_bb",
+ in_sig=[numpy.int8],
+ out_sig=[numpy.int8])
self.bitmap = bitmap
def work(self, input_items, output_items):
@@ -45,11 +44,13 @@ class _qa_helper(gr.top_block):
self.data_size = data_size
self.threading = threading
- self.ext_encoder = extended_encoder(enc, threading=self.threading, puncpat=self.puncpat)
+ self.ext_encoder = extended_encoder(
+ enc, threading=self.threading, puncpat=self.puncpat)
self.ext_decoder = extended_decoder(dec, threading=self.threading, ann=None,
puncpat=self.puncpat, integration_period=10000)
- self.src = blocks.vector_source_b(data_size*[0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False)
+ self.src = blocks.vector_source_b(
+ data_size * [0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False)
self.unpack = blocks.unpack_k_bits_bb(8)
self.map = map_bb([-1, 1])
self.to_float = blocks.char_to_float(1)
@@ -62,18 +63,19 @@ class _qa_helper(gr.top_block):
self.connect(self.unpack, self.snk_input)
self.connect(self.ext_decoder, self.snk_output)
+
if __name__ == '__main__':
frame_size = 30
- enc = fec.dummy_encoder_make(frame_size*8)
+ enc = fec.dummy_encoder_make(frame_size * 8)
#enc = fec.repetition_encoder_make(frame_size*8, 3)
- dec = fec.dummy_decoder.make(frame_size*8)
+ dec = fec.dummy_decoder.make(frame_size * 8)
- tb = _qa_helper(10*frame_size, enc, dec, None)
+ tb = _qa_helper(10 * frame_size, enc, dec, None)
tb.run()
errs = 0
- for i,o in zip(tb.snk_input.data(), tb.snk_output.data()):
- if i-o != 0:
+ for i, o in zip(tb.snk_input.data(), tb.snk_output.data()):
+ if i - o != 0:
errs += 1
if errs == 0:
diff --git a/gr-fec/python/fec/bercurve_generator.py b/gr-fec/python/fec/bercurve_generator.py
index 3536c45d52..3d556cf7a1 100644
--- a/gr-fec/python/fec/bercurve_generator.py
+++ b/gr-fec/python/fec/bercurve_generator.py
@@ -13,6 +13,7 @@ import numpy
from .fec_test import fec_test
+
class bercurve_generator(gr.hier_block2):
def __init__(self, encoder_list, decoder_list, esno=numpy.arange(0.0, 3.0, .25),
@@ -20,7 +21,7 @@ class bercurve_generator(gr.hier_block2):
gr.hier_block2.__init__(
self, "ber_curve_generator",
gr.io_signature(0, 0, 0),
- gr.io_signature(len(esno) * 2, len(esno) * 2, gr.sizeof_char*1))
+ gr.io_signature(len(esno) * 2, len(esno) * 2, gr.sizeof_char * 1))
self.esno = esno
self.samp_rate = samp_rate
@@ -28,8 +29,9 @@ class bercurve_generator(gr.hier_block2):
self.decoder_list = decoder_list
self.puncpat = puncpat
- self.random_gen_b_0 = blocks.vector_source_b(list(map(int, numpy.random.randint(0, 256, 100000))), True)
- self.deinterleave = blocks.deinterleave(gr.sizeof_char*1)
+ self.random_gen_b_0 = blocks.vector_source_b(
+ list(map(int, numpy.random.randint(0, 256, 100000))), True)
+ self.deinterleave = blocks.deinterleave(gr.sizeof_char * 1)
self.connect(self.random_gen_b_0, self.deinterleave)
self.ber_generators = []
@@ -47,12 +49,12 @@ class bercurve_generator(gr.hier_block2):
threading=threading,
puncpat=puncpat,
seed=seed)
- self.ber_generators.append(ber_generator_temp);
+ self.ber_generators.append(ber_generator_temp)
for i in range(0, len(esno)):
self.connect((self.deinterleave, i), (self.ber_generators[i]))
- self.connect((self.ber_generators[i], 0), (self, i*2));
- self.connect((self.ber_generators[i], 1), (self, i*2 + 1));
+ self.connect((self.ber_generators[i], 0), (self, i * 2))
+ self.connect((self.ber_generators[i], 1), (self, i * 2 + 1))
def get_esno(self):
return self.esno
diff --git a/gr-fec/python/fec/bitflip.py b/gr-fec/python/fec/bitflip.py
index edd841ef14..a482152e54 100644
--- a/gr-fec/python/fec/bitflip.py
+++ b/gr-fec/python/fec/bitflip.py
@@ -9,63 +9,64 @@
#
-
-
def bitreverse(mint):
- res = 0;
+ res = 0
while mint != 0:
- res = res << 1;
- res += mint & 1;
- mint = mint >> 1;
- return res;
+ res = res << 1
+ res += mint & 1
+ mint = mint >> 1
+ return res
+
+
+const_lut = [2]
+specinvert_lut = [[0, 2, 1, 3]]
-const_lut = [2];
-specinvert_lut = [[0, 2, 1, 3]];
def bitflip(mint, bitflip_lut, index, csize):
- res = 0;
- cnt = 0;
- mask = (1 << const_lut[index]) - 1;
+ res = 0
+ cnt = 0
+ mask = (1 << const_lut[index]) - 1
while (cnt < csize):
- res += (bitflip_lut[(mint >> cnt) & (mask)]) << cnt;
- cnt += const_lut[index];
- return res;
+ res += (bitflip_lut[(mint >> cnt) & (mask)]) << cnt
+ cnt += const_lut[index]
+ return res
def read_bitlist(bitlist):
- res = 0;
+ res = 0
for i in range(len(bitlist)):
if int(bitlist[i]) == 1:
- res += 1 << (len(bitlist) - i - 1);
- return res;
+ res += 1 << (len(bitlist) - i - 1)
+ return res
def read_big_bitlist(bitlist):
ret = []
for j in range(0, len(bitlist) / 64):
- res = 0;
+ res = 0
for i in range(0, 64):
- if int(bitlist[j*64+i]) == 1:
- res += 1 << (64 - i - 1);
- ret.append(res);
- res = 0;
- j = 0;
- for i in range(len(bitlist)%64):
- if int(bitlist[len(ret)*64+i]) == 1:
- res += 1 << (64 - j - 1);
- j += 1;
- ret.append(res);
- return ret;
+ if int(bitlist[j * 64 + i]) == 1:
+ res += 1 << (64 - i - 1)
+ ret.append(res)
+ res = 0
+ j = 0
+ for i in range(len(bitlist) % 64):
+ if int(bitlist[len(ret) * 64 + i]) == 1:
+ res += 1 << (64 - j - 1)
+ j += 1
+ ret.append(res)
+ return ret
+
def generate_symmetries(symlist):
retlist = []
if len(symlist) == 1:
for i in range(len(symlist[0])):
- retlist.append(symlist[0][i:] + symlist[0][0:i]);
- invlist = symlist[0];
+ retlist.append(symlist[0][i:] + symlist[0][0:i])
+ invlist = symlist[0]
for i in range(1, len(symlist[0]) / 2):
- invlist[i] = symlist[0][i + len(symlist[0]) / 2];
- invlist[i + len(symlist[0]) / 2] = symlist[0][i];
+ invlist[i] = symlist[0][i + len(symlist[0]) / 2]
+ invlist[i + len(symlist[0]) / 2] = symlist[0][i]
for i in range(len(symlist[0])):
- retlist.append(symlist[0][i:] + symlist[0][0:i]);
- return retlist;
+ retlist.append(symlist[0][i:] + symlist[0][0:i])
+ return retlist
diff --git a/gr-fec/python/fec/capillary_threaded_decoder.py b/gr-fec/python/fec/capillary_threaded_decoder.py
index 71d968eb99..7a800098cd 100644
--- a/gr-fec/python/fec/capillary_threaded_decoder.py
+++ b/gr-fec/python/fec/capillary_threaded_decoder.py
@@ -19,14 +19,15 @@ class capillary_threaded_decoder(gr.hier_block2):
def __init__(self, decoder_list_0, input_size, output_size):
gr.hier_block2.__init__(
self, "Capillary Threaded Decoder",
- gr.io_signature(1, 1, input_size*1),
- gr.io_signature(1, 1, output_size*1))
+ gr.io_signature(1, 1, input_size * 1),
+ gr.io_signature(1, 1, output_size * 1))
self.decoder_list_0 = decoder_list_0
check = math.log10(len(self.decoder_list_0)) / math.log10(2.0)
if(abs(check - int(check)) > 0):
- gr.log.info("fec.capillary_threaded_decoder: number of decoders must be a power of 2.")
+ gr.log.info(
+ "fec.capillary_threaded_decoder: number of decoders must be a power of 2.")
raise AttributeError
self.deinterleaves_0 = []
@@ -37,7 +38,8 @@ class capillary_threaded_decoder(gr.hier_block2):
self.generic_decoders_0 = []
for i in range(len(decoder_list_0)):
- self.generic_decoders_0.append(fec.decoder(decoder_list_0[i], input_size, output_size))
+ self.generic_decoders_0.append(fec.decoder(
+ decoder_list_0[i], input_size, output_size))
self.interleaves_0 = []
for i in range(int(math.log(len(decoder_list_0), 2))):
@@ -49,15 +51,19 @@ class capillary_threaded_decoder(gr.hier_block2):
branchcount = 1
for i in range(int(math.log(len(decoder_list_0), 2)) - 1):
for j in range(int(math.pow(2, i))):
- self.connect((self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0))
- self.connect((self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0))
+ self.connect(
+ (self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0))
+ self.connect(
+ (self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0))
rootcount += 1
branchcount += 2
codercount = 0
for i in range(len(decoder_list_0) // 2):
- self.connect((self.deinterleaves_0[rootcount], 0), (self.generic_decoders_0[codercount], 0))
- self.connect((self.deinterleaves_0[rootcount], 1), (self.generic_decoders_0[codercount + 1], 0))
+ self.connect(
+ (self.deinterleaves_0[rootcount], 0), (self.generic_decoders_0[codercount], 0))
+ self.connect(
+ (self.deinterleaves_0[rootcount], 1), (self.generic_decoders_0[codercount + 1], 0))
rootcount += 1
codercount += 2
@@ -65,15 +71,19 @@ class capillary_threaded_decoder(gr.hier_block2):
branchcount = 1
for i in range(int(math.log(len(decoder_list_0), 2)) - 1):
for j in range(int(math.pow(2, i))):
- self.connect((self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0))
- self.connect((self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1))
+ self.connect(
+ (self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect(
+ (self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1))
rootcount += 1
branchcount += 2
codercount = 0
for i in range(len(decoder_list_0) // 2):
- self.connect((self.generic_decoders_0[codercount], 0), (self.interleaves_0[rootcount], 0))
- self.connect((self.generic_decoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1))
+ self.connect(
+ (self.generic_decoders_0[codercount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect(
+ (self.generic_decoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1))
rootcount += 1
codercount += 2
diff --git a/gr-fec/python/fec/capillary_threaded_encoder.py b/gr-fec/python/fec/capillary_threaded_encoder.py
index c8ed970bbc..abd7a61b49 100644
--- a/gr-fec/python/fec/capillary_threaded_encoder.py
+++ b/gr-fec/python/fec/capillary_threaded_encoder.py
@@ -25,59 +25,66 @@ class capillary_threaded_encoder(gr.hier_block2):
check = math.log10(len(self.encoder_list_0)) / math.log10(2.0)
if(abs(check - int(check)) > 0.0):
- gr.log.info("fec.capillary_threaded_encoder: number of encoders must be a power of 2.")
+ gr.log.info(
+ "fec.capillary_threaded_encoder: number of encoders must be a power of 2.")
raise AttributeError
- self.deinterleaves_0 = [];
+ self.deinterleaves_0 = []
for i in range(int(math.log(len(encoder_list_0), 2))):
for j in range(int(math.pow(2, i))):
self.deinterleaves_0.append(blocks.deinterleave(input_size,
fec.get_encoder_input_size(encoder_list_0[0])))
- self.generic_encoders_0 = [];
+ self.generic_encoders_0 = []
for i in range(len(encoder_list_0)):
self.generic_encoders_0.append(fec.encoder(encoder_list_0[i],
input_size, output_size))
- self.interleaves_0 = [];
+ self.interleaves_0 = []
for i in range(int(math.log(len(encoder_list_0), 2))):
for j in range(int(math.pow(2, i))):
self.interleaves_0.append(blocks.interleave(output_size,
fec.get_encoder_output_size(encoder_list_0[0])))
- rootcount = 0;
- branchcount = 1;
+ rootcount = 0
+ branchcount = 1
for i in range(int(math.log(len(encoder_list_0), 2)) - 1):
for j in range(int(math.pow(2, i))):
- self.connect((self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0))
- self.connect((self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0))
- rootcount += 1;
- branchcount += 2;
-
- codercount = 0;
+ self.connect(
+ (self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0))
+ self.connect(
+ (self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0))
+ rootcount += 1
+ branchcount += 2
+
+ codercount = 0
for i in range(len(encoder_list_0) // 2):
- self.connect((self.deinterleaves_0[rootcount], 0), (self.generic_encoders_0[codercount], 0))
- self.connect((self.deinterleaves_0[rootcount], 1), (self.generic_encoders_0[codercount + 1], 0))
- rootcount += 1;
- codercount += 2;
-
-
- rootcount = 0;
- branchcount = 1;
+ self.connect(
+ (self.deinterleaves_0[rootcount], 0), (self.generic_encoders_0[codercount], 0))
+ self.connect(
+ (self.deinterleaves_0[rootcount], 1), (self.generic_encoders_0[codercount + 1], 0))
+ rootcount += 1
+ codercount += 2
+
+ rootcount = 0
+ branchcount = 1
for i in range(int(math.log(len(encoder_list_0), 2)) - 1):
for j in range(int(math.pow(2, i))):
- self.connect((self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0))
- self.connect((self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1))
- rootcount += 1;
- branchcount += 2;
-
-
- codercount = 0;
+ self.connect(
+ (self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect(
+ (self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1))
+ rootcount += 1
+ branchcount += 2
+
+ codercount = 0
for i in range(len(encoder_list_0) // 2):
- self.connect((self.generic_encoders_0[codercount], 0), (self.interleaves_0[rootcount], 0))
- self.connect((self.generic_encoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1))
- rootcount += 1;
- codercount += 2;
+ self.connect(
+ (self.generic_encoders_0[codercount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect(
+ (self.generic_encoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1))
+ rootcount += 1
+ codercount += 2
if((len(self.encoder_list_0)) > 1):
self.connect((self, 0), (self.deinterleaves_0[0], 0))
diff --git a/gr-fec/python/fec/extended_async_encoder.py b/gr-fec/python/fec/extended_async_encoder.py
index 0a130ff4bf..434b86615e 100644
--- a/gr-fec/python/fec/extended_async_encoder.py
+++ b/gr-fec/python/fec/extended_async_encoder.py
@@ -27,14 +27,15 @@ class extended_async_encoder(gr.hier_block2):
self.message_port_register_hier_in('in')
self.message_port_register_hier_out('out')
- self.puncpat=puncpat
+ self.puncpat = puncpat
# If it's a list of encoders, take the first one, unless it's
# a list of lists of encoders.
if(type(encoder_obj_list) == list):
# This block doesn't handle parallelism of > 1
if(type(encoder_obj_list[0]) == list):
- gr.log.info("fec.extended_encoder: Parallelism must be 0 or 1.")
+ gr.log.info(
+ "fec.extended_encoder: Parallelism must be 0 or 1.")
raise AttributeError
encoder_obj = encoder_obj_list[0]
@@ -46,14 +47,14 @@ class extended_async_encoder(gr.hier_block2):
self.encoder = fec.async_encoder(encoder_obj)
#self.puncture = None
- #if self.puncpat != '11':
+ # if self.puncpat != '11':
# self.puncture = fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0)
self.msg_connect(weakref.proxy(self), "in", self.encoder, "in")
- #if(self.puncture):
+ # if(self.puncture):
# self.msg_connect(self.encoder, "out", self.puncture, "in")
# self.msg_connect(self.puncture, "out", weakref.proxy(self), "out")
- #else:
+ # else:
# self.msg_connect(self.encoder, "out", weakref.proxy(self), "out")
self.msg_connect(self.encoder, "out", weakref.proxy(self), "out")
diff --git a/gr-fec/python/fec/extended_decoder.py b/gr-fec/python/fec/extended_decoder.py
index 9409824981..648d03503c 100644
--- a/gr-fec/python/fec/extended_decoder.py
+++ b/gr-fec/python/fec/extended_decoder.py
@@ -101,19 +101,22 @@ class extended_decoder(gr.hier_block2):
# anything going through the annihilator needs shifted, uchar vals
if (
- fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar"
- or fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits"
+ fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or
+ fec.get_decoder_input_conversion(
+ decoder_obj_list[0]) == "packed_bits"
):
self.blocks.append(blocks.multiply_const_ff(48.0))
if fec.get_shift(decoder_obj_list[0]) != 0.0:
- self.blocks.append(blocks.add_const_ff(fec.get_shift(decoder_obj_list[0])))
+ self.blocks.append(blocks.add_const_ff(
+ fec.get_shift(decoder_obj_list[0])))
elif fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
self.blocks.append(blocks.add_const_ff(128.0))
if (
- fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar"
- or fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits"
+ fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or
+ fec.get_decoder_input_conversion(
+ decoder_obj_list[0]) == "packed_bits"
):
self.blocks.append(blocks.float_to_uchar())
@@ -134,9 +137,9 @@ class extended_decoder(gr.hier_block2):
if 1.0 / self.ann.count("1") >= i:
synd_garble = self.garbletable[i]
print(
- "using syndrom garble threshold "
- + str(synd_garble)
- + "for conv_bit_corr_bb"
+ "using syndrom garble threshold " +
+ str(synd_garble) +
+ "for conv_bit_corr_bb"
)
print("ceiling: .0335 data garble rate")
self.blocks.append(
@@ -196,7 +199,8 @@ class extended_decoder(gr.hier_block2):
)
if fec.get_decoder_output_conversion(decoder_obj_list[0]) == "unpack":
- self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
+ self.blocks.append(
+ blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
self.connect((self, 0), (self.blocks[0], 0))
self.connect((self.blocks[-1], 0), (self, 0))
diff --git a/gr-fec/python/fec/extended_encoder.py b/gr-fec/python/fec/extended_encoder.py
index fbcbeb1be1..1324f052c7 100644
--- a/gr-fec/python/fec/extended_encoder.py
+++ b/gr-fec/python/fec/extended_encoder.py
@@ -23,8 +23,8 @@ class extended_encoder(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_char),
gr.io_signature(1, 1, gr.sizeof_char))
- self.blocks=[]
- self.puncpat=puncpat
+ self.blocks = []
+ self.puncpat = puncpat
if (type(encoder_obj_list) == list):
if (type(encoder_obj_list[0]) == list):
@@ -32,7 +32,7 @@ class extended_encoder(gr.hier_block2):
raise AttributeError
else:
# If it has parallelism of 0, force it into a list of 1
- encoder_obj_list = [encoder_obj_list,]
+ encoder_obj_list = [encoder_obj_list, ]
if fec.get_encoder_input_conversion(encoder_obj_list[0]) == "pack":
self.blocks.append(blocks.pack_k_bits_bb(8))
@@ -51,17 +51,19 @@ class extended_encoder(gr.hier_block2):
gr.sizeof_char))
if fec.get_encoder_output_conversion(encoder_obj_list[0]) == "packed_bits":
- self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
+ self.blocks.append(
+ blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
if self.puncpat != '11':
- self.blocks.append(fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+ self.blocks.append(fec.puncture_bb(
+ len(puncpat), read_bitlist(puncpat), 0))
# Connect the input to the encoder and the output to the
# puncture if used or the encoder if not.
- self.connect((self, 0), (self.blocks[0], 0));
- self.connect((self.blocks[-1], 0), (self, 0));
+ self.connect((self, 0), (self.blocks[0], 0))
+ self.connect((self.blocks[-1], 0), (self, 0))
# If using the puncture block, add it into the flowgraph after
# the encoder.
for i in range(len(self.blocks) - 1):
- self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
+ self.connect((self.blocks[i], 0), (self.blocks[i + 1], 0))
diff --git a/gr-fec/python/fec/extended_tagged_decoder.py b/gr-fec/python/fec/extended_tagged_decoder.py
index 0489d738ae..cda601db43 100644
--- a/gr-fec/python/fec/extended_tagged_decoder.py
+++ b/gr-fec/python/fec/extended_tagged_decoder.py
@@ -91,7 +91,8 @@ class extended_tagged_decoder(gr.hier_block2):
# We could just grab encoder [0][0], but we don't want to encourage
# this.
if isinstance(decoder_obj_list[0], list):
- gr.log.info("fec.extended_tagged_decoder: Parallelism must be 1.")
+ gr.log.info(
+ "fec.extended_tagged_decoder: Parallelism must be 1.")
raise AttributeError
decoder_obj = decoder_obj_list[0]
@@ -110,8 +111,8 @@ class extended_tagged_decoder(gr.hier_block2):
# anything going through the annihilator needs shifted, uchar vals
if (
- fec.get_decoder_input_conversion(decoder_obj) == "uchar"
- or fec.get_decoder_input_conversion(decoder_obj) == "packed_bits"
+ fec.get_decoder_input_conversion(decoder_obj) == "uchar" or
+ fec.get_decoder_input_conversion(decoder_obj) == "packed_bits"
):
self.blocks.append(blocks.multiply_const_ff(48.0))
@@ -121,8 +122,8 @@ class extended_tagged_decoder(gr.hier_block2):
self.blocks.append(blocks.add_const_ff(128.0))
if (
- fec.get_decoder_input_conversion(decoder_obj) == "uchar"
- or fec.get_decoder_input_conversion(decoder_obj) == "packed_bits"
+ fec.get_decoder_input_conversion(decoder_obj) == "uchar" or
+ fec.get_decoder_input_conversion(decoder_obj) == "packed_bits"
):
self.blocks.append(blocks.float_to_uchar())
@@ -143,9 +144,9 @@ class extended_tagged_decoder(gr.hier_block2):
if 1.0 / self.ann.count("1") >= i:
synd_garble = self.garbletable[i]
print(
- "using syndrom garble threshold "
- + str(synd_garble)
- + "for conv_bit_corr_bb"
+ "using syndrom garble threshold " +
+ str(synd_garble) +
+ "for conv_bit_corr_bb"
)
print("ceiling: .0335 data garble rate")
self.blocks.append(
@@ -191,7 +192,8 @@ class extended_tagged_decoder(gr.hier_block2):
)
if fec.get_decoder_output_conversion(decoder_obj) == "unpack":
- self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
+ self.blocks.append(
+ blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
self.connect((self, 0), (self.blocks[0], 0))
self.connect((self.blocks[-1], 0), (self, 0))
diff --git a/gr-fec/python/fec/extended_tagged_encoder.py b/gr-fec/python/fec/extended_tagged_encoder.py
index 8519436f83..c1ea7fb680 100644
--- a/gr-fec/python/fec/extended_tagged_encoder.py
+++ b/gr-fec/python/fec/extended_tagged_encoder.py
@@ -9,7 +9,6 @@
#
-
from gnuradio import gr, blocks
from . import fec_python as fec
@@ -23,8 +22,8 @@ class extended_tagged_encoder(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_char),
gr.io_signature(1, 1, gr.sizeof_char))
- self.blocks=[]
- self.puncpat=puncpat
+ self.blocks = []
+ self.puncpat = puncpat
# If it's a list of encoders, take the first one, unless it's
# a list of lists of encoders.
@@ -32,7 +31,8 @@ class extended_tagged_encoder(gr.hier_block2):
# This block doesn't handle parallelism of > 1
# We could just grab encoder [0][0], but we don't want to encourage this.
if(type(encoder_obj_list[0]) == list):
- gr.log.info("fec.extended_tagged_encoder: Parallelism must be 0 or 1.")
+ gr.log.info(
+ "fec.extended_tagged_encoder: Parallelism must be 0 or 1.")
raise AttributeError
encoder_obj = encoder_obj_list[0]
@@ -61,14 +61,15 @@ class extended_tagged_encoder(gr.hier_block2):
lentagname, mtu))
if self.puncpat != '11':
- self.blocks.append(fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+ self.blocks.append(fec.puncture_bb(
+ len(puncpat), read_bitlist(puncpat), 0))
# Connect the input to the encoder and the output to the
# puncture if used or the encoder if not.
- self.connect((self, 0), (self.blocks[0], 0));
- self.connect((self.blocks[-1], 0), (self, 0));
+ self.connect((self, 0), (self.blocks[0], 0))
+ self.connect((self.blocks[-1], 0), (self, 0))
# If using the puncture block, add it into the flowgraph after
# the encoder.
for i in range(len(self.blocks) - 1):
- self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
+ self.connect((self.blocks[i], 0), (self.blocks[i + 1], 0))
diff --git a/gr-fec/python/fec/fec_test.py b/gr-fec/python/fec/fec_test.py
index 264abf4e3c..db38a3b93d 100644
--- a/gr-fec/python/fec/fec_test.py
+++ b/gr-fec/python/fec/fec_test.py
@@ -21,14 +21,15 @@ else:
from .extended_encoder import extended_encoder
from .extended_decoder import extended_decoder
+
class fec_test(gr.hier_block2):
def __init__(self, generic_encoder=0, generic_decoder=0, esno=0,
samp_rate=3200000, threading="capillary", puncpat='11',
seed=0):
gr.hier_block2.__init__(self, "fec_test",
- gr.io_signature(1, 1, gr.sizeof_char*1),
- gr.io_signature(2, 2, gr.sizeof_char*1))
+ gr.io_signature(1, 1, gr.sizeof_char * 1),
+ gr.io_signature(2, 2, gr.sizeof_char * 1))
self.generic_encoder = generic_encoder
self.generic_decoder = generic_decoder
@@ -52,7 +53,7 @@ class fec_test(gr.hier_block2):
ann=None, puncpat=puncpat,
integration_period=10000, rotator=None)
- noise = math.sqrt((10.0**(-esno / 10.0))/2.0)
+ noise = math.sqrt((10.0**(-esno / 10.0)) / 2.0)
#self.fastnoise = analog.fastnoise_source_f(analog.GR_GAUSSIAN, noise, seed, 8192)
self.fastnoise = analog.noise_source_f(analog.GR_GAUSSIAN, noise, seed)
self.addnoise = blocks.add_ff(1)
@@ -68,12 +69,11 @@ class fec_test(gr.hier_block2):
self.connect(self.encoder, self.map_bb)
self.connect(self.map_bb, self.b2f)
self.connect(self.b2f, (self.addnoise, 0))
- self.connect(self.fastnoise, (self.addnoise,1))
+ self.connect(self.fastnoise, (self.addnoise, 1))
self.connect(self.addnoise, self.decoder)
self.connect(self.decoder, self.pack8)
self.connect(self.pack8, (self, 0))
-
def get_generic_encoder(self):
return self.generic_encoder
diff --git a/gr-fec/python/fec/polar/__init__.py b/gr-fec/python/fec/polar/__init__.py
index ce4e506334..26698752c8 100644
--- a/gr-fec/python/fec/polar/__init__.py
+++ b/gr-fec/python/fec/polar/__init__.py
@@ -17,19 +17,25 @@ from .helper_functions import is_power_of_two
CHANNEL_TYPE_AWGN = 'AWGN'
CHANNEL_TYPE_BEC = 'BEC'
+
def get_z_params(is_prototype, channel, block_size, design_snr, mu):
- print('POLAR code channel construction called with parameters channel={0}, blocksize={1}, design SNR={2}, mu={3}'.format(channel, block_size, design_snr, mu))
+ print('POLAR code channel construction called with parameters channel={0}, blocksize={1}, design SNR={2}, mu={3}'.format(
+ channel, block_size, design_snr, mu))
if not (channel == 'AWGN' or channel == 'BEC'):
- raise ValueError("channel is {0}, but only BEC and AWGN are supported!".format(channel))
+ raise ValueError(
+ "channel is {0}, but only BEC and AWGN are supported!".format(channel))
if not is_power_of_two(block_size):
- raise ValueError("block size={0} is not a power of 2!".format(block_size))
+ raise ValueError(
+ "block size={0} is not a power of 2!".format(block_size))
if design_snr < -1.5917:
- raise ValueError("design SNR={0} < -1.5917. MUST be greater!".format(design_snr))
+ raise ValueError(
+ "design SNR={0} < -1.5917. MUST be greater!".format(design_snr))
if not mu > 0:
raise ValueError("mu={0} < 1. MUST be > 1!".format(mu))
if not is_prototype and channel == 'AWGN':
z_params = cc.load_z_parameters(block_size, design_snr, mu)
- print('Read Z-parameter file: {0}'.format(cc.default_dir() + cc.generate_filename(block_size, design_snr, mu)))
+ print('Read Z-parameter file: {0}'.format(cc.default_dir() +
+ cc.generate_filename(block_size, design_snr, mu)))
return z_params
return bhattacharyya_bounds(design_snr, block_size)
@@ -48,5 +54,5 @@ def load_frozen_bits_info(is_prototype, channel, block_size, num_info_bits, desi
'design_snr': design_snr,
'channel': channel,
'mu': mu,
- }
+ }
return data_set
diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py
index 2ec78679a5..643bfb7609 100644
--- a/gr-fec/python/fec/polar/channel_construction.py
+++ b/gr-fec/python/fec/polar/channel_construction.py
@@ -69,7 +69,8 @@ def frozen_bit_positions(block_size, info_size, design_snr=0.0):
def generate_filename(block_size, design_snr, mu):
filename = "polar_code_z_parameters_N" + str(int(block_size))
- filename += "_SNR" + str(float(design_snr)) + "_MU" + str(int(mu)) + ".polar"
+ filename += "_SNR" + str(float(design_snr)) + \
+ "_MU" + str(int(mu)) + ".polar"
return filename
diff --git a/gr-fec/python/fec/polar/channel_construction_awgn.py b/gr-fec/python/fec/polar/channel_construction_awgn.py
index 667c17b8fc..5a90262007 100644
--- a/gr-fec/python/fec/polar/channel_construction_awgn.py
+++ b/gr-fec/python/fec/polar/channel_construction_awgn.py
@@ -95,9 +95,9 @@ def discretize_awgn(mu, design_snr):
def instant_capacity_delta_callable():
return (
- lambda a, b: -1.0 * (a + b) * np.log2((a + b) / 2)
- + a * np.log2(a)
- + b * np.log2(b)
+ lambda a, b: -1.0 * (a + b) * np.log2((a + b) / 2) +
+ a * np.log2(a) +
+ b * np.log2(b)
)
@@ -114,16 +114,19 @@ def quantize_to_size(tpm, mu):
print("WARNING: This channel gets too small!")
# lambda works on vectors just fine. Use Numpy vector awesomeness.
- delta_i_vec = calculate_delta_I(tpm[0, 0:-1], tpm[1, 0:-1], tpm[0, 1:], tpm[1, 1:])
+ delta_i_vec = calculate_delta_I(
+ tpm[0, 0:-1], tpm[1, 0:-1], tpm[0, 1:], tpm[1, 1:])
for i in range(L - mu):
d = np.argmin(delta_i_vec)
ap = tpm[0, d] + tpm[0, d + 1]
bp = tpm[1, d] + tpm[1, d + 1]
if d > 0:
- delta_i_vec[d - 1] = calculate_delta_I(tpm[0, d - 1], tpm[1, d - 1], ap, bp)
+ delta_i_vec[d -
+ 1] = calculate_delta_I(tpm[0, d - 1], tpm[1, d - 1], ap, bp)
if d < delta_i_vec.size - 1:
- delta_i_vec[d + 1] = calculate_delta_I(ap, bp, tpm[0, d + 1], tpm[1, d + 1])
+ delta_i_vec[d +
+ 1] = calculate_delta_I(ap, bp, tpm[0, d + 1], tpm[1, d + 1])
delta_i_vec = np.delete(delta_i_vec, d)
tpm = np.delete(tpm, d, axis=1)
tpm[0, d] = ap
@@ -174,7 +177,8 @@ def tal_vardy_tpm_algorithm(block_size, design_snr, mu):
def merge_lr_based(q, mu):
lrs = q[0] / q[1]
- vals, indices, inv_indices = np.unique(lrs, return_index=True, return_inverse=True)
+ vals, indices, inv_indices = np.unique(
+ lrs, return_index=True, return_inverse=True)
# compare [1] (20). Ordering of representatives according to LRs.
temp = np.zeros((2, len(indices)), dtype=float)
if vals.size < mu:
diff --git a/gr-fec/python/fec/polar/channel_construction_bec.py b/gr-fec/python/fec/polar/channel_construction_bec.py
index dd0f386d81..edbfbacf45 100644
--- a/gr-fec/python/fec/polar/channel_construction_bec.py
+++ b/gr-fec/python/fec/polar/channel_construction_bec.py
@@ -41,7 +41,7 @@ def calc_one_recursion(iw0):
def calculate_bec_channel_capacities_loop(initial_channel, block_power):
- # compare [0, Arikan] eq. 6
+ # compare [0, Arikan] eq. 6
iw = np.array([initial_channel, ], dtype=float)
for i in range(block_power):
iw = calc_one_recursion(iw)
@@ -136,7 +136,7 @@ def plot_channel_capacities(capacity, save_file=None):
def plot_average_channel_distance(save_file=None):
- eta = 0.5 # design_snr_to_bec_eta(-1.5917)
+ eta = 0.5 # design_snr_to_bec_eta(-1.5917)
powers = np.arange(4, 26)
try:
@@ -219,5 +219,6 @@ def main():
# plot_average_channel_distance()
calculate_bec_channel_z_parameters(eta, block_size)
+
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/common.py b/gr-fec/python/fec/polar/common.py
index e60cc911f2..178544464a 100644
--- a/gr-fec/python/fec/polar/common.py
+++ b/gr-fec/python/fec/polar/common.py
@@ -46,7 +46,8 @@ class PolarCommon(object):
self.K = k
self.frozenbits = frozenbits
self.frozen_bit_position = frozen_bit_position
- self.info_bit_position = np.delete(np.arange(self.N), self.frozen_bit_position)
+ self.info_bit_position = np.delete(
+ np.arange(self.N), self.frozen_bit_position)
def _insert_frozen_bits(self, u):
prototype = np.empty(self.N, dtype=int)
diff --git a/gr-fec/python/fec/polar/decoder.py b/gr-fec/python/fec/polar/decoder.py
index 8cbda51017..d245f06e40 100644
--- a/gr-fec/python/fec/polar/decoder.py
+++ b/gr-fec/python/fec/polar/decoder.py
@@ -19,8 +19,10 @@ class PolarDecoder(PolarCommon):
def __init__(self, n, k, frozen_bit_position, frozenbits=None):
PolarCommon.__init__(self, n, k, frozen_bit_position, frozenbits)
- self.error_probability = 0.1 # this is kind of a dummy value. usually chosen individually.
- self.lrs = ((1 - self.error_probability) / self.error_probability, self.error_probability / (1 - self.error_probability))
+ # this is kind of a dummy value. usually chosen individually.
+ self.error_probability = 0.1
+ self.lrs = ((1 - self.error_probability) / self.error_probability,
+ self.error_probability / (1 - self.error_probability))
self.llrs = np.log(self.lrs)
def _llr_bit(self, bit):
@@ -77,8 +79,8 @@ class PolarDecoder(PolarCommon):
def _calculate_lrs(self, y, u):
ue = self._get_even_indices_values(u)
uo = self._get_odd_indices_values(u)
- ya = y[0:y.size//2]
- yb = y[(y.size//2):]
+ ya = y[0:y.size // 2]
+ yb = y[(y.size // 2):]
la = self._lr_decision_element(ya, (ue + uo) % 2)
lb = self._lr_decision_element(yb, ue)
return la, lb
@@ -140,7 +142,8 @@ class PolarDecoder(PolarCommon):
for i in range(self.N):
graph[i][self.power] = self._llr_bit(y[i])
decode_order = self._vector_bit_reversed(np.arange(self.N), self.power)
- decode_order = np.delete(decode_order, np.where(decode_order >= self.N // 2))
+ decode_order = np.delete(
+ decode_order, np.where(decode_order >= self.N // 2))
u = np.array([], dtype=int)
for pos in decode_order:
graph = self._butterfly(pos, 0, graph, u)
@@ -171,7 +174,8 @@ class PolarDecoder(PolarCommon):
# activate right side butterflies
u_even = self._get_even_indices_values(u)
u_odd = self._get_odd_indices_values(u)
- graph = self._butterfly(bf_entry_row, stage + 1, graph, (u_even + u_odd) % 2)
+ graph = self._butterfly(bf_entry_row, stage + 1,
+ graph, (u_even + u_odd) % 2)
lower_right = bf_entry_row + self.N // (2 ** (stage + 1))
graph = self._butterfly(lower_right, stage + 1, graph, u_even)
@@ -182,7 +186,8 @@ class PolarDecoder(PolarCommon):
def decode(self, data, is_packed=False):
if not len(data) == self.N:
- raise ValueError("len(data)={0} is not equal to n={1}!".format(len(data), self.N))
+ raise ValueError(
+ "len(data)={0} is not equal to n={1}!".format(len(data), self.N))
if is_packed:
data = np.unpackbits(data)
data = self._lr_sc_decoder_efficient(data)
@@ -192,12 +197,14 @@ class PolarDecoder(PolarCommon):
return data
def _extract_info_bits_reversed(self, y):
- info_bit_positions_reversed = self._vector_bit_reversed(self.info_bit_position, self.power)
+ info_bit_positions_reversed = self._vector_bit_reversed(
+ self.info_bit_position, self.power)
return y[info_bit_positions_reversed]
def decode_systematic(self, data):
if not len(data) == self.N:
- raise ValueError("len(data)={0} is not equal to n={1}!".format(len(data), self.N))
+ raise ValueError(
+ "len(data)={0} is not equal to n={1}!".format(len(data), self.N))
# data = self._reverse_bits(data)
data = self._lr_sc_decoder_efficient(data)
data = self._encode_natural_order(data)
diff --git a/gr-fec/python/fec/polar/encoder.py b/gr-fec/python/fec/polar/encoder.py
index d6279b1af5..273d6cc9cf 100644
--- a/gr-fec/python/fec/polar/encoder.py
+++ b/gr-fec/python/fec/polar/encoder.py
@@ -32,7 +32,8 @@ class PolarEncoder(PolarCommon):
def encode(self, data, is_packed=False):
if not len(data) == self.K:
- raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K))
+ raise ValueError(
+ "len(data)={0} is not equal to k={1}!".format(len(data), self.K))
if is_packed:
data = np.unpackbits(data)
if np.max(data) > 1 or np.min(data) < 0:
@@ -45,7 +46,8 @@ class PolarEncoder(PolarCommon):
def encode_systematic(self, data):
if not len(data) == self.K:
- raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K))
+ raise ValueError(
+ "len(data)={0} is not equal to k={1}!".format(len(data), self.K))
if np.max(data) > 1 or np.min(data) < 0:
raise ValueError("can only encode bits!")
@@ -102,7 +104,7 @@ def test_encoder_impls():
# frozenbits = np.zeros(n - k)
# frozenbitposition8 = np.array((0, 1, 2, 4), dtype=int) # keep it!
frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
- encoder = PolarEncoder(n, k, frozenbitposition) #, frozenbits)
+ encoder = PolarEncoder(n, k, frozenbitposition) # , frozenbits)
print('result:', compare_results(encoder, ntests, k))
print('Test rate-1 encoder/decoder chain results')
diff --git a/gr-fec/python/fec/polar/helper_functions.py b/gr-fec/python/fec/polar/helper_functions.py
index 357bccd5b2..bcddfb83c9 100644
--- a/gr-fec/python/fec/polar/helper_functions.py
+++ b/gr-fec/python/fec/polar/helper_functions.py
@@ -8,7 +8,8 @@
import numpy as np
-import time, sys
+import time
+import sys
import copy
@@ -125,8 +126,8 @@ def show_progress_bar(ndone, ntotal):
percentage = 100. * fract
ndone_chars = int(nchars * fract)
nundone_chars = nchars - ndone_chars
- sys.stdout.write('\r[{0}{1}] {2:5.2f}% ({3} / {4})'.format('=' * ndone_chars, ' ' * nundone_chars, percentage, ndone, ntotal))
-
+ sys.stdout.write('\r[{0}{1}] {2:5.2f}% ({3} / {4})'.format('=' *
+ ndone_chars, ' ' * nundone_chars, percentage, ndone, ntotal))
def mutual_information(w):
@@ -173,7 +174,6 @@ def main():
n = 6
m = 2 ** n
-
pos = np.arange(m)
rev_pos = bit_reverse_vector(pos, n)
print(pos)
@@ -190,6 +190,5 @@ def main():
print(a)
-
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/polar_channel_construction b/gr-fec/python/fec/polar/polar_channel_construction
index 8ea5753d48..6f1879acf7 100644
--- a/gr-fec/python/fec/polar/polar_channel_construction
+++ b/gr-fec/python/fec/polar/polar_channel_construction
@@ -18,14 +18,14 @@ def setup_parser():
override this and call the parent function. """
parser = ArgumentParser()
parser.add_argument("-c", "--channel", choices=('BEC', 'AWGN'),
- help="specify channel, currently BEC or AWGN (default='BEC')",
- default='BEC')
+ help="specify channel, currently BEC or AWGN (default='BEC')",
+ default='BEC')
parser.add_argument("-b", "--blocksize", type=int, dest="block_size",
- help="specify block size of polar code (default=16)", default=16)
+ help="specify block size of polar code (default=16)", default=16)
parser.add_argument("-s", "--design-snr", type=float, dest="design_snr",
- help="specify design SNR of polar code (default=0.0)", default=0.0)
+ help="specify design SNR of polar code (default=0.0)", default=0.0)
parser.add_argument("-k", "--mu", type=int,
- help="specify block size of polar code (default=2)", default=2)
+ help="specify block size of polar code (default=2)", default=2)
return parser
@@ -37,7 +37,7 @@ def main():
args = parser.parse_args()
z_params = cc.get_z_params(False, args.channel, args.block_size,
- args.design_snr, args.mu)
+ args.design_snr, args.mu)
print(z_params)
diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py
index 4e1b40a7f7..7ce9f57bb0 100644
--- a/gr-fec/python/fec/polar/testbed.py
+++ b/gr-fec/python/fec/polar/testbed.py
@@ -52,7 +52,8 @@ def is_equal(first, second):
result = first == second
for i in range(len(result)):
print(
- "{0:4}: {1:2} == {2:1} = {3}".format(i, first[i], second[i], result[i])
+ "{0:4}: {1:2} == {2:1} = {3}".format(
+ i, first[i], second[i], result[i])
)
return False
return True
@@ -132,7 +133,8 @@ def channel_analysis():
channel_counter = np.load(filename)
print(np.min(channel_counter), np.max(channel_counter))
channel_counter[0] = np.min(channel_counter)
- good_indices = find_good_indices(channel_counter, channel_counter.size // 2)
+ good_indices = find_good_indices(
+ channel_counter, channel_counter.size // 2)
info_bit_positions = np.where(good_indices > 0)
print(info_bit_positions)
frozen_bit_positions = np.delete(
@@ -303,7 +305,8 @@ def find_decoder_subframes(frozen_mask):
print("--------------------------")
ll = l
sub_t = sub_mask[i]
- print("{0:4} lock {1:4} value: {2} in sub {3}".format(i, 2 ** (l + 1), v, t))
+ print("{0:4} lock {1:4} value: {2} in sub {3}".format(
+ i, 2 ** (l + 1), v, t))
def systematic_encoder_decoder_chain_test():
diff --git a/gr-fec/python/fec/qa_fecapi_dummy.py b/gr-fec/python/fec/qa_fecapi_dummy.py
index fa1abdf789..065d3ed6d2 100644
--- a/gr-fec/python/fec/qa_fecapi_dummy.py
+++ b/gr-fec/python/fec/qa_fecapi_dummy.py
@@ -26,7 +26,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
-
+
def test_parallelism0_00(self):
frame_size = 30
enc = fec.dummy_encoder_make(frame_size * 8)
@@ -52,7 +52,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
-
+
self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism0_02(self):
@@ -66,7 +66,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
-
+
self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_00(self):
@@ -113,7 +113,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
self.tb.run()
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
-
+
self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_03(self):
diff --git a/gr-fec/python/fec/threaded_decoder.py b/gr-fec/python/fec/threaded_decoder.py
index b5444a0a92..a578537798 100644
--- a/gr-fec/python/fec/threaded_decoder.py
+++ b/gr-fec/python/fec/threaded_decoder.py
@@ -16,8 +16,8 @@ class threaded_decoder(gr.hier_block2):
def __init__(self, decoder_list_0, input_size, output_size):
gr.hier_block2.__init__(
self, "Threaded Decoder",
- gr.io_signature(1, 1, input_size*1),
- gr.io_signature(1, 1, output_size*1))
+ gr.io_signature(1, 1, input_size * 1),
+ gr.io_signature(1, 1, output_size * 1))
self.decoder_list_0 = decoder_list_0
@@ -33,11 +33,12 @@ class threaded_decoder(gr.hier_block2):
fec.get_decoder_output_size(decoder_list_0[0]))
for i in range(len(decoder_list_0)):
- self.connect((self.deinterleave_0, i), (self.generic_decoders_0[i], 0))
+ self.connect((self.deinterleave_0, i),
+ (self.generic_decoders_0[i], 0))
for i in range(len(decoder_list_0)):
- self.connect((self.generic_decoders_0[i], 0), (self.interleave_0, i))
-
+ self.connect(
+ (self.generic_decoders_0[i], 0), (self.interleave_0, i))
self.connect((self, 0), (self.deinterleave_0, 0))
self.connect((self.interleave_0, 0), (self, 0))
diff --git a/gr-fec/python/fec/threaded_encoder.py b/gr-fec/python/fec/threaded_encoder.py
index 94ae01df21..b933f60402 100644
--- a/gr-fec/python/fec/threaded_encoder.py
+++ b/gr-fec/python/fec/threaded_encoder.py
@@ -17,27 +17,29 @@ class threaded_encoder(gr.hier_block2):
def __init__(self, encoder_list_0, input_size, output_size):
gr.hier_block2.__init__(
self, "Threaded Encoder",
- gr.io_signature(1, 1, input_size*1),
- gr.io_signature(1, 1, output_size*1))
+ gr.io_signature(1, 1, input_size * 1),
+ gr.io_signature(1, 1, output_size * 1))
self.encoder_list_0 = encoder_list_0
self.fec_deinterleave_0 = blocks.deinterleave(input_size,
fec.get_encoder_input_size(encoder_list_0[0]))
- self.generic_encoders_0 = [];
+ self.generic_encoders_0 = []
for i in range(len(encoder_list_0)):
self.generic_encoders_0.append(fec.encoder(encoder_list_0[i],
input_size, output_size))
self.fec_interleave_0 = blocks.interleave(output_size,
- fec.get_encoder_output_size(encoder_list_0[0]))
+ fec.get_encoder_output_size(encoder_list_0[0]))
for i in range(len(encoder_list_0)):
- self.connect((self.fec_deinterleave_0, i), (self.generic_encoders_0[i], 0))
+ self.connect((self.fec_deinterleave_0, i),
+ (self.generic_encoders_0[i], 0))
for i in range(len(encoder_list_0)):
- self.connect((self.generic_encoders_0[i], 0), (self.fec_interleave_0, i))
+ self.connect(
+ (self.generic_encoders_0[i], 0), (self.fec_interleave_0, i))
self.connect((self, 0), (self.fec_deinterleave_0, 0))
self.connect((self.fec_interleave_0, 0), (self, 0))