diff --git a/clams/__init__.py b/clams/__init__.py index 7ead7de..6943815 100644 --- a/clams/__init__.py +++ b/clams/__init__.py @@ -3,7 +3,8 @@ from mmif import __specver__ from clams import develop -from clams import source +from clams.mmif_utils import source +from clams.mmif_utils import rewind from clams.app import * from clams.app import __all__ as app_all from clams.appmetadata import AppMetadata @@ -23,7 +24,7 @@ def prep_argparser(): version=version_template.format(__version__, __specver__) ) subparsers = parser.add_subparsers(title='sub-command', dest='subcmd') - for subcmd_module in [source, develop]: + for subcmd_module in [source, rewind, develop]: subcmd_name = subcmd_module.__name__.rsplit('.')[-1] subcmd_parser = subcmd_module.prep_argparser(add_help=False) subparsers.add_parser(subcmd_name, parents=[subcmd_parser], @@ -42,5 +43,7 @@ def cli(): args = parser.parse_args() if args.subcmd == 'source': source.main(args) + if args.subcmd == 'rewind': + rewind.main(args) if args.subcmd == 'develop': develop.main(args) diff --git a/clams/app/__init__.py b/clams/app/__init__.py index 22ff159..c0074be 100644 --- a/clams/app/__init__.py +++ b/clams/app/__init__.py @@ -193,10 +193,11 @@ def sign_view(self, view: View, runtime_conf: Optional[dict] = None) -> None: :param runtime_conf: runtime configuration of the app as k-v pairs """ # TODO (krim @ 8/2/23): once all devs understood this change, make runtime_conf a required argument - warnings.warn("`runtime_conf` argument for ClamsApp.sign_view() will " - "no longer be optional in the future. Please just pass " - "`runtime_params` from _annotate() method.", - FutureWarning, stacklevel=2) + if runtime_conf is None: + warnings.warn("`runtime_conf` argument for ClamsApp.sign_view() will " + "no longer be optional in the future. 
def is_valid_choice(choice):
    """
    Validate the value given to ``-n``; used as an argparse ``type`` callback.

    :param choice: raw value taken from the command line
    :return: the value converted to ``int``; zero and positive values are accepted
    :raises argparse.ArgumentTypeError: when the value is not an integer or is negative
    """
    message = "\nInvalid argument for -n. Please enter a positive integer."
    try:
        ichoice = int(choice)
    except (TypeError, ValueError) as err:
        raise argparse.ArgumentTypeError(message) from err
    if ichoice < 0:
        raise argparse.ArgumentTypeError(message)
    return ichoice


def user_choice(mmif_obj: "mmif.Mmif") -> int:
    """
    Print the views of a MMIF and ask the user how far to rewind.

    Each view is listed with a number; the first listed view gets ``len(views)`` and
    the last one gets ``1``, so that the number equals how many trailing views are
    removed when rewinding (in "view" mode) from that view.
    The prompt is repeated until an integer in ``[0, len(views)]`` is entered.

    :param mmif_obj: mmif object
    :return: int option number
    """
    n = len(mmif_obj.views)
    row = "{:<4} {:<30} {:<100}"
    # header
    print("\n" + row.format("num", "timestamp", "app"))
    # NOTE(review): presumably views are stored in chronological order - confirm
    for i, view in enumerate(mmif_obj.views):
        print(row.format(n - i, str(view.metadata.timestamp), str(view.metadata.app)))

    while True:
        answer = input("\nEnter the number to delete from that point by rewinding: ")
        # the conversion must happen inside the `try`, otherwise a non-numeric
        # answer crashes the program instead of re-prompting
        try:
            choice = int(answer)
        except ValueError:
            print("\nInvalid input. Please enter a valid number.")
            continue
        if 0 <= choice <= n:
            return choice
        print(f"\nInvalid choice. Please enter an integer in the range [0, {n}].")


def rewind_mmif(mmif_obj: "mmif.Mmif", choice: int, choice_is_viewnum: bool = True) -> "mmif.Mmif":
    """
    Rewind MMIF by deleting the last N views.
    The number of views to rewind is given as a number of "views", or number of "producer apps".
    By default, the number argument is interpreted as the number of "views".
    Note that when the same app is repeatedly run in a CLAMS pipeline and produces multiple views in a row,
    rewinding in "app" mode will rewind all those views at once.

    The rewinding is done in-place: the given object is modified and returned.
    A number larger than what is available removes all views; ``0`` removes nothing.

    :param mmif_obj: mmif object
    :param choice: number of views (or producer apps) to rewind
    :param choice_is_viewnum: if True, choice is the number of views to rewind.
                              If False, choice is the number of producer apps to rewind.
    :return: rewound mmif object (the same object as ``mmif_obj``)
    :raises ValueError: when ``choice`` is negative
    """
    if choice < 0:
        # a negative number would silently turn the slice below into
        # "everything but the first view"
        raise ValueError(f"number to rewind must not be negative, got {choice}")

    views = list(mmif_obj.views)
    if choice_is_viewnum:
        tail = views[max(len(views) - choice, 0):]
        vids_to_pop = [view.id for view in reversed(tail)]
    else:
        vids_to_pop = []
        runs_seen = 0  # number of runs of consecutive same-app views visited so far
        current_app = None
        for view in reversed(views):
            app = view.metadata.app
            if runs_seen == 0 or app != current_app:
                # a new run starts here; stop before collecting anything from it
                # once the requested number of runs has been collected
                if runs_seen == choice:
                    break
                runs_seen += 1
                current_app = app
            vids_to_pop.append(view.id)

    # NOTE(review): removal goes through the private `_items` mapping of the views
    # container, as the original implementation did
    for vid in vids_to_pop:
        mmif_obj.views._items.pop(vid)
    return mmif_obj


def describe_argparser():
    """
    Return the help texts of this sub-command.

    :return: two strings: a one-line description, and a longer description made of the
             one-liner followed by additional material (the latter is what
             :func:`prep_argparser` uses as the parser description). Presumably they are
             shown in `clams --help` and `clams rewind --help`, respectively.
    """
    oneliner = 'provides CLI to rewind a MMIF from a CLAMS pipeline.'
    additional = textwrap.dedent("""
    MMIF rewinder rewinds a MMIF by deleting the last N views.
    N can be specified as a number of views, or a number of producer apps. """)
    return oneliner, oneliner + '\n\n' + additional


def prep_argparser(**kwargs):
    """
    Build the argument parser of the rewinder.

    :param kwargs: passed through to :class:`argparse.ArgumentParser`
    :return: the configured parser
    """
    parser = argparse.ArgumentParser(
        description=describe_argparser()[1],
        formatter_class=argparse.RawDescriptionHelpFormatter,
        **kwargs
    )
    parser.add_argument(
        "mmif_file", nargs=1,
        help="Path to the input MMIF file, or '-' to read from stdin.")
    parser.add_argument(
        "-o", '--output', default=None, metavar="PATH",
        help="Path to the rewound MMIF output file. When not given, the rewound MMIF is printed to stdout.")
    parser.add_argument(
        "-p", '--pretty', action='store_true',
        help="Pretty-print rewound MMIF.")
    parser.add_argument(
        "-n", '--number', default="0", type=is_valid_choice,
        help="Number of views or producer apps (see --mode) to rewind. "
             "When 0 or not given, the number is asked interactively.")
    parser.add_argument(
        "-m", '--mode', choices=['app', 'view'], default='view',
        help="How to interpret --number: as a number of views, or as a number of "
             "producer apps (default: view).")
    return parser


def _unused_path(path):
    """Return ``path`` when no file exists there, else the first ``<stem>_<N><suffix>`` (N = 1, 2, ...) that is not a file."""
    if not path.is_file():
        return path
    count = 1
    while (path.parent / f"{path.stem}_{count}{path.suffix}").is_file():
        count += 1
    return path.parent / f"{path.stem}_{count}{path.suffix}"


def main(args):
    """
    Entry point of the rewinder: read a MMIF, rewind it, and write the result.

    :param args: parsed arguments as produced by the parser from :func:`prep_argparser`
    """
    in_path = args.mmif_file[0]
    if in_path == '-':
        # hand the text to Mmif, not the stream object itself
        mmif_str = sys.stdin.read()
    else:
        with open(in_path) as in_f:
            mmif_str = in_f.read()
    mmif_obj = mmif.Mmif(mmif_str)

    if args.number == 0:  # If user doesn't know how many views to rewind, give them choices.
        # NOTE(review): when the MMIF itself came from stdin, the interactive prompt
        # reads from the same (already consumed) stream - presumably `-n` should be
        # given in that case; confirm
        choice = user_choice(mmif_obj)
    else:
        choice = args.number

    # serialize before touching the output file, so a failure leaves no empty file behind
    rewound = rewind_mmif(mmif_obj, choice, args.mode == 'view').serialize(pretty=args.pretty)

    if args.output:
        # Check if the same file name exist in the path and avoid overwriting.
        with open(_unused_path(P(args.output)), 'w') as out_f:
            out_f.write(rewound)
    else:
        sys.stdout.write(rewound)


if __name__ == "__main__":
    parser = prep_argparser()
    args = parser.parse_args()
    main(args)
class TestRewind(unittest.TestCase):
    """Tests for ``rewind.rewind_mmif`` using views generated by :class:`ExampleApp`."""

    def setUp(self):
        self.dummy_app_one = ExampleApp()
        self.dummy_app_one.metadata.identifier = "dummy_app_one"
        self.dummy_app_two = ExampleApp()
        self.dummy_app_two.metadata.identifier = "dummy_app_two"

        # mmif we add views to
        self.mmif_one = Mmif(
            {
                "metadata": {"mmif": "http://mmif.clams.ai/1.0.0"},
                "documents": [],
                "views": [],
            }
        )

        # baseline empty mmif for comparison
        self.empty_mmif = Mmif(
            {
                "metadata": {"mmif": "http://mmif.clams.ai/1.0.0"},
                "documents": [],
                "views": [],
            }
        )

    def test_view_rewind(self):
        """
        Tests the use of "view-rewinding" to remove multiple views from a single app.
        """
        # Regular Case
        mmif_added_views = self.dummy_app_one.mmif_add_views(self.mmif_one, 10)
        self.assertEqual(len(mmif_added_views.views), 10)
        rewound = rewind.rewind_mmif(mmif_added_views, 5)
        self.assertEqual(len(rewound.views), 5)
        # rewinding is done "in-place"
        self.assertEqual(len(rewound.views), len(mmif_added_views.views))

    def test_app_rewind(self):
        """
        Tests the use of "app-rewinding": rewinding one app removes all the views
        that the last app added.
        """
        # Regular Case
        app_one_views = 3
        app_two_views = 2
        app_one_out = self.dummy_app_one.mmif_add_views(self.mmif_one, app_one_views)
        app_two_out = self.dummy_app_two.mmif_add_views(app_one_out, app_two_views)
        self.assertEqual(len(app_two_out.views), app_one_views + app_two_views)
        rewound = rewind.rewind_mmif(app_two_out, 1, choice_is_viewnum=False)
        self.assertEqual(len(rewound.views), app_one_views)


def compare_views(a: Mmif, b: Mmif) -> bool:
    """
    Tell whether two MMIF objects hold equal views in the same order.

    :param a: first mmif object
    :param b: second mmif object
    :return: True only when both have the same number of views and every pair of
             views at the same position compares equal
    """
    views_a = list(a.views)
    views_b = list(b.views)
    # zip() alone stops at the shorter input, so lengths must be compared explicitly
    if len(views_a) != len(views_b):
        return False
    return not any(view_a != view_b for view_a, view_b in zip(views_a, views_b))


class ExampleApp(clams.app.ClamsApp):
    """This is a barebones implementation of a CLAMS App
    used to generate simple Views within a mmif object
    for testing purposes. The three methods here all streamline
    the mmif annotation process for the purposes of repeated insertion
    and removal.
    """

    app_version = "lorem_ipsum"

    def _appmetadata(self):
        pass

    def _annotate(self, mmif: Mmif, message: str, idx: int, **kwargs):
        """
        Add one signed view holding a dummy annotation to the given MMIF.

        :param mmif: a ``Mmif`` object, or anything the ``Mmif`` constructor accepts
        :param message: value stored in the ``f1`` property of the dummy annotation
        :param idx: passed through to :meth:`gen_annotate`
        :param kwargs: used as the runtime configuration when signing the view;
                       a truthy ``raise_error`` makes the method raise
        :return: the ``Mmif`` object with the new view added
        :raises ValueError: when ``raise_error`` is given and truthy
        """
        mmif_obj = mmif if isinstance(mmif, Mmif) else Mmif(mmif, validate=False)

        new_view = mmif_obj.new_view()
        self.sign_view(new_view, runtime_conf=kwargs)
        self.gen_annotate(new_view, message, idx)

        d1 = DocumentTypes.VideoDocument
        # same type string as VideoDocument, with its last character replaced by "99"
        d2 = DocumentTypes.from_str(f"{str(d1)[:-1]}99")
        # query and return the parsed object: the raw `mmif` argument may not be a Mmif
        if mmif_obj.get_documents_by_type(d2):
            new_view.new_annotation(AnnotationTypes.TimePoint, "tp1")
        if kwargs.get("raise_error"):
            raise ValueError
        return mmif_obj

    def gen_annotate(self, mmif_view, message, idx=0):
        """
        Put a ``TimeFrame`` contain-metadata entry and one ``TimeFrame`` annotation
        (id ``a1``, start 10, end 99) into the view, and store ``message`` in the
        ``f1`` property of the annotation. ``idx`` is accepted but not used.
        """
        mmif_view.new_contain(
            AnnotationTypes.TimeFrame, **{"producer": "dummy-producer"}
        )
        ann = mmif_view.new_annotation(
            AnnotationTypes.TimeFrame, "a1", start=10, end=99
        )
        ann.add_property("f1", message)

    def mmif_add_views(self, mmif_obj, idx: int):
        """Helper Function to add an arbitrary number of views to a mmif

        :param mmif_obj: mmif to add views to
        :param idx: number of views to add
        :return: the mmif returned by the last :meth:`_annotate` call
                 (``mmif_obj`` itself when ``idx`` is 0)
        """
        for i in range(idx):
            mmif_obj = self._annotate(mmif_obj, message=f"message {i}", idx=idx)
        return mmif_obj


if __name__ == '__main__':
    unittest.main()