Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pip-log.txt
# Unit test / coverage reports
.coverage
.tox
.cache
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: Which tool creates a .cache folder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was pytest, it added test results to that folder - I wasn't quite sure about that either.


# Translations
*.mo
Expand Down
2 changes: 2 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[MESSAGES CONTROL]
disable=C0111,C0103
149 changes: 148 additions & 1 deletion alignment/sequencealigner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from itertools import zip_longest, islice
from collections import deque

from six import text_type
from six.moves import range

Expand All @@ -9,7 +12,7 @@
from abc import abstractmethod

from .sequence import GAP_CODE
from .sequence import EncodedSequence
from .sequence import EncodedSequence, Sequence


# Scoring ---------------------------------------------------------------------
Expand Down Expand Up @@ -403,3 +406,147 @@ def backtraceFrom(self, first, second, f, i, j, alignments, alignment):
self.backtraceFrom(first, second, f, i - 1, j,
alignments, alignment)
alignment.pop()

class LinkedListNode(object):
def __init__(self, data, next_node=None):
self.data = data
self.next_node = next_node

def __str__(self):
if self.next_node is not None:
return str(self.data) + ' -> ' + str(self.next_node)
else:
return str(self.data)

def __iter__(self):
yield self.data
next_node = self.next_node
while next_node is not None:
yield next_node.data
next_node = next_node.next_node

def _path_positions_to_indices(positions):
return [
p - 1 if p_next is None or p_next != p else None
for p, p_next in zip_longest(positions, positions[1:])
]

def _path_to_indices(path):
positions1, positions2 = zip(*path)
return (
_path_positions_to_indices(positions1),
_path_positions_to_indices(positions2)
)

def _map_indices(indices, seq, default_value=None):
return [seq[i] if i is not None else default_value for i in indices]

def _path_to_alignment(score_matrix, path, s1, s2, gap=GAP_CODE):
indices1, indices2 = _path_to_indices(path)
matched_seq1 = _map_indices(indices1, s1)
matched_seq2 = _map_indices(indices2, s2)
cum_scores = [score_matrix[i][j] for i, j in path]
scores = [score2 - score1 for score1, score2 in zip([0] + cum_scores, cum_scores)]
identical_count = sum([
1 if i1 is not None and i2 is not None and s1[i1] == s2[i2] else 0
for i1, i2 in zip(indices1, indices2)
])
similar_count = sum([
1 if score > 0 else 0
for score in scores
])
gap_count = sum([
1 if i1 is None or i2 is None else 0
for i1, i2 in zip(indices1, indices2)
])
total_score = sum(scores)
if isinstance(s1, EncodedSequence):
first = EncodedSequence([c or gap for c in matched_seq1], id=s1.id)
second = EncodedSequence([c or gap for c in matched_seq2], id=s2.id)
else:
first = Sequence([c or gap for c in matched_seq1])
second = Sequence([c or gap for c in matched_seq2])
seq_alignment = SequenceAlignment(first, second)
seq_alignment.scores = scores
seq_alignment.score = total_score
seq_alignment.identicalCount = identical_count
seq_alignment.similarCount = similar_count
seq_alignment.gapCount = gap_count
seq_alignment.first_indices = indices1
seq_alignment.second_indices = indices2
return seq_alignment

class SmithWatermanAligner(object):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just another implementation of an already provided algorithm. Furthermore, the implementation does not follow any of the interfaces/principles currently provided by this library. (It just reuses some method names such as computeAlignmentMatrix.)

I'm against turning this library into a bag of separately-maintained alignment-related stuff. So, I say let's address the issues that you brought up and maybe release a backwards incompatible version 2.0 with a completely new interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just temporarily. I wanted to leave the old implementations there while this one is in progress. Once we are happy with one implementation the others should be changed accordingly (I had something related to that in the TODO section). And the SequenceAlignment should change as well.

def __init__(self, scoring, gap_score):
self.scoring = scoring
self.gap_score = gap_score

def computeAlignmentMatrix(self, first, second):
m = len(first) + 1
n = len(second) + 1
f = numpy.zeros((m, n), int)
for i in range(1, m):
for j in range(1, n):
# Match elements.
ab = f[i - 1, j - 1] \
+ self.scoring(first[i - 1], second[j - 1])

# Gap on sequenceA.
ga = f[i, j - 1] + self.gap_score

# Gap on sequenceB.
gb = f[i - 1, j] + self.gap_score

f[i, j] = max(0, ab, ga, gb)
return f

def _traceback(self, score_matrix, start_locs):
pending_roots = deque([
LinkedListNode(tuple(loc))
for loc in start_locs
])
while len(pending_roots) > 0:
n = pending_roots.pop()
i, j = n.data
next_locs = self._next_locs(score_matrix, i, j)
if len(next_locs) == 0:
yield n
else:
pending_roots.extend([
LinkedListNode(next_loc, n)
for next_loc in next_locs
])

def _next_locs(self, score_matrix, i, j):
diag_score = score_matrix[i - 1][j - 1]
up_score = score_matrix[i - 1][j]
left_score = score_matrix[i][j - 1]
max_score = max(diag_score, up_score, left_score)
if max_score == 0 or diag_score == 0:
return []
if diag_score == max_score:
return [(i - 1, j - 1)]
locs = []
if up_score == max_score:
locs.append((i - 1, j))
if left_score == max_score:
locs.append((i, j - 1))
return locs

def align(self, s1, s2, backtrace=True, gap=GAP_CODE, limit=None):
score_matrix = self.computeAlignmentMatrix(s1, s2)
max_score = score_matrix.max()

if not backtrace:
return max_score

max_score_loc = numpy.argwhere(score_matrix == max_score)
paths = self._traceback(score_matrix, max_score_loc)
if limit is not None:
paths = islice(paths, limit)
seq_alignments = [
_path_to_alignment(score_matrix, path, s1, s2, gap)
for path in paths
]
score = seq_alignments[0].score if len(seq_alignments) > 0 else 0
return score, seq_alignments
Loading