Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions hed/errors/error_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,23 @@ def aggregate_code_counts(file_code_dict) -> dict:
total_counts[code] += count
return dict(total_counts)

@staticmethod
def get_code_counts(issues: list[dict]) -> dict[str, int]:
""" Count the occurrences of each error code in the issues list.

Parameters:
issues (list[dict]): A list of dictionaries containing the issues.

Returns:
dict[str, int]: A dictionary with error codes as keys and their occurrence counts as values.

"""
code_counts = defaultdict(int)
for issue in issues:
code = issue.get('code', 'UNKNOWN')
code_counts[code] += 1
return dict(code_counts)


def sort_issues(issues, reverse=False) -> list[dict]:
"""Sort a list of issues by the error context values.
Expand Down
110 changes: 74 additions & 36 deletions hed/scripts/validate_bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import json
import logging
import sys
from hed import _version as vr
from hed.errors import get_printable_issue_string, ErrorHandler
from hed.tools import BidsDataset

def get_parser():
# Create the argument parser
Expand All @@ -31,22 +34,20 @@ def get_parser():
help="Apply error limit by file rather than overall for text output.")
parser.add_argument("-f", "--format", choices=["text", "json", "json_pp"], default="text",
help="Output format: 'text' (default) or 'json' ('json_pp' for pretty-printed json)")
parser.add_argument("-l", "--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="WARNING", help="Log level (case insensitive). Default: INFO")
parser.add_argument("-lf", "--log-file", dest="log_file", default=None,
help="Full path to save log output to file. If not specified, logs go to stderr.")
parser.add_argument("-lq", "--log-quiet", action='store_true', dest="log_quiet",
help="If present, suppress log output to stderr (only applies if --log-file is used).")
parser.add_argument("-o", "--output_file", dest="output_file", default='',
help="Full path of output of validator -- otherwise output written to standard error.")

parser.add_argument("-p", "--print_output", action='store_true', dest="print_output",
help="If present, output the results to standard out in addition to any saving of the files.")
parser.add_argument("-s", "--suffixes", dest="suffixes", nargs="*", default=['events', 'participants'],
help = "Optional list of suffixes (no under_bar) of tsv files to validate." +
" If -s with no values, will use all possible suffixes as with single argument '*'.")

parser.add_argument("-l", "--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="WARNING", help="Log level (case insensitive). Default: WARNING")
parser.add_argument("-lf", "--log-file", dest="log_file", default=None,
help="Full path to save log output to file. If not specified, logs go to stderr.")
parser.add_argument("-lq", "--log-quiet", action='store_true', dest="log_quiet",
help="If present, suppress log output to stderr (only applies if --log-file is used).")
parser.add_argument("-v", "--verbose", action='store_true',
help="If present, output informative messages as computation progresses (equivalent to --log-level INFO).")
parser.add_argument("-w", "--check_for_warnings", action='store_true', dest="check_for_warnings",
Expand All @@ -57,15 +58,62 @@ def get_parser():
return parser


def format_validation_results(issue_list, args, ErrorHandler):
    """Render the validation issues in the output format chosen on the command line.

    Parameters:
        issue_list (list): Validation issues found.
        args: Parsed command-line arguments (uses format, error_limit, errors_by_file).
        ErrorHandler: Error-handling class used to filter issues when an error limit is set.

    Returns:
        str: Issues rendered as plain text, compact JSON, or pretty-printed JSON
            ('json_pp'); an empty string for any other format value.
    """
    if args.format == "json_pp":
        payload = {"issues": issue_list, "hedtools_version": str(vr.get_versions())}
        return json.dumps(payload, indent=4)
    if args.format == "json":
        return json.dumps(issue_list)
    if args.format == "text":
        lines = [f"Using HEDTOOLS version: {str(vr.get_versions())}",
                 f"Number of issues: {len(issue_list)}"]
        if args.error_limit:
            # Trim the issue list to the requested limit; report per-code counts first.
            issue_list, code_counts = ErrorHandler.filter_issues_by_count(
                issue_list, args.error_limit, by_file=args.errors_by_file)
            lines.append(" ".join(f"{code}:{count}" for code, count in code_counts.items()))
            lines.append(f"Number of issues after filtering: {len(issue_list)}")
        text = "\n".join(lines) + "\n"
        if issue_list:
            text += get_printable_issue_string(issue_list, "HED validation errors: ", skip_filename=False)
        return text
    return ""

def format_final_report(issue_list):
    """Build a short human-readable summary of the validation run.

    Parameters:
        issue_list (list): Validation issues found.

    Returns:
        str: Summary line with the total issue count, plus per-code occurrence
            counts when any issues were found.
    """
    summary = f"Validation completed.\n\tFound {len(issue_list)} issues."
    if not issue_list:
        return summary
    counts = ErrorHandler.get_code_counts(issue_list)
    parts = [f"{code}({count})" for code, count in counts.items()]
    return summary + f"\nCode counts: {', '.join(parts)}"


def main(arg_list=None):
# Create the argument parser
parser = get_parser()

# Parse the arguments
args = parser.parse_args(arg_list)

print(f"{str(args)}")
# Setup logging configuration
log_level = args.log_level.upper() if args.log_level else 'WARNING'
log_level = args.log_level.upper() if args.log_level else 'INFO'
if args.verbose:
log_level = 'INFO'

Expand Down Expand Up @@ -106,19 +154,17 @@ def main(arg_list=None):
try:
issue_list = validate_dataset(args)
except Exception as e:
logger.exception(f"Validation failed with exception: {e}")
logger.error(f"Validation failed with exception: {e}")
raise

final_report = format_final_report(issue_list)
logger.info(final_report)
print(final_report)
# Return 1 if there are issues, 0 otherwise
return int(bool(issue_list))


def validate_dataset(args):
# Delayed imports to speed up --help
from hed.errors import get_printable_issue_string, ErrorHandler
from hed.tools import BidsDataset
from hed import _version as vr

logger = logging.getLogger('validate_bids')
logger.info(f"Data directory: {args.data_path}")
logger.info(f"HED tools version: {str(vr.get_versions())}")
Expand All @@ -128,8 +174,7 @@ def validate_dataset(args):

if args.suffixes == ['*'] or args.suffixes == []:
args.suffixes = None
logger.debug("Using all available suffixes")


# Validate the dataset
try:
logger.info("Creating BIDS dataset object...")
Expand All @@ -145,31 +190,24 @@ def validate_dataset(args):
logger.debug("Full exception details:", exc_info=True)
raise

# Output based on format
output = ""
if args.format == "json_pp":
output = json.dumps({"issues": issue_list, "hedtools_version": str(vr.get_versions())}, indent=4)
elif args.format == "json":
output = json.dumps(issue_list)
elif args.format == "text":
output = f"Using HEDTOOLS version: {str(vr.get_versions())}\n"
output += f"Number of issues: {len(issue_list)}\n"
if args.error_limit:
[issue_list, code_counts] = ErrorHandler.filter_issues_by_count(issue_list, args.error_limit,
by_file=args.errors_by_file)
output += " ".join(f"{code}:{count}" for code, count in code_counts.items()) + "\n"
output += f"Number of issues after filtering: {len(issue_list)}\n"
if issue_list:
output += get_printable_issue_string(issue_list, "HED validation errors: ", skip_filename=False)

# Generate and output the results if there is to be output
if args.output_file or args.print_output:
output = format_validation_results(issue_list, args, ErrorHandler)
else:
output = ""

# Output to file or print to screen
if args.output_file:
with open(args.output_file, 'w') as fp:
fp.write(output)
if args.print_output:
print(output)


return issue_list




if __name__ == "__main__":
sys.exit(main())
40 changes: 40 additions & 0 deletions tests/errors/test_error_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,43 @@ def test_format_unknown_error(self):
"param1", param2=0,
actual_error=actual_code)
self.assertEqual(error_list[0]['code'], actual_code)

def test_get_code_counts(self):
    """Verify get_code_counts tallies codes and handles empty lists and missing keys."""
    # Duplicated codes are aggregated into per-code totals.
    sample = [
        {'code': 'ERROR_CODE_1', 'message': 'First error'},
        {'code': 'ERROR_CODE_2', 'message': 'Second error'},
        {'code': 'ERROR_CODE_1', 'message': 'Another first error'},
        {'code': 'ERROR_CODE_3', 'message': 'Third error'},
        {'code': 'ERROR_CODE_1', 'message': 'Yet another first error'},
        {'code': 'ERROR_CODE_2', 'message': 'Another second error'},
    ]
    self.assertEqual(ErrorHandler.get_code_counts(sample),
                     {'ERROR_CODE_1': 3, 'ERROR_CODE_2': 2, 'ERROR_CODE_3': 1})

    # An empty issue list yields an empty mapping.
    self.assertEqual(ErrorHandler.get_code_counts([]), {})

    # Issues without a 'code' key fall back to the 'UNKNOWN' bucket.
    partial = [
        {'code': 'VALID_CODE', 'message': 'Valid error'},
        {'message': 'Missing code error'},
        {'code': 'VALID_CODE', 'message': 'Another valid error'},
    ]
    self.assertEqual(ErrorHandler.get_code_counts(partial),
                     {'VALID_CODE': 2, 'UNKNOWN': 1})
30 changes: 23 additions & 7 deletions tests/scripts/test_validate_bids.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import io
import unittest
import logging
from unittest.mock import patch
from hed.scripts.validate_bids import get_parser, validate_dataset, main

Expand All @@ -14,16 +15,31 @@ def setUpClass(cls):

def test_main_bids(self):
arg_list = [self.data_root, '-x', 'derivatives', 'stimuli' ]
with patch('sys.stdout', new=io.StringIO()) as fp:
main(arg_list)
x = fp.getvalue()
self.assertFalse(x)
# Suppress all output including logging
with patch('sys.stdout', new=io.StringIO()), \
patch('sys.stderr', new=io.StringIO()), \
patch('logging.getLogger') as mock_logger:
# Configure mock logger to do nothing
mock_logger.return_value.info.return_value = None
mock_logger.return_value.debug.return_value = None
mock_logger.return_value.warning.return_value = None
mock_logger.return_value.error.return_value = None
x = main(arg_list)
self.assertFalse(x)

def test_main_warnings(self):
arg_list = [self.data_root, '-x', 'derivatives', 'stimuli', '-w', '-p', '-s' ]
with patch('sys.stdout', new=io.StringIO()) as fp:
main(arg_list)
self.assertTrue(fp.getvalue())
# Suppress all output including logging
with patch('sys.stdout', new=io.StringIO()), \
patch('sys.stderr', new=io.StringIO()), \
patch('logging.getLogger') as mock_logger:
# Configure mock logger to do nothing
mock_logger.return_value.info.return_value = None
mock_logger.return_value.debug.return_value = None
mock_logger.return_value.warning.return_value = None
mock_logger.return_value.error.return_value = None
x = main(arg_list)
self.assertTrue(x)

if __name__ == '__main__':
unittest.main()
Loading