Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package nl.vpro.elasticsearchclient;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -26,11 +28,37 @@ public class BulkRequestEntry {
final Map<String, String> mdc;
final String id;

public BulkRequestEntry(ObjectNode action, ObjectNode source, UnaryOperator<String> unalias, Map<String, String> mdc) {
@Getter
@Setter
Consumer<ObjectNode> sourceConsumer;


public BulkRequestEntry(
ObjectNode action,
ObjectNode source,
UnaryOperator<String> unalias,
Map<String, String> mdc) {
this(action, source, unalias, mdc, null);
}
@lombok.Builder
BulkRequestEntry(
ObjectNode action,
ObjectNode source,
UnaryOperator<String> unalias,
Map<String, String> mdc,
Consumer<ObjectNode> sourceConsumer) {
this.action = action;
this.source = source;
this.mdc = mdc;
this.id = idFromActionNode(action, unalias);
this.sourceConsumer = sourceConsumer;
}


public void use() {
if (sourceConsumer != null) {
sourceConsumer.accept(source);
}
}

public static String idFromActionNode(ObjectNode action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,12 +985,12 @@ private final BulkRequestEntry _indexRequest(String type, String id, Integer ver
}


ObjectNode objectNode = objectMapper.valueToTree(o);
for (Consumer<ObjectNode> c : consumers) {
c.accept(objectNode);
}

return new BulkRequestEntry(actionLine, objectNode, this::unalias, mdcSupplier.get());
ObjectNode objectNode = objectMapper.valueToTree(o);
return new BulkRequestEntry(actionLine, objectNode, this::unalias, mdcSupplier.get(), (on) -> {
for (Consumer<ObjectNode> c : consumers) {
c.accept(on);
}
});
}

@SafeVarargs
Expand All @@ -1002,12 +1002,14 @@ private final BulkRequestEntry _updateRequest(String id, Object o, Consumer<Obje
update.put(RETRY_ON_CONFLICT, 3);
ObjectNode updateNode = objectMapper.createObjectNode();
ObjectNode objectNode = objectMapper.valueToTree(o);
for (Consumer<ObjectNode> c : consumers) {
c.accept(objectNode);
}

updateNode.set(Fields.DOC, objectNode);
updateNode.put(Fields.DOC_AS_UPSERT, false);
return new BulkRequestEntry(actionLine, updateNode, this::unalias, mdcSupplier.get());
return new BulkRequestEntry(actionLine, updateNode, this::unalias, mdcSupplier.get(), (on) -> {
for (Consumer<ObjectNode> c : consumers) {
c.accept(on);
}
});
}

/**
Expand All @@ -1026,7 +1028,6 @@ public BulkRequestEntry indexRequest(String type, String id, Object o, String ro
/**
* @deprecated Types are deprecated in elasticsearch, and will disappear in 8.
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public BulkRequestEntry deleteRequest(String type, String id) {
return _deleteRequest(type, id);
Expand Down