Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions hed/models/base_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ def __init__(self, file, file_type=None, worksheet_name=None, has_column_names=T
# This is the loaded workbook if we loaded originally from an Excel file.
self._loaded_workbook = None
self._worksheet_name = worksheet_name
pandas_header = 0
if not self._has_column_names:
pandas_header = None
self._dataframe = None

input_type = file_type
if isinstance(file, str):
Expand All @@ -67,35 +65,8 @@ def __init__(self, file, file_type=None, worksheet_name=None, has_column_names=T
if self.name is None:
self._name = file

self._dataframe = None

if isinstance(file, pandas.DataFrame):
self._dataframe = file.astype(str)
self._has_column_names = self._dataframe_has_names(self._dataframe)
elif not file:
raise HedFileError(HedExceptions.FILE_NOT_FOUND, "Empty file passed to BaseInput.", file)
elif input_type in self.TEXT_EXTENSION:
try:
self._dataframe = pandas.read_csv(file, delimiter='\t', header=pandas_header,
dtype=str, keep_default_na=True, na_values=["", "null"])
except Exception as e:
raise HedFileError(HedExceptions.INVALID_FILE_FORMAT, str(e), self.name) from e
# Convert nan values to a known value
self._dataframe = self._dataframe.fillna("n/a")
elif input_type in self.EXCEL_EXTENSION:
try:
self._loaded_workbook = openpyxl.load_workbook(file)
loaded_worksheet = self.get_worksheet(self._worksheet_name)
self._dataframe = self._get_dataframe_from_worksheet(loaded_worksheet, has_column_names)
except Exception as e:
raise HedFileError(HedExceptions.GENERIC_ERROR, str(e), self.name) from e
else:
raise HedFileError(HedExceptions.INVALID_EXTENSION, "", file)

if self._dataframe.size == 0:
raise HedFileError(HedExceptions.INVALID_DATAFRAME, "Invalid dataframe(malformed datafile, etc)", file)
self._open_dataframe_file(file, has_column_names, input_type)

# todo: Can we get rid of this behavior now that we're using pandas?
column_issues = ColumnMapper.check_for_blank_names(self.columns, allow_blank_names=allow_blank_names)
if column_issues:
raise HedFileError(HedExceptions.BAD_COLUMN_NAMES, "Duplicate or blank columns found. See issues.",
Expand Down Expand Up @@ -517,3 +488,34 @@ def get_column_refs(self):
column_refs(list): A list of unique column refs found
"""
return []

def _open_dataframe_file(self, file, has_column_names, input_type):
pandas_header = 0
if not has_column_names:
pandas_header = None

if isinstance(file, pandas.DataFrame):
self._dataframe = file.astype(str)
self._has_column_names = self._dataframe_has_names(self._dataframe)
elif not file:
raise HedFileError(HedExceptions.FILE_NOT_FOUND, "Empty file passed to BaseInput.", file)
elif input_type in self.TEXT_EXTENSION:
try:
self._dataframe = pandas.read_csv(file, delimiter='\t', header=pandas_header,
dtype=str, keep_default_na=True, na_values=["", "null"])
except Exception as e:
raise HedFileError(HedExceptions.INVALID_FILE_FORMAT, str(e), self.name) from e
# Convert nan values to a known value
self._dataframe = self._dataframe.fillna("n/a")
elif input_type in self.EXCEL_EXTENSION:
try:
self._loaded_workbook = openpyxl.load_workbook(file)
loaded_worksheet = self.get_worksheet(self._worksheet_name)
self._dataframe = self._get_dataframe_from_worksheet(loaded_worksheet, has_column_names)
except Exception as e:
raise HedFileError(HedExceptions.GENERIC_ERROR, str(e), self.name) from e
else:
raise HedFileError(HedExceptions.INVALID_EXTENSION, "", file)

if self._dataframe.size == 0:
raise HedFileError(HedExceptions.INVALID_DATAFRAME, "Invalid dataframe(malformed datafile, etc)", file)
1 change: 1 addition & 0 deletions hed/models/column_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def check_for_blank_names(column_map, allow_blank_names):
return []

issues = []

for column_number, name in enumerate(column_map):
if name is None or not name or name.startswith(PANDAS_COLUMN_PREFIX_TO_IGNORE):
issues += ErrorHandler.format_error(ValidationErrors.HED_BLANK_COLUMN, column_number)
Expand Down
73 changes: 36 additions & 37 deletions hed/models/definition_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,9 @@ def check_for_definitions(self, hed_string_obj, error_handler=None):
def_issues = []
for definition_tag, group in hed_string_obj.find_top_level_tags(anchor_tags={DefTagNames.DEFINITION_KEY}):
group_tag, new_def_issues = self._find_group(definition_tag, group, error_handler)
def_tag_name = definition_tag.extension
def_tag_name, def_takes_value = self._strip_value_placeholder(definition_tag.extension)

def_takes_value = def_tag_name.lower().endswith("/#")
if def_takes_value:
def_tag_name = def_tag_name[:-len("/#")]

def_tag_lower = def_tag_name.lower()
if "/" in def_tag_lower or "#" in def_tag_lower:
if "/" in def_tag_name or "#" in def_tag_name:
new_def_issues += ErrorHandler.format_error_with_context(error_handler,
DefinitionErrors.INVALID_DEFINITION_EXTENSION,
tag=definition_tag,
Expand All @@ -134,29 +129,42 @@ def check_for_definitions(self, hed_string_obj, error_handler=None):
def_issues += new_def_issues
continue

new_def_issues += self._validate_contents(definition_tag, group_tag, error_handler)
new_def_issues = self._validate_contents(definition_tag, group_tag, error_handler)
new_def_issues += self._validate_placeholders(def_tag_name, group_tag, def_takes_value, error_handler)

if new_def_issues:
def_issues += new_def_issues
continue

if error_handler:
context = error_handler.get_error_context_copy()
else:
context = []
if def_tag_lower in self.defs:
new_def_issues += ErrorHandler.format_error_with_context(error_handler,
DefinitionErrors.DUPLICATE_DEFINITION,
def_name=def_tag_name)
new_def_issues, context = self._validate_name_and_context(def_tag_name, error_handler)
if new_def_issues:
def_issues += new_def_issues
continue
self.defs[def_tag_lower] = DefinitionEntry(name=def_tag_name, contents=group_tag,
takes_value=def_takes_value,
source_context=context)

self.defs[def_tag_name.lower()] = DefinitionEntry(name=def_tag_name, contents=group_tag,
takes_value=def_takes_value,
source_context=context)

return def_issues

def _strip_value_placeholder(self, def_tag_name):
def_takes_value = def_tag_name.lower().endswith("/#")
if def_takes_value:
def_tag_name = def_tag_name[:-len("/#")]
return def_tag_name, def_takes_value

def _validate_name_and_context(self, def_tag_name, error_handler):
if error_handler:
context = error_handler.get_error_context_copy()
else:
context = []
new_def_issues = []
if def_tag_name.lower() in self.defs:
new_def_issues += ErrorHandler.format_error_with_context(error_handler,
DefinitionErrors.DUPLICATE_DEFINITION,
def_name=def_tag_name)
return new_def_issues, context

def _validate_placeholders(self, def_tag_name, group, def_takes_value, error_handler):
new_issues = []
placeholder_tags = []
Expand Down Expand Up @@ -245,18 +253,17 @@ def construct_def_tags(self, hed_string_obj):
Parameters:
hed_string_obj(HedString): The hed string to identify definition contents in
"""
for def_tag, def_expand_group, def_group in hed_string_obj.find_def_tags(recursive=True):
def_contents = self._get_definition_contents(def_tag)
if def_contents is not None:
def_tag._expandable = def_contents
def_tag._expanded = def_tag != def_expand_group
for tag in hed_string_obj.get_all_tags():
self.construct_def_tag(tag)

def construct_def_tag(self, hed_tag):
""" Identify def/def-expand tag contents in the given HedTag.

Parameters:
hed_tag(HedTag): The hed tag to identify definition contents in
"""
# Finish tracking down why parent is set incorrectly on def tags sometimes
# It should be ALWAYS set
if hed_tag.short_base_tag in {DefTagNames.DEF_ORG_KEY, DefTagNames.DEF_EXPAND_ORG_KEY}:
save_parent = hed_tag._parent
def_contents = self._get_definition_contents(hed_tag)
Expand All @@ -277,24 +284,16 @@ def _get_definition_contents(self, def_tag):
def_contents: HedGroup
The contents to replace the previous def-tag with.
"""
is_label_tag = def_tag.extension
placeholder = None
found_slash = is_label_tag.find("/")
if found_slash != -1:
placeholder = is_label_tag[found_slash + 1:]
is_label_tag = is_label_tag[:found_slash]

label_tag_lower = is_label_tag.lower()
tag_label, _, placeholder = def_tag.extension.partition('/')

label_tag_lower = tag_label.lower()
def_entry = self.defs.get(label_tag_lower)
if def_entry is None:
# Could raise an error here?
return None
else:
def_tag_name, def_contents = def_entry.get_definition(def_tag, placeholder_value=placeholder)
if def_tag_name:
return def_contents

return None
def_contents = def_entry.get_definition(def_tag, placeholder_value=placeholder)
return def_contents

@staticmethod
def get_as_strings(def_dict):
Expand Down
14 changes: 5 additions & 9 deletions hed/models/definition_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,35 +36,31 @@ def get_definition(self, replace_tag, placeholder_value=None, return_copy_of_tag
return_copy_of_tag(bool): Set to true for validation

Returns:
tuple:
str: The expanded def tag name
HedGroup: The contents of this definition(including the def tag itself)
HedGroup: The contents of this definition(including the def tag itself)

:raises ValueError:
- Something internally went wrong with finding the placeholder tag. This should not be possible.
"""
if self.takes_value == (placeholder_value is None):
return None, []
if self.takes_value == (not placeholder_value):
return None

if return_copy_of_tag:
replace_tag = replace_tag.copy()
output_contents = [replace_tag]
name = self.name
if self.contents:
output_group = self.contents
if placeholder_value is not None:
if placeholder_value:
output_group = copy.deepcopy(self.contents)
placeholder_tag = output_group.find_placeholder_tag()
if not placeholder_tag:
raise ValueError("Internal error related to placeholders in definition mapping")
name = f"{name}/{placeholder_value}"
placeholder_tag.replace_placeholder(placeholder_value)

output_contents = [replace_tag, output_group]

output_contents = HedGroup(replace_tag._hed_string,
startpos=replace_tag.span[0], endpos=replace_tag.span[1], contents=output_contents)
return f"{DefTagNames.DEF_EXPAND_ORG_KEY}/{name}", output_contents
return output_contents

def __str__(self):
return str(self.contents)
43 changes: 36 additions & 7 deletions hed/models/expression_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ def __init__(self, text):
"(": Token.LogicalGroup,
")": Token.LogicalGroupEnd,
"~": Token.LogicalNegation,
"?": Token.Wildcard, # Any tag or group
"??": Token.Wildcard, # Any tag
"???": Token.Wildcard, # Any Group
"{": Token.ExactMatch, # Nothing else
"?": Token.Wildcard, # Any tag or group
"??": Token.Wildcard, # Any tag
"???": Token.Wildcard, # Any Group
"{": Token.ExactMatch, # Nothing else
"}": Token.ExactMatchEnd, # Nothing else
"@": Token.NotInLine
}
Expand Down Expand Up @@ -218,6 +218,7 @@ def handle_expr(self, hed_group, exact=False):
all_found_groups = [search_result(group, tag) for tag, group in groups_found]
return all_found_groups


class ExpressionOr(Expression):
def handle_expr(self, hed_group, exact=False):
groups1 = self.left.handle_expr(hed_group, exact=exact)
Expand All @@ -229,7 +230,7 @@ def handle_expr(self, hed_group, exact=False):
for group in groups1:
for other_group in groups2:
if group.has_same_tags(other_group):
duplicates.append(group)
duplicates.append(group)

groups1 = [group for group in groups1 if not any(other_group is group for other_group in duplicates)]

Expand All @@ -245,12 +246,13 @@ def __str__(self):
output_str += ")"
return output_str


class ExpressionNegation(Expression):
def handle_expr(self, hed_group, exact=False):
found_groups = self.right.handle_expr(hed_group, exact=exact)

# Todo: this may need more thought with respects to wildcards and negation
#negated_groups = [group for group in hed_group.get_all_groups() if group not in groups]
# negated_groups = [group for group in hed_group.get_all_groups() if group not in groups]
# This simpler version works on python >= 3.9
# negated_groups = [search_result(group, []) for group in hed_group.get_all_groups() if group not in groups]
# Python 3.7/8 compatible version.
Expand All @@ -259,6 +261,7 @@ def handle_expr(self, hed_group, exact=False):

return negated_groups


class ExpressionContainingGroup(Expression):
def handle_expr(self, hed_group, exact=False):
result = self.right.handle_expr(hed_group, exact=True)
Expand Down Expand Up @@ -310,7 +313,32 @@ def handle_expr(self, hed_group, exact=False):

class QueryParser:
"""Parse a search expression into a form that can be used to search a hed string."""

def __init__(self, expression_string):
"""Compiles a QueryParser for a particular expression, so it can be used to search hed strings.


Basic Input Examples:

'Event' - Finds any strings with Event, or a descendant tag of Event such as Sensory-event

'Event and Action' - Find any strings with Event and Action, including descendant tags

'Event or Action' - Same as above, but it has either

'"Event"' - Finds the Event tag, but not any descendant tags

'Def/DefName/*' - Find Def/DefName instances with placeholders, regardless of the value of the placeholder

'Eve*' - Find any short tags that begin with Eve*, such as Event, but not Sensory-event

'[Event and Action]' - Find a group that contains both Event and Action(at any level)

'[[Event and Action]]' - Find a group with Event And Action at the same level.

Parameters:
expression_string(str): The query string
"""
self.tokens = []
self.at_token = -1
self.tree = self._parse(expression_string.lower())
Expand Down Expand Up @@ -360,7 +388,8 @@ def _handle_negation(self):
return self._handle_grouping_op()

def _handle_grouping_op(self):
next_token = self._next_token_is([Token.ContainingGroup, Token.LogicalGroup, Token.DescendantGroup, Token.ExactMatch])
next_token = self._next_token_is(
[Token.ContainingGroup, Token.LogicalGroup, Token.DescendantGroup, Token.ExactMatch])
if next_token == Token.ContainingGroup:
interior = self._handle_and_op()
expr = ExpressionContainingGroup(next_token, right=interior)
Expand Down
Loading