diff --git a/hypertrace-trace-enricher/trace-reader/build.gradle.kts b/hypertrace-trace-enricher/trace-reader/build.gradle.kts index d86649244..52016e07a 100644 --- a/hypertrace-trace-enricher/trace-reader/build.gradle.kts +++ b/hypertrace-trace-enricher/trace-reader/build.gradle.kts @@ -20,7 +20,7 @@ dependencies { compileOnly("org.projectlombok:lombok:1.18.20") testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") - testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("org.mockito:mockito-inline:3.8.0") testImplementation("org.mockito:mockito-junit-jupiter:3.8.0") testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.14.1") diff --git a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java index 39135c225..d869ca7c4 100644 --- a/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java +++ b/hypertrace-trace-enricher/trace-reader/src/main/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessor.java @@ -4,6 +4,7 @@ import static java.util.function.Predicate.not; import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.time.Duration; import java.util.Map; import java.util.Map.Entry; @@ -61,20 +62,21 @@ public void writeAssociatedEntitiesForSpanEventually(StructuredTrace trace, Even } private void writeEntityIfExists(EntityType entityType, StructuredTrace trace, Event span) { - Entity entity = this.buildEntity(entityType, trace, span).blockingGet(); - if (entity == null) { - return; - } - UpsertCondition upsertCondition = - this.buildUpsertCondition(entityType, trace, span) - .defaultIfEmpty(UpsertCondition.getDefaultInstance()) - .blockingGet(); - - this.entityDataClient.createOrUpdateEntityEventually( - RequestContext.forTenantId(this.traceAttributeReader.getTenantId(span)), - entity, - upsertCondition, - this.writeThrottleDuration); + this.buildEntity(entityType, trace, span) + .subscribeOn(Schedulers.io()) + .subscribe( + entity -> { + UpsertCondition upsertCondition = + this.buildUpsertCondition(entityType, trace, span) + .defaultIfEmpty(UpsertCondition.getDefaultInstance()) + .blockingGet(); + + this.entityDataClient.createOrUpdateEntityEventually( + RequestContext.forTenantId(this.traceAttributeReader.getTenantId(span)), + entity, + upsertCondition, + this.writeThrottleDuration); + }); } private Maybe buildUpsertCondition( diff --git a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java index 8c2200ea1..5e9a3e3cb 100644 --- a/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java +++ b/hypertrace-trace-enricher/trace-reader/src/test/java/org/hypertrace/trace/accessor/entities/DefaultTraceEntityAccessorTest.java @@ -15,7 +15,9 @@ import static org.mockito.Mockito.when; import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.time.Duration; import java.util.Arrays; import java.util.Map; @@ -37,11 +39,14 @@ import org.hypertrace.entity.type.service.v2.EntityType; import org.hypertrace.entity.type.service.v2.EntityType.EntityFormationCondition; import org.hypertrace.trace.reader.attributes.TraceAttributeReader; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentMatcher; import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) @@ -105,11 +110,13 @@ class DefaultTraceEntityAccessorTest { @Mock EntityDataClient mockDataClient; @Mock CachingAttributeClient mockAttributeClient; @Mock TraceAttributeReader mockAttributeReader; + MockedStatic mockSchedulers; private DefaultTraceEntityAccessor entityAccessor; @BeforeEach void beforeEach() { + Scheduler trampoline = Schedulers.trampoline(); this.entityAccessor = new DefaultTraceEntityAccessor( this.mockTypeClient, @@ -117,6 +124,13 @@ void beforeEach() { this.mockAttributeClient, this.mockAttributeReader, DEFAULT_DURATION); + mockSchedulers = Mockito.mockStatic(Schedulers.class); + mockSchedulers.when(Schedulers::io).thenReturn(trampoline); + } + + @AfterEach + void afterEach() { + mockSchedulers.close(); } @Test @@ -126,7 +140,6 @@ void canWriteAllEntities() { mockTenantId(); mockAttributeRead(TEST_ENTITY_ID_ATTRIBUTE, stringLiteral(TEST_ENTITY_ID_ATTRIBUTE_VALUE)); mockAttributeRead(TEST_ENTITY_NAME_ATTRIBUTE, stringLiteral(TEST_ENTITY_NAME_ATTRIBUTE_VALUE)); - this.entityAccessor.writeAssociatedEntitiesForSpanEventually(TEST_TRACE, TEST_SPAN); verify(mockDataClient, times(1))