diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 638e0b830..8d3d7286f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,16 +1,17 @@ [versions] -hypertrace-entity-service = "0.8.78" +hypertrace-entity-service = "0.8.89" hypertrace-attribute-service = "0.14.38" hypertrace-config-service = "0.1.54" hypertrace-grpc-utils = "0.12.6" hypertrace-serviceFramework = "0.1.62" -hypertrace-kafkaStreams = "0.4.3" +hypertrace-kafkaStreams = "0.4.4" hypertrace-view-generator = "0.4.21" grpc = "1.57.2" [libraries] hypertrace-entityService-client = { module = "org.hypertrace.entity.service:entity-service-client", version.ref = "hypertrace-entity-service" } +hypertrace-entityService-changeEventApi = { module = "org.hypertrace.entity.service:entity-service-change-event-api", version.ref = "hypertrace-entity-service" } hypertrace-entityTypeService-rxClient = { module = "org.hypertrace.entity.service:entity-type-service-rx-client", version.ref = "hypertrace-entity-service" } hypertrace-entityDataService-rxClient = { module = "org.hypertrace.entity.service:entity-data-service-rx-client", version.ref = "hypertrace-entity-service" } hypertrace-entityService-api = { module = "org.hypertrace.entity.service:entity-service-api", version.ref = "hypertrace-entity-service" } @@ -26,6 +27,7 @@ hypertrace-data-model = { module ="org.hypertrace.core.datamodel:data-model", ve hypertrace-kafkaStreams-framework = { module = "org.hypertrace.core.kafkastreams.framework:kafka-streams-framework", version.ref = "hypertrace-kafkaStreams" } hypertrace-kafkaStreams-avroPartitioners = { module = "org.hypertrace.core.kafkastreams.framework:avro-partitioners", version.ref = "hypertrace-kafkaStreams" } hypertrace-kafkaStreams-weightedGroupPartitioners = { module = "org.hypertrace.core.kafkastreams.framework:weighted-group-partitioner", version.ref = "hypertrace-kafkaStreams" } +hypertrace-kafkaStreams-eventListener = { module = "org.hypertrace.core.kafkastreams.framework:kafka-event-listener", version.ref = "hypertrace-kafkaStreams" } hypertrace-kafka-bom = { module = "org.hypertrace.core.kafkastreams.framework:kafka-bom", version.ref = "hypertrace-kafkaStreams" } hypertrace-viewGenerator-framework = { module="org.hypertrace.core.viewgenerator:view-generator-framework", version.ref="hypertrace-view-generator" } hypertrace-viewCreator-framework = { module="org.hypertrace.core.viewcreator:view-creator-framework", version.ref="hypertrace-view-generator" } @@ -37,6 +39,7 @@ slf4j-api = { module = "org.slf4j:slf4j-api", version = "2.0.5" } apache-log4j-slf4jImpl = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version = "2.20.0" } apache-commons-lang3 = { module = "org.apache.commons:commons-lang3", version = "3.12.0" } apache-kafka-kafkaStreamsTestUtils = { module = "org.apache.kafka:kafka-streams-test-utils", version = "7.2.1-ccs" } +kafka-protobufserde = { module = "io.confluent:kafka-streams-protobuf-serde"} google-guava = { module = "com.google.guava:guava", version = "32.1.2-jre" } google-protobuf-java = { module = "com.google.protobuf:protobuf-java", version = "3.23.3"} diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts index 11de7c8f6..ef84c1b39 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts @@ -16,19 +16,23 @@ dependencies { implementation(project(":semantic-convention-utils")) implementation(project(":hypertrace-trace-enricher:trace-reader")) + api(platform(libs.hypertrace.kafka.bom)) implementation(libs.hypertrace.data.model) implementation(libs.hypertrace.entityService.client) + implementation(libs.hypertrace.entityService.changeEventApi) implementation(libs.hypertrace.attributeService.client) implementation(libs.hypertrace.serviceFramework.metrics) implementation(libs.hypertrace.grpc.client.utils) implementation(libs.hypertrace.spacesConfigServiceApi) implementation(libs.hypertrace.grpc.context.utils) + implementation(libs.hypertrace.kafkaStreams.eventListener) implementation(libs.apache.commons.lang3) implementation(libs.slf4j.api) implementation(libs.uadetector.resources) implementation(libs.reactivex.rxjava3) implementation(libs.google.guava) + implementation(libs.kafka.protobufserde) testImplementation(libs.junit.jupiter) testImplementation("org.mockito:mockito-core:4.7.0") // Upgrade to 5.1.0 causes tests to fail diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java index 48a61dc8e..39be7796c 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java @@ -3,10 +3,17 @@ import static java.util.Collections.emptySet; import com.typesafe.config.Config; +import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde; import io.grpc.Channel; import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.client.config.AttributeServiceCachedClientConfig; import org.hypertrace.core.datamodel.Event; @@ -14,6 +21,9 @@ import org.hypertrace.core.grpcutils.client.GrpcChannelConfig; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory; +import org.hypertrace.core.kafka.event.listener.KafkaLiveEventListener; +import org.hypertrace.entity.change.event.v1.EntityChangeEventKey; +import org.hypertrace.entity.change.event.v1.EntityChangeEventValue; import org.hypertrace.entity.data.service.client.EdsCacheClient; import org.hypertrace.entity.data.service.client.EntityDataServiceClient; import org.hypertrace.entity.data.service.rxclient.EntityDataClient; @@ -36,6 +46,14 @@ public class DefaultClientRegistry implements ClientRegistry { private static final String CONFIG_SERVICE_PORT_KEY = "config.service.config.port"; private static final String ENTITY_SERVICE_HOST_KEY = "entity.service.config.host"; private static final String ENTITY_SERVICE_PORT_KEY = "entity.service.config.port"; + private static final String ENTITY_CHANGE_EVENTS_CONFIG_KEY = + "entity.service.config.change.events.config"; + private static final String ENTITY_CHANGE_EVENTS_CONSUMER_NAME_KEY = + "entity.service.config.change.events.config.consumer.name"; + private static final String ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY = + "entity.service.config.change.events.config.enabled"; + private static final String ENTITY_CHANGE_EVENTS_SCHEMA_REGISTRY_URL_KEY = + "entity.service.config.change.events.config.schema.registry.url"; private static final String TRACE_ENTITY_WRITE_THROTTLE_DURATION = "trace.entity.write.throttle.duration"; private static final String TRACE_ENTITY_WRITE_EXCLUDED_ENTITY_TYPES = @@ -51,6 +69,8 @@ public class DefaultClientRegistry implements ClientRegistry { private final GrpcChannelRegistry grpcChannelRegistry; private final UserAgentParser userAgentParser; private final AttributeServiceCachedClient attributeClient; + private final Optional> + entityChangeEventListener; public DefaultClientRegistry( Config config, GrpcChannelRegistry grpcChannelRegistry, Executor cacheLoaderExecutor) { @@ -78,6 +98,8 @@ public DefaultClientRegistry( new EntityDataServiceClient(this.entityServiceChannel), EntityServiceClientConfig.from(config).getCacheConfig(), cacheLoaderExecutor); + this.entityChangeEventListener = + getEntityChangeEventConsumer(config, edsCacheClient::updateBasedOnChangeEvent); this.entityDataClient = EntityDataClient.builder(this.entityServiceChannel).build(); this.entityCache = new EntityCache(this.edsCacheClient, cacheLoaderExecutor); this.entityAccessor = @@ -156,6 +178,14 @@ public AttributeServiceCachedClient getAttributeClient() { public void shutdown() { this.grpcChannelRegistry.shutdown(); + this.entityChangeEventListener.ifPresent( + listener -> { + try { + listener.close(); + } catch (Exception e) { + + } + }); } protected Channel buildChannel(String host, int port) { @@ -165,4 +195,44 @@ protected Channel buildChannel(String host, int port) { protected Channel buildChannel(String host, int port, GrpcChannelConfig grpcChannelConfig) { return this.grpcChannelRegistry.forPlaintextAddress(host, port, grpcChannelConfig); } + + private static Optional> + getEntityChangeEventConsumer( + Config clientsConfig, BiConsumer callback) { + if (clientsConfig.hasPath(ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY) + && clientsConfig.getBoolean(ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY)) { + String consumerName = clientsConfig.getString(ENTITY_CHANGE_EVENTS_CONSUMER_NAME_KEY); + Map deserConfig = + Collections.singletonMap( + "schema.registry.url", + clientsConfig.getString(ENTITY_CHANGE_EVENTS_SCHEMA_REGISTRY_URL_KEY)); + return Optional.of( + new KafkaLiveEventListener.Builder() + .registerCallback(callback) + .build( + consumerName, + clientsConfig.getConfig(ENTITY_CHANGE_EVENTS_CONFIG_KEY), + getEntityChangeEventKeyDeser(deserConfig), + getEntityChangeEventValueDeser(deserConfig))); + } + return Optional.empty(); + } + + private static Deserializer getEntityChangeEventKeyDeser( + Map deserConfig) { + try (KafkaProtobufSerde entityChangeEventKeySerde = + new KafkaProtobufSerde<>(EntityChangeEventKey.class)) { + entityChangeEventKeySerde.configure(deserConfig, true); + return entityChangeEventKeySerde.deserializer(); + } + } + + private static Deserializer getEntityChangeEventValueDeser( + Map deserConfig) { + try (Serde entityChangeEventValueSerde = + new KafkaProtobufSerde<>(EntityChangeEventValue.class)) { + entityChangeEventValueSerde.configure(deserConfig, false); + return entityChangeEventValueSerde.deserializer(); + } + } } diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf index 4e604c805..91660a883 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf @@ -32,6 +32,12 @@ enricher { host = ${?ENTITY_SERVICE_HOST_CONFIG} port = 50061 port = ${?ENTITY_SERVICE_PORT_CONFIG} + change.events.config { + enabled = false + topic.name = "entity-change-events" + schema.registry.url = ${kafka.streams.config.schema.registry.url} + consumer.name = "entity-change-events-consumer" + } } attribute.service.config = { host = localhost