path: root/gr-fec/python/fec/polar/
diff options
Diffstat (limited to 'gr-fec/python/fec/polar/')
1 files changed, 493 insertions, 0 deletions
diff --git a/gr-fec/python/fec/polar/ b/gr-fec/python/fec/polar/
new file mode 100644
index 0000000000..f263c6d056
--- /dev/null
+++ b/gr-fec/python/fec/polar/
@@ -0,0 +1,493 @@
+#!/usr/bin/env python
+import numpy as np
+import matplotlib.pyplot as plt
+from encoder import PolarEncoder as enc
+# input alphabet X is always {0,1}
+# output alphabet is arbitrary
+# transition probabilities are arbitrary W(y|x)
+def bit_reverse(value, n):
+ # is this really missing in NumPy???
+ bits = np.zeros(n, type(value))
+ for index in range(n):
+ mask = 1
+ mask = np.left_shift(mask, index)
+ bit = np.bitwise_and(value, mask)
+ bit = np.right_shift(bit, index)
+ bits[index] = bit
+ bits = bits[::-1]
+ result = 0
+ for index, bit in enumerate(bits):
+ bit = np.left_shift(bit, index)
+ result += bit
+ return result
+def get_Bn(n):
+ # this is a bit reversal matrix.
+ lw = int(np.log2(n)) # number of used bits
+ indexes = [bit_reverse(i, lw) for i in range(n)]
+ Bn = np.zeros((n, n), type(n))
+ for i, index in enumerate(indexes):
+ Bn[i][index] = 1
+ return Bn
+def get_Fn(n):
+ # this matrix defines the actual channel combining.
+ if n == 1:
+ return np.array([1, ])
+ F2 = np.array([[1, 0], [1, 1]],
+ nump = int(np.log2(n)) - 1 # number of Kronecker products to calculate
+ Fn = F2
+ for i in range(nump):
+ Fn = np.kron(Fn, F2)
+ return Fn
+def get_Gn(n):
+ # this matrix is called generator matrix
+ if not is_power_of_2(n):
+ print "invalid input"
+ return None
+ if n == 1:
+ return np.array([1, ])
+ Bn = get_Bn(n)
+ Fn = get_Fn(n)
+ Gn =, Fn)
+ return Gn
+def odd_rec(iwn):
+ return iwn ** 2
+def even_rec(iwn):
+ return 2 * iwn - iwn ** 2
+def calc_one_recursion(iw0):
+ iw1 = np.zeros(2 * len(iw0)) # double values
+ for i in range(len(iw0)):
+ # careful indices screw you because paper is '1' based :(
+ iw1[2 * i] = odd_rec(iw0[i])
+ iw1[2 * i + 1] = even_rec(iw0[i])
+ return iw1
+def calculate_bec_channel_capacities(eta, n):
+ iw = 1 - eta # holds for BEC as stated in paper
+ lw = int(np.log2(n))
+ iw = [iw, ]
+ for i in range(lw):
+ iw = calc_one_recursion(iw)
+ return iw
+def bsc_channel(p):
+ '''
+ binary symmetric channel (BSC)
+ output alphabet Y = {0, 1} and
+ W(0|0) = W(1|1) and W(1|0) = W(0|1)
+ this function returns a prob's vector for a BSC
+ p denotes an erroneous transistion
+ '''
+ if not (p >= 0.0 and p <= 1.0):
+ print "given p is out of range!"
+ return ()
+ # 0 -> 0, 0 -> 1, 1 -> 0, 1 -> 1
+ W = np.array([[1 - p, p], [p, 1 - p]])
+ return W
+def bec_channel(eta):
+ '''
+ binary erasure channel (BEC)
+ for each y e Y
+ W(y|0) * W(y|1) = 0 or W(y|0) = W(y|1)
+ transistions are 1 -> 1 or 0 -> 0 or {0, 1} -> ? (erased symbol)
+ '''
+ print eta
+ # looks like BSC but should be interpreted differently.
+ W = (1 - eta, eta, 1 - eta)
+ return W
+def is_power_of_2(num):
+ if type(num) != int:
+ return False # make sure we only compute integers.
+ return num != 0 and ((num & (num - 1)) == 0)
+def combine_W2(u):
+ # This function applies the channel combination for W2
+ x1 = (u[0] + u[1]) % 2
+ x2 = (u[1])
+ return np.array([x1, x2],
+def combine_W4(u):
+ im = np.concatenate((combine_W2(u[0:2]), combine_W2(u[2:4])))
+ print " = ", im
+ swappy = im[1]
+ im[1] = im[2]
+ im[2] = swappy
+ print "combine_W4.reverse_shuffled = ", im
+ return np.concatenate((combine_W2(im[0:2]), combine_W2(im[2:4])))
+def combine_Wn(u):
+ '''Combine vector u for encoding. It's described in the "Channel combining" section'''
+ # will throw error if len(u) isn't a power of 2!
+ n = len(u)
+ G = get_Gn(n)
+ return, G) % 2
+def unpack_byte(byte, nactive):
+ if np.amin(byte) < 0 or np.amax(byte) > 255:
+ return None
+ if not byte.dtype == np.uint8:
+ byte = byte.astype(np.uint8)
+ if nactive == 0:
+ return np.array([], dtype=np.uint8)
+ return np.unpackbits(byte)[-nactive:]
+def pack_byte(bits):
+ if len(bits) == 0:
+ return 0
+ if np.amin(bits) < 0 or np.amax(bits) > 1: # only '1' and '0' in bits array allowed!
+ return None
+ bits = np.concatenate((np.zeros(8 - len(bits), dtype=np.uint8), bits))
+ res = np.packbits(bits)[0]
+ return res
+def calculate_conditional_prob(y, u, channel):
+ # y and u same size and 1d!
+ # channel 2 x 2 matrix!
+ x1 = (u[0] + u[1]) % 2
+ s = 0 if y[0] == x1 else 1
+ # print "W(", y[0], "|", u[0], "+", u[1], "=", s, ") =", channel[y[0]][x1]
+ w112 = channel[y[0]][x1]
+ w22 = channel[y[1]][u[1]]
+ return w112 * w22
+def calculate_w2_probs():
+ w0 = np.array([[0.9, 0.1], [0.1, 0.9]])
+ print w0
+ w2 = np.zeros((4, 4), dtype=float)
+ print "W(y1|u1+u2)"
+ for y in range(4):
+ ybits = unpack_byte(np.array([y, ]), 2)
+ for u in range(4):
+ ubits = unpack_byte(np.array([u, ]), 2)
+ prob = calculate_conditional_prob(ybits, ubits, w0)
+ w2[y][u] = prob
+ # print "W(y1,y2|u1,u2) =", prob
+ return w2
+def get_prob(y, u, channel):
+ return channel[y][u]
+def get_wn_prob(y, u, channel):
+ x = combine_Wn(u)
+ result = 1
+ for i in range(len(x)):
+ result *= get_prob(y[i], x[i], channel)
+ return result
+def calculate_Wn_probs(n, channel):
+ # print "calculate_wn_probs"
+ ncomb = 2 ** n
+ wn = np.ones((ncomb, ncomb), dtype=float)
+ for y in range(ncomb):
+ ybits = unpack_byte(np.array([y, ]), n)
+ for u in range(ncomb):
+ ubits = unpack_byte(np.array([u, ]), n)
+ xbits = combine_Wn(ubits)
+ wn[y][u] = get_wn_prob(ybits, ubits, channel)
+ return wn
+def calculate_channel_splitting_probs(wn, n):
+ print wn
+ wi = np.zeros((n, 2 ** n, 2 ** n), dtype=float)
+ divider = (1.0 / (2 ** (n - 1)))
+ for i in range(n):
+ for y in range(2 ** n):
+ ybits = unpack_byte(np.array([y, ]), n)
+ print
+ for u in range(2 ** n):
+ ubits = unpack_byte(np.array([u, ]), n)
+ usc = ubits[0:i]
+ u = ubits[i]
+ usum = ubits[i+1:]
+ fixed_pu = np.append(usc, u)
+ my_iter = []
+ if len(usum) == 0:
+ my_iter.append(np.append(np.zeros(8 - len(fixed_pu), dtype=np.uint8), fixed_pu))
+ else:
+ uiter = np.arange(2 ** len(usum), dtype=np.uint8)
+ for ui in uiter:
+ element = unpack_byte(ui, len(usum))
+ item = np.append(fixed_pu, element)
+ item = np.append(np.zeros(8 - len(item), dtype=np.uint8), item)
+ my_iter.append(item)
+ my_iter = np.array(my_iter)
+ prob_sum = 0
+ for item in my_iter:
+ upos = np.packbits(item)
+ # print "y=", np.binary_repr(y, n), "item=", item, "u=", np.binary_repr(upos, n)
+ wni = wn[y][upos]
+ prob_sum += wni
+ prob_sum *= divider
+ print "W[{5}]({0},{1},{2}|{3}) = {4}".format(ybits[0], ybits[1], usc, u, prob_sum, i)
+ # print "i=", i, "y=", np.binary_repr(y, n), " fixed=", fixed_pu, "usum=", usum, " WN(i) =", prob_sum
+ wi[i][y][u] = prob_sum
+ for i in range(n):
+ print
+ for y in range(2 ** n):
+ ybits = unpack_byte(np.array([y, ]), n)
+ for u in range(2 ** n):
+ print "W[{}] ({},{}|{})".format(i, ybits[0], ybits[1], u)
+ return wi
+def mutual_information(w):
+ '''
+ calculate mutual information I(W)
+ I(W) = sum over y e Y ( sum over x e X ( ... ) )
+ .5 W(y|x) log frac { W(y|x) }{ .5 W(y|0) + .5 W(y|1) }
+ '''
+ ydim, xdim = np.shape(w)
+ i = 0.0
+ for y in range(ydim):
+ for x in range(xdim):
+ v = w[y][x] * np.log2(w[y][x] / (0.5 * w[y][0] + 0.5 * w[y][1]))
+ i += v
+ i /= 2.0
+ return i
+def capacity(w):
+ '''
+ find supremum I(W) of a channel.
+ '''
+ return w
+def bhattacharyya_parameter(w):
+ '''bhattacharyya parameter is a measure of similarity between two prob. distributions'''
+ # sum over all y e Y for sqrt( W(y|0) * W(y|1) )
+ dim = np.shape(w)
+ ydim = dim[0]
+ xdim = dim[1]
+ z = 0.0
+ for y in range(ydim):
+ z += np.sqrt(w[y][0] * w[y][1])
+ # need all
+ return z
+def w_transition_prob(y, u, p):
+ return 1 - p if y == u else p
+def w_split_prob(y, u, G, p):
+ ''' Calculate channel splitting probabilities. '''
+ N = len(y) # number of combined channels
+ df = N - len(u) # degrees of freedom.
+ prob = 0.0
+ for uf in range(2 ** df):
+ utemp = unpack_byte(np.array([uf, ]), df)
+ ub = np.concatenate((u, utemp))
+ # print "y=", y, "\tu=", ub
+ x =, G) % 2
+ # print "x=", x
+ w = 1.0
+ for i in range(N):
+ w *= w_transition_prob(y[i], x[i], p)
+ # print "for u1=", uf, "prob=", w
+ prob += w
+ divider = 1.0 / (2 ** (N - 1))
+ return divider * prob
+def wn_split_channel(N, p):
+ G = get_Gn(N)
+ y = np.zeros((N, ), dtype=np.uint8)
+ n = 1
+ u = np.zeros((n + 1, ), dtype=np.uint8)
+ pn = w_split_prob(y, u, G, p)
+ # print "pn=", pn
+ z_params = []
+ c_params = []
+ for w in range(N):
+ nentries = (2 ** N) * (2 ** w)
+ print "for w=", w, " nentries=", nentries
+ w_probs = np.zeros((nentries, 2), dtype=float)
+ for y in range(2 ** N):
+ yb = unpack_byte(np.array([y, ]), N)
+ # print "\n\nyb", yb
+ for u in range(2 ** (w + 1)):
+ ub = unpack_byte(np.array([u, ]), w + 1)
+ # print "ub", ub
+ wp = w_split_prob(yb, ub, G, p)
+ ufixed = ub[0:-1]
+ yindex = y * (2 ** w) + pack_byte(ufixed)
+ uindex = ub[-1]
+ # print "y=", y, "ub", u, " prob=", wp, "index=[", yindex, ", ", uindex, "]"
+ w_probs[yindex][uindex] = wp
+ print "\n", np.sum(w_probs, axis=0)
+ z = bhattacharyya_parameter(w_probs)
+ z_params.append(z)
+ c = mutual_information(w_probs)
+ c_params.append(c)
+ print "W=", w, "Z=", z, "capacity=", c
+ return z_params, c_params
+def wminus(y0, y1, u0, p):
+ prob = 0.0
+ for i in range(2):
+ # print y0, y1, u0, i
+ u0u1 = (i + u0) % 2
+ w0 = w_transition_prob(y0, u0u1, p)
+ w1 = w_transition_prob(y1, i, p)
+ # print "w0=", w0, "\tw1=", w1
+ prob += w0 * w1
+ prob *= 0.5
+ return prob
+def wplus(y0, y1, u0, u1, p):
+ u0u1 = (u0 + u1) % 2
+ w0 = w_transition_prob(y0, u0u1, p)
+ w1 = w_transition_prob(y1, u1, p)
+ return 0.5 * w0 * w1
+def w2_split_channel(p):
+ wm_probs = np.zeros((4, 2), dtype=float)
+ for y0 in range(2):
+ for y1 in range(2):
+ for u0 in range(2):
+ wm = wminus(y0, y1, u0, p)
+ wm_probs[y0 * 2 + y1][u0] = wm
+ print wm_probs
+ print "W- Z parameter=", bhattacharyya_parameter(wm_probs)
+ print "I(W-)=", mutual_information(wm_probs)
+ chan_prob = 0.0
+ wp_probs = np.zeros((8, 2), dtype=float)
+ for y0 in range(2):
+ for y1 in range(2):
+ for u0 in range(2):
+ for u1 in range(2):
+ wp = wplus(y0, y1, u0, u1, p)
+ wp_probs[4 * y0 + 2 * y1 + u0][u1] = wp
+ print wp_probs
+ print "W+ Z parameter=", bhattacharyya_parameter(wp_probs)
+ print "I(W+)=", mutual_information(wp_probs)
+def plot_channel(probs, p, name):
+ nc = len(probs)
+ plt.plot(probs)
+ plt.grid()
+ plt.xlim((0, nc - 1))
+ plt.ylim((0.0, 1.0))
+ plt.xlabel('channel')
+ plt.ylabel('capacity')
+ fname = name, '_', p, '_', nc
+ print fname
+ fname = ''.join(str(n) for n in fname)
+ print fname
+ fname = fname.replace('.', '_')
+ print fname
+ fname = ''.join((fname, '.pdf'))
+ print fname
+ plt.savefig(fname)
+ #
+def main():
+ np.set_printoptions(precision=4, linewidth=150)
+ print "POLAR code!"
+ w2 = calculate_w2_probs()
+ print w2
+ p = 0.1
+ n = 2
+ wn = calculate_Wn_probs(n, bsc_channel(p))
+ wi = calculate_channel_splitting_probs(wn, n)
+ w0 = bsc_channel(p)
+ print w0
+ iw = mutual_information(w0)
+ print "I(W)=", iw
+ print 1 - (p * np.log2(1.0 / p) + (1 - p) * np.log2(1.0 / (1 - p)))
+ print "Z param call"
+ bhattacharyya_parameter(w0)
+ print "old vs new encoder"
+ p = 0.1
+ w2_split_channel(p)
+ z, c = wn_split_channel(4, p)
+ eta = 0.3
+ nc = 1024
+ probs = calculate_bec_channel_capacities(eta, nc)
+ # plot_channel(probs, eta, 'bec_capacity')
+ # plot_channel(c, p, 'bsc_capacity')
+ nt = 2 ** 5
+ fG = get_Gn(nt)
+ # print fG
+ encoder = enc(nt, 4, np.arange(nt - 4, dtype=int))
+ # print encoder.get_gn()
+ comp = np.equal(fG, encoder.get_gn())
+ print "is equal?", np.all(comp)
+if __name__ == '__main__':
+ main()