From 52d807c4886c2c9ca343167798d7e0e2ecdb9b05 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 22 Nov 2024 13:03:42 -0500 Subject: [PATCH 01/11] Working queuing of publishing --- .../common/actions/LocalTaskActionClient.java | 2 ++ .../SegmentTransactionalInsertAction.java | 22 ++++++++++++- .../supervisor/SupervisorManager.java | 13 ++++++++ .../SeekableStreamIndexTaskRunner.java | 7 +++- .../supervisor/SeekableStreamSupervisor.java | 33 ++++++++++++++++++- .../apache/druid/error/DruidException.java | 7 +++- .../overlord/supervisor/StreamSupervisor.java | 6 ++++ 7 files changed, 86 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 1d0059335ed1..7601566f9dfb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -69,6 +70,7 @@ private R performAction(TaskAction taskAction) return result; } catch (Throwable t) { + log.error("Failed to perform action [%s]", Arrays.toString(t.getStackTrace())); throw new RuntimeException(t); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index e8dd472cf31d..1e227da7eed2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -32,7 +33,9 @@ import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -71,6 +74,8 @@ public class SegmentTransactionalInsertAction implements TaskAction segmentsToBeOverwritten, @@ -212,7 +217,22 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) checkWithSegmentLock(); } } - + String dataSourceToInsert = segments.stream().findFirst().get().getDataSource(); + log.info("dataSource [%s]", dataSourceToInsert); + if (task instanceof SeekableStreamIndexTask) { + SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task; + log.info("Running transactional append for streaming task [%s].", seekableStreamIndexTask.getId()); + if (!toolbox.getSupervisorManager().canPublishSegments(segments.stream().findFirst().get().getDataSource(), seekableStreamIndexTask.getIOConfig().getTaskGroupId(), task.getId())) { + log.warn("Streaming task [%s] is not currently publishable.", task.getId()); + throw DruidException + .forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE) + .build("Cannot append segments to [%s] right now." + + "There might be another task waiting to publish its segments. Check the overlord logs for details.", + dataSource + ); + } + } try { retVal = toolbox.getTaskLockbox().doInCriticalSection( task, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 243c41e4b5d8..acce6a37d865 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -148,6 +148,19 @@ public boolean handoffTaskGroupsEarly(String id, List taskGroupIds) return true; } + public boolean canPublishSegments(String id, Integer taskGroupId, String taskId) + { + log.info("Supervisor ids [%s]", supervisors.keys().toString()); + Pair supervisor = supervisors.get(id); + if (supervisor == null || supervisor.lhs == null) { + log.info("Could not find supervisor [%s]", id); + return true; + } + log.info("Found supervisor [%s]", id); + final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "publishSegments"); + return streamSupervisor.canPublishSegments(taskGroupId, taskId); + } + public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) { Preconditions.checkState(started, "SupervisorManager not started"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ab437eb7a60a..b2eb25548362 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -975,7 +975,12 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) { - log.debug("Publishing segments for sequence [%s].", sequenceMetadata); + log.info("Publishing segments for sequence [%s].", sequenceMetadata); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } final ListenableFuture publishFuture = Futures.transform( driver.publish( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 8b4845b08d07..f37303872df9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import io.vavr.collection.Seq; import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.commons.codec.digest.DigestUtils; @@ -146,7 +147,7 @@ * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor +public abstract class SeekableStreamSupervisor, RecordType extends ByteEntity> implements StreamSupervisor { public static final String CHECKPOINTS_CTX_KEY = "checkpoints"; @@ -1995,6 +1996,36 @@ public void handoffTaskGroupsEarly(List taskGroupIds) addNotice(new HandoffTaskGroupsNotice(taskGroupIds)); } + @Override + public boolean canPublishSegments(Integer taskGroupId, String taskId) { + log.info("Checking if publishing is allowed for [%s] [%s]", taskGroupId, taskId); + CopyOnWriteArrayList pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); + TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); + if (taskGroupToCheck == null) { + // This function is called by the SegmentTransactionAppendAction. + // This is only triggered after a task has already started publishing so this shouldn't really happen. + // It's okay to just let the task try publishing in this case. + log.info("Did not find a task group to check for publishing"); + return true; + } + log.info("Found a task group to check for publishing [%s]", taskGroupToCheck); + for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { + if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { + log.info("Found a task group with different starting sequences [%s]", taskGroup.startingSequences); + for (PartitionIdType sequence : taskGroup.startingSequences.keySet()) { + SequenceOffsetType publishingGroupOffset = taskGroupToCheck.startingSequences.getOrDefault(sequence, null); + SequenceOffsetType taskGroupOffset = taskGroup.startingSequences.getOrDefault(sequence, null); + if (publishingGroupOffset != null && taskGroupOffset != null) { + if (publishingGroupOffset.compareTo(taskGroupOffset) > 0) { + return false; + } + } + } + } + } + return true; + } + private void discoverTasks() throws ExecutionException, InterruptedException { int taskCount = 0; diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index 66190d13a91f..7c36869fd677 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -398,7 +398,12 @@ public enum Category * A catch-all for any time when we cannot come up with a meaningful categorization. This is hopefully only * used when converting generic exceptions from frameworks and libraries that we do not control into DruidExceptions */ - UNCATEGORIZED(500); + UNCATEGORIZED(500), + + /** + * Indicates the druid service is not available. This error code is retriable. + */ + SERVICE_UNAVAILABLE(503); private final int expectedStatus; diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java index cbae3c4eaa45..0d1052f2416b 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java @@ -22,6 +22,7 @@ import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.java.util.common.Pair; import java.util.List; @@ -69,4 +70,9 @@ default void handoffTaskGroupsEarly(List taskGroupIds) { throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented"); } + + default boolean canPublishSegments(Integer taskGroupId, String taskId) + { + return true; + } } From 64e3851160f5619457c7265692c29ca453789fc7 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 22 Nov 2024 13:51:57 -0500 Subject: [PATCH 02/11] fix style --- .../actions/SegmentTransactionalInsertAction.java | 6 ++---- .../overlord/supervisor/SupervisorManager.java | 13 ++++++------- .../SeekableStreamIndexTaskRunner.java | 7 +------ .../supervisor/SeekableStreamSupervisor.java | 11 +++++------ .../overlord/supervisor/StreamSupervisor.java | 1 - 5 files changed, 14 insertions(+), 24 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 1e227da7eed2..a3547eac76c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -218,18 +218,16 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) } } String dataSourceToInsert = segments.stream().findFirst().get().getDataSource(); - log.info("dataSource [%s]", dataSourceToInsert); if (task instanceof SeekableStreamIndexTask) { SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task; - log.info("Running transactional append for streaming task [%s].", seekableStreamIndexTask.getId()); - if (!toolbox.getSupervisorManager().canPublishSegments(segments.stream().findFirst().get().getDataSource(), seekableStreamIndexTask.getIOConfig().getTaskGroupId(), task.getId())) { + if (!toolbox.getSupervisorManager().canPublishSegments(dataSourceToInsert, seekableStreamIndexTask.getIOConfig().getTaskGroupId(), task.getId())) { log.warn("Streaming task [%s] is not currently publishable.", task.getId()); throw DruidException .forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE) .build("Cannot append segments to [%s] right now." + "There might be another task waiting to publish its segments. Check the overlord logs for details.", - dataSource + dataSourceToInsert ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index acce6a37d865..2aed70c1b8db 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -150,15 +150,14 @@ public boolean handoffTaskGroupsEarly(String id, List taskGroupIds) public boolean canPublishSegments(String id, Integer taskGroupId, String taskId) { - log.info("Supervisor ids [%s]", supervisors.keys().toString()); - Pair supervisor = supervisors.get(id); - if (supervisor == null || supervisor.lhs == null) { - log.info("Could not find supervisor [%s]", id); + try { + final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "publishSegments"); + return streamSupervisor.canPublishSegments(taskGroupId, taskId); + } + catch (Exception e) { + // If the publishing task is not a streaming task for whatever reason, allow it to publish. return true; } - log.info("Found supervisor [%s]", id); - final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "publishSegments"); - return streamSupervisor.canPublishSegments(taskGroupId, taskId); } public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index b2eb25548362..ab437eb7a60a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -975,12 +975,7 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) { - log.info("Publishing segments for sequence [%s].", sequenceMetadata); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + log.debug("Publishing segments for sequence [%s].", sequenceMetadata); final ListenableFuture publishFuture = Futures.transform( driver.publish( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f37303872df9..3042d9199f32 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.vavr.collection.Seq; import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.commons.codec.digest.DigestUtils; @@ -1997,25 +1996,25 @@ public void handoffTaskGroupsEarly(List taskGroupIds) } @Override - public boolean canPublishSegments(Integer taskGroupId, String taskId) { - log.info("Checking if publishing is allowed for [%s] [%s]", taskGroupId, taskId); + public boolean canPublishSegments(Integer taskGroupId, String taskId) + { CopyOnWriteArrayList pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); if (taskGroupToCheck == null) { // This function is called by the SegmentTransactionAppendAction. // This is only triggered after a task has already started publishing so this shouldn't really happen. // It's okay to just let the task try publishing in this case. - log.info("Did not find a task group to check for publishing"); + log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); return true; } - log.info("Found a task group to check for publishing [%s]", taskGroupToCheck); + for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { - log.info("Found a task group with different starting sequences [%s]", taskGroup.startingSequences); for (PartitionIdType sequence : taskGroup.startingSequences.keySet()) { SequenceOffsetType publishingGroupOffset = taskGroupToCheck.startingSequences.getOrDefault(sequence, null); SequenceOffsetType taskGroupOffset = taskGroup.startingSequences.getOrDefault(sequence, null); if (publishingGroupOffset != null && taskGroupOffset != null) { + // The group that is trying to publish is ahead of a task that is still publishing. It should wait to publish. if (publishingGroupOffset.compareTo(taskGroupOffset) > 0) { return false; } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java index 0d1052f2416b..43338e3f569a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java @@ -22,7 +22,6 @@ import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; -import org.apache.druid.java.util.common.Pair; import java.util.List; From 1c05c2f5f5716f18a996deb715e094d0b95da23d Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 25 Nov 2024 16:15:33 -0500 Subject: [PATCH 03/11] Add unit tests --- .../SegmentTransactionalInsertAction.java | 2 +- .../SegmentTransactionalInsertActionTest.java | 111 +++++++++++++++++- .../supervisor/SupervisorManagerTest.java | 46 ++++++++ .../supervisor/StreamSupervisorTest.java | 65 ++++++++++ 4 files changed, 222 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index a3547eac76c7..5888fb27fb73 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -225,7 +225,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throw DruidException .forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE) - .build("Cannot append segments to [%s] right now." + + .build("Cannot append segments to [%s] right now. " + "There might be another task waiting to publish its segments. Check the overlord logs for details.", dataSourceToInsert ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 44ce60b5ceb2..e39db67d8fe1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -22,25 +22,45 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.assertj.core.api.Assertions; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; -public class SegmentTransactionalInsertActionTest +import javax.annotation.Nullable; +import java.util.Map; + +@RunWith(EasyMockRunner.class) +public class SegmentTransactionalInsertActionTest extends EasyMockSupport { @Rule public TaskActionTestKit actionTestKit = new TaskActionTestKit(); @@ -86,6 +106,12 @@ public class SegmentTransactionalInsertActionTest 1024 ); + @Mock + private TaskActionToolbox taskActionToolbox; + + @Mock + SupervisorManager supervisorManager; + private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs) throws InterruptedException { @@ -175,4 +201,87 @@ public void testFailBadVersion() throws Exception ); Assert.assertTrue(exception.getMessage().contains("are not covered by locks")); } + + @Test + public void testStreamingTaskNotPublishable() throws Exception + { + // Mocking the config classes because they have a lot of logic in their constructors that we don't really want here. + SeekableStreamIndexTaskTuningConfig taskTuningConfig = EasyMock.createMock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig taskIOConfig = EasyMock.createMock(SeekableStreamIndexTaskIOConfig.class); + + final SeekableStreamIndexTask streamingTask = new TestSeekableStreamIndexTask( + "id1", + null, + DataSchema.builder().withDataSource(DATA_SOURCE).build(), + taskTuningConfig, + taskIOConfig, + ImmutableMap.of(), + "0" + ); + + EasyMock.expect(taskActionToolbox.getSupervisorManager()).andReturn(supervisorManager); + EasyMock.expect(taskActionToolbox.getTaskLockbox()).andReturn(actionTestKit.getTaskLockbox()); + EasyMock.expect(supervisorManager.canPublishSegments(EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyString())) + .andReturn(false); + + actionTestKit.getTaskLockbox().add(streamingTask); + acquireTimeChunkLock(TaskLockType.EXCLUSIVE, streamingTask, INTERVAL, 5000); + replayAll(); + + DruidException druidException = Assert.assertThrows(DruidException.class, () -> SegmentTransactionalInsertAction.appendAction( + ImmutableSet.of(SEGMENT1), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableList.of(1)), + null + ).perform( + streamingTask, + taskActionToolbox + )); + verifyAll(); + + Assert.assertEquals(503, druidException.getStatusCode()); + } + + private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask + { + public TestSeekableStreamIndexTask( + String id, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId + ) + { + super( + id, + taskResource, + dataSchema, + tuningConfig, + ioConfig, + context, + groupId + ); + } + + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return null; + } + + @Override + protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) + { + return null; + } + + @Override + public String getType() + { + return "test"; + } + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 81153f238a4e..c98c7aeb3b6f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -291,6 +291,52 @@ public void testHandoffTaskGroupsEarlyOnNonStreamSupervisor() verifyAll(); } + @Test + public void testCanPublishSegments_returnsFalse() + { + String taskId = "id1"; + String supervisorId = "supervisor-id"; + Integer groupId = 1; + Map existingSpecs = ImmutableMap.of( + supervisorId, new TestSupervisorSpec(supervisorId, supervisor1) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + supervisor1.start(); + EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andReturn(false); + + replayAll(); + + manager.start(); + + Assert.assertFalse(manager.canPublishSegments(supervisorId, groupId, taskId)); + + verifyAll(); + } + + @Test + public void testCanPublishSegments_throwsException_returnsTrue() + { + String taskId = "id1"; + String supervisorId = "supervisor-id"; + Integer groupId = 1; + Map existingSpecs = ImmutableMap.of( + supervisorId, new TestSupervisorSpec(supervisorId, supervisor1) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + supervisor1.start(); + EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andThrow(new RuntimeException()); + + replayAll(); + + manager.start(); + + Assert.assertTrue(manager.canPublishSegments(supervisorId, groupId, taskId)); + + verifyAll(); + } + @Test public void testStartAlreadyStarted() { diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java index 287a13c34366..93b20777a582 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java @@ -100,4 +100,69 @@ public int getActiveTaskGroupsCount() ex.getMessage() ); } + + @Test + public void testDefaultCanPublishSegments() + { + // Create an instance of stream supervisor without overriding handoffTaskGroupsEarly(). + final StreamSupervisor streamSupervisor = new StreamSupervisor() + { + + @Override + public void start() + { + + } + + @Override + public void stop(boolean stopGracefully) + { + + } + + @Override + public SupervisorReport getStatus() + { + return null; + } + + @Override + public SupervisorStateManager.State getState() + { + return null; + } + + @Override + public void reset(@Nullable DataSourceMetadata dataSourceMetadata) + { + + } + + @Override + public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) + { + + } + + @Override + public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) + { + + } + + @Override + public LagStats computeLagStats() + { + return null; + } + + @Override + public int getActiveTaskGroupsCount() + { + return 0; + } + }; + + Assert.assertTrue(streamSupervisor.canPublishSegments(1, "taskId")); + } } From b6fc5793cdecf991a54875052c2de658beafb69d Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 26 Nov 2024 13:04:08 -0500 Subject: [PATCH 04/11] add tests --- .../supervisor/SeekableStreamSupervisor.java | 8 +++++-- .../SeekableStreamSupervisorStateTest.java | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 3042d9199f32..d437427a9598 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1999,14 +1999,18 @@ public void handoffTaskGroupsEarly(List taskGroupIds) public boolean canPublishSegments(Integer taskGroupId, String taskId) { CopyOnWriteArrayList pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); - TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); - if (taskGroupToCheck == null) { + if (pendingCompletionTasksForGroup == null) { // This function is called by the SegmentTransactionAppendAction. // This is only triggered after a task has already started publishing so this shouldn't really happen. // It's okay to just let the task try publishing in this case. log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); return true; } + TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); + if (taskGroupToCheck == null) { + log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); + return true; + } for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..7d8ad815cfcb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1602,6 +1602,29 @@ public Duration getEmissionDuration() verifyAll(); } + @Test + public void testSupervisorCanPublish() + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + replayAll(); + + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + Map startingPartitions = new HashMap<>(); + startingPartitions.put("partition", 0); + Map checkpointPartitions = new HashMap<>(); + checkpointPartitions.put("partition", 10); + Assert.assertTrue(supervisor.canPublishSegments(0, "unknown_task")); + supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_1", startingPartitions); + Assert.assertTrue(supervisor.canPublishSegments(0, "task_1")); + supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_2", checkpointPartitions); + Assert.assertTrue(supervisor.canPublishSegments(0, "task_1")); + Assert.assertFalse(supervisor.canPublishSegments(0, "task_2")); + } + @Test public void testEmitBothLag() throws Exception { From 6b036cf993aa86d65819aa05a91870357ea9b462 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 9 Dec 2024 12:36:17 -0500 Subject: [PATCH 05/11] retry within the connector --- .../SegmentTransactionalInsertAction.java | 20 +--- .../supervisor/SupervisorManager.java | 12 -- .../supervisor/SeekableStreamSupervisor.java | 36 +----- .../SegmentTransactionalInsertActionTest.java | 111 +----------------- .../supervisor/SupervisorManagerTest.java | 46 -------- .../SeekableStreamSupervisorStateTest.java | 23 ---- .../apache/druid/error/DruidException.java | 7 +- .../overlord/supervisor/StreamSupervisor.java | 5 - .../IndexerSQLMetadataStorageCoordinator.java | 2 +- .../supervisor/StreamSupervisorTest.java | 65 ---------- 10 files changed, 5 insertions(+), 322 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 5888fb27fb73..e8dd472cf31d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -33,9 +32,7 @@ import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -74,8 +71,6 @@ public class SegmentTransactionalInsertAction implements TaskAction segmentsToBeOverwritten, @@ -217,20 +212,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) checkWithSegmentLock(); } } - String dataSourceToInsert = segments.stream().findFirst().get().getDataSource(); - if (task instanceof SeekableStreamIndexTask) { - SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task; - if (!toolbox.getSupervisorManager().canPublishSegments(dataSourceToInsert, seekableStreamIndexTask.getIOConfig().getTaskGroupId(), task.getId())) { - log.warn("Streaming task [%s] is not currently publishable.", task.getId()); - throw DruidException - .forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE) - .build("Cannot append segments to [%s] right now. " + - "There might be another task waiting to publish its segments. Check the overlord logs for details.", - dataSourceToInsert - ); - } - } + try { retVal = toolbox.getTaskLockbox().doInCriticalSection( task, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 2aed70c1b8db..243c41e4b5d8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -148,18 +148,6 @@ public boolean handoffTaskGroupsEarly(String id, List taskGroupIds) return true; } - public boolean canPublishSegments(String id, Integer taskGroupId, String taskId) - { - try { - final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "publishSegments"); - return streamSupervisor.canPublishSegments(taskGroupId, taskId); - } - catch (Exception e) { - // If the publishing task is not a streaming task for whatever reason, allow it to publish. - return true; - } - } - public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) { Preconditions.checkState(started, "SupervisorManager not started"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d437427a9598..8b4845b08d07 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -146,7 +146,7 @@ * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor, RecordType extends ByteEntity> +public abstract class SeekableStreamSupervisor implements StreamSupervisor { public static final String CHECKPOINTS_CTX_KEY = "checkpoints"; @@ -1995,40 +1995,6 @@ public void handoffTaskGroupsEarly(List taskGroupIds) addNotice(new HandoffTaskGroupsNotice(taskGroupIds)); } - @Override - public boolean canPublishSegments(Integer taskGroupId, String taskId) - { - CopyOnWriteArrayList pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); - if (pendingCompletionTasksForGroup == null) { - // This function is called by the SegmentTransactionAppendAction. - // This is only triggered after a task has already started publishing so this shouldn't really happen. - // It's okay to just let the task try publishing in this case. - log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); - return true; - } - TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); - if (taskGroupToCheck == null) { - log.warn("Did not find task group [%s] to check for publishing.", taskGroupId); - return true; - } - - for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { - if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { - for (PartitionIdType sequence : taskGroup.startingSequences.keySet()) { - SequenceOffsetType publishingGroupOffset = taskGroupToCheck.startingSequences.getOrDefault(sequence, null); - SequenceOffsetType taskGroupOffset = taskGroup.startingSequences.getOrDefault(sequence, null); - if (publishingGroupOffset != null && taskGroupOffset != null) { - // The group that is trying to publish is ahead of a task that is still publishing. It should wait to publish. - if (publishingGroupOffset.compareTo(taskGroupOffset) > 0) { - return false; - } - } - } - } - } - return true; - } - private void discoverTasks() throws ExecutionException, InterruptedException { int taskCount = 0; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index e39db67d8fe1..44ce60b5ceb2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -22,45 +22,25 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.apache.druid.data.input.impl.ByteEntity; -import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; -import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; -import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.assertj.core.api.Assertions; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.EasyMockSupport; -import org.easymock.Mock; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import javax.annotation.Nullable; -import java.util.Map; - -@RunWith(EasyMockRunner.class) -public class SegmentTransactionalInsertActionTest extends EasyMockSupport +public class SegmentTransactionalInsertActionTest { @Rule public TaskActionTestKit actionTestKit = new TaskActionTestKit(); @@ -106,12 +86,6 @@ public class SegmentTransactionalInsertActionTest extends EasyMockSupport 1024 ); - @Mock - private TaskActionToolbox taskActionToolbox; - - @Mock - SupervisorManager supervisorManager; - private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs) throws InterruptedException { @@ -201,87 +175,4 @@ public void testFailBadVersion() throws Exception ); Assert.assertTrue(exception.getMessage().contains("are not covered by locks")); } - - @Test - public void testStreamingTaskNotPublishable() throws Exception - { - // Mocking the config classes because they have a lot of logic in their constructors that we don't really want here. - SeekableStreamIndexTaskTuningConfig taskTuningConfig = EasyMock.createMock(SeekableStreamIndexTaskTuningConfig.class); - SeekableStreamIndexTaskIOConfig taskIOConfig = EasyMock.createMock(SeekableStreamIndexTaskIOConfig.class); - - final SeekableStreamIndexTask streamingTask = new TestSeekableStreamIndexTask( - "id1", - null, - DataSchema.builder().withDataSource(DATA_SOURCE).build(), - taskTuningConfig, - taskIOConfig, - ImmutableMap.of(), - "0" - ); - - EasyMock.expect(taskActionToolbox.getSupervisorManager()).andReturn(supervisorManager); - EasyMock.expect(taskActionToolbox.getTaskLockbox()).andReturn(actionTestKit.getTaskLockbox()); - EasyMock.expect(supervisorManager.canPublishSegments(EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyString())) - .andReturn(false); - - actionTestKit.getTaskLockbox().add(streamingTask); - acquireTimeChunkLock(TaskLockType.EXCLUSIVE, streamingTask, INTERVAL, 5000); - replayAll(); - - DruidException druidException = Assert.assertThrows(DruidException.class, () -> SegmentTransactionalInsertAction.appendAction( - ImmutableSet.of(SEGMENT1), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableList.of(1)), - null - ).perform( - streamingTask, - taskActionToolbox - )); - verifyAll(); - - Assert.assertEquals(503, druidException.getStatusCode()); - } - - private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask - { - public TestSeekableStreamIndexTask( - String id, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId - ) - { - super( - id, - taskResource, - dataSchema, - tuningConfig, - ioConfig, - context, - groupId - ); - } - - @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() - { - return null; - } - - @Override - protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) - { - return null; - } - - @Override - public String getType() - { - return "test"; - } - } - } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index c98c7aeb3b6f..81153f238a4e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -291,52 +291,6 @@ public void testHandoffTaskGroupsEarlyOnNonStreamSupervisor() verifyAll(); } - @Test - public void testCanPublishSegments_returnsFalse() - { - String taskId = "id1"; - String supervisorId = "supervisor-id"; - Integer groupId = 1; - Map existingSpecs = ImmutableMap.of( - supervisorId, new TestSupervisorSpec(supervisorId, supervisor1) - ); - - EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); - supervisor1.start(); - EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andReturn(false); - - replayAll(); - - manager.start(); - - Assert.assertFalse(manager.canPublishSegments(supervisorId, groupId, taskId)); - - verifyAll(); - } - - @Test - public void testCanPublishSegments_throwsException_returnsTrue() - { - String taskId = "id1"; - String supervisorId = "supervisor-id"; - Integer groupId = 1; - Map existingSpecs = ImmutableMap.of( - supervisorId, new TestSupervisorSpec(supervisorId, supervisor1) - ); - - EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); - supervisor1.start(); - EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andThrow(new RuntimeException()); - - replayAll(); - - manager.start(); - - Assert.assertTrue(manager.canPublishSegments(supervisorId, groupId, taskId)); - - verifyAll(); - } - @Test public void testStartAlreadyStarted() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 7d8ad815cfcb..af66ce3b8b97 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1602,29 +1602,6 @@ public Duration getEmissionDuration() verifyAll(); } - @Test - public void testSupervisorCanPublish() - { - EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); - EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); - EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); - - replayAll(); - - SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); - Map startingPartitions = new HashMap<>(); - startingPartitions.put("partition", 0); - Map checkpointPartitions = new HashMap<>(); - checkpointPartitions.put("partition", 10); - Assert.assertTrue(supervisor.canPublishSegments(0, "unknown_task")); - supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_1", startingPartitions); - Assert.assertTrue(supervisor.canPublishSegments(0, "task_1")); - supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_2", checkpointPartitions); - Assert.assertTrue(supervisor.canPublishSegments(0, "task_1")); - Assert.assertFalse(supervisor.canPublishSegments(0, "task_2")); - } - @Test public void testEmitBothLag() throws Exception { diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index 7c36869fd677..66190d13a91f 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -398,12 +398,7 @@ public enum Category * A catch-all for any time when we cannot come up with a meaningful categorization. This is hopefully only * used when converting generic exceptions from frameworks and libraries that we do not control into DruidExceptions */ - UNCATEGORIZED(500), - - /** - * Indicates the druid service is not available. This error code is retriable. - */ - SERVICE_UNAVAILABLE(503); + UNCATEGORIZED(500); private final int expectedStatus; diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java index 43338e3f569a..cbae3c4eaa45 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java @@ -69,9 +69,4 @@ default void handoffTaskGroupsEarly(List taskGroupIds) { throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented"); } - - default boolean canPublishSegments(Integer taskGroupId, String taskId) - { - return true; - } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index a512f7935740..6e9b9356b5cd 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2661,7 +2661,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { // Offsets stored in startMetadata is greater than the last commited metadata. - return DataStoreMetadataUpdateResult.failure( + return DataStoreMetadataUpdateResult.retryableFailure( "The new start metadata state[%s] is ahead of the last committed" + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java index 93b20777a582..287a13c34366 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java @@ -100,69 +100,4 @@ public int getActiveTaskGroupsCount() ex.getMessage() ); } - - @Test - public void testDefaultCanPublishSegments() - { - // Create an instance of stream supervisor without overriding handoffTaskGroupsEarly(). - final StreamSupervisor streamSupervisor = new StreamSupervisor() - { - - @Override - public void start() - { - - } - - @Override - public void stop(boolean stopGracefully) - { - - } - - @Override - public SupervisorReport getStatus() - { - return null; - } - - @Override - public SupervisorStateManager.State getState() - { - return null; - } - - @Override - public void reset(@Nullable DataSourceMetadata dataSourceMetadata) - { - - } - - @Override - public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) - { - - } - - @Override - public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) - { - - } - - @Override - public LagStats computeLagStats() - { - return null; - } - - @Override - public int getActiveTaskGroupsCount() - { - return 0; - } - }; - - Assert.assertTrue(streamSupervisor.canPublishSegments(1, "taskId")); - } } From ccd34f4dacbd94f2c4694ea606b5ae1c2acb0036 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 9 Dec 2024 13:56:08 -0500 Subject: [PATCH 06/11] fix unit tests --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 4b592e5f40da..b913316068ce 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -784,15 +784,15 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() ); Assert.assertEquals( SegmentPublishResult.fail( - InvalidInput.exception( + new RetryTransactionException( "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed" + " end state[null]. Try resetting the supervisor." ).toString()), result1 ); - // Should only be tried once. - Assert.assertEquals(1, metadataUpdateCounter.get()); + // Should be retried. + Assert.assertEquals(2, metadataUpdateCounter.get()); } @Test From 98e35ac1ff1ff332d9257187920556109f606c2c Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Tue, 10 Dec 2024 12:09:11 -0500 Subject: [PATCH 07/11] Update indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java Co-authored-by: Kashif Faraz --- .../druid/indexing/common/actions/LocalTaskActionClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 7601566f9dfb..0d0a53ec28b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -70,7 +70,7 @@ private R performAction(TaskAction taskAction) return result; } catch (Throwable t) { - log.error("Failed to perform action [%s]", Arrays.toString(t.getStackTrace())); + log.error(t, "Failed to perform action[%s]", taskAction); throw new RuntimeException(t); } } From e4dc41a533c65cfdf0115d5f5fe1d195b4ce7aab Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 10 Dec 2024 12:16:28 -0500 Subject: [PATCH 08/11] Add comment --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 6e9b9356b5cd..9c504fc9dade 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2661,6 +2661,8 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { // Offsets stored in startMetadata is greater than the last commited metadata. + // This can happen because the previous task is still publishing its segments and can resolve once + // the previous task finishes publishing. return DataStoreMetadataUpdateResult.retryableFailure( "The new start metadata state[%s] is ahead of the last committed" + " end state[%s]. Try resetting the supervisor.", From 67397f77b3e4a25584475b44a19df652150d69d6 Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 10 Dec 2024 13:05:54 -0500 Subject: [PATCH 09/11] fix style --- .../druid/indexing/common/actions/LocalTaskActionClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 0d0a53ec28b8..81ff137ed179 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -27,7 +27,6 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import javax.annotation.Nullable; -import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; From 7132f6baa1704bab547d605e94f0c89c2881eb7d Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 11 Dec 2024 14:27:58 -0500 Subject: [PATCH 10/11] Fix unit tests --- .../common/actions/SegmentTransactionalInsertActionTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 44ce60b5ceb2..36e5626a7956 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.metadata.RetryTransactionException; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.assertj.core.api.Assertions; @@ -151,7 +152,7 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception Assert.assertEquals( SegmentPublishResult.fail( - InvalidInput.exception( + new RetryTransactionException( "The new start metadata state[ObjectMetadata{theObject=[1]}] is" + " ahead of the last committed end state[null]. Try resetting the supervisor." ).toString() From 2d47fbe88aaac449ddd5be80edb1ea033008adcb Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 11 Dec 2024 15:51:04 -0500 Subject: [PATCH 11/11] style fix --- .../common/actions/SegmentTransactionalInsertActionTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 36e5626a7956..7d5f4488fea0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task;