diff --git a/build.gradle.kts b/build.gradle.kts index 4abf39e..e69af34 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,5 @@ plugins { + id("org.hypertrace.repository-plugin") version "0.2.3" id("org.hypertrace.docker-plugin") version "0.7.1" id("org.hypertrace.docker-publish-plugin") version "0.7.1" } @@ -7,4 +8,4 @@ hypertraceDocker { defaultImage { imageName.set("pinot") } -} +} \ No newline at end of file diff --git a/pinot-servicemanager/Dockerfile b/pinot-servicemanager/Dockerfile index 24f6058..d602ba5 100644 --- a/pinot-servicemanager/Dockerfile +++ b/pinot-servicemanager/Dockerfile @@ -12,6 +12,9 @@ ARG JITPACK_TAG=23797914c6fea24e34f3ebfca9a73e0fff72c2b7 USER root WORKDIR /install +# Copy hypertrace plugins +COPY build/plugins plugins/ + COPY schemas/* schemas/ COPY install.sh /tmp/ diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 238701f..db2d70d 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -1,9 +1,41 @@ plugins { + `java-library` id("org.hypertrace.docker-publish-plugin") } hypertraceDocker { defaultImage { imageName.set("pinot-servicemanager") + tasks.named(buildTaskName) { + dependsOn("copyPlugins") + } } } + +configure<JavaPluginConvention> { + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 +} + +val plugins by configurations.creating + +dependencies { + compileOnly("org.apache.pinot:pinot-spi:0.5.0") + compileOnly("org.apache.pinot:pinot-avro-base:0.5.0") + compileOnly("org.apache.kafka:kafka-streams:5.5.1-ccs") + compileOnly("org.apache.kafka:kafka-clients:5.5.1-ccs") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") { + // disable the transitive dependencies and use them from pinot itself. 
+ isTransitive = false + } +} + +dependencies { + plugins(project(":${project.name}")) +} + +tasks.register<Copy>("copyPlugins") { + from(plugins) + into("${buildDir}/plugins") +} + diff --git a/pinot-servicemanager/install.sh b/pinot-servicemanager/install.sh index e043aed..7d45af0 100755 --- a/pinot-servicemanager/install.sh +++ b/pinot-servicemanager/install.sh @@ -5,7 +5,7 @@ set -eux # Choose the main distribution and the plugins we use -for artifactId in pinot-distribution pinot-confluent-avro pinot-kafka-2.0 +for artifactId in pinot-distribution pinot-confluent-avro pinot-avro pinot-kafka-2.0 do # Download scripts and config for Kafka and ZooKeeper, but not for Connect wget -qO temp.zip https://jitpack.io/com/github/${JITPACK_USER}/incubator-pinot/${artifactId}/${JITPACK_TAG}/${artifactId}-${JITPACK_TAG}-shaded.jar @@ -15,6 +15,13 @@ do rm -rf temp.zip classes/META-INF/license done +# copy hypertrace plugins +for JAR in plugins/* +do + unzip -qo $JAR -d classes +done + +rm -rf plugins # TODO: try maven-dependency-plugin:unpack instead of wget # https://maven.apache.org/plugins/maven-dependency-plugin/examples/unpacking-artifacts.html # https://github.com/hypertrace/pinot/issues/16 diff --git a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json index 86df90c..84023f9 100644 --- a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json @@ -16,12 +16,11 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", 
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json index 412935d..4d5d4ab 100644 --- a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json @@ -16,12 +16,11 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json index 859f21a..bf111ca 100644 --- a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json @@ -16,12 +16,11 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + 
"stream.kafka.consumer.type": "LowLevel", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json index 15ba0bf..0a9b9d0 100644 --- a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json @@ -16,12 +16,11 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json index 683ba32..e793516 100644 --- a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json @@ -16,12 +16,11 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java new file mode 100644 index 0000000..026d1e5 --- /dev/null +++ b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java @@ -0,0 +1,73 @@ +package org.hypertrace.pinot.plugins; + +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordExtractor; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.hypertrace.core.kafkastreams.framework.serdes.GenericAvroSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@NotThreadSafe +public class GenericAvroMessageDecoder implements StreamMessageDecoder<byte[]> { + + private static final Logger LOGGER = LoggerFactory.getLogger(GenericAvroMessageDecoder.class); + private GenericAvroSerde genericAvroSerde; + private String topicName; + private 
RecordExtractor<Record> _avroRecordExtractor; + + + @Override + public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) + throws Exception { + LOGGER.info( + String.format("initializing generic avro message based serde for topic:%s", topicName)); + + this.topicName = topicName; + genericAvroSerde = new GenericAvroSerde(); + + String recordExtractorClass = null; + if (props != null) { + recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); + } + // Backward compatibility to support Avro by default + if (recordExtractorClass == null) { + recordExtractorClass = AvroRecordExtractor.class.getName(); + } + _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); + _avroRecordExtractor.init(fieldsToRead, null); + + LOGGER.info(String + .format("Successfully initialized generic avro message based serde for topic:%s", + topicName)); + } + + /** + * {@inheritDoc} + * + *
<p>
NOTE: the payload should contain message content only (without header). + */ + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer() + .deserialize(this.topicName, payload); + return _avroRecordExtractor.extract(record, destination); + } + + /** + * {@inheritDoc} + * + *
<p>
NOTE: the payload should contain message content only (without header). + */ + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + } +}