diff --git a/entity-service-impl/build.gradle.kts b/entity-service-impl/build.gradle.kts index b27bc857..fbe4ae5c 100644 --- a/entity-service-impl/build.gradle.kts +++ b/entity-service-impl/build.gradle.kts @@ -11,7 +11,7 @@ dependencies { annotationProcessor("org.projectlombok:lombok:1.18.18") compileOnly("org.projectlombok:lombok:1.18.18") - implementation("org.hypertrace.core.documentstore:document-store:0.6.0") + implementation("org.hypertrace.core.documentstore:document-store:0.5.8") implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.4.0") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0") implementation("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.12.3") diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java index 85c6e5dc..81ac6480 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java @@ -10,17 +10,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; -import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; -import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.SingleValueKey; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; import org.hypertrace.core.grpcutils.context.RequestContext; @@ -28,7 +24,6 @@ import org.hypertrace.entity.data.service.v1.AttributeValue; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.Query; -import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest; import org.hypertrace.entity.query.service.v1.ColumnIdentifier; import org.hypertrace.entity.query.service.v1.ColumnMetadata; import org.hypertrace.entity.query.service.v1.EntityQueryRequest; @@ -42,7 +37,6 @@ import org.hypertrace.entity.query.service.v1.SetAttribute; import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest; import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse; -import org.hypertrace.entity.query.service.v1.UpdateOperation; import org.hypertrace.entity.query.service.v1.Value; import org.hypertrace.entity.query.service.v1.ValueType; import org.hypertrace.entity.service.constants.EntityServiceConstants; @@ -296,116 +290,19 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); - Map> toUpdate = new HashMap<>(); - for (String entityId : request.getEntityIdsList()) { SingleValueKey key = new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId); // TODO better error reporting once doc store exposes the, - if (toUpdate.containsValue(key)) { - toUpdate.get(key).put(subDocPath, new JSONDocument(jsonValue)); - } else { - Map subDocument = new HashMap<>(); - subDocument.put(subDocPath, new JSONDocument(jsonValue)); - toUpdate.put(key, subDocument); + if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) { + LOG.warn( + "Failed to update entity {}, subDocPath {}, with new doc {}.", + key, + subDocPath, + jsonValue); } } - try { - entitiesCollection.bulkUpdateSubDocs(toUpdate); - } catch (Exception e) { - LOG.warn( - "Failed to update entities, subDocPath {}, with new doc {}.", subDocPath, jsonValue); - } - } - } - - @Override - public void bulkUpdate( - BulkEntityUpdateRequest request, StreamObserver responseObserver) { - // Validations - RequestContext requestContext = RequestContext.CURRENT.get(); - Optional tenantId = requestContext.getTenantId(); - if (tenantId.isEmpty()) { - responseObserver.onError(new ServiceException("Tenant id is missing in the request.")); - return; - } - if (StringUtils.isEmpty(request.getEntityType())) { - responseObserver.onError(new ServiceException("Entity type is missing in the request.")); - return; - } - if (request.getEntitiesCount() == 0) { - responseObserver.onError(new ServiceException("Entity IDs are missing in the request.")); - } - // todo hasOperation() check in every entityUpdateInfo - try { - bulkDoUpdate(requestContext, request); - // Finally return the selections - List entityIdsList = new ArrayList<>(); - for (String entityId : request.getEntities().keySet()) { - entityIdsList.add(entityId); - } - Query entitiesQuery = Query.newBuilder().addAllEntityId(entityIdsList).build(); - List docStoreSelections = - entityQueryConverter.convertSelectionsToDocStoreSelections( - requestContext, request.getSelectionList()); - Iterator documentIterator = - entitiesCollection.search( - DocStoreConverter.transform(tenantId.get(), entitiesQuery, docStoreSelections)); - List entities = convertDocsToEntities(documentIterator); - responseObserver.onNext( - convertEntitiesToResultSetChunk(requestContext, entities, request.getSelectionList())); - responseObserver.onCompleted(); - } catch (Exception e) { - responseObserver.onError( - new ServiceException("Error occurred while executing " + request, e)); - } - } - - private void bulkDoUpdate(RequestContext requestContext, BulkEntityUpdateRequest request) { - Map> toUpdate = new HashMap<>(); - Map documentMap = new HashMap<>(); - for (String entityId : request.getEntities().keySet()) { - documentMap.clear(); - try { - documentMap = - documentMapMaker( - request.getEntities().get(entityId).getUpdateOperationList(), requestContext); - } catch (Exception e) { - LOG.warn("Failed to update entity id {}", entityId); - continue; - } - if (!documentMap.isEmpty()) { - toUpdate.put( - new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId), documentMap); - } - } - - try { - BulkUpdateResult bulkUpdateResult = entitiesCollection.bulkUpdateSubDocs(toUpdate); - } catch (Exception e) { - LOG.warn("Failed to update entities", e); - } - } - - private Map documentMapMaker( - List updateOperationList, RequestContext requestContext) throws IOException { - Map documentMap = new HashMap<>(); - for (UpdateOperation updateOperation : updateOperationList) { - if (updateOperation.hasSetAttribute()) { - SetAttribute setAttribute = updateOperation.getSetAttribute(); - String attributeId = setAttribute.getAttribute().getColumnName(); - String subDocPath = - entityAttributeMapping - .getDocStorePathByAttributeId(requestContext, attributeId) - .orElseThrow( - () -> new IllegalArgumentException("Unknown attribute FQN " + attributeId)); - AttributeValue attributeValue = - EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build(); - String jsonValue = DocStoreJsonFormat.printer().print(attributeValue); - documentMap.put(subDocPath, new JSONDocument(jsonValue)); - } } - return documentMap; } @Override diff --git a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java index 36ab9740..7554142a 100644 --- a/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java +++ b/entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java @@ -14,10 +14,8 @@ import io.grpc.Context; import io.grpc.stub.StreamObserver; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; -import org.hypertrace.core.documentstore.BulkUpdateResult; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.Filter; @@ -198,13 +196,10 @@ public void testUpdate_success() throws Exception { }); verify(mockEntitiesCollection, times(1)) - .bulkUpdateSubDocs( - eq( - Map.of( - new SingleValueKey("tenant1", "entity-id-1"), - Map.of( - "attributes.status", - new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))))); + .updateSubDoc( + eq(new SingleValueKey("tenant1", "entity-id-1")), + eq("attributes.status"), + eq(new JSONDocument(DocStoreJsonFormat.printer().print(newStatus)))); } @Test @@ -490,15 +485,9 @@ public void test_sendCorrectTotalResponse() throws Exception { } } - private Collection mockEntitiesCollection() throws Exception { + private Collection mockEntitiesCollection() { // mock successful update - try { - return when(entitiesCollection.bulkUpdateSubDocs(any())) - .thenReturn(new BulkUpdateResult(0)) - .getMock(); - } catch (Exception e) { - throw e; - } + return when(entitiesCollection.updateSubDoc(any(), any(), any())).thenReturn(true).getMock(); } private RequestContext mockRequestContextWithTenantId() { diff --git a/entity-service/build.gradle.kts b/entity-service/build.gradle.kts index 8083abfe..869769ef 100644 --- a/entity-service/build.gradle.kts +++ b/entity-service/build.gradle.kts @@ -18,7 +18,7 @@ dependencies { implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.4.0") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.28") - implementation("org.hypertrace.core.documentstore:document-store:0.6.0") + implementation("org.hypertrace.core.documentstore:document-store:0.5.8") runtimeOnly("io.grpc:grpc-netty:1.36.1") constraints {