diff --git a/monai/transforms/utility/dictionary.py b/monai/transforms/utility/dictionary.py index 4bc8ff1afa..3a5be20e8b 100644 --- a/monai/transforms/utility/dictionary.py +++ b/monai/transforms/utility/dictionary.py @@ -17,6 +17,7 @@ import copy import logging +import re from copy import deepcopy from typing import Any, Callable, Dict, Hashable, List, Mapping, Optional, Sequence, Tuple, Union @@ -637,8 +638,38 @@ class DeleteItemsd(MapTransform): It will remove the key-values and copy the others to construct a new dictionary. """ + def __init__( + self, + keys: KeysCollection, + sep: str = ".", + use_re: Union[Sequence[bool], bool] = False, + ) -> None: + """ + Args: + keys: keys of the corresponding items to delete, can be "A{sep}B{sep}C" + to delete key `C` in nested dictionary, `C` can be regular expression. + See also: :py:class:`monai.transforms.compose.MapTransform` + sep: the separator tag to define nested dictionary keys, default to ".". + use_re: whether the specified key is a regular expression, it also can be + a list of bool values, map the to keys. + """ + super().__init__(keys) + self.sep = sep + self.use_re = ensure_tuple_rep(use_re, len(self.keys)) + def __call__(self, data): - return {key: val for key, val in data.items() if key not in self.key_iterator(data)} + def _delete_item(keys, d, use_re: bool = False): + key = keys[0] + if len(keys) > 1: + d[key] = _delete_item(keys[1:], d[key], use_re) + return d + return {k: v for k, v in d.items() if (use_re and not re.search(key, k)) or (not use_re and k != key)} + + d = dict(data) + for key, use_re in zip(self.keys, self.use_re): + d = _delete_item(key.split(self.sep), d, use_re) + + return d class SelectItemsd(MapTransform): diff --git a/tests/test_delete_itemsd.py b/tests/test_delete_itemsd.py index 7426e39ff0..b7cd104c46 100644 --- a/tests/test_delete_itemsd.py +++ b/tests/test_delete_itemsd.py @@ -19,19 +19,36 @@ TEST_CASE_1 = [{"keys": [str(i) for i in range(30)]}, 20] +TEST_CASE_2 = [{"keys": ["image/" + str(i) for i in range(30)], "sep": "/"}, 20] + +TEST_CASE_3 = [{"keys": "meta_dict%0008\\|[0-9]", "sep": "%", "use_re": True}] + class TestDeleteItemsd(unittest.TestCase): - @parameterized.expand([TEST_CASE_1]) + @parameterized.expand([TEST_CASE_1, TEST_CASE_2]) def test_memory(self, input_param, expected_key_size): - input_data = {} + input_data = {"image": {}} if "sep" in input_param else {} for i in range(50): - input_data[str(i)] = [time.time()] * 100000 + if "sep" in input_param: + input_data["image"][str(i)] = [time.time()] * 100000 + else: + input_data[str(i)] = [time.time()] * 100000 result = DeleteItemsd(**input_param)(input_data) - self.assertEqual(len(result.keys()), expected_key_size) + if "sep" in input_param: + self.assertEqual(len(result["image"].keys()), expected_key_size) + else: + self.assertEqual(len(result.keys()), expected_key_size) self.assertGreaterEqual( sys.getsizeof(input_data) * float(expected_key_size) / len(input_data), sys.getsizeof(result) ) + @parameterized.expand([TEST_CASE_3]) + def test_re(self, input_param): + input_data = {"image": [1, 2, 3], "meta_dict": {"0008|0005": 1, "0008|1050": 2, "0008test": 3}} + result = DeleteItemsd(**input_param)(input_data) + self.assertEqual(result["meta_dict"]["0008test"], 3) + self.assertTrue(len(result["meta_dict"]), 1) + if __name__ == "__main__": unittest.main()