diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 582eaedd..81ac6480 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -7,18 +7,16 @@ import com.google.protobuf.ServiceException; import com.typesafe.config.Config; import io.grpc.stub.StreamObserver; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; -import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.SingleValueKey; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; import org.hypertrace.core.grpcutils.context.RequestContext; @@ -26,8 +24,6 @@ import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.Query; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest.EntityUpdateInfo; import org.hypertrace.entity.query.service.v1.ColumnIdentifier; import org.hypertrace.entity.query.service.v1.ColumnMetadata; import org.hypertrace.entity.query.service.v1.EntityQueryRequest; @@ -41,7 +37,6 @@ import org.hypertrace.entity.query.service.v1.SetAttribute; import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest; import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse; -import org.hypertrace.entity.query.service.v1.UpdateOperation; import org.hypertrace.entity.query.service.v1.Value; import org.hypertrace.entity.query.service.v1.ValueType; import org.hypertrace.entity.service.constants.EntityServiceConstants; @@ -240,8 +235,8 @@ Row convertToEntityQueryResult( public void update(EntityUpdateRequest request, StreamObserver responseObserver) { // Validations RequestContext requestContext = RequestContext.CURRENT.get(); - Optional maybeTenantId = requestContext.getTenantId(); - if (maybeTenantId.isEmpty()) { + Optional tenantId = requestContext.getTenantId(); + if (tenantId.isEmpty()) { responseObserver.onError(new ServiceException("Tenant id is missing in the request.")); return; } @@ -261,9 +256,14 @@ public void update(EntityUpdateRequest request, StreamObserver r doUpdate(requestContext, request); // Finally return the selections - List entities = - getProjectedEntities( - request.getEntityIdsList(), request.getSelectionList(), requestContext); + Query entitiesQuery = Query.newBuilder().addAllEntityId(request.getEntityIdsList()).build(); + List docStoreSelections = + entityQueryConverter.convertSelectionsToDocStoreSelections( + requestContext, request.getSelectionList()); + Iterator documentIterator = + entitiesCollection.search( + DocStoreConverter.transform(tenantId.get(), entitiesQuery, docStoreSelections)); + List entities = convertDocsToEntities(documentIterator); responseObserver.onNext( convertEntitiesToResultSetChunk(requestContext, entities, request.getSelectionList())); responseObserver.onCompleted(); @@ -274,7 +274,7 @@ public void update(EntityUpdateRequest request, StreamObserver r } private void doUpdate(RequestContext requestContext, EntityUpdateRequest request) - throws Exception { + throws IOException { if (request.getOperation().hasSetAttribute()) { SetAttribute setAttribute = request.getOperation().getSetAttribute(); String attributeId = setAttribute.getAttribute().getColumnName(); @@ -289,128 +289,20 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request AttributeValue attributeValue = EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); - JSONDocument jsonDocument = new JSONDocument(jsonValue); - Map> entitiesUpdateMap = new HashMap<>(); for (String entityId : request.getEntityIdsList()) { SingleValueKey key = new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId); - if (entitiesUpdateMap.containsKey(key)) { - entitiesUpdateMap.get(key).put(subDocPath, jsonDocument); - } else { - Map subDocument = new HashMap<>(); - subDocument.put(subDocPath, jsonDocument); - entitiesUpdateMap.put(key, subDocument); + // TODO better error reporting once doc store exposes the, + if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) { + LOG.warn( + "Failed to update entity {}, subDocPath {}, with new doc {}.", + key, + subDocPath, + jsonValue); } } - try { - entitiesCollection.bulkUpdateSubDocs(entitiesUpdateMap); - } catch (Exception e) { - LOG.error( - "Failed to update entities {}, subDocPath {}, with new doc {}.", - entitiesUpdateMap, - subDocPath, - jsonValue, - e); - throw e; - } - } - } - - @Override - public void bulkUpdate( - BulkEntityUpdateRequest request, StreamObserver responseObserver) { - // Validations - RequestContext requestContext = RequestContext.CURRENT.get(); - Optional maybeTenantId = requestContext.getTenantId(); - if (maybeTenantId.isEmpty()) { - responseObserver.onError(new ServiceException("Tenant id is missing in the request.")); - return; - } - if (StringUtils.isEmpty(request.getEntityType())) { - responseObserver.onError(new ServiceException("Entity type is missing in the request.")); - return; - } - if (request.getEntitiesCount() == 0) { - responseObserver.onError(new ServiceException("Entities are missing in the request.")); - } - Map entitiesMap = request.getEntitiesMap(); - try { - doBulkUpdate(requestContext, entitiesMap); - responseObserver.onCompleted(); - } catch (Exception e) { - responseObserver.onError( - new ServiceException("Error occurred while executing " + request, e)); - } - } - - private List getProjectedEntities( - Iterable entityIdsList, - List selectionList, - RequestContext requestContext) { - Query entitiesQuery = Query.newBuilder().addAllEntityId(entityIdsList).build(); - List docStoreSelections = - entityQueryConverter.convertSelectionsToDocStoreSelections(requestContext, selectionList); - Iterator documentIterator = - entitiesCollection.search( - DocStoreConverter.transform( - requestContext.getTenantId().orElseThrow(), entitiesQuery, docStoreSelections)); - return convertDocsToEntities(documentIterator); - } - - private void doBulkUpdate( - RequestContext requestContext, Map entitiesMap) throws Exception { - Map> entitiesUpdateMap = new HashMap<>(); - for (String entityId : entitiesMap.keySet()) { - Map transformedUpdateOperations = - transformUpdateOperations( - entitiesMap.get(entityId).getUpdateOperationList(), requestContext); - if (transformedUpdateOperations.isEmpty()) { - continue; - } - entitiesUpdateMap.put( - new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId), - transformedUpdateOperations); - } - - if (entitiesUpdateMap.isEmpty()) { - LOG.error("There are no entities to update!"); - return; - } - - try { - entitiesCollection.bulkUpdateSubDocs(entitiesUpdateMap); - } catch (Exception e) { - LOG.error("Failed to update entities {}", entitiesMap, e); - throw e; - } - } - - private Map transformUpdateOperations( - List updateOperationList, RequestContext requestContext) throws Exception { - Map documentMap = new HashMap<>(); - for (UpdateOperation updateOperation : updateOperationList) { - if (!updateOperation.hasSetAttribute()) { - continue; - } - SetAttribute setAttribute = updateOperation.getSetAttribute(); - String attributeId = setAttribute.getAttribute().getColumnName(); - String subDocPath = - entityAttributeMapping - .getDocStorePathByAttributeId(requestContext, attributeId) - .orElseThrow( - () -> new IllegalArgumentException("Unknown attribute FQN " + attributeId)); - AttributeValue attributeValue = - EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); - try { - String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); - documentMap.put(subDocPath, new JSONDocument(jsonValue)); - } catch (Exception e) { - LOG.error("Failed to put update corresponding to {} in the documentMap", subDocPath, e); - throw e; - } } - return Collections.unmodifiableMap(documentMap); } @Override diff --git a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java index cab30c9e..7554142a 100644 --- a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java +++ b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java @@ -14,10 +14,8 @@ import io.grpc.Context; import io.grpc.stub.StreamObserver; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; -import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.Filter; @@ -27,8 +25,6 @@ import org.hypertrace.core.grpcutils.context.RequestContext; import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.Entity; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest.EntityUpdateInfo; import org.hypertrace.entity.query.service.v1.ColumnIdentifier; import org.hypertrace.entity.query.service.v1.EntityQueryRequest; import org.hypertrace.entity.query.service.v1.EntityUpdateRequest; @@ -53,7 +49,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) @@ -201,149 +196,10 @@ public void testUpdate_success() throws Exception { }); verify(mockEntitiesCollection, times(1)) - .bulkUpdateSubDocs( - eq( - Map.of( - new SingleValueKey("tenant1", "entity-id-1"), - Map.of( - "attributes.status", - new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))))); - } - - @Nested - class BulkUpdateEntities { - @Test - public void testBulkUpdate_noTenantId() throws Exception { - StreamObserver mockResponseObserver = mock(StreamObserver.class); - when(requestContext.getTenantId()).thenReturn(Optional.empty()); - Context.current() - .withValue(RequestContext.CURRENT, requestContext) - .call( - () -> { - EntityQueryServiceImpl eqs = - new EntityQueryServiceImpl(entitiesCollection, mockAttributeMapping, 1); - - eqs.bulkUpdate(null, mockResponseObserver); - - verify(mockResponseObserver, times(1)) - .onError( - argThat( - new ExceptionMessageMatcher("Tenant id is missing in the request."))); - return null; - }); - } - - @Test - public void testBulkUpdate_noEntityType() throws Exception { - StreamObserver mockResponseObserver = mock(StreamObserver.class); - - Context.current() - .withValue(RequestContext.CURRENT, mockRequestContextWithTenantId()) - .call( - () -> { - EntityQueryServiceImpl eqs = - new EntityQueryServiceImpl(entitiesCollection, mockAttributeMapping, 1); - - eqs.bulkUpdate(BulkEntityUpdateRequest.newBuilder().build(), mockResponseObserver); - - verify(mockResponseObserver, times(1)) - .onError( - argThat( - new ExceptionMessageMatcher("Entity type is missing in the request."))); - return null; - }); - } - - @Test - public void testBulkUpdate_noEntities() throws Exception { - StreamObserver mockResponseObserver = mock(StreamObserver.class); - - Context.current() - .withValue(RequestContext.CURRENT, mockRequestContextWithTenantId()) - .call( - () -> { - EntityQueryServiceImpl eqs = - new EntityQueryServiceImpl(entitiesCollection, mockAttributeMapping, 1); - - eqs.bulkUpdate( - BulkEntityUpdateRequest.newBuilder().setEntityType(TEST_ENTITY_TYPE).build(), - mockResponseObserver); - - verify(mockResponseObserver, times(1)) - .onError( - argThat( - new ExceptionMessageMatcher("Entities are missing in the request."))); - return null; - }); - } - - @Test - public void testBulkUpdate_entitiesWithNoUpdateOperations() throws Exception { - EntityUpdateInfo.Builder updateInfo = EntityUpdateInfo.newBuilder(); - BulkEntityUpdateRequest bulkUpdateRequest = - BulkEntityUpdateRequest.newBuilder() - .setEntityType(TEST_ENTITY_TYPE) - .putEntities("entity-id-1", updateInfo.build()) - .build(); - - StreamObserver mockResponseObserver = mock(StreamObserver.class); - - Context.current() - .withValue(RequestContext.CURRENT, mockRequestContextWithTenantId()) - .call( - () -> { - EntityQueryServiceImpl eqs = - new EntityQueryServiceImpl(entitiesCollection, mockAttributeMapping, 1); - eqs.bulkUpdate(bulkUpdateRequest, mockResponseObserver); - return null; - }); - verify(entitiesCollection, Mockito.never()).bulkUpdateSubDocs(any()); - } - - @Test - public void testBulkUpdate_success() throws Exception { - Collection mockEntitiesCollection = mockEntitiesCollection(); - - Builder newStatus = - LiteralConstant.newBuilder() - .setValue(Value.newBuilder().setValueType(ValueType.STRING).setString("NEW_STATUS")); - - UpdateOperation.Builder updateOperation = - UpdateOperation.newBuilder() - .setSetAttribute( - SetAttribute.newBuilder() - .setAttribute(ColumnIdentifier.newBuilder().setColumnName(ATTRIBUTE_ID1)) - .setValue(newStatus)); - EntityUpdateInfo.Builder updateInfo = - EntityUpdateInfo.newBuilder().addUpdateOperation(updateOperation); - BulkEntityUpdateRequest bulkUpdateRequest = - BulkEntityUpdateRequest.newBuilder() - .setEntityType(TEST_ENTITY_TYPE) - .putEntities("entity-id-1", updateInfo.build()) - .build(); - - StreamObserver mockResponseObserver = mock(StreamObserver.class); - - Context.current() - .withValue(RequestContext.CURRENT, mockRequestContextWithTenantId()) - .call( - () -> { - EntityQueryServiceImpl eqs = - new EntityQueryServiceImpl( - mockEntitiesCollection, mockMappingForAttribute1(), 1); - eqs.bulkUpdate(bulkUpdateRequest, mockResponseObserver); - return null; - }); - - verify(mockEntitiesCollection, times(1)) - .bulkUpdateSubDocs( - eq( - Map.of( - new SingleValueKey("tenant1", "entity-id-1"), - Map.of( - "attributes.entity_id", - new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))))); - } + .updateSubDoc( + eq(new SingleValueKey("tenant1", "entity-id-1")), + eq("attributes.status"), + eq(new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))); } @Test @@ -629,15 +485,9 @@ public void test_sendCorrectTotalResponse() throws Exception { } } - private Collection mockEntitiesCollection() throws Exception { + private Collection mockEntitiesCollection() { // mock successful update - try { - return when(entitiesCollection.bulkUpdateSubDocs(any())) - .thenReturn(new BulkUpdateResult(0)) - .getMock(); - } catch (Exception e) { - throw e; - } + return when(entitiesCollection.updateSubDoc(any(), any(), any())).thenReturn(true).getMock(); } private RequestContext mockRequestContextWithTenantId() { diff --git a/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityQueryServiceTest.java b/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityQueryServiceTest.java index 812c0988..116a86f7 100644 --- a/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityQueryServiceTest.java +++ b/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityQueryServiceTest.java @@ -23,8 +23,6 @@ import java.util.UUID; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.DatastoreProvider; -import org.hypertrace.core.grpcutils.client.GrpcClientRequestContextUtil; -import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory; import org.hypertrace.core.serviceframework.IntegrationTestServerUtil; import org.hypertrace.entity.constants.v1.ApiAttribute; import org.hypertrace.entity.constants.v1.CommonAttribute; @@ -34,22 +32,16 @@ import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.Value; import org.hypertrace.entity.query.service.client.EntityQueryServiceClient; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest.EntityUpdateInfo; import org.hypertrace.entity.query.service.v1.ColumnIdentifier; import org.hypertrace.entity.query.service.v1.EntityQueryRequest; -import org.hypertrace.entity.query.service.v1.EntityQueryServiceGrpc; -import org.hypertrace.entity.query.service.v1.EntityQueryServiceGrpc.EntityQueryServiceBlockingStub; import org.hypertrace.entity.query.service.v1.Expression; import org.hypertrace.entity.query.service.v1.Filter; import org.hypertrace.entity.query.service.v1.LiteralConstant; import org.hypertrace.entity.query.service.v1.Operator; import org.hypertrace.entity.query.service.v1.ResultSetChunk; import org.hypertrace.entity.query.service.v1.Row; -import org.hypertrace.entity.query.service.v1.SetAttribute; import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest; import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse; -import org.hypertrace.entity.query.service.v1.UpdateOperation; import org.hypertrace.entity.query.service.v1.ValueType; import org.hypertrace.entity.service.EntityServiceConfig; import org.hypertrace.entity.service.client.config.EntityServiceClientConfig; @@ -76,7 +68,7 @@ public class EntityQueryServiceTest { private static final Logger LOG = LoggerFactory.getLogger(EntityQueryServiceTest.class); private static final Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG); - private static EntityQueryServiceBlockingStub entityQueryServiceClient; + private static EntityQueryServiceClient entityQueryServiceClient; // needed to create entities private static EntityDataServiceClient entityDataServiceClient; @@ -90,7 +82,6 @@ public class EntityQueryServiceTest { private static final String SERVICE_ID = generateRandomUUID(); private static Map apiAttributesMap; - private static final Map HEADERS = Map.of("x-tenant-id", TENANT_ID); // attributes defined in application.conf in attribute map private static final String API_DISCOVERY_STATE_ATTR = "API.apiDiscoveryState"; private static final String API_HTTP_METHOD_ATTR = "API.httpMethod"; @@ -100,7 +91,7 @@ public class EntityQueryServiceTest { @BeforeAll public static void setUp() throws Exception { mongo = - new GenericContainer<>(DockerImageName.parse("mongo:4.4.0")) + new GenericContainer<>(DockerImageName.parse("hypertrace/mongodb:main")) .withExposedPorts(27017) .withStartupAttempts(CONTAINER_STARTUP_ATTEMPTS) .waitingFor(Wait.forListeningPort()) @@ -121,10 +112,7 @@ public static void setUp() throws Exception { entityServiceTestConfig.getHost(), entityServiceTestConfig.getPort()) .usePlaintext() .build(); - entityQueryServiceClient = - EntityQueryServiceGrpc.newBlockingStub(channel) - .withCallCredentials( - RequestContextClientCallCredsProviderFactory.getClientCallCredsProvider().get()); + entityQueryServiceClient = new EntityQueryServiceClient(channel); entityDataServiceClient = new EntityDataServiceClient(channel); datastore = getDatastore(); @@ -309,8 +297,7 @@ public void testExecute() { assertFalse(createdEntity6.getEntityId().trim().isEmpty()); Iterator resultSetChunkIterator = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.execute(queryRequest)); + entityQueryServiceClient.execute(queryRequest, Map.of("x-tenant-id", TENANT_ID)); List list = Lists.newArrayList(resultSetChunkIterator); assertEquals(3, list.size()); assertEquals(2, list.get(0).getRowCount()); @@ -380,8 +367,7 @@ public void testExecute_EmptyResponse() { .build(); Iterator resultSetChunkIterator = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.execute(queryRequestNoResult)); + entityQueryServiceClient.execute(queryRequestNoResult, Map.of("x-tenant-id", TENANT_ID)); List list = Lists.newArrayList(resultSetChunkIterator); assertEquals(1, list.size()); @@ -412,8 +398,7 @@ public void testCreateAndGetEntity() { .addSelection(createExpression(API_DISCOVERY_STATE_ATTR)) .build(); Iterator resultSetChunkIterator = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.execute(entityQueryRequest)); + entityQueryServiceClient.execute(entityQueryRequest, Map.of("x-tenant-id", TENANT_ID)); List values = new ArrayList<>(); @@ -456,8 +441,7 @@ public void testCreateAndGetEntities() { .addSelection(createExpression(API_DISCOVERY_STATE_ATTR)) .build(); Iterator resultSetChunkIterator = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.execute(entityQueryRequest)); + entityQueryServiceClient.execute(entityQueryRequest, Map.of("x-tenant-id", TENANT_ID)); List values = new ArrayList<>(); @@ -477,118 +461,6 @@ public void testCreateAndGetEntities() { assertEquals("UNDER_DISCOVERY", values.get(1)); } - @Test - public void testBulkUpdate() { - Entity.Builder apiEntityBuilder1 = - Entity.newBuilder() - .setTenantId(TENANT_ID) - .setEntityType(EntityType.API.name()) - .setEntityName("api1") - .putIdentifyingAttributes( - EntityConstants.getValue(ServiceAttribute.SERVICE_ATTRIBUTE_ID), - createAttribute(SERVICE_ID)) - .putIdentifyingAttributes( - EntityConstants.getValue(ApiAttribute.API_ATTRIBUTE_NAME), createAttribute("api1")) - .putIdentifyingAttributes( - EntityConstants.getValue(ApiAttribute.API_ATTRIBUTE_API_TYPE), - createAttribute(API_TYPE)); - apiEntityBuilder1 - .putAttributes( - apiAttributesMap.get(API_DISCOVERY_STATE_ATTR), createAttribute("DISCOVERED")) - .putAttributes(apiAttributesMap.get(API_HTTP_METHOD_ATTR), createAttribute("GET")); - Entity entity1 = entityDataServiceClient.upsert(apiEntityBuilder1.build()); - - Entity.Builder apiEntityBuilder2 = - Entity.newBuilder() - .setTenantId(TENANT_ID) - .setEntityType(EntityType.API.name()) - .setEntityName("api2") - .putIdentifyingAttributes( - EntityConstants.getValue(ServiceAttribute.SERVICE_ATTRIBUTE_ID), - createAttribute(SERVICE_ID)) - .putIdentifyingAttributes( - EntityConstants.getValue(ApiAttribute.API_ATTRIBUTE_NAME), createAttribute("api2")) - .putIdentifyingAttributes( - EntityConstants.getValue(ApiAttribute.API_ATTRIBUTE_API_TYPE), - createAttribute(API_TYPE)); - apiEntityBuilder2 - .putAttributes( - apiAttributesMap.get(API_DISCOVERY_STATE_ATTR), createAttribute("UNDER_DISCOVERY")) - .putAttributes(apiAttributesMap.get(API_HTTP_METHOD_ATTR), createAttribute("GET")); - Entity entity2 = entityDataServiceClient.upsert(apiEntityBuilder2.build()); - - // create BulkUpdate request - UpdateOperation update1 = - UpdateOperation.newBuilder() - .setSetAttribute( - SetAttribute.newBuilder() - .setAttribute( - ColumnIdentifier.newBuilder().setColumnName(API_DISCOVERY_STATE_ATTR)) - .setValue( - LiteralConstant.newBuilder() - .setValue( - org.hypertrace.entity.query.service.v1.Value.newBuilder() - .setString("DISCOVERED")))) - .build(); - UpdateOperation update2 = - UpdateOperation.newBuilder() - .setSetAttribute( - SetAttribute.newBuilder() - .setAttribute(ColumnIdentifier.newBuilder().setColumnName(API_HTTP_METHOD_ATTR)) - .setValue( - LiteralConstant.newBuilder() - .setValue( - org.hypertrace.entity.query.service.v1.Value.newBuilder() - .setString("POST")))) - .build(); - EntityUpdateInfo updateInfo1 = - EntityUpdateInfo.newBuilder().addUpdateOperation(update2).build(); - EntityUpdateInfo updateInfo2 = - EntityUpdateInfo.newBuilder() - .addUpdateOperation(update1) - .addUpdateOperation(update2) - .build(); - BulkEntityUpdateRequest bulkUpdateRequest = - BulkEntityUpdateRequest.newBuilder() - .setEntityType(EntityType.API.name()) - .putEntities(entity1.getEntityId(), updateInfo1) - .putEntities(entity2.getEntityId(), updateInfo2) - .build(); - - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.bulkUpdate(bulkUpdateRequest)); - - EntityQueryRequest entityQueryRequest = - EntityQueryRequest.newBuilder() - .setEntityType(EntityType.API.name()) - .addSelection(createExpression(API_DISCOVERY_STATE_ATTR)) - .addSelection(createExpression(API_HTTP_METHOD_ATTR)) - .build(); - - Iterator resultSetChunkIterator = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.execute(entityQueryRequest)); - - List values = new ArrayList<>(); - - while (resultSetChunkIterator.hasNext()) { - ResultSetChunk chunk = resultSetChunkIterator.next(); - - for (Row row : chunk.getRowList()) { - for (int i = 0; i < row.getColumnCount(); i++) { - String value = row.getColumnList().get(i).getString(); - values.add(value); - } - } - } - - assertEquals(4, values.size()); - assertEquals("DISCOVERED", values.get(0)); - assertEquals("POST", values.get(1)); - assertEquals("DISCOVERED", values.get(2)); - assertEquals("POST", values.get(3)); - } - @Nested class TotalEntities { @@ -612,8 +484,7 @@ public void testTotal() { TotalEntitiesRequest totalEntitiesRequest = TotalEntitiesRequest.newBuilder().setEntityType(EntityType.API.name()).build(); TotalEntitiesResponse response = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.total(totalEntitiesRequest)); + entityQueryServiceClient.total(totalEntitiesRequest, Map.of("x-tenant-id", TENANT_ID)); assertEquals(2, response.getTotal()); } @@ -662,8 +533,7 @@ public void testTotalWithFilter() { .build()) .build(); TotalEntitiesResponse response = - GrpcClientRequestContextUtil.executeWithHeadersContext( - HEADERS, () -> entityQueryServiceClient.total(totalEntitiesRequest)); + entityQueryServiceClient.total(totalEntitiesRequest, Map.of("x-tenant-id", TENANT_ID)); assertEquals(1, response.getTotal()); }