diff --git a/README.md b/README.md
index c3a8b3f6..39431484 100644
--- a/README.md
+++ b/README.md
@@ -230,6 +230,48 @@ will produce the following schema
```
+
+##### DeduplicateKafkaSinkTransformer
+`DeduplicateKafkaSinkTransformer` deduplicates records in a query from a Kafka source to a Kafka destination when the query is rerun after a failure.
+Records are identified across the source and destination topics by a user-defined id, which may be a composite id and may include consumer record
+properties such as offset and partition, as well as fields from the key or value schema.
+Deduplication is needed because the Kafka destination provides only an at-least-once guarantee. It works by reading the ids
+of the last partial run from the destination topic and excluding them in the query.
+
+Note that there must be exactly one source topic and one destination topic, only one writer may write to the destination topic, and
+no records may have been written to the destination topic after the partial run. Otherwise, records may still be duplicated.
+
+To use this transformer, `KafkaStreamReader`, `ConfluentAvroDecodingTransformer`, `ConfluentAvroEncodingTransformer` and `KafkaStreamWriter`
+must be configured as well, since this transformer reuses their Kafka and Schema Registry settings.
+
+Note that using the star operator `*` within column names is not supported and may lead to unexpected behaviour.
+
+To add the transformer to the pipeline use this class name:
+```
+component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka.DeduplicateKafkaSinkTransformer
+```
+
+| Property Name | Required | Description |
+| :--- | :---: | :--- |
+| `transformer.{transformer-id}.source.id.columns` | Yes | A comma-separated list of consumer record properties of the source topic that define the composite id. For example, `offset, partition` or `key.some_user_id`. |
+| `transformer.{transformer-id}.destination.id.columns` | Yes | A comma-separated list of consumer record properties of the destination topic that correspond to the source id columns, in the same order. For example, `value.src_offset, value.src_partition` or `key.some_user_id`. |
+| `transformer.{transformer-id}.kafka.consumer.timeout` | No | Kafka consumer timeout in seconds. The default value is 120s. |
+
+The following fields can be selected on the consumer record:
+
+- `topic`
+- `offset`
+- `partition`
+- `timestamp`
+- `timestampType`
+- `serializedKeySize`
+- `serializedValueSize`
+- `key`
+- `value`
+
+In case of `key` and `value`, fields of their schemas can be selected using dot notation, e.g.
+`key.some_nested_record.some_id` or likewise `value.some_nested_record.some_id`.
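+
+For example, a composite id that matches the source record's offset and partition against nested fields of the destination value could be
+configured as follows (the destination field names are illustrative and depend on how the destination records were written):
+```
+transformer.{transformer-id}.source.id.columns = offset, partition
+transformer.{transformer-id}.destination.id.columns = value.hyperdrive_id.source_offset, value.hyperdrive_id.source_partition
+```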
+
See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`.
##### ParquetStreamWriter
| Property Name | Required | Description |
diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala
index 8b512cf7..6c049cc2 100644
--- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala
+++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/transformer/StreamTransformerFactory.scala
@@ -31,3 +31,9 @@ trait StreamTransformerFactory extends ComponentFactory[StreamTransformer] {
*/
def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = Map()
}
+
+object StreamTransformerFactory {
+ val IdsKeyPrefix = "component.transformer.id"
+ val ClassKeyPrefix = "component.transformer.class"
+ val TransformerKeyPrefix = "transformer"
+}
diff --git a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/utils/ConfigUtils.scala b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/utils/ConfigUtils.scala
index e9e1e418..2353a359 100644
--- a/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/utils/ConfigUtils.scala
+++ b/api/src/main/scala/za/co/absa/hyperdrive/ingestor/api/utils/ConfigUtils.scala
@@ -15,8 +15,10 @@
package za.co.absa.hyperdrive.ingestor.api.utils
-import org.apache.commons.configuration2.Configuration
+import org.apache.commons.configuration2.{Configuration, ConfigurationConverter}
+import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
+import scala.collection.mutable
import scala.util.{Failure, Success, Try}
object ConfigUtils {
@@ -98,4 +100,16 @@ object ConfigUtils {
Success(target)
}
}
+
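+  /**
+   * Returns the transformer id under which the given transformer class is registered in the configuration,
+   * i.e. the {transformer-id} in component.transformer.class.{transformer-id}, or None if the class is not registered.
+   */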
+ def getTransformerPrefix[T <: StreamTransformer](config: Configuration, transformerClass: Class[T]): Option[String] = {
+ import scala.collection.JavaConverters._
+ val className = transformerClass.getCanonicalName
+ val transformerPrefixConfig = config.subset(StreamTransformerFactory.ClassKeyPrefix)
+ val transformerPrefixMap = ConfigurationConverter.getMap(transformerPrefixConfig).asScala
+ transformerPrefixMap.find {
+ case (_: String, value: String) => value == className
+ }.map {
+ case (key: String, _) => key
+ }
+ }
}
diff --git a/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/DummyStreamTransformer.scala b/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/DummyStreamTransformer.scala
new file mode 100644
index 00000000..979d2dbc
--- /dev/null
+++ b/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/DummyStreamTransformer.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.api.utils
+
+import org.apache.spark.sql.DataFrame
+import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
+
+class DummyStreamTransformer extends StreamTransformer {
+ override def transform(streamData: DataFrame): DataFrame = ???
+}
diff --git a/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/TestConfigUtils.scala b/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/TestConfigUtils.scala
index 86287393..8bf44dd2 100644
--- a/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/TestConfigUtils.scala
+++ b/api/src/test/scala/za/co/absa/hyperdrive/ingestor/api/utils/TestConfigUtils.scala
@@ -19,6 +19,7 @@ import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{BaseConfiguration, Configuration}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
+import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory
class TestConfigUtils extends FlatSpec with Matchers with MockitoSugar {
@@ -322,4 +323,21 @@ class TestConfigUtils extends FlatSpec with Matchers with MockitoSugar {
val ex4 = the[Exception] thrownBy ConfigUtils.getOptionalBoolean("key4", config)
ex4.getMessage should include("key4")
}
+
+ "getTransformerPrefix" should "get the prefix of a transformer class" in {
+ val config = new BaseConfiguration
+ config.addProperty(s"${StreamTransformerFactory.ClassKeyPrefix}.[dummy-transformer]", classOf[DummyStreamTransformer].getCanonicalName)
+
+ val prefix = ConfigUtils.getTransformerPrefix(config, classOf[DummyStreamTransformer])
+
+ prefix shouldBe Some("[dummy-transformer]")
+ }
+
+ it should "return None if the transformer class is not registered in the config" in {
+ val config = new BaseConfiguration
+
+ val prefix = ConfigUtils.getTransformerPrefix(config, classOf[DummyStreamTransformer])
+
+ prefix shouldBe None
+ }
}
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaSchemaRegistryWrapper.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaSchemaRegistryWrapper.scala
index b7973f97..a6515d53 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaSchemaRegistryWrapper.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaSchemaRegistryWrapper.scala
@@ -17,7 +17,6 @@ package za.co.absa.hyperdrive.driver.drivers
import java.util.Properties
-import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.logging.log4j.LogManager
@@ -29,7 +28,7 @@ case class SchemaRegistryContainer(dockerImageName: String) extends GenericConta
class KafkaSchemaRegistryWrapper {
private val logger = LogManager.getLogger
- private val confluentPlatformVersion = "5.3.1"
+  private val confluentPlatformVersion = "5.3.1" // should be the same as the kafka.avro.serializer.version property in the pom file
private val schemaRegistryPort = 8081
private val commonNetwork = Network.newNetwork()
val kafka: KafkaContainer = startKafka(commonNetwork)
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala
new file mode 100644
index 00000000..3bb97f31
--- /dev/null
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.driver.drivers
+
+import java.time.Duration
+import java.util
+import java.util.UUID.randomUUID
+import java.util.{Collections, Properties}
+
+import org.apache.avro.Schema.Parser
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
+import za.co.absa.commons.io.TempDirectory
+import za.co.absa.commons.spark.SparkTestBase
+import za.co.absa.abris.avro.registry.SchemaSubject
+import za.co.absa.hyperdrive.ingestor.implementation.utils.KafkaUtil
+import za.co.absa.hyperdrive.shared.exceptions.IngestionException
+
+/**
+ * This e2e test requires a Docker installation on the executing machine.
+ * In this test, 50 messages with schema v1 are written to the source topic, followed by 50 messages with schema v2.
+ * Schema v2 contains a forward-incompatible change, i.e. messages written with v2 cannot be read with v1.
+ *
+ * The first run is configured as a long-running job (writer.common.trigger.type=ProcessingTime) and with a maximum
+ * number of messages per micro-batch set to 20 (reader.option.maxOffsetsPerTrigger=20). Furthermore, the schema id is
+ * explicitly set for v1 (see transformer.[avro.decoder].value.schema.id). Due to the forward-incompatible change,
+ * it will fail at the 51st message, which was written with schema v2. At this point, 2 micro-batches (i.e. 40 messages)
+ * have been successfully committed, while the 3rd has failed half-way through. 50 messages have been written
+ * to the destination topic.
+ *
+ * To successfully rerun, the schema id needs to be set to use schema v2. In order to avoid an infinite runtime, the
+ * trigger is set to Once. The Deduplication transformer ensures that the 41st-50th messages are not written to the
+ * destination topic again. In this test, offset and partition from the source topic are used as a composite id
+ * to identify messages across the topics (See transformer.[kafka.deduplicator].source.id.columns
+ * and transformer.[kafka.deduplicator].destination.id.columns)
+ *
+ * Finally, the destination topic is expected to contain all messages from the source topic
+ * exactly once, thanks to the deduplication transformer (see test case 1).
+ * Without the deduplication transformer, the 41st-50th messages are duplicated (see test case 2).
+ */
+class KafkaToKafkaDeduplicationAfterRetryDockerTest extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {
+ import scala.collection.JavaConverters._
+
+ private val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+ private var baseDir: TempDirectory = _
+
+ behavior of "CommandLineIngestionDriver"
+
+ it should "write exactly-once using the deduplicate transformer" in {
+ val recordIdsV1 = 0 until 50
+ val recordIdsV2 = 50 until 100
+ val deduplicatorConfig = Map(
+ "component.transformer.id.2" -> "[kafka.deduplicator]",
+ "component.transformer.class.[kafka.deduplicator]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka.DeduplicateKafkaSinkTransformer",
+ "transformer.[kafka.deduplicator].source.id.columns" -> "offset,partition",
+ "transformer.[kafka.deduplicator].destination.id.columns" -> "value.hyperdrive_id.source_offset, value.hyperdrive_id.source_partition"
+ )
+ val kafkaSchemaRegistryWrapper = new KafkaSchemaRegistryWrapper
+ val destinationTopic = "deduplication_dest"
+
+ executeTestCase(deduplicatorConfig, recordIdsV1, recordIdsV2, kafkaSchemaRegistryWrapper, destinationTopic)
+
+ val consumer = createConsumer(kafkaSchemaRegistryWrapper)
+ val records = getAllMessages(consumer, destinationTopic)
+ val valueFieldNames = records.head.value().getSchema.getFields.asScala.map(_.name())
+ valueFieldNames should contain theSameElementsAs List("record_id", "value_field", "hyperdrive_id")
+ val actualRecordIds = records.map(_.value().get("record_id"))
+ actualRecordIds.distinct.size shouldBe actualRecordIds.size
+ actualRecordIds should contain theSameElementsAs recordIdsV1 ++ recordIdsV2
+ }
+
+ it should "write duplicate entries without the deduplicate transformer" in {
+ val recordIdsV1 = 0 until 50
+ val recordIdsV2 = 50 until 100
+ val kafkaSchemaRegistryWrapper = new KafkaSchemaRegistryWrapper
+ val destinationTopic = "deduplication_dest"
+
+ executeTestCase(Map(), recordIdsV1, recordIdsV2, kafkaSchemaRegistryWrapper, destinationTopic)
+
+ val consumer = createConsumer(kafkaSchemaRegistryWrapper)
+ val records = getAllMessages(consumer, destinationTopic)
+ val valueFieldNames = records.head.value().getSchema.getFields.asScala.map(_.name())
+ valueFieldNames should contain theSameElementsAs List("record_id", "value_field", "hyperdrive_id")
+ val actualRecordIds = records.map(_.value().get("record_id"))
+ actualRecordIds.distinct.size should be < actualRecordIds.size
+ }
+
+ // scalastyle:off method.length
+ private def executeTestCase(deduplicatorConfig: Map[String, String], recordIdsV1: Seq[Int], recordIdsV2: Seq[Int],
+ kafkaSchemaRegistryWrapper: KafkaSchemaRegistryWrapper, destinationTopic: String) = {
+ // given
+ val checkpointDir = s"${baseDir.path.toUri}/checkpoint"
+ val sourceTopic = "deduplication_src"
+ val sourceTopicPartitions = 5
+ val destinationTopicPartitions = 3
+ val schemaManager = SchemaManagerFactory.create(Map("schema.registry.url" -> kafkaSchemaRegistryWrapper.schemaRegistryUrl))
+ val subject = SchemaSubject.usingTopicNameStrategy(sourceTopic)
+ val parserV1 = new Parser()
+ val schemaV1 = parserV1.parse(schemaV1String(sourceTopic))
+ val parserV2 = new Parser()
+ val schemaV2 = parserV2.parse(schemaV2String(sourceTopic))
+ val schemaV1Id = schemaManager.register(subject, schemaV1)
+ val schemaV2Id = schemaManager.register(subject, schemaV2)
+
+ val producer = createProducer(kafkaSchemaRegistryWrapper)
+ createTopic(kafkaSchemaRegistryWrapper, sourceTopic, sourceTopicPartitions)
+ createTopic(kafkaSchemaRegistryWrapper, destinationTopic, destinationTopicPartitions)
+
+ val recordsV1 = recordIdsV1.map(i => {
+ val valueRecord = new GenericData.Record(schemaV1)
+ valueRecord.put("record_id", i)
+ valueRecord.put("value_field", s"valueHello_$i")
+ valueRecord
+ })
+ val recordsV2 = recordIdsV2.map(i => {
+ val valueRecord = new GenericData.Record(schemaV2)
+ valueRecord.put("record_id", i)
+ valueRecord.put("value_field", null)
+ valueRecord
+ })
+ sendData(producer, recordsV1, sourceTopic, sourceTopicPartitions)
+ sendData(producer, recordsV2, sourceTopic, sourceTopicPartitions)
+
+ Thread.sleep(3000)
+
+ val driverConfig = Map(
+ // Pipeline settings
+ "component.ingestor" -> "spark",
+ "component.reader" -> "za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader",
+ "component.transformer.id.0" -> "[column.copy]",
+ "component.transformer.class.[column.copy]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy.ColumnCopyStreamTransformer",
+ "component.transformer.id.1" -> "[avro.decoder]",
+ "component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
+ "component.transformer.id.3" -> "[avro.encoder]",
+ "component.transformer.class.[avro.encoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer",
+ "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",
+
+ // Spark settings
+ "ingestor.spark.app.name" -> "ingestor-app",
+ "ingestor.spark.termination.timeout" -> "60000",
+
+ // Source(Kafka) settings
+ "reader.kafka.topic" -> sourceTopic,
+ "reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,
+ "reader.option.maxOffsetsPerTrigger" -> "20",
+
+ "transformer.[column.copy].columns.copy.from" -> "offset, partition",
+ "transformer.[column.copy].columns.copy.to" -> "hyperdrive_id.source_offset, hyperdrive_id.source_partition",
+
+ // Avro Decoder (ABRiS) settings
+ "transformer.[avro.decoder].schema.registry.url" -> kafkaSchemaRegistryWrapper.schemaRegistryUrl,
+ "transformer.[avro.decoder].value.schema.id" -> s"$schemaV1Id",
+ "transformer.[avro.decoder].value.schema.naming.strategy" -> "topic.name",
+ "transformer.[avro.decoder].keep.columns" -> "hyperdrive_id",
+
+ // Avro Encoder (ABRiS) settings
+ "transformer.[avro.encoder].schema.registry.url" -> "${transformer.[avro.decoder].schema.registry.url}",
+ "transformer.[avro.encoder].value.schema.naming.strategy" -> "topic.name",
+
+ // Sink(Kafka) settings
+ "writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
+ "writer.common.trigger.type" -> "ProcessingTime",
+ "writer.kafka.topic" -> destinationTopic,
+ "writer.kafka.brokers" -> "${reader.kafka.brokers}"
+ ) ++ deduplicatorConfig
+ val driverConfigArray = driverConfig.map { case (key, value) => s"$key=$value" }.toArray
+
+ // when, then
+ var exceptionWasThrown = false
+ try {
+ CommandLineIngestionDriver.main(driverConfigArray)
+ } catch {
+ case _: IngestionException =>
+ exceptionWasThrown = true
+ val retryConfig = driverConfig ++ Map(
+ "transformer.[avro.decoder].value.schema.id" -> s"$schemaV2Id",
+ "writer.common.trigger.type" -> "Once",
+ "reader.option.maxOffsetsPerTrigger" -> "9999"
+ )
+ val retryConfigArray = retryConfig.map { case (key, value) => s"$key=$value"}.toArray
+ CommandLineIngestionDriver.main(retryConfigArray) // first rerun only retries the failed micro-batch
+ CommandLineIngestionDriver.main(retryConfigArray) // second rerun consumes the rest of the messages
+ }
+
+ exceptionWasThrown shouldBe true
+ fs.exists(new Path(s"$checkpointDir/$sourceTopic")) shouldBe true
+ }
+ // scalastyle:on method.length
+
+ before {
+ baseDir = TempDirectory("hyperdriveE2eTest").deleteOnExit()
+ }
+
+ after {
+ SchemaManagerFactory.resetSRClientInstance()
+ baseDir.delete()
+ }
+
+ private def schemaV1String(name: String) =
+ raw"""{"type": "record", "name": "$name", "fields": [
+ |{"type": "int", "name": "record_id"},
+ |{"type": "string", "name": "value_field", "nullable": false}
+ |]}""".stripMargin
+
+ private def schemaV2String(name: String) =
+ raw"""{"type": "record", "name": "$name", "fields": [
+ |{"type": "int", "name": "record_id"},
+ |{"type": ["null", "string"], "name": "value_field", "nullable": true}
+ |]}""".stripMargin
+
+ private def sendData(producer: KafkaProducer[GenericRecord, GenericRecord], records: Seq[GenericRecord], topic: String, partitions: Int): Unit = {
+ records.zipWithIndex.foreach {
+ case (record, i) =>
+ val partition = i % partitions
+ val producerRecord = new ProducerRecord[GenericRecord, GenericRecord](topic, partition, null, record)
+ producer.send(producerRecord)
+ }
+ }
+
+ private def createTopic(kafkaSchemaRegistryWrapper: KafkaSchemaRegistryWrapper, topicName: String, partitions: Int): Unit = {
+ val config = new Properties()
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSchemaRegistryWrapper.kafka.getBootstrapServers)
+ val localKafkaAdmin = AdminClient.create(config)
+ val replication = 1.toShort
+ val topic = new NewTopic(topicName, partitions, replication)
+ localKafkaAdmin.createTopics(util.Arrays.asList(topic)).all().get()
+ }
+
+ private def createProducer(kafkaSchemaRegistryWrapper: KafkaSchemaRegistryWrapper): KafkaProducer[GenericRecord, GenericRecord] = {
+ val props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSchemaRegistryWrapper.kafka.getBootstrapServers)
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaToKafkaProducer")
+ props.put(ProducerConfig.ACKS_CONFIG, "1")
+ kafkaSchemaRegistryWrapper.createProducer(props)
+ }
+
+ private def createConsumer(kafkaSchemaRegistryWrapper: KafkaSchemaRegistryWrapper): KafkaConsumer[GenericRecord, GenericRecord] = {
+ import org.apache.kafka.clients.consumer.ConsumerConfig
+ val props = new Properties()
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, randomUUID.toString)
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSchemaRegistryWrapper.kafka.getBootstrapServers)
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
+ kafkaSchemaRegistryWrapper.createConsumer(props)
+ }
+
+ private def getAllMessages[K, V](consumer: KafkaConsumer[K, V], topic: String) = {
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+ val offsets = consumer.endOffsets(topicPartitions.asJava)
+ implicit val kafkaConsumerTimeout: Duration = Duration.ofSeconds(10L)
+ KafkaUtil.getMessagesAtLeastToOffset(consumer, offsets.asScala.mapValues(Long2long).toMap)
+ }
+}
diff --git a/ingestor-default/pom.xml b/ingestor-default/pom.xml
index 9375dc7e..13a977ba 100644
--- a/ingestor-default/pom.xml
+++ b/ingestor-default/pom.xml
@@ -52,5 +52,15 @@
org.apache.spark
spark-sql-kafka-${kafka.spark.version}_${scala.compat.version}
+
+
+
+ org.testcontainers
+ testcontainers
+
+
+ org.testcontainers
+ kafka
+
diff --git a/ingestor-default/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetProxy.scala b/ingestor-default/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetProxy.scala
new file mode 100644
index 00000000..66324c5b
--- /dev/null
+++ b/ingestor-default/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetProxy.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark.sql.execution.streaming.Offset
+
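+/**
+ * Exposes the package-private KafkaSourceOffset to Hyperdrive. This object needs to be located
+ * in the org.apache.spark.sql.kafka010 package to be able to access it.
+ */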
+object KafkaSourceOffsetProxy {
+ def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
+ KafkaSourceOffset.getPartitionOffsets(offset)
+ }
+
+ def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
+ KafkaSourceOffset.apply(offsetTuples:_*)
+ }
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformer.scala
new file mode 100644
index 00000000..277fc60f
--- /dev/null
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformer.scala
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka
+
+import java.time.Duration
+import java.util.{Properties, UUID}
+
+import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
+import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.{ConfluentAvroDecodingTransformer, ConfluentAvroEncodingTransformer}
+import za.co.absa.hyperdrive.ingestor.implementation.utils.KafkaUtil
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.commons.configuration2.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.logging.log4j.LogManager
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog}
+import org.apache.spark.sql.functions.{col, lit, not, struct}
+import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
+import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.{getOrThrow, getPropertySubset, getSeqOrThrow}
+import za.co.absa.hyperdrive.ingestor.api.utils.StreamWriterUtil
+import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes
+import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
+import za.co.absa.hyperdrive.ingestor.implementation.utils.AvroUtil
+import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter
+
+
+private[transformer] class DeduplicateKafkaSinkTransformer(
+ val readerTopic: String,
+ val readerBrokers: String,
+ val readerExtraOptions: Map[String, String],
+ val readerSchemaRegistryUrl: String,
+ val writerTopic: String,
+ val writerBrokers: String,
+ val writerExtraOptions: Map[String, String],
+ val writerSchemaRegistryUrl: String,
+ val checkpointLocation: String,
+ val sourceIdColumnNames: Seq[String],
+ val destinationIdColumnNames: Seq[String],
+ val kafkaConsumerTimeout: Duration
+) extends StreamTransformer {
+ private val logger = LogManager.getLogger
+
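+  /**
+   * If the latest entry in the offset log has no corresponding entry in the commit log, the previous run
+   * failed before the micro-batch was committed. In that case, the current micro-batch may contain records
+   * that have already been written to the sink, and those records are filtered out.
+   */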
+ override def transform(dataFrame: DataFrame): DataFrame = {
+ val spark = dataFrame.sparkSession
+ val offsetLog = new OffsetSeqLog(spark, new Path(checkpointLocation, "offsets").toString)
+ val commitLog = new CommitLog(spark, new Path(checkpointLocation, "commits").toString)
+ val latestOffsetLog = offsetLog.getLatest().map(_._1)
+ val latestCommitLog = commitLog.getLatest().map(_._1)
+
+ if (latestOffsetLog != latestCommitLog) {
+ deduplicateDataFrame(dataFrame, offsetLog, commitLog)
+ } else {
+ dataFrame
+ }
+ }
+
+ private def deduplicateDataFrame(dataFrame: DataFrame, offsetLog: OffsetSeqLog, commitLog: CommitLog) = {
+ logger.info("Deduplicate rows after retry")
+ implicit val kafkaConsumerTimeoutImpl: Duration = kafkaConsumerTimeout
+ val sourceConsumer = createConsumer(readerBrokers, readerExtraOptions, readerSchemaRegistryUrl)
+ val latestCommittedOffsets = KafkaUtil.getLatestCommittedOffset(offsetLog, commitLog)
+ KafkaUtil.seekToOffsetsOrBeginning(sourceConsumer, readerTopic, latestCommittedOffsets)
+
+ val latestOffsetsOpt = KafkaUtil.getLatestOffset(offsetLog)
+ val sourceRecords = latestOffsetsOpt.map(latestOffset => consumeAndClose(sourceConsumer,
+ consumer => KafkaUtil.getMessagesAtLeastToOffset(consumer, latestOffset))).getOrElse(Seq())
+ val sourceIds = sourceRecords.map(extractIdFieldsFromRecord(_, sourceIdColumnNames))
+
+ val sinkConsumer = createConsumer(writerBrokers, writerExtraOptions, writerSchemaRegistryUrl)
+ val sinkTopicPartitions = KafkaUtil.getTopicPartitions(sinkConsumer, writerTopic)
+ val recordsPerPartition = sinkTopicPartitions.map(p => p -> sourceRecords.size.toLong).toMap
+ val latestSinkRecords = consumeAndClose(sinkConsumer, consumer =>
+ KafkaUtil.getAtLeastNLatestRecordsFromPartition(consumer, recordsPerPartition))
+ val publishedIds = latestSinkRecords.map(extractIdFieldsFromRecord(_, destinationIdColumnNames))
+
+ val duplicatedIds = sourceIds.intersect(publishedIds)
+ val duplicatedIdsLit = duplicatedIds.map(duplicatedId => struct(duplicatedId.map(lit): _*))
+ val idColumns = sourceIdColumnNames.map(col)
+ dataFrame.filter(not(struct(idColumns: _*).isInCollection(duplicatedIdsLit)))
+ }
+
+ private def extractIdFieldsFromRecord(record: ConsumerRecord[GenericRecord, GenericRecord], idColumnNames: Seq[String]): Seq[Any] = {
+ idColumnNames.map(idColumnName =>
+ AvroUtil.getFromConsumerRecord(record, idColumnName)
+ .getOrElse(throw new IllegalArgumentException(s"Could not find value for field $idColumnName"))
+ )
+ }
+
+ private def consumeAndClose[T](consumer: KafkaConsumer[GenericRecord, GenericRecord], consume: KafkaConsumer[GenericRecord, GenericRecord] => T) = {
+ try {
+ consume(consumer)
+ } catch {
+ case throwable: Throwable => logger.error(s"An unexpected error occurred while consuming", throwable)
+ throw throwable
+ } finally {
+ consumer.close()
+ }
+ }
+
+ private def createConsumer(brokers: String, extraOptions: Map[String, String], schemaRegistryUrl: String) = {
+ val props = new Properties()
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, s"hyperdrive_consumer_${UUID.randomUUID().toString}")
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, s"hyperdrive_group_${UUID.randomUUID().toString}")
+ extraOptions.foreach {
+ case (key, value) => props.put(key, value)
+ }
+ props.put("schema.registry.url", schemaRegistryUrl)
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
+ new KafkaConsumer[GenericRecord, GenericRecord](props)
+ }
+}
+
+object DeduplicateKafkaSinkTransformer extends StreamTransformerFactory with DeduplicateKafkaSinkTransformerAttributes {
+
+ private val DefaultKafkaConsumerTimeoutSeconds = 120L
+
+ private val readerSchemaRegistryUrlKey = "deduplicateKafkaSinkTransformer.readerSchemaRegistryUrl"
+ private val writerSchemaRegistryUrlKey = "deduplicateKafkaSinkTransformer.writerSchemaRegistryUrl"
+
+ override def apply(config: Configuration): StreamTransformer = {
+ val readerTopic = getOrThrow(KafkaStreamReader.KEY_TOPIC, config)
+ val readerBrokers = getOrThrow(KafkaStreamReader.KEY_BROKERS, config)
+ val readerExtraOptions = KafkaStreamReader.getExtraConfigurationPrefix.map(getPropertySubset(config, _)).getOrElse(Map())
+ val readerSchemaRegistryUrl = getOrThrow(readerSchemaRegistryUrlKey, config)
+
+ val writerTopic = getOrThrow(KafkaStreamWriter.KEY_TOPIC, config)
+ val writerBrokers = getOrThrow(KafkaStreamWriter.KEY_BROKERS, config)
+ val writerExtraOptions = KafkaStreamWriter.getExtraConfigurationPrefix.map(getPropertySubset(config, _)).getOrElse(Map())
+ val writerSchemaRegistryUrl = getOrThrow(writerSchemaRegistryUrlKey, config)
+
+ val checkpointLocation = StreamWriterUtil.getCheckpointLocation(config)
+
+ val sourceIdColumns = getSeqOrThrow(SourceIdColumns, config)
+ val destinationIdColumns = getSeqOrThrow(DestinationIdColumns, config)
+ if (sourceIdColumns.size != destinationIdColumns.size) {
+ throw new IllegalArgumentException("The size of source id column names doesn't match the list of destination id column names " +
+ s"${sourceIdColumns.size} != ${destinationIdColumns.size}.")
+ }
+
+ val kafkaConsumerTimeout = Duration.ofSeconds(config.getLong(KafkaConsumerTimeout, DefaultKafkaConsumerTimeoutSeconds))
+
+ new DeduplicateKafkaSinkTransformer(readerTopic, readerBrokers, readerExtraOptions, readerSchemaRegistryUrl,
+ writerTopic, writerBrokers, writerExtraOptions, writerSchemaRegistryUrl,
+ checkpointLocation, sourceIdColumns, destinationIdColumns, kafkaConsumerTimeout)
+ }
+
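+  /**
+   * Retains the Kafka reader and writer settings as well as the schema registry urls of the avro decoding
+   * and encoding transformers from the global configuration, so that they don't have to be configured again
+   * for this transformer.
+   */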
+ override def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = {
+ import scala.collection.JavaConverters._
+ val readerExtraOptionsKeys =
+ KafkaStreamReader.getExtraConfigurationPrefix.map(globalConfig.getKeys(_).asScala.toSeq).getOrElse(Seq())
+ val writerExtraOptionsKeys =
+ KafkaStreamWriter.getExtraConfigurationPrefix.map(globalConfig.getKeys(_).asScala.toSeq).getOrElse(Seq())
+ val keys = readerExtraOptionsKeys ++ writerExtraOptionsKeys ++
+ Seq(
+ KafkaStreamReader.KEY_TOPIC,
+ KafkaStreamReader.KEY_BROKERS,
+ KafkaStreamWriter.KEY_TOPIC,
+ KafkaStreamWriter.KEY_BROKERS,
+ StreamWriterCommonAttributes.keyCheckpointBaseLocation
+ )
+ val oneToOneMappings = keys.map(e => e -> e).toMap
+
+ val readerSchemaRegistryUrlGlobalKey = getSchemaRegistryUrlKey(globalConfig, classOf[ConfluentAvroDecodingTransformer],
+ ConfluentAvroDecodingTransformer.KEY_SCHEMA_REGISTRY_URL)
+ val writerSchemaRegistryUrlGlobalKey = getSchemaRegistryUrlKey(globalConfig, classOf[ConfluentAvroEncodingTransformer],
+ ConfluentAvroEncodingTransformer.KEY_SCHEMA_REGISTRY_URL)
+
+ oneToOneMappings ++ Map(
+ readerSchemaRegistryUrlGlobalKey -> readerSchemaRegistryUrlKey,
+ writerSchemaRegistryUrlGlobalKey -> writerSchemaRegistryUrlKey
+ )
+ }
+
+ private def getSchemaRegistryUrlKey[T <: StreamTransformer](config: Configuration, transformerClass: Class[T], transformerKey: String) = {
+ val prefix = ConfigUtils.getTransformerPrefix(config, transformerClass).getOrElse(throw new IllegalArgumentException(
+ s"Could not find transformer configuration for ${transformerClass.getCanonicalName}, but it is required"))
+
+ s"${StreamTransformerFactory.TransformerKeyPrefix}.${prefix}.${transformerKey}"
+ }
+
+}
+
+
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformerAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformerAttributes.scala
new file mode 100644
index 00000000..efb05361
--- /dev/null
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformerAttributes.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka
+
+import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
+
+trait DeduplicateKafkaSinkTransformerAttributes extends HasComponentAttributes {
+
+ val SourceIdColumns = "source.id.columns"
+ val DestinationIdColumns = "destination.id.columns"
+ val KafkaConsumerTimeout = "kafka.consumer.timeout"
+
+ override def getName: String = "Deduplicate Kafka Sink Transformer"
+
+  override def getDescription: String = "This transformer deduplicates records in a Kafka-to-Kafka query in a rerun after a failure. " +
+    "It is assumed that only one query writes to the sink and no records have been written to the sink since the failure."
+
+ override def getProperties: Map[String, PropertyMetadata] = Map(
+    SourceIdColumns -> PropertyMetadata("Source Id columns", Some("Comma-separated list of columns that represent the id"), required = true),
+    DestinationIdColumns -> PropertyMetadata("Destination Id columns", Some("Comma-separated list of columns that represent the id"), required = true),
+ KafkaConsumerTimeout -> PropertyMetadata("Kafka consumer timeout in seconds", None, required = false)
+ )
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformerLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformerLoader.scala
new file mode 100644
index 00000000..85c8363b
--- /dev/null
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformerLoader.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka
+
+import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}
+
+class DeduplicateKafkaSinkTransformerLoader extends StreamTransformerFactoryProvider {
+ override def getComponentFactory: StreamTransformerFactory = DeduplicateKafkaSinkTransformer
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/StreamTransformerAbstractFactory.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/StreamTransformerAbstractFactory.scala
index 022d39e0..9135d79b 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/StreamTransformerAbstractFactory.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/StreamTransformerAbstractFactory.scala
@@ -21,6 +21,7 @@ import org.apache.logging.log4j.LogManager
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
+import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory._
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
import za.co.absa.hyperdrive.shared.utils.ClassLoaderUtils
@@ -33,9 +34,6 @@ import za.co.absa.hyperdrive.shared.utils.ClassLoaderUtils
object StreamTransformerAbstractFactory {
private val logger = LogManager.getLogger
- val idsKeyPrefix = "component.transformer.id"
- val classKeyPrefix = "component.transformer.class"
- val transformerKeyPrefix = "transformer"
/**
* For each transformer, the configuration is assumed to contain property keys according to the following example
@@ -52,16 +50,16 @@ object StreamTransformerAbstractFactory {
validateConfiguration(config)
- val orderedTransformerIds = config.getKeys(idsKeyPrefix).asScala.toList
- .map(key => key.replace(s"$idsKeyPrefix.", "").toInt -> config.getString(key))
+ val orderedTransformerIds = config.getKeys(IdsKeyPrefix).asScala.toList
+ .map(key => key.replace(s"$IdsKeyPrefix.", "").toInt -> config.getString(key))
.sortBy { case (order, _) => order }
.map { case (_, id) => id }
- val transformerClassNames = orderedTransformerIds.map(id => id -> config.getString(s"$classKeyPrefix.$id"))
+ val transformerClassNames = orderedTransformerIds.map(id => id -> config.getString(s"$ClassKeyPrefix.$id"))
transformerClassNames
.map { case (id, className) => id -> ClassLoaderUtils.loadSingletonClassOfType[StreamTransformerFactory](className) }
- .map { case (id, factory) => factory -> ConfigUtils.copyAndMapConfig(config, config.subset(s"$transformerKeyPrefix.$id"), factory.getMappingFromRetainedGlobalConfigToLocalConfig(config)) }
+ .map { case (id, factory) => factory -> ConfigUtils.copyAndMapConfig(config, config.subset(s"$TransformerKeyPrefix.$id"), factory.getMappingFromRetainedGlobalConfigToLocalConfig(config)) }
.map { case (factory, configTry) => configTry match {
case Failure(exception) => throw exception
case Success(value) => factory -> value
@@ -71,10 +69,10 @@ object StreamTransformerAbstractFactory {
}
private def validateConfiguration(config: Configuration): Unit = {
- val keys = config.getKeys(idsKeyPrefix).asScala.toList
+ val keys = config.getKeys(IdsKeyPrefix).asScala.toList
val invalidTransformerKeys = keys
- .map(key => key -> key.replace(s"$idsKeyPrefix.", ""))
+ .map(key => key -> key.replace(s"$IdsKeyPrefix.", ""))
.map { case (key, order) => key -> Try(order.toInt) }
.filter { case (_, orderAsInt) => orderAsInt.isFailure }
.map { case (key, _) => key }
@@ -88,7 +86,7 @@ object StreamTransformerAbstractFactory {
}
val missingClassKeys = transformerIds
- .map(id => s"$classKeyPrefix.$id")
+ .map(id => s"$ClassKeyPrefix.$id")
.filter(classKey => !config.containsKey(classKey))
if (missingClassKeys.nonEmpty) {
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AvroUtil.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AvroUtil.scala
new file mode 100644
index 00000000..e7453b17
--- /dev/null
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AvroUtil.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.utils
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.util.Utf8
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+
+import scala.annotation.tailrec
+
+private[hyperdrive] object AvroUtil {
+
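+  /**
+   * Extracts a field from a consumer record by name. Supported field names are consumer record properties
+   * such as offset or partition, as well as fields of the key or value schema in dot notation,
+   * e.g. value.some_nested_record.some_id. Returns None if the field cannot be found.
+   */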
+ def getFromConsumerRecord(record: ConsumerRecord[GenericRecord, GenericRecord], fieldName: String): Option[Any] = {
+ val fieldValue = fieldName match {
+ case "topic" => Option(record.topic())
+ case "offset" => Option(record.offset())
+ case "partition" => Option(record.partition())
+ case "timestamp" => Option(record.timestamp())
+ case "timestampType" => Option(record.timestampType())
+ case "serializedKeySize" => Option(record.serializedKeySize())
+ case "serializedValueSize" => Option(record.serializedValueSize())
+ case keyColumn if keyColumn.startsWith("key.") => getFromGenericRecordNullSafe(record.key(),
+ UnresolvedAttribute.parseAttributeName(keyColumn.stripPrefix("key.")).toList)
+ case valueColumn if valueColumn.startsWith("value.") => getFromGenericRecordNullSafe(record.value(),
+ UnresolvedAttribute.parseAttributeName(valueColumn.stripPrefix("value.")).toList)
+ case _ => None
+ }
+
+ fieldValue.map {
+ case utf8: Utf8 => utf8.toString
+ case v => v
+ }
+ }
+
+ private def getFromGenericRecordNullSafe(record: GenericRecord, keys: Seq[String]) =
+ Option(record).flatMap(getFromGenericRecord(_, keys))
+
+ @tailrec
+ private def getFromGenericRecord(record: GenericRecord, keys: Seq[String]): Option[Any] = keys match {
+ case key :: Nil => Option(record.get(key))
+ case head :: tail =>
+ val value = record.get(head)
+ value match {
+ case genericRecord: GenericRecord => getFromGenericRecord(genericRecord, tail)
+ case _ => None
+ }
+ }
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/KafkaUtil.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/KafkaUtil.scala
new file mode 100644
index 00000000..528e6d11
--- /dev/null
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/KafkaUtil.scala
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.utils
+
+import java.time.Duration
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+import org.apache.logging.log4j.LogManager
+import org.apache.spark.sql.execution.streaming.{CommitLog, Offset, OffsetSeqLog}
+import org.apache.spark.sql.kafka010.KafkaSourceOffsetProxy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+private[hyperdrive] object KafkaUtil {
+ private val logger = LogManager.getLogger
+
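+  /**
+   * Consumes at least the requested number of the latest records for each given partition.
+   * The lower offset bound is moved backwards in steps of the requested number of records
+   * until enough records have been read or the beginning of the partition has been reached.
+   */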
+ def getAtLeastNLatestRecordsFromPartition[K, V](consumer: KafkaConsumer[K, V], numberOfRecords: Map[TopicPartition, Long])
+ (implicit kafkaConsumerTimeout: Duration): Seq[ConsumerRecord[K, V]] = {
+ consumer.assign(numberOfRecords.keySet.asJava)
+ val endOffsets = consumer.endOffsets(numberOfRecords.keySet.asJava).asScala.mapValues(Long2long)
+ val topicPartitions = endOffsets.keySet
+
+ var records: Seq[ConsumerRecord[K, V]] = Seq()
+ val offsetLowerBounds = mutable.Map(endOffsets.toSeq: _*)
+ import scala.util.control.Breaks._
+ breakable {
+ while (true) {
+ val recordSizes = records
+ .groupBy(r => new TopicPartition(r.topic(), r.partition()))
+ .mapValues(records => records.size)
+ val unfinishedPartitions = topicPartitions.filter(p => recordSizes.getOrElse(p, 0) < numberOfRecords(p) && offsetLowerBounds(p) != 0)
+ if (unfinishedPartitions.isEmpty) {
+ break()
+ }
+
+ unfinishedPartitions.foreach { p =>
+ offsetLowerBounds(p) = Math.max(0, offsetLowerBounds(p) - numberOfRecords(p))
+ }
+ offsetLowerBounds.foreach {
+ case (partition, offset) => consumer.seek(partition, offset)
+ }
+ records = getMessagesAtLeastToOffset(consumer, endOffsets.toMap)
+ }
+ }
+
+ records
+ }
+
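+  /**
+   * Polls the consumer until the given offsets have been reached on all partitions.
+   * Throws if the requested offsets are beyond the end offsets, or if they could not be reached
+   * within the consumer timeout.
+   */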
+ def getMessagesAtLeastToOffset[K, V](consumer: KafkaConsumer[K, V], toOffsets: Map[TopicPartition, Long])
+ (implicit kafkaConsumerTimeout: Duration): Seq[ConsumerRecord[K, V]] = {
+ consumer.assign(toOffsets.keySet.asJava)
+ val endOffsets = consumer.endOffsets(toOffsets.keys.toSeq.asJava).asScala
+ endOffsets.foreach { case (topicPartition, offset) =>
+ val toOffset = toOffsets(topicPartition)
+ if (toOffset > offset) {
+ throw new IllegalArgumentException(s"Requested consumption to offsets $toOffsets, but they cannot be higher " +
+ s"than the end offsets, which are $endOffsets")
+ }
+ }
+
+ import scala.util.control.Breaks._
+    var records: Seq[ConsumerRecord[K, V]] = Seq()
+ breakable {
+ while (true) {
+ val newRecords = consumer.poll(kafkaConsumerTimeout).asScala.toSeq
+ records ++= newRecords
+ if (newRecords.isEmpty || offsetsHaveBeenReached(consumer, toOffsets)) {
+ break()
+ }
+ }
+ }
+
+ toOffsets.foreach { case (tp, toOffset) =>
+ val offsetAfterPoll = consumer.position(tp)
+ if (offsetAfterPoll < toOffset) {
+ throw new IllegalStateException(s"Expected to reach offset $toOffset on $tp, but only reached $offsetAfterPoll." +
+ s" Not all expected messages were consumed. Consider increasing the consumer timeout")
+ }
+ }
+
+ records
+ }
+
+ private def offsetsHaveBeenReached[K, V](consumer: KafkaConsumer[K, V], toOffsets: Map[TopicPartition, Long]) = {
+ toOffsets.forall { case (tp, toOffset) =>
+ val position = consumer.position(tp)
+ logger.info(s"Reached position $position on topic partition $tp. Target offset is $toOffset")
+ position >= toOffset
+ }
+ }
+
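+  /**
+   * Assigns the consumer to all partitions of the given topic and seeks to the given offsets,
+   * or to the beginning of each partition if no offsets are given.
+   */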
+ def seekToOffsetsOrBeginning[K, V](consumer: KafkaConsumer[K, V], topic: String, offsetsOpt: Option[Map[TopicPartition, Long]]): Unit = {
+ val partitions = getTopicPartitions(consumer, topic)
+ consumer.assign(partitions.asJava)
+ offsetsOpt match {
+ case Some(topicPartitionOffsets) => topicPartitionOffsets.foreach {
+ case (topicPartition, offset) => consumer.seek(topicPartition, offset)
+ }
+ case None =>
+ consumer.seekToBeginning(partitions.asJava)
+ }
+ }
+
+ def getTopicPartitions[K, V](consumer: KafkaConsumer[K, V], topic: String): Seq[TopicPartition] = {
+ consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(p.topic(), p.partition()))
+ }
+
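+  /**
+   * Returns the Kafka offsets of the latest entry in the offset log, i.e. the intended end offsets of the
+   * most recently started micro-batch, whether or not it has been committed.
+   */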
+ def getLatestOffset(offsetLog: OffsetSeqLog): Option[Map[TopicPartition, Long]] = {
+ val offsetSeqOpt = offsetLog.getLatest().map(_._2.offsets)
+ offsetSeqOpt.flatMap(parseOffsetSeq)
+ }
+
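+  /**
+   * Returns the Kafka offsets of the latest micro-batch that has an entry in the commit log,
+   * i.e. the offsets up to which records are known to have been processed successfully.
+   */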
+ def getLatestCommittedOffset(offsetLog: OffsetSeqLog, commitLog: CommitLog): Option[Map[TopicPartition, Long]] = {
+ val offsetSeqOpt = commitLog.getLatest().map(_._1)
+ .map(batchId => offsetLog.get(batchId)
+ .getOrElse(throw new IllegalStateException(s"No offset found for committed batchId ${batchId}")))
+ .map(offsetLog => offsetLog.offsets)
+ offsetSeqOpt.flatMap(parseOffsetSeq)
+ }
+
+ private def parseOffsetSeq(offsetSeq: Seq[Option[Offset]]) = {
+ if (offsetSeq.size == 1) {
+ if (offsetSeq.head.isDefined) {
+ Some(KafkaSourceOffsetProxy.getPartitionOffsets(offsetSeq.head.get))
+ } else {
+ None
+ }
+ } else {
+ throw new IllegalStateException(s"Cannot support more than 1 source, got ${offsetSeq.toString}")
+ }
+ }
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/kafka/KafkaStreamWriterAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/kafka/KafkaStreamWriterAttributes.scala
index 073a77e9..33622194 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/kafka/KafkaStreamWriterAttributes.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/kafka/KafkaStreamWriterAttributes.scala
@@ -45,4 +45,6 @@ trait KafkaStreamWriterAttributes extends HasComponentAttributes {
StreamWriterCommonAttributes.keyTriggerProcessingTime -> StreamWriterCommonAttributes.triggerProcessingTimeMetadata,
StreamWriterCommonAttributes.keyCheckpointBaseLocation -> StreamWriterCommonAttributes.checkpointBaseLocation
)
+
+ override def getExtraConfigurationPrefix: Option[String] = Some(optionalConfKey)
}
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/TestDeduplicateKafkaSinkTransformerObject.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/TestDeduplicateKafkaSinkTransformerObject.scala
new file mode 100644
index 00000000..24f93fe9
--- /dev/null
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/TestDeduplicateKafkaSinkTransformerObject.scala
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka
+
+import java.time.Duration
+
+import org.apache.commons.configuration2.DynamicCombinedConfiguration
+import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
+import org.scalatest.{FlatSpec, Matchers}
+import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes
+import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
+import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.{ConfluentAvroDecodingTransformer, ConfluentAvroEncodingTransformer}
+import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter
+
+class TestDeduplicateKafkaSinkTransformerObject extends FlatSpec with Matchers {
+ behavior of DeduplicateKafkaSinkTransformer.getClass.getSimpleName
+
+ private val readerSchemaRegistryUrlKey = "deduplicateKafkaSinkTransformer.readerSchemaRegistryUrl" // copied from DeduplicateKafkaSinkTransformer
+ private val writerSchemaRegistryUrlKey = "deduplicateKafkaSinkTransformer.writerSchemaRegistryUrl" // copied from DeduplicateKafkaSinkTransformer
+
+ private val dummySourceRegistry = "http://sourceRegistry:8081"
+ private val dummyDestinationRegistry = "http://destinationRegistry:8081"
+
+ "apply" should "create a DeduplicateKafkaSinkTransformer" in {
+ // given
+ val config = getLocalConfig()
+
+ // when
+ val transformer = DeduplicateKafkaSinkTransformer(config).asInstanceOf[DeduplicateKafkaSinkTransformer]
+
+ // then
+ transformer.readerTopic shouldBe "readerTopic"
+ transformer.readerBrokers shouldBe "http://readerBrokers:9092"
+ transformer.readerExtraOptions should contain theSameElementsAs Map(
+ "kafka.security.protocol" -> "SASL_PLAINTEXT",
+ "failOnDataLoss" -> "false"
+ )
+ transformer.readerSchemaRegistryUrl shouldBe dummySourceRegistry
+ transformer.writerTopic shouldBe "writerTopic"
+ transformer.writerBrokers shouldBe "http://writerBrokers:9092"
+ transformer.writerExtraOptions shouldBe Map(
+ "kafka.sasl.mechanism" -> "GSSAPI"
+ )
+ transformer.writerSchemaRegistryUrl shouldBe dummyDestinationRegistry
+
+ transformer.checkpointLocation shouldBe "/tmp/checkpoint"
+ transformer.sourceIdColumnNames should contain theSameElementsInOrderAs Seq("offset", "partition")
+ transformer.destinationIdColumnNames should contain theSameElementsInOrderAs Seq("value.hyperdrive_id.source_offset", "value.hyperdrive_id.source_partition")
+ transformer.kafkaConsumerTimeout shouldBe Duration.ofSeconds(5L)
+ }
+
+ it should "throw an exception if source id columns and destination id columns have different size" in {
+ val config = getLocalConfig()
+ config.setProperty(DeduplicateKafkaSinkTransformer.DestinationIdColumns, "value.hyperdrive_id")
+
+ val ex = the[IllegalArgumentException] thrownBy DeduplicateKafkaSinkTransformer(config)
+
+ ex.getMessage should include ("The size of source id column names doesn't match")
+ }
+
+ it should "use the default value for kafka consumer timeout if not provided" in {
+ // given
+ val config = getLocalConfig()
+ config.clearProperty(DeduplicateKafkaSinkTransformer.KafkaConsumerTimeout)
+
+ // when
+ val transformer = DeduplicateKafkaSinkTransformer(config).asInstanceOf[DeduplicateKafkaSinkTransformer]
+
+ // then
+ transformer.kafkaConsumerTimeout shouldBe Duration.ofSeconds(120L)
+ }
+
+ it should "throw an exception if the kafka reader config is missing" in {
+ val config = getLocalConfig()
+ config.clearProperty(KafkaStreamReader.KEY_TOPIC)
+
+ val exception = the[Exception] thrownBy DeduplicateKafkaSinkTransformer(config)
+
+ exception.getMessage should include(KafkaStreamReader.KEY_TOPIC)
+ }
+
+ it should "throw an exception if the kafka writer config is missing" in {
+ val config = getLocalConfig()
+ config.clearProperty(KafkaStreamWriter.KEY_TOPIC)
+
+ val exception = the[Exception] thrownBy DeduplicateKafkaSinkTransformer(config)
+
+ exception.getMessage should include(KafkaStreamWriter.KEY_TOPIC)
+ }
+
+ it should "throw an exception if the reader schema registry config is missing" in {
+ val config = getLocalConfig()
+ config.clearProperty(readerSchemaRegistryUrlKey)
+
+ val exception = the[Exception] thrownBy DeduplicateKafkaSinkTransformer(config)
+
+ exception.getMessage should include(readerSchemaRegistryUrlKey)
+ }
+
+ it should "throw an exception if the writer schema registry config is missing" in {
+ val config = getLocalConfig()
+ config.clearProperty(writerSchemaRegistryUrlKey)
+
+ val exception = the[Exception] thrownBy DeduplicateKafkaSinkTransformer(config)
+
+ exception.getMessage should include(writerSchemaRegistryUrlKey)
+ }
+
+
+ "getMappingFromRetainedGlobalConfigToLocalConfig" should "return the local config mapping" in {
+ // given
+ val config = getEmptyConfiguration
+ config.addProperty("reader.option.kafka.option1", "value1")
+ config.addProperty("reader.option.kafka.option2", "value2")
+ config.addProperty("component.transformer.id.0", "decoder")
+ config.addProperty("component.transformer.class.decoder", classOf[ConfluentAvroDecodingTransformer].getCanonicalName)
+ config.addProperty(s"transformer.decoder.${ConfluentAvroDecodingTransformer.KEY_SCHEMA_REGISTRY_URL}", dummySourceRegistry)
+
+ config.addProperty("component.transformer.id.1", "encoder")
+ config.addProperty("component.transformer.class.encoder", classOf[ConfluentAvroEncodingTransformer].getCanonicalName)
+ config.addProperty(s"transformer.encoder.${ConfluentAvroEncodingTransformer.KEY_SCHEMA_REGISTRY_URL}", dummyDestinationRegistry)
+
+ config.addProperty("writer.kafka.option.option3", "value3")
+ // when
+ val mapping = DeduplicateKafkaSinkTransformer.getMappingFromRetainedGlobalConfigToLocalConfig(config)
+
+ // then
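+    // The decoder/encoder schema registry URLs are expected to be remapped to this transformer's local keys,
+    // while the Kafka reader/writer options and topic/broker settings are retained under their original keys.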
+ mapping should contain theSameElementsAs Map(
+ "reader.option.kafka.option1" -> "reader.option.kafka.option1",
+ "reader.option.kafka.option2" -> "reader.option.kafka.option2",
+ "writer.kafka.option.option3" -> "writer.kafka.option.option3",
+ s"transformer.decoder.${ConfluentAvroDecodingTransformer.KEY_SCHEMA_REGISTRY_URL}" -> "deduplicateKafkaSinkTransformer.readerSchemaRegistryUrl",
+ s"transformer.encoder.${ConfluentAvroEncodingTransformer.KEY_SCHEMA_REGISTRY_URL}" -> "deduplicateKafkaSinkTransformer.writerSchemaRegistryUrl",
+ KafkaStreamReader.KEY_TOPIC -> KafkaStreamReader.KEY_TOPIC,
+ KafkaStreamReader.KEY_BROKERS -> KafkaStreamReader.KEY_BROKERS,
+ KafkaStreamWriter.KEY_TOPIC -> KafkaStreamWriter.KEY_TOPIC,
+ KafkaStreamWriter.KEY_BROKERS -> KafkaStreamWriter.KEY_BROKERS,
+ StreamWriterCommonAttributes.keyCheckpointBaseLocation -> StreamWriterCommonAttributes.keyCheckpointBaseLocation
+ )
+ }
+
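+  // Builds a minimal configuration with a Kafka source, a Kafka destination, the schema registry URLs,
+  // a checkpoint location and the deduplication id columns.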
+ private def getLocalConfig() = {
+ val config = getEmptyConfiguration
+ config.addProperty(KafkaStreamReader.KEY_TOPIC, "readerTopic")
+ config.addProperty(KafkaStreamReader.KEY_BROKERS, "http://readerBrokers:9092")
+ config.addProperty("reader.option.kafka.security.protocol", "SASL_PLAINTEXT")
+ config.addProperty("reader.option.failOnDataLoss", false)
+ config.addProperty(readerSchemaRegistryUrlKey, dummySourceRegistry)
+
+ config.addProperty(KafkaStreamWriter.KEY_TOPIC, "writerTopic")
+ config.addProperty(KafkaStreamWriter.KEY_BROKERS, "http://writerBrokers:9092")
+ config.addProperty("writer.kafka.option.kafka.sasl.mechanism", "GSSAPI")
+ config.addProperty("component.transformer.class.encoder", classOf[ConfluentAvroEncodingTransformer].getCanonicalName)
+ config.addProperty(writerSchemaRegistryUrlKey, dummyDestinationRegistry)
+
+ config.addProperty(StreamWriterCommonAttributes.keyCheckpointBaseLocation, "/tmp/checkpoint")
+
+ config.addProperty(DeduplicateKafkaSinkTransformer.SourceIdColumns, "offset, partition")
+ config.addProperty(DeduplicateKafkaSinkTransformer.DestinationIdColumns, "value.hyperdrive_id.source_offset, value.hyperdrive_id.source_partition")
+ config.addProperty(DeduplicateKafkaSinkTransformer.KafkaConsumerTimeout, 5)
+
+ config
+ }
+
+ private def getEmptyConfiguration = {
+ val config = new DynamicCombinedConfiguration()
+ config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+ config
+ }
+
+}
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/TestStreamTransformerAbstractFactory.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/TestStreamTransformerAbstractFactory.scala
index 96b28201..7a9e05a2 100644
--- a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/TestStreamTransformerAbstractFactory.scala
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/factories/TestStreamTransformerAbstractFactory.scala
@@ -20,25 +20,26 @@ import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.factories.DummyStreamTransformer._
-import za.co.absa.hyperdrive.ingestor.implementation.transformer.factories.StreamTransformerAbstractFactory.{classKeyPrefix, idsKeyPrefix}
-
+import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory._
class TestStreamTransformerAbstractFactory extends FlatSpec with BeforeAndAfterEach with MockitoSugar with Matchers {
behavior of StreamTransformerAbstractFactory.getClass.getSimpleName
+ private val dummyTransformerA = "dummy.transformer.A"
+ private val dummyTransformerB = "dummy.transformer.B"
+
it should "create transformer instances in the correct order" in {
- import StreamTransformerAbstractFactory._
val config = getBaseConfiguration
- config.addProperty(s"${idsKeyPrefix}.1", "dummy.transformer.A")
- config.addProperty(s"${classKeyPrefix}.dummy.transformer.A", DummyStreamTransformer.getClass.getName)
- config.addProperty(s"${transformerKeyPrefix}.dummy.transformer.A.$DummyProperty1Name", "value1")
- config.addProperty(s"${transformerKeyPrefix}.dummy.transformer.A.$DummyProperty2Name", "100")
+ config.addProperty(s"${IdsKeyPrefix}.1", dummyTransformerA)
+ config.addProperty(s"${ClassKeyPrefix}.$dummyTransformerA", DummyStreamTransformer.getClass.getName)
+ config.addProperty(s"${TransformerKeyPrefix}.$dummyTransformerA.$DummyProperty1Name", "value1")
+ config.addProperty(s"${TransformerKeyPrefix}.$dummyTransformerA.$DummyProperty2Name", "100")
- config.addProperty(s"${idsKeyPrefix}.2", "dummy.transformer.B")
- config.addProperty(s"${classKeyPrefix}.dummy.transformer.B", DummyStreamTransformer.getClass.getName)
- config.addProperty(s"${transformerKeyPrefix}.dummy.transformer.B.$DummyProperty1Name", "value2")
- config.addProperty(s"${transformerKeyPrefix}.dummy.transformer.B.$DummyProperty2Name", "200")
+ config.addProperty(s"${IdsKeyPrefix}.2", dummyTransformerB)
+ config.addProperty(s"${ClassKeyPrefix}.$dummyTransformerB", DummyStreamTransformer.getClass.getName)
+ config.addProperty(s"${TransformerKeyPrefix}.$dummyTransformerB.$DummyProperty1Name", "value2")
+ config.addProperty(s"${TransformerKeyPrefix}.$dummyTransformerB.$DummyProperty2Name", "200")
val transformers = StreamTransformerAbstractFactory.build(config)
transformers should have size 2
@@ -57,13 +58,12 @@ class TestStreamTransformerAbstractFactory extends FlatSpec with BeforeAndAfterE
}
it should "support negative orders" in {
- import StreamTransformerAbstractFactory._
val config = getBaseConfiguration
- config.addProperty(s"${idsKeyPrefix}.2", "[column.transformer]")
- config.addProperty(s"${classKeyPrefix}.[column.transformer]", ColumnSelectorStreamTransformer.getClass.getName)
+ config.addProperty(s"${IdsKeyPrefix}.2", "[column.transformer]")
+ config.addProperty(s"${ClassKeyPrefix}.[column.transformer]", ColumnSelectorStreamTransformer.getClass.getName)
- config.addProperty(s"${idsKeyPrefix}.-1", "dummy.transformer.A")
- config.addProperty(s"${classKeyPrefix}.dummy.transformer.A", DummyStreamTransformer.getClass.getName)
+ config.addProperty(s"${IdsKeyPrefix}.-1", dummyTransformerA)
+ config.addProperty(s"${ClassKeyPrefix}.$dummyTransformerA", DummyStreamTransformer.getClass.getName)
val transformers = StreamTransformerAbstractFactory.build(config)
transformers should have size 2
@@ -79,34 +79,34 @@ class TestStreamTransformerAbstractFactory extends FlatSpec with BeforeAndAfterE
it should "throw if transformer ids are not unique" in {
val config = getBaseConfiguration
- config.addProperty(s"${idsKeyPrefix}.1", "dummy.transformer.A")
- config.addProperty(s"${idsKeyPrefix}.2", "dummy.transformer.A")
+ config.addProperty(s"${IdsKeyPrefix}.1", dummyTransformerA)
+ config.addProperty(s"${IdsKeyPrefix}.2", dummyTransformerA)
val throwable = intercept[IllegalArgumentException](StreamTransformerAbstractFactory.build(config))
- throwable.getMessage should include(s"dummy.transformer.A")
+ throwable.getMessage should include(dummyTransformerA)
}
it should "throw if transformer id is non-numeric" in {
val config = getBaseConfiguration
- config.addProperty(s"${idsKeyPrefix}.First", "dummy.transformer.A")
+ config.addProperty(s"${IdsKeyPrefix}.First", dummyTransformerA)
val throwable = intercept[IllegalArgumentException](StreamTransformerAbstractFactory.build(config))
- throwable.getMessage should include(s"${idsKeyPrefix}.First")
+ throwable.getMessage should include(s"${IdsKeyPrefix}.First")
}
it should "throw if no class name is associated to the transformer id" in {
val config = getBaseConfiguration
- config.addProperty(s"${idsKeyPrefix}.1", "dummy.transformer.A")
+ config.addProperty(s"${IdsKeyPrefix}.1", dummyTransformerA)
val throwable = intercept[IllegalArgumentException](StreamTransformerAbstractFactory.build(config))
- throwable.getMessage should include(s"${classKeyPrefix}.dummy.transformer.A")
+ throwable.getMessage should include(s"${ClassKeyPrefix}.$dummyTransformerA")
}
it should "throw if data transformer parameter is invalid" in {
val invalidFactoryName = "an-invalid-factory-name"
val config = getBaseConfiguration
- config.addProperty(s"${idsKeyPrefix}.1", "dummy.transformer.A")
- config.addProperty(s"${classKeyPrefix}.dummy.transformer.A", invalidFactoryName)
+ config.addProperty(s"${IdsKeyPrefix}.1", dummyTransformerA)
+ config.addProperty(s"${ClassKeyPrefix}.$dummyTransformerA", invalidFactoryName)
val throwable = intercept[IllegalArgumentException](StreamTransformerAbstractFactory.build(config))
assert(throwable.getMessage.contains(invalidFactoryName))
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAvroUtil.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAvroUtil.scala
new file mode 100644
index 00000000..24666d18
--- /dev/null
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAvroUtil.scala
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.utils
+
+import org.apache.avro.Schema.Parser
+import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}
+import org.apache.avro.util.Utf8
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+class TestAvroUtil extends FlatSpec with Matchers with BeforeAndAfter {
+
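+  // The value schema is recursive: child_record optionally nests another record of the same type,
+  // which getFromConsumerRecord has to traverse via dot-separated paths such as "value.child_record.record_id".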
+ private val valueSchemaString = raw"""
+ {"type": "record", "name": "schemaName", "fields": [
+ {"type": "int", "name": "record_id"},
+ {"type": ["null", "schemaName"], "name": "child_record", "nullable": true}
+ ]}"""
+
+ private val keySchemaString = raw"""
+ {"type": "record", "name": "keySchema", "fields": [
+ {"type": "string", "name": "key"}
+ ]}"""
+
+ "getIdColumnsFromRecord" should "get the specified fields from the record" in {
+ // given
+ val parser = new Parser()
+ val valueSchema = parser.parse(valueSchemaString)
+ val childRecord2 = new GenericData.Record(valueSchema)
+ childRecord2.put("record_id", 13)
+ childRecord2.put("child_record", null)
+ val childRecord1 = new GenericData.Record(valueSchema)
+ childRecord1.put("record_id", 12)
+ childRecord1.put("child_record", childRecord2)
+ val valueRecord = new GenericData.Record(valueSchema)
+ valueRecord.put("record_id", 11)
+ valueRecord.put("child_record", childRecord1)
+
+ val keySchema = parser.parse(keySchemaString)
+ val keyRecord = new GenericData.Record(keySchema)
+ keyRecord.put("key", new Utf8("abcdef"))
+
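+    // ConsumerRecord constructor arguments: topic, partition, offset, key, value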
+ val consumerRecord: ConsumerRecord[GenericRecord, GenericRecord] =
+ new ConsumerRecord("topicName", 0, 42, keyRecord, valueRecord)
+
+ // when, then
+ AvroUtil.getFromConsumerRecord(consumerRecord, "topic") shouldBe Some("topicName")
+ AvroUtil.getFromConsumerRecord(consumerRecord, "offset") shouldBe Some(42)
+ AvroUtil.getFromConsumerRecord(consumerRecord, "partition") shouldBe Some(0)
+ AvroUtil.getFromConsumerRecord(consumerRecord, "key.key") shouldBe Some("abcdef")
+ AvroUtil.getFromConsumerRecord(consumerRecord, "value.record_id") shouldBe Some(11)
+ AvroUtil.getFromConsumerRecord(consumerRecord, "value.child_record.record_id") shouldBe Some(12)
+ AvroUtil.getFromConsumerRecord(consumerRecord, "value.child_record.child_record.record_id") shouldBe Some(13)
+ }
+
+ it should "return None if a record is not nested as expected" in {
+ // given
+ val parser = new Parser()
+ val valueSchema = parser.parse(valueSchemaString)
+ val valueRecord = new GenericData.Record(valueSchema)
+ valueRecord.put("record_id", 11)
+ valueRecord.put("child_record", null)
+
+ val consumerRecord: ConsumerRecord[GenericRecord, GenericRecord] =
+ new ConsumerRecord("topicName", 0, 42, null, valueRecord)
+
+ // when, then
+ AvroUtil.getFromConsumerRecord(consumerRecord, "value.child_record.child_record.record_id") shouldBe None
+ }
+
+ it should "return None if a field does not exist" in {
+ // given
+ val parser = new Parser()
+ val valueSchema = parser.parse(valueSchemaString)
+ val valueRecord = new GenericData.Record(valueSchema)
+ valueRecord.put("record_id", 11)
+ valueRecord.put("child_record", null)
+
+ val keySchema = parser.parse(keySchemaString)
+ val keyRecord = new GenericData.Record(keySchema)
+ keyRecord.put("key", new Utf8("abcdef"))
+
+ val consumerRecord: ConsumerRecord[GenericRecord, GenericRecord] =
+ new ConsumerRecord("topicName", 0, 42, keyRecord, valueRecord)
+
+ // when, then
+ AvroUtil.getFromConsumerRecord(consumerRecord, "key.some_nonexistent_field") shouldBe None
+ }
+
+ it should "return None if a field does not exist on the consumer record" in {
+ // given
+ val consumerRecord: ConsumerRecord[GenericRecord, GenericRecord] =
+ new ConsumerRecord("topicName", 0, 42, null, null)
+
+ // when, then
+ AvroUtil.getFromConsumerRecord(consumerRecord, "some_nonexistent_field") shouldBe None
+ }
+
+ it should "return None if a field on the record is requested, but the record is null" in {
+ // given
+ val consumerRecord: ConsumerRecord[GenericRecord, GenericRecord] =
+ new ConsumerRecord("topicName", 0, 42, null, null)
+
+ // when, then
+ AvroUtil.getFromConsumerRecord(consumerRecord, "key.some_nonexistent_field") shouldBe None
+ }
+}
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestKafkaUtil.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestKafkaUtil.scala
new file mode 100644
index 00000000..320f85ae
--- /dev/null
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestKafkaUtil.scala
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.utils
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.kafka010.KafkaSourceOffsetProxy
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import za.co.absa.commons.io.TempDirectory
+import za.co.absa.commons.spark.SparkTestBase
+
+class TestKafkaUtil extends FlatSpec with Matchers with BeforeAndAfter with SparkTestBase {
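+  // These tests exercise KafkaUtil against Spark structured streaming checkpoint metadata
+  // (OffsetSeqLog, CommitLog) written to a temporary directory.
+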
+ private var baseDir: TempDirectory = _
+
+ before {
+ baseDir = TempDirectory("test-dir").deleteOnExit()
+ }
+
+ after{
+ baseDir.delete()
+ }
+
+ "getLatestOffset" should "return the latest offsets" in {
+ // given
+ val offset0 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 10L), ("t", 1, 110L)))
+ val offset1 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 25L), ("t", 1, 125L)))
+ val offset2 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 42L), ("t", 1, 142L)))
+
+ val offsetSeqLog = new OffsetSeqLog(spark, baseDir.path.toString)
+ offsetSeqLog.add(0, offset0)
+ offsetSeqLog.add(1, offset1)
+ offsetSeqLog.add(2, offset2)
+
+ // when
+ val latestOffset = KafkaUtil.getLatestOffset(offsetSeqLog)
+
+ // then
+ latestOffset.get should contain theSameElementsAs Map(
+ new TopicPartition("t", 0) -> 42L,
+ new TopicPartition("t", 1) -> 142L
+ )
+ }
+
+ it should "return None if there are no offsets" in {
+ val offsetSeqLog = new OffsetSeqLog(spark, baseDir.path.toString)
+
+ val latestOffset = KafkaUtil.getLatestOffset(offsetSeqLog)
+
+ latestOffset shouldBe None
+ }
+
+ it should "return None if the offset is not defined" in {
+ // given
+ val offset = OffsetSeq.fill(null: Offset)
+ val offsetSeqLog = new OffsetSeqLog(spark, baseDir.path.toString)
+ offsetSeqLog.add(0, offset)
+
+ // when
+ val result = KafkaUtil.getLatestOffset(offsetSeqLog)
+
+ // then
+ result shouldBe None
+ }
+
+ it should "throw an exception if the offsets contain multiple sources" in {
+ // given
+ val offset = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 10L)), KafkaSourceOffsetProxy(("t2", 1, 110L)))
+ val offsetSeqLog = new OffsetSeqLog(spark, baseDir.path.toString)
+ offsetSeqLog.add(0, offset)
+
+ // when
+ val exception = the[Exception] thrownBy KafkaUtil.getLatestOffset(offsetSeqLog)
+
+ // then
+ exception.getMessage should include("Cannot support more than 1 source")
+ }
+
+ "getLatestCommittedOffset" should "return the latest committed offset" in {
+ // given
+ val commitLog = new CommitLog(spark, s"${baseDir.path.toString}/commits")
+ commitLog.add(0, CommitMetadata())
+ commitLog.add(1, CommitMetadata())
+
+ val offset0 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 10L), ("t", 1, 110L)))
+ val offset1 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 25L), ("t", 1, 125L)))
+ val offset2 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 42L), ("t", 1, 142L)))
+
+ val offsetSeqLog = new OffsetSeqLog(spark, s"${baseDir.path.toString}/offsets")
+ offsetSeqLog.add(0, offset0)
+ offsetSeqLog.add(1, offset1)
+ offsetSeqLog.add(2, offset2)
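+    // Batch 2 has an offset entry but no commit, so the latest committed offset corresponds to batch 1.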
+
+ // when
+ val actualOffset = KafkaUtil.getLatestCommittedOffset(offsetSeqLog, commitLog)
+
+ // then
+ actualOffset.get should contain theSameElementsAs Map(
+ new TopicPartition("t", 0) -> 25L,
+ new TopicPartition("t", 1) -> 125L
+ )
+ }
+
+ it should "return None if there is no commit" in {
+ val commitLog = new CommitLog(spark, s"${baseDir.path.toString}/commits")
+
+ val offset0 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 10L), ("t", 1, 110L)))
+ val offset1 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 25L), ("t", 1, 125L)))
+ val offset2 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 42L), ("t", 1, 142L)))
+
+ val offsetSeqLog = new OffsetSeqLog(spark, s"${baseDir.path.toString}/offsets")
+ offsetSeqLog.add(0, offset0)
+ offsetSeqLog.add(1, offset1)
+ offsetSeqLog.add(2, offset2)
+
+ // when
+ val actualOffset = KafkaUtil.getLatestCommittedOffset(offsetSeqLog, commitLog)
+
+ // then
+ actualOffset shouldBe None
+ }
+
+ it should "return throw an exception if there is no offset corresponding to the commit" in {
+ // given
+ val commitLog = new CommitLog(spark, s"${baseDir.path.toString}/commits")
+ commitLog.add(0, CommitMetadata())
+ commitLog.add(1, CommitMetadata())
+
+ val offset0 = OffsetSeq.fill(KafkaSourceOffsetProxy(("t", 0, 10L), ("t", 1, 110L)))
+
+ val offsetSeqLog = new OffsetSeqLog(spark, s"${baseDir.path.toString}/offsets")
+ offsetSeqLog.add(0, offset0)
+
+ // when
+ val result = the[Exception] thrownBy KafkaUtil.getLatestCommittedOffset(offsetSeqLog, commitLog)
+
+ // then
+ result.getMessage should include ("batchId 1")
+ }
+}
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestKafkaUtilDockerTest.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestKafkaUtilDockerTest.scala
new file mode 100644
index 00000000..94bf0845
--- /dev/null
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestKafkaUtilDockerTest.scala
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.utils
+
+import java.time.Duration
+import java.util
+import java.util.UUID.randomUUID
+import java.util.{Collections, Properties}
+
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.scalatest.{AppendedClues, BeforeAndAfter, FlatSpec, Matchers}
+import org.testcontainers.containers.KafkaContainer
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+class TestKafkaUtilDockerTest extends FlatSpec with Matchers with BeforeAndAfter with AppendedClues {
+
+ private val confluentPlatformVersion = "5.3.4" // should be same as kafka.avro.serializer.version property in pom file
+ private val kafka = new KafkaContainer(confluentPlatformVersion)
+ private val kafkaSufficientTimeout = Duration.ofSeconds(5L)
+ private val kafkaInsufficientTimeout = Duration.ofMillis(1L)
+ private val topic = "test-topic"
+ private val maxPollRecords = 10
+
+ before{
+ kafka.start()
+ }
+
+ after {
+ kafka.stop()
+ }
+
+ "getMessagesAtLeastToOffset" should "get all available messages" in {
+ // given
+ val partitions = 3
+ createTopic(kafka, topic, partitions)
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, partitions)
+
+ val consumer = createConsumer(kafka)
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+ val offsets = consumer.endOffsets(topicPartitions.asJava).asScala.toMap.mapValues(_.asInstanceOf[Long])
+
+ // when
+ implicit val kafkaConsumerTimeout: Duration = kafkaSufficientTimeout
+ val records = KafkaUtil.getMessagesAtLeastToOffset(consumer, offsets)
+
+ // then
+ val actualMessages = records.map(_.value()).toList.sorted
+ actualMessages should contain theSameElementsAs messages
+ }
+
+ it should "stop polling when the desired end offset has been reached and not run infinitely" in {
+ // given
+ val partitions = 3
+ createTopic(kafka, topic, partitions)
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, partitions)
+
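+    // A background producer keeps writing new records so the test can verify that polling stops
+    // once the previously captured end offsets are reached instead of consuming indefinitely.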
+ val infiniteProducerThread = new Thread {
+      override def run(): Unit = {
+ var i = 0
+ while (true) {
+ val partition = i % partitions
+ val producerRecord = new ProducerRecord[String, String](topic, partition, null, s"message_${i}")
+ producer.send(producerRecord)
+ i += 1
+ if (i % 100 == 0) {
+ producer.flush()
+ }
+ }
+ }
+ }
+
+ val consumer = createConsumer(kafka)
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+ val offsets = consumer.endOffsets(topicPartitions.asJava).asScala.toMap.mapValues(_.asInstanceOf[Long])
+ infiniteProducerThread.start()
+
+ // when
+ implicit val kafkaConsumerTimeout: Duration = kafkaSufficientTimeout
+ val records = KafkaUtil.getMessagesAtLeastToOffset(consumer, offsets)
+
+ // then
+ val actualMessages = records.map(_.value()).toList.sorted
+ actualMessages should contain allElementsOf messages
+
+ // cleanup
+ infiniteProducerThread.interrupt()
+ }
+
+ it should "throw an exception if consumer is already subscribed" in {
+ // given
+ createTopic(kafka, topic, 1)
+
+ val consumer = createConsumer(kafka)
+ consumer.subscribe(Collections.singletonList(topic))
+
+ // when
+ implicit val kafkaConsumerTimeout: Duration = kafkaInsufficientTimeout
+ val exception = the[Exception] thrownBy KafkaUtil.getMessagesAtLeastToOffset(consumer, Map(new TopicPartition(topic, 0) -> 0))
+
+ // then
+ exception.getMessage should include ("Subscription to topics, partitions and pattern are mutually exclusive")
+ }
+
+ it should "throw an exception if not all messages could be consumed (because the timeout is too short)" in {
+ // given
+ val partitions = 3
+ createTopic(kafka, topic, partitions)
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, partitions)
+
+ val consumer = createConsumer(kafka)
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+ consumer.assign(topicPartitions.asJava)
+ consumer.seekToBeginning(topicPartitions.asJava)
+ val offsets = consumer.endOffsets(topicPartitions.asJava).asScala.toMap.mapValues(_.asInstanceOf[Long])
+
+ // when
+ implicit val kafkaConsumerTimeout: Duration = kafkaInsufficientTimeout
+ val exception = the[Exception] thrownBy KafkaUtil.getMessagesAtLeastToOffset(consumer, offsets)
+
+ // then
+ exception.getMessage should include ("Not all expected messages were consumed")
+ }
+
+ it should "throw an exception if requested offsets are not available" in {
+ // given
+ val partitions = 3
+ createTopic(kafka, topic, partitions)
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, partitions)
+
+ val consumer = createConsumer(kafka)
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+ val offsets = consumer.endOffsets(topicPartitions.asJava).asScala.toMap.mapValues(_ * 2L)
+
+ // when
+ implicit val kafkaConsumerTimeout: Duration = kafkaInsufficientTimeout
+ val exception = the[Exception] thrownBy KafkaUtil.getMessagesAtLeastToOffset(consumer, offsets)
+
+ // then
+ exception.getMessage should include ("Requested consumption")
+ }
+
+ "getAtLeastNLatestRecords" should "get at least the n latest records if there are gaps in the offsets" in {
+ val messageCreationTimeout = 100L
+ val partitions = 3
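+    // Configure aggressive log compaction so that records sharing a key are compacted away,
+    // leaving gaps in the offsets that getAtLeastNLatestRecordsFromPartition must cope with.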
+ createTopic(kafka, topic, partitions, Map(
+ "cleanup.policy" -> "compact",
+ "delete.retention.ms" -> "100",
+ "segment.ms" -> s"$messageCreationTimeout",
+ "min.cleanable.dirty.ratio" -> "0.01"
+ ))
+
+ val producer = createProducer(kafka)
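+    // Odd-numbered messages up to 100 share the key "1", so after compaction only the latest of them
+    // per partition survives; the remaining messages get unique keys and are all retained.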
+ val messages = (1 to 103).map(i => {
+ val key = if (i % 2 == 0 || i > 100) 1000 + i else 1
+ (key.toString, s"msg_${i}", i % partitions)
+ })
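+    // The per-record delay (messageCreationTimeout) matches segment.ms, presumably so that log segments
+    // roll frequently enough to become eligible for compaction during the test.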
+ produceData(producer, messages, topic, Some(messageCreationTimeout))
+
+ val waitForCompactionMillis = 20000L
+ Thread.sleep(waitForCompactionMillis)
+
+ val testConsumer = createConsumer(kafka)
+ testConsumer.subscribe(Collections.singletonList(topic))
+ import scala.util.control.Breaks._
+ var records: Seq[ConsumerRecord[String, String]] = mutable.Seq()
+ breakable {
+ while (true) {
+ val newRecords = testConsumer.poll(kafkaSufficientTimeout).asScala.toSeq
+ records ++= newRecords
+ if (newRecords.isEmpty) {
+ break()
+ }
+ }
+ }
+
+    withClue(s"This is likely an artifact of the test itself. You may want to increase waitForCompactionMillis." +
+      s" The current value is $waitForCompactionMillis") {
+      records.size shouldBe messages.map(r => (r._1, r._3)).distinct.size
+    }
+
+ val consumer = createConsumer(kafka)
+ implicit val kafkaConsumerTimeout: Duration = kafkaSufficientTimeout
+    val recordsPerPartition = (0 until partitions).map(p => new TopicPartition(topic, p) -> 4L).toMap
+ val actualRecords = KafkaUtil.getAtLeastNLatestRecordsFromPartition(consumer, recordsPerPartition)
+ val values = actualRecords.map(_.value())
+
+ values.size should be >= 12
+ values should contain allElementsOf Seq("msg_103", "msg_102", "msg_101", "msg_100", "msg_99", "msg_97", "msg_95",
+ "msg_98", "msg_96", "msg_94", "msg_92", "msg_90")
+ }
+
+ it should "get from multiple topics simultaneously" in {
+ // given
+ val partitions = 3
+ createTopic(kafka, topic, partitions)
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, partitions)
+
+ val consumer = createConsumer(kafka)
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+
+ // when
+ implicit val kafkaConsumerTimeout: Duration = kafkaSufficientTimeout
+ val recordsPerPartition = topicPartitions.map(t => t -> 1000L).toMap
+ val records = KafkaUtil.getAtLeastNLatestRecordsFromPartition(consumer, recordsPerPartition)
+
+ // then
+ val actualMessages = records.map(_.value()).toList.sorted
+ actualMessages should contain theSameElementsAs messages
+ }
+
+ it should "throw an exception if the timeout is too short" in {
+ createTopic(kafka, topic, 1)
+
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+
+ produceData(producer, messages, topic, 1)
+
+ val consumer = createConsumer(kafka)
+ implicit val kafkaConsumerTimeout: Duration = kafkaInsufficientTimeout
+ val result = the[Exception] thrownBy KafkaUtil.getAtLeastNLatestRecordsFromPartition(consumer, Map(new TopicPartition(topic, 0) -> 10))
+ result.getMessage should include("increasing the consumer timeout")
+ }
+
+ "getTopicPartitions" should "return the partitions" in {
+ createTopic(kafka, topic, 10)
+ val consumer = createConsumer(kafka)
+
+ val topicPartitions = KafkaUtil.getTopicPartitions(consumer, topic)
+
+ val expectedPartitions = (0 until 10).map(i => new TopicPartition(topic, i))
+ topicPartitions should contain theSameElementsAs expectedPartitions
+ }
+
+ "seekToOffsetsOrBeginning" should "seek to the provided offsets" in {
+ // given
+ createTopic(kafka, topic, 3)
+ val consumer = createConsumer(kafka)
+
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, 3)
+
+ val tp0 = new TopicPartition(topic, 0)
+ val tp1 = new TopicPartition(topic, 1)
+ val tp2 = new TopicPartition(topic, 2)
+ val offsets = Map(
+ tp0 -> 10L,
+ tp1 -> 15L,
+ tp2 -> 20L
+ )
+
+ // when
+ KafkaUtil.seekToOffsetsOrBeginning(consumer, topic, Some(offsets))
+
+ // then
+ consumer.position(tp0) shouldBe 10L
+ consumer.position(tp1) shouldBe 15L
+ consumer.position(tp2) shouldBe 20L
+ }
+
+ it should "seek to the beginning if no offsets are given" in {
+ // given
+ createTopic(kafka, topic, 3)
+ val consumer = createConsumer(kafka)
+
+ val producer = createProducer(kafka)
+ val messages = (1 to 100).map(i => s"message_${i}")
+ produceData(producer, messages, topic, 3)
+
+ val tp0 = new TopicPartition(topic, 0)
+ val tp1 = new TopicPartition(topic, 1)
+ val tp2 = new TopicPartition(topic, 2)
+
+ // when
+ KafkaUtil.seekToOffsetsOrBeginning(consumer, topic, None)
+
+ // then
+ consumer.position(tp0) shouldBe 0L
+ consumer.position(tp1) shouldBe 0L
+ consumer.position(tp2) shouldBe 0L
+ }
+
+ private def createTopic(kafkaContainer: KafkaContainer, topicName: String, partitions: Int, extraConfig: Map[String, String] = Map()): Unit = {
+ val config = new Properties()
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers)
+ val localKafkaAdmin = AdminClient.create(config)
+ val replication = 1.toShort
+ val topic = new NewTopic(topicName, partitions, replication).configs(extraConfig.asJava)
+ val topicCreationFut = localKafkaAdmin.createTopics(util.Arrays.asList(topic)).all()
+ topicCreationFut.get()
+ }
+
+ def createProducer(kafkaContainer: KafkaContainer): KafkaProducer[String, String] = {
+ val props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers)
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, randomUUID().toString)
+ props.put(ProducerConfig.ACKS_CONFIG, "1")
+ new KafkaProducer[String, String](props)
+ }
+
+ def createConsumer(kafkaContainer: KafkaContainer): KafkaConsumer[String, String] = {
+ import org.apache.kafka.clients.consumer.ConsumerConfig
+ val props = new Properties()
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, randomUUID.toString)
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers)
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+ new KafkaConsumer[String, String](props)
+ }
+
+ private def produceData(producer: KafkaProducer[String, String], valueRecords: Seq[String], topic: String, partitions: Int): Unit = {
+ val records = valueRecords.zipWithIndex.map {
+ case (value, i) => (null, value, i % partitions)
+ }
+ produceData(producer, records, topic)
+ }
+
+ private def produceData(producer: KafkaProducer[String, String], records: Seq[(String, String, Int)], topic: String,
+ timeout: Option[Long] = None): Unit = {
+ records.foreach {
+ record =>
+ val producerRecord = new ProducerRecord[String, String](topic, record._3, record._1, record._2)
+ producer.send(producerRecord)
+ timeout.foreach(Thread.sleep)
+ }
+ producer.flush()
+ }
+}