Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,12 @@ The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.
| `transformer.{transformer-id}.value.schema.naming.strategy` | Yes | Subject name strategy of Schema Registry. Possible values are `topic.name`, `record.name` or `topic.record.name`. Equivalent to ABRiS property `SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY` |
| `transformer.{transformer-id}.value.schema.record.name` | Yes for naming strategies `record.name` and `topic.record.name` | Name of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY` |
| `transformer.{transformer-id}.value.schema.record.namespace` | Yes for naming strategies `record.name` and `topic.record.name` | Namespace of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY` |
| `transformer.{transformer-id}.value.optional.fields` | No | Comma-separated list of nullable value columns that should get the default value `null` in the Avro schema. Names of nested columns must be given as dot-separated paths, e.g. `parent.childField` |
| `transformer.{transformer-id}.produce.keys` | No | If set to `true`, keys will be produced according to the properties `key.column.prefix` and `key.column.names` of the [Hyperdrive Context](#hyperdrive-context) |
| `transformer.{transformer-id}.key.schema.naming.strategy` | Yes if `produce.keys` is true | Subject name strategy for key |
| `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the record. |
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get the default value `null` in the Avro schema. Names of nested columns must be given as dot-separated paths, e.g. `parent.childField` |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | The path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.JsonProperties
import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.DataFrame
Expand Down Expand Up @@ -74,6 +75,7 @@ private[transformer] class ConfluentAvroEncodingTransformer(
}

object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {
// Log4j logger of this factory object; used to log the generated key and value schemas.
private val logger = LogManager.getLogger

object AbrisConfigKeys extends AbrisProducerConfigKeys {
override val topic: String = KEY_TOPIC
Expand All @@ -93,12 +95,23 @@ object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with Co

/**
 * Builds the ABRiS producer config for the key.
 *
 * The avro schema is generated from the given expression. Every field listed under the config key
 * `key.optional.fields` gets the default value null in that schema. The generated schema is logged and
 * then passed, together with the schema registry config, to [[AbrisConfigUtil.getKeyProducerSettings]].
 *
 * @param config     transformer configuration
 * @param expression Spark expression of the key column from which the schema is derived
 * @return the ABRiS config to encode the key as avro
 */
def getKeyAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
  val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
  // No configured optional fields means that no default values are overridden.
  val newDefaultValues = ConfigUtils.getSeqOrNone(KEY_KEY_OPTIONAL_FIELDS, config)
    .map(optionalFields => optionalFields.map(_ -> JsonProperties.NULL_VALUE).toMap)
    .getOrElse(Map())

  val schema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, newDefaultValues)
  logger.info(s"Generated key schema\n${schema.toString(true)}")
  AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, schema, schemaRegistryConfig)
}

/**
 * Builds the ABRiS producer config for the value.
 *
 * The avro schema is generated from the given expression. Every field listed under the config key
 * `value.optional.fields` gets the default value null in that schema. The generated schema is logged and
 * then passed, together with the schema registry config, to [[AbrisConfigUtil.getValueProducerSettings]].
 *
 * @param config     transformer configuration
 * @param expression Spark expression of the value column from which the schema is derived
 * @return the ABRiS config to encode the value as avro
 */
def getValueAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
  val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
  // No configured optional fields means that no default values are overridden.
  val newDefaultValues = ConfigUtils.getSeqOrNone(KEY_VALUE_OPTIONAL_FIELDS, config)
    .map(optionalFields => optionalFields.map(_ -> JsonProperties.NULL_VALUE).toMap)
    .getOrElse(Map())

  val schema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, newDefaultValues)
  logger.info(s"Generated value schema\n${schema.toString(true)}")
  AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, schema, schemaRegistryConfig)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
// Config keys for the value schema.
val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME = "value.schema.record.name"
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE = "value.schema.record.namespace"
// Comma-separated list of nullable value columns that get the default value null in the avro schema.
val KEY_VALUE_OPTIONAL_FIELDS = "value.optional.fields"

// Config keys for the (optionally produced) key schema.
val KEY_PRODUCE_KEYS = "produce.keys"
val KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY = "key.schema.naming.strategy"
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME = "key.schema.record.name"
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE = "key.schema.record.namespace"
// Comma-separated list of nullable key columns that get the default value null in the avro schema.
val KEY_KEY_OPTIONAL_FIELDS = "key.optional.fields"

/** @return the display name of this component */
override def getName: String = "Confluent Avro Stream Encoder"

Expand All @@ -40,12 +42,14 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
Some("Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE -> PropertyMetadata("Value-Record namespace",
Some("Record namespace for naming strategies record.name or topic.record.name"), required = false),
KEY_VALUE_OPTIONAL_FIELDS -> PropertyMetadata("Value-Record optional fields", Some("Comma-separated list of nullable value columns that should get default value null in the avro schema"), required = false),

KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY -> PropertyMetadata("Key-Schema naming strategy",
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
KEY_KEY_OPTIONAL_FIELDS -> PropertyMetadata("Key-Record optional fields", Some("Comma-separated list of nullable key columns that should get default value null in the avro schema"), required = false)
)

/**
 * @return the prefix under which extra configuration properties may be passed.
 *         NOTE(review): presumably the root key for additional schema registry options - confirm against
 *         the definition of KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT.
 */
override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package za.co.absa.hyperdrive.ingestor.implementation.utils

import org.apache.avro.{JsonProperties, Schema}
import org.apache.commons.configuration2.Configuration
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.catalyst.expressions.Expression
Expand Down Expand Up @@ -63,32 +64,27 @@ private[hyperdrive] object AbrisConfigUtil {
fromSchemaRegisteringConfigFragment.usingSchemaRegistry(schemaRegistryConfig)
}

/**
 * Creates the ABRiS producer config for the key, using the given avro schema.
 *
 * @param configuration        configuration from which topic and naming strategy are read
 * @param configKeys           names of the config keys to look up in `configuration`
 * @param schema               avro schema of the key
 * @param schemaRegistryConfig schema registry client configuration
 * @return the ABRiS config to encode the key as avro
 */
def getKeyProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
                           schemaRegistryConfig: Map[String, String]): ToAvroConfig =
  getProducerSettings(configuration, configKeys, isKey = true, schema, schemaRegistryConfig)

/**
 * Creates the ABRiS producer config for the value, using the given avro schema.
 *
 * @param configuration        configuration from which topic and naming strategy are read
 * @param configKeys           names of the config keys to look up in `configuration`
 * @param schema               avro schema of the value
 * @param schemaRegistryConfig schema registry client configuration
 * @return the ABRiS config to encode the value as avro
 */
def getValueProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
                             schemaRegistryConfig: Map[String, String]): ToAvroConfig =
  getProducerSettings(configuration, configKeys, isKey = false, schema, schemaRegistryConfig)

private def getProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, isKey: Boolean,
expression: Expression, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
schema: Schema, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
val schemaManager = SchemaManagerFactory.create(schemaRegistryConfig)
val topic = getTopic(configuration, configKeys)
val namingStrategy = getNamingStrategy(configuration, configKeys)
val schemaId = namingStrategy match {
case TopicNameStrategy =>
val schema = toAvroType(expression.dataType, expression.nullable)
val subject = SchemaSubject.usingTopicNameStrategy(topic, isKey)
schemaManager.register(subject, schema)
case RecordNameStrategy =>
val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
getRecordNamespace(configuration, configKeys))
val subject = SchemaSubject.usingRecordNameStrategy(schema)
schemaManager.register(subject, schema)
case TopicRecordNameStrategy =>
val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
getRecordNamespace(configuration, configKeys))
val subject = SchemaSubject.usingTopicRecordNameStrategy(topic, schema)
schemaManager.register(subject, schema)
case _ => throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
Expand All @@ -100,6 +96,75 @@ private[hyperdrive] object AbrisConfigUtil {
.usingSchemaRegistry(schemaRegistryConfig)
}

/**
 * Derives an avro schema from a Spark expression and applies the given default values to it.
 *
 * For the topic.name strategy the schema is created without an explicit record name. For the record.name and
 * topic.record.name strategies the configured record name and namespace are used. Any other strategy is
 * rejected.
 *
 * The keys of `newDefaultValues` are field names. Fields of nested structs are addressed by joining the
 * field names with a dot (.), e.g. "parent.childField.subChildField". Note that dots in avro field names are
 * not allowed.
 *
 * @throws IllegalArgumentException if the configured naming strategy is not one of the three supported ones
 */
def generateSchema(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
                   newDefaultValues: Map[String, Object]): Schema = {
  val schemaWithoutNewDefaults = getNamingStrategy(configuration, configKeys) match {
    case TopicNameStrategy =>
      toAvroType(expression.dataType, expression.nullable)
    case RecordNameStrategy | TopicRecordNameStrategy =>
      toAvroType(
        expression.dataType,
        expression.nullable,
        getRecordName(configuration, configKeys),
        getRecordNamespace(configuration, configKeys)
      )
    case _ =>
      throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
  }

  updateSchema(schemaWithoutNewDefaults, newDefaultValues)
}

/**
 * Returns a copy of the given schema in which record fields carry the default values from `newDefaultValues`.
 * It is meant for schemas produced by [[org.apache.spark.sql.avro.SchemaConverters.toAvroType]].
 *
 * Records, unions, arrays and maps are rebuilt recursively; every other type is returned unchanged, so new
 * default values for Enum or Fixed cannot be assigned. A record field is looked up in `newDefaultValues` by
 * its dot-separated path; fields without an entry keep their existing default. Element types of arrays, value
 * types of maps and the member types of unions inherit the path of the enclosing field.
 *
 * Updating the default value of a union is only supported for a two-type union with null. The correct order
 * of arbitrary unions with respect to the given default value is not guaranteed.
 *
 * @param fieldPrefix dot-separated path of the field that encloses `schema`; empty at the top level
 */
private def updateSchema(schema: Schema, newDefaultValues: Map[String, Object], fieldPrefix: String = ""): Schema = {
  import scala.collection.JavaConverters._

  // Path of a field of the current record, relative to the top-level record.
  def qualifiedName(fieldName: String): String = {
    val separator = if (fieldPrefix.isEmpty) "" else "."
    s"$fieldPrefix$separator$fieldName"
  }

  schema.getType match {
    case Schema.Type.RECORD =>
      val updatedFields = schema.getFields.asScala.map { field =>
        val fieldPath = qualifiedName(field.name())
        val fieldDefault = newDefaultValues.getOrElse(fieldPath, field.defaultVal())
        val updatedFieldSchema = updateSchema(field.schema(), newDefaultValues, fieldPath)
        // A null default is only valid if null is the first type of the union.
        val fieldSchema = reorderUnionTypesForDefaultValueNull(updatedFieldSchema, fieldDefault)
        new Schema.Field(field.name(), fieldSchema, field.doc(), fieldDefault, field.order())
      }
      Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, updatedFields.asJava)
    case Schema.Type.UNION =>
      val updatedMembers = schema.getTypes.asScala.map(updateSchema(_, newDefaultValues, fieldPrefix))
      Schema.createUnion(updatedMembers.asJava)
    case Schema.Type.ARRAY =>
      Schema.createArray(updateSchema(schema.getElementType, newDefaultValues, fieldPrefix))
    case Schema.Type.MAP =>
      Schema.createMap(updateSchema(schema.getValueType, newDefaultValues, fieldPrefix))
    case _ =>
      schema
  }
}

/**
 * Moves the null type to the front of a union of the form [someType, null] if the default value is null.
 * Every other schema, and any union with a different shape or a different default value, is returned as is.
 */
private def reorderUnionTypesForDefaultValueNull(schema: Schema, defaultValue: Object): Schema = {
  import scala.collection.JavaConverters._

  def isNullType(candidate: Schema): Boolean = candidate.getType == Schema.Type.NULL

  if (schema.getType != Schema.Type.UNION) {
    schema
  } else {
    schema.getTypes.asScala.toList match {
      case nonNullType :: nullType :: Nil
        if !isNullType(nonNullType) && isNullType(nullType) && defaultValue.isInstanceOf[JsonProperties.Null] =>
        Schema.createUnion(Schema.create(Schema.Type.NULL), nonNullType)
      case _ =>
        schema
    }
  }
}

/**
 * Reads the topic from the configuration, using the key given by `configKeys.topic`.
 * The lookup is delegated to `getOrThrow` with an error message that names the expected key.
 */
private def getTopic(configuration: Configuration, configKeys: AbrisConfigKeys): String = {
  val topicKey = configKeys.topic
  getOrThrow(topicKey, configuration, errorMessage = s"Topic not found. Is '$topicKey' properly set?")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,22 @@

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.commons.configuration2.BaseConfiguration
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{array, lit, map, struct}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.commons.spark.SparkTestBase
import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer._
import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
Expand All @@ -31,11 +40,11 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B

// Topic name and schema registry URL shared by the tests of this suite.
private val topic = "topic"
private val SchemaRegistryURL = "http://localhost:8081"

// Mock schema registry client; kept as a field so that tests can inspect the schemas registered in it.
private var mockSchemaRegistryClient: MockSchemaRegistryClient = _
behavior of ConfluentAvroEncodingTransformer.getClass.getSimpleName

// Installs a fresh mock schema registry client for the test schema registry URL before each test.
// The client is assigned to the field (not to a local val) so that tests can query the registered schemas.
before {
  mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
  SchemaManagerFactory.resetSRClientInstance()
  SchemaManagerFactory.addSRClientInstance(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> SchemaRegistryURL), mockSchemaRegistryClient)
}
Expand All @@ -52,7 +61,7 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
encoder.withKey shouldBe false
}

it should "encode the values" in {
"transform" should "encode the values" in {
// given
import spark.implicits._
val queryName = "dummyQuery"
Expand Down Expand Up @@ -83,4 +92,78 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
val byteArrays = outputDf.select("value").map(_ (0).asInstanceOf[Array[Byte]]).collect()
byteArrays.distinct.length shouldBe byteArrays.length
}

// Verifies that the fields listed under key.optional.fields and value.optional.fields are registered as
// [ "null", type ] unions with default null, including the nested field col3.subCol1.
it should "register a schema with optional fields" in {
// given
// One key column (marked by the prefix "key__") and two value columns, one of them a nested struct.
val schema = StructType(Seq(
StructField("key__col1", IntegerType, nullable = true),
StructField("col2", StringType, nullable = true),
StructField("col3", StructType(
Seq(StructField("subCol1", StringType, nullable = true))
), nullable = true)
)
)
HyperdriveContext.put(HyperdriveContextKeys.keyColumnPrefix, "key__")
HyperdriveContext.put(HyperdriveContextKeys.keyColumnNames, Seq("col1"))
val memoryStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(schema))

val config = new BaseConfiguration()
// The list delimiter handler is set before the comma-separated optional-fields properties are added.
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty(KafkaStreamWriter.KEY_TOPIC, topic)
config.addProperty(KEY_SCHEMA_REGISTRY_URL, SchemaRegistryURL)
config.addProperty(KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, AbrisConfigUtil.TopicNameStrategy)
config.addProperty(KEY_PRODUCE_KEYS, "true")
config.addProperty(KEY_KEY_OPTIONAL_FIELDS, "col1")
config.addProperty(KEY_VALUE_OPTIONAL_FIELDS, "col2, col3, col3.subCol1")
val encoder = ConfluentAvroEncodingTransformer(config)

// Expected key schema: the key field appears as col1, i.e. without the key column prefix.
val expectedKeySchemaString = {
raw"""{
| "type" : "record",
| "name" : "topLevelRecord",
| "fields" : [ {
| "name" : "col1",
| "type" : [ "null", "int" ],
| "default" : null
| } ]
|}
|""".stripMargin
}
val expectedKeySchema = AvroSchemaUtils.parse(expectedKeySchemaString)

// Expected value schema: col2, col3 and the nested col3.subCol1 all have null first and default null.
val expectedValueSchemaString =
raw"""{
| "type" : "record",
| "name" : "topLevelRecord",
| "fields" : [ {
| "name" : "col2",
| "type" : [ "null", "string" ],
| "default" : null
| }, {
| "name" : "col3",
| "type" : [ "null", {
| "type" : "record",
| "name" : "col3",
| "namespace" : "topLevelRecord",
| "fields" : [ {
| "name" : "subCol1",
| "type" : [ "null", "string" ],
| "default" : null
| } ]
| } ],
| "default" : null
| } ]
|}
|""".stripMargin
val expectedValueSchema = AvroSchemaUtils.parse(expectedValueSchemaString)

// when
// The returned DataFrame is not used; only the schemas found in the mock registry afterwards are checked.
encoder.transform(memoryStream.toDF())

// then
val keySchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-key")
keySchema.getSchema shouldBe expectedKeySchema.toString
val valueSchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-value")
valueSchema.getSchema shouldBe expectedValueSchema.toString
}
}
Loading