Merged
3 changes: 2 additions & 1 deletion build.gradle.kts
@@ -1,4 +1,5 @@
plugins {
id("org.hypertrace.repository-plugin") version "0.2.3"
id("org.hypertrace.docker-plugin") version "0.7.1"
id("org.hypertrace.docker-publish-plugin") version "0.7.1"
}
@@ -7,4 +8,4 @@ hypertraceDocker {
defaultImage {
imageName.set("pinot")
}
}
}
3 changes: 3 additions & 0 deletions pinot-servicemanager/Dockerfile
@@ -12,6 +12,9 @@ ARG JITPACK_TAG=23797914c6fea24e34f3ebfca9a73e0fff72c2b7
USER root
WORKDIR /install

# Copy hypertrace plugins
COPY build/plugins plugins/

COPY schemas/* schemas/

COPY install.sh /tmp/
32 changes: 32 additions & 0 deletions pinot-servicemanager/build.gradle.kts
@@ -1,9 +1,41 @@
plugins {
`java-library`
id("org.hypertrace.docker-publish-plugin")
}

hypertraceDocker {
defaultImage {
imageName.set("pinot-servicemanager")
tasks.named(buildTaskName) {
dependsOn("copyPlugins")
}
}
}

configure<JavaPluginExtension> {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}

val plugins by configurations.creating

dependencies {
compileOnly("org.apache.pinot:pinot-spi:0.5.0")
compileOnly("org.apache.pinot:pinot-avro-base:0.5.0")
compileOnly("org.apache.kafka:kafka-streams:5.5.1-ccs")
compileOnly("org.apache.kafka:kafka-clients:5.5.1-ccs")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") {
// disable the transitive dependencies and use them from pinot itself.
Contributor:
This is always a risky move, as we're now forced to ensure that the dependencies of this module and those existing in pinot are in sync, when either one can move.

Contributor Author:
Yes, we understand your concern, and we had this discussion internally before proceeding with this implementation.

This Serde implementation depends on the following interfaces:

  1. Serde from Kafka Streams
  2. Serializer and Deserializer from kafka-clients
  3. StreamMessageDecoder and RecordExtractor from Pinot

As the above interfaces are public and stable, the risk of incompatibility is very low, and this Serde is used only in HT quickstart mode. Secondly, we are planning to contribute GenericAvroMessageDecoder to the Pinot community for quickstart mode.

cc: @laxmanchekka

isTransitive = false
}
}

dependencies {
Contributor:
Is this circularly creating a project dependency on itself? This should move to a separate module.

Contributor:
Will handle this as part of the UDF PR, since it conflicts with this one anyway.

Contributor Author:
Thanks for moving to pinot-avro-serde as part of your PR.

plugins(project(":${project.name}"))
}

tasks.register<Sync>("copyPlugins") {
from(plugins)
into("${buildDir}/plugins")
}

9 changes: 8 additions & 1 deletion pinot-servicemanager/install.sh
@@ -5,7 +5,7 @@
set -eux

# Choose the main distribution and the plugins we use
for artifactId in pinot-distribution pinot-confluent-avro pinot-kafka-2.0
for artifactId in pinot-distribution pinot-confluent-avro pinot-avro pinot-kafka-2.0
Contributor:
Why do we need pinot-avro?
Also, do we still need pinot-confluent-avro now?

Contributor Author:
Yup, in this image, we can remove it. Will try this.

Contributor Author:
As discussed, we will keep this as we may want to switch back to schema-registry-based deployment.

do
# Download scripts and config for Kafka and ZooKeeper, but not for Connect
wget -qO temp.zip https://jitpack.io/com/github/${JITPACK_USER}/incubator-pinot/${artifactId}/${JITPACK_TAG}/${artifactId}-${JITPACK_TAG}-shaded.jar
@@ -15,6 +15,13 @@ do
rm -rf temp.zip classes/META-INF/license
done

# copy hypertrace plugins
for JAR in plugins/*
do
unzip -qo "$JAR" -d classes
done

rm -rf plugins
# TODO: try maven-dependency-plugin:unpack instead of wget
# https://maven.apache.org/plugins/maven-dependency-plugin/examples/unpacking-artifacts.html
# https://github.com/hypertrace/pinot/issues/16
@@ -16,12 +16,11 @@
},
"tableIndexConfig": {
"streamConfigs": {
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"streamType": "kafka",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181",
"realtime.segment.flush.threshold.size": "500000",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.zk.broker.url": "zookeeper:2181",
"realtime.segment.flush.threshold.time": "3600000",
"stream.kafka.broker.list": "kafka:9092",
@@ -16,12 +16,11 @@
},
"tableIndexConfig": {
"streamConfigs": {
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"streamType": "kafka",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181",
"realtime.segment.flush.threshold.size": "500000",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.zk.broker.url": "zookeeper:2181",
"realtime.segment.flush.threshold.time": "3600000",
"stream.kafka.broker.list": "kafka:9092",
@@ -16,12 +16,11 @@
},
"tableIndexConfig": {
"streamConfigs": {
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"streamType": "kafka",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181",
"realtime.segment.flush.threshold.size": "500000",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.zk.broker.url": "zookeeper:2181",
"realtime.segment.flush.threshold.time": "3600000",
"stream.kafka.broker.list": "kafka:9092",
@@ -16,12 +16,11 @@
},
"tableIndexConfig": {
"streamConfigs": {
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"streamType": "kafka",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181",
"realtime.segment.flush.threshold.size": "500000",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.zk.broker.url": "zookeeper:2181",
"realtime.segment.flush.threshold.time": "3600000",
"stream.kafka.broker.list": "kafka:9092",
@@ -16,12 +16,11 @@
},
"tableIndexConfig": {
"streamConfigs": {
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"streamType": "kafka",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181",
"realtime.segment.flush.threshold.size": "500000",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.zk.broker.url": "zookeeper:2181",
"realtime.segment.flush.threshold.time": "3600000",
"stream.kafka.broker.list": "kafka:9092",
@@ -0,0 +1,73 @@
package org.hypertrace.pinot.plugins;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.hypertrace.core.kafkastreams.framework.serdes.GenericAvroSerde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class GenericAvroMessageDecoder implements StreamMessageDecoder<byte[]> {

private static final Logger LOGGER = LoggerFactory.getLogger(GenericAvroMessageDecoder.class);
private GenericAvroSerde genericAvroSerde;
private String topicName;
private RecordExtractor<Record> _avroRecordExtractor;


@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception {
LOGGER.info(
String.format("initializing generic avro message based serde for topic:%s", topicName));

this.topicName = topicName;
genericAvroSerde = new GenericAvroSerde();

String recordExtractorClass = null;
if (props != null) {
recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
}
// Backward compatibility to support Avro by default
if (recordExtractorClass == null) {
recordExtractorClass = AvroRecordExtractor.class.getName();
}
_avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass);
_avroRecordExtractor.init(fieldsToRead, null);

LOGGER.info(String
.format("Successfully initialized generic avro message based serde for topic:%s",
topicName));
}

/**
* {@inheritDoc}
*
* <p>NOTE: the payload should contain message content only (without header).
*/
@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
Contributor:
So as far as I understand this will read the payload from the message and ignore the schema ID to proceed with a local avro schema?

Contributor Author:
The writer schema will be part of the payload if we use GenericAvroSerde, so internally it will first read the schema and then use it. See https://github.com/hypertrace/kafka-streams-framework/blob/main/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/GenericAvroDeserializer.java

GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer()
.deserialize(this.topicName, payload);
return _avroRecordExtractor.extract(record, destination);
}

/**
* {@inheritDoc}
*
* <p>NOTE: the payload should contain message content only (without header).
*/
@Override
public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
}
}
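For context on the review thread above: the registry-based KafkaConfluentSchemaRegistryAvroMessageDecoder that this PR replaces expects Confluent's wire format, where each record starts with a magic byte 0x0 and a 4-byte big-endian schema ID, followed by the Avro body. A GenericAvroSerde payload carries no such header; the writer schema travels with the record itself. A minimal sketch of the Confluent framing, using only the JDK (the class and method names here are illustrative, not part of either codebase):

```java
import java.nio.ByteBuffer;

// Illustrative sketch of Confluent's wire-format header (magic byte + schema ID).
// Not part of this PR; shown only to contrast the two payload layouts.
public class ConfluentFramingSketch {

  static final byte MAGIC_BYTE = 0x0;

  // Returns the schema-registry ID encoded in a Confluent-framed payload.
  static int schemaId(byte[] payload) {
    ByteBuffer buf = ByteBuffer.wrap(payload);
    if (buf.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("not Confluent wire format");
    }
    return buf.getInt(); // big-endian 4-byte schema ID follows the magic byte
  }

  public static void main(String[] args) {
    // Frame a dummy Avro body under schema ID 42.
    byte[] body = {1, 2, 3};
    ByteBuffer framed = ByteBuffer.allocate(5 + body.length);
    framed.put(MAGIC_BYTE).putInt(42).put(body);
    System.out.println(schemaId(framed.array())); // prints 42
  }
}
```

This is why the new streamConfigs drop both the schema.registry.rest.url property and the registry decoder: with the writer schema embedded in the payload, there is no schema ID to resolve.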