diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto index 9a2cc5c4..e30e2d57 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto @@ -25,6 +25,16 @@ message EntityUpdateRequest { repeated Expression selection = 4; } +message BulkEntityUpdateRequest { + string entityType = 1; + // map of entity id, update operations corresponding to this entity id + map entities = 2; + repeated Expression selection = 3; + message EntityUpdateInfo { + repeated UpdateOperation updateOperation = 1; + } +} + message TotalEntitiesRequest { string entityType = 1; Filter filter = 2; diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto index aba3de64..bb4672ac 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto @@ -14,6 +14,8 @@ service EntityQueryService { } rpc update (EntityUpdateRequest) returns (stream ResultSetChunk) { } + rpc bulkUpdate (BulkEntityUpdateRequest) returns (stream ResultSetChunk) { + } rpc total (TotalEntitiesRequest) returns (TotalEntitiesResponse) { } } diff --git a/entity-service-impl/build.gradle.kts b/entity-service-impl/build.gradle.kts index fbe4ae5c..b27bc857 100644 --- a/entity-service-impl/build.gradle.kts +++ b/entity-service-impl/build.gradle.kts @@ -11,7 +11,7 @@ dependencies { annotationProcessor("org.projectlombok:lombok:1.18.18") compileOnly("org.projectlombok:lombok:1.18.18") - implementation("org.hypertrace.core.documentstore:document-store:0.5.8") + implementation("org.hypertrace.core.documentstore:document-store:0.6.0") implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.4.0") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0") implementation("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.12.3") diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 81ac6480..85c6e5dc 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -10,13 +10,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; +import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; +import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.SingleValueKey; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; import org.hypertrace.core.grpcutils.context.RequestContext; @@ -24,6 +28,7 @@ import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.Query; +import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest; import org.hypertrace.entity.query.service.v1.ColumnIdentifier; import org.hypertrace.entity.query.service.v1.ColumnMetadata; import org.hypertrace.entity.query.service.v1.EntityQueryRequest; @@ -37,6 +42,7 @@ import org.hypertrace.entity.query.service.v1.SetAttribute; import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest; import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse; +import org.hypertrace.entity.query.service.v1.UpdateOperation; import org.hypertrace.entity.query.service.v1.Value; import org.hypertrace.entity.query.service.v1.ValueType; import org.hypertrace.entity.service.constants.EntityServiceConstants; @@ -290,19 +296,116 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); + Map> toUpdate = new HashMap<>(); + for (String entityId : request.getEntityIdsList()) { SingleValueKey key = new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId); // TODO better error reporting once doc store exposes the, - if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) { - LOG.warn( - "Failed to update entity {}, subDocPath {}, with new doc {}.", - key, - subDocPath, - jsonValue); + if (toUpdate.containsValue(key)) { + toUpdate.get(key).put(subDocPath, new JSONDocument(jsonValue)); + } else { + Map subDocument = new HashMap<>(); + subDocument.put(subDocPath, new JSONDocument(jsonValue)); + toUpdate.put(key, subDocument); } } + try { + entitiesCollection.bulkUpdateSubDocs(toUpdate); + } catch (Exception e) { + LOG.warn( + "Failed to update entities, subDocPath {}, with new doc {}.", subDocPath, jsonValue); + } + } + } + + @Override + public void bulkUpdate( + BulkEntityUpdateRequest request, StreamObserver responseObserver) { + // Validations + RequestContext requestContext = RequestContext.CURRENT.get(); + Optional tenantId = requestContext.getTenantId(); + if (tenantId.isEmpty()) { + responseObserver.onError(new ServiceException("Tenant id is missing in the request.")); + return; + } + if (StringUtils.isEmpty(request.getEntityType())) { + responseObserver.onError(new ServiceException("Entity type is missing in the request.")); + return; + } + if (request.getEntitiesCount() == 0) { + responseObserver.onError(new ServiceException("Entity IDs are missing in the request.")); + } + // todo hasOperation() check in every entityUpdateInfo + try { + bulkDoUpdate(requestContext, request); + // Finally return the selections + List entityIdsList = new ArrayList<>(); + for (String entityId : request.getEntities().keySet()) { + entityIdsList.add(entityId); + } + Query entitiesQuery = Query.newBuilder().addAllEntityId(entityIdsList).build(); + List docStoreSelections = + entityQueryConverter.convertSelectionsToDocStoreSelections( + requestContext, request.getSelectionList()); + Iterator documentIterator = + entitiesCollection.search( + DocStoreConverter.transform(tenantId.get(), entitiesQuery, docStoreSelections)); + List entities = convertDocsToEntities(documentIterator); + responseObserver.onNext( + convertEntitiesToResultSetChunk(requestContext, entities, request.getSelectionList())); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError( + new ServiceException("Error occurred while executing " + request, e)); + } + } + + private void bulkDoUpdate(RequestContext requestContext, BulkEntityUpdateRequest request) { + Map> toUpdate = new HashMap<>(); + Map documentMap = new HashMap<>(); + for (String entityId : request.getEntities().keySet()) { + documentMap.clear(); + try { + documentMap = + documentMapMaker( + request.getEntities().get(entityId).getUpdateOperationList(), requestContext); + } catch (Exception e) { + LOG.warn("Failed to update entity id {}", entityId); + continue; + } + if (!documentMap.isEmpty()) { + toUpdate.put( + new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId), documentMap); + } + } + + try { + BulkUpdateResult bulkUpdateResult = entitiesCollection.bulkUpdateSubDocs(toUpdate); + } catch (Exception e) { + LOG.warn("Failed to update entities", e); + } + } + + private Map documentMapMaker( + List updateOperationList, RequestContext requestContext) throws IOException { + Map documentMap = new HashMap<>(); + for (UpdateOperation updateOperation : updateOperationList) { + if (updateOperation.hasSetAttribute()) { + SetAttribute setAttribute = updateOperation.getSetAttribute(); + String attributeId = setAttribute.getAttribute().getColumnName(); + String subDocPath = + entityAttributeMapping + .getDocStorePathByAttributeId(requestContext, attributeId) + .orElseThrow( + () -> new IllegalArgumentException("Unknown attribute FQN " + attributeId)); + AttributeValue attributeValue = + EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); + String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); + documentMap.put(subDocPath, new JSONDocument(jsonValue)); + } } + return documentMap; } @Override diff --git a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java index 7554142a..36ab9740 100644 --- a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java +++ b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java @@ -14,8 +14,10 @@ import io.grpc.Context; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; +import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.Filter; @@ -196,10 +198,13 @@ public void testUpdate_success() throws Exception { }); verify(mockEntitiesCollection, times(1)) - .updateSubDoc( - eq(new SingleValueKey("tenant1", "entity-id-1")), - eq("attributes.status"), - eq(new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))); + .bulkUpdateSubDocs( + eq( + Map.of( + new SingleValueKey("tenant1", "entity-id-1"), + Map.of( + "attributes.status", + new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))))); } @Test @@ -485,9 +490,15 @@ public void test_sendCorrectTotalResponse() throws Exception { } } - private Collection mockEntitiesCollection() { + private Collection mockEntitiesCollection() throws Exception { // mock successful update - return when(entitiesCollection.updateSubDoc(any(), any(), any())).thenReturn(true).getMock(); + try { + return when(entitiesCollection.bulkUpdateSubDocs(any())) + .thenReturn(new BulkUpdateResult(0)) + .getMock(); + } catch (Exception e) { + throw e; + } } private RequestContext mockRequestContextWithTenantId() { diff --git a/entity-service/build.gradle.kts b/entity-service/build.gradle.kts index 869769ef..8083abfe 100644 --- a/entity-service/build.gradle.kts +++ b/entity-service/build.gradle.kts @@ -18,7 +18,7 @@ dependencies { implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.4.0") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.28") - implementation("org.hypertrace.core.documentstore:document-store:0.5.8") + implementation("org.hypertrace.core.documentstore:document-store:0.6.0") runtimeOnly("io.grpc:grpc-netty:1.36.1") constraints {