diff --git a/docs/requirements.txt b/docs/requirements.txt index 94b716c1a..9783a3079 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -8,3 +8,4 @@ portalocker>=2.7.0 semantic_version>=2.10.0 Sphinx>=5.2.2 sphinx_rtd_theme>=1.0.0 +wordcloud>=1.9.2 diff --git a/hed/models/hed_tag.py b/hed/models/hed_tag.py index bfc06abd2..180f7cefb 100644 --- a/hed/models/hed_tag.py +++ b/hed/models/hed_tag.py @@ -602,10 +602,14 @@ def _get_tag_units_portion(self, tag_unit_classes): @staticmethod def _find_modifier_unit_entry(units, all_valid_unit_permutations): possible_match = all_valid_unit_permutations.get(units) - if not possible_match or not possible_match.has_attribute(HedKey.UnitSymbol): - possible_match = all_valid_unit_permutations.get(units.lower()) - if possible_match and possible_match.has_attribute(HedKey.UnitSymbol): - possible_match = None + # If we have a match that's a unit symbol, we're done, return it. + if possible_match and possible_match.has_attribute(HedKey.UnitSymbol): + return possible_match + + possible_match = all_valid_unit_permutations.get(units.lower()) + # Unit symbols must match including case, a match of a unit symbol now is something like M becoming m. 
+ if possible_match and possible_match.has_attribute(HedKey.UnitSymbol): + possible_match = None return possible_match diff --git a/hed/schema/hed_schema.py b/hed/schema/hed_schema.py index f27c97cb1..fc6978fa5 100644 --- a/hed/schema/hed_schema.py +++ b/hed/schema/hed_schema.py @@ -509,7 +509,6 @@ def _find_tag_entry(self, tag, schema_namespace=""): clean_tag = str(tag) namespace = schema_namespace clean_tag = clean_tag[len(namespace):] - prefix_tag_adj = len(namespace) working_tag = clean_tag.lower() # Most tags are in the schema directly, so test that first @@ -523,9 +522,26 @@ def _find_tag_entry(self, tag, schema_namespace=""): return found_entry, remainder, [] + prefix_tag_adj = len(namespace) + + try: + found_entry, current_slash_index = self._find_tag_subfunction(tag, working_tag, prefix_tag_adj) + except self._TagIdentifyError as e: + issue = e.issue + return None, None, issue + + remainder = None + if current_slash_index != -1: + remainder = clean_tag[current_slash_index:] + if remainder and found_entry.takes_value_child_entry: + found_entry = found_entry.takes_value_child_entry + + return found_entry, remainder, [] + + def _find_tag_subfunction(self, tag, working_tag, prefix_tag_adj): + """Finds the base tag and remainder from the left, raising exception on issues""" current_slash_index = -1 current_entry = None - # Loop left to right, checking each word. Once we find an invalid word, we stop. while True: next_index = working_tag.find("/", current_slash_index + 1) @@ -541,36 +557,37 @@ def _find_tag_entry(self, tag, schema_namespace=""): tag, index_in_tag=prefix_tag_adj, index_in_tag_end=prefix_tag_adj + next_index) - return None, None, error + raise self._TagIdentifyError(error) # If this is not a takes value node, validate each term in the remainder. 
if not current_entry.takes_value_child_entry: - child_names = working_tag[current_slash_index + 1:].split("/") - word_start_index = current_slash_index + 1 + prefix_tag_adj - for name in child_names: - if self._get_tag_entry(name): - error = ErrorHandler.format_error(ValidationErrors.INVALID_PARENT_NODE, - tag, - index_in_tag=word_start_index, - index_in_tag_end=word_start_index + len(name), - expected_parent_tag=self.all_tags[name].name) - return None, None, error - word_start_index += len(name) + 1 + # This will raise _TagIdentifyError on any issues + self._validate_remaining_terms(tag, working_tag, prefix_tag_adj, current_slash_index) break current_entry = parent_entry current_slash_index = next_index if next_index == len(working_tag): break - continue - - remainder = None - if current_slash_index != -1: - remainder = clean_tag[current_slash_index:] - if remainder and current_entry.takes_value_child_entry: - current_entry = current_entry.takes_value_child_entry - found_entry = current_entry - return found_entry, remainder, [] + return current_entry, current_slash_index + + def _validate_remaining_terms(self, tag, working_tag, prefix_tag_adj, current_slash_index): + """ Validates the terms past current_slash_index. + + :raises _TagIdentifyError: + - One of the extension terms already exists as a schema term. 
+ """ + child_names = working_tag[current_slash_index + 1:].split("/") + word_start_index = current_slash_index + 1 + prefix_tag_adj + for name in child_names: + if self._get_tag_entry(name): + error = ErrorHandler.format_error(ValidationErrors.INVALID_PARENT_NODE, + tag, + index_in_tag=word_start_index, + index_in_tag_end=word_start_index + len(name), + expected_parent_tag=self.all_tags[name].name) + raise self._TagIdentifyError(error) + word_start_index += len(name) + 1 # =============================================== # Semi-private creation finalizing functions @@ -801,3 +818,8 @@ def _add_tag_to_dict(self, long_tag_name, new_entry, key_class): def _create_tag_entry(self, long_tag_name, key_class): section = self._sections[key_class] return section._create_tag_entry(long_tag_name) + + class _TagIdentifyError(Exception): + """Used internally to note when a tag cannot be identified.""" + def __init__(self, issue): + self.issue = issue diff --git a/hed/schema/schema_attribute_validators.py b/hed/schema/schema_attribute_validators.py new file mode 100644 index 000000000..2fa23d1db --- /dev/null +++ b/hed/schema/schema_attribute_validators.py @@ -0,0 +1,81 @@ +"""The built-in functions to validate known attributes. + +Template for the functions: +attribute_checker_template(hed_schema, tag_entry, attribute_name): + hed_schema (HedSchema): The schema to use for validation + tag_entry (HedSchemaEntry): The schema entry for this tag. + attribute_name (str): The name of this attribute +Returns: + list: A list of issues. Each issue is a dictionary. +""" + +from hed.errors.error_types import SchemaWarnings, ValidationErrors +from hed.errors.error_reporter import ErrorHandler +from hed.schema.hed_schema import HedSchema + + +def tag_is_placeholder_check(hed_schema, tag_entry, attribute_name): + """ Check that a tag entry carrying this attribute is a placeholder (its name ends with '/#'). + + Parameters: + hed_schema (HedSchema): The schema to use for validation + tag_entry (HedSchemaEntry): The schema entry for this tag. 
+ attribute_name (str): The name of this attribute + + Returns: + list: A list of issues. Each issue is a dictionary. + + """ + issues = [] + if not tag_entry.name.endswith("/#"): + issues += ErrorHandler.format_error(SchemaWarnings.NON_PLACEHOLDER_HAS_CLASS, tag_entry.name, + attribute_name) + + return issues + + +def tag_exists_check(hed_schema, tag_entry, attribute_name): + """ Check if the list of possible tags exists in the schema. + + Parameters: + hed_schema (HedSchema): The schema to use for validation + tag_entry (HedSchemaEntry): The schema entry for this tag. + attribute_name (str): The name of this attribute + + Returns: + list: A list of issues. Each issue is a dictionary. + + """ + issues = [] + possible_tags = tag_entry.attributes.get(attribute_name, "") + split_tags = possible_tags.split(",") + for org_tag in split_tags: + if org_tag and org_tag not in hed_schema.all_tags: + issues += ErrorHandler.format_error(ValidationErrors.NO_VALID_TAG_FOUND, + org_tag, + index_in_tag=0, + index_in_tag_end=len(org_tag)) + + return issues + + +def tag_exists_base_schema_check(hed_schema, tag_entry, attribute_name): + """ Check if the single tag is a partnered schema tag + + Parameters: + hed_schema (HedSchema): The schema to use for validation + tag_entry (HedSchemaEntry): The schema entry for this tag. + attribute_name (str): The name of this attribute + + Returns: + list: A list of issues. Each issue is a dictionary. 
+ """ + issues = [] + rooted_tag = tag_entry.attributes.get(attribute_name, "") + if rooted_tag and rooted_tag not in hed_schema.all_tags: + issues += ErrorHandler.format_error(ValidationErrors.NO_VALID_TAG_FOUND, + rooted_tag, + index_in_tag=0, + index_in_tag_end=len(rooted_tag)) + + return issues \ No newline at end of file diff --git a/hed/schema/schema_compliance.py b/hed/schema/schema_compliance.py index 9f372cdb5..20db73376 100644 --- a/hed/schema/schema_compliance.py +++ b/hed/schema/schema_compliance.py @@ -1,12 +1,10 @@ """ Utilities for HED schema checking. """ -from hed.errors import error_reporter -from hed.errors.error_types import SchemaWarnings, ErrorContext, SchemaErrors, ErrorSeverity, ValidationErrors +from hed.errors.error_types import ErrorContext, SchemaErrors, ErrorSeverity from hed.errors.error_reporter import ErrorHandler from hed.schema.hed_schema import HedSchema, HedKey - -ALLOWED_TAG_CHARS = "-" -ALLOWED_DESC_CHARS = "-_:;,./()+ ^" +from hed.schema import schema_attribute_validators +from hed.schema.schema_validation_util import validate_schema_term, validate_schema_description def check_compliance(hed_schema, check_for_warnings=True, name=None, error_handler=None): @@ -27,192 +25,92 @@ def check_compliance(hed_schema, check_for_warnings=True, name=None, error_handl if not isinstance(hed_schema, HedSchema): raise ValueError("To check compliance of a HedGroupSchema, call self.check_compliance on the schema itself.") - if error_handler is None: - error_handler = error_reporter.ErrorHandler() + error_handler = error_handler if error_handler else ErrorHandler(check_for_warnings) + validator = SchemaValidator(hed_schema, check_for_warnings, error_handler) issues_list = [] if not name: name = hed_schema.filename error_handler.push_error_context(ErrorContext.FILE_NAME, name) - unknown_attributes = hed_schema.get_unknown_attributes() - if unknown_attributes: - for attribute_name, source_tags in unknown_attributes.items(): - for tag in 
source_tags: - issues_list += error_handler.format_error_with_context(SchemaErrors.SCHEMA_ATTRIBUTE_INVALID, - attribute_name, - source_tag=tag) - - schema_attribute_validators = { - HedKey.SuggestedTag: tag_exists_check, - HedKey.RelatedTag: tag_exists_check, - HedKey.UnitClass: tag_is_placeholder_check, - HedKey.ValueClass: tag_is_placeholder_check, - HedKey.Rooted: tag_exists_base_schema_check, - } - - # Check attributes - for section_key in hed_schema._sections: - error_handler.push_error_context(ErrorContext.SCHEMA_SECTION, section_key) - # Check attributes - for tag_entry in hed_schema[section_key].values(): - error_handler.push_error_context(ErrorContext.SCHEMA_TAG, tag_entry.name) - for attribute_name in tag_entry.attributes: - validator = schema_attribute_validators.get(attribute_name) - if validator: - error_handler.push_error_context(ErrorContext.SCHEMA_ATTRIBUTE, attribute_name) - new_issues = validator(hed_schema, tag_entry, attribute_name) - # if force_issues_as_warnings: - for issue in new_issues: - issue['severity'] = ErrorSeverity.WARNING - error_handler.add_context_and_filter(new_issues) - issues_list += new_issues - error_handler.pop_error_context() - error_handler.pop_error_context() - - # Check duplicate names - for name, duplicate_entries in hed_schema[section_key].duplicate_names.items(): - values = set(entry.has_attribute(HedKey.InLibrary) for entry in duplicate_entries) - error_code = SchemaErrors.HED_SCHEMA_DUPLICATE_NODE - if len(values) == 2: - error_code = SchemaErrors.HED_SCHEMA_DUPLICATE_FROM_LIBRARY - issues_list += error_handler.format_error_with_context(error_code, name, - duplicate_tag_list=[entry.name for entry in duplicate_entries], - section=section_key) - - error_handler.pop_error_context() - - if check_for_warnings: - hed_terms = hed_schema.get_all_schema_tags(True) - for hed_term in hed_terms: - issues_list += validate_schema_term(hed_term) - - for tag_name, desc in hed_schema.get_desc_iter(): - issues_list += 
validate_schema_description(tag_name, desc) + issues_list += validator.check_unknown_attributes() + issues_list += validator.check_attributes() + issues_list += validator.check_duplicate_names() + issues_list += validator.check_invalid_chars() error_handler.pop_error_context() return issues_list -# attribute_checker_template(hed_schema, tag_entry, attribute_name, possible_values): -# hed_schema (HedSchema): The schema to use for validation -# tag_entry (HedSchemaEntry): The schema entry for this tag. -# attribute_name (str): The name of this attribute - - -def tag_is_placeholder_check(hed_schema, tag_entry, attribute_name): - """ Check if comma separated list has valid HedTags. - - Parameters: - hed_schema (HedSchema): The schema to use for validation - tag_entry (HedSchemaEntry): The schema entry for this tag. - attribute_name (str): The name of this attribute - - Returns: - list: A list of issues. Each issue is a dictionary. - - """ - issues = [] - if not tag_entry.name.endswith("/#"): - issues += ErrorHandler.format_error(SchemaWarnings.NON_PLACEHOLDER_HAS_CLASS, tag_entry.name, - attribute_name) - - return issues - - -def tag_exists_check(hed_schema, tag_entry, attribute_name): - """ Check if the list of possible tags exists in the schema. - - Parameters: - hed_schema (HedSchema): The schema to use for validation - tag_entry (HedSchemaEntry): The schema entry for this tag. - attribute_name (str): The name of this attribute - - Returns: - list: A list of issues. Each issue is a dictionary. 
- - """ - issues = [] - possible_tags = tag_entry.attributes.get(attribute_name, "") - split_tags = possible_tags.split(",") - for org_tag in split_tags: - if org_tag and org_tag not in hed_schema.all_tags: - issues += ErrorHandler.format_error(ValidationErrors.NO_VALID_TAG_FOUND, - org_tag, - index_in_tag=0, - index_in_tag_end=len(org_tag)) - - return issues - - -def tag_exists_base_schema_check(hed_schema, tag_entry, attribute_name): - """ Check if the single tag is a partnered schema tag - - Parameters: - hed_schema (HedSchema): The schema to use for validation - tag_entry (HedSchemaEntry): The schema entry for this tag. - attribute_name (str): The name of this attribute - - Returns: - list: A list of issues. Each issue is a dictionary. - """ - issues = [] - rooted_tag = tag_entry.attributes.get(attribute_name, "") - if rooted_tag and rooted_tag not in hed_schema.all_tags: - issues += ErrorHandler.format_error(ValidationErrors.NO_VALID_TAG_FOUND, - rooted_tag, - index_in_tag=0, - index_in_tag_end=len(rooted_tag)) - - return issues - -def validate_schema_term(hed_term): - """ Check short tag for capitalization and illegal characters. - - Parameters: - hed_term (str): A single hed term. - - Returns: - list: A list of all formatting issues found in the term. Each issue is a dictionary. - - """ - issues_list = [] - # Any # terms will have already been validated as the previous entry. - if hed_term == "#": +class SchemaValidator: + """Validator class to wrap some code. 
In general, just call check_compliance.""" + attribute_validators = { + HedKey.SuggestedTag: schema_attribute_validators.tag_exists_check, + HedKey.RelatedTag: schema_attribute_validators.tag_exists_check, + HedKey.UnitClass: schema_attribute_validators.tag_is_placeholder_check, + HedKey.ValueClass: schema_attribute_validators.tag_is_placeholder_check, + HedKey.Rooted: schema_attribute_validators.tag_exists_base_schema_check, + } + def __init__(self, hed_schema, check_for_warnings=True, error_handler=None): + self.hed_schema = hed_schema + self._check_for_warnings = check_for_warnings + self.error_handler = error_handler + + def check_unknown_attributes(self): + """Returns issues for any unknown attributes in any section""" + unknown_attributes = self.hed_schema.get_unknown_attributes() + issues_list = [] + if unknown_attributes: + for attribute_name, source_tags in unknown_attributes.items(): + for tag in source_tags: + issues_list += self.error_handler.format_error_with_context(SchemaErrors.SCHEMA_ATTRIBUTE_INVALID, + attribute_name, + source_tag=tag) return issues_list - for i, char in enumerate(hed_term): - if i == 0 and not (char.isdigit() or char.isupper()): - issues_list += ErrorHandler.format_error(SchemaWarnings.INVALID_CAPITALIZATION, - hed_term, char_index=i, problem_char=char) - continue - if char in ALLOWED_TAG_CHARS or char.isalnum(): - continue - issues_list += ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_TAG, - hed_term, char_index=i, problem_char=char) - return issues_list - - -def validate_schema_description(tag_name, hed_description): - """ Check the description of a single schema term. 
+ def check_attributes(self): + """Returns issues from validating known attributes in all sections""" + issues_list = [] + for section_key in self.hed_schema._sections: + self.error_handler.push_error_context(ErrorContext.SCHEMA_SECTION, section_key) + for tag_entry in self.hed_schema[section_key].values(): + self.error_handler.push_error_context(ErrorContext.SCHEMA_TAG, tag_entry.name) + for attribute_name in tag_entry.attributes: + validator = self.attribute_validators.get(attribute_name) + if validator: + self.error_handler.push_error_context(ErrorContext.SCHEMA_ATTRIBUTE, attribute_name) + new_issues = validator(self.hed_schema, tag_entry, attribute_name) + for issue in new_issues: + issue['severity'] = ErrorSeverity.WARNING + self.error_handler.add_context_and_filter(new_issues) + issues_list += new_issues + self.error_handler.pop_error_context() + self.error_handler.pop_error_context() + self.error_handler.pop_error_context() + return issues_list - Parameters: - tag_name (str): A single hed tag - not validated here, just used for error messages. - hed_description (str): The description string to validate. + def check_duplicate_names(self): + """Return issues for any duplicate names in all sections.""" + issues_list = [] + for section_key in self.hed_schema._sections: + for name, duplicate_entries in self.hed_schema[section_key].duplicate_names.items(): + values = set(entry.has_attribute(HedKey.InLibrary) for entry in duplicate_entries) + error_code = SchemaErrors.HED_SCHEMA_DUPLICATE_NODE + if len(values) == 2: + error_code = SchemaErrors.HED_SCHEMA_DUPLICATE_FROM_LIBRARY + issues_list += self.error_handler.format_error_with_context(error_code, name, + duplicate_tag_list=[entry.name for entry in duplicate_entries], + section=section_key) + return issues_list - Returns: - list: A list of all formatting issues found in the description. 
+ def check_invalid_chars(self): + """Returns issues for bad chars in terms or descriptions.""" + issues_list = [] + if self._check_for_warnings: + hed_terms = self.hed_schema.get_all_schema_tags(True) + for hed_term in hed_terms: + issues_list += validate_schema_term(hed_term) - """ - issues_list = [] - # Blank description is fine - if not hed_description: + for tag_name, desc in self.hed_schema.get_desc_iter(): + issues_list += validate_schema_description(tag_name, desc) return issues_list - for i, char in enumerate(hed_description): - if char.isalnum(): - continue - if char in ALLOWED_DESC_CHARS: - continue - issues_list += ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, - hed_description, tag_name, char_index=i, problem_char=char) - return issues_list diff --git a/hed/schema/schema_io/wiki2schema.py b/hed/schema/schema_io/wiki2schema.py index ff29f17ee..645c412e1 100644 --- a/hed/schema/schema_io/wiki2schema.py +++ b/hed/schema/schema_io/wiki2schema.py @@ -155,55 +155,60 @@ def _read_wiki(self, wiki_lines): msg = f"Required section separator '{SectionNames[section]}' not found in file" raise HedFileError(error_code, msg, filename=self.filename) + def _check_for_new_section(self, line, strings_for_section, current_section): + new_section = None + for key, section_string in SectionStarts.items(): + if line.startswith(section_string): + if key in strings_for_section: + msg = f"Found section {SectionNames[key]} twice" + raise HedFileError(HedExceptions.INVALID_SECTION_SEPARATOR, + msg, filename=self.filename) + if current_section < key: + new_section = key + else: + error_code = HedExceptions.INVALID_SECTION_SEPARATOR + if key in ErrorsBySection: + error_code = ErrorsBySection[key] + msg = f"Found section {SectionNames[key]} out of order in file" + raise HedFileError(error_code, msg, filename=self.filename) + break + return new_section + + def _handle_bad_section_sep(self, line, current_section): + if current_section != HedWikiSection.Schema 
and line.startswith(wiki_constants.ROOT_TAG): + msg = f"Invalid section separator '{line.strip()}'" + raise HedFileError(HedExceptions.INVALID_SECTION_SEPARATOR, msg, filename=self.filename) + + if line.startswith("!#"): + msg = f"Invalid section separator '{line.strip()}'" + raise HedFileError(HedExceptions.INVALID_SECTION_SEPARATOR, msg, filename=self.filename) + def _split_lines_into_sections(self, wiki_lines): - """ - Takes a list of lines, and splits it into valid wiki sections. + """ Takes a list of lines, and splits it into valid wiki sections. - Parameters - ---------- - wiki_lines : [str] + Parameters: + wiki_lines : [str] - Returns - ------- - sections: {str: [str]} + Returns: + sections: {str: [str]} A list of lines for each section of the schema(not including the identifying section line) """ - # We start having found the header and may still be in it current_section = HedWikiSection.HeaderLine - found_section = True strings_for_section = {} + strings_for_section[HedWikiSection.HeaderLine] = [] for line_number, line in enumerate(wiki_lines): - for key, section_string in SectionStarts.items(): - if line.startswith(section_string): - if key in strings_for_section: - msg = f"Found section {SectionNames[key]} twice" - raise HedFileError(HedExceptions.INVALID_SECTION_SEPARATOR, - msg, filename=self.filename) - - if current_section < key: - current_section = key - found_section = True - break - else: - error_code = HedExceptions.INVALID_SECTION_SEPARATOR - if key in ErrorsBySection: - error_code = ErrorsBySection[key] - msg = f"Found section {SectionNames[key]} out of order in file" - raise HedFileError(error_code, msg, filename=self.filename) - - if found_section: - strings_for_section[current_section] = [] - found_section = False + # Header is handled earlier + if line_number == 0: continue - if (current_section != HedWikiSection.Schema and line.startswith(wiki_constants.ROOT_TAG) and - not (line.startswith(wiki_constants.OLD_SYNTAX_SECTION_NAME) and not 
self._schema.is_hed3_schema)): - msg = f"Invalid section separator '{line.strip()}'" - raise HedFileError(HedExceptions.INVALID_SECTION_SEPARATOR, msg, filename=self.filename) + new_section = self._check_for_new_section(line, strings_for_section, current_section) + + if new_section: + strings_for_section[new_section] = [] + current_section = new_section + continue - if line.startswith("!#"): - msg = f"Invalid section separator '{line.strip()}'" - raise HedFileError(HedExceptions.INVALID_SECTION_SEPARATOR, msg, filename=self.filename) + self._handle_bad_section_sep(line, current_section) if current_section == HedWikiSection.Prologue or current_section == HedWikiSection.Epilogue: strings_for_section[current_section].append((line_number + 1, line)) diff --git a/hed/schema/schema_io/wiki_constants.py b/hed/schema/schema_io/wiki_constants.py index 131000e62..2f7020654 100644 --- a/hed/schema/schema_io/wiki_constants.py +++ b/hed/schema/schema_io/wiki_constants.py @@ -12,7 +12,6 @@ VALUE_CLASS_STRING = "'''Value classes'''" PROLOGUE_SECTION_ELEMENT = "'''Prologue'''" EPILOGUE_SECTION_ELEMENT = "'''Epilogue'''" -OLD_SYNTAX_SECTION_NAME = "'''Syntax'''" wiki_section_headers = { HedSectionKey.AllTags: START_HED_STRING, diff --git a/hed/schema/schema_validation_util.py b/hed/schema/schema_validation_util.py index 17052a4d1..e08e194a0 100644 --- a/hed/schema/schema_validation_util.py +++ b/hed/schema/schema_validation_util.py @@ -1,8 +1,13 @@ -"""Utilities used in HED validation using a HED schema.""" +"""Utilities used in HED validation/loading using a HED schema.""" from semantic_version import Version + +from hed.errors import ErrorHandler, SchemaWarnings from hed.schema import hed_schema_constants as constants from hed.errors.exceptions import HedExceptions, HedFileError +ALLOWED_TAG_CHARS = "-" +ALLOWED_DESC_CHARS = "-_:;,./()+ ^" + def validate_library_name(library_name): """ Check the validity of the library name. 
@@ -57,7 +62,7 @@ def is_hed3_version_number(version_string): return False -attribute_validators = { +header_attribute_validators = { constants.VERSION_ATTRIBUTE: (validate_version_string, HedExceptions.HED_SCHEMA_VERSION_INVALID), constants.LIBRARY_ATTRIBUTE: (validate_library_name, HedExceptions.BAD_HED_LIBRARY_NAME) } @@ -100,8 +105,8 @@ def validate_attributes(attrib_dict, filename): validate_present_attributes(attrib_dict, filename) for attribute_name, attribute_value in attrib_dict.items(): - if attribute_name in attribute_validators: - validator, error_code = attribute_validators[attribute_name] + if attribute_name in header_attribute_validators: + validator, error_code = header_attribute_validators[attribute_name] had_error = validator(attribute_value) if had_error: raise HedFileError(error_code, had_error, filename) @@ -163,3 +168,55 @@ def find_rooted_entry(tag_entry, schema, loading_merged): return None return rooted_entry + + +def validate_schema_term(hed_term): + """ Check short tag for capitalization and illegal characters. + + Parameters: + hed_term (str): A single hed term. + + Returns: + list: A list of all formatting issues found in the term. Each issue is a dictionary. + + """ + issues_list = [] + # Any # terms will have already been validated as the previous entry. + if hed_term == "#": + return issues_list + + for i, char in enumerate(hed_term): + if i == 0 and not (char.isdigit() or char.isupper()): + issues_list += ErrorHandler.format_error(SchemaWarnings.INVALID_CAPITALIZATION, + hed_term, char_index=i, problem_char=char) + continue + if char in ALLOWED_TAG_CHARS or char.isalnum(): + continue + issues_list += ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_TAG, + hed_term, char_index=i, problem_char=char) + return issues_list + + +def validate_schema_description(tag_name, hed_description): + """ Check the description of a single schema term. 
+ + Parameters: + tag_name (str): A single hed tag - not validated here, just used for error messages. + hed_description (str): The description string to validate. + + Returns: + list: A list of all formatting issues found in the description. + + """ + issues_list = [] + # Blank description is fine + if not hed_description: + return issues_list + for i, char in enumerate(hed_description): + if char.isalnum(): + continue + if char in ALLOWED_DESC_CHARS: + continue + issues_list += ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, + hed_description, tag_name, char_index=i, problem_char=char) + return issues_list diff --git a/hed/tools/analysis/tabular_column_name_summary.py b/hed/tools/analysis/column_name_summary.py similarity index 95% rename from hed/tools/analysis/tabular_column_name_summary.py rename to hed/tools/analysis/column_name_summary.py index cd42651ae..5c7a710c9 100644 --- a/hed/tools/analysis/tabular_column_name_summary.py +++ b/hed/tools/analysis/column_name_summary.py @@ -3,7 +3,7 @@ import json -class TabularColumnNameSummary: +class ColumnNameSummary: def __init__(self, name=''): self.name = name self.file_dict = {} diff --git a/hed/tools/analysis/tabular_summary.py b/hed/tools/analysis/tabular_summary.py index d9fd79702..1262f368b 100644 --- a/hed/tools/analysis/tabular_summary.py +++ b/hed/tools/analysis/tabular_summary.py @@ -81,8 +81,9 @@ def get_summary(self, as_json=False): value_cols = {} for key in sorted_cols: value_cols[key] = self.value_info[key] - summary = {"Summary name": self.name, "Total events": self.total_events, "Total files": self.total_files, - "Categorical columns": categorical_cols, "Value columns": value_cols} + summary = {"Name": self.name, "Total events": self.total_events, "Total files": self.total_files, + "Categorical columns": categorical_cols, "Value columns": value_cols, + "Skip columns": self.skip_cols, "Files": self.files} if as_json: return json.dumps(summary, indent=4) else: @@ -215,6 +216,30 
@@ def _update_dict_value(self, col_dict): self.value_info[col] = [self.value_info[col][0] + col_dict.value_info[col][0], self.value_info[col][1] + col_dict.value_info[col][1]] + @staticmethod + def extract_summary(summary_info): + """ Create a TabularSummary object from a serialized summary + + Parameters: + summary_info (dict or str): A JSON string or a dictionary containing contents of a TabularSummary. + + Returns: + TabularSummary: contains the information in summary_info as a TabularSummary object. + """ + + if isinstance(summary_info, str): + summary_info = json.loads(summary_info) + new_tab = TabularSummary(value_cols=summary_info.get('Value columns', {}).keys(), + skip_cols=summary_info.get('Skip columns', []), + name=summary_info.get('Summary name', '')) + new_tab.value_info = summary_info.get('Value_columns', {}) + new_tab.total_files = summary_info.get('Total files', 0) + new_tab.total_events = summary_info.get('Total events', 0) + new_tab.skip_cols = summary_info.get('Skip columns', []) + new_tab.categorical_info = summary_info.get('Categorical columns', {}) + new_tab.files = summary_info.get('Files', {}) + return new_tab + @staticmethod def get_columns_info(dataframe, skip_cols=None): """ Extract unique value counts for columns. diff --git a/hed/tools/remodeling/operations/summarize_column_names_op.py b/hed/tools/remodeling/operations/summarize_column_names_op.py index ed6082a45..2201827f9 100644 --- a/hed/tools/remodeling/operations/summarize_column_names_op.py +++ b/hed/tools/remodeling/operations/summarize_column_names_op.py @@ -1,6 +1,6 @@ """ Summarize the column names in a collection of tabular files. 
""" -from hed.tools.analysis.tabular_column_name_summary import TabularColumnNameSummary +from hed.tools.analysis.column_name_summary import ColumnNameSummary from hed.tools.remodeling.operations.base_op import BaseOp from hed.tools.remodeling.operations.base_summary import BaseSummary @@ -67,13 +67,13 @@ def do_op(self, dispatcher, df, name, sidecar=None): df_new = df.copy() summary = dispatcher.summary_dicts.get(self.summary_name, None) if not summary: - summary = ColumnNameSummary(self) + summary = ColumnNamesSummary(self) dispatcher.summary_dicts[self.summary_name] = summary summary.update_summary({"name": name, "column_names": list(df_new.columns)}) return df_new -class ColumnNameSummary(BaseSummary): +class ColumnNamesSummary(BaseSummary): def __init__(self, sum_op): super().__init__(sum_op) @@ -85,35 +85,39 @@ def update_summary(self, new_info): new_info (dict): A dictionary with the parameters needed to update a summary. Notes: - - The summary information is kept in separate TabularColumnNameSummary objects for each file. + - The summary information is kept in separate ColumnNameSummary objects for each file. - The summary needs a "name" str and a "column_names" list. - - The summary uses TabularColumnNameSummary as the summary object. + - The summary uses ColumnNameSummary as the summary object. """ name = new_info['name'] if name not in self.summary_dict: - self.summary_dict[name] = TabularColumnNameSummary(name=name) + self.summary_dict[name] = ColumnNameSummary(name=name) self.summary_dict[name].update(name, new_info["column_names"]) def get_details_dict(self, column_summary): """ Return the summary dictionary extracted from a ColumnNameSummary. Parameters: - column_summary (TabularColumnNameSummary): A column name summary for the data file. + column_summary (ColumnNameSummary): A column name summary for the data file. Returns: dict - a dictionary with the summary information for column names. 
""" - return column_summary.get_summary() + summary = column_summary.get_summary() + return {"Name": summary['Summary name'], "Total events": "n/a", + "Total files": summary['Number files'], + "Files": [name for name in column_summary.file_dict.keys()], + "Columns": summary['Columns']} def merge_all_info(self): - """ Create a TabularColumnNameSummary containing the overall dataset summary. + """ Create a ColumnNameSummary containing the overall dataset summary. Returns: - TabularColumnNameSummary - the overall summary object for column names. + ColumnNameSummary - the overall summary object for column names. """ - all_sum = TabularColumnNameSummary(name='Dataset') + all_sum = ColumnNameSummary(name='Dataset') for key, counts in self.summary_dict.items(): for name, pos in counts.file_dict.items(): all_sum.update(name, counts.unique_headers[pos]) @@ -152,7 +156,7 @@ def _get_dataset_string(result, indent=BaseSummary.DISPLAY_INDENT): """ sum_list = [f"Dataset: Number of files={result.get('Number files', 0)}"] - for element in result.get("Columns", []): + for element in result.get("Unique headers", []): sum_list.append(f"{indent}Columns: {str(element['Column names'])}") for file in element.get("Files", []): sum_list.append(f"{indent}{indent}{file}") diff --git a/hed/tools/remodeling/operations/summarize_column_values_op.py b/hed/tools/remodeling/operations/summarize_column_values_op.py index dc13790c7..0c80c6382 100644 --- a/hed/tools/remodeling/operations/summarize_column_values_op.py +++ b/hed/tools/remodeling/operations/summarize_column_values_op.py @@ -138,7 +138,7 @@ def merge_all_info(self): """ all_sum = TabularSummary(value_cols=self.op.value_columns, skip_cols=self.op.skip_columns, name='Dataset') - for key, counts in self.summary_dict.items(): + for counts in self.summary_dict.values(): all_sum.update_summary(counts) return all_sum diff --git a/hed/tools/remodeling/operations/summarize_hed_tags_op.py b/hed/tools/remodeling/operations/summarize_hed_tags_op.py 
index d74d87de6..5a504fed1 100644 --- a/hed/tools/remodeling/operations/summarize_hed_tags_op.py +++ b/hed/tools/remodeling/operations/summarize_hed_tags_op.py @@ -133,9 +133,10 @@ def get_details_dict(self, merge_counts): for key, key_list in self.tags.items(): details[key] = self._get_details(key_list, template, verbose=True) leftovers = [value.get_info(verbose=True) for value in unmatched] - return {"name": merge_counts.name, "total_events": merge_counts.total_events, - "files": [name for name in merge_counts.files.keys()], - "Main tags": details, "Other tags": leftovers} + return {"Name": merge_counts.name, "Total events": merge_counts.total_events, + "Total files": len(merge_counts.files.keys()), + "Files": [name for name in merge_counts.files.keys()], + "Specifics": {"Main tags": details, "Other tags": leftovers}} def _get_result_string(self, name, result, indent=BaseSummary.DISPLAY_INDENT): """ Return a formatted string with the summary for the indicated name. @@ -185,8 +186,8 @@ def _get_dataset_string(result, indent=BaseSummary.DISPLAY_INDENT): str: Formatted string suitable for saving in a file or printing. """ - sum_list = [f"Dataset: Total events={result.get('total_events', 0)} " - f"Total files={len(result.get('files', []))}"] + sum_list = [f"Dataset: Total events={result.get('Total events', 0)} " + f"Total files={len(result.get('Files', 0))}"] sum_list = sum_list + HedTagSummary._get_tag_list(result, indent=indent) return "\n".join(sum_list) @@ -202,7 +203,7 @@ def _get_individual_string(result, indent=BaseSummary.DISPLAY_INDENT): str: Formatted string suitable for saving in a file or printing. 
""" - sum_list = [f"Total events={result.get('total_events', 0)}"] + sum_list = [f"Total events={result.get('Total events', 0)}"] sum_list = sum_list + HedTagSummary._get_tag_list(result, indent=indent) return "\n".join(sum_list) @@ -214,7 +215,8 @@ def _tag_details(tags): return tag_list @staticmethod - def _get_tag_list(tag_info, indent=BaseSummary.DISPLAY_INDENT): + def _get_tag_list(result, indent=BaseSummary.DISPLAY_INDENT): + tag_info = result["Specifics"] sum_list = [f"\n{indent}Main tags[events,files]:"] for category, tags in tag_info['Main tags'].items(): sum_list.append(f"{indent}{indent}{category}:") diff --git a/hed/tools/visualizations/tag_word_cloud.py b/hed/tools/visualizations/tag_word_cloud.py new file mode 100644 index 000000000..2f2c25236 --- /dev/null +++ b/hed/tools/visualizations/tag_word_cloud.py @@ -0,0 +1,46 @@ +from wordcloud import WordCloud + + +def create_wordcloud(word_dict, width=400, height=200): + """Takes a word dict and returns a generated word cloud object + + Parameters: + word_dict(dict): words and their frequencies + width(int): width in pixels + height(int): height in pixels + Returns: + word_cloud(WordCloud): The generated cloud. + Use .to_file to save it out as an image. 
+ + :raises ValueError: + An empty dictionary was passed + """ + wc = WordCloud(background_color='white', width=width, height=height) + + wc.generate_from_frequencies(word_dict) + + return wc + + +def summary_to_dict(summary): + """Converts a HedTagSummary json dict into the word cloud input format + + Parameters: + summary(dict): The summary from a summarize hed tags op + + Returns: + word_dict(dict): a dict of the words and their occurrence count + + :raises KeyError: + A malformed dictionary was passed + + """ + overall_summary = summary.get("Overall summary", {}) + specifics = overall_summary.get("Specifics", {}) + tag_dict = specifics.get("Main tags", {}) + word_dict = {} + for tag_sub_list in tag_dict.values(): + for tag_sub_dict in tag_sub_list: + word_dict[tag_sub_dict['tag']] = tag_sub_dict['events'] + + return word_dict diff --git a/hed/validator/def_validator.py b/hed/validator/def_validator.py index c8b0c23ad..8036c6e13 100644 --- a/hed/validator/def_validator.py +++ b/hed/validator/def_validator.py @@ -38,16 +38,68 @@ def validate_def_tags(self, hed_string_obj, tag_validator=None): return def_issues + @staticmethod + def _validate_def_units(def_tag, placeholder_tag, tag_validator, is_def_expand_tag): + """Validate units and value classes on def/def-expand tags + + Parameters: + def_tag(HedTag): The source tag + placeholder_tag(HedTag): The placeholder tag this def fills in + tag_validator(TagValidator): Used to validate the units/values + is_def_expand_tag(bool): If the given def_tag is a def-expand tag or not. + + Returns: + issues(list): Issues found from validating placeholders. 
+ """ + def_issues = [] + error_code = ValidationErrors.DEF_INVALID + if is_def_expand_tag: + error_code = ValidationErrors.DEF_EXPAND_INVALID + if placeholder_tag.is_unit_class_tag(): + def_issues += tag_validator.check_tag_unit_class_units_are_valid(placeholder_tag, + report_as=def_tag, + error_code=error_code) + elif placeholder_tag.is_value_class_tag(): + def_issues += tag_validator.check_tag_value_class_valid(placeholder_tag, + report_as=def_tag, + error_code=error_code) + return def_issues + + @staticmethod + def _report_missing_or_invalid_value(def_tag, def_entry, is_def_expand_tag): + """Returns the correct error for this type of def tag + + Parameters: + def_tag(HedTag): The source tag + def_entry(DefinitionEntry): The entry for this definition + is_def_expand_tag(bool): If the given def_tag is a def-expand tag or not. + + Returns: + issues(list): Issues found from validating placeholders. + """ + def_issues = [] + if def_entry.takes_value: + error_code = ValidationErrors.HED_DEF_VALUE_MISSING + if is_def_expand_tag: + error_code = ValidationErrors.HED_DEF_EXPAND_VALUE_MISSING + else: + error_code = ValidationErrors.HED_DEF_VALUE_EXTRA + if is_def_expand_tag: + error_code = ValidationErrors.HED_DEF_EXPAND_VALUE_EXTRA + def_issues += ErrorHandler.format_error(error_code, tag=def_tag) + return def_issues + def _validate_def_contents(self, def_tag, def_expand_group, tag_validator): """ Check for issues with expanding a tag from Def to a Def-expand tag group Parameters: def_tag (HedTag): Source hed tag that may be a Def or Def-expand tag. - def_expand_group (HedGroup or HedTag): - Source group for this def-expand tag. Same as def_tag if this is not a def-expand tag. + def_expand_group (HedGroup or HedTag): Source group for this def-expand tag. + Same as def_tag if this is not a def-expand tag. tag_validator (TagValidator): Used to validate the placeholder replacement. + Returns: - issues + issues(list): Issues found from validating placeholders. 
""" def_issues = [] is_def_expand_tag = def_expand_group != def_tag @@ -75,27 +127,9 @@ def _validate_def_contents(self, def_tag, def_expand_group, tag_validator): found_def=def_expand_group) if def_entry.takes_value and tag_validator: placeholder_tag = def_contents.get_first_group().find_placeholder_tag() - error_code = ValidationErrors.DEF_INVALID - if is_def_expand_tag: - error_code = ValidationErrors.DEF_EXPAND_INVALID - if placeholder_tag.is_unit_class_tag(): - def_issues += tag_validator.check_tag_unit_class_units_are_valid(placeholder_tag, - report_as=def_tag, - error_code=error_code) - elif placeholder_tag.is_value_class_tag(): - def_issues += tag_validator.check_tag_value_class_valid(placeholder_tag, - report_as=def_tag, - error_code=error_code) - - elif def_entry.takes_value: - error_code = ValidationErrors.HED_DEF_VALUE_MISSING - if is_def_expand_tag: - error_code = ValidationErrors.HED_DEF_EXPAND_VALUE_MISSING - def_issues += ErrorHandler.format_error(error_code, tag=def_tag) + def_issues += self._validate_def_units(def_tag, placeholder_tag, tag_validator, + is_def_expand_tag) else: - error_code = ValidationErrors.HED_DEF_VALUE_EXTRA - if is_def_expand_tag: - error_code = ValidationErrors.HED_DEF_EXPAND_VALUE_EXTRA - def_issues += ErrorHandler.format_error(error_code, tag=def_tag) + def_issues += self._report_missing_or_invalid_value(def_tag, def_entry, is_def_expand_tag) - return def_issues \ No newline at end of file + return def_issues diff --git a/hed/validator/tag_validator.py b/hed/validator/tag_validator.py index 9986c6766..57ca84fe7 100644 --- a/hed/validator/tag_validator.py +++ b/hed/validator/tag_validator.py @@ -291,6 +291,29 @@ def check_tag_exists_in_schema(self, original_tag): index_in_tag_end=None) return validation_issues + def _check_value_class(self, original_tag, stripped_value, report_as, error_code=None): + """Returns any issues found if this is a value tag""" + validation_issues = [] + if original_tag.is_takes_value_tag() and \ + 
not self._validate_value_class_portion(original_tag, stripped_value): + validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID, report_as) + if error_code: + validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID, + report_as, actual_error=error_code) + return validation_issues + + def _check_units(self, original_tag, bad_units, report_as): + """Returns an issue noting this is either bad units, or missing units""" + if bad_units: + tag_unit_class_units = original_tag.get_tag_unit_class_units() + validation_issue = ErrorHandler.format_error(ValidationErrors.UNITS_INVALID, + tag=report_as, units=tag_unit_class_units) + else: + default_unit = original_tag.get_unit_class_default_unit() + validation_issue = ErrorHandler.format_error(ValidationErrors.UNITS_MISSING, + tag=report_as, default_unit=default_unit) + return validation_issue + def check_tag_unit_class_units_are_valid(self, original_tag, report_as=None, error_code=None): """ Report incorrect unit class or units. @@ -305,36 +328,19 @@ def check_tag_unit_class_units_are_valid(self, original_tag, report_as=None, err if original_tag.is_unit_class_tag(): stripped_value, unit = original_tag.get_stripped_unit_value() if not unit: - bad_units = " " in original_tag.extension - had_error = False # Todo: in theory this should separately validate the number and the units, for units # that are prefixes like $. Right now those are marked as unit invalid AND value_invalid. 
+ bad_units = " " in original_tag.extension + report_as = report_as if report_as else original_tag + if bad_units: stripped_value = stripped_value.split(" ")[0] - if original_tag.is_takes_value_tag() and\ - not self._validate_value_class_portion(original_tag, stripped_value): - validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID, - report_as if report_as else original_tag) - if error_code: - had_error = True - validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID, - report_as if report_as else original_tag, - actual_error=error_code) - if bad_units: - tag_unit_class_units = original_tag.get_tag_unit_class_units() - if tag_unit_class_units: - validation_issues += ErrorHandler.format_error(ValidationErrors.UNITS_INVALID, - tag=report_as if report_as else original_tag, - units=tag_unit_class_units) - else: - default_unit = original_tag.get_unit_class_default_unit() - validation_issues += ErrorHandler.format_error(ValidationErrors.UNITS_MISSING, - tag=report_as if report_as else original_tag, - default_unit=default_unit) + validation_issues += self._check_value_class(original_tag, stripped_value, report_as, error_code) + validation_issues += self._check_units(original_tag, bad_units, report_as) # We don't want to give this overall error twice - if error_code and not had_error: + if error_code and not any(error_code == issue['code'] for issue in validation_issues): new_issue = validation_issues[0].copy() new_issue['code'] = error_code validation_issues += [new_issue] diff --git a/requirements.txt b/requirements.txt index 443e763d2..7dd623faa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ openpyxl>=3.1.0 pandas>=1.3.5 portalocker>=2.7.0 semantic_version>=2.10.0 +wordcloud>=1.9.2 diff --git a/tests/models/test_hed_tag.py b/tests/models/test_hed_tag.py index 9eba272eb..3fc2a74df 100644 --- a/tests/models/test_hed_tag.py +++ b/tests/models/test_hed_tag.py @@ -120,12 +120,16 @@ def 
test_strip_off_units_from_value(self): volume_string = HedTag('Volume/100 m^3', hed_schema=self.hed_schema) prefixed_volume_string = HedTag('Volume/100 cm^3', hed_schema=self.hed_schema) invalid_volume_string = HedTag('Volume/200 cm', hed_schema=self.hed_schema) + invalid_distance_string = HedTag('Distance/200 M', hed_schema=self.hed_schema) # currency_units = { # 'currency':self.schema.unit_classes['currency'] # } volume_units = { 'volume': self.hed_schema.unit_classes['volumeUnits'] } + distance_units = { + 'distance': self.hed_schema.unit_classes['physicalLengthUnits'] + } # stripped_dollars_string_no_space = dollars_string_no_space._get_tag_units_portion(currency_units) # stripped_dollars_string = dollars_string._get_tag_units_portion(currency_units) # stripped_dollars_string_invalid = dollars_string_invalid._get_tag_units_portion(currency_units) @@ -133,6 +137,7 @@ def test_strip_off_units_from_value(self): stripped_volume_string_no_space, _ = volume_string_no_space._get_tag_units_portion(volume_units) stripped_prefixed_volume_string, _ = prefixed_volume_string._get_tag_units_portion(volume_units) stripped_invalid_volume_string, _ = invalid_volume_string._get_tag_units_portion(volume_units) + stripped_invalid_distance_string, _ = invalid_distance_string._get_tag_units_portion(distance_units) # self.assertEqual(stripped_dollars_string_no_space, None) # self.assertEqual(stripped_dollars_string, '25.99') # self.assertEqual(stripped_dollars_string_invalid, None) @@ -140,6 +145,7 @@ def test_strip_off_units_from_value(self): self.assertEqual(stripped_volume_string_no_space, None) self.assertEqual(stripped_prefixed_volume_string, '100') self.assertEqual(stripped_invalid_volume_string, None) + self.assertEqual(stripped_invalid_distance_string, None) def test_determine_allows_extensions(self): extension_tag1 = HedTag('boat', hed_schema=self.hed_schema) diff --git a/tests/schema/test_schema_attribute_validators.py b/tests/schema/test_schema_attribute_validators.py new 
file mode 100644 index 000000000..67a25efb1 --- /dev/null +++ b/tests/schema/test_schema_attribute_validators.py @@ -0,0 +1,42 @@ +import unittest +import copy + +from hed.schema import schema_attribute_validators +from hed import schema + + +class Test(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.hed_schema = schema.load_schema_version("8.1.0") + + def test_util_placeholder(self): + tag_entry = self.hed_schema.all_tags["Event"] + attribute_name = "unitClass" + self.assertTrue(schema_attribute_validators.tag_is_placeholder_check(self.hed_schema, tag_entry, attribute_name)) + attribute_name = "unitClass" + tag_entry = self.hed_schema.all_tags["Age/#"] + self.assertFalse(schema_attribute_validators.tag_is_placeholder_check(self.hed_schema, tag_entry, attribute_name)) + + def test_util_suggested(self): + tag_entry = self.hed_schema.all_tags["Event/Sensory-event"] + attribute_name = "suggestedTag" + self.assertFalse(schema_attribute_validators.tag_exists_check(self.hed_schema, tag_entry, attribute_name)) + tag_entry = self.hed_schema.all_tags["Property"] + self.assertFalse(schema_attribute_validators.tag_exists_check(self.hed_schema, tag_entry, attribute_name)) + tag_entry = copy.deepcopy(tag_entry) + tag_entry.attributes["suggestedTag"] = "InvalidSuggestedTag" + self.assertTrue(schema_attribute_validators.tag_exists_check(self.hed_schema, tag_entry, attribute_name)) + + def test_util_rooted(self): + tag_entry = self.hed_schema.all_tags["Event"] + attribute_name = "rooted" + self.assertFalse(schema_attribute_validators.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) + tag_entry = self.hed_schema.all_tags["Property"] + self.assertFalse(schema_attribute_validators.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) + tag_entry = copy.deepcopy(tag_entry) + tag_entry.attributes["rooted"] = "Event" + self.assertFalse(schema_attribute_validators.tag_exists_base_schema_check(self.hed_schema, tag_entry, 
attribute_name)) + tag_entry = copy.deepcopy(tag_entry) + tag_entry.attributes["rooted"] = "NotRealTag" + self.assertTrue(schema_attribute_validators.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) \ No newline at end of file diff --git a/tests/schema/test_schema_compliance.py b/tests/schema/test_schema_compliance.py index 1578e57d9..467d34f7a 100644 --- a/tests/schema/test_schema_compliance.py +++ b/tests/schema/test_schema_compliance.py @@ -1,9 +1,9 @@ import unittest import os -import copy -from hed.schema import schema_compliance + + + from hed import schema -from hed.errors import ErrorHandler, SchemaWarnings class Test(unittest.TestCase): @@ -11,16 +11,6 @@ class Test(unittest.TestCase): def setUpClass(cls): cls.hed_schema = schema.load_schema_version("8.1.0") - def validate_term_base(self, input_text, expected_issues): - for text, issues in zip(input_text, expected_issues): - test_issues = schema_compliance.validate_schema_term(text) - self.assertCountEqual(issues, test_issues) - - def validate_desc_base(self, input_descriptions, expected_issues): - for description, issues in zip(input_descriptions, expected_issues): - test_issues = schema_compliance.validate_schema_description("dummy", description) - self.assertCountEqual(issues, test_issues) - def test_validate_schema(self): schema_path_with_issues = '../data/schema_tests/HED8.0.0.mediawiki' schema_path_with_issues = os.path.join(os.path.dirname(os.path.realpath(__file__)), schema_path_with_issues) @@ -29,76 +19,3 @@ def test_validate_schema(self): self.assertTrue(isinstance(issues, list)) self.assertTrue(len(issues) > 1) - def test_validate_schema_term(self): - test_terms = [ - "invalidcaps", - "Validcaps", - "3numberisvalid", - "Invalidchar#", - "@invalidcharatstart", - ] - expected_issues = [ - ErrorHandler.format_error(SchemaWarnings.INVALID_CAPITALIZATION, test_terms[0], char_index=0, - problem_char="i"), - [], - [], - 
ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_TAG, test_terms[3], char_index=11, - problem_char="#"), - ErrorHandler.format_error(SchemaWarnings.INVALID_CAPITALIZATION, test_terms[4], char_index=0, - problem_char="@"), - ] - self.validate_term_base(test_terms, expected_issues) - - def test_validate_schema_description(self): - test_descs = [ - "This is a tag description with no invalid characters.", - "This is (also) a tag description with no invalid characters. -_:;./()+ ^", - "This description has no invalid characters, as commas are allowed", - "This description has multiple invalid characters at the end @$%*" - ] - expected_issues = [ - [], - [], - [], - ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", - char_index=60, problem_char="@") - + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", - char_index=61, problem_char="$") - + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", - char_index=62, problem_char="%") - + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", - char_index=63, problem_char="*") - - ] - self.validate_desc_base(test_descs, expected_issues) - - def test_util_placeholder(self): - tag_entry = self.hed_schema.all_tags["Event"] - attribute_name = "unitClass" - self.assertTrue(schema_compliance.tag_is_placeholder_check(self.hed_schema, tag_entry, attribute_name)) - attribute_name = "unitClass" - tag_entry = self.hed_schema.all_tags["Age/#"] - self.assertFalse(schema_compliance.tag_is_placeholder_check(self.hed_schema, tag_entry, attribute_name)) - - def test_util_suggested(self): - tag_entry = self.hed_schema.all_tags["Event/Sensory-event"] - attribute_name = "suggestedTag" - self.assertFalse(schema_compliance.tag_exists_check(self.hed_schema, tag_entry, attribute_name)) - tag_entry = self.hed_schema.all_tags["Property"] - 
self.assertFalse(schema_compliance.tag_exists_check(self.hed_schema, tag_entry, attribute_name)) - tag_entry = copy.deepcopy(tag_entry) - tag_entry.attributes["suggestedTag"] = "InvalidSuggestedTag" - self.assertTrue(schema_compliance.tag_exists_check(self.hed_schema, tag_entry, attribute_name)) - - def test_util_rooted(self): - tag_entry = self.hed_schema.all_tags["Event"] - attribute_name = "rooted" - self.assertFalse(schema_compliance.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) - tag_entry = self.hed_schema.all_tags["Property"] - self.assertFalse(schema_compliance.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) - tag_entry = copy.deepcopy(tag_entry) - tag_entry.attributes["rooted"] = "Event" - self.assertFalse(schema_compliance.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) - tag_entry = copy.deepcopy(tag_entry) - tag_entry.attributes["rooted"] = "NotRealTag" - self.assertTrue(schema_compliance.tag_exists_base_schema_check(self.hed_schema, tag_entry, attribute_name)) \ No newline at end of file diff --git a/tests/schema/test_schema_validation_util.py b/tests/schema/test_schema_validation_util.py new file mode 100644 index 000000000..3c9494aac --- /dev/null +++ b/tests/schema/test_schema_validation_util.py @@ -0,0 +1,63 @@ +import unittest +import hed.schema.schema_validation_util +from hed import schema +from hed.errors import ErrorHandler, SchemaWarnings + + +class Test(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.hed_schema = schema.load_schema_version("8.1.0") + + def validate_term_base(self, input_text, expected_issues): + for text, issues in zip(input_text, expected_issues): + test_issues = hed.schema.schema_validation_util.validate_schema_term(text) + self.assertCountEqual(issues, test_issues) + + def validate_desc_base(self, input_descriptions, expected_issues): + for description, issues in zip(input_descriptions, expected_issues): + test_issues = 
hed.schema.schema_validation_util.validate_schema_description("dummy", description) + self.assertCountEqual(issues, test_issues) + + def test_validate_schema_term(self): + test_terms = [ + "invalidcaps", + "Validcaps", + "3numberisvalid", + "Invalidchar#", + "@invalidcharatstart", + ] + expected_issues = [ + ErrorHandler.format_error(SchemaWarnings.INVALID_CAPITALIZATION, test_terms[0], char_index=0, + problem_char="i"), + [], + [], + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_TAG, test_terms[3], char_index=11, + problem_char="#"), + ErrorHandler.format_error(SchemaWarnings.INVALID_CAPITALIZATION, test_terms[4], char_index=0, + problem_char="@"), + ] + self.validate_term_base(test_terms, expected_issues) + + def test_validate_schema_description(self): + test_descs = [ + "This is a tag description with no invalid characters.", + "This is (also) a tag description with no invalid characters. -_:;./()+ ^", + "This description has no invalid characters, as commas are allowed", + "This description has multiple invalid characters at the end @$%*" + ] + expected_issues = [ + [], + [], + [], + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", + char_index=60, problem_char="@") + + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", + char_index=61, problem_char="$") + + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", + char_index=62, problem_char="%") + + ErrorHandler.format_error(SchemaWarnings.INVALID_CHARACTERS_IN_DESC, test_descs[3], "dummy", + char_index=63, problem_char="*") + + ] + self.validate_desc_base(test_descs, expected_issues) diff --git a/tests/tools/analysis/test_tabular_column_name_summary.py b/tests/tools/analysis/test_column_name_summary.py similarity index 78% rename from tests/tools/analysis/test_tabular_column_name_summary.py rename to tests/tools/analysis/test_column_name_summary.py index 
d2825fcb8..31cb551c0 100644 --- a/tests/tools/analysis/test_tabular_column_name_summary.py +++ b/tests/tools/analysis/test_column_name_summary.py @@ -1,6 +1,6 @@ import json import unittest -from hed.tools.analysis.tabular_column_name_summary import TabularColumnNameSummary +from hed.tools.analysis.column_name_summary import ColumnNameSummary class Test(unittest.TestCase): @@ -17,16 +17,16 @@ def tearDownClass(cls): pass def test_constructor(self): - column_summary1 = TabularColumnNameSummary(name='Dataset') - self.assertIsInstance(column_summary1, TabularColumnNameSummary) + column_summary1 = ColumnNameSummary(name='Dataset') + self.assertIsInstance(column_summary1, ColumnNameSummary) self.assertEqual(column_summary1.name, 'Dataset') self.assertFalse(column_summary1.file_dict) self.assertFalse(column_summary1.unique_headers) - column_summary2 = TabularColumnNameSummary() - self.assertIsInstance(column_summary2, TabularColumnNameSummary) + column_summary2 = ColumnNameSummary() + self.assertIsInstance(column_summary2, ColumnNameSummary) def test_update(self): - column_summary = TabularColumnNameSummary() + column_summary = ColumnNameSummary() column_summary.update('run-01', self.columns1) column_summary.update('run-02', self.columns1) self.assertEqual(len(column_summary.unique_headers), 1) @@ -41,7 +41,7 @@ def test_update(self): self.assertEqual(context.exception.args[0], "FileHasChangedColumnNames") def test_update_headers(self): - column_summary = TabularColumnNameSummary() + column_summary = ColumnNameSummary() pos1 = column_summary.update_headers(self.columns1) self.assertEqual(pos1, 0) pos2 = column_summary.update_headers(self.columns1) @@ -50,7 +50,7 @@ def test_update_headers(self): self.assertEqual(pos3, 1) def test_get_summary(self): - column_summary = TabularColumnNameSummary('Dataset') + column_summary = ColumnNameSummary('Dataset') column_summary.update('run-01', self.columns1) column_summary.update('run-02', self.columns1) summary1 = 
column_summary.get_summary() diff --git a/tests/tools/analysis/test_tabular_summary.py b/tests/tools/analysis/test_tabular_summary.py index 1a35dabec..b983c6f8b 100644 --- a/tests/tools/analysis/test_tabular_summary.py +++ b/tests/tools/analysis/test_tabular_summary.py @@ -32,6 +32,22 @@ def test_constructor(self): self.assertIsInstance(dict2, TabularSummary, "TabularSummary: multiple values are okay in constructor") self.assertEqual(len(dict2.value_info.keys()), 3, "TabularSummary should have keys for each value column") + def test_extract_summary(self): + tab1 = TabularSummary() + stern_df = get_new_dataframe(self.stern_map_path) + tab1.update(stern_df) + sum_info = tab1.get_summary() + new_tab1 = TabularSummary.extract_summary(sum_info) + tab2 = TabularSummary(value_cols=['letter'], skip_cols=['event_type']) + tabular_info = {} + new_tab = TabularSummary.extract_summary(tabular_info) + self.assertIsInstance(new_tab, TabularSummary) + + def test_extract_summary_empty(self): + tabular_info = {} + new_tab = TabularSummary.extract_summary(tabular_info) + self.assertIsInstance(new_tab, TabularSummary) + def test_get_number_unique_values(self): dict1 = TabularSummary() wh_df = get_new_dataframe(self.wh_events_path) @@ -54,7 +70,7 @@ def test_get_summary(self): "TabularSummary categorical_info be columns minus skip and value columns") summary1 = dict1.get_summary(as_json=False) self.assertIsInstance(summary1, dict) - self.assertEqual(len(summary1), 5) + self.assertEqual(len(summary1), 7) summary2 = dict1.get_summary(as_json=True).replace('"', '') self.assertIsInstance(summary2, str) diff --git a/tests/tools/remodeling/operations/test_summarize_column_names_op.py b/tests/tools/remodeling/operations/test_summarize_column_names_op.py index ddd5a8658..2ef5eee27 100644 --- a/tests/tools/remodeling/operations/test_summarize_column_names_op.py +++ b/tests/tools/remodeling/operations/test_summarize_column_names_op.py @@ -2,9 +2,9 @@ import os import pandas as pd import 
unittest -from hed.tools.analysis.tabular_column_name_summary import TabularColumnNameSummary +# from hed.tools.analysis.column_name_summary import ColumnNameSummary from hed.tools.remodeling.dispatcher import Dispatcher -from hed.tools.remodeling.operations.summarize_column_names_op import ColumnNameSummary, SummarizeColumnNamesOp +from hed.tools.remodeling.operations.summarize_column_names_op import ColumnNamesSummary, SummarizeColumnNamesOp class Test(unittest.TestCase): @@ -77,7 +77,7 @@ def test_summary_op(self): new_summary = json.loads(json_value) self.assertIsInstance(new_summary, dict) merged1 = this_context.merge_all_info() - self.assertIsInstance(merged1, TabularColumnNameSummary) + # self.assertIsInstance(merged1, ColumnNameSummary) self.assertEqual(len(merged1.file_dict), 3) self.assertEqual(len(merged1.unique_headers), 2) with self.assertRaises(ValueError) as except_context: @@ -111,7 +111,7 @@ def test_text_summary(self): self.get_dfs(op, 'run-01', dispatch) self.get_dfs(op, 'run-02', dispatch) context = dispatch.summary_dicts['columns'] - self.assertIsInstance(context, ColumnNameSummary) + # self.assertIsInstance(context, ColumnNameSummary) text_summary1 = context.get_text_summary() self.assertIsInstance(text_summary1, dict) diff --git a/tests/tools/visualizations/test_tag_word_cloud.py b/tests/tools/visualizations/test_tag_word_cloud.py new file mode 100644 index 000000000..fa09e1710 --- /dev/null +++ b/tests/tools/visualizations/test_tag_word_cloud.py @@ -0,0 +1,52 @@ +import unittest +from wordcloud import WordCloud +from hed.tools.visualizations import tag_word_cloud + + +class TestWordCloudFunctions(unittest.TestCase): + + def test_convert_summary_to_word_dict(self): + # Assume we have a valid summary_json + summary_json = { + 'Dataset': { + 'Overall summary': { + 'Main tags': { + 'tag_category_1': [ + {'tag': 'tag1', 'events': 5}, + {'tag': 'tag2', 'events': 3} + ], + 'tag_category_2': [ + {'tag': 'tag3', 'events': 7} + ] + } + } + } + } + 
expected_output = {'tag1': 5, 'tag2': 3, 'tag3': 7} + + word_dict = tag_word_cloud.summary_to_dict(summary_json) + self.assertEqual(word_dict, expected_output) + + def test_create_wordcloud(self): + word_dict = {'tag1': 5, 'tag2': 3, 'tag3': 7} + width = 400 + height = 200 + wc = tag_word_cloud.create_wordcloud(word_dict, width, height) + + self.assertIsInstance(wc, WordCloud) + self.assertEqual(wc.width, width) + self.assertEqual(wc.height, height) + + def test_create_wordcloud_with_empty_dict(self): + # Test creation of word cloud with an empty dictionary + word_dict = {} + with self.assertRaises(ValueError): + tag_word_cloud.create_wordcloud(word_dict) + + def test_create_wordcloud_with_single_word(self): + # Test creation of word cloud with a single word + word_dict = {'single_word': 1} + wc = tag_word_cloud.create_wordcloud(word_dict) + self.assertIsInstance(wc, WordCloud) + # Check that the single word is in the word cloud + self.assertIn('single_word', wc.words_)