diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ab859f541..638e0b830 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,5 +1,6 @@ [versions] hypertrace-entity-service = "0.8.78" +hypertrace-attribute-service = "0.14.38" hypertrace-config-service = "0.1.54" hypertrace-grpc-utils = "0.12.6" hypertrace-serviceFramework = "0.1.62" @@ -10,7 +11,12 @@ grpc = "1.57.2" [libraries] hypertrace-entityService-client = { module = "org.hypertrace.entity.service:entity-service-client", version.ref = "hypertrace-entity-service" } +hypertrace-entityTypeService-rxClient = { module = "org.hypertrace.entity.service:entity-type-service-rx-client", version.ref = "hypertrace-entity-service" } +hypertrace-entityDataService-rxClient = { module = "org.hypertrace.entity.service:entity-data-service-rx-client", version.ref = "hypertrace-entity-service" } hypertrace-entityService-api = { module = "org.hypertrace.entity.service:entity-service-api", version.ref = "hypertrace-entity-service" } +hypertrace-attributeService-client = { module = "org.hypertrace.core.attribute.service:attribute-service-client", version.ref = "hypertrace-attribute-service" } +hypertrace-attributeService-attributeProjectionRegistry = { module = "org.hypertrace.core.attribute.service:attribute-projection-registry", version.ref = "hypertrace-attribute-service" } +hypertrace-attributeService-api = { module = "org.hypertrace.core.attribute.service:attribute-service-api", version.ref = "hypertrace-attribute-service" } hypertrace-grpc-context-utils = { module = "org.hypertrace.core.grpcutils:grpc-context-utils", version.ref = "hypertrace-grpc-utils" } hypertrace-grpc-client-utils ={ module = "org.hypertrace.core.grpcutils:grpc-client-utils", version.ref = "hypertrace-grpc-utils" } hypertrace-grpc-client-rxUtils = { module = "org.hypertrace.core.grpcutils:grpc-client-rx-utils", version.ref = "hypertrace-grpc-utils"} diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts index 7616f84e0..11de7c8f6 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts @@ -18,6 +18,7 @@ dependencies { implementation(libs.hypertrace.data.model) implementation(libs.hypertrace.entityService.client) + implementation(libs.hypertrace.attributeService.client) implementation(libs.hypertrace.serviceFramework.metrics) implementation(libs.hypertrace.grpc.client.utils) implementation(libs.hypertrace.spacesConfigServiceApi) diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/ClientRegistry.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/ClientRegistry.java index c05deb472..5754633c3 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/ClientRegistry.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/ClientRegistry.java @@ -1,7 +1,7 @@ package org.hypertrace.traceenricher.enrichment.clients; import io.grpc.Channel; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; @@ -17,8 +17,6 @@ public interface ClientRegistry { GrpcChannelRegistry getChannelRegistry(); - Channel getAttributeServiceChannel(); - Channel getEntityServiceChannel(); Channel getConfigServiceChannel(); @@ -35,7 +33,7 @@ public interface ClientRegistry { EntityCache getEntityCache(); - CachingAttributeClient getCachingAttributeClient(); - UserAgentParser getUserAgentParser(); + + AttributeServiceCachedClient getAttributeClient(); } diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java index 7b6f91e46..48a61dc8e 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java @@ -5,10 +5,10 @@ import com.typesafe.config.Config; import io.grpc.Channel; import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.Set; import java.util.concurrent.Executor; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; +import org.hypertrace.core.attribute.service.client.config.AttributeServiceCachedClientConfig; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; import org.hypertrace.core.grpcutils.client.GrpcChannelConfig; @@ -29,8 +29,9 @@ import org.hypertrace.traceenricher.util.UserAgentParser; public class DefaultClientRegistry implements ClientRegistry { - private static final String ATTRIBUTE_SERVICE_HOST_KEY = "attribute.service.config.host"; - private static final String ATTRIBUTE_SERVICE_PORT_KEY = "attribute.service.config.port"; + private static final String ATTRIBUTE_SERVICE_CONFIG_KEY = "attribute.service.config"; + private static final String ATTRIBUTE_SERVICE_HOST_KEY = ATTRIBUTE_SERVICE_CONFIG_KEY + ".host"; + private static final String ATTRIBUTE_SERVICE_PORT_KEY = ATTRIBUTE_SERVICE_CONFIG_KEY + ".port"; private static final String CONFIG_SERVICE_HOST_KEY = "config.service.config.host"; private static final String CONFIG_SERVICE_PORT_KEY = "config.service.config.port"; private static final String ENTITY_SERVICE_HOST_KEY = "entity.service.config.host"; @@ -39,25 +40,23 @@ public class DefaultClientRegistry implements ClientRegistry { "trace.entity.write.throttle.duration"; private static final String TRACE_ENTITY_WRITE_EXCLUDED_ENTITY_TYPES = "trace.entity.write.excluded.entity.types"; - private static final String USER_AGENT_PARSER_CONFIG_KEY = "useragent.parser"; - private final Channel attributeServiceChannel; private final Channel configServiceChannel; private final Channel entityServiceChannel; private final EdsCacheClient edsCacheClient; private final EntityDataClient entityDataClient; - private final CachingAttributeClient cachingAttributeClient; private final EntityCache entityCache; private final TraceEntityAccessor entityAccessor; private final TraceAttributeReader attributeReader; private final GrpcChannelRegistry grpcChannelRegistry; private final UserAgentParser userAgentParser; + private final AttributeServiceCachedClient attributeClient; public DefaultClientRegistry( Config config, GrpcChannelRegistry grpcChannelRegistry, Executor cacheLoaderExecutor) { this.grpcChannelRegistry = grpcChannelRegistry; - this.attributeServiceChannel = + Channel attributeServiceChannel = this.buildChannel( config.getString(ATTRIBUTE_SERVICE_HOST_KEY), config.getInt(ATTRIBUTE_SERVICE_PORT_KEY)); @@ -68,13 +67,12 @@ public DefaultClientRegistry( this.buildChannel( config.getString(ENTITY_SERVICE_HOST_KEY), config.getInt(ENTITY_SERVICE_PORT_KEY)); - this.cachingAttributeClient = - CachingAttributeClient.builder(this.attributeServiceChannel) - .withMaximumCacheContexts(100) // 100 Tenants - .withCacheExpiration(Duration.of(15, ChronoUnit.MINUTES)) - .build(); - - this.attributeReader = TraceAttributeReaderFactory.build(this.cachingAttributeClient); + this.attributeClient = + new AttributeServiceCachedClient( + attributeServiceChannel, + AttributeServiceCachedClientConfig.from( + config.getConfig(ATTRIBUTE_SERVICE_CONFIG_KEY))); + this.attributeReader = TraceAttributeReaderFactory.build(attributeClient); this.edsCacheClient = new EdsCacheClient( new EntityDataServiceClient(this.entityServiceChannel), @@ -86,7 +84,7 @@ public DefaultClientRegistry( new TraceEntityAccessorBuilder( EntityTypeClient.builder(this.entityServiceChannel).build(), this.entityDataClient, - this.cachingAttributeClient) + attributeClient) .withEntityWriteThrottleDuration( config.hasPath(TRACE_ENTITY_WRITE_THROTTLE_DURATION) ? config.getDuration(TRACE_ENTITY_WRITE_THROTTLE_DURATION) @@ -104,11 +102,6 @@ public GrpcChannelRegistry getChannelRegistry() { return grpcChannelRegistry; } - @Override - public Channel getAttributeServiceChannel() { - return this.attributeServiceChannel; - } - @Override public Channel getEntityServiceChannel() { return this.entityServiceChannel; @@ -152,13 +145,13 @@ public EntityCache getEntityCache() { } @Override - public CachingAttributeClient getCachingAttributeClient() { - return this.cachingAttributeClient; + public UserAgentParser getUserAgentParser() { + return this.userAgentParser; } @Override - public UserAgentParser getUserAgentParser() { - return this.userAgentParser; + public AttributeServiceCachedClient getAttributeClient() { + return attributeClient; } public void shutdown() { diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluator.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluator.java index b066abf21..215cf46ea 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluator.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluator.java @@ -36,11 +36,9 @@ private List calculateSpacesForAttribute( span, attributeValueRuleData.getAttributeScope(), attributeValueRuleData.getAttributeKey()) - .mapOptional(ValueCoercer::convertToString) + .flatMap(ValueCoercer::convertToString) .filter(string -> !string.isEmpty()) .map(List::of) - .onErrorComplete() - .defaultIfEmpty(Collections.emptyList()) - .blockingGet(); + .orElse(Collections.emptyList()); } } diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluatorTest.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluatorTest.java index 9ffecdc55..73072b5b4 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluatorTest.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/space/SpaceRuleEvaluatorTest.java @@ -3,9 +3,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.when; -import io.reactivex.rxjava3.core.Single; import java.util.List; -import java.util.NoSuchElementException; +import java.util.Optional; import org.hypertrace.core.attribute.service.v1.LiteralValue; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; @@ -48,7 +47,7 @@ void beforeEach() { @Test void testConvertsStringValue() { when(this.mockAttributeReader.getSpanValue(this.mockTrace, this.mockSpan, MOCK_SCOPE, MOCK_KEY)) - .thenReturn(Single.just(LiteralValue.newBuilder().setStringValue("attr-value").build())); + .thenReturn(Optional.of(LiteralValue.newBuilder().setStringValue("attr-value").build())); assertEquals( List.of("attr-value"), this.ruleEvaluator.calculateSpacesForRule(this.mockTrace, this.mockSpan, this.rule)); @@ -57,7 +56,7 @@ void testConvertsStringValue() { @Test void testConvertsIntValue() { when(this.mockAttributeReader.getSpanValue(this.mockTrace, this.mockSpan, MOCK_SCOPE, MOCK_KEY)) - .thenReturn(Single.just(LiteralValue.newBuilder().setIntValue(12).build())); + .thenReturn(Optional.of(LiteralValue.newBuilder().setIntValue(12).build())); assertEquals( List.of("12"), this.ruleEvaluator.calculateSpacesForRule(this.mockTrace, this.mockSpan, this.rule)); @@ -66,7 +65,7 @@ void testConvertsIntValue() { @Test void testConvertsNoValue() { when(this.mockAttributeReader.getSpanValue(this.mockTrace, this.mockSpan, MOCK_SCOPE, MOCK_KEY)) - .thenReturn(Single.error(new NoSuchElementException("no value"))); + .thenReturn(Optional.empty()); assertEquals( List.of(), this.ruleEvaluator.calculateSpacesForRule(this.mockTrace, this.mockSpan, this.rule)); diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf index 2be70f962..4e604c805 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf @@ -38,6 +38,13 @@ enricher { host = ${?ATTRIBUTE_SERVICE_HOST_CONFIG} port = 9012 port = ${?ATTRIBUTE_SERVICE_PORT_CONFIG} + cache = { + deadline = 30s + maxSize = 1000 + refreshAfterWriteDuration = 15m + expireAfterAccessDuration = 1h + executorThreads = 1 + } } config.service.config = { host = localhost diff --git a/hypertrace-trace-enricher/trace-reader/build.gradle.kts b/hypertrace-trace-enricher/trace-reader/build.gradle.kts index 9c3c7486f..13b8d2b31 100644 --- a/hypertrace-trace-enricher/trace-reader/build.gradle.kts +++ b/hypertrace-trace-enricher/trace-reader/build.gradle.kts @@ -6,12 +6,13 @@ plugins { } dependencies { - api("org.hypertrace.core.attribute.service:attribute-service-api:0.14.26") - api("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.14.26") - api("org.hypertrace.entity.service:entity-type-service-rx-client:0.8.75") - api("org.hypertrace.entity.service:entity-data-service-rx-client:0.8.75") + api(libs.hypertrace.attributeService.api) + api(libs.hypertrace.entityTypeService.rxClient) + api(libs.hypertrace.entityDataService.rxClient) api(libs.hypertrace.data.model) - implementation("org.hypertrace.core.attribute.service:attribute-projection-registry:0.14.26") + + implementation(libs.hypertrace.attributeService.client) + implementation(libs.hypertrace.attributeService.attributeProjectionRegistry) implementation(libs.hypertrace.grpc.client.rxUtils) implementation(libs.hypertrace.grpc.context.utils) implementation(libs.hypertrace.grpc.client.utils) diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/AttributeValueConverter.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/AttributeValueConverter.java index 4d7d93edb..e20200824 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/AttributeValueConverter.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/AttributeValueConverter.java @@ -1,6 +1,6 @@ package org.hypertrace.trace.accessor.entities; -import io.reactivex.rxjava3.core.Maybe; +import java.util.Optional; import org.hypertrace.core.attribute.service.v1.LiteralValue; import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.Value; @@ -10,25 +10,26 @@ interface AttributeValueConverter { Logger LOG = LoggerFactory.getLogger(AttributeValueConverter.class); - static Maybe convertToAttributeValue(LiteralValue literalValue) { + static Optional convertToAttributeValue(LiteralValue literalValue) { switch (literalValue.getValueCase()) { case STRING_VALUE: - return attributeValueMaybe(Value.newBuilder().setString(literalValue.getStringValue())); + return attributeValueOptional(Value.newBuilder().setString(literalValue.getStringValue())); case BOOLEAN_VALUE: - return attributeValueMaybe(Value.newBuilder().setBoolean(literalValue.getBooleanValue())); + return attributeValueOptional( + Value.newBuilder().setBoolean(literalValue.getBooleanValue())); case FLOAT_VALUE: - return attributeValueMaybe(Value.newBuilder().setDouble(literalValue.getFloatValue())); + return attributeValueOptional(Value.newBuilder().setDouble(literalValue.getFloatValue())); case INT_VALUE: - return attributeValueMaybe(Value.newBuilder().setLong(literalValue.getIntValue())); + return attributeValueOptional(Value.newBuilder().setLong(literalValue.getIntValue())); case VALUE_NOT_SET: - return Maybe.empty(); + return Optional.empty(); default: LOG.error("Unexpected literal value case: " + literalValue.getValueCase()); - return Maybe.empty(); + return Optional.empty(); } } - private static Maybe attributeValueMaybe(Value.Builder value) { - return Maybe.just(AttributeValue.newBuilder().setValue(value).build()); + private static Optional attributeValueOptional(Value.Builder value) { + return Optional.of(AttributeValue.newBuilder().setValue(value).build()); } } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java index 8b59259eb..215d3c330 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java @@ -1,24 +1,22 @@ package org.hypertrace.trace.accessor.entities; -import static io.reactivex.rxjava3.core.Maybe.zip; import static java.util.function.Predicate.not; -import io.reactivex.rxjava3.core.Maybe; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.AttributeSource; import org.hypertrace.core.attribute.service.v1.LiteralValue; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; import org.hypertrace.core.grpcutils.client.rx.GrpcRxExecutionContext; -import org.hypertrace.core.grpcutils.context.RequestContext; import org.hypertrace.entity.data.service.rxclient.EntityDataClient; import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.AttributeValue.TypeCase; @@ -36,7 +34,7 @@ class DefaultTraceEntityAccessor implements TraceEntityAccessor { private final EntityTypeClient entityTypeClient; private final EntityDataClient entityDataClient; - private final CachingAttributeClient attributeClient; + private final AttributeServiceCachedClient attributeClient; private final TraceAttributeReader traceAttributeReader; private final Duration writeThrottleDuration; private final Set excludedEntityTypes; @@ -44,7 +42,7 @@ class DefaultTraceEntityAccessor implements TraceEntityAccessor { DefaultTraceEntityAccessor( EntityTypeClient entityTypeClient, EntityDataClient entityDataClient, - CachingAttributeClient attributeClient, + AttributeServiceCachedClient attributeClient, TraceAttributeReader traceAttributeReader, Duration writeThrottleDuration, Set excludedEntityTypes) { @@ -72,32 +70,26 @@ private boolean isExcludedEntityType(final EntityType entityType) { private void writeEntityIfExists(EntityType entityType, StructuredTrace trace, Event span) { this.buildEntity(entityType, trace, span) - .subscribe( - entity -> { - UpsertCondition upsertCondition = - this.buildUpsertCondition(entityType, trace, span) - .defaultIfEmpty(UpsertCondition.getDefaultInstance()) - .blockingGet(); - - this.entityDataClient.createOrUpdateEntityEventually( - RequestContext.forTenantId(this.traceAttributeReader.getTenantId(span)), - entity, - upsertCondition, - this.writeThrottleDuration); - }); + .ifPresent( + entity -> + this.entityDataClient.createOrUpdateEntityEventually( + this.traceAttributeReader.getRequestContext(span), + entity, + this.buildUpsertCondition(entityType, trace, span) + .orElse(UpsertCondition.getDefaultInstance()), + this.writeThrottleDuration)); } - private Maybe buildUpsertCondition( + private Optional buildUpsertCondition( EntityType entityType, StructuredTrace trace, Event span) { if (entityType.getTimestampAttributeKey().isEmpty()) { - return Maybe.empty(); + return Optional.empty(); } - - return spanTenantContext(span) - .wrapSingle( - () -> - this.attributeClient.get( - entityType.getAttributeScope(), entityType.getTimestampAttributeKey())) + return this.attributeClient + .get( + traceAttributeReader.getRequestContext(span), + entityType.getAttributeScope(), + entityType.getTimestampAttributeKey()) .filter(this::isEntitySourced) .flatMap( attribute -> @@ -105,16 +97,14 @@ private Maybe buildUpsertCondition( attribute, PredicateOperator.PREDICATE_OPERATOR_LESS_THAN, trace, span)); } - private Maybe buildUpsertCondition( + private Optional buildUpsertCondition( AttributeMetadata attribute, PredicateOperator operator, StructuredTrace trace, Event span) { - return this.traceAttributeReader .getSpanValue(trace, span, attribute.getScopeString(), attribute.getKey()) - .onErrorComplete() .flatMap(value -> this.buildUpsertCondition(attribute, operator, value)); } - private Maybe buildUpsertCondition( + private Optional buildUpsertCondition( AttributeMetadata attribute, PredicateOperator operator, LiteralValue currentValue) { return AttributeValueConverter.convertToAttributeValue(currentValue) .map( @@ -128,29 +118,24 @@ private Maybe buildUpsertCondition( .build()); } - private Maybe buildEntity(EntityType entityType, StructuredTrace trace, Event span) { - Maybe> attributes = - this.resolveAllAttributes(entityType.getAttributeScope(), trace, span).cache(); - - Maybe id = - attributes.mapOptional( - map -> this.extractNonEmptyString(map, entityType.getIdAttributeKey())); - - Maybe name = - attributes.mapOptional( + private Optional buildEntity(EntityType entityType, StructuredTrace trace, Event span) { + Optional> attributes = + this.resolveAllAttributes(entityType.getAttributeScope(), trace, span); + Optional id = + attributes.flatMap(map -> this.extractNonEmptyString(map, entityType.getIdAttributeKey())); + Optional name = + attributes.flatMap( map -> this.extractNonEmptyString(map, entityType.getNameAttributeKey())); - - return zip( - id, - name, - attributes, - (resolvedId, resolvedName, resolvedAttributeMap) -> - Entity.newBuilder() - .setEntityId(resolvedId) - .setEntityType(entityType.getName()) - .setEntityName(resolvedName) - .putAllAttributes(resolvedAttributeMap) - .build()) + if (id.isEmpty() || name.isEmpty()) { + return Optional.empty(); + } + return Optional.of( + Entity.newBuilder() + .setEntityId(id.get()) + .setEntityType(entityType.getName()) + .setEntityName(name.get()) + .putAllAttributes(attributes.get()) + .build()) .filter(entity -> this.canCreateEntity(entityType, entity)); } @@ -171,22 +156,27 @@ private boolean passesFormationCondition(Entity entity, EntityFormationCondition } } - private Maybe> resolveAllAttributes( + private Optional> resolveAllAttributes( String scope, StructuredTrace trace, Event span) { - return spanTenantContext(span) - .wrapSingle(() -> this.attributeClient.getAllInScope(scope)) - .flattenAsObservable(list -> list) - .filter(this::isEntitySourced) - .flatMapMaybe(attributeMetadata -> this.resolveAttribute(attributeMetadata, trace, span)) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)) - .toMaybe(); + List attributeMetadataList = + this.attributeClient.getAllInScope( + this.traceAttributeReader.getRequestContext(span), scope); + Map resolvedAttributes = + attributeMetadataList.stream() + .filter(this::isEntitySourced) + .map(attributeMetadata -> this.resolveAttribute(attributeMetadata, trace, span)) + .flatMap(Optional::stream) + .collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue)); + if (resolvedAttributes.isEmpty()) { + return Optional.empty(); + } + return Optional.of(resolvedAttributes); } - private Maybe> resolveAttribute( + private Optional> resolveAttribute( AttributeMetadata attributeMetadata, StructuredTrace trace, Event span) { return this.traceAttributeReader .getSpanValue(trace, span, attributeMetadata.getScopeString(), attributeMetadata.getKey()) - .onErrorComplete() .flatMap(AttributeValueConverter::convertToAttributeValue) .map(value -> Map.entry(attributeMetadata.getKey(), value)); } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityAccessorBuilder.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityAccessorBuilder.java index baecbd2df..97ea1a062 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityAccessorBuilder.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityAccessorBuilder.java @@ -4,7 +4,7 @@ import java.time.Duration; import java.util.Set; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.entity.data.service.rxclient.EntityDataClient; import org.hypertrace.entity.type.service.rxclient.EntityTypeClient; import org.hypertrace.trace.reader.attributes.TraceAttributeReaderFactory; @@ -12,14 +12,14 @@ public class TraceEntityAccessorBuilder { private final EntityTypeClient entityTypeClient; private final EntityDataClient entityDataClient; - private final CachingAttributeClient attributeClient; + private final AttributeServiceCachedClient attributeClient; private Duration entityWriteThrottleDuration = Duration.ofSeconds(15); private Set excludedEntityTypes = emptySet(); public TraceEntityAccessorBuilder( EntityTypeClient entityTypeClient, EntityDataClient entityDataClient, - CachingAttributeClient attributeClient) { + AttributeServiceCachedClient attributeClient) { this.entityTypeClient = entityTypeClient; this.entityDataClient = entityDataClient; this.attributeClient = attributeClient; diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityClientContext.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityClientContext.java deleted file mode 100644 index 31e9a26eb..000000000 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/TraceEntityClientContext.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.hypertrace.trace.accessor.entities; - -import io.grpc.Channel; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; -import org.hypertrace.entity.data.service.rxclient.EntityDataClient; -import org.hypertrace.entity.type.service.rxclient.EntityTypeClient; - -public class TraceEntityClientContext { - /** - * Instantiates a new builder which reuses existing clients - * - * @param entityTypeClient - * @param entityDataClient - * @param attributeClient - * @return {@link TraceEntityClientContext} - */ - public static TraceEntityClientContext usingClients( - EntityTypeClient entityTypeClient, - EntityDataClient entityDataClient, - CachingAttributeClient attributeClient) { - return new TraceEntityClientContext(entityTypeClient, entityDataClient, attributeClient); - } - - /** - * Instantiates a new builder which creates default configured caches for connections to each - * service. - * - * @param entityDataChannel - * @param entityTypeChannel - * @param attributeChannel - * @return {@link TraceEntityClientContext} - */ - public static TraceEntityClientContext usingChannels( - Channel entityDataChannel, Channel entityTypeChannel, Channel attributeChannel) { - return new TraceEntityClientContext( - EntityTypeClient.builder(entityTypeChannel).build(), - EntityDataClient.builder(entityDataChannel).build(), - CachingAttributeClient.builder(attributeChannel).build()); - } - - private final EntityTypeClient entityTypeClient; - private final EntityDataClient entityDataClient; - private final CachingAttributeClient attributeClient; - - private TraceEntityClientContext( - EntityTypeClient entityTypeClient, - EntityDataClient entityDataClient, - CachingAttributeClient attributeClient) { - this.entityTypeClient = entityTypeClient; - this.entityDataClient = entityDataClient; - this.attributeClient = attributeClient; - } - - public EntityTypeClient getEntityTypeClient() { - return entityTypeClient; - } - - public EntityDataClient getEntityDataClient() { - return entityDataClient; - } - - public CachingAttributeClient getAttributeClient() { - return attributeClient; - } -} diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReader.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReader.java index 53907ef2a..3b4c60cd7 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReader.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReader.java @@ -2,25 +2,26 @@ import static org.hypertrace.trace.reader.attributes.ValueSource.TRACE_SCOPE; -import io.reactivex.rxjava3.core.Single; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import java.util.Optional; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.LiteralValue; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; +import org.hypertrace.core.grpcutils.context.RequestContext; class DefaultTraceAttributeReader implements TraceAttributeReader { - private final CachingAttributeClient attributeClient; + private final AttributeServiceCachedClient attributeClient; private final ValueResolver valueResolver; - DefaultTraceAttributeReader(CachingAttributeClient attributeClient) { + DefaultTraceAttributeReader(AttributeServiceCachedClient attributeClient) { this.attributeClient = attributeClient; this.valueResolver = ValueResolver.build(this.attributeClient); } @Override - public Single getSpanValue( + public Optional getSpanValue( StructuredTrace trace, Event span, String attributeScope, String attributeKey) { ValueSource valueSource = ValueSourceFactory.forSpan(trace, span); return this.getAttribute(valueSource, attributeScope, attributeKey) @@ -28,7 +29,7 @@ public Single getSpanValue( } @Override - public Single getTraceValue(StructuredTrace trace, String attributeKey) { + public Optional getTraceValue(StructuredTrace trace, String attributeKey) { ValueSource valueSource = ValueSourceFactory.forTrace(trace); return this.getAttribute(valueSource, TRACE_SCOPE, attributeKey) .flatMap(definition -> this.valueResolver.resolve(valueSource, definition)); @@ -39,10 +40,13 @@ public String getTenantId(Event span) { return span.getCustomerId(); } - private Single getAttribute( + @Override + public RequestContext getRequestContext(Event span) { + return RequestContext.forTenantId(span.getCustomerId()); + } + + private Optional getAttribute( ValueSource valueSource, String attributeScope, String attributeKey) { - return valueSource - .executionContext() - .wrapSingle(() -> this.attributeClient.get(attributeScope, attributeKey)); + return this.attributeClient.get(valueSource.requestContext(), attributeScope, attributeKey); } } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultValueResolver.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultValueResolver.java index 78217fd76..06ce49695 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultValueResolver.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/DefaultValueResolver.java @@ -1,15 +1,12 @@ package org.hypertrace.trace.reader.attributes; -import static io.reactivex.rxjava3.core.Single.zip; - import com.google.common.util.concurrent.RateLimiter; -import io.reactivex.rxjava3.core.Maybe; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.core.Single; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.projection.AttributeProjection; import org.hypertrace.core.attribute.service.projection.AttributeProjectionRegistry; import org.hypertrace.core.attribute.service.v1.AttributeDefinition; @@ -19,7 +16,6 @@ import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.AttributeType; import org.hypertrace.core.attribute.service.v1.LiteralValue; -import org.hypertrace.core.attribute.service.v1.LiteralValue.ValueCase; import org.hypertrace.core.attribute.service.v1.Projection; import org.hypertrace.core.attribute.service.v1.ProjectionExpression; @@ -27,29 +23,29 @@ class DefaultValueResolver implements ValueResolver { // One log a minute private static final RateLimiter LOGGING_LIMITER = RateLimiter.create(1 / 60d); - private final CachingAttributeClient attributeClient; + private final AttributeServiceCachedClient attributeClient; private final AttributeProjectionRegistry attributeProjectionRegistry; DefaultValueResolver( - CachingAttributeClient attributeClient, + AttributeServiceCachedClient attributeClient, AttributeProjectionRegistry attributeProjectionRegistry) { this.attributeClient = attributeClient; this.attributeProjectionRegistry = attributeProjectionRegistry; } @Override - public Single resolve( + public Optional resolve( ValueSource valueSource, AttributeMetadata attributeMetadata) { if (!attributeMetadata.hasDefinition()) { - return this.buildAndLogErrorLazily( - "Attribute definition not set for attribute: " + attributeMetadata.getId()); + logError("Attribute definition not set for attribute: {}", attributeMetadata.getId()); + return Optional.empty(); } return this.resolveDefinition( valueSource, attributeMetadata, attributeMetadata.getDefinition()); } - private Single resolveDefinition( + private Optional resolveDefinition( ValueSource valueSource, AttributeMetadata attributeMetadata, AttributeDefinition definition) { @@ -72,108 +68,104 @@ private Single resolveDefinition( valueSource, attributeMetadata, definition.getFirstValuePresent()); case VALUE_NOT_SET: default: - return this.buildAndLogErrorLazily("Unrecognized attribute definition"); + return Optional.empty(); } } - private Maybe maybeResolveDefinition( - ValueSource valueSource, - AttributeMetadata attributeMetadata, - AttributeDefinition definition) { - return this.resolveDefinition(valueSource, attributeMetadata, definition) - .filter(literalValue -> !literalValue.getValueCase().equals(ValueCase.VALUE_NOT_SET)) - .onErrorComplete(); - } - - private Single resolveValue( + private Optional resolveValue( ValueSource contextValueSource, String attributeScope, AttributeType attributeType, AttributeKind attributeKind, String path) { - Single matchingValueSource = - Maybe.fromOptional(contextValueSource.sourceForScope(attributeScope)) - .switchIfEmpty( - this.buildAndLogErrorLazily( - "No value source available supporting scope %s", attributeScope)); + Optional matchingValueSource = contextValueSource.sourceForScope(attributeScope); + if (matchingValueSource.isEmpty()) { + logError("No value source available supporting scope {}", attributeScope); + return Optional.empty(); + } switch (attributeType) { case ATTRIBUTE: - return matchingValueSource - .mapOptional(valueSource -> valueSource.getAttribute(path, attributeKind)) - .defaultIfEmpty(LiteralValue.getDefaultInstance()); + return matchingValueSource.flatMap( + valueSource -> valueSource.getAttribute(path, attributeKind)); case METRIC: - return matchingValueSource - .mapOptional(valueSource -> valueSource.getMetric(path, attributeKind)) - .defaultIfEmpty(LiteralValue.getDefaultInstance()); + return matchingValueSource.flatMap( + valueSource -> valueSource.getMetric(path, attributeKind)); case UNRECOGNIZED: case TYPE_UNDEFINED: default: - return this.buildAndLogErrorLazily("Unrecognized projection type"); + return Optional.empty(); } } - private Single resolveProjection(ValueSource valueSource, Projection projection) { + private Optional resolveProjection(ValueSource valueSource, Projection projection) { switch (projection.getValueCase()) { case ATTRIBUTE_ID: - return valueSource - .executionContext() - .wrapSingle(() -> this.attributeClient.get(projection.getAttributeId())) + return this.attributeClient + .getById(valueSource.requestContext(), projection.getAttributeId()) .flatMap(attributeMetadata -> this.resolve(valueSource, attributeMetadata)); case LITERAL: - return Single.just(projection.getLiteral()); + return Optional.of(projection.getLiteral()); case EXPRESSION: return this.resolveExpression(valueSource, projection.getExpression()); case VALUE_NOT_SET: default: - return this.buildAndLogErrorLazily("Unrecognized projection type"); + logError("Unrecognized projection type {}", projection.getValueCase()); + return Optional.empty(); } } - private Single resolveField( + private Optional resolveField( ValueSource valueSource, SourceField sourceField, AttributeKind attributeKind) { - return Maybe.fromOptional(valueSource.getSourceField(sourceField, attributeKind)) - .defaultIfEmpty(LiteralValue.getDefaultInstance()); + return valueSource.getSourceField(sourceField, attributeKind); } - private Single resolveFirstValuePresent( + private Optional resolveFirstValuePresent( ValueSource valueSource, AttributeMetadata attributeMetadata, AttributeDefinitions definitions) { - return Observable.fromIterable(definitions.getDefinitionsList()) - .concatMapMaybe( - definition -> this.maybeResolveDefinition(valueSource, attributeMetadata, definition)) - .first(LiteralValue.getDefaultInstance()); + return definitions.getDefinitionsList().stream() + .map(definition -> this.resolveDefinition(valueSource, attributeMetadata, definition)) + .flatMap(Optional::stream) + .findFirst(); } - private Single resolveExpression( + private Optional resolveExpression( ValueSource valueSource, ProjectionExpression expression) { - - Single projectionSingle = - Maybe.fromOptional(this.attributeProjectionRegistry.getProjection(expression.getOperator())) - .switchIfEmpty( - buildAndLogErrorLazily( - "Unregistered projection operator: %s", expression.getOperator())); - Single> argumentsSingle = + Optional maybeProjection = + this.attributeProjectionRegistry.getProjection(expression.getOperator()); + if (maybeProjection.isEmpty()) { + logError("Unregistered projection operator: {}", expression.getOperator()); + return Optional.empty(); + } + List argumentList = this.resolveArgumentList(valueSource, expression.getArgumentsList()); - return zip(projectionSingle, argumentsSingle, AttributeProjection::project); + if (argumentList.isEmpty()) { + logError("Failed to resolve argument list for expression with operator: {}", expression); + return Optional.empty(); + } + return maybeProjection.map(projection -> projection.project(argumentList)); } - private Single> resolveArgumentList( - ValueSource valueSource, List arguments) { - return Observable.fromIterable(arguments) - .flatMapSingle(argument -> this.resolveProjection(valueSource, argument)) - .collect(Collectors.toList()); + private void logError(String format, Object... args) { + if (LOGGING_LIMITER.tryAcquire()) { + log.error(format, args); + } } - private Single buildAndLogErrorLazily(String message, Object... args) { - return Single.error( - () -> { - if (LOGGING_LIMITER.tryAcquire()) { - log.error(String.format(message, args)); - } - return new UnsupportedOperationException(String.format(message, args)); - }); + private List resolveArgumentList( + ValueSource valueSource, List arguments) { + List resolvedArguments = + arguments.stream() + .map(argument -> this.resolveProjection(valueSource, argument)) + .flatMap(Optional::stream) + .collect(Collectors.toUnmodifiableList()); + if (resolvedArguments.size() != arguments.size()) { + // if any of the arguments don't resolve, return empty list to don't support partial + // resolution + return Collections.emptyList(); + } + return resolvedArguments; } } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/SpanValueSource.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/SpanValueSource.java index fc3878f2e..22e947b61 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/SpanValueSource.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/SpanValueSource.java @@ -8,7 +8,7 @@ import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.Resource; import org.hypertrace.core.datamodel.StructuredTrace; -import org.hypertrace.core.grpcutils.client.rx.GrpcRxExecutionContext; +import org.hypertrace.core.grpcutils.context.RequestContext; class SpanValueSource extends AvroBackedValueSource { @@ -57,8 +57,8 @@ public Optional sourceForScope(String scope) { } @Override - public GrpcRxExecutionContext executionContext() { - return GrpcRxExecutionContext.forTenantContext(this.span.getCustomerId()); + public RequestContext requestContext() { + return RequestContext.forTenantId(span.getCustomerId()); } @Override diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReader.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReader.java index d8678bb2f..909026e87 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReader.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReader.java @@ -1,13 +1,16 @@ package org.hypertrace.trace.reader.attributes; -import io.reactivex.rxjava3.core.Single; +import java.util.Optional; import org.apache.avro.generic.GenericRecord; import org.hypertrace.core.attribute.service.v1.LiteralValue; +import org.hypertrace.core.grpcutils.context.RequestContext; public interface TraceAttributeReader { - Single getSpanValue(T trace, S span, String attributeScope, String attributeKey); + Optional getSpanValue(T trace, S span, String attributeScope, String attributeKey); - Single getTraceValue(T trace, String attributeKey); + Optional getTraceValue(T trace, String attributeKey); String getTenantId(S span); + + RequestContext getRequestContext(S span); } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReaderFactory.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReaderFactory.java index 827fa895c..1c1640e41 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReaderFactory.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceAttributeReaderFactory.java @@ -1,12 +1,12 @@ package org.hypertrace.trace.reader.attributes; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; public interface TraceAttributeReaderFactory { static TraceAttributeReader build( - CachingAttributeClient attributeClient) { + AttributeServiceCachedClient attributeClient) { return new DefaultTraceAttributeReader(attributeClient); } } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceValueSource.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceValueSource.java index 9bdbe5627..54f8df4b2 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceValueSource.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/TraceValueSource.java @@ -6,7 +6,7 @@ import org.hypertrace.core.attribute.service.v1.AttributeKind; import org.hypertrace.core.attribute.service.v1.LiteralValue; import org.hypertrace.core.datamodel.StructuredTrace; -import org.hypertrace.core.grpcutils.client.rx.GrpcRxExecutionContext; +import org.hypertrace.core.grpcutils.context.RequestContext; class TraceValueSource extends AvroBackedValueSource { @@ -49,8 +49,8 @@ public Optional sourceForScope(String scope) { } @Override - public GrpcRxExecutionContext executionContext() { - return GrpcRxExecutionContext.forTenantContext(this.trace.getCustomerId()); + public RequestContext requestContext() { + return RequestContext.forTenantId(trace.getCustomerId()); } @Override diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueResolver.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueResolver.java index 166b01628..f17de434e 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueResolver.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueResolver.java @@ -1,16 +1,16 @@ package org.hypertrace.trace.reader.attributes; -import io.reactivex.rxjava3.core.Single; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import java.util.Optional; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.projection.AttributeProjectionRegistry; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.LiteralValue; public interface ValueResolver { - Single resolve(ValueSource valueSource, AttributeMetadata attributeMetadata); + Optional resolve(ValueSource valueSource, AttributeMetadata attributeMetadata); - static ValueResolver build(CachingAttributeClient attributeClient) { + static ValueResolver build(AttributeServiceCachedClient attributeClient) { return new DefaultValueResolver(attributeClient, new AttributeProjectionRegistry()); } } diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueSource.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueSource.java index 5db9d5450..89f32bd20 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueSource.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/reader/attributes/ValueSource.java @@ -4,7 +4,7 @@ import org.hypertrace.core.attribute.service.v1.AttributeDefinition.SourceField; import org.hypertrace.core.attribute.service.v1.AttributeKind; import org.hypertrace.core.attribute.service.v1.LiteralValue; -import org.hypertrace.core.grpcutils.client.rx.GrpcRxExecutionContext; +import org.hypertrace.core.grpcutils.context.RequestContext; public interface ValueSource { Optional getAttribute(String key, AttributeKind attributeKind); @@ -15,7 +15,7 @@ public interface ValueSource { Optional sourceForScope(String scope); - GrpcRxExecutionContext executionContext(); + RequestContext requestContext(); String TRACE_SCOPE = "TRACE"; } diff --git a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/AttributeValueConverterTest.java b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/AttributeValueConverterTest.java index efdede23c..15a9315a0 100644 --- a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/AttributeValueConverterTest.java +++ b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/AttributeValueConverterTest.java @@ -19,36 +19,31 @@ class AttributeValueConverterTest { @Test void convertsStringValue() { - assertEquals( - stringAttributeValue("foo"), convertToAttributeValue(stringLiteral("foo")).blockingGet()); - assertEquals( - stringAttributeValue(""), convertToAttributeValue(stringLiteral("")).blockingGet()); + assertEquals(stringAttributeValue("foo"), convertToAttributeValue(stringLiteral("foo")).get()); + assertEquals(stringAttributeValue(""), convertToAttributeValue(stringLiteral("")).get()); } @Test void convertsBooleanValue() { + assertEquals(booleanAttributeValue(true), convertToAttributeValue(booleanLiteral(true)).get()); assertEquals( - booleanAttributeValue(true), convertToAttributeValue(booleanLiteral(true)).blockingGet()); - assertEquals( - booleanAttributeValue(false), convertToAttributeValue(booleanLiteral(false)).blockingGet()); + booleanAttributeValue(false), convertToAttributeValue(booleanLiteral(false)).get()); } @Test void convertsIntValue() { - assertEquals(longAttributeValue(0), convertToAttributeValue(longLiteral(0)).blockingGet()); - assertEquals(longAttributeValue(100), convertToAttributeValue(longLiteral(100)).blockingGet()); + assertEquals(longAttributeValue(0), convertToAttributeValue(longLiteral(0)).get()); + assertEquals(longAttributeValue(100), convertToAttributeValue(longLiteral(100)).get()); } @Test void convertsFloatValue() { - assertEquals( - doubleAttributeValue(10.4), convertToAttributeValue(doubleLiteral(10.4)).blockingGet()); - assertEquals( - doubleAttributeValue(-3.5), convertToAttributeValue(doubleLiteral(-3.5)).blockingGet()); + assertEquals(doubleAttributeValue(10.4), convertToAttributeValue(doubleLiteral(10.4)).get()); + assertEquals(doubleAttributeValue(-3.5), convertToAttributeValue(doubleLiteral(-3.5)).get()); } @Test void emptyOnUnknownValue() { - assertTrue(convertToAttributeValue(LiteralValue.getDefaultInstance()).isEmpty().blockingGet()); + assertTrue(convertToAttributeValue(LiteralValue.getDefaultInstance()).isEmpty()); } } diff --git a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java index 848b54fe9..117bb43cc 100644 --- a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java +++ b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java @@ -16,14 +16,13 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Scheduler; -import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.schedulers.Schedulers; import java.time.Duration; import java.util.Arrays; import java.util.Map; -import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.AttributeSource; import org.hypertrace.core.attribute.service.v1.AttributeType; @@ -119,7 +118,7 @@ class DefaultTraceEntityAccessorTest { @Mock EntityTypeClient mockTypeClient; @Mock EntityDataClient mockDataClient; - @Mock CachingAttributeClient mockAttributeClient; + @Mock AttributeServiceCachedClient mockAttributeClient; @Mock TraceAttributeReader mockAttributeReader; MockedStatic mockSchedulers; @@ -136,6 +135,12 @@ void beforeEach() { this.mockAttributeReader, DEFAULT_DURATION, EXCLUDE_ENTITY_TYPES); + when(mockAttributeReader.getRequestContext(any())) + .thenAnswer( + inv -> { + Event event = inv.getArgument(0); + return RequestContext.forTenantId(event.getCustomerId()); + }); mockSchedulers = Mockito.mockStatic(Schedulers.class); mockSchedulers.when(Schedulers::io).thenReturn(trampoline); } @@ -288,24 +293,27 @@ private void mockTenantId() { private void mockAttributeRead(AttributeMetadata attributeMetadata, LiteralValue value) { when(this.mockAttributeReader.getSpanValue( TEST_TRACE, TEST_SPAN, attributeMetadata.getScopeString(), attributeMetadata.getKey())) - .thenReturn(Single.just(value)); + .thenReturn(Optional.of(value)); } private void mockAttributeReadError(AttributeMetadata attributeMetadata) { when(this.mockAttributeReader.getSpanValue( TEST_TRACE, TEST_SPAN, attributeMetadata.getScopeString(), attributeMetadata.getKey())) - .thenReturn(Single.error(new NoSuchElementException())); + .thenReturn(Optional.empty()); } private void mockGetAllAttributes(AttributeMetadata... attributeMetadata) { - when(this.mockAttributeClient.getAllInScope(TEST_ENTITY_TYPE_NAME)) - .thenReturn(Single.just(Arrays.asList(attributeMetadata))); + when(this.mockAttributeClient.getAllInScope( + argThat(MATCHING_TENANT_REQUEST_CONTEXT), eq(TEST_ENTITY_TYPE_NAME))) + .thenReturn(Arrays.asList(attributeMetadata)); } private void mockGetSingleAttribute(AttributeMetadata attributeMetadata) { when(this.mockAttributeClient.get( - attributeMetadata.getScopeString(), attributeMetadata.getKey())) - .thenReturn(Single.just(attributeMetadata)); + argThat(MATCHING_TENANT_REQUEST_CONTEXT), + eq(attributeMetadata.getScopeString()), + eq(attributeMetadata.getKey()))) + .thenReturn(Optional.of(attributeMetadata)); } private void mockAllEntityTypes(EntityType entityType) { diff --git a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReaderTest.java b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReaderTest.java index 6c3f983bc..653c8ed8f 100644 --- a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReaderTest.java +++ b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultTraceAttributeReaderTest.java @@ -5,28 +5,36 @@ import static org.hypertrace.trace.reader.attributes.AvroUtil.defaultedStructuredTraceBuilder; import static org.hypertrace.trace.reader.attributes.LiteralValueUtil.stringLiteral; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.reactivex.rxjava3.core.Single; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import java.util.Optional; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.v1.AttributeDefinition; import org.hypertrace.core.attribute.service.v1.AttributeKind; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.AttributeType; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; +import org.hypertrace.core.grpcutils.context.RequestContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) class DefaultTraceAttributeReaderTest { - @Mock CachingAttributeClient mockAttributeClient; + @Mock AttributeServiceCachedClient mockAttributeClient; private TraceAttributeReader traceAttributeReader; + private static final ArgumentMatcher MATCHING_TENANT_REQUEST_CONTEXT = + arg -> + arg.buildContextualKey() + .equals(RequestContext.forTenantId("defaultCustomerId").buildContextualKey()); @BeforeEach void beforeEach() { @@ -42,7 +50,9 @@ void canReadSpanValues() { .setValueKind(AttributeKind.TYPE_STRING) .setDefinition(AttributeDefinition.newBuilder().setSourcePath("attrPath").build()) .build(); - when(this.mockAttributeClient.get("TEST_SCOPE", "key")).thenReturn(Single.just(metadata)); + when(this.mockAttributeClient.get( + argThat(MATCHING_TENANT_REQUEST_CONTEXT), eq("TEST_SCOPE"), eq("key"))) + .thenReturn(Optional.of(metadata)); Event span = defaultedEventBuilder() @@ -53,7 +63,7 @@ void canReadSpanValues() { stringLiteral("attrValue"), this.traceAttributeReader .getSpanValue(mock(StructuredTrace.class), span, "TEST_SCOPE", "key") - .blockingGet()); + .get()); } @Test @@ -65,7 +75,9 @@ void canReadTraceValues() { .setValueKind(AttributeKind.TYPE_STRING) .setDefinition(AttributeDefinition.newBuilder().setSourcePath("attrPath").build()) .build(); - when(this.mockAttributeClient.get("TRACE", "key")).thenReturn(Single.just(metadata)); + when(this.mockAttributeClient.get( + argThat(MATCHING_TENANT_REQUEST_CONTEXT), eq("TRACE"), eq("key"))) + .thenReturn(Optional.of(metadata)); StructuredTrace trace = defaultedStructuredTraceBuilder() @@ -73,7 +85,6 @@ void canReadTraceValues() { .build(); assertEquals( - stringLiteral("attrValue"), - this.traceAttributeReader.getTraceValue(trace, "key").blockingGet()); + stringLiteral("attrValue"), this.traceAttributeReader.getTraceValue(trace, "key").get()); } } diff --git a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultValueResolverTest.java b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultValueResolverTest.java index e093fd515..17babbeed 100644 --- a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultValueResolverTest.java +++ b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/reader/attributes/DefaultValueResolverTest.java @@ -8,12 +8,15 @@ import static org.hypertrace.trace.reader.attributes.LiteralValueUtil.longLiteral; import static org.hypertrace.trace.reader.attributes.LiteralValueUtil.stringLiteral; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.reactivex.rxjava3.core.Single; import java.util.Map; -import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; +import java.util.Optional; +import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient; import org.hypertrace.core.attribute.service.projection.AttributeProjectionRegistry; import org.hypertrace.core.attribute.service.v1.AttributeDefinition; import org.hypertrace.core.attribute.service.v1.AttributeDefinition.AttributeDefinitions; @@ -36,7 +39,7 @@ @ExtendWith(MockitoExtension.class) class DefaultValueResolverTest { - @Mock CachingAttributeClient mockAttributeClient; + @Mock AttributeServiceCachedClient mockAttributeServiceCachedClient; @Mock StructuredTrace mockStructuredTrace; private DefaultValueResolver resolver; @@ -44,7 +47,8 @@ class DefaultValueResolverTest { @BeforeEach void beforeEach() { this.resolver = - new DefaultValueResolver(this.mockAttributeClient, new AttributeProjectionRegistry()); + new DefaultValueResolver( + this.mockAttributeServiceCachedClient, new AttributeProjectionRegistry()); } @Test @@ -66,7 +70,7 @@ void resolvesAttributes() { stringLiteral("attrValue"), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadata) - .blockingGet()); + .get()); } @Test @@ -86,7 +90,7 @@ void resolvesMetrics() { longLiteral(42), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadata) - .blockingGet()); + .get()); } @Test @@ -103,7 +107,7 @@ void resolvesLiteralProjections() { this.resolver .resolve( ValueSourceFactory.forSpan(this.mockStructuredTrace, mock(Event.class)), metadata) - .blockingGet()); + .get()); } @Test @@ -122,7 +126,8 @@ void resolvesAttributeProjections() { .setValueKind(AttributeKind.TYPE_INT64) .setDefinition(AttributeDefinition.newBuilder().setSourcePath("metricPath").build()) .build(); - when(this.mockAttributeClient.get("TEST_SCOPE.other")).thenReturn(Single.just(otherMetadata)); + when(this.mockAttributeServiceCachedClient.getById(any(), eq("TEST_SCOPE.other"))) + .thenReturn(Optional.of((otherMetadata))); Event span = defaultedEventBuilder().setMetrics(buildMetricsWithKeyValue("metricPath", 42)).build(); @@ -131,7 +136,7 @@ void resolvesAttributeProjections() { longLiteral(42), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), projectionMetadata) - .blockingGet()); + .get()); } @Test @@ -166,8 +171,10 @@ void resolvesExpressionProjections() { .setValueKind(AttributeKind.TYPE_STRING) .setDefinition(AttributeDefinition.newBuilder().setSourcePath("attrPath").build()) .build(); - when(this.mockAttributeClient.get("TEST_SCOPE.first")).thenReturn(Single.just(firstMetadata)); - when(this.mockAttributeClient.get("TEST_SCOPE.second")).thenReturn(Single.just(secondMetadata)); + when(this.mockAttributeServiceCachedClient.getById(any(), eq("TEST_SCOPE.first"))) + .thenReturn(Optional.of(firstMetadata)); + when(this.mockAttributeServiceCachedClient.getById(any(), eq("TEST_SCOPE.second"))) + .thenReturn(Optional.of(secondMetadata)); Event span = defaultedEventBuilder() @@ -179,7 +186,7 @@ void resolvesExpressionProjections() { stringLiteral("42coolString"), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), projectionMetadata) - .blockingGet()); + .get()); } @Test @@ -198,18 +205,20 @@ void resolvesProjectionsAcrossScopes() { .setValueKind(AttributeKind.TYPE_INT64) .setDefinition(AttributeDefinition.newBuilder().setSourcePath("metricPath").build()) .build(); - when(this.mockAttributeClient.get("TRACE.other")).thenReturn(Single.just(otherMetadata)); + when(this.mockAttributeServiceCachedClient.getById(any(), eq("TRACE.other"))) + .thenReturn(Optional.of(otherMetadata)); StructuredTrace trace = defaultedStructuredTraceBuilder() .setMetrics(buildMetricsWithKeyValue("metricPath", 42)) .build(); + Event span = mock(Event.class); + when(span.getCustomerId()).thenReturn(trace.getCustomerId()); + assertEquals( longLiteral(42), - this.resolver - .resolve(ValueSourceFactory.forSpan(trace, mock(Event.class)), projectionMetadata) - .blockingGet()); + this.resolver.resolve(ValueSourceFactory.forSpan(trace, span), projectionMetadata).get()); } @Test @@ -239,12 +248,12 @@ void resolveFields() { longLiteral(123), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadataStartTime) - .blockingGet()); + .get()); assertEquals( longLiteral(234), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadataEndTime) - .blockingGet()); + .get()); } @Test @@ -279,7 +288,7 @@ void resolvesFirstAvailableDefinition() { longLiteral(14), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadata) - .blockingGet()); + .get()); } @Test @@ -305,11 +314,10 @@ void resolvesEmptyIfNoDefinitionAvailable() { buildAttributesWithKeyValues(Map.of("path.to.string", "foo", "path.to.int", "14"))) .build(); - assertEquals( - LiteralValue.getDefaultInstance(), + assertTrue( this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadata) - .blockingGet()); + .isEmpty()); } @Test @@ -340,7 +348,7 @@ void resolvesFirstAttributeProjection() { stringLiteral("expected-value"), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadata) - .blockingGet()); + .get()); } @Test @@ -375,6 +383,6 @@ void resolvesNestedFirstAttribute() { longLiteral(13), this.resolver .resolve(ValueSourceFactory.forSpan(this.mockStructuredTrace, span), metadata) - .blockingGet()); + .get()); } }