From a0fa118b45df225c9520dee06e111cf6cb3bc6f7 Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Thu, 23 Nov 2023 12:59:12 +0100 Subject: [PATCH 1/5] feat: findTasksStream --- .../java/com/influxdb/client/TasksApi.java | 10 +++ .../client/internal/TasksApiImpl.java | 77 +++++++++++++++++++ .../java/com/influxdb/client/ITTasksApi.java | 39 ++++++++++ 3 files changed, 126 insertions(+) diff --git a/client/src/main/java/com/influxdb/client/TasksApi.java b/client/src/main/java/com/influxdb/client/TasksApi.java index a0739dc25ff..cd8844e4e6a 100644 --- a/client/src/main/java/com/influxdb/client/TasksApi.java +++ b/client/src/main/java/com/influxdb/client/TasksApi.java @@ -23,6 +23,7 @@ import java.time.OffsetDateTime; import java.util.List; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -275,6 +276,15 @@ List findTasks(@Nullable final String afterID, @Nonnull List findTasks(@Nonnull final TasksQuery query); + /** + * Query tasks, automaticaly paged by given limit (default 100). + * + * @param query query params for task + * @return A list of tasks + */ + @Nonnull + Stream findTasksStream(@Nonnull final TasksQuery query); + /** * List all task members. * diff --git a/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java b/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java index 6f079fa2b23..36310338bce 100644 --- a/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java @@ -22,9 +22,15 @@ package com.influxdb.client.internal; import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -147,6 +153,77 @@ public List findTasks(@Nonnull final TasksQuery query) { return tasks.getTasks(); } + @Nonnull + @Override + public Stream findTasksStream(@Nonnull final TasksQuery query) { + Iterator iterator = new Iterator() { + private boolean hasNext = true; + + @Nonnull + private Iterator tasksIterator = Collections.emptyIterator(); + + @Nullable + private String after = query.getAfter(); + + @Override + public boolean hasNext() { + if (tasksIterator.hasNext()) { + return true; + } else if (hasNext) { + doQueryNext(); + return tasksIterator.hasNext(); + } else { + return false; + } + } + + private void doQueryNext() { + Call call = service.getTasks(null, query.getName(), after, query.getUser(), + query.getOrg(), query.getOrgID(), query.getStatus(), query.getLimit(), query.getType()); + + Tasks tasks = execute(call); + + List tasksList = tasks.getTasks(); + tasksIterator = tasksList.iterator(); + if (!tasksList.isEmpty()) { + Task lastTask = tasksList.get(tasksList.size() - 1); + after = lastTask.getId(); + } + + @Nullable String nextUrl = tasks.getLinks().getNext(); + hasNext = nextUrl != null && !nextUrl.isEmpty(); + + String logMsg = "findTasksStream found: {0} has next page: {1} next after {2}: "; + LOG.log(Level.FINEST, logMsg, new Object[]{tasks, hasNext, after}); + } + + @Override + public Task next() throws IndexOutOfBoundsException { + if (!tasksIterator.hasNext() && hasNext) { + doQueryNext(); + } + + if (tasksIterator.hasNext()){ + return tasksIterator.next(); + } else { + throw new IndexOutOfBoundsException(); + } + } + + @Override + public void remove() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + }; + + Stream stream = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), + false); + + return stream; + + } + @Nonnull @Override public Task createTask(@Nonnull final Task task) { diff --git a/client/src/test/java/com/influxdb/client/ITTasksApi.java b/client/src/test/java/com/influxdb/client/ITTasksApi.java index 9f399eb4604..333023f71bf 100644 --- a/client/src/test/java/com/influxdb/client/ITTasksApi.java +++ b/client/src/test/java/com/influxdb/client/ITTasksApi.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import com.influxdb.client.domain.Authorization; @@ -49,6 +51,7 @@ import com.influxdb.exceptions.NotFoundException; import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -315,6 +318,42 @@ void findTasksAfterSpecifiedID() { Assertions.assertThat(tasks.get(0).getId()).isEqualTo(task2.getId()); } + @Test + void findTasksAll() { + String taskName = generateName("it task all"); + int numOfTasks = 10; + + for (int i = 0; i < numOfTasks; i++) { + tasksApi.createTaskCron(taskName, TASK_FLUX, "0 2 * * *", organization); + } + + final TasksQuery tasksQuery = new TasksQuery(); + tasksQuery.setName(taskName); + + List tasks; + + // get tasks in 3-4 batches + tasksQuery.setLimit(numOfTasks / 3); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(numOfTasks); + + // get tasks in one equally size batch + tasksQuery.setLimit(numOfTasks); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(numOfTasks); + + // get tasks in one batch + tasksQuery.setLimit(numOfTasks + 1); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(numOfTasks); + + // get no tasks + tasksQuery.setLimit(null); + tasksQuery.setName(taskName + "___"); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(0); + } + @Test void deleteTask() { From 7fa63f3f54b661a5001dc57b08787a2213f5ab70 Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:18:22 +0100 Subject: [PATCH 2/5] style: --- .../main/java/com/influxdb/client/internal/TasksApiImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java b/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java index 36310338bce..d39f13d75b1 100644 --- a/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java @@ -203,7 +203,7 @@ public Task next() throws IndexOutOfBoundsException { doQueryNext(); } - if (tasksIterator.hasNext()){ + if (tasksIterator.hasNext()) { return tasksIterator.next(); } else { throw new IndexOutOfBoundsException(); From fa2c9c6cc00945f4a182c3f3ec295fb230056872 Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:26:44 +0100 Subject: [PATCH 3/5] chore: fix ci --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 3ed02d579db..809c7c5ee77 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -200,7 +200,7 @@ jobs: - maven-cache_v3-<< parameters.maven-image >>- - run: name: "Check generate site" - command: mvn clean site site:stage -DskipTests + command: mvn clean install site site:stage -DskipTests deploy-snapshot: docker: From c10bb5d84b70dc024f9ff73ef2bbfec62b6a36db Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Mon, 27 Nov 2023 12:29:43 +0100 Subject: [PATCH 4/5] docs: CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index aec05e6a729..eb615923d89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 6.11.0 [unreleased] +### Features + +1. [647](https://github.com/influxdata/influxdb-client-java/pull/647): `findTasksStream` function with pagination. + ### Dependencies Update dependencies: From ded0925c596ea6d0d64c87c56126670f62c1da57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Wed, 29 Nov 2023 12:29:40 +0100 Subject: [PATCH 5/5] docs: Update CHANGELOG.md --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a55a22bae43..50bc8da18a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,10 @@ ## 6.11.0 [unreleased] ### Features -1. [#647](https://github.com/influxdata/influxdb-client-java/pull/647): `findTasksStream` function with pagination. +1. [#647](https://github.com/influxdata/influxdb-client-java/pull/647): `findTasksStream` function with pagination ### Bug Fixes -1. [#648](https://github.com/influxdata/influxdb-client-java/pull/648): With csv parsing, return empty string when `stringValue` and `defaultValue` are both an empty string. +1. [#648](https://github.com/influxdata/influxdb-client-java/pull/648): With csv parsing, return empty string when `stringValue` and `defaultValue` are both an empty string ### Dependencies