diff --git a/hed/errors/error_messages.py b/hed/errors/error_messages.py index 8b7d69554..0432bf180 100644 --- a/hed/errors/error_messages.py +++ b/hed/errors/error_messages.py @@ -211,8 +211,8 @@ def val_error_extra_slashes_spaces(tag, problem_tag): @hed_error(ValidationErrors.SIDECAR_KEY_MISSING, default_severity=ErrorSeverity.WARNING) -def val_error_sidecar_key_missing(invalid_keys, category_keys): - return f"Category keys '{invalid_keys}' do not exist in column. Valid keys are: {category_keys}" +def val_error_sidecar_key_missing(invalid_keys, category_keys, column_name): + return f"Category keys {invalid_keys} do not exist in sidecar for column '{column_name}'. Valid keys are: {category_keys}" @hed_error(ValidationErrors.TSV_COLUMN_MISSING, actual_code=ValidationErrors.SIDECAR_KEY_MISSING, @@ -470,14 +470,34 @@ def missing_event_type(string, line): return f"The HED string '{string}' at line {line} has no Event type." +@hed_error(TagQualityErrors.IMPROPER_EVENT_GROUPS, default_severity=ErrorSeverity.WARNING, + actual_code=TagQualityErrors.IMPROPER_EVENT_GROUPS) +def improper_event_groups(event_types, string, line): + return f"The HED string '{string}' at line {line} has multiple events [{event_types}] but is improperly " + \ + f"parenthesized so the other tags cannot be uniquely associated with an event." + + @hed_error(TagQualityErrors.MISSING_TASK_ROLE, default_severity=ErrorSeverity.WARNING, actual_code=TagQualityErrors.MISSING_TASK_ROLE) def missing_task_role(event_type, string, line): return f"The HED string '{string}' at line {line} with event {event_type} has no Task-event-role type tag." 
-@hed_error(TagQualityErrors.IMPROPER_TAG_GROUPING, default_severity=ErrorSeverity.WARNING, - actual_code=TagQualityErrors.IMPROPER_TAG_GROUPING) -def improper_tag_grouping(event_types, string, line): - return f"The HED string '{string}' at line {line} has multiple events [{event_types}] but is improperly " + \ - f"parenthesized so the other tags cannot be uniquely associated with an event." \ No newline at end of file +@hed_error(TagQualityErrors.AMBIGUOUS_TAG_GROUPING, default_severity=ErrorSeverity.WARNING, + actual_code=TagQualityErrors.AMBIGUOUS_TAG_GROUPING) +def ambiguous_tag_grouping(tags, string, line): + return f"The HED string '{string}' at line {line} has ambiguously grouped tags [{tags}] and needs parentheses." + + +@hed_error(TagQualityErrors.MISSING_SENSORY_PRESENTATION, default_severity=ErrorSeverity.WARNING, + actual_code=TagQualityErrors.MISSING_SENSORY_PRESENTATION) +def missing_sensory_presentation(string, line): + return f"The HED string '{string}' at line {line} is a Sensory-event but does not have a sensory presentation " + \ + f"modality tag such as Visual-presentation or Auditory-presentation." + + +@hed_error(TagQualityErrors.MISSING_ACTION_TAG, default_severity=ErrorSeverity.WARNING, + actual_code=TagQualityErrors.MISSING_ACTION_TAG) +def missing_action_tag(string, line): + return f"The HED string '{string}' at line {line} is an Agent-action event but does not have any Action tags " + \ + f"such as Move or Perform." 
\ No newline at end of file diff --git a/hed/errors/error_reporter.py b/hed/errors/error_reporter.py index 8f5b4104b..556ee4573 100644 --- a/hed/errors/error_reporter.py +++ b/hed/errors/error_reporter.py @@ -5,7 +5,8 @@ """ from functools import wraps -import xml.etree.ElementTree as ET +import xml.etree.ElementTree as et +from collections import defaultdict from hed.errors.error_types import ErrorContext, ErrorSeverity from hed.errors.known_error_codes import known_error_codes @@ -47,7 +48,7 @@ def hed_error(error_type, default_severity=ErrorSeverity.ERROR, actual_code=None Parameters: error_type (str): A value from error_types or optionally another value. - default_severity (ErrorSeverity): The default severity for the decorated error. + default_severity (int): The default severity for the decorated error. actual_code (str): The actual error to report to the outside world. """ @@ -61,7 +62,7 @@ def wrapper(*args, severity=default_severity, **kwargs): Parameters: args (args): non keyword args. - severity (ErrorSeverity): Will override the default error value if passed. + severity (int): Will override the default error value if passed. kwargs (**kwargs): Any keyword args to be passed down to error message function. Returns: @@ -82,7 +83,7 @@ def hed_tag_error(error_type, default_severity=ErrorSeverity.ERROR, has_sub_tag= Parameters: error_type (str): A value from error_types or optionally another value. - default_severity (ErrorSeverity): The default severity for the decorated error. + default_severity (int): The default severity for the decorated error. has_sub_tag (bool): If True, this error message also wants a sub_tag passed down. eg "This" in "This/Is/A/Tag" actual_code (str): The actual error to report to the outside world. @@ -101,7 +102,7 @@ def wrapper(tag, index_in_tag, index_in_tag_end, *args, severity=default_severit index_in_tag (int): The index into the tag with a problem(usually 0). 
index_in_tag_end (int): The last index into the tag with a problem - usually len(tag). args (args): Any other non keyword args. - severity (ErrorSeverity): Used to include warnings as well as errors. + severity (int): Used to include warnings as well as errors. kwargs (**kwargs): Any keyword args to be passed down to error message function. Returns: @@ -218,6 +219,19 @@ def reset_error_context(self): """ self.error_context = [] + def add_context_and_filter(self, issues): + """ Filter out warnings if requested, while adding context to issues. + + issues(list): + list: A list containing a single dictionary representing a single error. + """ + if not self._check_for_warnings: + issues[:] = self.filter_issues_by_severity(issues, ErrorSeverity.ERROR) + + for error_object in issues: + self._add_context_to_errors(error_object, self.error_context) + self._update_error_with_char_pos(error_object) + def format_error_with_context(self, *args, **kwargs): error_object = ErrorHandler.format_error(*args, **kwargs) if self is not None: @@ -259,19 +273,6 @@ def format_error(error_type, *args, actual_error=None, **kwargs): return [error_object] - def add_context_and_filter(self, issues): - """ Filter out warnings if requested, while adding context to issues. - - issues(list): - list: A list containing a single dictionary representing a single error. - """ - if not self._check_for_warnings: - issues[:] = self.filter_issues_by_severity(issues, ErrorSeverity.ERROR) - - for error_object in issues: - self._add_context_to_errors(error_object, self.error_context) - self._update_error_with_char_pos(error_object) - @staticmethod def format_error_from_context(error_type, error_context, *args, actual_error=None, **kwargs): """ Format an error based on the error type. 
@@ -392,6 +393,57 @@ def filter_issues_by_severity(issues_list, severity): """ return [issue for issue in issues_list if issue['severity'] <= severity] + @staticmethod + def filter_issues_by_count(issues, count, by_file=False): + """ Filter the issues list to only include the first count issues of each code. + + Parameters: + issues (list): A list of dictionaries containing the full issue list. + count (int): The number of issues to keep for each code. + by_file (bool): If True, group by file name. + + Returns: + list: A list of dictionaries containing the issue list after filtering by count. + dict: A dictionary with the codes as keys and the number of occurrences as values. + + """ + total_seen = {} + file_dicts = {'': {}} + filtered_issues = [] + for issue in issues: + seen_codes = file_dicts[''] + if by_file and 'ec_filename' in issue: + file_name = issue['ec_filename'] + if file_name not in file_dicts: + file_dicts[file_name] = {} + seen_codes = file_dicts[file_name] + + code = issue['code'] + if code not in seen_codes: + seen_codes[code] = 0 + seen_codes[code] += 1 + if seen_codes[code] > count: + continue + filtered_issues.append(issue) + + return filtered_issues, ErrorHandler.aggregate_code_counts(file_dicts) + + @staticmethod + def aggregate_code_counts(file_code_dict): + """ Aggregate the counts of codes across multiple files. + + Parameters: + file_code_dict (dict): A dictionary where keys are filenames and values are dictionaries of code counts. + + Returns: + dict: A dictionary with the aggregated counts of codes across all files. + """ + total_counts = defaultdict(int) + for file_dict in file_code_dict.values(): + for code, count in file_dict.items(): + total_counts[code] += count + return dict(total_counts) + def sort_issues(issues, reverse=False): """Sort a list of issues by the error context values. 
@@ -425,7 +477,7 @@ def check_for_any_errors(issues_list): return False -def get_printable_issue_string(issues, title=None, severity=None, skip_filename=True, add_link=False): +def get_printable_issue_string(issues, title=None, severity=None, skip_filename=True, add_link=False, show_details=False): """ Return a string with issues list flatted into single string, one per line. Parameters: @@ -434,6 +486,7 @@ def get_printable_issue_string(issues, title=None, severity=None, skip_filename= severity (int): Return only warnings >= severity. skip_filename (bool): If True, don't add the filename context to the printable string. add_link (bool): Add a link at the end of message to the appropriate error if True + show_details (bool): If True, show details about the issues. Returns: str: A string containing printable version of the issues or ''. @@ -442,14 +495,14 @@ def get_printable_issue_string(issues, title=None, severity=None, skip_filename= issues = ErrorHandler.filter_issues_by_severity(issues, severity) output_dict = _build_error_context_dict(issues, skip_filename) - issue_string = _error_dict_to_string(output_dict, add_link=add_link) + issue_string = _error_dict_to_string(output_dict, add_link=add_link, show_details=show_details) if title: issue_string = title + '\n' + issue_string return issue_string -def get_printable_issue_string_html(issues, title=None, severity=None, skip_filename=True): +def get_printable_issue_string_html(issues, title=None, severity=None, skip_filename=True, show_details=False): """ Return a string with issues list as an HTML tree. Parameters: @@ -457,6 +510,7 @@ def get_printable_issue_string_html(issues, title=None, severity=None, skip_file title (str): Optional title that will always show up first if present. severity (int): Return only warnings >= severity. skip_filename (bool): If True, don't add the filename context to the printable string. + show_details (bool): If True, show details about the issues. 
Returns: str: An HTML string containing the issues or ''. @@ -468,10 +522,10 @@ def get_printable_issue_string_html(issues, title=None, severity=None, skip_file root_element = _create_error_tree(output_dict) if title: - title_element = ET.Element("h1") + title_element = et.Element("h1") title_element.text = title root_element.insert(0, title_element) - return ET.tostring(root_element, encoding='unicode') + return et.tostring(root_element, encoding='unicode') def iter_errors(issues): """ An iterator over issues represented as flat dictionaries. @@ -556,7 +610,7 @@ def _add_single_error_to_dict(items, root=None, issue_to_add=None): return root -def _error_dict_to_string(print_dict, add_link=True, level=0): +def _error_dict_to_string(print_dict, add_link=True, show_details=False, level=0): output = "" if print_dict is None: return output @@ -569,15 +623,34 @@ def _error_dict_to_string(print_dict, add_link=True, level=0): if add_link: link_url = create_doc_link(child['code']) if link_url: - single_issue_message += f" See... {link_url}" + single_issue_message += "\n" + (level + 1) * "\t" + f" See... {link_url}" + if show_details and "details" in child: + issue_string += _expand_details(child["details"], level + 1) output += issue_string continue output += _format_single_context_string(context[0], context[1], level) - output += _error_dict_to_string(value, add_link, level + 1) + output += _error_dict_to_string(value, add_link, show_details, level + 1) return output +def _expand_details(details, indent=0): + """ Expand the details of an error into a string. + + Parameters: + details (str): The details to expand. + indent (int): The indentation level. + + Returns: + str: The expanded details string. + """ + if not details: + return "" + expanded_details = "" + for line in details: + expanded_details += indent * "\t" + line + "\n" + return expanded_details + def _get_context_from_issue(val_issue, skip_filename=True): """ Extract all the context values from the given issue. 
@@ -653,12 +726,12 @@ def _format_single_context_string(context_type, context, tab_count=0): def _create_error_tree(error_dict, parent_element=None, add_link=True): if parent_element is None: - parent_element = ET.Element("ul") + parent_element = et.Element("ul") for context, value in error_dict.items(): if context == "children": for child in value: - child_li = ET.SubElement(parent_element, "li") + child_li = et.SubElement(parent_element, "li") error_prefix = _get_error_prefix(child) single_issue_message = child["message"] @@ -666,7 +739,7 @@ def _create_error_tree(error_dict, parent_element=None, add_link=True): if add_link: link_url = create_doc_link(child['code']) if link_url: - a_element = ET.SubElement(child_li, "a", href=link_url) + a_element = et.SubElement(child_li, "a", href=link_url) a_element.text = error_prefix a_element.tail = " " + single_issue_message else: @@ -675,9 +748,9 @@ def _create_error_tree(error_dict, parent_element=None, add_link=True): child_li.text = error_prefix + " " + single_issue_message continue - context_li = ET.SubElement(parent_element, "li") + context_li = et.SubElement(parent_element, "li") context_li.text = _format_single_context_string(context[0], context[1]) - context_ul = ET.SubElement(context_li, "ul") + context_ul = et.SubElement(context_li, "ul") _create_error_tree(value, context_ul, add_link) return parent_element diff --git a/hed/errors/error_types.py b/hed/errors/error_types.py index 6157c5978..5e8549c58 100644 --- a/hed/errors/error_types.py +++ b/hed/errors/error_types.py @@ -193,5 +193,8 @@ class ColumnErrors: class TagQualityErrors: MISSING_EVENT_TYPE = "MISSING_EVENT_TYPE" + IMPROPER_EVENT_GROUPS = "IMPROPER_EVENT_GROUPS" MISSING_TASK_ROLE = "MISSING_TASK_ROLE" - IMPROPER_TAG_GROUPING = "IMPROPER_TAG_GROUPING" \ No newline at end of file + AMBIGUOUS_TAG_GROUPING = "AMBIGUOUS_TAG_GROUPING" + MISSING_ACTION_TAG = "MISSING_ACTION_TAG" + MISSING_SENSORY_PRESENTATION = "MISSING_SENSORY_PRESENTATION" diff --git 
a/hed/models/base_input.py b/hed/models/base_input.py index c483ca5e0..95f9c0f4b 100644 --- a/hed/models/base_input.py +++ b/hed/models/base_input.py @@ -117,12 +117,14 @@ def series_filtered(self): """ if self.onsets is not None: return filter_series_by_onset(self.series_a, self.onsets) + return None @property def onsets(self): """Return the onset column if it exists. """ if "onset" in self.columns: return self._dataframe["onset"] + return None @property def needs_sorting(self): diff --git a/hed/models/df_util.py b/hed/models/df_util.py index c46f43d3a..77528f6a5 100644 --- a/hed/models/df_util.py +++ b/hed/models/df_util.py @@ -137,7 +137,7 @@ def replace_ref(text, oldvalue, newvalue="n/a"): str: The modified string with the ref replaced or removed. """ # If it's not n/a, we can just replace directly. - if newvalue != "n/a": + if newvalue != "n/a" and newvalue != "": return text.replace(oldvalue, newvalue) def _remover(match): diff --git a/hed/scripts/validate_bids.py b/hed/scripts/validate_bids.py index 132b0adc4..d80b6f8c5 100644 --- a/hed/scripts/validate_bids.py +++ b/hed/scripts/validate_bids.py @@ -6,21 +6,28 @@ def get_parser(): # Create the argument parser parser = argparse.ArgumentParser(description="Validate a BIDS-formatted HED dataset.") parser.add_argument("data_path", help="Full path of dataset root directory.") + parser.add_argument("-ec", "--error_count", dest="error_limit", type=int, default=None, + help="Limit the number of errors of each code type to report for text output.") + parser.add_argument("-ef", "--errors_by_file", dest="errors_by_file", action='store_true', + help="Apply error limit by file rather than overall for text output.") parser.add_argument("-f", "--format", choices=["text", "json", "json_pp"], default="text", help="Output format: 'text' (default) or 'json' ('json_pp' for pretty-printed json)") parser.add_argument("-o", "--output_file", dest="output_file", default='', help="Full path of output of validator -- otherwise 
output written to standard error.") - parser.add_argument("-s", "--suffixes", dest="suffixes", nargs="*", default=['events', 'participants'], - help="Optional list of suffixes (no under_bar) of tsv files to validate. If -s with no values, will use all possible suffixes as with single argument '*'.") - parser.add_argument("-x", "--exclude-dirs", nargs="*", default=['sourcedata', 'derivatives', 'code', 'stimuli'], - dest="exclude_dirs", - help="Directories name to exclude in search for files to validate.") + parser.add_argument("-p", "--print_output", action='store_true', dest="print_output", help="If present, output the results to standard out in addition to any saving of the files.") - parser.add_argument("-w", "--check_for_warnings", action='store_true', dest="check_for_warnings", - help="If present, check for warnings as well as errors.") + parser.add_argument("-s", "--suffixes", dest="suffixes", nargs="*", default=['events', 'participants'], + help = "Optional list of suffixes (no under_bar) of tsv files to validate." 
+ + " If -s with no values, will use all possible suffixes as with single argument '*'.") + parser.add_argument("-v", "--verbose", action='store_true', help="If present, output informative messages as computation progresses.") + parser.add_argument("-w", "--check_for_warnings", action='store_true', dest="check_for_warnings", + help="If present, check for warnings as well as errors.") + parser.add_argument("-x", "--exclude-dirs", nargs="*", default=['sourcedata', 'derivatives', 'code', 'stimuli'], + dest="exclude_dirs", + help="Directories name to exclude in search for files to validate.") return parser @@ -38,7 +45,7 @@ def main(arg_list=None): def validate_dataset(args): # Delayed imports to speed up --help - from hed.errors import get_printable_issue_string + from hed.errors import get_printable_issue_string, ErrorHandler from hed.tools import BidsDataset from hed import _version as vr @@ -51,7 +58,9 @@ def validate_dataset(args): # Validate the dataset bids = BidsDataset(args.data_path, suffixes=args.suffixes, exclude_dirs=args.exclude_dirs) issue_list = bids.validate(check_for_warnings=args.check_for_warnings) + # Output based on format + output = "" if args.format == "json_pp": output = json.dumps({"issues": issue_list, "hedtools_version": str(vr.get_versions())}, indent=4) elif args.format == "json": @@ -59,6 +68,11 @@ def validate_dataset(args): elif args.format == "text": output = f"Using HEDTOOLS version: {str(vr.get_versions())}\n" output += f"Number of issues: {len(issue_list)}\n" + if args.error_limit: + [issue_list, code_counts] = ErrorHandler.filter_issues_by_count(issue_list, args.error_limit, + by_file=args.errors_by_file) + output += " ".join(f"{code}:{count}" for code, count in code_counts.items()) + "\n" + output += f"Number of issues after filtering: {len(issue_list)}\n" if issue_list: output += get_printable_issue_string(issue_list, "HED validation errors: ", skip_filename=False) diff --git a/hed/tools/analysis/event_checker.py 
b/hed/tools/analysis/event_checker.py index 3b3f6b863..417ccdbed 100644 --- a/hed/tools/analysis/event_checker.py +++ b/hed/tools/analysis/event_checker.py @@ -1,21 +1,19 @@ -import os -from hed import load_schema_version, get_printable_issue_string -from hed.tools.analysis.event_manager import EventManager -from hed.errors.error_reporter import ErrorHandler, ErrorContext from hed.errors.error_types import TagQualityErrors -from hed.models.tabular_input import TabularInput -from hed.tools.analysis.hed_tag_manager import HedTagManager +from hed.errors import ErrorHandler, ErrorContext, get_printable_issue_string +from hed import TabularInput +from hed.tools import EventManager, HedTagManager + class EventChecker: EVENT_TAGS = {'Event', 'Sensory-event', 'Agent-action', 'Data-feature', 'Experiment-control', 'Experiment-structure', 'Measurement-event'} NON_TASK_EVENTS = {'Data-feature', 'Experiment-control', 'Experiment-structure', 'Measurement-event'} TASK_ROLES = {'Experimental-stimulus', 'Participant-response', 'Incidental', 'Instructional', 'Mishap', - 'Task-activity', 'Warning'} + 'Task-activity', 'Warning', 'Cue', 'Feedback'} ACTION_ROLES = {'Appropriate-action', 'Correct-action', 'Correction', 'Done-indication', 'Imagined-action', 'Inappropriate-action', 'Incorrect-action', 'Indeterminate-action', 'Miss', 'Near-miss', 'Omitted-action', 'Ready-indication'} - STIMULUS_ROLES = {'Cue', 'Distractor', 'Expected', 'Extraneous', 'Feedback', 'Go-signal', 'Meaningful', + STIMULUS_ROLES = { 'Distractor', 'Expected', 'Extraneous', 'Go-signal', 'Meaningful', 'Newly-learned', 'Non-informative', 'Non-target', 'Not-meaningful', 'Novel', 'Oddball', 'Penalty', 'Planned', 'Priming', 'Query', 'Reward', 'Stop-signal', 'Target', 'Threat', 'Timed', 'Unexpected', 'Unplanned'} @@ -38,7 +36,6 @@ def __init__(self, hed_obj, line_number, error_handler=None): else: self.error_handler = error_handler self.issues = self._verify_events(self.hed_obj) - self.group_error = any(issue['code'] == 
TagQualityErrors.IMPROPER_TAG_GROUPING for issue in self.issues) def _verify_events(self, hed_obj): """ Verify that the events in the HED string are properly grouped. @@ -48,6 +45,12 @@ def _verify_events(self, hed_obj): Returns: list: list of issues + + Errors are detected for the following cases: + 1. The HED string has no event tags. + 2. The HED string has multiple event tags that aren't in separate groups. + 3. The HED string has multiple event tags and a top-level group doesn't have an event tag. + 4. The HED string has no task role tags. """ if not hed_obj: return [] @@ -74,19 +77,37 @@ def _check_grouping(self, hed_groups): if not event_tags: return ErrorHandler.format_error_with_context(self.error_handler, TagQualityErrors.MISSING_EVENT_TYPE, string=str(group), line=self.line_number) + if len(event_tags) == 1: - return self._check_task_role(group, event_tags[0], all_tags) + return self._check_event_group(group, event_tags[0], all_tags) # At this point, we know we have multiple event tags in the group. if any(tag.short_base_tag in event_tags for tag in group.tags()): - return ErrorHandler.format_error_with_context(self.error_handler, TagQualityErrors.IMPROPER_TAG_GROUPING, + return ErrorHandler.format_error_with_context(self.error_handler, TagQualityErrors.IMPROPER_EVENT_GROUPS, string=str(group), line=self.line_number, event_types =', '.join(event_tags)) hed_groups.extend(group.groups()) return [] + def _check_event_group(self, hed_group, event_tag, all_tags): + """ Check that a group with a single event tag has the right supporting tags + + Parameters: + hed_group (HedGroup): The HED group to check (should have a single event tag). + event_tag (str): The single event tag associated with the group. + all_tags (list): A list of all the HedTag objects in the group. 
+ + Returns: + list: list of issues: + + """ + issues = self._check_task_role(hed_group, event_tag, all_tags) + issues += self._check_presentation_modality(hed_group, event_tag, all_tags) + issues += self._check_action_tags(hed_group, event_tag, all_tags) + return issues + def _check_task_role(self, hed_group, event_tag, all_tags): - """ Check that a group with a single event tag has at least one task role tag. + """ Check that a group with a single event tag has at least one task role tag unless it is a non-task event. Parameters: hed_group (HedGroup): The HED group to check (should have a single event tag). @@ -113,180 +134,173 @@ def _check_task_role(self, hed_group, event_tag, all_tags): event_type=event_tag, string=str(hed_group), line=self.line_number) -class EventsSummary: - # Excluding tags for condition-variables and task -- these can be done separately if we want to. + def _check_presentation_modality(self, hed_group, event_tag, all_tags): + """ Check that a group with a single event sensory event tag + + Parameters: + hed_group (HedGroup): The HED group to check (should have a single event tag). + event_tag (str): The single event tag associated with the group. + all_tags (list): A list of all the HedTag objects in the group. + + Returns: + list: list of issues + + """ + if event_tag != 'Sensory-event': + return [] + if any('sensory-presentation' in tag.tag_terms for tag in all_tags): + return [] + return ErrorHandler.format_error_with_context(self.error_handler, TagQualityErrors.MISSING_SENSORY_PRESENTATION, + string=str(hed_group), line=self.line_number) + + def _check_action_tags(self, hed_group, event_tag, all_tags): + """ Check that a group with a single event tag has at least one task role tag unless it is a non-task event. + + Parameters: + hed_group (HedGroup): The HED group to check (should have a single event tag). + event_tag (str): The single event tag associated with the group. 
+ all_tags (list): A list of all the HedTag objects in the group. + + Returns: + list: list of issues + + """ + if event_tag != 'Agent-action': + return [] + if any('action' in tag.tag_terms for tag in all_tags): + return [] + return ErrorHandler.format_error_with_context(self.error_handler, TagQualityErrors.MISSING_ACTION_TAG, + string=str(hed_group), line=self.line_number) + +class EventsChecker: + """ Class to check for event tag quality errors in an event file.""" + + # Excluding tags for condition-variables and task -- these can be done separately if we want to. REMOVE_TYPES = ['Condition-variable', 'Task'] - # Tags organized by whether they are found with either of these - MATCH_TYPES = ['Experimental-stimulus', 'Participant-response', 'Cue', 'Feedback', 'Instructional', 'Sensory-event', 'Agent-action'] - - # If a tag has any of these as a parent, it is excluded - EXCLUDED_PARENTS = {'data-marker', 'data-resolution', 'quantitative-value', 'spatiotemporal-value', - 'statistical-value', 'informational-property', 'organizational-property', - 'grayscale', 'hsv-color', 'rgb-color', 'luminance', 'luminance-contrast', 'opacity', - 'task-effect-evidence', 'task-relationship', 'relation'} - - # If a tag has any of these as a parent, it is replaced by this parent only - CUTOFF_TAGS = {'blue-color', 'brown-color', 'cyan-color', 'gray-color', 'green-color', 'orange-color', - 'pink-color', 'purple-color', 'red-color', 'white-color', 'yellow-color', - 'visual-presentation'} - - # These tags are removed at the end as non-informational - FILTERED_TAGS = {'event', 'agent', 'action', 'move-body-part', 'item', 'biological-item', 'anatomical-item', - 'body-part', - 'lower-extremity-part', 'upper-extremity-part', 'head-part', 'torso-part', 'face-part', - 'language-item', 'object', 'geometric-object', - 'man-made-object', 'device', 'computing-device', 'io-device', 'input-device', 'output-device', - 'auditory-device', 'display-device', - 'recording-device', 'natural-object', 
'document', 'media', 'media-clip', 'visualization', - 'property', 'agent-property', 'agent-state', - 'agent-cognitive-state', 'agent-emotional-state', 'agent-physiological-state', - 'agent-postural-state', - 'agent-task-role', 'agent-trait', - 'data-property', 'biological-artifact', 'nonbiological-artifact', - 'spatial-property', 'temporal-property', 'spectral-property', 'dara-source-type', 'data-value', - 'categorical-value', 'categorical-class-value', 'categorical-judgment-value', - 'categorical-level-value', 'categorical-location-value', 'categorical-orientation-value', - 'physical-value', 'data-variability-attribute', 'environmental-property', 'sensory-property', - 'sensory-attribute', 'auditory-attribute', 'gustatory-attribute', 'olfactory-attribute', - 'tactile-attribute', 'visual-attribute', 'sensory-presentation', 'task-property', - 'task-action-type', - 'task-attentional-demand', 'task-event-role', 'task-stimulus-role'} - - def __init__(self, hed_schema, file, sidecar=None, name=None): - """ Constructor for the HedString class. + + + def __init__(self, hed_schema, input_data, name=None): + """ Constructor for the EventChecker class. Parameters: - hed_schema (HedSchema): The HedSchema object to use for the summary. - file (str or FileLike or pd.Dataframe): A tsv file to open. - sidecar (str or Sidecar or FileLike): A Sidecar or source file/filename. + hed_schema (HedSchema): The HedSchema object to check. + input_data (TabularInput): The input data object to check. name (str): The name to display for this file for error purposes. 
- """ self._schema = hed_schema + self.input_data = input_data self.name = name - if name is None and isinstance(file, str): - self.name = file - self.hed_objs = self._initialize_hed(file, sidecar, name) self.group_error_lines = [] self.missing_error_lines = [] + self._initialize() - def _initialize_hed(self, file, sidecar, name): - input_data = TabularInput(file, sidecar, name=name) - event_manager = EventManager(input_data, self._schema) + def _initialize(self): + + event_manager = EventManager(self.input_data, self._schema) tag_man = HedTagManager(event_manager, remove_types=self.REMOVE_TYPES) - return tag_man.get_hed_objs(include_context=False, replace_defs=True) + self.hed_objs = tag_man.get_hed_objs(include_context=False, replace_defs=True) def validate_event_tags(self): """ Verify that the events in the HED strings validly represent events. Returns: - issues: list of issues (each of which is a dictionary with 'code' and 'message' keys). + list: each element is a dictionary with 'code' and 'message' keys, """ - all_issues = [] + issues = [] error_handler = ErrorHandler() error_handler.push_error_context(ErrorContext.FILE_NAME, self.name) for index, hed_obj in enumerate(self.hed_objs): if not hed_obj: continue + error_handler.push_error_context(ErrorContext.LINE, index) event_check = EventChecker(hed_obj, index, error_handler) - if event_check.group_error: - self.group_error_lines.append(index) - if event_check.issues: - self.missing_error_lines.append(index) - all_issues += event_check.issues - return all_issues + issues += event_check.issues + error_handler.pop_error_context() + return issues - def extract_tag_summary(self): - """ Extract a summary of the tags in a given tabular input file. + def insert_issue_details(self, issues): + """ Inserts issue details as part of the 'message' key for a list of issues. + + Parameters: + issues (list): List of issues to get details for. 
- Returns: - dict: A dictionary with the summary information - (str, list) - list: A set of tags that do not match any of the specified types but are not excluded. """ + side_data = self.input_data._mapper.sidecar_column_data + for issue in issues: + line = issue.get('ec_line') + if line is None: + continue + lines = self.get_onset_lines(line) + data_info = self.input_data._dataframe.iloc[lines] + details = ["Sources:"] + for index, row in data_info.iterrows(): + details += EventsChecker.get_issue_details(row, index, side_data) + issue['details'] = details - group_dict = {key: set() for key in self.MATCH_TYPES} - other = set() + @staticmethod + def get_issue_details(data_info, line, side_data): + """ Get the source details for the issue. - for index, hed_obj in enumerate(self.hed_objs): - if not hed_obj or index in self.group_error_lines: + Parameters: + data_info (pd.Series): The row information from the original tsv. + line (list): A list of lines from the original tsv. + side_data (pd.Series): The sidecar data. + + Returns: + list: The HED associated with the relevant columns. 
+ """ + details = [] + for col, value in data_info.items(): + if value == 'n/a': continue - all_tags = hed_obj.get_all_tags() - if index in self.missing_error_lines: - other = self.update_tags(other, all_tags) + col_line = '' + # Check to see if it has HED in the sidecar for this column + if side_data and col in side_data and side_data[col] and side_data[col].hed_dict: + col_line = f" => sidecar_source:{EventsChecker.get_hed_source(side_data[col].hed_dict, value)}" + if not col_line and col != 'HED': continue - found = False - for key, tags in group_dict.items(): - if self.match_tags(all_tags, key): - group_dict[key] = self.update_tags(group_dict[key], all_tags) - found = True - break - if not found: - other = self.update_tags(other, all_tags) - - for key, tags in group_dict.items(): - group_dict[key] = sorted(tags - self.FILTERED_TAGS) - other = sorted(other - self.FILTERED_TAGS) - return group_dict, other + col_line = f"\t[line:{line} column_name:{col} column_value:{data_info[col]}]" + col_line + details.append(col_line) + return details @staticmethod - def match_tags(all_tags, key): - return any(tag.short_base_tag == key for tag in all_tags) + def get_hed_source(hed_dict, value): + """ Get the source of the HED string. 
- def update_tags(self, tag_set, all_tags): - for tag in all_tags: - terms = tag.tag_terms - if any(item in self.EXCLUDED_PARENTS for item in terms): - continue - match = next((item for item in terms if item in self.CUTOFF_TAGS), None) - if match: - tag_set.add(match) - else: - tag_set.update(tag.tag_terms) - return tag_set - - -if __name__ == '__main__': - schema = load_schema_version('8.4.0') - - # # Wakeman Henson example - root_dir = 'g:/HEDExamples/hed-examples/datasets/eeg_ds003645s_hed' - sidecar_path = os.path.join(root_dir, 'task-FacePerception_events.json') - tsv_path = os.path.join(root_dir, 'sub-002/eeg/sub-002_task-FacePerception_run-1_events.tsv') - data_name = 'eeg_ds003645s_hed' - - # # Attention shift example - # root_dir = 'g:/HEDExamples/hed-examples/datasets/eeg_ds002893s_hed_attention_shift' - # sidecar_path = os.path.join(root_dir, 'task-AuditoryVisualShift_events.json') - # tsv_path = os.path.join(root_dir, 'sub-002/eeg/sub-002_task-AuditoryVisualShift_run-01_events.tsv') - # data_name = 'eeg_ds002893s_hed_attention_shift' - - # Sternberg example - # root_dir = 'g:/HEDExamples/hed-examples/datasets/eeg_ds004117s_hed_sternberg' - # sidecar_path = os.path.join(root_dir, 'task-WorkingMemory_events.json') - # tsv_path = os.path.join(root_dir, 'sub-001/ses-01/eeg/sub-001_ses-01_task-WorkingMemory_run-1_events.tsv') - # data_name = 'eeg_ds004117s_hed_sternberg' - - # Create the event summary - events_summary = EventsSummary(schema, tsv_path, sidecar_path, data_name) - - # Check the validity of the event tags - these_issues = events_summary.validate_event_tags() - if these_issues: - print(f"Errors found in {get_printable_issue_string(these_issues, '')}") - else: - print(f"No errors found in {data_name}.") - - # Extract the tag summary - tag_dict, others = events_summary.extract_tag_summary() - - for the_key, the_item in tag_dict.items(): - if not the_item: - continue - print(f"{the_key}:") - for the_tag in the_item: - print(f" {the_tag}") - - 
print("Other:")
-    for the_tag in others:
-        print(f"  {the_tag}")
\ No newline at end of file
+        Parameters:
+            hed_dict (dict or str): The sidecar HED entry for the column.
+            value (str): The column value used as the key when hed_dict is a dict.
+        Returns:
+            str: The source of the HED string.
+        """
+        if isinstance(hed_dict, dict):
+            return hed_dict.get(value)
+        else:
+            return hed_dict
+
+    def get_onset_lines(self, line):
+        """ Get the lines in the input data with the same line numbers as the data_frame. """
+        none_positions = [i for i in range(line + 1, len(self.hed_objs)) if self.hed_objs[i] is None]
+        return [line] + none_positions
+
+    @staticmethod
+    def get_error_lines(issues):
+        """ Get the lines grouped by code.
+
+        Parameters:
+            issues (list): A list of issues to check.
+
+
+        Returns:
+            dict: A dict with keys that are error codes and values that are lists of line numbers.
+        """
+        error_lines = {}
+        for issue in issues:
+            code = issue.get('code')
+            if code not in error_lines:
+                error_lines[code] = []
+            line = issue.get('ec_line')
+            if line is not None:
+                error_lines[code].append(line)
+        return error_lines
diff --git a/hed/tools/analysis/events_summary.py b/hed/tools/analysis/events_summary.py
new file mode 100644
index 000000000..365e16be7
--- /dev/null
+++ b/hed/tools/analysis/events_summary.py
@@ -0,0 +1,153 @@
+import os
+from hed import TabularInput
+from hed.schema import load_schema_version
+from hed.errors import ErrorHandler, ErrorContext, get_printable_issue_string
+from hed.tools import EventManager, HedTagManager
+from hed.tools.analysis.event_checker import EventsChecker
+
+
+class EventsSummary:
+    # Excluding tags for condition-variables and task -- these can be done separately if we want to.
+    REMOVE_TYPES = ['Condition-variable', 'Task']
+    # Tags organized by whether they are found with either of these
+    MATCH_TYPES = ['Experimental-stimulus', 'Participant-response', 'Cue', 'Feedback', 'Instructional', 'Sensory-event', 'Agent-action']
+
+    # If a tag has any of these as a parent, it is excluded
+    EXCLUDED_PARENTS = {'data-marker', 'data-resolution', 'quantitative-value', 'spatiotemporal-value',
+                        'statistical-value', 'informational-property', 'organizational-property',
+                        'grayscale', 'hsv-color', 'rgb-color', 'luminance', 'luminance-contrast', 'opacity',
+                        'task-effect-evidence', 'task-relationship', 'relation'}
+
+    # If a tag has any of these as a parent, it is replaced by this parent only
+    CUTOFF_TAGS = {'blue-color', 'brown-color', 'cyan-color', 'gray-color', 'green-color', 'orange-color',
+                   'pink-color', 'purple-color', 'red-color', 'white-color', 'yellow-color',
+                   'visual-presentation'}
+
+    # These tags are removed at the end as non-informational
+    FILTERED_TAGS = {'event', 'agent', 'action', 'move-body-part', 'item', 'biological-item', 'anatomical-item',
+                     'body-part',
+                     'lower-extremity-part', 'upper-extremity-part', 'head-part', 'torso-part', 'face-part',
+                     'language-item', 'object', 'geometric-object',
+                     'man-made-object', 'device', 'computing-device', 'io-device', 'input-device', 'output-device',
+                     'auditory-device', 'display-device',
+                     'recording-device', 'natural-object', 'document', 'media', 'media-clip', 'visualization',
+                     'property', 'agent-property', 'agent-state',
+                     'agent-cognitive-state', 'agent-emotional-state', 'agent-physiological-state',
+                     'agent-postural-state',
+                     'agent-task-role', 'agent-trait',
+                     'data-property', 'biological-artifact', 'nonbiological-artifact',
+                     'spatial-property', 'temporal-property', 'spectral-property', 'data-source-type', 'data-value',
+                     'categorical-value', 'categorical-class-value', 'categorical-judgment-value',
+                     'categorical-level-value', 
'categorical-orientation-value',
+                     'physical-value', 'data-variability-attribute', 'environmental-property', 'sensory-property',
+                     'sensory-attribute', 'auditory-attribute', 'gustatory-attribute', 'olfactory-attribute',
+                     'tactile-attribute', 'visual-attribute', 'sensory-presentation', 'task-property',
+                     'task-action-type',
+                     'task-attentional-demand', 'task-event-role', 'task-stimulus-role'}
+
+    def __init__(self, hed_schema, file, sidecar=None, name=None):
+        """ Constructor for the EventsSummary class."""
+        self._initialize(hed_schema, file, sidecar, name)
+
+    def _initialize(self, hed_schema, file, sidecar, name):
+        self.input_data = TabularInput(file, sidecar, name)
+        checker = EventsChecker(hed_schema, self.input_data, name)
+        self.hed_objs = checker.hed_objs
+        self.input_data = checker.input_data
+        issues = checker.validate_event_tags()
+        self.error_lines = EventsChecker.get_error_lines(issues)
+
+
+    def extract_tag_summary(self):
+        """ Extract a summary of the tags in a given tabular input file.
+
+        Returns:
+            dict: A dictionary with the summary information - (str, list)
+            list: A set of tags that do not match any of the specified types but are not excluded.
+        """
+
+        group_dict = {key: set() for key in self.MATCH_TYPES}
+        other = set()
+        group_error_lines = {index for lines in self.error_lines.values() for index in lines}
+        for index, hed_obj in enumerate(self.hed_objs):
+            if not hed_obj or index in group_error_lines:
+                continue
+            all_tags = hed_obj.get_all_tags()
+            # if index in self.missing_error_lines:
+            #     other = self.update_tags(other, all_tags)
+            #     continue
+            found = False
+            for key, tags in group_dict.items():
+                if self.match_tags(all_tags, key):
+                    group_dict[key] = self.update_tags(group_dict[key], all_tags)
+                    found = True
+                    break
+            if not found:
+                other = self.update_tags(other, all_tags)
+
+        for key, tags in group_dict.items():
+            group_dict[key] = sorted(tags - self.FILTERED_TAGS)
+        other = sorted(other - self.FILTERED_TAGS)
+        return group_dict, other
+
+    @staticmethod
+    def match_tags(all_tags, key):
+        return any(tag.short_base_tag == key for tag in all_tags)
+
+    def update_tags(self, tag_set, all_tags):
+        for tag in all_tags:
+            terms = tag.tag_terms
+            if any(item in self.EXCLUDED_PARENTS for item in terms):
+                continue
+            match = next((item for item in terms if item in self.CUTOFF_TAGS), None)
+            if match:
+                tag_set.add(match)
+            else:
+                tag_set.update(tag.tag_terms)
+        return tag_set
+
+
+if __name__ == '__main__':
+    schema = load_schema_version('8.4.0')
+
+    # # Wakeman Henson example
+    root_dir = 'g:/HEDExamples/hed-examples/datasets/eeg_ds003645s_hed'
+    sidecar_path = os.path.join(root_dir, 'task-FacePerception_events.json')
+    tsv_path = os.path.join(root_dir, 'sub-002/eeg/sub-002_task-FacePerception_run-1_events.tsv')
+    data_name = 'eeg_ds003645s_hed'
+
+    # # Attention shift example
+    # root_dir = 'g:/HEDExamples/hed-examples/datasets/eeg_ds002893s_hed_attention_shift'
+    # sidecar_path = os.path.join(root_dir, 'task-AuditoryVisualShift_events.json')
+    # tsv_path = os.path.join(root_dir, 'sub-002/eeg/sub-002_task-AuditoryVisualShift_run-01_events.tsv')
+    # data_name = 'eeg_ds002893s_hed_attention_shift'
+
+    # Sternberg example
+    # root_dir = 
'g:/HEDExamples/hed-examples/datasets/eeg_ds004117s_hed_sternberg' + # sidecar_path = os.path.join(root_dir, 'task-WorkingMemory_events.json') + # tsv_path = os.path.join(root_dir, 'sub-001/ses-01/eeg/sub-001_ses-01_task-WorkingMemory_run-1_events.tsv') + # data_name = 'eeg_ds004117s_hed_sternberg' + + # Create the event summary + events_summary = EventsSummary(schema, tsv_path, sidecar_path, data_name) + print(f"Data name: {data_name}") + # # Check the validity of the event tags + # these_issues = events_summary.validate_event_tags() + # if these_issues: + # print(f"Errors found in {get_printable_issue_string(these_issues, '')}") + # else: + # print(f"No errors found in {data_name}.") + + # Extract the tag summary + tag_dict, others = events_summary.extract_tag_summary() + + for the_key, the_item in tag_dict.items(): + if not the_item: + continue + print(f"{the_key}:") + for the_tag in the_item: + print(f" {the_tag}") + + print("Other:") + for the_tag in others: + print(f" {the_tag}") \ No newline at end of file diff --git a/hed/validator/spreadsheet_validator.py b/hed/validator/spreadsheet_validator.py index 8ff5599e6..d6072b16c 100644 --- a/hed/validator/spreadsheet_validator.py +++ b/hed/validator/spreadsheet_validator.py @@ -246,7 +246,8 @@ def _validate_column_structure(self, base_input, error_handler): if invalid_values: error_handler.push_error_context(ErrorContext.COLUMN, column.column_name) issues += error_handler.format_error_with_context(ValidationErrors.SIDECAR_KEY_MISSING, - invalid_keys=str(list(invalid_values)), category_keys=list(valid_keys)) + invalid_keys=str(list(invalid_values)), category_keys=list(valid_keys), + column_name=column.column_name) error_handler.pop_error_context() column_refs = set(base_input.get_column_refs()) # Convert to set for O(1) lookup diff --git a/tests/errors/test_error_filter_count.py b/tests/errors/test_error_filter_count.py new file mode 100644 index 000000000..e9fa5db01 --- /dev/null +++ 
b/tests/errors/test_error_filter_count.py @@ -0,0 +1,153 @@ +import unittest +from hed.errors.error_reporter import ErrorHandler + +class TestFilterIssuesByCount(unittest.TestCase): + + def test_empty_issues_list(self): + issues = [] + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 2) + self.assertEqual(result, []) + + def test_all_below_limit(self): + issues = [{'code': 'A'}, {'code': 'B'}, {'code': 'A'}] + counts = {'A': 2, 'B': 1} + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 2) + self.assertEqual(result, issues) + self.assertEqual(counts, result_counts) + + def test_some_above_limit(self): + issues = [{'code': 'A'}, {'code': 'A'}, {'code': 'A'}, {'code': 'B'}, {'code': 'B'}, {'code': 'C'}] + counts = {'A': 3, 'B': 2, 'C': 1} + expected = [{'code': 'A'}, {'code': 'A'}, {'code': 'B'}, {'code': 'B'}, {'code': 'C'}] + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 2) + self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + def test_zero_limit(self): + issues = [{'code': 'A'}, {'code': 'B'}, {'code': 'A'}] + counts = {'A': 2, 'B': 1} + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 0) + self.assertEqual(result, []) + self.assertEqual(counts, result_counts) + + def test_single_issue_limit(self): + issues = [{'code': 'X'}, {'code': 'X'}, {'code': 'Y'}, {'code': 'Y'}, {'code': 'Z'}] + counts = {'X': 2, 'Y': 2, 'Z': 1} + expected = [{'code': 'X'}, {'code': 'Y'}, {'code': 'Z'}] + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 1) + self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + def test_non_consecutive_codes(self): + issues = [{'code': 'A'}, {'code': 'B'}, {'code': 'A'}, {'code': 'B'}, {'code': 'A'}, {'code': 'B'}] + counts = {'A': 3, 'B': 3} + expected = [{'code': 'A'}, {'code': 'B'}, {'code': 'A'}, {'code': 'B'}] + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 2) + 
self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + def test_by_file_false_default_behavior(self): + issues = [{'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file2'}] + counts = {'A': 3} + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 2) + expected = [{'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file1'}] + self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + def test_by_file_true_grouping(self): + issues = [{'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file2'}, + {'code': 'A', 'ec_filename': 'file2'}, + {'code': 'A', 'ec_filename': 'file2'}] + counts = {'A': 5} + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 2, by_file=True) + expected = [{'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file1'}, + {'code': 'A', 'ec_filename': 'file2'}, + {'code': 'A', 'ec_filename': 'file2'}] + self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + def test_mixed_codes_and_files(self): + issues = [{'code': 'X', 'ec_filename': 'file1'}, + {'code': 'X', 'ec_filename': 'file1'}, + {'code': 'X', 'ec_filename': 'file2'}, + {'code': 'Y', 'ec_filename': 'file1'}, + {'code': 'Y', 'ec_filename': 'file2'}, + {'code': 'Y', 'ec_filename': 'file2'}] + counts = {'X': 3, 'Y': 3} + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 1, by_file=True) + expected = [{'code': 'X', 'ec_filename': 'file1'}, + {'code': 'X', 'ec_filename': 'file2'}, + {'code': 'Y', 'ec_filename': 'file1'}, + {'code': 'Y', 'ec_filename': 'file2'}] + self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + def test_missing_ec_filename_with_by_file(self): + issues = [ + {'code': 'A'}, # No 'ec_filename' + {'code': 'A'}, # No 'ec_filename' + {'code': 'A', 'ec_filename': 'file1'}, + 
{'code': 'A', 'ec_filename': 'file1'} + ] + counts = {'A': 4} + result, result_counts = ErrorHandler.filter_issues_by_count(issues, 1, by_file=True) + expected = [ + {'code': 'A'}, # First from default ('' file group) + {'code': 'A', 'ec_filename': 'file1'} # First from file1 group + ] + self.assertEqual(result, expected) + self.assertEqual(counts, result_counts) + + +class TestAggregateCodeCounts(unittest.TestCase): + + def test_empty_input(self): + input_data = {} + expected_output = {} + self.assertEqual(ErrorHandler.aggregate_code_counts(input_data), expected_output) + + def test_single_file_single_code(self): + input_data = {'file1.txt': {'A': 5}} + expected_output = {'A': 5} + self.assertEqual(ErrorHandler.aggregate_code_counts(input_data), expected_output) + + def test_single_file_multiple_codes(self): + input_data = {'file1.txt': {'A': 1, 'B': 2, 'C': 3}} + expected_output = {'A': 1, 'B': 2, 'C': 3} + self.assertEqual(ErrorHandler.aggregate_code_counts(input_data), expected_output) + + def test_multiple_files_overlapping_codes(self): + input_data = { + 'file1.txt': {'A': 2, 'B': 1}, + 'file2.txt': {'A': 3, 'C': 4}, + 'file3.txt': {'B': 2, 'C': 1} + } + expected_output = {'A': 5, 'B': 3, 'C': 5} + self.assertEqual(ErrorHandler.aggregate_code_counts(input_data), expected_output) + + def test_multiple_files_non_overlapping_codes(self): + input_data = { + 'file1.txt': {'A': 2}, + 'file2.txt': {'B': 3}, + 'file3.txt': {'C': 4} + } + expected_output = {'A': 2, 'B': 3, 'C': 4} + self.assertEqual(ErrorHandler.aggregate_code_counts(input_data), expected_output) + + def test_zero_counts(self): + input_data = { + 'file1.txt': {'A': 0}, + 'file2.txt': {'A': 2} + } + expected_output = {'A': 2} + self.assertEqual(ErrorHandler.aggregate_code_counts(input_data), expected_output) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/tools/analysis/test_event_checker.py b/tests/tools/analysis/test_event_checker.py index 636191966..8d84e5647 100644 --- 
a/tests/tools/analysis/test_event_checker.py +++ b/tests/tools/analysis/test_event_checker.py @@ -11,70 +11,78 @@ class TestEventChecker(unittest.TestCase): def setUpClass(cls): cls.hed_schema = load_schema_version('8.3.0') + def check_issues(self, hed_string, expected_code=None, line_number=0): + hed_obj = HedString(hed_string, hed_schema=self.hed_schema) + checker = EventChecker(hed_obj, line_number) + if expected_code is None: + self.assertEqual(checker.issues, []) + else: + self.assertTrue(checker.issues) + self.assertEqual(checker.issues[0]["code"], expected_code) + def test_no_event_tag(self): - hed_strings = ['Action, (Participant-response, Red)'] - for hed_string in hed_strings: - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 0) - self.assertEqual(checker.issues[0]["code"], TagQualityErrors.MISSING_EVENT_TYPE) + self.check_issues('Action, (Participant-response, Red)', TagQualityErrors.MISSING_EVENT_TYPE) def test_event_without_task_role(self): hed_strings = ['Sensory-event, (Red, Blue)', '((Agent-action, Red))'] - for hed_string in hed_strings: - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 2) - self.assertEqual(checker.issues[0]["code"], TagQualityErrors.MISSING_TASK_ROLE) + for s in hed_strings: + with self.subTest(s=s): + self.check_issues(s, TagQualityErrors.MISSING_TASK_ROLE) def test_event_with_task_role(self): - hed_strings = ['(Sensory-event, (Experimental-stimulus, Blue, Green))', - '((Agent-action, Participant-response, Red))'] - for hed_string in hed_strings: - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 2) - self.assertEqual(checker.issues, []) + hed_strings = [ + '(Sensory-event, Visual-presentation, (Experimental-stimulus, Blue, Green))', + '((Agent-action, Participant-response, Red, Jump))' + ] + for s in hed_strings: + with self.subTest(s=s): + self.check_issues(s) + + def 
test_event_missing_sensory_presentation(self): + self.check_issues('(Sensory-event, Experimental-stimulus)', TagQualityErrors.MISSING_SENSORY_PRESENTATION) + + def test_event_with_sensory_presentation(self): + self.check_issues('(Sensory-event, Experimental-stimulus, Auditory-presentation)') + + def test_event_missing_action_tag(self): + self.check_issues('(Agent-action, Participant-response)', TagQualityErrors.MISSING_ACTION_TAG) + + def test_non_task_event_tag_no_task_role(self): + # Should not raise missing task role for non-task event + self.check_issues('(Data-feature, Blue)') def test_improperly_grouped_event_tags(self): hed_strings = ['Sensory-event, (Red, Blue), Experiment-control', '((Sensory-event, (Red, Blue), Experiment-control))'] - for hed_string in hed_strings: - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 2) - self.assertEqual(checker.issues[0]["code"], TagQualityErrors.IMPROPER_TAG_GROUPING) + for s in hed_strings: + with self.subTest(s=s): + self.check_issues(s, TagQualityErrors.IMPROPER_EVENT_GROUPS) def test_nested_group_with_event_and_task_role(self): - hed_strings = ['Sensory-event, ((Experimental-stimulus, Red))', '(Experiment-control, Incidental)'] - for hed_string in hed_strings: - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 5) - self.assertEqual(checker.issues, []) + hed_strings = ['Sensory-event, Visual-presentation, ((Experimental-stimulus, Red))', '(Experiment-control, Incidental)'] + for s in hed_strings: + with self.subTest(s=s): + self.check_issues(s) def test_empty_hed_string(self): checker = EventChecker(None, 6) self.assertEqual(checker.issues, []) def test_flat_event_with_task_role(self): - hed_string = 'Agent-action, Participant-response, Red' - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 7) - self.assertEqual(checker.issues, []) + self.check_issues('Agent-action, 
Participant-response, Red, Jump') def test_task_role_without_event(self): - hed_string = '(Experimental-stimulus, Green)' - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 8) - self.assertEqual(checker.issues[0]["code"], TagQualityErrors.MISSING_EVENT_TYPE) + self.check_issues('(Experimental-stimulus, Green)', TagQualityErrors.MISSING_EVENT_TYPE) def test_multiple_event_tags_mixed_grouping(self): - hed_string = 'Sensory-event, (Agent-action, Instructional)' - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 9) - self.assertEqual(checker.issues[0]["code"], TagQualityErrors.IMPROPER_TAG_GROUPING) + self.check_issues('Sensory-event, (Agent-action, Instructional)', TagQualityErrors.IMPROPER_EVENT_GROUPS) def test_empty_nested_group(self): - hed_string = '(())' - hed_obj = HedString(hed_string, hed_schema=self.hed_schema) - checker = EventChecker(hed_obj, 10) - self.assertEqual(checker.issues[0]["code"], TagQualityErrors.MISSING_EVENT_TYPE) + self.check_issues('(())', TagQualityErrors.MISSING_EVENT_TYPE) + + def test_multiple_properly_grouped_events(self): + hed_string = '((Sensory-event, Experimental-stimulus, Visual-presentation)), ((Agent-action, Participant-response, Press))' + self.check_issues(hed_string) if __name__ == '__main__': unittest.main() \ No newline at end of file