From cc066184274f277f38839acfda402a3aa92b73fd Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 22 Aug 2024 14:35:58 -0400 Subject: [PATCH 1/7] Add log message when task count is higher than partitions --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 81d8871b8e8e..9e26e91fd10f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1295,6 +1295,7 @@ private SupervisorReport payload = createReportPayload( numPartitions, includeOffsets @@ -2733,6 +2734,10 @@ private boolean updatePartitionDataFromStream() log.debug("Found [%d] partitions for stream [%s]", partitionIdsFromSupplier.size(), ioConfig.getStream()); + Integer taskCount = spec.getIoConfig().getTaskCount(); + if (partitionIdsFromSupplier.size() > spec.getIoConfig().getTaskCount()) { + log.warn("Configured task count [%s] is greater than the [%d] of partitions.", taskCount, partitionIdsFromSupplier.size()); + } Map storedMetadata = getOffsetsFromMetadataStorage(); Set storedPartitions = storedMetadata.keySet(); Set closedPartitions = storedMetadata From 03f8b7310d12c1941638a93dc8d0482a8ae9de62 Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 22 Aug 2024 14:36:29 -0400 Subject: [PATCH 2/7] newline --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 9e26e91fd10f..f9d318e618fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1295,7 +1295,6 @@ private SupervisorReport payload = createReportPayload( numPartitions, includeOffsets From 59aab591b4c7dca6633f4843f9afb9f1e13e0474 Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 22 Aug 2024 14:37:28 -0400 Subject: [PATCH 3/7] fix ordering --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f9d318e618fa..1a9b460d164e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2734,7 +2734,7 @@ private boolean updatePartitionDataFromStream() log.debug("Found [%d] partitions for stream [%s]", partitionIdsFromSupplier.size(), ioConfig.getStream()); Integer taskCount = spec.getIoConfig().getTaskCount(); - if (partitionIdsFromSupplier.size() > spec.getIoConfig().getTaskCount()) { + if (spec.getIoConfig().getTaskCount() > partitionIdsFromSupplier.size()) { log.warn("Configured task count [%s] is greater than the [%d] of partitions.", taskCount, partitionIdsFromSupplier.size()); } Map storedMetadata = getOffsetsFromMetadataStorage(); From f2e53c0d76720aed6b91144dc01953efad0f285a Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 22 Aug 2024 15:10:50 -0400 Subject: [PATCH 4/7] Add supervisor id --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1a9b460d164e..fb2038bb2dd0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2735,7 +2735,7 @@ private boolean updatePartitionDataFromStream() Integer taskCount = spec.getIoConfig().getTaskCount(); if (spec.getIoConfig().getTaskCount() > partitionIdsFromSupplier.size()) { - log.warn("Configured task count [%s] is greater than the [%d] of partitions.", taskCount, partitionIdsFromSupplier.size()); + log.warn("Configured task count [%s] is greater than the [%d] of partitions for [%s].", taskCount, partitionIdsFromSupplier.size(), supervisorId); } Map storedMetadata = getOffsetsFromMetadataStorage(); Set storedPartitions = storedMetadata.keySet(); From bf5d790a9bf169218d74674650c978de6854efcf Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 23 Aug 2024 08:27:08 -0700 Subject: [PATCH 5/7] Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Kashif Faraz --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index fb2038bb2dd0..dbabcea77cd0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2735,7 +2735,7 @@ private boolean updatePartitionDataFromStream() Integer taskCount = spec.getIoConfig().getTaskCount(); if (spec.getIoConfig().getTaskCount() > partitionIdsFromSupplier.size()) { - log.warn("Configured task count [%s] is greater than the [%d] of partitions for [%s].", taskCount, partitionIdsFromSupplier.size(), supervisorId); + log.warn("Configured task count[%s] for supervisor[%s] is greater than the number of partitions[%d].", configuredTaskCount, supervisorId, partitionIdsFromSupplier.size()); } Map storedMetadata = getOffsetsFromMetadataStorage(); Set storedPartitions = storedMetadata.keySet(); From 93ce0a7c4899f7189ab83737db5011a4173f3ee4 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 23 Aug 2024 08:27:22 -0700 Subject: [PATCH 6/7] Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Kashif Faraz --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index dbabcea77cd0..d5407ea4fba7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2733,7 +2733,7 @@ private boolean updatePartitionDataFromStream() log.debug("Found [%d] partitions for stream [%s]", partitionIdsFromSupplier.size(), ioConfig.getStream()); - Integer taskCount = spec.getIoConfig().getTaskCount(); + final int configuredTaskCount = spec.getIoConfig().getTaskCount(); if (spec.getIoConfig().getTaskCount() > partitionIdsFromSupplier.size()) { log.warn("Configured task count[%s] for supervisor[%s] is greater than the number of partitions[%d].", configuredTaskCount, supervisorId, partitionIdsFromSupplier.size()); } From 683fa6dce6e58acd7847e7bf2b9414a76b654627 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 23 Aug 2024 08:27:28 -0700 Subject: [PATCH 7/7] Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Kashif Faraz --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d5407ea4fba7..d7294abc9d58 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2734,7 +2734,7 @@ private boolean updatePartitionDataFromStream() log.debug("Found [%d] partitions for stream [%s]", partitionIdsFromSupplier.size(), ioConfig.getStream()); final int configuredTaskCount = spec.getIoConfig().getTaskCount(); - if (spec.getIoConfig().getTaskCount() > partitionIdsFromSupplier.size()) { + if (configuredTaskCount > partitionIdsFromSupplier.size()) { log.warn("Configured task count[%s] for supervisor[%s] is greater than the number of partitions[%d].", configuredTaskCount, supervisorId, partitionIdsFromSupplier.size()); } Map storedMetadata = getOffsetsFromMetadataStorage();