Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
[versions]
hypertrace-entity-service = "0.8.78"
hypertrace-entity-service = "0.8.89"
hypertrace-attribute-service = "0.14.38"
hypertrace-config-service = "0.1.54"
hypertrace-grpc-utils = "0.12.6"
hypertrace-serviceFramework = "0.1.62"
hypertrace-kafkaStreams = "0.4.3"
hypertrace-kafkaStreams = "0.4.4"
hypertrace-view-generator = "0.4.21"
grpc = "1.57.2"

[libraries]

hypertrace-entityService-client = { module = "org.hypertrace.entity.service:entity-service-client", version.ref = "hypertrace-entity-service" }
hypertrace-entityService-changeEventApi = { module = "org.hypertrace.entity.service:entity-service-change-event-api", version.ref = "hypertrace-entity-service" }
hypertrace-entityTypeService-rxClient = { module = "org.hypertrace.entity.service:entity-type-service-rx-client", version.ref = "hypertrace-entity-service" }
hypertrace-entityDataService-rxClient = { module = "org.hypertrace.entity.service:entity-data-service-rx-client", version.ref = "hypertrace-entity-service" }
hypertrace-entityService-api = { module = "org.hypertrace.entity.service:entity-service-api", version.ref = "hypertrace-entity-service" }
Expand All @@ -26,6 +27,7 @@ hypertrace-data-model = { module ="org.hypertrace.core.datamodel:data-model", ve
hypertrace-kafkaStreams-framework = { module = "org.hypertrace.core.kafkastreams.framework:kafka-streams-framework", version.ref = "hypertrace-kafkaStreams" }
hypertrace-kafkaStreams-avroPartitioners = { module = "org.hypertrace.core.kafkastreams.framework:avro-partitioners", version.ref = "hypertrace-kafkaStreams" }
hypertrace-kafkaStreams-weightedGroupPartitioners = { module = "org.hypertrace.core.kafkastreams.framework:weighted-group-partitioner", version.ref = "hypertrace-kafkaStreams" }
hypertrace-kafkaStreams-eventListener = { module = "org.hypertrace.core.kafkastreams.framework:kafka-event-listener", version.ref = "hypertrace-kafkaStreams" }
hypertrace-kafka-bom = { module = "org.hypertrace.core.kafkastreams.framework:kafka-bom", version.ref = "hypertrace-kafkaStreams" }
hypertrace-viewGenerator-framework = { module="org.hypertrace.core.viewgenerator:view-generator-framework", version.ref="hypertrace-view-generator" }
hypertrace-viewCreator-framework = { module="org.hypertrace.core.viewcreator:view-creator-framework", version.ref="hypertrace-view-generator" }
Expand All @@ -37,6 +39,7 @@ slf4j-api = { module = "org.slf4j:slf4j-api", version = "2.0.5" }
apache-log4j-slf4jImpl = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version = "2.20.0" }
apache-commons-lang3 = { module = "org.apache.commons:commons-lang3", version = "3.12.0" }
apache-kafka-kafkaStreamsTestUtils = { module = "org.apache.kafka:kafka-streams-test-utils", version = "7.2.1-ccs" }
kafka-protobufserde = { module = "io.confluent:kafka-streams-protobuf-serde"}

google-guava = { module = "com.google.guava:guava", version = "32.1.2-jre" }
google-protobuf-java = { module = "com.google.protobuf:protobuf-java", version = "3.23.3"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ dependencies {
implementation(project(":semantic-convention-utils"))
implementation(project(":hypertrace-trace-enricher:trace-reader"))

api(platform(libs.hypertrace.kafka.bom))
implementation(libs.hypertrace.data.model)
implementation(libs.hypertrace.entityService.client)
implementation(libs.hypertrace.entityService.changeEventApi)
implementation(libs.hypertrace.attributeService.client)
implementation(libs.hypertrace.serviceFramework.metrics)
implementation(libs.hypertrace.grpc.client.utils)
implementation(libs.hypertrace.spacesConfigServiceApi)
implementation(libs.hypertrace.grpc.context.utils)
implementation(libs.hypertrace.kafkaStreams.eventListener)

implementation(libs.apache.commons.lang3)
implementation(libs.slf4j.api)
implementation(libs.uadetector.resources)
implementation(libs.reactivex.rxjava3)
implementation(libs.google.guava)
implementation(libs.kafka.protobufserde)

testImplementation(libs.junit.jupiter)
testImplementation("org.mockito:mockito-core:4.7.0") // Upgrade to 5.1.0 causes tests to fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@
import static java.util.Collections.emptySet;

import com.typesafe.config.Config;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import io.grpc.Channel;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient;
import org.hypertrace.core.attribute.service.client.config.AttributeServiceCachedClientConfig;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.grpcutils.client.GrpcChannelConfig;
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory;
import org.hypertrace.core.kafka.event.listener.KafkaLiveEventListener;
import org.hypertrace.entity.change.event.v1.EntityChangeEventKey;
import org.hypertrace.entity.change.event.v1.EntityChangeEventValue;
import org.hypertrace.entity.data.service.client.EdsCacheClient;
import org.hypertrace.entity.data.service.client.EntityDataServiceClient;
import org.hypertrace.entity.data.service.rxclient.EntityDataClient;
Expand All @@ -36,6 +46,14 @@ public class DefaultClientRegistry implements ClientRegistry {
private static final String CONFIG_SERVICE_PORT_KEY = "config.service.config.port";
private static final String ENTITY_SERVICE_HOST_KEY = "entity.service.config.host";
private static final String ENTITY_SERVICE_PORT_KEY = "entity.service.config.port";
private static final String ENTITY_CHANGE_EVENTS_CONFIG_KEY =
"entity.service.config.change.events.config";
private static final String ENTITY_CHANGE_EVENTS_CONSUMER_NAME_KEY =
"entity.service.config.change.events.config.consumer.name";
private static final String ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY =
"entity.service.config.change.events.config.enabled";
private static final String ENTITY_CHANGE_EVENTS_SCHEMA_REGISTRY_URL_KEY =
"entity.service.config.change.events.config.schema.registry.url";
private static final String TRACE_ENTITY_WRITE_THROTTLE_DURATION =
"trace.entity.write.throttle.duration";
private static final String TRACE_ENTITY_WRITE_EXCLUDED_ENTITY_TYPES =
Expand All @@ -51,6 +69,8 @@ public class DefaultClientRegistry implements ClientRegistry {
private final GrpcChannelRegistry grpcChannelRegistry;
private final UserAgentParser userAgentParser;
private final AttributeServiceCachedClient attributeClient;
private final Optional<KafkaLiveEventListener<EntityChangeEventKey, EntityChangeEventValue>>
entityChangeEventListener;

public DefaultClientRegistry(
Config config, GrpcChannelRegistry grpcChannelRegistry, Executor cacheLoaderExecutor) {
Expand Down Expand Up @@ -78,6 +98,8 @@ public DefaultClientRegistry(
new EntityDataServiceClient(this.entityServiceChannel),
EntityServiceClientConfig.from(config).getCacheConfig(),
cacheLoaderExecutor);
this.entityChangeEventListener =
getEntityChangeEventConsumer(config, edsCacheClient::updateBasedOnChangeEvent);
this.entityDataClient = EntityDataClient.builder(this.entityServiceChannel).build();
this.entityCache = new EntityCache(this.edsCacheClient, cacheLoaderExecutor);
this.entityAccessor =
Expand Down Expand Up @@ -156,6 +178,14 @@ public AttributeServiceCachedClient getAttributeClient() {

public void shutdown() {
this.grpcChannelRegistry.shutdown();
this.entityChangeEventListener.ifPresent(
listener -> {
try {
listener.close();
} catch (Exception e) {

}
});
}
Comment on lines 179 to 189
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method isn't called really, but added the close for sanity


protected Channel buildChannel(String host, int port) {
Expand All @@ -165,4 +195,44 @@ protected Channel buildChannel(String host, int port) {
protected Channel buildChannel(String host, int port, GrpcChannelConfig grpcChannelConfig) {
return this.grpcChannelRegistry.forPlaintextAddress(host, port, grpcChannelConfig);
}

private static Optional<KafkaLiveEventListener<EntityChangeEventKey, EntityChangeEventValue>>
getEntityChangeEventConsumer(
Config clientsConfig, BiConsumer<EntityChangeEventKey, EntityChangeEventValue> callback) {
if (clientsConfig.hasPath(ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY)
&& clientsConfig.getBoolean(ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY)) {
String consumerName = clientsConfig.getString(ENTITY_CHANGE_EVENTS_CONSUMER_NAME_KEY);
Map<String, Object> deserConfig =
Collections.singletonMap(
"schema.registry.url",
clientsConfig.getString(ENTITY_CHANGE_EVENTS_SCHEMA_REGISTRY_URL_KEY));
return Optional.of(
new KafkaLiveEventListener.Builder<EntityChangeEventKey, EntityChangeEventValue>()
.registerCallback(callback)
.build(
consumerName,
clientsConfig.getConfig(ENTITY_CHANGE_EVENTS_CONFIG_KEY),
getEntityChangeEventKeyDeser(deserConfig),
getEntityChangeEventValueDeser(deserConfig)));
}
return Optional.empty();
}

private static Deserializer<EntityChangeEventKey> getEntityChangeEventKeyDeser(
Map<String, Object> deserConfig) {
try (KafkaProtobufSerde<EntityChangeEventKey> entityChangeEventKeySerde =
new KafkaProtobufSerde<>(EntityChangeEventKey.class)) {
entityChangeEventKeySerde.configure(deserConfig, true);
return entityChangeEventKeySerde.deserializer();
}
}

private static Deserializer<EntityChangeEventValue> getEntityChangeEventValueDeser(
Map<String, Object> deserConfig) {
try (Serde<EntityChangeEventValue> entityChangeEventValueSerde =
new KafkaProtobufSerde<>(EntityChangeEventValue.class)) {
entityChangeEventValueSerde.configure(deserConfig, false);
return entityChangeEventValueSerde.deserializer();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ enricher {
host = ${?ENTITY_SERVICE_HOST_CONFIG}
port = 50061
port = ${?ENTITY_SERVICE_PORT_CONFIG}
change.events.config {
enabled = false
topic.name = "entity-change-events"
schema.registry.url = ${kafka.streams.config.schema.registry.url}
consumer.name = "entity-change-events-consumer"
}
}
attribute.service.config = {
host = localhost
Expand Down