From 59435930756c38342064fbdb36ac01b015b81bba Mon Sep 17 00:00:00 2001 From: de-git Date: Fri, 5 May 2017 20:40:31 +0100 Subject: [PATCH 01/10] added unit tests --- .gitignore | 1 + .pylintrc | 2 + alignment/sequencealigner_test.py | 177 ++++++++++++++++++++++++++++++ 3 files changed, 180 insertions(+) create mode 100644 .pylintrc create mode 100644 alignment/sequencealigner_test.py diff --git a/.gitignore b/.gitignore index f24cd99..4ddbf2e 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ pip-log.txt # Unit test / coverage reports .coverage .tox +.cache #Translations *.mo diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..1756fdc --- /dev/null +++ b/.pylintrc @@ -0,0 +1,2 @@ +[MESSAGES CONTROL] +disable=C0111,C0103 diff --git a/alignment/sequencealigner_test.py b/alignment/sequencealigner_test.py new file mode 100644 index 0000000..5340aeb --- /dev/null +++ b/alignment/sequencealigner_test.py @@ -0,0 +1,177 @@ +""" +Unit test +""" +from .vocabulary import Vocabulary + +from .sequencealigner import ( + GlobalSequenceAligner, + StrictGlobalSequenceAligner, + LocalSequenceAligner, + Sequence, + SimpleScoring +) + +DEFAULT_MATCH_SCORE = 3 +DEFAULT_MISMATCH_SCORE = -1 +DEFAULT_GAP_SCORE = -2 + +def _align( + first, + second, + aligner): + vocab = Vocabulary() + score, encodeds = aligner.align( + vocab.encodeSequence(Sequence(first)), + vocab.encodeSequence(Sequence(second)), + backtrace=True + ) + return score, [vocab.decodeSequenceAlignment(encoded) for encoded in encodeds] + +def _global_align( + first, second, + match_score=DEFAULT_MATCH_SCORE, + mismatch_score=DEFAULT_MISMATCH_SCORE, + gap_score=DEFAULT_GAP_SCORE): + scoring = SimpleScoring(match_score, mismatch_score) + return _align(first, second, aligner=GlobalSequenceAligner(scoring, gap_score)) + +def _strict_global_align( + first, second, + match_score=DEFAULT_MATCH_SCORE, + mismatch_score=DEFAULT_MISMATCH_SCORE, + gap_score=DEFAULT_GAP_SCORE): + scoring = SimpleScoring(match_score, mismatch_score) + return _align(first, second, aligner=StrictGlobalSequenceAligner(scoring, gap_score)) + +def _local_align( + first, second, + match_score=DEFAULT_MATCH_SCORE, + mismatch_score=DEFAULT_MISMATCH_SCORE, + gap_score=DEFAULT_GAP_SCORE): + scoring = SimpleScoring(match_score, mismatch_score) + return _align(first, second, aligner=LocalSequenceAligner(scoring, gap_score)) + + +class CommonSequenceAlignerTests(object): + def _align(self, _first, _second): + raise Exception('not implemented') + + # def test_no_match(self): + # score, alignments = self._align('ab', 'xy') + # assert len(alignments) == 0 + # assert score == 0 + + def test_exact_match(self): + score, alignments = self._align('ab', 'ab') + print("alignment:", alignments[0]) + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a b' + assert str(alignments[0].second) == 'a b' + assert alignments[0].percentIdentity() == 100.0 + assert alignments[0].percentSimilarity() == 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 2 + assert alignments[0].score == score + + def test_exact_left_partial_match(self): + score, alignments = self._align('xaby', 'ab') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a b' + assert str(alignments[0].second) == 'a b' + assert alignments[0].score == score + assert alignments[0].percentIdentity() == 100.0 + assert alignments[0].percentSimilarity() == 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 2 + + def test_exact_right_partial_match(self): + score, alignments = self._align('ab', 'xaby') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a b' + assert str(alignments[0].second) == 'a b' + assert alignments[0].percentIdentity() == 100.0 + assert alignments[0].percentSimilarity() == 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 2 + assert alignments[0].score == score + + def test_exact_left_partial_match_with_gap(self): + score, alignments = self._align('xaby', 'aob') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a - b' + assert str(alignments[0].second) == 'a o b' + assert alignments[0].percentIdentity() == 2 / 3 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 3 * 100.0 + assert alignments[0].percentGap() == 1 / 3 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_GAP_SCORE + assert alignments[0].score == score + + def test_exact_left_partial_match_with_mismatch(self): + score, alignments = self._align('xamby', 'aob') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a m b' + assert str(alignments[0].second) == 'a o b' + assert alignments[0].percentIdentity() == 2 / 3 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 3 * 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_MISMATCH_SCORE + assert alignments[0].score == score + + +class TestGlobalSequenceAligner(CommonSequenceAlignerTests): + def _align(self, first, second): + return _global_align(first, second) + + +class TestStrictGlobalSequenceAligner(CommonSequenceAlignerTests): + def _align(self, first, second): + return _strict_global_align(first, second) + + def test_exact_left_partial_match(self): + score, alignments = self._align('xaby', 'ab') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'x a b y' + assert str(alignments[0].second) == '- a b -' + assert alignments[0].score == score + assert alignments[0].percentIdentity() == 2 / 4 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 4 * 100.0 + assert alignments[0].percentGap() == 2 / 4 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_GAP_SCORE * 2 + + def test_exact_right_partial_match(self): + score, alignments = self._align('ab', 'xaby') + assert len(alignments) == 1 + assert str(alignments[0].first) == '- a b -' + assert str(alignments[0].second) == 'x a b y' + assert alignments[0].percentIdentity() == 2 / 4 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 4 * 100.0 + assert alignments[0].percentGap() == 2 / 4 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_GAP_SCORE * 2 + assert alignments[0].score == score + + def test_exact_left_partial_match_with_gap(self): + score, alignments = self._align('xaby', 'aob') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'x a - b y' + assert str(alignments[0].second) == '- a o b -' + assert alignments[0].percentIdentity() == 2 / 5 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 5 * 100.0 + assert alignments[0].percentGap() == 3 / 5 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_GAP_SCORE * 3 + assert alignments[0].score == score + + def test_exact_left_partial_match_with_mismatch(self): + score, alignments = self._align('xamby', 'aob') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'x a m b y' + assert str(alignments[0].second) == '- a o b -' + assert alignments[0].percentIdentity() == 2 / 5 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 5 * 100.0 + assert alignments[0].percentGap() == 2 / 5 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_MISMATCH_SCORE + DEFAULT_GAP_SCORE * 2 + assert alignments[0].score == score + + +class TestLocalSequenceAligner(CommonSequenceAlignerTests): + def _align(self, first, second): + return _local_align(first, second) From e5e4dfa1aeb7ac3a76c17e82ba8f3144df2b456a Mon Sep 17 00:00:00 2001 From: de-git Date: Fri, 5 May 2017 21:18:08 +0100 Subject: [PATCH 02/10] added alternative non-recursive smith waterman implementation --- alignment/sequencealigner.py | 139 ++++++++++++++++++++++++++++++ alignment/sequencealigner_test.py | 17 ++++ 2 files changed, 156 insertions(+) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 31abca1..6f3f820 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -1,4 +1,5 @@ from builtins import range +from itertools import zip_longest try: import numpypy as numpy except ImportError: @@ -398,3 +399,141 @@ def backtraceFrom(self, first, second, f, i, j, alignments, alignment): self.backtraceFrom(first, second, f, i - 1, j, alignments, alignment) alignment.pop() + +class LinkedListNode(object): + def __init__(self, data, next_node=None): + self.data = data + self.next_node = next_node + + def __str__(self): + if self.next_node is not None: + return str(self.data) + ' -> ' + str(self.next_node) + else: + return str(self.data) + + def __iter__(self): + yield self.data + next_node = self.next_node + while next_node is not None: + yield next_node.data + next_node = next_node.next_node + +def _path_positions_to_indices(positions): + return [ + p - 1 if p_next is None or p_next != p else None + for p, p_next in zip_longest(positions, positions[1:]) + ] + +def _path_to_indices(path): + positions1, positions2 = zip(*path) + return ( + _path_positions_to_indices(positions1), + _path_positions_to_indices(positions2) + ) + +def _map_indices(indices, seq, default_value=None): + return [seq[i] if i is not None else default_value for i in indices] + +def _path_to_alignment(score_matrix, path, s1, s2, gap=GAP_CODE): + indices1, indices2 = _path_to_indices(path) + matched_seq1 = _map_indices(indices1, s1) + matched_seq2 = _map_indices(indices2, s2) + cum_scores = [score_matrix[i][j] for i, j in path] + scores = [score2 - score1 for score1, score2 in zip([0] + cum_scores, cum_scores)] + identical_count = sum([ + 1 if i1 is not None and i2 is not None and s1[i1] == s2[i2] else 0 + for i1, i2 in zip(indices1, indices2) + ]) + similar_count = sum([ + 1 if score > 0 else 0 + for score in scores + ]) + gap_count = sum([ + 1 if i1 is None or i2 is None else 0 + for i1, i2 in zip(indices1, indices2) + ]) + total_score = sum(scores) + first = Sequence([c or gap for c in matched_seq1]) + second = Sequence([c or gap for c in matched_seq2]) + seq_alignment = SequenceAlignment(first, second) + seq_alignment.scores = scores + seq_alignment.score = total_score + seq_alignment.identicalCount = identical_count + seq_alignment.similarCount = similar_count + seq_alignment.gapCount = gap_count + seq_alignment.first_indices = indices1 + seq_alignment.second_indices = indices2 + return seq_alignment + +class SmithWatermanAligner(object): + def __init__(self, scoring, gap_score): + self.scoring = scoring + self.gap_score = gap_score + + def _create_score_matrix(self, rows, cols, calc_score): + score_matrix = numpy.zeros((rows, cols), int) + + # Fill the scoring matrix. + for i in range(1, rows): + for j in range(1, cols): + score_matrix[i][j] = calc_score(score_matrix, i, j) + return score_matrix + + def _traceback(self, score_matrix, start_locs): + pending_roots = [ + LinkedListNode(tuple(loc)) + for loc in start_locs + ] + cur_roots = [] + while len(pending_roots) > 0: + next_pending_roots = [] + for n in pending_roots: + i, j = n.data + moves = self._next_moves(score_matrix, i, j) + if len(moves) == 0: + cur_roots.append(n) + else: + next_pending_roots.extend([ + LinkedListNode(loc, n) + for loc in moves + ]) + pending_roots = next_pending_roots + return cur_roots + + def _next_moves(self, score_matrix, i, j): + diag = score_matrix[i - 1][j - 1] + up = score_matrix[i - 1][j] + left = score_matrix[i][j - 1] + max_score = max(diag, up, left) + moves = [] + if max_score == 0: + return moves + if diag == max_score: + moves.append((i - 1, j - 1)) + if up == max_score: + moves.append((i - 1, j)) + if left == max_score: + moves.append((i, j - 1)) + return moves + + def align(self, s1, s2, backtrace=True, gap=GAP_CODE): + calc_score = lambda score_matrix, i, j: max( + 0, + score_matrix[i - 1][j - 1] + self.scoring(s1[i - 1], s2[j - 1]), + score_matrix[i - 1][j] + self.gap_score, + score_matrix[i][j - 1] + self.gap_score + ) + score_matrix = self._create_score_matrix(len(s1) + 1, len(s2) + 1, calc_score) + max_score = score_matrix.max() + + if not backtrace: + return max_score + + max_score_loc = numpy.argwhere(score_matrix == max_score) + paths = self._traceback(score_matrix, max_score_loc) + seq_alignments = [ + _path_to_alignment(score_matrix, path, s1, s2, gap) + for path in paths + ] + score = seq_alignments[0].score if len(seq_alignments) > 0 else 0 + return score, seq_alignments diff --git a/alignment/sequencealigner_test.py b/alignment/sequencealigner_test.py index 5340aeb..d648a30 100644 --- a/alignment/sequencealigner_test.py +++ b/alignment/sequencealigner_test.py @@ -7,6 +7,7 @@ GlobalSequenceAligner, StrictGlobalSequenceAligner, LocalSequenceAligner, + SmithWatermanAligner, Sequence, SimpleScoring ) @@ -175,3 +176,19 @@ def test_exact_left_partial_match_with_mismatch(self): class TestLocalSequenceAligner(CommonSequenceAlignerTests): def _align(self, first, second): return _local_align(first, second) + +class TestSmithWatermannAligner(CommonSequenceAlignerTests): + def _align(self, first, second): + scoring = SimpleScoring(DEFAULT_MATCH_SCORE, DEFAULT_MISMATCH_SCORE) + return _align(first, second, aligner=SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE)) + +class TestSmithWatermannAlignerWithString(CommonSequenceAlignerTests): + def _align(self, first, second): + scoring = SimpleScoring(DEFAULT_MATCH_SCORE, DEFAULT_MISMATCH_SCORE) + aligner = SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE) + return aligner.align( + Sequence(first), + Sequence(second), + backtrace=True, + gap='-' + ) From d8ee79742689a0ec461f20207209a3e7e7ab8e86 Mon Sep 17 00:00:00 2001 From: de-git Date: Sat, 6 May 2017 08:17:21 +0100 Subject: [PATCH 03/10] fixed overly greedy algorithm --- alignment/sequencealigner.py | 41 ++++++++++---------- alignment/sequencealigner_test.py | 63 ++++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 24 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 88505a8..5ab4530 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -483,7 +483,6 @@ def __init__(self, scoring, gap_score): def _create_score_matrix(self, rows, cols, calc_score): score_matrix = numpy.zeros((rows, cols), int) - # Fill the scoring matrix. for i in range(1, rows): for j in range(1, cols): score_matrix[i][j] = calc_score(score_matrix, i, j) @@ -499,32 +498,30 @@ def _traceback(self, score_matrix, start_locs): next_pending_roots = [] for n in pending_roots: i, j = n.data - moves = self._next_moves(score_matrix, i, j) - if len(moves) == 0: + next_loc = self._next_loc(score_matrix, i, j) + if next_loc is None: cur_roots.append(n) else: - next_pending_roots.extend([ - LinkedListNode(loc, n) - for loc in moves - ]) + next_pending_roots.append( + LinkedListNode(next_loc, n) + ) pending_roots = next_pending_roots return cur_roots - def _next_moves(self, score_matrix, i, j): - diag = score_matrix[i - 1][j - 1] - up = score_matrix[i - 1][j] - left = score_matrix[i][j - 1] - max_score = max(diag, up, left) - moves = [] - if max_score == 0: - return moves - if diag == max_score: - moves.append((i - 1, j - 1)) - if up == max_score: - moves.append((i - 1, j)) - if left == max_score: - moves.append((i, j - 1)) - return moves + def _next_loc(self, score_matrix, i, j): + diag_score = score_matrix[i - 1][j - 1] + up_score = score_matrix[i - 1][j] + left_score = score_matrix[i][j - 1] + max_score = max(diag_score, up_score, left_score) + if max_score == 0 or diag_score == 0: + return None + if diag_score == max_score: + return i - 1, j - 1 + elif up_score == max_score: + return i - 1, j + elif left_score == max_score: + return i, j - 1 + return None def align(self, s1, s2, backtrace=True, gap=GAP_CODE): calc_score = lambda score_matrix, i, j: max( diff --git a/alignment/sequencealigner_test.py b/alignment/sequencealigner_test.py index 86c8461..3aaa780 100644 --- a/alignment/sequencealigner_test.py +++ b/alignment/sequencealigner_test.py @@ -124,6 +124,30 @@ class TestGlobalSequenceAligner(CommonSequenceAlignerTests): def _align(self, first, second): return _global_align(first, second) + def test_multiple_alignments(self): + score, alignments = self._align('xabcabcy', 'abc') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a b c' + assert str(alignments[0].second) == 'a b c' + assert alignments[0].percentIdentity() == 3 / 3 * 100.0 + assert alignments[0].percentSimilarity() == 3 / 3 * 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 3 + assert alignments[0].score == score + + def test_shortest_path_alignment(self): + # this tests that it doesn't pick longer paths on the way + # (e.g. goes up instead of diagonally) + score, alignments = self._align('aac', 'bac') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a a c' + assert str(alignments[0].second) == 'b a c' + assert alignments[0].percentIdentity() == 2 / 3 * 100.0 + assert alignments[0].percentSimilarity() == 2 / 3 * 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_MISMATCH_SCORE + assert alignments[0].score == score + class TestStrictGlobalSequenceAligner(CommonSequenceAlignerTests): def _align(self, first, second): @@ -173,17 +197,52 @@ def test_exact_left_partial_match_with_mismatch(self): assert score == DEFAULT_MATCH_SCORE * 2 + DEFAULT_MISMATCH_SCORE + DEFAULT_GAP_SCORE * 2 assert alignments[0].score == score + def test_multiple_alignments(self): + score, alignments = self._align('xabcabcy', 'abc') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'x a b c a b c y' + assert str(alignments[0].second) == '- a b c - - - -' + assert alignments[0].percentIdentity() == 3 / 8 * 100.0 + assert alignments[0].percentSimilarity() == 3 / 8 * 100.0 + assert alignments[0].percentGap() == 5 / 8 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 3 + DEFAULT_GAP_SCORE * 5 + assert alignments[0].score == score + class TestLocalSequenceAligner(CommonSequenceAlignerTests): def _align(self, first, second): return _local_align(first, second) -class TestSmithWatermannAligner(CommonSequenceAlignerTests): + def test_multiple_alignments(self): + score, alignments = self._align('xabcabcy', 'abc') + assert len(alignments) == 2 + assert str(alignments[0].first) == 'a b c' + assert str(alignments[0].second) == 'a b c' + assert alignments[0].percentIdentity() == 3 / 3 * 100.0 + assert alignments[0].percentSimilarity() == 3 / 3 * 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 3 + assert alignments[0].score == score + + def test_shortest_path_alignment(self): + # this tests that it doesn't pick longer paths on the way + # (e.g. goes up instead of diagonally) + score, alignments = self._align('aac', 'bac') + assert len(alignments) == 1 + assert str(alignments[0].first) == 'a c' + assert str(alignments[0].second) == 'a c' + assert alignments[0].percentIdentity() == 3 / 3 * 100.0 + assert alignments[0].percentSimilarity() == 3 / 3 * 100.0 + assert alignments[0].percentGap() == 0.0 + assert score == DEFAULT_MATCH_SCORE * 2 + assert alignments[0].score == score + +class TestSmithWatermannAligner(TestLocalSequenceAligner): def _align(self, first, second): scoring = SimpleScoring(DEFAULT_MATCH_SCORE, DEFAULT_MISMATCH_SCORE) return _align(first, second, aligner=SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE)) -class TestSmithWatermannAlignerWithString(CommonSequenceAlignerTests): +class TestSmithWatermannAlignerWithString(TestSmithWatermannAligner): def _align(self, first, second): scoring = SimpleScoring(DEFAULT_MATCH_SCORE, DEFAULT_MISMATCH_SCORE) aligner = SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE) From e37227551a5a8ea1c1d5fd340e3ad38a40e50eee Mon Sep 17 00:00:00 2001 From: de-git Date: Sat, 6 May 2017 08:52:41 +0100 Subject: [PATCH 04/10] use original computeAlignmentMatrix --- alignment/sequencealigner.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 5ab4530..8e20b02 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -480,13 +480,24 @@ def __init__(self, scoring, gap_score): self.scoring = scoring self.gap_score = gap_score - def _create_score_matrix(self, rows, cols, calc_score): - score_matrix = numpy.zeros((rows, cols), int) + def computeAlignmentMatrix(self, first, second): + m = len(first) + 1 + n = len(second) + 1 + f = numpy.zeros((m, n), int) + for i in range(1, m): + for j in range(1, n): + # Match elements. + ab = f[i - 1, j - 1] \ + + self.scoring(first[i - 1], second[j - 1]) + + # Gap on sequenceA. + ga = f[i, j - 1] + self.gap_score + + # Gap on sequenceB. + gb = f[i - 1, j] + self.gap_score - for i in range(1, rows): - for j in range(1, cols): - score_matrix[i][j] = calc_score(score_matrix, i, j) - return score_matrix + f[i, j] = max(0, max(ab, max(ga, gb))) + return f def _traceback(self, score_matrix, start_locs): pending_roots = [ @@ -524,13 +535,7 @@ def _next_loc(self, score_matrix, i, j): return None def align(self, s1, s2, backtrace=True, gap=GAP_CODE): - calc_score = lambda score_matrix, i, j: max( - 0, - score_matrix[i - 1][j - 1] + self.scoring(s1[i - 1], s2[j - 1]), - score_matrix[i - 1][j] + self.gap_score, - score_matrix[i][j - 1] + self.gap_score - ) - score_matrix = self._create_score_matrix(len(s1) + 1, len(s2) + 1, calc_score) + score_matrix = self.computeAlignmentMatrix(s1, s2) max_score = score_matrix.max() if not backtrace: From 2e3af1bdf4d47dc72b9395928819748a788689a2 Mon Sep 17 00:00:00 2001 From: de-git Date: Sat, 6 May 2017 10:13:04 +0100 Subject: [PATCH 05/10] combine max call --- alignment/sequencealigner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 8e20b02..7b1590e 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -496,7 +496,7 @@ def computeAlignmentMatrix(self, first, second): # Gap on sequenceB. gb = f[i - 1, j] + self.gap_score - f[i, j] = max(0, max(ab, max(ga, gb))) + f[i, j] = max(0, ab, ga, gb) return f def _traceback(self, score_matrix, start_locs): From 0cf9005db9d7a77047c84c2da548d755ada432a2 Mon Sep 17 00:00:00 2001 From: de-git Date: Sun, 7 May 2017 10:27:30 +0100 Subject: [PATCH 06/10] convert to depth first traceback --- alignment/sequencealigner.py | 47 +++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 7b1590e..150402e 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -1,4 +1,5 @@ from itertools import zip_longest +from collections import deque from six import text_type from six.moves import range @@ -24,18 +25,22 @@ def __call__(self, firstElement, secondElement): return 0 -class SimpleScoring(Scoring): +# class SimpleScoring(Scoring): - def __init__(self, matchScore, mismatchScore): - self.matchScore = matchScore - self.mismatchScore = mismatchScore +# def __init__(self, matchScore, mismatchScore): +# self.matchScore = matchScore +# self.mismatchScore = mismatchScore - def __call__(self, firstElement, secondElement): - if firstElement == secondElement: - return self.matchScore - else: - return self.mismatchScore +# def __call__(self, firstElement, secondElement): +# if firstElement == secondElement: +# return self.matchScore +# else: +# return self.mismatchScore +def SimpleScoring(matchScore, mismatchScore): + return lambda firstElement, secondElement: ( + matchScore if firstElement == secondElement else mismatchScore + ) # Alignment ------------------------------------------------------------------- @@ -500,23 +505,21 @@ def computeAlignmentMatrix(self, first, second): return f def _traceback(self, score_matrix, start_locs): - pending_roots = [ + pending_roots = deque([ LinkedListNode(tuple(loc)) for loc in start_locs - ] + ]) cur_roots = [] while len(pending_roots) > 0: - next_pending_roots = [] - for n in pending_roots: - i, j = n.data - next_loc = self._next_loc(score_matrix, i, j) - if next_loc is None: - cur_roots.append(n) - else: - next_pending_roots.append( - LinkedListNode(next_loc, n) - ) - pending_roots = next_pending_roots + n = pending_roots.pop() + i, j = n.data + next_loc = self._next_loc(score_matrix, i, j) + if next_loc is None: + cur_roots.append(n) + else: + pending_roots.append( + LinkedListNode(next_loc, n) + ) return cur_roots def _next_loc(self, score_matrix, i, j): From 33a85d4fc23d70b38360fac30aeacb1da333cec4 Mon Sep 17 00:00:00 2001 From: de-git Date: Sun, 7 May 2017 10:28:22 +0100 Subject: [PATCH 07/10] convert traceback to generator --- alignment/sequencealigner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 150402e..c70da3e 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -509,18 +509,16 @@ def _traceback(self, score_matrix, start_locs): LinkedListNode(tuple(loc)) for loc in start_locs ]) - cur_roots = [] while len(pending_roots) > 0: n = pending_roots.pop() i, j = n.data next_loc = self._next_loc(score_matrix, i, j) if next_loc is None: - cur_roots.append(n) + yield n else: pending_roots.append( LinkedListNode(next_loc, n) ) - return cur_roots def _next_loc(self, score_matrix, i, j): diag_score = score_matrix[i - 1][j - 1] From b9eb888df27918664add2f97abece14d90fbd713 Mon Sep 17 00:00:00 2001 From: de-git Date: Sun, 7 May 2017 10:52:46 +0100 Subject: [PATCH 08/10] go left and up if the scores are the same (and are better than going diagonally) --- alignment/sequencealigner.py | 26 ++++++++++++++------------ alignment/sequencealigner_test.py | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index c70da3e..7838050 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -512,28 +512,30 @@ def _traceback(self, score_matrix, start_locs): while len(pending_roots) > 0: n = pending_roots.pop() i, j = n.data - next_loc = self._next_loc(score_matrix, i, j) - if next_loc is None: + next_locs = self._next_locs(score_matrix, i, j) + if len(next_locs) == 0: yield n else: - pending_roots.append( + pending_roots.extend([ LinkedListNode(next_loc, n) - ) + for next_loc in next_locs + ]) - def _next_loc(self, score_matrix, i, j): + def _next_locs(self, score_matrix, i, j): diag_score = score_matrix[i - 1][j - 1] up_score = score_matrix[i - 1][j] left_score = score_matrix[i][j - 1] max_score = max(diag_score, up_score, left_score) if max_score == 0 or diag_score == 0: - return None + return [] if diag_score == max_score: - return i - 1, j - 1 - elif up_score == max_score: - return i - 1, j - elif left_score == max_score: - return i, j - 1 - return None + return [(i - 1, j - 1)] + locs = [] + if up_score == max_score: + locs.append((i - 1, j)) + if left_score == max_score: + locs.append((i, j - 1)) + return locs def align(self, s1, s2, backtrace=True, gap=GAP_CODE): score_matrix = self.computeAlignmentMatrix(s1, s2) diff --git a/alignment/sequencealigner_test.py b/alignment/sequencealigner_test.py index 3aaa780..5ee1962 100644 --- a/alignment/sequencealigner_test.py +++ b/alignment/sequencealigner_test.py @@ -224,6 +224,22 @@ def test_multiple_alignments(self): assert score == DEFAULT_MATCH_SCORE * 3 assert alignments[0].score == score + def test_multiple_gap_alignments(self): + score, alignments = self._align('abxc', 'axbc') + assert len(alignments) == 2 + assert ( + set([(str(a.first), str(a.second)) for a in alignments]) == + set([ + ('a b x - c', 'a - x b c'), + ('a - b x c', 'a x b - c') + ]) + ) + assert alignments[0].percentIdentity() == 3 / 5 * 100.0 + assert alignments[0].percentSimilarity() == 3 / 5 * 100.0 + assert alignments[0].percentGap() == 2 / 5 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 3 + DEFAULT_GAP_SCORE * 2 + assert alignments[0].score == score + def test_shortest_path_alignment(self): # this tests that it doesn't pick longer paths on the way # (e.g. goes up instead of diagonally) From ba993cfedea22a14df49d9440859b553d71a8499 Mon Sep 17 00:00:00 2001 From: de-git Date: Sun, 7 May 2017 11:11:04 +0100 Subject: [PATCH 09/10] introduced limit --- alignment/sequencealigner.py | 6 ++++-- alignment/sequencealigner_test.py | 32 +++++++++++++++++++++++++------ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index 7838050..cc4a3e4 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -1,4 +1,4 @@ -from itertools import zip_longest +from itertools import zip_longest, islice from collections import deque from six import text_type @@ -537,7 +537,7 @@ def _next_locs(self, score_matrix, i, j): locs.append((i, j - 1)) return locs - def align(self, s1, s2, backtrace=True, gap=GAP_CODE): + def align(self, s1, s2, backtrace=True, gap=GAP_CODE, limit=None): score_matrix = self.computeAlignmentMatrix(s1, s2) max_score = score_matrix.max() @@ -546,6 +546,8 @@ def align(self, s1, s2, backtrace=True, gap=GAP_CODE): max_score_loc = numpy.argwhere(score_matrix == max_score) paths = self._traceback(score_matrix, max_score_loc) + if limit is not None: + paths = islice(paths, limit) seq_alignments = [ _path_to_alignment(score_matrix, path, s1, s2, gap) for path in paths diff --git a/alignment/sequencealigner_test.py b/alignment/sequencealigner_test.py index 5ee1962..34ecb89 100644 --- a/alignment/sequencealigner_test.py +++ b/alignment/sequencealigner_test.py @@ -20,12 +20,14 @@ def _align( first, second, - aligner): + aligner, + **kwargs): vocab = Vocabulary() score, encodeds = aligner.align( vocab.encodeSequence(Sequence(first)), vocab.encodeSequence(Sequence(second)), - backtrace=True + backtrace=True, + **kwargs ) return score, [vocab.decodeSequenceAlignment(encoded) for encoded in encodeds] @@ -254,17 +256,35 @@ def test_shortest_path_alignment(self): assert alignments[0].score == score class TestSmithWatermannAligner(TestLocalSequenceAligner): - def _align(self, first, second): + def _align(self, first, second, **kwargs): scoring = SimpleScoring(DEFAULT_MATCH_SCORE, DEFAULT_MISMATCH_SCORE) - return _align(first, second, aligner=SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE)) + return _align( + first, second, aligner=SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE), **kwargs) + + def test_limit_alignments(self): + score, alignments = self._align('abxc', 'axbc', limit=1) + assert len(alignments) == 1 + assert ( + (str(alignments[0].first), str(alignments[0].second)) in + set([ + ('a b x - c', 'a - x b c'), + ('a - b x c', 'a x b - c') + ]) + ) + assert alignments[0].percentIdentity() == 3 / 5 * 100.0 + assert alignments[0].percentSimilarity() == 3 / 5 * 100.0 + assert alignments[0].percentGap() == 2 / 5 * 100.0 + assert score == DEFAULT_MATCH_SCORE * 3 + DEFAULT_GAP_SCORE * 2 + assert alignments[0].score == score class TestSmithWatermannAlignerWithString(TestSmithWatermannAligner): - def _align(self, first, second): + def _align(self, first, second, **kwargs): scoring = SimpleScoring(DEFAULT_MATCH_SCORE, DEFAULT_MISMATCH_SCORE) aligner = SmithWatermanAligner(scoring, DEFAULT_GAP_SCORE) return aligner.align( Sequence(first), Sequence(second), backtrace=True, - gap='-' + gap='-', + **kwargs ) From 934dd76444a3ffc89f31fb2b5a007c316c923a47 Mon Sep 17 00:00:00 2001 From: de-git Date: Mon, 8 May 2017 13:45:28 +0100 Subject: [PATCH 10/10] reversed unintended change to SimpleScoring --- alignment/sequencealigner.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/alignment/sequencealigner.py b/alignment/sequencealigner.py index cc4a3e4..9d1ae27 100644 --- a/alignment/sequencealigner.py +++ b/alignment/sequencealigner.py @@ -25,22 +25,18 @@ def __call__(self, firstElement, secondElement): return 0 -# class SimpleScoring(Scoring): +class SimpleScoring(Scoring): -# def __init__(self, matchScore, mismatchScore): -# self.matchScore = matchScore -# self.mismatchScore = mismatchScore + def __init__(self, matchScore, mismatchScore): + self.matchScore = matchScore + self.mismatchScore = mismatchScore -# def __call__(self, firstElement, secondElement): -# if firstElement == secondElement: -# return self.matchScore -# else: -# return self.mismatchScore + def __call__(self, firstElement, secondElement): + if firstElement == secondElement: + return self.matchScore + else: + return self.mismatchScore -def SimpleScoring(matchScore, mismatchScore): - return lambda firstElement, secondElement: ( - matchScore if firstElement == secondElement else mismatchScore - ) # Alignment -------------------------------------------------------------------