-
Notifications
You must be signed in to change notification settings - Fork 20
Add (example of) PubMed parser #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
85aa692
a1a749d
57fabc2
c70b69d
3e45361
e0c42fb
c7ec531
d1c87b8
e18f4ff
ba4c010
7ad57e5
bf7bee7
5262da2
4c8b27d
eac47af
cc8ff15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,8 @@ | |||||||||
| from .config import ( | ||||||||||
| DELIMITED_TAG_MAPPING, | ||||||||||
| LIST_TYPE_TAGS, | ||||||||||
| PUBMED_LIST_TYPE_TAGS, | ||||||||||
| PUBMED_TAG_KEY_MAPPING, | ||||||||||
| TAG_KEY_MAPPING, | ||||||||||
| WOK_LIST_TYPE_TAGS, | ||||||||||
| WOK_TAG_KEY_MAPPING, | ||||||||||
|
|
@@ -45,7 +47,6 @@ class RisParser: | |||||||||
|
|
||||||||||
| START_TAG: str = "TY" | ||||||||||
| END_TAG: str = "ER" | ||||||||||
| UNKNOWN_TAG: str = "UK" | ||||||||||
| PATTERN: str | ||||||||||
| DEFAULT_IGNORE: ClassVar[list[str]] = [] | ||||||||||
| DEFAULT_MAPPING: dict = TAG_KEY_MAPPING | ||||||||||
|
|
@@ -63,6 +64,7 @@ def __init__( | |||||||||
| skip_unknown_tags: bool = False, | ||||||||||
| enforce_list_tags: bool = True, | ||||||||||
| newline: Optional[str] = None, | ||||||||||
| undo_wrapping: bool = False, | ||||||||||
| ): | ||||||||||
| """Initialize the parser function. | ||||||||||
|
|
||||||||||
|
|
@@ -98,6 +100,7 @@ def __init__( | |||||||||
| self.skip_unknown_tags = skip_unknown_tags | ||||||||||
| self.enforce_list_tags = enforce_list_tags | ||||||||||
| self.newline = newline if newline is not None else self.DEFAULT_NEWLINE | ||||||||||
| self.undo_wrapping = undo_wrapping | ||||||||||
|
|
||||||||||
| def _iter_till_start(self, lines) -> dict: | ||||||||||
| while True: | ||||||||||
|
|
@@ -122,24 +125,39 @@ def parse_lines(self, lines: Union[TextIO, list[str]]) -> list[dict]: | |||||||||
| while True: | ||||||||||
| tag, content = self.parse_line(next(lines)) | ||||||||||
|
|
||||||||||
| if tag is None: | ||||||||||
| self._add_tag(record, last_tag, content, extend_multiline=True) | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| if tag in self.ignore: | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| if tag == self.END_TAG: | ||||||||||
| if self.END_TAG and tag == self.END_TAG: | ||||||||||
| result.append(record) | ||||||||||
|
|
||||||||||
| last_tag = tag | ||||||||||
| record = self._iter_till_start(lines) | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| self._add_tag(record, tag, content) | ||||||||||
| last_tag = tag | ||||||||||
| if self.END_TAG is None and tag == self.START_TAG: | ||||||||||
| result.append(record) | ||||||||||
| record = {self.mapping[self.START_TAG]: content} | ||||||||||
| last_tag = tag | ||||||||||
| continue | ||||||||||
|
|
||||||||||
| if tag is None and not self.undo_wrapping and last_tag in self.list_tags: | ||||||||||
| self._add_tag(record, last_tag, content) | ||||||||||
| elif tag is None: | ||||||||||
| self._extend_tag(record, last_tag, content) | ||||||||||
| else: | ||||||||||
| self._add_tag(record, tag, content) | ||||||||||
| last_tag = tag | ||||||||||
|
|
||||||||||
| except StopIteration: | ||||||||||
| return result | ||||||||||
| pass | ||||||||||
|
|
||||||||||
| if self.END_TAG is not None and last_tag != self.END_TAG: | ||||||||||
| raise ParseError(f"Missing end tag: {self.END_TAG}") | ||||||||||
|
|
||||||||||
| if self.END_TAG is None: | ||||||||||
| result.append(record) | ||||||||||
|
|
||||||||||
| return result | ||||||||||
|
|
||||||||||
| def parse_line(self, line: str) -> Union[tuple[str, str], tuple[None, str]]: | ||||||||||
| """Parse line of RIS file. | ||||||||||
|
|
@@ -178,18 +196,11 @@ def _add_single_value( | |||||||||
| The output for a tag can be a list when a delimiter is specified, | ||||||||||
| even if it is not a list tag. | ||||||||||
| """ | ||||||||||
| if not is_multi: | ||||||||||
| if self.enforce_list_tags or name not in record: | ||||||||||
| ignore_this_if_has_one = value | ||||||||||
| record.setdefault(name, ignore_this_if_has_one) | ||||||||||
| else: | ||||||||||
| self._add_list_value(record, name, value) | ||||||||||
| if self.enforce_list_tags or name not in record: | ||||||||||
| ignore_this_if_has_one = value | ||||||||||
| record.setdefault(name, ignore_this_if_has_one) | ||||||||||
| else: | ||||||||||
| value_must_exist_or_is_bug = record[name] | ||||||||||
| if isinstance(value, list): | ||||||||||
| record[name].extend(value) | ||||||||||
| else: | ||||||||||
| record[name] = " ".join((value_must_exist_or_is_bug, value)) | ||||||||||
| self._add_list_value(record, name, value) | ||||||||||
|
|
||||||||||
| def _add_list_value(self, record: dict, name: str, value: Union[str, list[str]]) -> None: | ||||||||||
| """Process tags with multiple values.""" | ||||||||||
|
|
@@ -204,29 +215,34 @@ def _add_list_value(self, record: dict, name: str, value: Union[str, list[str]]) | |||||||||
| must_exist = record[name] | ||||||||||
| record[name] = [must_exist, *value_list] | ||||||||||
|
|
||||||||||
| def _add_tag( | ||||||||||
| self, record: dict, tag: str, content: str, extend_multiline: bool = False | ||||||||||
| ) -> None: | ||||||||||
| def _extend_tag(self, record: dict, tag: str, content: Union[str, list[str]]) -> None: | ||||||||||
| """Extend tags with multiline values.""" | ||||||||||
|
|
||||||||||
| sep = " " if self.undo_wrapping else "\n" | ||||||||||
|
|
||||||||||
| name = self.mapping[tag] | ||||||||||
| if isinstance(record[name], list): | ||||||||||
| record[name][-1] = sep.join((record[name][-1], content)) | ||||||||||
| else: | ||||||||||
| record[name] = sep.join((record[name], content)) | ||||||||||
|
|
||||||||||
| def _add_tag(self, record: dict, tag: str, content: str) -> None: | ||||||||||
| try: | ||||||||||
| name = self.mapping[tag] | ||||||||||
| except KeyError: | ||||||||||
| if self.skip_unknown_tags: | ||||||||||
| return | ||||||||||
|
|
||||||||||
| # handle unknown tag | ||||||||||
| name = self.mapping[self.UNKNOWN_TAG] | ||||||||||
| if name not in record: | ||||||||||
| record[name] = defaultdict(list) | ||||||||||
| record[name][tag].append(content) | ||||||||||
|
|
||||||||||
| record.setdefault("unknown_tag", defaultdict(list))[tag].append(content) | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we removed |
||||||||||
| return | ||||||||||
| else: | ||||||||||
| if delimiter := self.delimiter_map.get(tag): | ||||||||||
| content = [i.strip() for i in content.split(delimiter)] | ||||||||||
|
|
||||||||||
| if tag in self.list_tags: | ||||||||||
| self._add_list_value(record, name, content) | ||||||||||
| else: | ||||||||||
| self._add_single_value(record, name, content, is_multi=extend_multiline) | ||||||||||
| self._add_single_value(record, name, content) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class WokParser(RisParser): | ||||||||||
|
|
@@ -238,6 +254,9 @@ class WokParser(RisParser): | |||||||||
| DEFAULT_LIST_TAGS = WOK_LIST_TYPE_TAGS | ||||||||||
| DEFAULT_DELIMITER_MAPPING: ClassVar[dict] = {} | ||||||||||
|
|
||||||||||
| def __init__(self, undo_wrapping: bool = True, **kw): | ||||||||||
| super().__init__(undo_wrapping=undo_wrapping, **kw) | ||||||||||
|
|
||||||||||
| def parse_line(self, line: str) -> Union[tuple[str, str], tuple[None, str]]: | ||||||||||
| """Parse line of RIS file. | ||||||||||
|
|
||||||||||
|
|
@@ -262,6 +281,38 @@ def parse_line(self, line: str) -> Union[tuple[str, str], tuple[None, str]]: | |||||||||
| return (line[0:2], line[3:].strip()) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class PubMedParser(RisParser): | ||||||||||
| """Subclass of Base for reading PubMed RIS files.""" | ||||||||||
|
|
||||||||||
| START_TAG: str = "PMID" | ||||||||||
| END_TAG: None = None | ||||||||||
| DEFAULT_MAPPING: dict = PUBMED_TAG_KEY_MAPPING | ||||||||||
| DEFAULT_LIST_TAGS: list[str] = PUBMED_LIST_TYPE_TAGS | ||||||||||
| DEFAULT_DELIMITER_MAPPING: ClassVar[dict] = {} | ||||||||||
|
|
||||||||||
| def __init__(self, undo_wrapping: bool = True, **kw): | ||||||||||
| super().__init__(undo_wrapping=undo_wrapping, **kw) | ||||||||||
|
|
||||||||||
| def parse_line(self, line: str) -> Union[tuple[str, str], tuple[None, str]]: | ||||||||||
| """Parse line of PubMed file. | ||||||||||
|
|
||||||||||
| Parameters | ||||||||||
| ---------- | ||||||||||
| line : str | ||||||||||
| Line of RIS file between start and end tag. | ||||||||||
|
|
||||||||||
| Returns | ||||||||||
| ------- | ||||||||||
| tuple | ||||||||||
| Tuple containing the tag and the content of the tag. | ||||||||||
| """ | ||||||||||
|
|
||||||||||
|
||||||||||
| if len(line) < 5: | |
| return (None, line.strip()) |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -117,8 +117,11 @@ def _format_reference(self, ref, count, n): | |||||
| try: | ||||||
| tag = self._rev_mapping[label.lower()] | ||||||
| except KeyError: | ||||||
| warnings.warn(UserWarning(f"label `{label}` not exported"), stacklevel=2) | ||||||
| continue | ||||||
| if label.lower() == "unknown_tag": | ||||||
|
||||||
| if label.lower() == "unknown_tag": | |
| if label.lower() == self.UNKNOWN_TAG: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove from this PR?