summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/utils/alignment.py
blob: 531c39830e2c4dd03e6f4d6946af7a6bb0257f34 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
"""
This module contains functions for aligning sequences.

>>> import random
>>> rndm = random.Random()
>>> rndm.seed(1234)
>>> ran_seq = [rndm.randint(0,1) for i in range(0, 100)]
>>> offset_seq = [0] * 20 + ran_seq
>>> correct, overlap, offset = align_sequences(ran_seq, offset_seq)
>>> print(correct, overlap, offset)
(1.0, 100, -20)
>>> offset_err_seq = []
>>> for bit in offset_seq:
...     if rndm.randint(0,4) == 4:
...         offset_err_seq.append(rndm.randint(0,1))
...     else:
...         offset_err_seq.append(bit)
>>> correct, overlap, offset = align_sequences(ran_seq, offset_err_seq)
>>> print(overlap, offset)
(100, -20)

"""

import random

# DEFAULT PARAMETERS
# If the fraction of matching bits between two sequences is greater than
# this the sequences are assumed to be aligned.
def_correct_cutoff = 0.9
# The maximum offset to test during sequence alignment.
def_max_offset = 500
# The maximum number of samples to take from two sequences to check alignment.
def_num_samples = 1000


def compare_sequences(d1, d2, offset, sample_indices=None):
    """
    Takes two binary sequences and an offset and returns the number of
    matching entries and the number of compared entries.
    d1 & d2 -- sequences
    offset -- offset of d2 relative to d1
    sample_indices -- a list of indices to use for the comparison
    """
    max_index = min(len(d1), len(d2) + offset)
    if sample_indices is None:
        sample_indices = list(range(0, max_index))
    correct = 0
    total = 0
    for i in sample_indices:
        if i >= max_index:
            break
        if d1[i] == d2[i - offset]:
            correct += 1
        total += 1
    return (correct, total)


def random_sample(size, num_samples=def_num_samples, seed=None):
    """
    Returns a set of random integers between 0 and (size-1).
    The set contains no more than num_samples integers.
    """
    rndm = random.Random()
    rndm.seed(seed)
    if num_samples > size:
        indices = set(range(0, size))
    else:
        if num_samples > size / 2:
            num_samples = num_samples / 2
        indices = set([])
        while len(indices) < num_samples:
            index = rndm.randint(0, size - 1)
            indices.add(index)
    indices = list(indices)
    indices.sort()
    return indices


def align_sequences(d1, d2,
                    num_samples=def_num_samples,
                    max_offset=def_max_offset,
                    correct_cutoff=def_correct_cutoff,
                    seed=None,
                    indices=None):
    """
    Takes two sequences and finds the offset and which the two sequences best
    match.  It returns the fraction correct, the number of entries compared,
    the offset.
    d1 & d2 -- sequences to compare
    num_samples -- the maximum number of entries to compare
    max_offset -- the maximum offset between the sequences that is checked
    correct_cutoff -- If the fraction of bits correct is greater than this then
                      the offset is assumed to optimum.
    seed -- a random number seed
    indices -- an explicit list of the indices used to compare the two sequences
    """
    max_overlap = max(len(d1), len(d2))
    if indices is None:
        indices = random_sample(max_overlap, num_samples, seed)
    max_frac_correct = 0
    best_offset = None
    best_compared = None
    best_correct = None
    pos_range = list(range(0, min(len(d1), max_offset)))
    neg_range = list(range(-1, -min(len(d2), max_offset), -1))
    # Interleave the positive and negative offsets.
    int_range = [item for items in zip(pos_range, neg_range) for item in items]
    for offset in int_range:
        correct, compared = compare_sequences(d1, d2, offset, indices)
        frac_correct = 1.0 * correct / compared
        if frac_correct > max_frac_correct:
            max_frac_correct = frac_correct
            best_offset = offset
            best_compared = compared
            best_correct = correct
            if frac_correct > correct_cutoff:
                break
    return max_frac_correct, best_compared, best_offset, indices


if __name__ == "__main__":
    import doctest
    doctest.testmod()