From 0738dee356e4cb16378204267db19196e563de21 Mon Sep 17 00:00:00 2001 From: Ankit Choudhary Date: Thu, 29 Jul 2021 15:04:45 +0530 Subject: [PATCH 1/8] BulkUpdate API --- .../query/service/v1/entity_query_request.proto | 13 +++++++++++++ .../query/service/v1/entity_query_service.proto | 2 ++ 2 files changed, 15 insertions(+) diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto index 9a2cc5c4..338c293f 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto @@ -25,6 +25,19 @@ message EntityUpdateRequest { repeated Expression selection = 4; } +message BulkUpdateRequest { + string entityType = 1; + map bulkUpdateMap = 2; + message EntityUpdateInfo { + repeated UpdateInfo updateInfo = 1; + } +} + +message UpdateInfo { + UpdateOperation updateOperation = 1; + repeated Expression selection = 2; +} + message TotalEntitiesRequest { string entityType = 1; Filter filter = 2; diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto index aba3de64..ac182cab 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto @@ -14,6 +14,8 @@ service EntityQueryService { } rpc update (EntityUpdateRequest) returns (stream ResultSetChunk) { } + rpc bulkUpdate (BulkUpdateRequest) returns (stream ResultSetChunk) { + } rpc total (TotalEntitiesRequest) returns (TotalEntitiesResponse) { } } From db3dc33959250ac4b4a4e56d485a079e31a47aee Mon Sep 17 00:00:00 2001 From: Ankit Choudhary Date: Thu, 29 Jul 2021 17:12:06 +0530 Subject: [PATCH 2/8] proto definitions for BulkUpdate api --- .../query/service/v1/entity_query_request.proto | 11 ++++------- .../query/service/v1/entity_query_service.proto | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto index 338c293f..846947c5 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto @@ -25,19 +25,16 @@ message EntityUpdateRequest { repeated Expression selection = 4; } -message BulkUpdateRequest { +message BulkEntityUpdateRequest { string entityType = 1; + // Map of EntityId, update operations corresponding to this EntityId map bulkUpdateMap = 2; + repeated Expression selection = 3; message EntityUpdateInfo { - repeated UpdateInfo updateInfo = 1; + repeated UpdateOperation updateOperation = 1; } } -message UpdateInfo { - UpdateOperation updateOperation = 1; - repeated Expression selection = 2; -} - message TotalEntitiesRequest { string entityType = 1; Filter filter = 2; diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto index ac182cab..bb4672ac 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_service.proto @@ -14,7 +14,7 @@ service EntityQueryService { } rpc update (EntityUpdateRequest) returns (stream ResultSetChunk) { } - rpc bulkUpdate (BulkUpdateRequest) returns (stream ResultSetChunk) { + rpc bulkUpdate (BulkEntityUpdateRequest) returns (stream ResultSetChunk) { } rpc total (TotalEntitiesRequest) returns (TotalEntitiesResponse) { } From 855ea6769b903130101e515ff2918fb9fc94f78c Mon Sep 17 00:00:00 2001 From: ankitchoudhary111 <35135474+ankitchoudhary111@users.noreply.github.com> Date: Fri, 30 Jul 2021 11:37:53 +0530 Subject: [PATCH 3/8] Update entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto Co-authored-by: SJ <48863181+skjindal93@users.noreply.github.com> --- .../entity/query/service/v1/entity_query_request.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto index 846947c5..9701835b 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto @@ -27,7 +27,7 @@ message EntityUpdateRequest { message BulkEntityUpdateRequest { string entityType = 1; - // Map of EntityId, update operations corresponding to this EntityId + // map of entity id, update operations corresponding to this entity id map bulkUpdateMap = 2; repeated Expression selection = 3; message EntityUpdateInfo { From 9364728ddb3046c4fc923fdca35ea524354ddedb Mon Sep 17 00:00:00 2001 From: ankitchoudhary111 <35135474+ankitchoudhary111@users.noreply.github.com> Date: Fri, 30 Jul 2021 11:38:09 +0530 Subject: [PATCH 4/8] Update entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto Co-authored-by: SJ <48863181+skjindal93@users.noreply.github.com> --- .../entity/query/service/v1/entity_query_request.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto index 9701835b..e30e2d57 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/query/service/v1/entity_query_request.proto @@ -28,7 +28,7 @@ message EntityUpdateRequest { message BulkEntityUpdateRequest { string entityType = 1; // map of entity id, update operations corresponding to this entity id - map bulkUpdateMap = 2; + map entities = 2; repeated Expression selection = 3; message EntityUpdateInfo { repeated UpdateOperation updateOperation = 1; From d08f195ec8117b2093c62b5eb231959fd713a9d4 Mon Sep 17 00:00:00 2001 From: Ankit Choudhary Date: Fri, 30 Jul 2021 11:58:00 +0530 Subject: [PATCH 5/8] - --- .../hypertrace/entity/query/service/EntityQueryServiceImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 81ac6480..699a64c7 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -294,6 +294,7 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request SingleValueKey key = new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId); // TODO better error reporting once doc store exposes the, + if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) { LOG.warn( "Failed to update entity {}, subDocPath {}, with new doc {}.", From 481181660cb5312d9f6a9534b53cb25acd0ae80c Mon Sep 17 00:00:00 2001 From: Ankit Choudhary Date: Fri, 30 Jul 2021 12:02:40 +0530 Subject: [PATCH 6/8] -- --- .../hypertrace/entity/query/service/EntityQueryServiceImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 699a64c7..81ac6480 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -294,7 +294,6 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request SingleValueKey key = new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId); // TODO better error reporting once doc store exposes the, - if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) { LOG.warn( "Failed to update entity {}, subDocPath {}, with new doc {}.", From 3ff0038312574e1e115d7fd84e560d1a5f20d285 Mon Sep 17 00:00:00 2001 From: Ankit Choudhary Date: Fri, 30 Jul 2021 16:40:14 +0530 Subject: [PATCH 7/8] BulkEntityUpdate api --- entity-service-impl/build.gradle.kts | 2 +- .../query/service/EntityQueryServiceImpl.java | 115 +++++++++++++++++- .../service/EntityQueryServiceImplTest.java | 23 +++- entity-service/build.gradle.kts | 2 +- 4 files changed, 128 insertions(+), 14 deletions(-) diff --git a/entity-service-impl/build.gradle.kts b/entity-service-impl/build.gradle.kts index fbe4ae5c..b27bc857 100644 --- a/entity-service-impl/build.gradle.kts +++ b/entity-service-impl/build.gradle.kts @@ -11,7 +11,7 @@ dependencies { annotationProcessor("org.projectlombok:lombok:1.18.18") compileOnly("org.projectlombok:lombok:1.18.18") - implementation("org.hypertrace.core.documentstore:document-store:0.5.8") + implementation("org.hypertrace.core.documentstore:document-store:0.6.0") implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.4.0") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0") implementation("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.12.3") diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 81ac6480..2a19836b 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -10,13 +10,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; +import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; +import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.SingleValueKey; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; import org.hypertrace.core.grpcutils.context.RequestContext; @@ -24,6 +28,7 @@ import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.Query; +import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest; import org.hypertrace.entity.query.service.v1.ColumnIdentifier; import org.hypertrace.entity.query.service.v1.ColumnMetadata; import org.hypertrace.entity.query.service.v1.EntityQueryRequest; @@ -37,6 +42,7 @@ import org.hypertrace.entity.query.service.v1.SetAttribute; import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest; import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse; +import org.hypertrace.entity.query.service.v1.UpdateOperation; import org.hypertrace.entity.query.service.v1.Value; import org.hypertrace.entity.query.service.v1.ValueType; import org.hypertrace.entity.service.constants.EntityServiceConstants; @@ -290,19 +296,116 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); + Map> toUpdate = new HashMap<>(); + for (String entityId : request.getEntityIdsList()) { SingleValueKey key = new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId); // TODO better error reporting once doc store exposes the, - if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) { - LOG.warn( - "Failed to update entity {}, subDocPath {}, with new doc {}.", - key, - subDocPath, - jsonValue); + if (toUpdate.containsValue(key)) { + toUpdate.get(key).put(subDocPath, new JSONDocument(jsonValue)); + } else { + Map subDocument = new HashMap<>(); + subDocument.put(subDocPath, new JSONDocument(jsonValue)); + toUpdate.put(key, subDocument); } } + try { + entitiesCollection.bulkUpdateSubDocs(toUpdate); + } catch (Exception e) { + LOG.warn( + "Failed to update entities, subDocPath {}, with new doc {}.", subDocPath, jsonValue); + } + } + } + + @Override + public void bulkUpdate( + BulkEntityUpdateRequest request, StreamObserver responseObserver) { + // Validations + RequestContext requestContext = RequestContext.CURRENT.get(); + Optional tenantId = requestContext.getTenantId(); + if (tenantId.isEmpty()) { + responseObserver.onError(new ServiceException("Tenant id is missing in the request.")); + return; + } + if (StringUtils.isEmpty(request.getEntityType())) { + responseObserver.onError(new ServiceException("Entity type is missing in the request.")); + return; + } + if (request.getEntitiesCount() == 0) { + responseObserver.onError(new ServiceException("Entity IDs are missing in the request.")); + } + // todo hasOperation() check in every entityUpdateInfo + try { + bulkDoUpdate(requestContext, request); + // Finally return the selections + List entityIdsList = new ArrayList<>(); + for (String entityId : request.getEntities().keySet()) { + entityIdsList.add(entityId); + } + Query entitiesQuery = Query.newBuilder().addAllEntityId(entityIdsList).build(); + List docStoreSelections = + entityQueryConverter.convertSelectionsToDocStoreSelections( + requestContext, request.getSelectionList()); + Iterator documentIterator = + entitiesCollection.search( + DocStoreConverter.transform(tenantId.get(), entitiesQuery, docStoreSelections)); + List entities = convertDocsToEntities(documentIterator); + responseObserver.onNext( + convertEntitiesToResultSetChunk(requestContext, entities, request.getSelectionList())); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError( + new ServiceException("Error occurred while executing " + request, e)); + } + } + + private void bulkDoUpdate(RequestContext requestContext, BulkEntityUpdateRequest request) { + Map> toUpdate = new HashMap<>(); + Map documentMap = new HashMap<>(); + for (String entityId : request.getEntities().keySet()) { + documentMap.clear(); + try { + documentMap = + documentMapMaker( + request.getEntities().get(entityId).getUpdateOperationList(), requestContext); + } catch (Exception e) { + LOG.warn("Failed to update entity id {}", entityId); + continue; + } + if (!documentMap.isEmpty()) { + toUpdate.put( + new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId), documentMap); + } + } + + try { + BulkUpdateResult bulkUpdateResult = entitiesCollection.bulkUpdateSubDocs(toUpdate); + } catch (Exception e) { + LOG.warn("Failed to update entities", e); + } + } + + private Map documentMapMaker( + List updateOperationList, RequestContext requestContext) throws IOException { + Map documentMap = new HashMap<>(); + for (UpdateOperation updateOperation : updateOperationList) { + if (updateOperation.hasSetAttribute()) { + SetAttribute setAttribute = updateOperation.getSetAttribute(); + String attributeId = setAttribute.getAttribute().getColumnName(); + String subDocPath = + entityAttributeMapping + .getDocStorePathByAttributeId(requestContext, attributeId) + .orElseThrow( + () -> new IllegalArgumentException("Unknown attribute FQN " + attributeId)); + AttributeValue attributeValue = + EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); + String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); + documentMap.put(subDocPath, new JSONDocument(jsonValue)); + } } + return documentMap; } @Override diff --git a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java index 7554142a..36ab9740 100644 --- a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java +++ b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java @@ -14,8 +14,10 @@ import io.grpc.Context; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; +import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.Filter; @@ -196,10 +198,13 @@ public void testUpdate_success() throws Exception { }); verify(mockEntitiesCollection, times(1)) - .updateSubDoc( - eq(new SingleValueKey("tenant1", "entity-id-1")), - eq("attributes.status"), - eq(new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))); + .bulkUpdateSubDocs( + eq( + Map.of( + new SingleValueKey("tenant1", "entity-id-1"), + Map.of( + "attributes.status", + new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))))); } @Test @@ -485,9 +490,15 @@ public void test_sendCorrectTotalResponse() throws Exception { } } - private Collection mockEntitiesCollection() { + private Collection mockEntitiesCollection() throws Exception { // mock successful update - return when(entitiesCollection.updateSubDoc(any(), any(), any())).thenReturn(true).getMock(); + try { + return when(entitiesCollection.bulkUpdateSubDocs(any())) + .thenReturn(new BulkUpdateResult(0)) + .getMock(); + } catch (Exception e) { + throw e; + } } private RequestContext mockRequestContextWithTenantId() { diff --git a/entity-service/build.gradle.kts b/entity-service/build.gradle.kts index 869769ef..8083abfe 100644 --- a/entity-service/build.gradle.kts +++ b/entity-service/build.gradle.kts @@ -18,7 +18,7 @@ dependencies { implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.4.0") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.28") - implementation("org.hypertrace.core.documentstore:document-store:0.5.8") + implementation("org.hypertrace.core.documentstore:document-store:0.6.0") runtimeOnly("io.grpc:grpc-netty:1.36.1") constraints { From ebf1d2cbd710a8bf8bc8d684f503f44543206ed0 Mon Sep 17 00:00:00 2001 From: Ankit Choudhary Date: Fri, 30 Jul 2021 16:46:41 +0530 Subject: [PATCH 8/8] . --- .../hypertrace/entity/query/service/EntityQueryServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 2a19836b..85c6e5dc 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -318,7 +318,7 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request } } } - + @Override public void bulkUpdate( BulkEntityUpdateRequest request, StreamObserver responseObserver) {