From a92bb411fee7443827e1161d72604dff14d0123d Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 13 Nov 2019 16:25:36 -0800 Subject: [PATCH 1/5] add sequenceName and currentCheckPoint for backwards compatibility --- .../CheckPointDataSourceMetadataAction.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index fe36308b5c06..c4cd0923fcbd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -75,6 +75,28 @@ public SeekableStreamDataSourceMetadata getPreviousCheckPoint() return checkpointMetadata; } + // For backwards compatibility + @Deprecated + @JsonProperty + public SeekableStreamDataSourceMetadata getCurrentCheckPoint() + { + return checkpointMetadata; + } + + /** + * Returning a dummy value so the objects from older versions still work + * with current version + * TODO : this should be removed in the next release + * + * @return dummy value + */ + @Deprecated + @JsonProperty("sequenceName") + public String getBaseSequenceName() + { + return "dummy"; + } + @JsonProperty public SeekableStreamDataSourceMetadata getCheckpointMetadata() { From cecf8de0ba9c21e5b07d8c664f72e9d59c1d4284 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 14 Nov 2019 17:15:02 -0800 Subject: [PATCH 2/5] Add serde unit test in kafka --- ...CheckpointDataSourceMetadataSerdeTest.java | 154 ++++++++++++++++++ .../CheckPointDataSourceMetadataAction.java | 26 +++ 2 files changed, 180 insertions(+) create mode 100644 extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java new file mode 100644 index 000000000000..ad2569aa622c --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class KafkaCheckpointDataSourceMetadataSerdeTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void CheckPointDataSourceMetadataActionSerdeTest() throws IOException + { + MAPPER.registerSubtypes(KafkaDataSourceMetadata.class); + + final KafkaDataSourceMetadata kafkaDataSourceMetadata = + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ) + ); + final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( + "id_1", + 1, + null, + kafkaDataSourceMetadata + ); + + final String serialized = MAPPER.writeValueAsString(checkpointAction); + final CheckPointDataSourceMetadataAction deserialized = MAPPER.readValue( + serialized, + CheckPointDataSourceMetadataAction.class + ); + Assert.assertEquals(checkpointAction, deserialized); + } + + @Test + public void CheckPointDataSourceMetadataActionOldJsonSerdeTest() throws IOException + { + MAPPER.registerSubtypes(KafkaDataSourceMetadata.class); + final String jsonStr = "{\n" + + "\t\"type\": \"checkPointDataSourceMetadata\",\n" + + "\t\"supervisorId\": \"id_1\",\n" + + "\t\"taskGroupId\": 1,\n" + + "\t\"previousCheckPoint\": {\n" + + "\t\t\"type\": \"KafkaDataSourceMetadata\",\n" + + "\t\t\"partitions\": {\n" + + "\t\t\t\"type\": \"start\",\n" + + "\t\t\t\"stream\": \"topic\",\n" + + "\t\t\t\"topic\": \"topic\",\n" + + "\t\t\t\"partitionSequenceNumberMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"partitionOffsetMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"exclusivePartitions\": []\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"checkpointMetadata\": {\n" + + "\t\t\"type\": \"KafkaDataSourceMetadata\",\n" + + "\t\t\"partitions\": {\n" + + "\t\t\t\"type\": \"start\",\n" + + "\t\t\t\"stream\": \"topic\",\n" + + "\t\t\t\"topic\": \"topic\",\n" + + "\t\t\t\"partitionSequenceNumberMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"partitionOffsetMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"exclusivePartitions\": []\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"currentCheckPoint\": {\n" + + "\t\t\"type\": \"KafkaDataSourceMetadata\",\n" + + "\t\t\"partitions\": {\n" + + "\t\t\t\"type\": \"start\",\n" + + "\t\t\t\"stream\": \"topic\",\n" + + "\t\t\t\"topic\": \"topic\",\n" + + "\t\t\t\"partitionSequenceNumberMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"partitionOffsetMap\": {\n" + + "\t\t\t\t\"0\": 10,\n" + + "\t\t\t\t\"1\": 20,\n" + + "\t\t\t\t\"2\": 30\n" + + "\t\t\t},\n" + + "\t\t\t\"exclusivePartitions\": []\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"sequenceName\": \"dummy\"\n" + + "}"; + + final CheckPointDataSourceMetadataAction actual = MAPPER.readValue( + jsonStr, + CheckPointDataSourceMetadataAction.class + ); + + KafkaDataSourceMetadata kafkaDataSourceMetadata = + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ) + ); + CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( + "id_1", + 1, + kafkaDataSourceMetadata, + kafkaDataSourceMetadata + ); + Assert.assertEquals(checkpointAction, actual); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index c4cd0923fcbd..14a841c4facf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import javax.annotation.Nullable; +import java.util.Objects; public class CheckPointDataSourceMetadataAction implements TaskAction { @@ -136,4 +137,29 @@ public String toString() ", checkpointMetadata=" + checkpointMetadata + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CheckPointDataSourceMetadataAction that = (CheckPointDataSourceMetadataAction) o; + if (!supervisorId.equals(that.supervisorId)) { + return false; + } + if (taskGroupId != that.taskGroupId) { + return false; + } + + if (!Objects.equals(checkpointMetadata, that.checkpointMetadata)) { + return false; + } + + return true; + } } From 7388323c306de4a57863501bc08d55480fbc9978 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 14 Nov 2019 19:01:47 -0800 Subject: [PATCH 3/5] fix checkstyle --- .../kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java index ad2569aa622c..d5b4477f4466 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaCheckpointDataSourceMetadataSerdeTest.java @@ -35,7 +35,7 @@ public class KafkaCheckpointDataSourceMetadataSerdeTest private static final ObjectMapper MAPPER = new DefaultObjectMapper(); @Test - public void CheckPointDataSourceMetadataActionSerdeTest() throws IOException + public void testCheckPointDataSourceMetadataActionSerde() throws IOException { MAPPER.registerSubtypes(KafkaDataSourceMetadata.class); @@ -63,7 +63,7 @@ public void CheckPointDataSourceMetadataActionSerdeTest() throws IOException } @Test - public void CheckPointDataSourceMetadataActionOldJsonSerdeTest() throws IOException + public void testCheckPointDataSourceMetadataActionOldJsonSerde() throws IOException { MAPPER.registerSubtypes(KafkaDataSourceMetadata.class); final String jsonStr = "{\n" From 025ed1faf918d592daec3bcf9b7b63864816b6cb Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 16 Nov 2019 21:41:05 -0800 Subject: [PATCH 4/5] add hashcode --- .../CheckPointDataSourceMetadataAction.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 14a841c4facf..20247741f62d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -69,17 +69,15 @@ public Integer getTaskGroupId() } // For backwards compatibility - @Deprecated @JsonProperty - public SeekableStreamDataSourceMetadata getPreviousCheckPoint() + private SeekableStreamDataSourceMetadata getPreviousCheckPoint() { return checkpointMetadata; } // For backwards compatibility - @Deprecated @JsonProperty - public SeekableStreamDataSourceMetadata getCurrentCheckPoint() + private SeekableStreamDataSourceMetadata getCurrentCheckPoint() { return checkpointMetadata; } @@ -91,9 +89,8 @@ public SeekableStreamDataSourceMetadata getCurrentCheckPoint() * * @return dummy value */ - @Deprecated @JsonProperty("sequenceName") - public String getBaseSequenceName() + private String getBaseSequenceName() { return "dummy"; } @@ -159,7 +156,17 @@ public boolean equals(Object o) if (!Objects.equals(checkpointMetadata, that.checkpointMetadata)) { return false; } - return true; } + + @Override + public int hashCode() + { + return Objects.hash( + supervisorId, + taskGroupId, + checkpointMetadata + ); + } + } From a73d8c1ecb304a4ce3d197ad0d5eb6816057e168 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 19 Nov 2019 10:21:28 -0800 Subject: [PATCH 5/5] update javadoc --- .../actions/CheckPointDataSourceMetadataAction.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 20247741f62d..276a9fe22708 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -83,9 +83,11 @@ private SeekableStreamDataSourceMetadata getCurrentCheckPoint() } /** - * Returning a dummy value so the objects from older versions still work - * with current version - * TODO : this should be removed in the next release + * This method is for backwards compatibility to add the missing property (sequenceName) in serialized JSON, + * so rolling-updates from older versions are compatible, a dummy value is returned since the value is not + * used in any production code as long as the json property is present + * + * TODO : this should be removed when we don't need rolling-update compatibility with version 0.15 or earlier anymore * * @return dummy value */