From bfa54a40ddcba252abfbe02bea9b0e9d4611484c Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Tue, 21 Nov 2023 14:55:50 +0100 Subject: [PATCH 1/8] feat: find_tasks_iter --- examples/task_example.py | 6 +++ influxdb_client/client/tasks_api.py | 59 ++++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/examples/task_example.py b/examples/task_example.py index 55595ba9..242dcf4d 100644 --- a/examples/task_example.py +++ b/examples/task_example.py @@ -25,3 +25,9 @@ task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active") task = tasks_api.create_task(task_create_request=task_request) print(task) + + tasks = tasks_api.find_tasks_iter() + + # print all tasks id + for task in tasks: + print(task.id) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index dd85683b..691604e9 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -10,6 +10,24 @@ from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \ AddResourceMemberRequestBody, RunManually, Run, LogEvent +class TasksIterator: + def __init__(self, values, next) -> None: + self.values = values + self.next = next + self.no_values = False if values else True + + def __iter__(self): + return self + + def __next__(self): + if self.no_values: + raise StopIteration + if not self.values: + self.values, self.next = self.next() + if not self.values: + raise StopIteration + return self.values.pop(0) + class TasksApi(object): """Implementation for '/api/v2/tasks' endpoint.""" @@ -25,7 +43,7 @@ def find_task_by_id(self, task_id) -> Task: return task def find_tasks(self, **kwargs): - """List all tasks. + """List all tasks up to set limit (max 500). :key str name: only returns tasks with the specified name :key str after: returns tasks after specified ID @@ -37,6 +55,45 @@ def find_tasks(self, **kwargs): """ return self._service.get_tasks(**kwargs).tasks + def _find_tasks_paged(self, **kwargs): + """List all tasks with ability to list next tasks after limit. + + :key str name: only returns tasks with the specified name + :key str after: returns tasks after specified ID + :key str user: filter tasks to a specific user ID + :key str org: filter tasks to a specific organization name + :key str org_id: filter tasks to a specific organization ID + :key int limit: the number of tasks to return in one page + :return: Tasks, Next + """ + tasks = self._service.get_tasks(**kwargs).tasks + + last_id = tasks[-1].id if tasks else None + def next(): + if last_id is not None: + return self._find_tasks_paged(**{**kwargs, 'after': last_id}) + else: + def func(): + raise Exception("There are no additional pages remaining for tasks.") + return [], func + + return tasks, next + + def find_tasks_iter(self, **kwargs): + """Iterate over all tasks with pagination. + + :key str name: only returns tasks with the specified name + :key str after: returns tasks after specified ID + :key str user: filter tasks to a specific user ID + :key str org: filter tasks to a specific organization name + :key str org_id: filter tasks to a specific organization ID + :key int limit: the number of tasks in one page + :return: Tasks iterator + """ + tasks, next = self._find_tasks_paged(**kwargs) + + return iter(TasksIterator(tasks, next)) + def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" if task_create_request is not None: From f72b3cc0493ca0d894d4e4141ea064de8c5a9c76 Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Tue, 21 Nov 2023 15:02:16 +0100 Subject: [PATCH 2/8] feat: optimize __next__ of TasksIterator --- influxdb_client/client/tasks_api.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index 691604e9..521ecebe 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -14,14 +14,11 @@ class TasksIterator: def __init__(self, values, next) -> None: self.values = values self.next = next - self.no_values = False if values else True def __iter__(self): - return self + return self if self.values else (_ for _ in ()) def __next__(self): - if self.no_values: - raise StopIteration if not self.values: self.values, self.next = self.next() if not self.values: From 082231c10c9200ad56621e8a280c0e81d6197b3c Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Wed, 22 Nov 2023 09:45:06 +0100 Subject: [PATCH 3/8] fix: no empty calls paged tasks --- influxdb_client/client/tasks_api.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index 521ecebe..774a4e0a 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -20,7 +20,8 @@ def __iter__(self): def __next__(self): if not self.values: - self.values, self.next = self.next() + if self.next: + self.values, self.next = self.next() if not self.values: raise StopIteration return self.values.pop(0) @@ -61,18 +62,18 @@ def _find_tasks_paged(self, **kwargs): :key str org: filter tasks to a specific organization name :key str org_id: filter tasks to a specific organization ID :key int limit: the number of tasks to return in one page - :return: Tasks, Next + :return: Tasks, Next or None """ - tasks = self._service.get_tasks(**kwargs).tasks + tasks_response = self._service.get_tasks(**kwargs) + tasks = tasks_response.tasks + has_next = tasks_response.links.next is not None last_id = tasks[-1].id if tasks else None def next(): - if last_id is not None: + if has_next and last_id is not None: return self._find_tasks_paged(**{**kwargs, 'after': last_id}) else: - def func(): - raise Exception("There are no additional pages remaining for tasks.") - return [], func + return [], None return tasks, next From 213860d19995ca5d089198aa13c153013e4ffdbe Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Wed, 22 Nov 2023 10:02:16 +0100 Subject: [PATCH 4/8] style: linted --- influxdb_client/client/tasks_api.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index 774a4e0a..deef72c3 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -10,7 +10,8 @@ from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \ AddResourceMemberRequestBody, RunManually, Run, LogEvent -class TasksIterator: + +class _TasksIterator: def __init__(self, values, next) -> None: self.values = values self.next = next @@ -69,9 +70,10 @@ def _find_tasks_paged(self, **kwargs): has_next = tasks_response.links.next is not None last_id = tasks[-1].id if tasks else None + def next(): if has_next and last_id is not None: - return self._find_tasks_paged(**{**kwargs, 'after': last_id}) + return self._find_tasks_paged(**{**kwargs, 'after': last_id}) else: return [], None @@ -89,8 +91,8 @@ def find_tasks_iter(self, **kwargs): :return: Tasks iterator """ tasks, next = self._find_tasks_paged(**kwargs) - - return iter(TasksIterator(tasks, next)) + + return iter(_TasksIterator(tasks, next)) def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" From d390fd6bea769d1b39e4403c54b74c9862b0bece Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Wed, 22 Nov 2023 12:12:02 +0100 Subject: [PATCH 5/8] test: find_tasks_iter --- tests/test_TasksApi.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_TasksApi.py b/tests/test_TasksApi.py index 2bea7659..862abde2 100644 --- a/tests/test_TasksApi.py +++ b/tests/test_TasksApi.py @@ -184,6 +184,32 @@ def test_find_task_by_user_id(self): print(tasks) self.assertEqual(len(tasks), 1) + def test_find_tasks_iter(self): + task_name = self.generate_name("it task") + num_of_tasks = 10 + + for _ in range(num_of_tasks): + self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id) + + def count_unique_ids(tasks): + return len(set(map(lambda task: task.id, tasks))) + + # get tasks in 3-4 batches + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks // 3) + self.assertEqual(count_unique_ids(tasks), num_of_tasks) + + # get tasks in one equaly size batch + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks) + self.assertEqual(count_unique_ids(tasks), num_of_tasks) + + # get tasks in one batch + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks + 1) + self.assertEqual(count_unique_ids(tasks), num_of_tasks) + + # get no tasks + tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah") + self.assertEqual(count_unique_ids(tasks), 0) + def test_delete_task(self): task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *", self.organization.id) From 968d82370b17b9c6d2eeca7a056afb8269791635 Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Fri, 24 Nov 2023 11:14:56 +0100 Subject: [PATCH 6/8] refactor: reusable paging --- influxdb_client/client/tasks_api.py | 65 ++++++++++++++++------------- tests/test_TasksApi.py | 5 +++ 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index deef72c3..c7a9ae00 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -11,21 +11,36 @@ AddResourceMemberRequestBody, RunManually, Run, LogEvent -class _TasksIterator: - def __init__(self, values, next) -> None: +class _Page: + def __init__(self, values, has_next, next_after): + self.has_next = has_next self.values = values - self.next = next + self.next_after = next_after + + @staticmethod + def empty(): + return _Page([], False, None) + + @staticmethod + def initial(after): + return _Page([], True, after) + + +class _PageIterator: + def __init__(self, page: _Page, get_next_page): + self.page = page + self.get_next_page = get_next_page def __iter__(self): - return self if self.values else (_ for _ in ()) + return self def __next__(self): - if not self.values: - if self.next: - self.values, self.next = self.next() - if not self.values: + if not self.page.values: + if self.page.has_next: + self.page = self.get_next_page(self.page) + if not self.page.values: raise StopIteration - return self.values.pop(0) + return self.page.values.pop(0) class TasksApi(object): @@ -54,30 +69,18 @@ def find_tasks(self, **kwargs): """ return self._service.get_tasks(**kwargs).tasks - def _find_tasks_paged(self, **kwargs): - """List all tasks with ability to list next tasks after limit. + def _find_tasks_next_page(self, page: _Page, **kwargs): + if not page.has_next: + return _Page.empty() - :key str name: only returns tasks with the specified name - :key str after: returns tasks after specified ID - :key str user: filter tasks to a specific user ID - :key str org: filter tasks to a specific organization name - :key str org_id: filter tasks to a specific organization ID - :key int limit: the number of tasks to return in one page - :return: Tasks, Next or None - """ - tasks_response = self._service.get_tasks(**kwargs) - tasks = tasks_response.tasks + args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs + tasks_response = self._service.get_tasks(**args) + tasks = tasks_response.tasks has_next = tasks_response.links.next is not None last_id = tasks[-1].id if tasks else None - def next(): - if has_next and last_id is not None: - return self._find_tasks_paged(**{**kwargs, 'after': last_id}) - else: - return [], None - - return tasks, next + return _Page(tasks, has_next, last_id) def find_tasks_iter(self, **kwargs): """Iterate over all tasks with pagination. @@ -90,9 +93,11 @@ def find_tasks_iter(self, **kwargs): :key int limit: the number of tasks in one page :return: Tasks iterator """ - tasks, next = self._find_tasks_paged(**kwargs) - return iter(_TasksIterator(tasks, next)) + def get_next_page(page: _Page): + return self._find_tasks_next_page(page, **kwargs) + + return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page)) def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" diff --git a/tests/test_TasksApi.py b/tests/test_TasksApi.py index 862abde2..dc936dfd 100644 --- a/tests/test_TasksApi.py +++ b/tests/test_TasksApi.py @@ -210,6 +210,11 @@ def count_unique_ids(tasks): tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah") self.assertEqual(count_unique_ids(tasks), 0) + # skip some tasks + *_, split_task = self.tasks_api.find_tasks(name= task_name, limit= num_of_tasks // 3) + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= 3, after= split_task.id) + self.assertEqual(count_unique_ids(tasks), num_of_tasks - num_of_tasks // 3) + def test_delete_task(self): task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *", self.organization.id) From 9290d4ec8c838cf4211e4716f46b91333f50b50f Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Fri, 24 Nov 2023 11:50:06 +0100 Subject: [PATCH 7/8] docs: Readme update --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f7973f1..3ab629b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.39.0 [unreleased] +### Features +1. [#616](https://github.com/influxdata/influxdb-client-python/pull/616): Add `find_tasks_iter` function that allow iterate through all pages of tasks. + ## 1.38.0 [2023-10-02] ### Bug Fixes From ae2c18bd01fdaecbbd76af1ccd2e0b00605b3af8 Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Mon, 27 Nov 2023 09:46:39 +0100 Subject: [PATCH 8/8] style: private function moved --- influxdb_client/client/tasks_api.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index c7a9ae00..9edb2ec9 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -69,19 +69,6 @@ def find_tasks(self, **kwargs): """ return self._service.get_tasks(**kwargs).tasks - def _find_tasks_next_page(self, page: _Page, **kwargs): - if not page.has_next: - return _Page.empty() - - args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs - tasks_response = self._service.get_tasks(**args) - - tasks = tasks_response.tasks - has_next = tasks_response.links.next is not None - last_id = tasks[-1].id if tasks else None - - return _Page(tasks, has_next, last_id) - def find_tasks_iter(self, **kwargs): """Iterate over all tasks with pagination. @@ -272,3 +259,16 @@ def get_logs(self, task_id: str) -> List['LogEvent']: def find_tasks_by_user(self, task_user_id): """List all tasks by user.""" return self.find_tasks(user=task_user_id) + + def _find_tasks_next_page(self, page: _Page, **kwargs): + if not page.has_next: + return _Page.empty() + + args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs + tasks_response = self._service.get_tasks(**args) + + tasks = tasks_response.tasks + has_next = tasks_response.links.next is not None + last_id = tasks[-1].id if tasks else None + + return _Page(tasks, has_next, last_id)