From 6fb0306881f6e6f11b1866128c04e9a5880baedd Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 29 Sep 2020 16:48:31 +0530 Subject: [PATCH 01/12] testing generic serde changes --- build.gradle.kts | 2 + pinot-servicemanager/build.gradle.kts | 9 +++ pinot-servicemanager/install.sh | 2 +- .../backendEntityView-tableConfigFile.json | 6 +- .../rawServiceView-tableConfigFile.json | 6 +- .../schemas/rawTraceView-tableConfigFile.json | 6 +- .../serviceCallView-tableConfigFile.json | 6 +- .../spanEventView-tableConfigFile.json | 6 +- .../pinot/plugins/AvroRecordExtractor.java | 38 +++++++++++++ .../plugins/GenericAvroMessageDecoder.java | 55 +++++++++++++++++++ settings.gradle.kts | 2 +- 11 files changed, 121 insertions(+), 17 deletions(-) create mode 100644 pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java create mode 100644 pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java diff --git a/build.gradle.kts b/build.gradle.kts index 4abf39e..b4a9fab 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,6 @@ plugins { + id("org.hypertrace.repository-plugin") version "0.2.3" + id("org.hypertrace.ci-utils-plugin") version "0.1.4" id("org.hypertrace.docker-plugin") version "0.7.1" id("org.hypertrace.docker-publish-plugin") version "0.7.1" } diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 238701f..0a2a0e6 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -1,4 +1,6 @@ plugins { + `java-library` + id("org.hypertrace.docker-java-application-plugin") version "0.7.1" apply true id("org.hypertrace.docker-publish-plugin") } @@ -7,3 +9,10 @@ hypertraceDocker { imageName.set("pinot-servicemanager") } } + +dependencies { + implementation("org.apache.pinot:pinot-spi:0.5.0") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.10") +} + + diff --git 
a/pinot-servicemanager/install.sh b/pinot-servicemanager/install.sh index e043aed..f7546f1 100755 --- a/pinot-servicemanager/install.sh +++ b/pinot-servicemanager/install.sh @@ -5,7 +5,7 @@ set -eux # Choose the main distribution and the plugins we use -for artifactId in pinot-distribution pinot-confluent-avro pinot-kafka-2.0 +for artifactId in pinot-distribution pinot-confluent-avro pinot-avro pinot-kafka-2.0 do # Download scripts and config for Kafka and ZooKeeper, but not for Connect wget -qO temp.zip https://jitpack.io/com/github/${JITPACK_USER}/incubator-pinot/${artifactId}/${JITPACK_TAG}/${artifactId}-${JITPACK_TAG}-shaded.jar diff --git a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json index 86df90c..1ccda47 100644 --- a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json @@ -16,12 +16,12 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "simple", + "stream.kafka.decoder.prop.schema": 
"{\"type\":\"record\",\"name\":\"BackendEntityView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"span_id\",\"type\":\"bytes\"},{\"name\":\"backend_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"backend_host\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"backend_port\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"backend_protocol\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"backend_path\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"span_kind\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0},{\"name\":\"num_calls\",\"type\":\"int\",\"default\":0},{\"name\":\"backend_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"backend_trace_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"display_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"status_code\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status_message\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"defaul
t\":null},{\"name\":\"tags\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"avro.java.string\":\"String\"},\"default\":{}},{\"name\":\"caller_service_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"caller_api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}", + "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json index 412935d..35fac98 100644 --- a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json @@ -16,12 +16,12 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "simple", + "stream.kafka.decoder.prop.schema": 
"{\"type\":\"record\",\"name\":\"RawServiceView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"span_id\",\"type\":\"bytes\"},{\"name\":\"service_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"start_time_millis\",\"type\":\"long\"},{\"name\":\"end_time_millis\",\"type\":\"long\"},{\"name\":\"duration_millis\",\"type\":\"long\"},{\"name\":\"span_kind\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0},{\"name\":\"protocol_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status_code\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"service_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"parent_span_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"num_calls\",\"type\":\"int\",\"default\":0},{\"name\":\"transaction_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_discovery_state\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}", + "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - 
"stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json index 859f21a..0b8719b 100644 --- a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json @@ -16,12 +16,12 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "simple", + "stream.kafka.decoder.prop.schema":"{\"type\":\"record\",\"name\":\"RawTraceView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"transaction_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"services\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},\"default\":[]},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":-1},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":-1},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"num_services\",\"type\":\"int\",\"default\":0},{\"name\":\"num_spans\",\"type\":\"int\",\"default\":0}]}", + "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", 
"realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json index 15ba0bf..bcf7db1 100644 --- a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json @@ -16,12 +16,12 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "simple", + "stream.kafka.decoder.prop.schema":"{\"type\":\"record\",\"name\":\"ServiceCallView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"transaction_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"client_event_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"server_event_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"caller_service\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"caller_api\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_service\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_api\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"request_url\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"request_method\",\"type\":[\"null\",{\"type\":\"string\",\"avro.
java.string\":\"String\"}],\"default\":null},{\"name\":\"protocol_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"response_status_code\",\"type\":\"int\",\"default\":0},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0},{\"name\":\"caller_service_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"caller_api_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_service_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_api_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"num_calls\",\"type\":\"int\",\"default\":1},{\"name\":\"callee_backend_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_backend_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}", + "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json index 683ba32..c016ead 100644 --- a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json +++ 
b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json @@ -16,12 +16,12 @@ }, "tableIndexConfig": { "streamConfigs": { - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "streamType": "kafka", - "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.consumer.type": "simple", + "stream.kafka.decoder.prop.schema":"{\"type\":\"record\",\"name\":\"SpanEventView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"span_id\",\"type\":\"bytes\"},{\"name\":\"span_kind\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"parent_span_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"service_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"entry_api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"protocol_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"tags\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"avro.java.string\":\"String\"},\"default\":{}},{\"name\":\"status_code\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"
api_trace_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"service_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_boundary_type\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status_message\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_trace_count\",\"type\":\"int\",\"default\":0},{\"name\":\"display_entity_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"display_span_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"request_url\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"api_discovery_state\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0}]}", + "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", - "stream.kafka.consumer.type": "LowLevel", "stream.kafka.zk.broker.url": "zookeeper:2181", "realtime.segment.flush.threshold.time": "3600000", "stream.kafka.broker.list": "kafka:9092", diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java new file mode 100644 index 0000000..76b205d --- /dev/null +++ 
b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java @@ -0,0 +1,38 @@ +package org.hypertrace.pinot.plugins; + +import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.reflect.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordExtractor; +import org.apache.pinot.spi.data.readers.RecordExtractorConfig; + +public class AvroRecordExtractor implements RecordExtractor { + private Set _fields; + private boolean _extractAll = false; + + @Override + public void init(Set fields, @Nullable RecordExtractorConfig recordExtractorConfig) { + _fields = fields; + if (fields == null || fields.isEmpty()) { + _extractAll = true; + } + } + + @Override + public GenericRow extract(GenericRecord from, GenericRow to) { + if (_extractAll) { + List fields = from.getSchema().getFields(); + for (Schema.Field field : fields) { + String fieldName = field.name(); + to.putValue(fieldName, AvroUtils.convert(from.get(fieldName))); + } + } else { + for (String fieldName : _fields) { + to.putValue(fieldName, AvroUtils.convert(from.get(fieldName))); + } + } + return to; + } +} diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java new file mode 100644 index 0000000..d040706 --- /dev/null +++ b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java @@ -0,0 +1,55 @@ +package org.hypertrace.pinot.plugins; + +import java.util.Map; +import java.util.Set; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.pinot.spi.data.readers.GenericRow; +import 
org.apache.pinot.spi.data.readers.RecordExtractor; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.hypertrace.core.kafkastreams.framework.serdes.GenericAvroSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@NotThreadSafe +public class GenericAvroMessageDecoder implements StreamMessageDecoder { + + private static final Logger LOGGER = LoggerFactory.getLogger(GenericAvroMessageDecoder.class); + GenericAvroSerde genericAvroSerde; + String topicName; + private DatumReader _datumReader; + private RecordExtractor _avroRecordExtractor; + private BinaryDecoder _binaryDecoderToReuse; + private GenericData.Record _avroRecordToReuse; + + @Override + public void init(Map props, Set fieldsToRead, String topicName) + throws Exception { + this.topicName = topicName; + //_datumReader = new GenericDatumReader<>(_avroSchema); + genericAvroSerde = new GenericAvroSerde(); + String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); + // Backward compatibility to support Avro by default + if (recordExtractorClass == null) { + recordExtractorClass = AvroRecordExtractor.class.getName(); + } + _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); + _avroRecordExtractor.init(fieldsToRead, null); + } + + /** + * {@inheritDoc} + * + *

NOTE: the payload should contain message content only (without header). + */ + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer() + .deserialize(this.topicName, payload); + return _avroRecordExtractor.extract(record, destination); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index cc4f819..4d266b6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -5,7 +5,7 @@ pluginManagement { maven("https://dl.bintray.com/hypertrace/maven") } } - +rootProject.name = "pinot" plugins { id("org.hypertrace.version-settings") version "0.1.1" } From 6daaa1600a62ffc033501993762424a268b26355 Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 29 Sep 2020 17:08:46 +0530 Subject: [PATCH 02/12] implement StreamMessageDecoder interface --- pinot-servicemanager/build.gradle.kts | 1 + .../pinot/plugins/AvroRecordExtractor.java | 38 ------------------- .../plugins/GenericAvroMessageDecoder.java | 22 ++++++++--- 3 files changed, 17 insertions(+), 44 deletions(-) delete mode 100644 pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 0a2a0e6..18b5dba 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -12,6 +12,7 @@ hypertraceDocker { dependencies { implementation("org.apache.pinot:pinot-spi:0.5.0") + implementation("org.apache.pinot:pinot-avro-base:0.5.0") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.10") } diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java deleted file mode 100644 index 76b205d..0000000 --- a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/AvroRecordExtractor.java +++ 
/dev/null @@ -1,38 +0,0 @@ -package org.hypertrace.pinot.plugins; - -import java.util.Set; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.reflect.Nullable; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordExtractor; -import org.apache.pinot.spi.data.readers.RecordExtractorConfig; - -public class AvroRecordExtractor implements RecordExtractor { - private Set _fields; - private boolean _extractAll = false; - - @Override - public void init(Set fields, @Nullable RecordExtractorConfig recordExtractorConfig) { - _fields = fields; - if (fields == null || fields.isEmpty()) { - _extractAll = true; - } - } - - @Override - public GenericRow extract(GenericRecord from, GenericRow to) { - if (_extractAll) { - List fields = from.getSchema().getFields(); - for (Schema.Field field : fields) { - String fieldName = field.name(); - to.putValue(fieldName, AvroUtils.convert(from.get(fieldName))); - } - } else { - for (String fieldName : _fields) { - to.putValue(fieldName, AvroUtils.convert(from.get(fieldName))); - } - } - return to; - } -} diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java index d040706..234bf99 100644 --- a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java +++ b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java @@ -1,12 +1,12 @@ package org.hypertrace.pinot.plugins; +import java.util.Arrays; import java.util.Map; import java.util.Set; import javax.annotation.concurrent.NotThreadSafe; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; +import 
org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordExtractor; import org.apache.pinot.spi.plugin.PluginManager; @@ -21,17 +21,17 @@ public class GenericAvroMessageDecoder implements StreamMessageDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(GenericAvroMessageDecoder.class); GenericAvroSerde genericAvroSerde; String topicName; - private DatumReader _datumReader; private RecordExtractor _avroRecordExtractor; - private BinaryDecoder _binaryDecoderToReuse; - private GenericData.Record _avroRecordToReuse; + @Override public void init(Map props, Set fieldsToRead, String topicName) throws Exception { this.topicName = topicName; - //_datumReader = new GenericDatumReader<>(_avroSchema); + genericAvroSerde = new GenericAvroSerde(); + genericAvroSerde.configure(props, false); + String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); // Backward compatibility to support Avro by default if (recordExtractorClass == null) { @@ -52,4 +52,14 @@ public GenericRow decode(byte[] payload, GenericRow destination) { .deserialize(this.topicName, payload); return _avroRecordExtractor.extract(record, destination); } + + /** + * {@inheritDoc} + * + *

NOTE: the payload should contain message content only (without header). + */ + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + } } From c407e68f894567724ee0d6309d535fc8fdfb0240 Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 29 Sep 2020 17:45:22 +0530 Subject: [PATCH 03/12] modify the dockerfile to use generic avro serde --- pinot-servicemanager/Dockerfile | 3 +++ pinot-servicemanager/build.gradle.kts | 1 + 2 files changed, 4 insertions(+) diff --git a/pinot-servicemanager/Dockerfile b/pinot-servicemanager/Dockerfile index 24f6058..53033b4 100644 --- a/pinot-servicemanager/Dockerfile +++ b/pinot-servicemanager/Dockerfile @@ -32,6 +32,9 @@ WORKDIR /opt/pinot RUN adduser -g '' -h ${PWD} -D ${USER} USER ${USER} +# Copy generic avro serde +COPY --chown=${USER} build/classes/java/main/ ./classes/ + # Copy binaries and config we installed earlier COPY --from=install --chown=${USER} /install . 
diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 18b5dba..3096eaa 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -7,6 +7,7 @@ plugins { hypertraceDocker { defaultImage { imageName.set("pinot-servicemanager") + dockerFile.set(file("Dockerfile")) } } From 045853ee99d091f0188ce8a10f3246ffcd7c4fef Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 29 Sep 2020 17:50:37 +0530 Subject: [PATCH 04/12] using generic-avro-serde as streaming config --- pinot-servicemanager/build.gradle.kts | 2 +- .../schemas/backendEntityView-tableConfigFile.json | 3 +-- .../schemas/rawServiceView-tableConfigFile.json | 3 +-- pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json | 3 +-- .../schemas/serviceCallView-tableConfigFile.json | 3 +-- .../schemas/spanEventView-tableConfigFile.json | 3 +-- 6 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 3096eaa..d1dd659 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -14,7 +14,7 @@ hypertraceDocker { dependencies { implementation("org.apache.pinot:pinot-spi:0.5.0") implementation("org.apache.pinot:pinot-avro-base:0.5.0") - implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.10") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") } diff --git a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json index 1ccda47..6f2827b 100644 --- a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json @@ -18,8 +18,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "simple", - "stream.kafka.decoder.prop.schema": 
"{\"type\":\"record\",\"name\":\"BackendEntityView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"span_id\",\"type\":\"bytes\"},{\"name\":\"backend_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"backend_host\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"backend_port\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"backend_protocol\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"backend_path\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"span_kind\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0},{\"name\":\"num_calls\",\"type\":\"int\",\"default\":0},{\"name\":\"backend_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"backend_trace_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"display_name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"status_code\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status_message\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"defaul
t\":null},{\"name\":\"tags\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"avro.java.string\":\"String\"},\"default\":{}},{\"name\":\"caller_service_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"caller_api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}", - "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", "stream.kafka.zk.broker.url": "zookeeper:2181", diff --git a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json index 35fac98..bbc7a77 100644 --- a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json @@ -18,8 +18,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "simple", - "stream.kafka.decoder.prop.schema": 
"{\"type\":\"record\",\"name\":\"RawServiceView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"span_id\",\"type\":\"bytes\"},{\"name\":\"service_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"start_time_millis\",\"type\":\"long\"},{\"name\":\"end_time_millis\",\"type\":\"long\"},{\"name\":\"duration_millis\",\"type\":\"long\"},{\"name\":\"span_kind\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0},{\"name\":\"protocol_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status_code\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"service_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"parent_span_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"num_calls\",\"type\":\"int\",\"default\":0},{\"name\":\"transaction_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_discovery_state\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}", - "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": 
"zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", "stream.kafka.zk.broker.url": "zookeeper:2181", diff --git a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json index 0b8719b..fed9064 100644 --- a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json @@ -18,8 +18,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "simple", - "stream.kafka.decoder.prop.schema":"{\"type\":\"record\",\"name\":\"RawTraceView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"transaction_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"services\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},\"default\":[]},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":-1},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":-1},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"num_services\",\"type\":\"int\",\"default\":0},{\"name\":\"num_spans\",\"type\":\"int\",\"default\":0}]}", - "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", "stream.kafka.zk.broker.url": "zookeeper:2181", diff --git a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json index bcf7db1..c410f78 100644 --- a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json +++ 
b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json @@ -18,8 +18,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "simple", - "stream.kafka.decoder.prop.schema":"{\"type\":\"record\",\"name\":\"ServiceCallView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"transaction_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"client_event_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"server_event_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"caller_service\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"caller_api\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_service\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_api\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"request_url\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"request_method\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"protocol_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"response_status_code\",\"type\":\"int\",\"default\":0},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0},{\"name\":\"caller_service_id_str\",\"type\":[\"null\",{\"
type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"caller_api_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_service_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_api_id_str\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"num_calls\",\"type\":\"int\",\"default\":1},{\"name\":\"callee_backend_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"callee_backend_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}", - "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", "stream.kafka.zk.broker.url": "zookeeper:2181", diff --git a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json index c016ead..ed3dab3 100644 --- a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json @@ -18,8 +18,7 @@ "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "simple", - 
"stream.kafka.decoder.prop.schema":"{\"type\":\"record\",\"name\":\"SpanEventView\",\"namespace\":\"org.hypertrace.viewgenerator.api\",\"fields\":[{\"name\":\"tenant_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"span_id\",\"type\":\"bytes\"},{\"name\":\"span_kind\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"parent_span_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"trace_id\",\"type\":\"bytes\"},{\"name\":\"service_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"entry_api_id\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"protocol_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"tags\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"avro.java.string\":\"String\"},\"default\":{}},{\"name\":\"status_code\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"start_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"end_time_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"duration_millis\",\"type\":\"long\",\"default\":0},{\"name\":\"api_trace_id\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"service_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_boundary_type\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}
,{\"name\":\"status_message\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"status\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"api_trace_count\",\"type\":\"int\",\"default\":0},{\"name\":\"display_entity_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"display_span_name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"request_url\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"error_count\",\"type\":\"int\",\"default\":0},{\"name\":\"api_discovery_state\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"exception_count\",\"type\":\"int\",\"default\":0}]}", - "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", + "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", "stream.kafka.zk.broker.url": "zookeeper:2181", From 8a6c9b2d9ab72d8626cadd53237469b820d003b2 Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 29 Sep 2020 20:24:24 +0530 Subject: [PATCH 05/12] using generic avro serde for making schema-registry optional --- build.gradle.kts | 1 - pinot-servicemanager/build.gradle.kts | 1 - settings.gradle.kts | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index b4a9fab..9ae5c1d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,6 +1,5 @@ plugins { id("org.hypertrace.repository-plugin") version "0.2.3" - id("org.hypertrace.ci-utils-plugin") version "0.1.4" id("org.hypertrace.docker-plugin") version "0.7.1" id("org.hypertrace.docker-publish-plugin") version "0.7.1" } diff 
--git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index d1dd659..886dacd 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -1,6 +1,5 @@ plugins { `java-library` - id("org.hypertrace.docker-java-application-plugin") version "0.7.1" apply true id("org.hypertrace.docker-publish-plugin") } diff --git a/settings.gradle.kts b/settings.gradle.kts index 4d266b6..cc4f819 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -5,7 +5,7 @@ pluginManagement { maven("https://dl.bintray.com/hypertrace/maven") } } -rootProject.name = "pinot" + plugins { id("org.hypertrace.version-settings") version "0.1.1" } From e52f26e335eb9938685c2e3ce3f5349061741cd8 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 09:39:10 +0530 Subject: [PATCH 06/12] adding deps --- build.gradle.kts | 9 ++++ pinot-servicemanager/install.sh | 10 +++- .../backendEntityView-tableConfigFile.json | 2 +- .../rawServiceView-tableConfigFile.json | 2 +- .../schemas/rawTraceView-tableConfigFile.json | 2 +- .../serviceCallView-tableConfigFile.json | 2 +- .../spanEventView-tableConfigFile.json | 2 +- .../plugins/GenericAvroMessageDecoder.java | 51 ++++++++++++++----- 8 files changed, 61 insertions(+), 19 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 9ae5c1d..a963301 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -9,3 +9,12 @@ hypertraceDocker { imageName.set("pinot") } } + +subprojects { + pluginManager.withPlugin("java") { + configure { + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 + } + } +} \ No newline at end of file diff --git a/pinot-servicemanager/install.sh b/pinot-servicemanager/install.sh index f7546f1..d4fdab5 100755 --- a/pinot-servicemanager/install.sh +++ b/pinot-servicemanager/install.sh @@ -15,6 +15,14 @@ do rm -rf temp.zip classes/META-INF/license done +# copy hypertrace plugins +# Download scripts and config for Kafka and 
ZooKeeper, but not for Connect +wget -qO temp.zip https://dl.bintray.com/hypertrace/maven/org/hypertrace/core/kafkastreams/framework/kafka-streams-serdes/0.1.11/kafka-streams-serdes-0.1.11.jar +# Pinot starts faster when classes are extracted +unzip -qo temp.zip -d classes +# remove license because sometimes a file and other times a directory +rm -rf temp.zip classes/META-INF/license + # TODO: try maven-dependency-plugin:unpack instead of wget # https://maven.apache.org/plugins/maven-dependency-plugin/examples/unpacking-artifacts.html # https://github.com/hypertrace/pinot/issues/16 @@ -27,7 +35,7 @@ appender.console.type=Console appender.console.name=STDOUT appender.console.layout.type=PatternLayout appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n -rootLogger.level=warn +rootLogger.level=info rootLogger.appenderRefs=stdout rootLogger.appenderRef.stdout.ref=STDOUT # Hush reflections similarly to https://github.com/apache/incubator-pinot/pull/5001 diff --git a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json index 6f2827b..9c20c75 100644 --- a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "lowlevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json index bbc7a77..b772d48 100644 --- a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json +++ 
b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "lowlevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json index fed9064..f693fec 100644 --- a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "lowlevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json index c410f78..7a06767 100644 --- a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "lowlevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json index ed3dab3..e8fcc7a 100644 --- 
a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "lowlevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java index 234bf99..9674274 100644 --- a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java +++ b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java @@ -27,18 +27,33 @@ public class GenericAvroMessageDecoder implements StreamMessageDecoder { @Override public void init(Map props, Set fieldsToRead, String topicName) throws Exception { - this.topicName = topicName; + try { + LOGGER.info(String.format("initializing GenericAvroMessageDecoder for topic:%s", topicName)); + this.topicName = topicName; - genericAvroSerde = new GenericAvroSerde(); - genericAvroSerde.configure(props, false); + genericAvroSerde = new GenericAvroSerde(); + LOGGER.info("GenericAvroMessageDecoder reached 1"); + genericAvroSerde.configure(props, false); - String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); - // Backward compatibility to support Avro by default - if (recordExtractorClass == null) { - recordExtractorClass = AvroRecordExtractor.class.getName(); + LOGGER.info("GenericAvroMessageDecoder reached 2"); + + String recordExtractorClass = null; + if (props != null) { + recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); + } + // Backward compatibility to support Avro by default + if 
(recordExtractorClass == null) { + recordExtractorClass = AvroRecordExtractor.class.getName(); + } + LOGGER.info("GenericAvroMessageDecoder reached 3"); + _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); + _avroRecordExtractor.init(fieldsToRead, null); + LOGGER.info(String + .format("Successfully initialized GenericAvroMessageDecoder for topic:%s", topicName)); + } catch (Exception e) { + LOGGER.info("Failed in init GenericAvroMessageDecoder", e); + throw e; } - _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); - _avroRecordExtractor.init(fieldsToRead, null); } /** @@ -48,9 +63,14 @@ public void init(Map props, Set fieldsToRead, String top */ @Override public GenericRow decode(byte[] payload, GenericRow destination) { - GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer() - .deserialize(this.topicName, payload); - return _avroRecordExtractor.extract(record, destination); + try { + GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer() + .deserialize(this.topicName, payload); + return _avroRecordExtractor.extract(record, destination); + } catch (Exception e) { + LOGGER.info("Failed in decode GenericAvroMessageDecoder", e); + throw new RuntimeException(e); + } } /** @@ -60,6 +80,11 @@ public GenericRow decode(byte[] payload, GenericRow destination) { */ @Override public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { - return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + try { + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + } catch (Exception e) { + LOGGER.info("Failed in decode1 GenericAvroMessageDecoder", e); + throw new RuntimeException(e); + } } } From ee75ae5a2d371a4f0636895e23019301d80203a9 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 10:42:42 +0530 Subject: [PATCH 07/12] fixing the null pointer issue in configure --- 
pinot-servicemanager/build.gradle.kts | 8 +-- .../plugins/GenericAvroMessageDecoder.java | 59 +++++++------------ 2 files changed, 25 insertions(+), 42 deletions(-) diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 886dacd..98c97a5 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -6,14 +6,14 @@ plugins { hypertraceDocker { defaultImage { imageName.set("pinot-servicemanager") - dockerFile.set(file("Dockerfile")) + //dockerFile.set(file("Dockerfile")) } } dependencies { - implementation("org.apache.pinot:pinot-spi:0.5.0") - implementation("org.apache.pinot:pinot-avro-base:0.5.0") - implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") + compileOnly("org.apache.pinot:pinot-spi:0.5.0") + compileOnly("org.apache.pinot:pinot-avro-base:0.5.0") + compileOnly("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") } diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java index 9674274..5db7667 100644 --- a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java +++ b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java @@ -27,33 +27,26 @@ public class GenericAvroMessageDecoder implements StreamMessageDecoder { @Override public void init(Map props, Set fieldsToRead, String topicName) throws Exception { - try { - LOGGER.info(String.format("initializing GenericAvroMessageDecoder for topic:%s", topicName)); - this.topicName = topicName; + LOGGER.info( + String.format("initializing generic avro message based serde for topic:%s", topicName)); - genericAvroSerde = new GenericAvroSerde(); - LOGGER.info("GenericAvroMessageDecoder reached 1"); - genericAvroSerde.configure(props, false); + this.topicName = 
topicName; + genericAvroSerde = new GenericAvroSerde(); - LOGGER.info("GenericAvroMessageDecoder reached 2"); - - String recordExtractorClass = null; - if (props != null) { - recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); - } - // Backward compatibility to support Avro by default - if (recordExtractorClass == null) { - recordExtractorClass = AvroRecordExtractor.class.getName(); - } - LOGGER.info("GenericAvroMessageDecoder reached 3"); - _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); - _avroRecordExtractor.init(fieldsToRead, null); - LOGGER.info(String - .format("Successfully initialized GenericAvroMessageDecoder for topic:%s", topicName)); - } catch (Exception e) { - LOGGER.info("Failed in init GenericAvroMessageDecoder", e); - throw e; + String recordExtractorClass = null; + if (props != null) { + recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY); + } + // Backward compatibility to support Avro by default + if (recordExtractorClass == null) { + recordExtractorClass = AvroRecordExtractor.class.getName(); } + _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); + _avroRecordExtractor.init(fieldsToRead, null); + + LOGGER.info(String + .format("Successfully initialized generic avro message based serde for topic:%s", + topicName)); } /** @@ -63,14 +56,9 @@ public void init(Map props, Set fieldsToRead, String top */ @Override public GenericRow decode(byte[] payload, GenericRow destination) { - try { - GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer() - .deserialize(this.topicName, payload); - return _avroRecordExtractor.extract(record, destination); - } catch (Exception e) { - LOGGER.info("Failed in decode GenericAvroMessageDecoder", e); - throw new RuntimeException(e); - } + GenericData.Record record = (GenericData.Record) genericAvroSerde.deserializer() + .deserialize(this.topicName, payload); + return _avroRecordExtractor.extract(record, 
destination); } /** @@ -80,11 +68,6 @@ public GenericRow decode(byte[] payload, GenericRow destination) { */ @Override public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { - try { - return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); - } catch (Exception e) { - LOGGER.info("Failed in decode1 GenericAvroMessageDecoder", e); - throw new RuntimeException(e); - } + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); } } From 4de6d47027d5256196783261636d73c712da7a52 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 14:31:29 +0530 Subject: [PATCH 08/12] addressed review comments --- build.gradle.kts | 9 --------- pinot-servicemanager/Dockerfile | 6 +++--- pinot-servicemanager/build.gradle.kts | 20 +++++++++++++++++++- pinot-servicemanager/install.sh | 11 +++++------ 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index a963301..e69af34 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,13 +8,4 @@ hypertraceDocker { defaultImage { imageName.set("pinot") } -} - -subprojects { - pluginManager.withPlugin("java") { - configure { - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 - } - } } \ No newline at end of file diff --git a/pinot-servicemanager/Dockerfile b/pinot-servicemanager/Dockerfile index 53033b4..6a49f4b 100644 --- a/pinot-servicemanager/Dockerfile +++ b/pinot-servicemanager/Dockerfile @@ -12,6 +12,9 @@ ARG JITPACK_TAG=23797914c6fea24e34f3ebfca9a73e0fff72c2b7 USER root WORKDIR /install +# Copy customer hypertrace plugins +COPY build/libs plugins/ + COPY schemas/* schemas/ COPY install.sh /tmp/ @@ -32,9 +35,6 @@ WORKDIR /opt/pinot RUN adduser -g '' -h ${PWD} -D ${USER} USER ${USER} -# Copy generic avro serde -COPY --chown=${USER} build/classes/java/main/ ./classes/ - # Copy binaries and config we installed earlier COPY --from=install --chown=${USER} /install 
. diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 98c97a5..5a410cf 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -7,13 +7,31 @@ hypertraceDocker { defaultImage { imageName.set("pinot-servicemanager") //dockerFile.set(file("Dockerfile")) + tasks.named(buildTaskName) { + dependsOn("copyPlugins") + } } } +configure { + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 +} + +val plugins by configurations.creating + dependencies { compileOnly("org.apache.pinot:pinot-spi:0.5.0") compileOnly("org.apache.pinot:pinot-avro-base:0.5.0") - compileOnly("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") } +dependencies { + plugins("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") +} + +tasks.register("copyPlugins") { + from(plugins).include("kafka-streams-serdes*") + into("${buildDir}/libs") +} diff --git a/pinot-servicemanager/install.sh b/pinot-servicemanager/install.sh index d4fdab5..5c32246 100755 --- a/pinot-servicemanager/install.sh +++ b/pinot-servicemanager/install.sh @@ -16,13 +16,12 @@ do done # copy hypertrace plugins -# Download scripts and config for Kafka and ZooKeeper, but not for Connect -wget -qO temp.zip https://dl.bintray.com/hypertrace/maven/org/hypertrace/core/kafkastreams/framework/kafka-streams-serdes/0.1.11/kafka-streams-serdes-0.1.11.jar -# Pinot starts faster when classes are extracted -unzip -qo temp.zip -d classes -# remove license because sometimes a file and other times a directory -rm -rf temp.zip classes/META-INF/license +for JAR in plugins/* +do + unzip -qo $JAR -d classes +done +rm -rf plugins # TODO: try maven-dependency-plugin:unpack instead of wget # https://maven.apache.org/plugins/maven-dependency-plugin/examples/unpacking-artifacts.html # 
https://github.com/hypertrace/pinot/issues/16 From 8133e810cd0f21ebecb4ac29be1e15a5b805fe12 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 16:27:24 +0530 Subject: [PATCH 09/12] using current project dependency --- pinot-servicemanager/Dockerfile | 2 +- pinot-servicemanager/build.gradle.kts | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pinot-servicemanager/Dockerfile b/pinot-servicemanager/Dockerfile index 6a49f4b..7261201 100644 --- a/pinot-servicemanager/Dockerfile +++ b/pinot-servicemanager/Dockerfile @@ -13,7 +13,7 @@ USER root WORKDIR /install # Copy customer hypertrace plugins -COPY build/libs plugins/ +COPY build/plugins plugins/ COPY schemas/* schemas/ diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 5a410cf..c387c50 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -27,11 +27,11 @@ dependencies { } dependencies { - plugins("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") + plugins(project(":${project.name}")) } -tasks.register("copyPlugins") { - from(plugins).include("kafka-streams-serdes*") - into("${buildDir}/libs") +tasks.register("copyPlugins") { + from(plugins).include("pinot-servicemanager*").include("kafka-streams-serdes*") + into("${buildDir}/plugins") } From fe3dedbe7e004c333d80c4e944298afd37c4ec70 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 16:28:26 +0530 Subject: [PATCH 10/12] removed commented lines --- pinot-servicemanager/build.gradle.kts | 1 - 1 file changed, 1 deletion(-) diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index c387c50..e72711d 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -6,7 +6,6 @@ plugins { hypertraceDocker { defaultImage { imageName.set("pinot-servicemanager") - //dockerFile.set(file("Dockerfile")) tasks.named(buildTaskName) { dependsOn("copyPlugins") } 
From a7a42044b4dfe4285dfc19edfff1ef91d2f3ddd1 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 16:55:08 +0530 Subject: [PATCH 11/12] addressed review comments --- pinot-servicemanager/build.gradle.kts | 8 ++++++-- pinot-servicemanager/install.sh | 2 +- .../schemas/backendEntityView-tableConfigFile.json | 2 +- .../schemas/rawServiceView-tableConfigFile.json | 2 +- .../schemas/rawTraceView-tableConfigFile.json | 2 +- .../schemas/serviceCallView-tableConfigFile.json | 2 +- .../schemas/spanEventView-tableConfigFile.json | 2 +- .../pinot/plugins/GenericAvroMessageDecoder.java | 4 ++-- 8 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index e72711d..9b1d360 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -22,7 +22,11 @@ val plugins by configurations.creating dependencies { compileOnly("org.apache.pinot:pinot-spi:0.5.0") compileOnly("org.apache.pinot:pinot-avro-base:0.5.0") - implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") + compileOnly("org.apache.kafka:kafka-streams:5.5.1-ccs") + compileOnly("org.apache.kafka:kafka-clients:5.5.1-ccs") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") { + isTransitive = false + } } dependencies { @@ -30,7 +34,7 @@ dependencies { } tasks.register("copyPlugins") { - from(plugins).include("pinot-servicemanager*").include("kafka-streams-serdes*") + from(plugins) into("${buildDir}/plugins") } diff --git a/pinot-servicemanager/install.sh b/pinot-servicemanager/install.sh index 5c32246..7d45af0 100755 --- a/pinot-servicemanager/install.sh +++ b/pinot-servicemanager/install.sh @@ -34,7 +34,7 @@ appender.console.type=Console appender.console.name=STDOUT appender.console.layout.type=PatternLayout appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n -rootLogger.level=info +rootLogger.level=warn 
rootLogger.appenderRefs=stdout rootLogger.appenderRef.stdout.ref=STDOUT # Hush reflections similarly to https://github.com/apache/incubator-pinot/pull/5001 diff --git a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json index 9c20c75..6f2827b 100644 --- a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "lowlevel", + "stream.kafka.consumer.type": "simple", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json index b772d48..bbc7a77 100644 --- a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "lowlevel", + "stream.kafka.consumer.type": "simple", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json index f693fec..fed9064 100644 --- a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "lowlevel", + 
"stream.kafka.consumer.type": "simple", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json index 7a06767..c410f78 100644 --- a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "lowlevel", + "stream.kafka.consumer.type": "simple", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json index e8fcc7a..ed3dab3 100644 --- a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "lowlevel", + "stream.kafka.consumer.type": "simple", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java index 5db7667..026d1e5 100644 --- a/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java +++ 
b/pinot-servicemanager/src/main/java/org/hypertrace/pinot/plugins/GenericAvroMessageDecoder.java @@ -19,8 +19,8 @@ public class GenericAvroMessageDecoder implements StreamMessageDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(GenericAvroMessageDecoder.class); - GenericAvroSerde genericAvroSerde; - String topicName; + private GenericAvroSerde genericAvroSerde; + private String topicName; private RecordExtractor _avroRecordExtractor; From 2b566e8b9574e7f6060fb7244486c02f8991a3a0 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 30 Sep 2020 17:12:05 +0530 Subject: [PATCH 12/12] addressed review comments --- pinot-servicemanager/Dockerfile | 2 +- pinot-servicemanager/build.gradle.kts | 1 + .../schemas/backendEntityView-tableConfigFile.json | 2 +- .../schemas/rawServiceView-tableConfigFile.json | 2 +- pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json | 2 +- .../schemas/serviceCallView-tableConfigFile.json | 2 +- pinot-servicemanager/schemas/spanEventView-tableConfigFile.json | 2 +- 7 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pinot-servicemanager/Dockerfile b/pinot-servicemanager/Dockerfile index 7261201..d602ba5 100644 --- a/pinot-servicemanager/Dockerfile +++ b/pinot-servicemanager/Dockerfile @@ -12,7 +12,7 @@ ARG JITPACK_TAG=23797914c6fea24e34f3ebfca9a73e0fff72c2b7 USER root WORKDIR /install -# Copy customer hypertrace plugins +# Copy hypertrace plugins COPY build/plugins plugins/ COPY schemas/* schemas/ diff --git a/pinot-servicemanager/build.gradle.kts b/pinot-servicemanager/build.gradle.kts index 9b1d360..db2d70d 100644 --- a/pinot-servicemanager/build.gradle.kts +++ b/pinot-servicemanager/build.gradle.kts @@ -25,6 +25,7 @@ dependencies { compileOnly("org.apache.kafka:kafka-streams:5.5.1-ccs") compileOnly("org.apache.kafka:kafka-clients:5.5.1-ccs") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-serdes:0.1.11") { + // disable the transitive dependencies and use them from pinot itself. 
isTransitive = false } } diff --git a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json index 6f2827b..84023f9 100644 --- a/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/backendEntityView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "LowLevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json index bbc7a77..4d5d4ab 100644 --- a/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawServiceView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "LowLevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json index fed9064..bf111ca 100644 --- a/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/rawTraceView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "LowLevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", 
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json index c410f78..0a9b9d0 100644 --- a/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/serviceCallView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "LowLevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000", diff --git a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json index ed3dab3..e793516 100644 --- a/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json +++ b/pinot-servicemanager/schemas/spanEventView-tableConfigFile.json @@ -17,7 +17,7 @@ "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", - "stream.kafka.consumer.type": "simple", + "stream.kafka.consumer.type": "LowLevel", "stream.kafka.decoder.class.name": "org.hypertrace.pinot.plugins.GenericAvroMessageDecoder", "stream.kafka.hlc.zk.connect.string": "zookeeper:2181", "realtime.segment.flush.threshold.size": "500000",