diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java new file mode 100644 index 000000000000..d5b4477f4466 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class KafkaCheckpointDataSourceMetadataSerdeTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testCheckPointDataSourceMetadataActionSerde() throws IOException + { + MAPPER.registerSubtypes(KafkaDataSourceMetadata.class); + + final KafkaDataSourceMetadata kafkaDataSourceMetadata = + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ) + ); + final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( + "id_1", + 1, + null, + kafkaDataSourceMetadata + ); + + final String serialized = MAPPER.writeValueAsString(checkpointAction); + final CheckPointDataSourceMetadataAction deserialized = MAPPER.readValue( + serialized, + CheckPointDataSourceMetadataAction.class + ); + Assert.assertEquals(checkpointAction, deserialized); + } + + @Test + public void testCheckPointDataSourceMetadataActionOldJsonSerde() throws IOException + { + MAPPER.registerSubtypes(KafkaDataSourceMetadata.class); + final String jsonStr = "{\n" + + "\t\"type\": \"checkPointDataSourceMetadata\",\n" + + "\t\"supervisorId\": \"id_1\",\n" + + "\t\"taskGroupId\": 1,\n" + + "\t\"previousCheckPoint\": {\n" + + "\t\t\"type\": \"KafkaDataSourceMetadata\",\n" + + "\t\t\"partitions\": {\n" + + "\t\t\t\"type\": \"start\",\n" + + "\t\t\t\"stream\": \"topic\",\n" + + "\t\t\t\"topic\": \"topic\",\n" + + "\t\t\t\"partitionSequenceNumberMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"partitionOffsetMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"exclusivePartitions\": []\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"checkpointMetadata\": {\n" + + "\t\t\"type\": \"KafkaDataSourceMetadata\",\n" + + "\t\t\"partitions\": {\n" + + "\t\t\t\"type\": \"start\",\n" + + "\t\t\t\"stream\": \"topic\",\n" + + "\t\t\t\"topic\": \"topic\",\n" + + "\t\t\t\"partitionSequenceNumberMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"partitionOffsetMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"exclusivePartitions\": []\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"currentCheckPoint\": {\n" + + "\t\t\"type\": \"KafkaDataSourceMetadata\",\n" + + "\t\t\"partitions\": {\n" + + "\t\t\t\"type\": \"start\",\n" + + "\t\t\t\"stream\": \"topic\",\n" + + "\t\t\t\"topic\": \"topic\",\n" + + "\t\t\t\"partitionSequenceNumberMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"partitionOffsetMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"exclusivePartitions\": []\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"sequenceName\": \"dummy\"\n" + + "}"; + + final CheckPointDataSourceMetadataAction actual = MAPPER.readValue( + jsonStr, + CheckPointDataSourceMetadataAction.class + ); + + KafkaDataSourceMetadata kafkaDataSourceMetadata = + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ) + ); + CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( + "id_1", + 1, + kafkaDataSourceMetadata, + kafkaDataSourceMetadata + ); + Assert.assertEquals(checkpointAction, actual); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 46c90941636e..8cf4b16aa6a4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import javax.annotation.Nullable; +import java.util.Objects; public class CheckPointDataSourceMetadataAction implements TaskAction { @@ -80,13 +81,34 @@ public Integer getTaskGroupId() } // For backwards compatibility - @Deprecated @JsonProperty - public SeekableStreamDataSourceMetadata getPreviousCheckPoint() + private SeekableStreamDataSourceMetadata getPreviousCheckPoint() + { + return checkpointMetadata; + } + + // For backwards compatibility + @JsonProperty + private SeekableStreamDataSourceMetadata getCurrentCheckPoint() { return checkpointMetadata; } + /** + * This method is for backwards compatibility to add the missing property (sequenceName) in serialized JSON, + * so rolling-updates from older versions are compatible, a dummy value is returned since the value is not + * used in any production code as long as the json property is present + * + * TODO : this should be removed when we don't need rolling-update compatibility with version 0.15 or earlier anymore + * + * @return dummy value + */ + @JsonProperty("sequenceName") + private String getBaseSequenceName() + { + return "dummy"; + } + @JsonProperty public SeekableStreamDataSourceMetadata getCheckpointMetadata() { @@ -128,4 +150,39 @@ public String toString() ", checkpointMetadata=" + checkpointMetadata + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CheckPointDataSourceMetadataAction that = (CheckPointDataSourceMetadataAction) o; + if (!supervisorId.equals(that.supervisorId)) { + return false; + } + if (taskGroupId != that.taskGroupId) { + return false; + } + + if (!Objects.equals(checkpointMetadata, that.checkpointMetadata)) { + return false; + } + return true; + } + + @Override + public int hashCode() + { + return Objects.hash( + supervisorId, + taskGroupId, + checkpointMetadata + ); + } + }