diff --git a/CHANGELOG.md b/CHANGELOG.md index 362f1b1718..50bc8da18a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,10 @@ ## 6.11.0 [unreleased] +### Features +1. [#647](https://github.com/influxdata/influxdb-client-java/pull/647): `findTasksStream` function with pagination + ### Bug Fixes -1. [#648](https://github.com/influxdata/influxdb-client-java/pull/648): With csv parsing, return empty string when `stringValue` and `defaultValue` are both an empty string. +1. [#648](https://github.com/influxdata/influxdb-client-java/pull/648): With csv parsing, return empty string when `stringValue` and `defaultValue` are both an empty string ### Dependencies diff --git a/client/src/main/java/com/influxdb/client/TasksApi.java b/client/src/main/java/com/influxdb/client/TasksApi.java index a0739dc25f..cd8844e4e6 100644 --- a/client/src/main/java/com/influxdb/client/TasksApi.java +++ b/client/src/main/java/com/influxdb/client/TasksApi.java @@ -23,6 +23,7 @@ import java.time.OffsetDateTime; import java.util.List; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -275,6 +276,15 @@ List findTasks(@Nullable final String afterID, @Nonnull List findTasks(@Nonnull final TasksQuery query); + /** + * Query tasks, automaticaly paged by given limit (default 100). + * + * @param query query params for task + * @return A list of tasks + */ + @Nonnull + Stream findTasksStream(@Nonnull final TasksQuery query); + /** * List all task members. * diff --git a/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java b/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java index 6f079fa2b2..d39f13d75b 100644 --- a/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java @@ -22,9 +22,15 @@ package com.influxdb.client.internal; import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -147,6 +153,77 @@ public List findTasks(@Nonnull final TasksQuery query) { return tasks.getTasks(); } + @Nonnull + @Override + public Stream findTasksStream(@Nonnull final TasksQuery query) { + Iterator iterator = new Iterator() { + private boolean hasNext = true; + + @Nonnull + private Iterator tasksIterator = Collections.emptyIterator(); + + @Nullable + private String after = query.getAfter(); + + @Override + public boolean hasNext() { + if (tasksIterator.hasNext()) { + return true; + } else if (hasNext) { + doQueryNext(); + return tasksIterator.hasNext(); + } else { + return false; + } + } + + private void doQueryNext() { + Call call = service.getTasks(null, query.getName(), after, query.getUser(), + query.getOrg(), query.getOrgID(), query.getStatus(), query.getLimit(), query.getType()); + + Tasks tasks = execute(call); + + List tasksList = tasks.getTasks(); + tasksIterator = tasksList.iterator(); + if (!tasksList.isEmpty()) { + Task lastTask = tasksList.get(tasksList.size() - 1); + after = lastTask.getId(); + } + + @Nullable String nextUrl = tasks.getLinks().getNext(); + hasNext = nextUrl != null && !nextUrl.isEmpty(); + + String logMsg = "findTasksStream found: {0} has next page: {1} next after {2}: "; + LOG.log(Level.FINEST, logMsg, new Object[]{tasks, hasNext, after}); + } + + @Override + public Task next() throws IndexOutOfBoundsException { + if (!tasksIterator.hasNext() && hasNext) { + doQueryNext(); + } + + if (tasksIterator.hasNext()) { + return tasksIterator.next(); + } else { + throw new IndexOutOfBoundsException(); + } + } + + @Override + public void remove() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + }; + + Stream stream = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), + false); + + return stream; + + } + @Nonnull @Override public Task createTask(@Nonnull final Task task) { diff --git a/client/src/test/java/com/influxdb/client/ITTasksApi.java b/client/src/test/java/com/influxdb/client/ITTasksApi.java index 9f399eb460..333023f71b 100644 --- a/client/src/test/java/com/influxdb/client/ITTasksApi.java +++ b/client/src/test/java/com/influxdb/client/ITTasksApi.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import com.influxdb.client.domain.Authorization; @@ -49,6 +51,7 @@ import com.influxdb.exceptions.NotFoundException; import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -315,6 +318,42 @@ void findTasksAfterSpecifiedID() { Assertions.assertThat(tasks.get(0).getId()).isEqualTo(task2.getId()); } + @Test + void findTasksAll() { + String taskName = generateName("it task all"); + int numOfTasks = 10; + + for (int i = 0; i < numOfTasks; i++) { + tasksApi.createTaskCron(taskName, TASK_FLUX, "0 2 * * *", organization); + } + + final TasksQuery tasksQuery = new TasksQuery(); + tasksQuery.setName(taskName); + + List tasks; + + // get tasks in 3-4 batches + tasksQuery.setLimit(numOfTasks / 3); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(numOfTasks); + + // get tasks in one equally size batch + tasksQuery.setLimit(numOfTasks); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(numOfTasks); + + // get tasks in one batch + tasksQuery.setLimit(numOfTasks + 1); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(numOfTasks); + + // get no tasks + tasksQuery.setLimit(null); + tasksQuery.setName(taskName + "___"); + tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList()); + Assertions.assertThat(tasks).hasSize(0); + } + @Test void deleteTask() {