From 11eebbfb73311ae891cd8f061718ab1d06282bb1 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Sat, 25 Mar 2023 13:05:39 +0800 Subject: [PATCH 1/2] can reset kafka Supervisor to offset by specified time --- .../kafka-supervisor-operations.md | 5 + .../MaterializedViewSupervisor.java | 6 + .../indexing/kafka/KafkaRecordSupplier.java | 25 +++ .../kinesis/KinesisRecordSupplier.java | 7 + .../supervisor/SupervisorManager.java | 19 ++ .../supervisor/SupervisorResource.java | 7 +- .../seekablestream/common/RecordSupplier.java | 10 ++ .../supervisor/SeekableStreamSupervisor.java | 169 ++++++++++++++++++ .../sampler/InputSourceSamplerTest.java | 6 + .../supervisor/SupervisorResourceTest.java | 4 +- .../RecordSupplierInputSourceTest.java | 6 + ...TestIndexerMetadataStorageCoordinator.java | 6 + .../IndexerMetadataStorageCoordinator.java | 10 ++ .../supervisor/NoopSupervisorSpec.java | 5 + .../overlord/supervisor/Supervisor.java | 2 + .../IndexerSQLMetadataStorageCoordinator.java | 28 +++ 16 files changed, 311 insertions(+), 4 deletions(-) diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md b/docs/development/extensions-core/kafka-supervisor-operations.md index fe8d1f562b66..74f88ece7c40 100644 --- a/docs/development/extensions-core/kafka-supervisor-operations.md +++ b/docs/development/extensions-core/kafka-supervisor-operations.md @@ -117,6 +117,11 @@ offsets in Kafka (depending on the value of `useEarliestOffset`). After clearing offsets, the supervisor kills and recreates any active tasks, so that tasks begin reading from valid offsets. +The `POST /druid/indexer/v1/supervisor/<supervisorId>/reset?timestamp=<timestampInMillis>` operation uses +the offsets aligned to this timestamp, causing the supervisor to start reading from +those offsets in Kafka. After clearing stored offsets, the supervisor kills and recreates any active tasks, +so that tasks begin reading from valid offsets. + Use care when using this operation! 
Resetting the supervisor may cause Kafka messages to be skipped or read twice, resulting in missing or duplicate data. diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index fd9303c17bc4..db31b546deb5 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -277,6 +277,12 @@ public void reset(DataSourceMetadata dataSourceMetadata) } } + @Override + public void resetToTime(long timestamp) + { + throw new UnsupportedOperationException("resetToTime() is not supported in MaterializedViewSupervisor"); + } + @Override public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index eb3833a0b69b..f9eb7bbb12a0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -37,6 +37,7 @@ import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ 
-178,6 +179,30 @@ public Long getPosition(StreamPartition partition) ))); } + @Override + public Map getPositionFromTime(long offsetTime) + { + return wrapExceptions( + () -> { + Map timestampsToSearch = new HashMap<>(); + for (TopicPartition partition : consumer.assignment()) { + timestampsToSearch.put(partition, offsetTime); + } + Map offsetAndTimestamps = consumer.offsetsForTimes(timestampsToSearch); + return offsetAndTimestamps + .entrySet() + .stream() + .filter(e -> e.getValue() != null) + .collect( + Collectors.toMap( + e -> e.getKey().partition(), + e -> e.getValue().offset() + ) + ); + } + ); + } + @Override public Set getPartitionIds(String stream) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index f0645f8f82cc..0854788d33cc 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -628,6 +628,13 @@ public String getPosition(StreamPartition partition) throw new UnsupportedOperationException("getPosition() is not supported in Kinesis"); } + @Nullable + @Override + public Map getPositionFromTime(long offsetTime) + { + throw new UnsupportedOperationException("getPositionFromTime() is not supported in Kinesis"); + } + @Nonnull @Override public List> poll(long timeout) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 1258a1a6c582..d9b8d20af205 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -220,6 +220,25 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc return true; } + public boolean resetSupervisorToTime(String id, long timestamp) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Preconditions.checkNotNull(id, "id"); + + Pair supervisor = supervisors.get(id); + + if (supervisor == null) { + return false; + } + + supervisor.lhs.resetToTime(timestamp); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.reset(); + } + return true; + } + public boolean checkPointDataSourceMetadata( String supervisorId, int taskGroupId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index a444d06bebc5..97335445ceaf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -454,11 +454,14 @@ public Response specGetHistory( @Path("/{id}/reset") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) - public Response reset(@PathParam("id") final String id) + public Response reset(@PathParam("id") final String id, @QueryParam("timestamp") final Long timestamp) { return asLeaderWithSupervisorManager( manager -> { - if (manager.resetSupervisor(id, null)) { + boolean success = timestamp != null + ? 
manager.resetSupervisorToTime(id, timestamp) + : manager.resetSupervisor(id, null); + if (success) { return Response.ok(ImmutableMap.of("id", id)).build(); } else { return Response.status(Response.Status.NOT_FOUND) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index b9db4202a8f3..74a39fd854f4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -123,6 +124,15 @@ boolean isOffsetAvailable(StreamPartition partition, */ SequenceOffsetType getPosition(StreamPartition partition); + /** + * returns the sequence number of all partitions at the specified timestamp + * + * @param offsetTime target timestamp in millisecond + * + * @return sequence number of all partitions representing this timestamp + */ + Map getPositionFromTime(long offsetTime); + /** * returns the set of partitions under the given stream * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0ae9aad9631e..1b90c2ab6acc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -610,6 +610,166 @@ public String getType() } } + private class OverrideNotice implements Notice + { + final DataSourceMetadata dataSourceMetadata; + private static final String TYPE = 
"override_notice"; + + OverrideNotice(DataSourceMetadata dataSourceMetadata) + { + this.dataSourceMetadata = dataSourceMetadata; + } + + @Override + public void handle() + { + if (!checkSourceMetadataMatch(dataSourceMetadata)) { + throw new IAE( + "Datasource metadata instance does not match required, found instance of [%s]", + dataSourceMetadata.getClass() + ); + } + @SuppressWarnings("unchecked") + final SeekableStreamDataSourceMetadata resetMetadata = + (SeekableStreamDataSourceMetadata) dataSourceMetadata; + + if (resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream())) { + final boolean metadataUpdateSuccess = indexerMetadataStorageCoordinator.overrideDataSourceMetadata(dataSource, resetMetadata); + + if (metadataUpdateSuccess) { + resetMetadata.getSeekableStreamSequenceNumbers() + .getPartitionSequenceNumberMap() + .keySet() + .forEach(partition -> { + final int groupId = getTaskGroupIdForPartition(partition); + killTaskGroupForPartitions( + ImmutableSet.of(partition), + "DataSourceMetadata is updated while override" + ); + activelyReadingTaskGroups.remove(groupId); + // killTaskGroupForPartitions() cleans up partitionGroups. + // Add the removed groups back. 
+ partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>()); + partitionOffsets.put(partition, getNotSetMarker()); + }); + } else { + throw new ISE("Unable to override metadata"); + } + } else { + log.warn( + "Override metadata stream [%s] and supervisor's stream name [%s] do not match", + resetMetadata.getSeekableStreamSequenceNumbers().getStream(), + ioConfig.getStream() + ); + } + } + + @VisibleForTesting + public void resetInternal(DataSourceMetadata dataSourceMetadata) + { + if (!checkSourceMetadataMatch(dataSourceMetadata)) { + throw new IAE( + "Datasource metadata instance does not match required, found instance of [%s]", + dataSourceMetadata.getClass() + ); + } + log.info("Reset dataSource[%s] with metadata[%s]", dataSource, dataSourceMetadata); + // Reset only the partitions in dataSourceMetadata if it has not been reset yet + @SuppressWarnings("unchecked") + final SeekableStreamDataSourceMetadata resetMetadata = + (SeekableStreamDataSourceMetadata) dataSourceMetadata; + + if (resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream())) { + // metadata can be null + final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource); + if (metadata != null && !checkSourceMetadataMatch(metadata)) { + throw new IAE( + "Datasource metadata instance does not match required, found instance of [%s]", + metadata.getClass() + ); + } + + @SuppressWarnings("unchecked") + final SeekableStreamDataSourceMetadata currentMetadata = + (SeekableStreamDataSourceMetadata) metadata; + + // defend against consecutive reset requests from replicas + // as well as the case where the metadata store do not have an entry for the reset partitions + boolean doReset = false; + for (Entry resetPartitionOffset : resetMetadata + .getSeekableStreamSequenceNumbers() + .getPartitionSequenceNumberMap() + .entrySet()) { + final SequenceOffsetType partitionOffsetInMetadataStore = currentMetadata == null + ? 
null + : currentMetadata + .getSeekableStreamSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(resetPartitionOffset.getKey()); + final TaskGroup partitionTaskGroup = activelyReadingTaskGroups.get( + getTaskGroupIdForPartition(resetPartitionOffset.getKey()) + ); + final boolean isSameOffset = partitionTaskGroup != null + && partitionTaskGroup.startingSequences.get(resetPartitionOffset.getKey()) + .equals(resetPartitionOffset.getValue()); + if (partitionOffsetInMetadataStore != null || isSameOffset) { + doReset = true; + break; + } + } + + if (!doReset) { + log.info("Ignoring duplicate reset request [%s]", dataSourceMetadata); + return; + } + + boolean metadataUpdateSuccess; + if (currentMetadata == null) { + metadataUpdateSuccess = true; + } else { + final DataSourceMetadata newMetadata = currentMetadata.minus(resetMetadata); + try { + metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata); + } + catch (IOException e) { + log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage()); + throw new RuntimeException(e); + } + } + if (metadataUpdateSuccess) { + resetMetadata.getSeekableStreamSequenceNumbers() + .getPartitionSequenceNumberMap() + .keySet() + .forEach(partition -> { + final int groupId = getTaskGroupIdForPartition(partition); + killTaskGroupForPartitions( + ImmutableSet.of(partition), + "DataSourceMetadata is updated while reset" + ); + activelyReadingTaskGroups.remove(groupId); + // killTaskGroupForPartitions() cleans up partitionGroups. + // Add the removed groups back. 
+ partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>()); + partitionOffsets.put(partition, getNotSetMarker()); + }); + } else { + throw new ISE("Unable to reset metadata"); + } + } else { + log.warn( + "Reset metadata stream [%s] and supervisor's stream name [%s] do not match", + resetMetadata.getSeekableStreamSequenceNumbers().getStream(), + ioConfig.getStream() + ); + } + } + + @Override + public String getType() { + return TYPE; + } + } + protected class CheckpointNotice implements Notice { private final int taskGroupId; @@ -1006,6 +1166,15 @@ public void reset(DataSourceMetadata dataSourceMetadata) addNotice(new ResetNotice(dataSourceMetadata)); } + @Override + public void resetToTime(long offsetTime) + { + log.info("Override %s's offset to time %s, posting OverrideNotice", dataSource, offsetTime); + Map offsets = recordSupplier.getPositionFromTime(offsetTime); + log.info("The %s offset to reset is %s", dataSource, offsets); + notices.add(new OverrideNotice(createDataSourceMetaDataForReset(ioConfig.getStream(), offsets))); + } + public ReentrantLock getRecordSupplierLock() { return recordSupplierLock; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index 5f6375e62974..1a8e42dfd4e3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1696,6 +1696,12 @@ public Long getPosition(StreamPartition partition) return null; } + @Override + public Map getPositionFromTime(long offsetTime) + { + return null; + } + @Override public Set getPartitionIds(String stream) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index e15c43cc8180..4f9105ea1241 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1085,12 +1085,12 @@ public void testReset() )).andReturn(false); replayAll(); - Response response = supervisorResource.reset("my-id"); + Response response = supervisorResource.reset("my-id", null); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity()); - response = supervisorResource.reset("my-id-2"); + response = supervisorResource.reset("my-id-2", null); Assert.assertEquals(404, response.getStatus()); Assert.assertEquals("my-id", id1.getValue()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java index cd714969e8c4..f722e80ef0a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java @@ -262,6 +262,12 @@ public Integer getPosition(StreamPartition partition) throw new UnsupportedOperationException(); } + @Override + public Map getPositionFromTime(long offsetTime) + { + throw new UnsupportedOperationException(); + } + private long getMinRowSize() { return TIMESTAMP_STRING.length() + (NUM_COLS - 1) * STR_LEN; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index d64bd1d22263..bcc3246ca2cc 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -67,6 +67,12 @@ public boolean deleteDataSourceMetadata(String dataSource) throw new UnsupportedOperationException(); } + @Override + public boolean overrideDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata) + { + return false; + } + @Override public boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index b3c70f0cdbe9..5de8be8cd3dc 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -280,6 +280,16 @@ SegmentPublishResult announceHistoricalSegments( */ boolean deleteDataSourceMetadata(String dataSource); + /** + * Override dataSourceMetadata entry for 'dataSource' to the one supplied. + * + * @param dataSource identifier + * @param dataSourceMetadata value to override + * + * @return true if the entry was overridden, false otherwise + */ + boolean overrideDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata); + /** * Resets dataSourceMetadata entry for 'dataSource' to the one supplied. 
* diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index b19aeaa28812..7f70bab77394 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -150,6 +150,11 @@ public void reset(DataSourceMetadata dataSourceMetadata) { } + @Override + public void resetToTime(long timestamp) + { + } + @Override public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 10c48578a638..9f06a3f7058a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -63,6 +63,8 @@ default Boolean isHealthy() void reset(DataSourceMetadata dataSourceMetadata); + void resetToTime(long timestamp); + /** * The definition of checkpoint is not very strict as currently it does not affect data or control path. 
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 307bfb0508d9..affbaf887d25 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1717,6 +1717,34 @@ public Boolean withHandle(Handle handle) ); } + @Override + public boolean overrideDataSourceMetadata(final String dataSource, final DataSourceMetadata dataSourceMetadata) + { + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + handle.createStatement( + StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) + ) + .bind("dataSource", dataSource) + .execute(); + + final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + handle, + dataSource, + dataSourceMetadata, + dataSourceMetadata + ); + + return result == DataSourceMetadataUpdateResult.SUCCESS; + } + } + ); + } + @Override public boolean resetDataSourceMetadata(final String dataSource, final DataSourceMetadata dataSourceMetadata) throws IOException From bbedc87815e2f43af2fc43e4c439f072092c7935 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Sat, 25 Mar 2023 13:21:06 +0800 Subject: [PATCH 2/2] fix compile --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 3 ++- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1b90c2ab6acc..bb54982730ad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -765,7 +765,8 @@ public void resetInternal(DataSourceMetadata dataSourceMetadata) } @Override - public String getType() { + public String getType() + { return TYPE; } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index affbaf887d25..13719c214ddd 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1732,14 +1732,14 @@ public Boolean withHandle(Handle handle) throws Exception .bind("dataSource", dataSource) .execute(); - final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( handle, dataSource, dataSourceMetadata, dataSourceMetadata ); - return result == DataSourceMetadataUpdateResult.SUCCESS; + return result == DataStoreMetadataUpdateResult.SUCCESS; } } );