Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hed/tools/analysis/event_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,6 @@ def get_error_lines(issues):
line = issue.get('ec_line')
if line:
error_lines[code].append(line)
for key, value in error_lines.items():
error_lines[key] = set(value)
return error_lines
70 changes: 40 additions & 30 deletions hed/tools/analysis/events_summary.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
from hed import TabularInput
from hed.errors import ErrorHandler
from hed.schema import load_schema_version
from hed.errors import ErrorHandler, ErrorContext, get_printable_issue_string
from hed.tools import EventManager, HedTagManager
from hed.errors.error_types import TagQualityErrors
from hed.tools.analysis.event_checker import EventsChecker


Expand Down Expand Up @@ -47,15 +47,19 @@ class EventsSummary:

def __init__(self, hed_schema, file, sidecar=None, name=None):
""" Constructor for the HedString class."""
self.checker = None
self.fatal_errors = False
self._initialize(hed_schema, file, sidecar, name)

def _initialize(self, hed_schema, file, sidecar, name):
self.input_data = TabularInput(file, sidecar, name)
checker = EventsChecker(hed_schema, self.input_data, name)
self.hed_objs = checker.hed_objs
self.input_data = checker.input_data
issues = checker.validate_event_tags()
self.error_lines = EventsChecker.get_error_lines(issues)
errors = self.input_data.validate(hed_schema, error_handler=ErrorHandler(check_for_warnings=False))
if errors:
self.fatal_errors=True
return
self.checker = EventsChecker(hed_schema, self.input_data, name)
self.issues = self.checker.validate_event_tags()
self.error_lines = EventsChecker.get_error_lines(self.issues)


def extract_tag_summary(self):
Expand All @@ -68,14 +72,11 @@ def extract_tag_summary(self):

group_dict = {key: set() for key in self.MATCH_TYPES}
other = set()
group_error_lines = self.error_lines.get()
for index, hed_obj in enumerate(self.hed_objs):
if not hed_obj or index in self.group_error_lines:
group_error_lines = self.error_lines.get(TagQualityErrors.IMPROPER_EVENT_GROUPS)
for index, hed_obj in enumerate(self.checker.hed_objs):
if not hed_obj or index in group_error_lines:
continue
all_tags = hed_obj.get_all_tags()
# if index in self.missing_error_lines:
# other = self.update_tags(other, all_tags)
# continue
found = False
for key, tags in group_dict.items():
if self.match_tags(all_tags, key):
Expand Down Expand Up @@ -107,6 +108,26 @@ def update_tags(self, tag_set, all_tags):
return tag_set


def summarize_tags(schema, tsv, sidecar, name):
""" Summarize the tags in a given tabular input file.

Args:
hed_schema: The HED schema to use for validation.
file: The path to the input file.
sidecar: The path to the sidecar file (optional).
name: The name of the dataset (optional).

Returns:
dict: A dictionary with the summary information - (str, list)
list: A set of tags that do not match any of the specified types but are not excluded.
"""
events_summary = EventsSummary(schema, tsv, sidecar, name)
if events_summary.fatal_errors:
return None
summary, others = events_summary.extract_tag_summary()
return summary


if __name__ == '__main__':
schema = load_schema_version('8.4.0')

Expand All @@ -129,25 +150,14 @@ def update_tags(self, tag_set, all_tags):
# data_name = 'eeg_ds004117s_hed_sternberg'

# Create the event summary
events_summary = EventsSummary(schema, tsv_path, sidecar_path, data_name)
print(f"Data name: {data_name}")
# # Check the validity of the event tags
# these_issues = events_summary.validate_event_tags()
# if these_issues:
# print(f"Errors found in {get_printable_issue_string(these_issues, '')}")
# else:
# print(f"No errors found in {data_name}.")

# Extract the tag summary
tag_dict, others = events_summary.extract_tag_summary()

for the_key, the_item in tag_dict.items():
summary = summarize_tags(schema, tsv_path, sidecar=sidecar_path, name=data_name)
if summary is None:
print("Fatal errors in the input file. Cannot summarize tags.")
exit(1)

for the_key, the_item in summary.items():
if not the_item:
continue
print(f"{the_key}:")
for the_tag in the_item:
print(f" {the_tag}")

print("Other:")
for the_tag in others:
print(f" {the_tag}")