From 06675bab9764040f0abb7270551b1a73b11c3989 Mon Sep 17 00:00:00 2001 From: Michiel Meeuwissen Date: Tue, 6 Oct 2020 15:37:07 +0200 Subject: [PATCH] MSE-4933 To get the the esPublishDate field as accurate as possible, postpone evaluation and adding it to the json until just before the actual execution. --- .../elasticsearchclient/BulkRequestEntry.java | 30 ++++++++++++++++++- .../vpro/elasticsearchclient/IndexHelper.java | 23 +++++++------- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/BulkRequestEntry.java b/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/BulkRequestEntry.java index f47ef568c..25a1c911e 100644 --- a/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/BulkRequestEntry.java +++ b/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/BulkRequestEntry.java @@ -1,9 +1,11 @@ package nl.vpro.elasticsearchclient; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.util.Map; +import java.util.function.Consumer; import java.util.function.UnaryOperator; import com.fasterxml.jackson.databind.JsonNode; @@ -26,11 +28,37 @@ public class BulkRequestEntry { final Map mdc; final String id; - public BulkRequestEntry(ObjectNode action, ObjectNode source, UnaryOperator unalias, Map mdc) { + @Getter + @Setter + Consumer sourceConsumer; + + + public BulkRequestEntry( + ObjectNode action, + ObjectNode source, + UnaryOperator unalias, + Map mdc) { + this(action, source, unalias, mdc, null); + } + @lombok.Builder + BulkRequestEntry( + ObjectNode action, + ObjectNode source, + UnaryOperator unalias, + Map mdc, + Consumer sourceConsumer) { this.action = action; this.source = source; this.mdc = mdc; this.id = idFromActionNode(action, unalias); + this.sourceConsumer = sourceConsumer; + } + + + public void use() { + if (sourceConsumer != null) { + sourceConsumer.accept(source); + } } public static String idFromActionNode(ObjectNode action) { diff --git a/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/IndexHelper.java b/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/IndexHelper.java index ca63f81af..d7c5c9249 100644 --- a/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/IndexHelper.java +++ b/vpro-shared-elasticsearch-client/src/main/java/nl/vpro/elasticsearchclient/IndexHelper.java @@ -985,12 +985,12 @@ private final BulkRequestEntry _indexRequest(String type, String id, Integer ver } - ObjectNode objectNode = objectMapper.valueToTree(o); - for (Consumer c : consumers) { - c.accept(objectNode); - } - - return new BulkRequestEntry(actionLine, objectNode, this::unalias, mdcSupplier.get()); + ObjectNode objectNode = objectMapper.valueToTree(o); + return new BulkRequestEntry(actionLine, objectNode, this::unalias, mdcSupplier.get(), (on) -> { + for (Consumer c : consumers) { + c.accept(on); + } + }); } @SafeVarargs @@ -1002,12 +1002,14 @@ private final BulkRequestEntry _updateRequest(String id, Object o, Consumer c : consumers) { - c.accept(objectNode); - } + updateNode.set(Fields.DOC, objectNode); updateNode.put(Fields.DOC_AS_UPSERT, false); - return new BulkRequestEntry(actionLine, updateNode, this::unalias, mdcSupplier.get()); + return new BulkRequestEntry(actionLine, updateNode, this::unalias, mdcSupplier.get(), (on) -> { + for (Consumer c : consumers) { + c.accept(on); + } + }); } /** @@ -1026,7 +1028,6 @@ public BulkRequestEntry indexRequest(String type, String id, Object o, String ro /** * @deprecated Types are deprecated in elasticsearch, and will disappear in 8. */ - @SuppressWarnings("DeprecatedIsStillUsed") @Deprecated public BulkRequestEntry deleteRequest(String type, String id) { return _deleteRequest(type, id);