diff --git a/README.md b/README.md
index 0966b5df..4ce0a3d6 100644
--- a/README.md
+++ b/README.md
@@ -157,10 +157,12 @@ The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.
 | `transformer.{transformer-id}.value.schema.naming.strategy` | Yes | Subject name strategy of Schema Registry. Possible values are `topic.name`, `record.name` or `topic.record.name`. Equivalent to ABRiS property `SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY` |
 | `transformer.{transformer-id}.value.schema.record.name` | Yes for naming strategies `record.name` and `topic.record.name` | Name of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY` |
 | `transformer.{transformer-id}.value.schema.record.namespace` | Yes for naming strategies `record.name` and `topic.record.name` | Namespace of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY` |
+| `transformer.{transformer-id}.value.optional.fields` | No | Comma-separated list of nullable value columns that should get default value null in the avro schema. Nested columns' names should be concatenated with the dot (`.`) |
 | `transformer.{transformer-id}.produce.keys` | No | If set to `true`, keys will be produced according to the properties `key.column.prefix` and `key.column.names` of the [Hyperdrive Context](#hyperdrive-context) |
 | `transformer.{transformer-id}.key.schema.naming.strategy` | Yes if `produce.keys` is true | Subject name strategy for key |
 | `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the record. |
 | `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
+| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get default value null in the avro schema. Nested columns' names should be concatenated with the dot (`.`) |
 | `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file, that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
 
 Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala
index e364e408..6c7fc027 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala
@@ -15,6 +15,7 @@
 
 package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
 
+import org.apache.avro.JsonProperties
 import org.apache.commons.configuration2.Configuration
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.DataFrame
@@ -74,6 +75,7 @@ private[transformer] class ConfluentAvroEncodingTransformer(
 }
 
 object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {
+  private val logger = LogManager.getLogger
 
   object AbrisConfigKeys extends AbrisProducerConfigKeys {
     override val topic: String = KEY_TOPIC
@@ -93,12 +95,27 @@ object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with Co
 
   def getKeyAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
     val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
-    AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, expression, schemaRegistryConfig)
+    val newDefaultValues = getNullDefaultValues(config, KEY_KEY_OPTIONAL_FIELDS)
+    val schema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, newDefaultValues)
+    logger.info(s"Generated key schema\n${schema.toString(true)}")
+    AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, schema, schemaRegistryConfig)
   }
 
   def getValueAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
     val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
-    AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, expression, schemaRegistryConfig)
+    val newDefaultValues = getNullDefaultValues(config, KEY_VALUE_OPTIONAL_FIELDS)
+    val schema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, newDefaultValues)
+    logger.info(s"Generated value schema\n${schema.toString(true)}")
+    AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, schema, schemaRegistryConfig)
   }
+
+  /**
+   * Builds the map of new default values from the optional-fields list stored under the given config key: every
+   * listed field name is mapped to the avro null default. The map is empty if the list is not available.
+   */
+  private def getNullDefaultValues(config: Configuration, optionalFieldsKey: String): Map[String, Object] =
+    ConfigUtils.getSeqOrNone(optionalFieldsKey, config)
+      .map(optionalFields => optionalFields.map(_ -> JsonProperties.NULL_VALUE).toMap[String, Object])
+      .getOrElse(Map.empty)
 }
 
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformerAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformerAttributes.scala
index edd3182d..ac10be76 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformerAttributes.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformerAttributes.scala
@@ -21,11 +21,13 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
   val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
   val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME = "value.schema.record.name"
   val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE = "value.schema.record.namespace"
+  val KEY_VALUE_OPTIONAL_FIELDS = "value.optional.fields"
 
   val KEY_PRODUCE_KEYS = "produce.keys"
   val KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY = "key.schema.naming.strategy"
   val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME = "key.schema.record.name"
   val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE = "key.schema.record.namespace"
+  val KEY_KEY_OPTIONAL_FIELDS = "key.optional.fields"
 
   override def getName: String = "Confluent Avro Stream Encoder"
 
@@ -40,12 +42,14 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
       Some("Record name for naming strategies record.name or topic.record.name"), required = false),
     KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE -> PropertyMetadata("Value-Record namespace",
       Some("Record namespace for naming strategies record.name or topic.record.name"), required = false),
+    KEY_VALUE_OPTIONAL_FIELDS -> PropertyMetadata("Value-Record optional fields", Some("Comma-separated list of nullable value columns that should get default value null in the avro schema"), required = false),
     KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY -> PropertyMetadata("Key-Schema naming strategy", Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
     KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name",
       Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
     KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace",
       Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
-    KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
+    KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
+    KEY_KEY_OPTIONAL_FIELDS -> PropertyMetadata("Key-Record optional fields", Some("Comma-separated list of nullable key columns that should get default value null in the avro schema"), required = false)
   )
 
   override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisConfigUtil.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisConfigUtil.scala
index fc7aef3f..d11261f4 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisConfigUtil.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisConfigUtil.scala
@@ -15,6 +15,7 @@
 
 package za.co.absa.hyperdrive.ingestor.implementation.utils
 
+import org.apache.avro.{JsonProperties, Schema}
 import org.apache.commons.configuration2.Configuration
 import org.apache.spark.sql.avro.SchemaConverters.toAvroType
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -63,32 +64,27 @@ private[hyperdrive] object AbrisConfigUtil {
     fromSchemaRegisteringConfigFragment.usingSchemaRegistry(schemaRegistryConfig)
   }
 
-  def getKeyProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
+  def getKeyProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
     schemaRegistryConfig: Map[String, String]): ToAvroConfig =
-    getProducerSettings(configuration, configKeys, isKey = true, expression, schemaRegistryConfig)
+    getProducerSettings(configuration, configKeys, isKey = true, schema, schemaRegistryConfig)
 
-  def getValueProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
+  def getValueProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
     schemaRegistryConfig: Map[String, String]): ToAvroConfig =
-    getProducerSettings(configuration, configKeys, isKey = false, expression, schemaRegistryConfig)
+    getProducerSettings(configuration, configKeys, isKey = false, schema, schemaRegistryConfig)
 
   private def getProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, isKey: Boolean,
-    expression: Expression, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
+    schema: Schema, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
     val schemaManager = SchemaManagerFactory.create(schemaRegistryConfig)
     val topic = getTopic(configuration, configKeys)
     val namingStrategy = getNamingStrategy(configuration, configKeys)
     val schemaId = namingStrategy match {
       case TopicNameStrategy =>
-        val schema = toAvroType(expression.dataType, expression.nullable)
         val subject = SchemaSubject.usingTopicNameStrategy(topic, isKey)
         schemaManager.register(subject, schema)
       case RecordNameStrategy =>
-        val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
-          getRecordNamespace(configuration, configKeys))
         val subject = SchemaSubject.usingRecordNameStrategy(schema)
         schemaManager.register(subject, schema)
       case TopicRecordNameStrategy =>
-        val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
-          getRecordNamespace(configuration, configKeys))
         val subject = SchemaSubject.usingTopicRecordNameStrategy(topic, schema)
         schemaManager.register(subject, schema)
       case _ => throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
@@ -100,6 +96,79 @@ private[hyperdrive] object AbrisConfigUtil {
       .usingSchemaRegistry(schemaRegistryConfig)
   }
 
+  /**
+   * Generates an avro schema given a Spark expression. Record name and namespace are derived according to the
+   * configured naming strategy. Default values for the avro schema can be passed using a key-value map. The keys
+   * need to correspond to the field names. In case of nested structs, nested field names should be concatenated
+   * using the dot (.), e.g. "parent.childField.subChildField". Note that dots in avro field names are not allowed.
+   */
+  def generateSchema(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
+    newDefaultValues: Map[String, Object]): Schema = {
+    val namingStrategy = getNamingStrategy(configuration, configKeys)
+    val initialSchema = namingStrategy match {
+      case TopicNameStrategy => toAvroType(expression.dataType, expression.nullable)
+      case RecordNameStrategy | TopicRecordNameStrategy => toAvroType(expression.dataType,
+        expression.nullable, getRecordName(configuration, configKeys), getRecordNamespace(configuration, configKeys))
+      case _ => throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
+    }
+
+    updateSchema(initialSchema, newDefaultValues)
+  }
+
+  /**
+   * This method is intended to update schemas created by [[org.apache.spark.sql.avro.SchemaConverters.toAvroType]] with
+   * new default values.
+   * Apart from the basic types, it only supports the complex types Record, Map and Array. New default values for Enum
+   * or Fixed cannot be assigned. Updating default values for the union type is only supported for a union with null.
+   * The correct order of arbitrary unions with respect to the given default value is not guaranteed.
+   */
+  private def updateSchema(schema: Schema, newDefaultValues: Map[String, Object], fieldPrefix: String = ""): Schema = {
+    val prefixSeparator = if (fieldPrefix.isEmpty) "" else "."
+    import scala.collection.JavaConverters._
+    schema.getType match {
+      case Schema.Type.UNION =>
+        val newSchemas = schema.getTypes.asScala.map(t =>
+          updateSchema(t, newDefaultValues, fieldPrefix)
+        )
+        Schema.createUnion(newSchemas.asJava)
+      case Schema.Type.RECORD =>
+        val newFields = schema.getFields.asScala.map(f => {
+          val fullFieldName = s"$fieldPrefix$prefixSeparator${f.name()}"
+          val defaultValue = newDefaultValues.getOrElse(fullFieldName, f.defaultVal())
+          val newSchema = updateSchema(f.schema(), newDefaultValues, fullFieldName)
+          val newSchemaReordered = reorderUnionTypesForDefaultValueNull(newSchema, defaultValue)
+          new Schema.Field(f.name(), newSchemaReordered, f.doc(), defaultValue, f.order())
+        })
+        Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, newFields.asJava)
+      case Schema.Type.ARRAY =>
+        val newSchema = updateSchema(schema.getElementType, newDefaultValues, fieldPrefix)
+        Schema.createArray(newSchema)
+      case Schema.Type.MAP =>
+        val newSchema = updateSchema(schema.getValueType, newDefaultValues, fieldPrefix)
+        Schema.createMap(newSchema)
+      case _ => schema
+    }
+  }
+
+  /**
+   * Avro expects the default value of a union-typed field to match the first type of the union. If the given schema
+   * is a two-element union of the form [T, null] and the default value is null, it is returned as [null, T].
+   * In all other cases the schema is returned unchanged.
+   */
+  private def reorderUnionTypesForDefaultValueNull(schema: Schema, defaultValue: Object): Schema = {
+    import scala.collection.JavaConverters._
+    val hasNullDefault = defaultValue.isInstanceOf[JsonProperties.Null]
+    if (hasNullDefault && schema.getType == Schema.Type.UNION) {
+      schema.getTypes.asScala.toList match {
+        case first :: second :: Nil if first.getType != Schema.Type.NULL && second.getType == Schema.Type.NULL =>
+          Schema.createUnion(Schema.create(Schema.Type.NULL), first)
+        case _ => schema
+      }
+    } else {
+      schema
+    }
+  }
+
   private def getTopic(configuration: Configuration, configKeys: AbrisConfigKeys): String =
     getOrThrow(configKeys.topic, configuration, errorMessage = s"Topic not found. Is '${configKeys.topic}' properly set?")
 
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroEncodingTransformer.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroEncodingTransformer.scala
index 0770c574..6dab67fc 100644
--- a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroEncodingTransformer.scala
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroEncodingTransformer.scala
@@ -15,13 +15,22 @@
 
 package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
 
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
 import org.apache.commons.configuration2.BaseConfiguration
+import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{array, lit, map, struct}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType}
 import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
 import za.co.absa.abris.config.AbrisConfig
 import za.co.absa.commons.spark.SparkTestBase
+import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
+import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
 import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer._
 import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
@@ -31,11 +40,11 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
 
   private val topic = "topic"
   private val SchemaRegistryURL = "http://localhost:8081"
-
+  private var mockSchemaRegistryClient: MockSchemaRegistryClient = _
   behavior of ConfluentAvroEncodingTransformer.getClass.getSimpleName
 
   before {
-    val mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
+    mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
     SchemaManagerFactory.resetSRClientInstance()
     SchemaManagerFactory.addSRClientInstance(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> SchemaRegistryURL), mockSchemaRegistryClient)
   }
@@ -52,7 +61,7 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
     encoder.withKey shouldBe false
   }
 
-  it should "encode the values" in {
+  "transform" should "encode the values" in {
     // given
     import spark.implicits._
     val queryName = "dummyQuery"
@@ -83,4 +92,78 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
     val byteArrays = outputDf.select("value").map(_ (0).asInstanceOf[Array[Byte]]).collect()
     byteArrays.distinct.length shouldBe byteArrays.length
   }
+
+  it should "register a schema with optional fields" in {
+    // given
+    val schema = StructType(Seq(
+      StructField("key__col1", IntegerType, nullable = true),
+      StructField("col2", StringType, nullable = true),
+      StructField("col3", StructType(
+        Seq(StructField("subCol1", StringType, nullable = true))
+      ), nullable = true)
+    )
+    )
+    HyperdriveContext.put(HyperdriveContextKeys.keyColumnPrefix, "key__")
+    HyperdriveContext.put(HyperdriveContextKeys.keyColumnNames, Seq("col1"))
+    val memoryStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(schema))
+
+    val config = new BaseConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+    config.addProperty(KafkaStreamWriter.KEY_TOPIC, topic)
+    config.addProperty(KEY_SCHEMA_REGISTRY_URL, SchemaRegistryURL)
+    config.addProperty(KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, AbrisConfigUtil.TopicNameStrategy)
+    config.addProperty(KEY_PRODUCE_KEYS, "true")
+    config.addProperty(KEY_KEY_OPTIONAL_FIELDS, "col1")
+    config.addProperty(KEY_VALUE_OPTIONAL_FIELDS, "col2, col3, col3.subCol1")
+    val encoder = ConfluentAvroEncodingTransformer(config)
+
+    val expectedKeySchemaString = {
+      raw"""{
+           |  "type" : "record",
+           |  "name" : "topLevelRecord",
+           |  "fields" : [ {
+           |    "name" : "col1",
+           |    "type" : [ "null", "int" ],
+           |    "default" : null
+           |  } ]
+           |}
+           |""".stripMargin
+    }
+    val expectedKeySchema = AvroSchemaUtils.parse(expectedKeySchemaString)
+
+    val expectedValueSchemaString =
+      raw"""{
+           |  "type" : "record",
+           |  "name" : "topLevelRecord",
+           |  "fields" : [ {
+           |    "name" : "col2",
+           |    "type" : [ "null", "string" ],
+           |    "default" : null
+           |  }, {
+           |    "name" : "col3",
+           |    "type" : [ "null", {
+           |      "type" : "record",
+           |      "name" : "col3",
+           |      "namespace" : "topLevelRecord",
+           |      "fields" : [ {
+           |        "name" : "subCol1",
+           |        "type" : [ "null", "string" ],
+           |        "default" : null
+           |      } ]
+           |    } ],
+           |    "default" : null
+           |  } ]
+           |}
+           |""".stripMargin
+    val expectedValueSchema = AvroSchemaUtils.parse(expectedValueSchemaString)
+
+    // when
+    encoder.transform(memoryStream.toDF())
+
+    // then
+    val keySchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-key")
+    keySchema.getSchema shouldBe expectedKeySchema.toString
+    val valueSchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-value")
+    valueSchema.getSchema shouldBe expectedValueSchema.toString
+  }
 }
diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAbrisConfigUtil.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAbrisConfigUtil.scala
index 616227b2..ddf4464d 100644
--- a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAbrisConfigUtil.scala
+++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAbrisConfigUtil.scala
@@ -16,15 +16,16 @@
 package za.co.absa.hyperdrive.ingestor.implementation.utils
 
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
+import org.apache.avro.JsonProperties
 import org.apache.commons.configuration2.BaseConfiguration
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType}
 import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
 import za.co.absa.abris.config.AbrisConfig
 import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
-import za.co.absa.hyperdrive.ingestor.implementation.testutils.abris.AbrisTestUtil.{getFromSchemaString, getSchemaId, getSchemaRegistryConf, getToSchemaString}
+import za.co.absa.hyperdrive.ingestor.implementation.testutils.abris.AbrisTestUtil.{getFromSchemaString, getSchemaId, getSchemaRegistryConf}
 
 class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
 
@@ -48,9 +49,9 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
       ]
     }"""
   }
+
   private val dummyRecordNameSchema = AvroSchemaUtils.parse(getSchemaString(recordName, recordNamespace))
   private val dummyTopicNameSchema = AvroSchemaUtils.parse(getSchemaString("topLevelRecord", ""))
-  private val dummyExpr = struct(lit(null).cast(IntegerType).as(columnName)).expr
 
   private val keyTopic = "kafka.topic"
   private val keySchemaRegistryUrl = "schema.registry.url"
@@ -58,12 +59,14 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
   private val keySchemaRegistryNamingStrategy = "schema.registry.naming.strategy"
   private val keySchemaRegistryRecordName = "schema.registry.record.name"
   private val keySchemaRegistryRecordNamespace = "schema.registry.record.namespace"
+
   private object ProducerConfigKeys extends AbrisProducerConfigKeys {
     override val namingStrategy: String = keySchemaRegistryNamingStrategy
     override val recordName: String = keySchemaRegistryRecordName
     override val recordNamespace: String = keySchemaRegistryRecordNamespace
     override val topic: String = keyTopic
   }
+
   private object ConsumerConfigKeys extends AbrisConsumerConfigKeys {
     override val schemaId: String = keySchemaRegistrySchemaId
     override val namingStrategy: String = keySchemaRegistryNamingStrategy
@@ -81,6 +84,79 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     SchemaManagerFactory.addSRClientInstance(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> dummySchemaRegistryUrl), mockSchemaRegistryClient)
   }
 
+  "generateSchema" should "generate a schema with topic name strategy" in {
+    val config = createBaseConfiguration
+    config.addProperty(keySchemaRegistryNamingStrategy, AbrisConfigUtil.TopicNameStrategy)
+    val dummyExpr = struct(lit(null).cast(IntegerType).as(columnName)).expr
+
+    val schema = AbrisConfigUtil.generateSchema(config, ProducerConfigKeys, dummyExpr, Map())
+
+    schema shouldBe dummyTopicNameSchema
+  }
+
+  it should "generate a schema with record name strategy" in {
+    val config = createBaseConfiguration
+    config.addProperty(keySchemaRegistryNamingStrategy, AbrisConfigUtil.RecordNameStrategy)
+    config.addProperty(keySchemaRegistryRecordName, recordName)
+    config.addProperty(keySchemaRegistryRecordNamespace, recordNamespace)
+    val dummyExpr = struct(lit(null).cast(IntegerType).as(columnName)).expr
+
+    val schema = AbrisConfigUtil.generateSchema(config, ProducerConfigKeys, dummyExpr, Map())
+
+    schema shouldBe dummyRecordNameSchema
+  }
+
+  it should "set default values for the specified fields" in {
+    val config = createBaseConfiguration
+    config.addProperty(keySchemaRegistryNamingStrategy, AbrisConfigUtil.TopicNameStrategy)
+    val expression = struct(
+      lit(null).cast(IntegerType).as("col1"),
+      lit("abc").cast(StringType).as("col2"),
+      struct(
+        lit(null).cast(IntegerType).as("subcol1")
+      ).as("col3"),
+      array(struct(
+        lit(null).cast(BooleanType).as("subcol1")
+      )).as("col4"),
+      map(
+        lit("abc").as("keycol1"),
+        struct(lit(null).cast(StringType).as("valuecol1")
+        )).as("col5")
+    ).expr
+
+    val expectedSchemaString =
+      raw"""{
+           |  "type": "record",
+           |  "name": "topLevelRecord",
+           |  "fields":[
+           |    {"name": "col1", "type": ["int", "null"], "default": 42 },
+           |    {"name": "col2", "type": "string"},
+           |    {"name": "col3", "type":
+           |      {"type": "record", "name": "col3", "namespace": "topLevelRecord", "fields":[
+           |        {"name": "subcol1", "type": ["null", "int"], "default": null}]}},
+           |    {"name": "col4", "type":
+           |      { "type": "array", "items":
+           |        { "type": "record", "name": "col4", "namespace": "topLevelRecord", "fields":[
+           |          {"name": "subcol1", "type": ["null", "boolean"], "default": null}]}}},
+           |    {"name": "col5", "type":
+           |      { "type":"map", "values":
+           |        { "type": "record", "name": "col5", "namespace":"topLevelRecord", "fields":[
+           |          {"name":"valuecol1","type":["null","string"], "default": null}]}}}
+           |  ]
+           |}
+           |""".stripMargin
+    val expectedSchema = AvroSchemaUtils.parse(expectedSchemaString)
+
+    val schema = AbrisConfigUtil.generateSchema(config, ProducerConfigKeys, expression, Map(
+      "col1" -> 42.asInstanceOf[Object],
+      "col3.subcol1" -> JsonProperties.NULL_VALUE,
+      "col4.subcol1" -> JsonProperties.NULL_VALUE,
+      "col5.valuecol1" -> JsonProperties.NULL_VALUE
+    ))
+
+    schema shouldBe expectedSchema
+  }
+
   "getKeyProducerSettings" should "return settings and register subject with topic name strategy" in {
     // given
     val config = createBaseConfiguration
@@ -89,10 +165,9 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     config.addProperty(keySchemaRegistryNamingStrategy, AbrisConfigUtil.TopicNameStrategy)
 
     // when
-    val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr, schemaRegistryConfig)
+    val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyTopicNameSchema, schemaRegistryConfig)
 
     // then
-    getToSchemaString(settings) shouldBe dummyTopicNameSchema.toString
     getSchemaId(settings) shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"${topic}-key")
   }
@@ -107,10 +182,9 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     config.addProperty(keySchemaRegistryRecordNamespace, recordNamespace)
 
     // when
-    val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr, schemaRegistryConfig)
+    val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyRecordNameSchema, schemaRegistryConfig)
 
     // then
-    getToSchemaString(settings) shouldBe dummyRecordNameSchema.toString
     getSchemaId(settings) shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$recordNamespace.$recordName")
   }
@@ -125,10 +199,9 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     config.addProperty(keySchemaRegistryRecordNamespace, recordNamespace)
 
     // when
-    val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr, schemaRegistryConfig)
+    val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyRecordNameSchema, schemaRegistryConfig)
 
     // then
-    getToSchemaString(settings) shouldBe dummyRecordNameSchema.toString
     getSchemaId(settings) shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$topic-$recordNamespace.$recordName")
   }
@@ -141,10 +214,9 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val schemaRegistryConfig = createBaseSchemaRegistryConfig
 
     // when
-    val settings = AbrisConfigUtil.getValueProducerSettings(config, ProducerConfigKeys, dummyExpr, schemaRegistryConfig)
+    val settings = AbrisConfigUtil.getValueProducerSettings(config, ProducerConfigKeys, dummyTopicNameSchema, schemaRegistryConfig)
 
     // then
-    getToSchemaString(settings) shouldBe dummyTopicNameSchema.toString
     getSchemaId(settings) shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$topic-value")
   }