Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[versions]
hypertrace-entity-service = "0.8.78"
hypertrace-attribute-service = "0.14.38"
hypertrace-config-service = "0.1.54"
hypertrace-grpc-utils = "0.12.6"
hypertrace-serviceFramework = "0.1.62"
Expand All @@ -10,7 +11,12 @@ grpc = "1.57.2"
[libraries]

hypertrace-entityService-client = { module = "org.hypertrace.entity.service:entity-service-client", version.ref = "hypertrace-entity-service" }
hypertrace-entityTypeService-rxClient = { module = "org.hypertrace.entity.service:entity-type-service-rx-client", version.ref = "hypertrace-entity-service" }
hypertrace-entityDataService-rxClient = { module = "org.hypertrace.entity.service:entity-data-service-rx-client", version.ref = "hypertrace-entity-service" }
hypertrace-entityService-api = { module = "org.hypertrace.entity.service:entity-service-api", version.ref = "hypertrace-entity-service" }
hypertrace-attributeService-client = { module = "org.hypertrace.core.attribute.service:attribute-service-client", version.ref = "hypertrace-attribute-service" }
hypertrace-attributeService-attributeProjectionRegistry = { module = "org.hypertrace.core.attribute.service:attribute-projection-registry", version.ref = "hypertrace-attribute-service" }
hypertrace-attributeService-api = { module = "org.hypertrace.core.attribute.service:attribute-service-api", version.ref = "hypertrace-attribute-service" }
hypertrace-grpc-context-utils = { module = "org.hypertrace.core.grpcutils:grpc-context-utils", version.ref = "hypertrace-grpc-utils" }
hypertrace-grpc-client-utils ={ module = "org.hypertrace.core.grpcutils:grpc-client-utils", version.ref = "hypertrace-grpc-utils" }
hypertrace-grpc-client-rxUtils = { module = "org.hypertrace.core.grpcutils:grpc-client-rx-utils", version.ref = "hypertrace-grpc-utils"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {

implementation(libs.hypertrace.data.model)
implementation(libs.hypertrace.entityService.client)
implementation(libs.hypertrace.attributeService.client)
implementation(libs.hypertrace.serviceFramework.metrics)
implementation(libs.hypertrace.grpc.client.utils)
implementation(libs.hypertrace.spacesConfigServiceApi)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.hypertrace.traceenricher.enrichment.clients;

import io.grpc.Channel;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
Expand All @@ -17,8 +17,6 @@ public interface ClientRegistry {

GrpcChannelRegistry getChannelRegistry();

Channel getAttributeServiceChannel();

Channel getEntityServiceChannel();

Channel getConfigServiceChannel();
Expand All @@ -35,7 +33,7 @@ public interface ClientRegistry {

EntityCache getEntityCache();

CachingAttributeClient getCachingAttributeClient();

UserAgentParser getUserAgentParser();

AttributeServiceCachedClient getAttributeClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import com.typesafe.config.Config;
import io.grpc.Channel;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Set;
import java.util.concurrent.Executor;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient;
import org.hypertrace.core.attribute.service.client.config.AttributeServiceCachedClientConfig;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.grpcutils.client.GrpcChannelConfig;
Expand All @@ -29,8 +29,9 @@
import org.hypertrace.traceenricher.util.UserAgentParser;

public class DefaultClientRegistry implements ClientRegistry {
private static final String ATTRIBUTE_SERVICE_HOST_KEY = "attribute.service.config.host";
private static final String ATTRIBUTE_SERVICE_PORT_KEY = "attribute.service.config.port";
private static final String ATTRIBUTE_SERVICE_CONFIG_KEY = "attribute.service.config";
private static final String ATTRIBUTE_SERVICE_HOST_KEY = ATTRIBUTE_SERVICE_CONFIG_KEY + ".host";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are host/port still used directly?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're used to build channel

this.attributeServiceChannel =
        this.buildChannel(
            config.getString(ATTRIBUTE_SERVICE_HOST_KEY),
            config.getInt(ATTRIBUTE_SERVICE_PORT_KEY));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I thought that went away since you removed the getter and are passing in the raw config to the client. I wonder if passing in the registry would be better so it can read the channel config in the same way? Oh well, doesn't really matter - only thing I would suggest in this case would be to remove the instance variable for channel since now it's only used locally in creating the cached client.

private static final String ATTRIBUTE_SERVICE_PORT_KEY = ATTRIBUTE_SERVICE_CONFIG_KEY + ".port";
private static final String CONFIG_SERVICE_HOST_KEY = "config.service.config.host";
private static final String CONFIG_SERVICE_PORT_KEY = "config.service.config.port";
private static final String ENTITY_SERVICE_HOST_KEY = "entity.service.config.host";
Expand All @@ -39,25 +40,23 @@ public class DefaultClientRegistry implements ClientRegistry {
"trace.entity.write.throttle.duration";
private static final String TRACE_ENTITY_WRITE_EXCLUDED_ENTITY_TYPES =
"trace.entity.write.excluded.entity.types";

private static final String USER_AGENT_PARSER_CONFIG_KEY = "useragent.parser";
private final Channel attributeServiceChannel;
private final Channel configServiceChannel;
private final Channel entityServiceChannel;
private final EdsCacheClient edsCacheClient;
private final EntityDataClient entityDataClient;
private final CachingAttributeClient cachingAttributeClient;
private final EntityCache entityCache;
private final TraceEntityAccessor entityAccessor;
private final TraceAttributeReader<StructuredTrace, Event> attributeReader;
private final GrpcChannelRegistry grpcChannelRegistry;
private final UserAgentParser userAgentParser;
private final AttributeServiceCachedClient attributeClient;

public DefaultClientRegistry(
Config config, GrpcChannelRegistry grpcChannelRegistry, Executor cacheLoaderExecutor) {
this.grpcChannelRegistry = grpcChannelRegistry;

this.attributeServiceChannel =
Channel attributeServiceChannel =
this.buildChannel(
config.getString(ATTRIBUTE_SERVICE_HOST_KEY),
config.getInt(ATTRIBUTE_SERVICE_PORT_KEY));
Expand All @@ -68,13 +67,12 @@ public DefaultClientRegistry(
this.buildChannel(
config.getString(ENTITY_SERVICE_HOST_KEY), config.getInt(ENTITY_SERVICE_PORT_KEY));

this.cachingAttributeClient =
CachingAttributeClient.builder(this.attributeServiceChannel)
.withMaximumCacheContexts(100) // 100 Tenants
.withCacheExpiration(Duration.of(15, ChronoUnit.MINUTES))
.build();

this.attributeReader = TraceAttributeReaderFactory.build(this.cachingAttributeClient);
this.attributeClient =
new AttributeServiceCachedClient(
attributeServiceChannel,
AttributeServiceCachedClientConfig.from(
config.getConfig(ATTRIBUTE_SERVICE_CONFIG_KEY)));
this.attributeReader = TraceAttributeReaderFactory.build(attributeClient);
this.edsCacheClient =
new EdsCacheClient(
new EntityDataServiceClient(this.entityServiceChannel),
Expand All @@ -86,7 +84,7 @@ public DefaultClientRegistry(
new TraceEntityAccessorBuilder(
EntityTypeClient.builder(this.entityServiceChannel).build(),
this.entityDataClient,
this.cachingAttributeClient)
attributeClient)
.withEntityWriteThrottleDuration(
config.hasPath(TRACE_ENTITY_WRITE_THROTTLE_DURATION)
? config.getDuration(TRACE_ENTITY_WRITE_THROTTLE_DURATION)
Expand All @@ -104,11 +102,6 @@ public GrpcChannelRegistry getChannelRegistry() {
return grpcChannelRegistry;
}

@Override
public Channel getAttributeServiceChannel() {
return this.attributeServiceChannel;
}

@Override
public Channel getEntityServiceChannel() {
return this.entityServiceChannel;
Expand Down Expand Up @@ -152,13 +145,13 @@ public EntityCache getEntityCache() {
}

@Override
public CachingAttributeClient getCachingAttributeClient() {
return this.cachingAttributeClient;
public UserAgentParser getUserAgentParser() {
return this.userAgentParser;
}

@Override
public UserAgentParser getUserAgentParser() {
return this.userAgentParser;
public AttributeServiceCachedClient getAttributeClient() {
return attributeClient;
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ private List<String> calculateSpacesForAttribute(
span,
attributeValueRuleData.getAttributeScope(),
attributeValueRuleData.getAttributeKey())
.mapOptional(ValueCoercer::convertToString)
.flatMap(ValueCoercer::convertToString)
.filter(string -> !string.isEmpty())
.map(List::of)
.onErrorComplete()
.defaultIfEmpty(Collections.emptyList())
.blockingGet();
.orElse(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

import io.reactivex.rxjava3.core.Single;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.hypertrace.core.attribute.service.v1.LiteralValue;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
Expand Down Expand Up @@ -48,7 +47,7 @@ void beforeEach() {
@Test
void testConvertsStringValue() {
when(this.mockAttributeReader.getSpanValue(this.mockTrace, this.mockSpan, MOCK_SCOPE, MOCK_KEY))
.thenReturn(Single.just(LiteralValue.newBuilder().setStringValue("attr-value").build()));
.thenReturn(Optional.of(LiteralValue.newBuilder().setStringValue("attr-value").build()));
assertEquals(
List.of("attr-value"),
this.ruleEvaluator.calculateSpacesForRule(this.mockTrace, this.mockSpan, this.rule));
Expand All @@ -57,7 +56,7 @@ void testConvertsStringValue() {
@Test
void testConvertsIntValue() {
when(this.mockAttributeReader.getSpanValue(this.mockTrace, this.mockSpan, MOCK_SCOPE, MOCK_KEY))
.thenReturn(Single.just(LiteralValue.newBuilder().setIntValue(12).build()));
.thenReturn(Optional.of(LiteralValue.newBuilder().setIntValue(12).build()));
assertEquals(
List.of("12"),
this.ruleEvaluator.calculateSpacesForRule(this.mockTrace, this.mockSpan, this.rule));
Expand All @@ -66,7 +65,7 @@ void testConvertsIntValue() {
@Test
void testConvertsNoValue() {
when(this.mockAttributeReader.getSpanValue(this.mockTrace, this.mockSpan, MOCK_SCOPE, MOCK_KEY))
.thenReturn(Single.error(new NoSuchElementException("no value")));
.thenReturn(Optional.empty());
assertEquals(
List.of(),
this.ruleEvaluator.calculateSpacesForRule(this.mockTrace, this.mockSpan, this.rule));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ enricher {
host = ${?ATTRIBUTE_SERVICE_HOST_CONFIG}
port = 9012
port = ${?ATTRIBUTE_SERVICE_PORT_CONFIG}
cache = {
deadline = 30s
maxSize = 1000
refreshAfterWriteDuration = 15m
expireAfterAccessDuration = 1h
executorThreads = 1
}
}
config.service.config = {
host = localhost
Expand Down
11 changes: 6 additions & 5 deletions hypertrace-trace-enricher/trace-reader/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ plugins {
}

dependencies {
api("org.hypertrace.core.attribute.service:attribute-service-api:0.14.26")
api("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.14.26")
api("org.hypertrace.entity.service:entity-type-service-rx-client:0.8.75")
api("org.hypertrace.entity.service:entity-data-service-rx-client:0.8.75")
api(libs.hypertrace.attributeService.api)
api(libs.hypertrace.entityTypeService.rxClient)
api(libs.hypertrace.entityDataService.rxClient)
api(libs.hypertrace.data.model)
implementation("org.hypertrace.core.attribute.service:attribute-projection-registry:0.14.26")

implementation(libs.hypertrace.attributeService.client)
implementation(libs.hypertrace.attributeService.attributeProjectionRegistry)
implementation(libs.hypertrace.grpc.client.rxUtils)
implementation(libs.hypertrace.grpc.context.utils)
implementation(libs.hypertrace.grpc.client.utils)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.hypertrace.trace.accessor.entities;

import io.reactivex.rxjava3.core.Maybe;
import java.util.Optional;
import org.hypertrace.core.attribute.service.v1.LiteralValue;
import org.hypertrace.entity.data.service.v1.AttributeValue;
import org.hypertrace.entity.data.service.v1.Value;
Expand All @@ -10,25 +10,26 @@
interface AttributeValueConverter {
Logger LOG = LoggerFactory.getLogger(AttributeValueConverter.class);

static Maybe<AttributeValue> convertToAttributeValue(LiteralValue literalValue) {
static Optional<AttributeValue> convertToAttributeValue(LiteralValue literalValue) {
switch (literalValue.getValueCase()) {
case STRING_VALUE:
return attributeValueMaybe(Value.newBuilder().setString(literalValue.getStringValue()));
return attributeValueOptional(Value.newBuilder().setString(literalValue.getStringValue()));
case BOOLEAN_VALUE:
return attributeValueMaybe(Value.newBuilder().setBoolean(literalValue.getBooleanValue()));
return attributeValueOptional(
Value.newBuilder().setBoolean(literalValue.getBooleanValue()));
case FLOAT_VALUE:
return attributeValueMaybe(Value.newBuilder().setDouble(literalValue.getFloatValue()));
return attributeValueOptional(Value.newBuilder().setDouble(literalValue.getFloatValue()));
case INT_VALUE:
return attributeValueMaybe(Value.newBuilder().setLong(literalValue.getIntValue()));
return attributeValueOptional(Value.newBuilder().setLong(literalValue.getIntValue()));
case VALUE_NOT_SET:
return Maybe.empty();
return Optional.empty();
default:
LOG.error("Unexpected literal value case: " + literalValue.getValueCase());
return Maybe.empty();
return Optional.empty();
}
}

private static Maybe<AttributeValue> attributeValueMaybe(Value.Builder value) {
return Maybe.just(AttributeValue.newBuilder().setValue(value).build());
private static Optional<AttributeValue> attributeValueOptional(Value.Builder value) {
return Optional.of(AttributeValue.newBuilder().setValue(value).build());
}
}
Loading