From 34640f388391266fa593c3cd3e1566c7e18c756a Mon Sep 17 00:00:00 2001 From: Renato Date: Fri, 26 May 2023 18:21:30 -0500 Subject: [PATCH 1/2] Add pipeline setup to index and create operations --- .../_helpers/bulk/IngesterOperation.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index 37f6117740..284148e54f 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -80,7 +80,7 @@ private static IngesterOperation createOperation(BulkOperation operation, JsonpM BinaryData binaryDoc = BinaryData.of(create.document(), mapper); size += binaryDoc.size(); newOperation = BulkOperation.of(bo -> bo.create(idx -> { - copyBaseProperties(create, idx); + copyCreateProperties(create, idx); return idx.document(binaryDoc); })); } @@ -102,7 +102,7 @@ private static IngesterOperation indexOperation(BulkOperation operation, JsonpMa BinaryData binaryDoc = BinaryData.of(index.document(), mapper); size += binaryDoc.size(); newOperation = BulkOperation.of(bo -> bo.index(idx -> { - copyBaseProperties(index, idx); + copyIndexProperties(index, idx); return idx.document(binaryDoc); })); } @@ -154,6 +154,16 @@ private static void copyBaseProperties(BulkOperationBase op, BulkOperationBase.A .versionType(op.versionType()); } + private static void copyIndexProperties(IndexOperation op, IndexOperation.Builder builder) { + copyBaseProperties(op, builder); + builder.pipeline(op.pipeline()); + } + + private static void copyCreateProperties(CreateOperation op, CreateOperation.Builder builder) { + copyBaseProperties(op, builder); + builder.pipeline(op.pipeline()); + } + private static int size(String name, @Nullable Boolean value) { if (value != null) { return name.length() + 12; // 12 added chars for "name":"false", From f0cfc0a10c7bdad3c66e5296b2da3c84864a8606 Mon Sep 17 00:00:00 2001 From: Renato Date: Fri, 9 Jun 2023 13:00:21 -0500 Subject: [PATCH 2/2] Add requireAlias to copyProperties methods and add test to BulkIngester --- .../_helpers/bulk/IngesterOperation.java | 2 ++ .../_helpers/bulk/BulkIngesterTest.java | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index 284148e54f..f2a2ac28cb 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -157,11 +157,13 @@ private static void copyBaseProperties(BulkOperationBase op, BulkOperationBase.A private static void copyIndexProperties(IndexOperation op, IndexOperation.Builder builder) { copyBaseProperties(op, builder); builder.pipeline(op.pipeline()); + builder.requireAlias(op.requireAlias()); } private static void copyCreateProperties(CreateOperation op, CreateOperation.Builder builder) { copyBaseProperties(op, builder); builder.pipeline(op.pipeline()); + builder.requireAlias(op.requireAlias()); } private static int size(String name, @Nullable Boolean value) { diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 80d9daec2f..4353a696df 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -29,6 +29,7 @@ import co.elastic.clients.elasticsearch.core.bulk.OperationType; import co.elastic.clients.elasticsearch.end_to_end.RequestTest; import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.json.SimpleJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; @@ -358,6 +359,28 @@ public void beforeBulk(long executionId, BulkRequest request, List context assertEquals("bar", storedRequest.get().routing()); } + @Test + public void pipelineTest() { + String json = "{\"create\":{\"_id\":\"some_id\",\"_index\":\"some_idx\",\"pipeline\":\"pipe\",\"require_alias\":true}}"; + JsonpMapper mapper = new SimpleJsonpMapper(); + + BulkOperation create = BulkOperation.of(o -> o.create(c -> c + .pipeline("pipe") + .requireAlias(true) + .index("some_idx") + .id("some_id") + .document("Some doc") + )); + + String createStr = JsonpUtils.toJsonString(create, mapper); + assertEquals(json, createStr); + + BulkOperation create1 = IngesterOperation.of(create, mapper).operation(); + + String create1Str = JsonpUtils.toJsonString(create1, mapper); + assertEquals(json, create1Str); + } + @Test public void endToEndTest() throws Exception { String index = "bulk-ingester-test";