Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions tests/unit_tests_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,18 @@ def test_no_gff_files(self):
]

with contextlib.redirect_stdout(stdout):
for gff_config in gff_configs:
config['gff_files'] = gff_config
config.validate_paths()
config_return = config.get_gff_config()

self.assertRegex(stdout.getvalue(), r".*(No GFF files).*")
self.assertEqual(config_return, {})
stdout.truncate(0)
# Skip countdown timer for each config
with patch('tiny.rna.configuration.time.sleep') as countdown:
for gff_config in gff_configs:
config['gff_files'] = gff_config
config.validate_paths()
config_return = config.get_gff_config()

self.assertRegex(stdout.getvalue(), r".*(No GFF files).*")
self.assertEqual(config_return, {})
stdout.truncate(0)

countdown.assert_has_calls([call(1)] * 6 * len(gff_configs))

"""Does PathsFile check for missing files for single entry parameters?"""

Expand Down
58 changes: 55 additions & 3 deletions tests/unit_tests_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ def setUpClass(self):

def make_feature_for_interval_test(self, iv_rule, feat_id, chrom, strand: str, start, stop):
feat_iv = HTSeq.GenomicInterval(chrom, start, stop, strand)
rules = [
dict(deepcopy(rules_template[0]), Overlap=iv_rule, Identity=('N/A', 'N/A'), nt5end='all', Length='all')]
rules = [dict(deepcopy(rules_template[0]), Overlap=iv_rule, Identity=('*', '*'), nt5end='*', Length='*')]
fs = FeatureSelector(rules, LibraryStats(normalize_by_hits=True))

# Feature with specified coordinates, matching rule 0 with hierarchy 0 and the appropriate selector for iv_rule
match_tuple = tuple(fs.build_interval_selectors(feat_iv, [(0, 0, iv_rule)]))
selectors = fs.build_interval_selectors(feat_iv, [(0, 0, iv_rule)])
match_tuple = (selectors[feat_iv][0],)
feat = {(feat_id, strand_to_bool(strand), match_tuple)}

return feat, fs
Expand Down Expand Up @@ -94,6 +94,15 @@ def test_HTSeq_antisense_iv_slice(self):
self.assertEqual(matches_with_cooridnates[2][0], HTSeq.GenomicInterval("I", 15, 20, '-'))
self.assertEqual(matches_with_cooridnates[2][0], HTSeq.GenomicInterval("I", 15, 20, '-'))

"""Do GenomicArraysOfSets support negative start/end positions? No, they don't."""

def test_HTSeq_GenomicArrayOfSets_negative_indexing(self):
gas = HTSeq.GenomicArrayOfSets("auto", stranded=True)
ivn = HTSeq.GenomicInterval("I", -10, 10, '+')

with self.assertRaises(IndexError):
gas[ivn] += "Negative"

"""Does assign_features correctly handle alignments with zero feature matches?"""

def test_assign_features_no_match(self):
Expand Down Expand Up @@ -383,6 +392,49 @@ def test_wildcard_identities(self):

self.assertDictEqual(actual, expected)

"""Does FeatureSelector build both shifted and unshifted selectors and group them by resulting interval?"""

def test_build_interval_selectors_grouping(self):
fs = FeatureSelector(deepcopy(rules_template), LibraryStats())
iv = HTSeq.GenomicInterval('I', 10, 20, '+')

match_tuples = [('n/a', 'n/a', 'partial'),
('n/a', 'n/a', 'full'),
('n/a', 'n/a', 'exact'),
('n/a', 'n/a', "5' anchored"),
('n/a', 'n/a', "3' anchored"),
# iv_shifted_1 Shift values:
('n/a', 'n/a', 'partial, -5, 5'), # 5': -5 3': 5
('n/a', 'n/a', 'full, -5, 5'), # 5': -5 3': 5
# iv_shifted_2
('n/a', 'n/a', 'exact, -10, 10'), # 5': -10 3': 10
# iv_shifted_3
('n/a', 'n/a', "5' anchored, -1, -1"), # 5': -1 3': -1
('n/a', 'n/a', "3' anchored, -1, -1")] # 5': -1 3': -1

iv_shifted_1 = HTSeq.GenomicInterval('I', 5, 25, '+')
iv_shifted_2 = HTSeq.GenomicInterval('I', 0, 30, '+')
iv_shifted_3 = HTSeq.GenomicInterval('I', 9, 19, '+')

result = fs.build_interval_selectors(iv, match_tuples)

# Correct number of groups
self.assertEqual(len(result), 4)

# Correct number of selectors per group
self.assertEqual(len(result[iv]), 5)
self.assertEqual(len(result[iv_shifted_1]), 2)
self.assertEqual(len(result[iv_shifted_2]), 1)
self.assertEqual(len(result[iv_shifted_3]), 2)

# Agreement between group interval and selector's interval
for iv_s, tuples in result.items():
group_iv = (iv_s.start, iv_s.end)
for match in tuples:
selector_iv = (match[2].start, match[2].end)
self.assertEqual(group_iv, selector_iv)



if __name__ == '__main__':
unittest.main()
43 changes: 37 additions & 6 deletions tests/unit_tests_matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ def test_feature_selector_interval_build(self):
('n/a', 'n/a', 'full'),
('n/a', 'n/a', 'exact'),
('n/a', 'n/a', "5' anchored"),
('n/a', 'n/a', "3' anchored")]
('n/a', 'n/a', "3' anchored"),
('n/a', 'n/a', "5'anchored"), # spaces should be optional
('n/a', 'n/a', "3'anchored")]

result = fs.build_interval_selectors(iv, match_tuples)

self.assertIsInstance(result[0][2], IntervalPartialMatch)
self.assertIsInstance(result[1][2], IntervalFullMatch)
self.assertIsInstance(result[2][2], IntervalExactMatch)
self.assertIsInstance(result[3][2], Interval5pMatch)
self.assertIsInstance(result[4][2], Interval3pMatch)
self.assertEqual(len(result), 1)
self.assertIsInstance(result[iv][0][2], IntervalPartialMatch)
self.assertIsInstance(result[iv][1][2], IntervalFullMatch)
self.assertIsInstance(result[iv][2][2], IntervalExactMatch)
self.assertIsInstance(result[iv][3][2], Interval5pMatch)
self.assertIsInstance(result[iv][4][2], Interval3pMatch)

"""Are interval selectors hashable and properly compare for equality?
This is important for storing feature records in GenomicArraysOfSets"""
Expand Down Expand Up @@ -108,6 +111,34 @@ def test_NumericalMatch_rule_validation(self):
with self.assertRaisesRegex(AssertionError, errmsg):
NumericalMatch(defn)

"""Does IntervalSelector properly validate the shift parameter?"""

def test_IntervalSelector_shift_validation(self):
good_defs = ["1,1", "2, 2", " -3 ,-3", "0,0 ", " -0 , -0 "]
bad_defs = ["1,", ",2", "3", "4 4", " ", ",", ", "]

for defn in good_defs:
IntervalSelector.validate_shift_params(defn)

for defn in bad_defs:
errmsg = f'Invalid overlap shift parameters: "{defn}"'
with self.assertRaisesRegex(AssertionError, errmsg):
IntervalSelector.validate_shift_params(defn)

"""Does IntervalSelector properly raise IllegalShiftErrors?"""

def test_IntervalSelector_illegal_shift(self):
iv = HTSeq.GenomicInterval('n/a', 0, 5, '+')

with self.assertRaisesRegex(IllegalShiftError, "null interval"):
IntervalSelector.get_shifted_interval("0,-5", iv)

with self.assertRaisesRegex(IllegalShiftError, "inverted interval"):
IntervalSelector.get_shifted_interval("10, 0", iv)

with self.assertRaisesRegex(IllegalShiftError, "negative start interval"):
IntervalSelector.get_shifted_interval("-1, 0", iv)


if __name__ == '__main__':
unittest.main()
54 changes: 36 additions & 18 deletions tiny/rna/counter/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,40 +201,58 @@ def build_selectors(rules_table) -> List[dict]:

@staticmethod
def build_interval_selectors(iv: 'HTSeq.GenomicInterval', match_tuples: List[unbuilt_match_tuple]):
"""Builds partial/full/exact/3' anchored/5' anchored interval selectors
"""Builds partial/full/exact/5'anchored/3'anchored interval selectors

Unlike build_selectors() and build_inverted_identities(), this function
is not called at construction time. Instead, it is called when finalizing
match-tuples in reference parsers. This is because interval selectors are
created for each feature (requiring start/stop/strand to be known) for
each of the feature's identity matches (each match-tuple).
match-tuples in reference parsers. This is because the intervals of features
passing Stage 1 selection, and the specific rules they matched, must be known.

Index 2 of each match tuple is from the Overlap column of the Features Sheet.
It defines the desired selector and, optionally, a shift parameter for shifting
the 5' and/or 3' ends of the interval. Its syntax is:
selector,M,N
M = signed shift value for 5' end
N = signed shift value for 3' end

Args:
iv: The interval of the feature from which each selector is built
match_tuples: A list of tuples representing the feature's identity
matches. Each tuple index 2 defines and is replaced by the selector.
match_tuples: A list of tuples representing the feature's Stage 1 matches
"""

built_selectors = {}
cache = {}
selector_factory = {
'full': lambda: IntervalFullMatch(iv),
'exact': lambda: IntervalExactMatch(iv),
'partial': lambda: IntervalPartialMatch(iv),
'anchored': lambda: IntervalAnchorMatch(iv),
"5' anchored": lambda: Interval5pMatch(iv) if iv.strand in ('+', '-') else IntervalAnchorMatch(iv),
"3' anchored": lambda: Interval3pMatch(iv) if iv.strand in ('+', '-') else IntervalAnchorMatch(iv),
'full': lambda x: IntervalFullMatch(x),
'exact': lambda x: IntervalExactMatch(x),
'partial': lambda x: IntervalPartialMatch(x),
'anchored': lambda x: IntervalAnchorMatch(x),
"5'anchored": lambda x: Interval5pMatch(x) if iv.strand in ('+', '-') else IntervalAnchorMatch(x),
"3'anchored": lambda x: Interval3pMatch(x) if iv.strand in ('+', '-') else IntervalAnchorMatch(x),
}

for i in range(len(match_tuples)):
matches_by_interval = defaultdict(list)
for i, match in enumerate(match_tuples):
try:
match = match_tuples[i]
# Split optional shift parameters from definition
defn = re.split(r'\s*,\s*', match[2], 1)
selector, shift = defn[0], defn[1:]
selector = selector.replace(' ', '')

# Shift the interval before constructing if shift parameters were provided
match_iv = IntervalSelector.get_shifted_interval(shift[0], iv) if shift else iv
cache_key = (selector, match_iv)

# Cache instances to prevent duplicates for the same match type on the same iv
selector = built_selectors.setdefault(match[2], selector_factory[match[2]]())
match_tuples[i] = (match[0], match[1], selector)
selector_obj = cache.setdefault(cache_key, selector_factory[selector](match_iv))

built_match_tuple = (match[0], match[1], selector_obj)
matches_by_interval[match_iv].append(built_match_tuple)
except KeyError:
raise ValueError(f'Invalid overlap selector: "{match_tuples[i][2]}"')
except IllegalShiftError:
pass # Drop offending match tuple

return match_tuples
return matches_by_interval

@staticmethod
def build_inverted_identities(rules_table) -> Dict[Tuple[str, str], List[int]]:
Expand Down
49 changes: 39 additions & 10 deletions tiny/rna/counter/hts_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,17 @@ def _finalize_aliases(self):
self.alias = {feat: tuple(sorted(aliases, key=str.lower)) for feat, aliases in self.alias.items()}

def _finalize_features(self):
"""Assigns matches to their corresponding intervals by populating GenomicArray with match tuples"""
"""Assigns matches to their corresponding intervals by populating GenomicArray with match tuples

Each feature that is handled here has matched a rule, and its matches might be classified
into subsets depending on the rules it matched. However, the feature's interval
is the same regardless of the match classification. This interval might be
discontinuous, so we start by merging any of its sub-intervals that are
adjacent to reduce loop count in Stage 2 and 3 selection. Each group of
classified matches is then added to the GenomicArray under this shared
interval (but the interval is not necessarily shared by matches that
have a shift parameter in their overlap selector...)
"""

for root_id, unmerged_sub_ivs in self.intervals.items():
merged_sub_ivs = self._merge_adjacent_subintervals(unmerged_sub_ivs)
Expand All @@ -728,13 +738,29 @@ def _finalize_features(self):
self.chrom_vector_setdefault(merged_sub_ivs[0].chrom)
continue

tagged_matches = self.matches[tagged_id]
self._add_subinterval_matches(tagged_id, merged_sub_ivs, tagged_matches)

def _add_subinterval_matches(self, tagged_id: tuple, sub_ivs: list, matches: set):
"""Adds the classified group of matches to the GenomicArray under each of the
feature's sub-intervals

These sub-intervals might be further subset if the matches define a shift
parameter. The shift operation has to be applied to each of the feature's
sub-intervals before the given match can be added to the GenomicArray. The
shifted interval must match the interval that the overlap selector expects.
"""

for sub_iv in sub_ivs:
# Build interval selectors for this
matches_by_shifted_iv = self.selector.build_interval_selectors(sub_iv, matches)
strand = self.map_strand(sub_iv.strand)

for shifted_iv, built_matches in matches_by_shifted_iv.items():
# Sort match tuples by rank for more efficient feature selection
sorted_matches = sorted(self.matches[tagged_id], key=lambda x: x[1])
sorted_matches = sorted(built_matches, key=lambda x: x[1])
self.feats[shifted_iv] += (tagged_id, strand, tuple(sorted_matches))

for sub_iv in merged_sub_ivs:
finalized_match_tuples = self.selector.build_interval_selectors(sub_iv, sorted_matches.copy())
strand = self.map_strand(sub_iv.strand)
self.feats[sub_iv] += (tagged_id, strand, tuple(finalized_match_tuples))

@staticmethod
def _merge_adjacent_subintervals(unmerged_sub_ivs: List[HTSeq.GenomicInterval]) -> list:
Expand Down Expand Up @@ -794,7 +820,7 @@ def get_matches(self):
key=lambda x: x[1]['Hierarchy']
)]

def add_reference_seq(self, seq_id, seq_len, unbuilt_match_tuples):
def add_reference_seq(self, seq_id, seq_len, matches):

# Features are classified in Reference Tables (Stage 1 selection)
# For compatibility, use the seq_id with an empty classifier (index 1)
Expand All @@ -803,7 +829,10 @@ def add_reference_seq(self, seq_id, seq_len, unbuilt_match_tuples):

for strand in ('+', '-'):
iv = HTSeq.GenomicInterval(seq_id, 0, seq_len, strand)
match_tuples = self.selector.build_interval_selectors(iv, unbuilt_match_tuples.copy())
tinyrna_strand = self.map_strand(strand)
matches_by_shifted_iv = self.selector.build_interval_selectors(iv, matches)
strand = self.map_strand(strand)

self.feats[iv] += (tagged_id, tinyrna_strand, tuple(match_tuples))
for shifted_iv, built_matches in matches_by_shifted_iv.items():
# Sort match tuples by rank for more efficient feature selection
sorted_matches = sorted(built_matches, key=lambda x: x[1])
self.feats[shifted_iv] += (tagged_id, strand, tuple(sorted_matches))
Loading