diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java index 1ef83b76c..9d90260e0 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java @@ -8,10 +8,8 @@ import java.util.Optional; import java.util.Set; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; + import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -24,12 +22,10 @@ import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.openmbee.mms.core.exceptions.InternalErrorException; +import org.openmbee.mms.elastic.utils.BulkProcessor; import org.openmbee.mms.elastic.utils.Index; import org.openmbee.mms.json.BaseJson; import org.slf4j.Logger; @@ -209,43 +205,6 @@ public E update(String index, BaseJson json) { } protected BulkProcessor getBulkProcessor(RestHighLevelClient client) { - BulkProcessor.Builder bpBuilder = BulkProcessor.builder((request, bulkListener) -> { - try { - BulkResponse response = client.bulk(request, RequestOptions.DEFAULT); - if (response.hasFailures()) { - String failure = response.buildFailureMessage(); - logger.error("Bulk response error: {}", failure); - throw new InternalErrorException(failure); - } - } catch (IOException ioe) { - logger.error(ioe.getMessage(), ioe); - throw new InternalErrorException(ioe); - } - }, getListener()); - bpBuilder.setBulkActions(bulkLimit); - bpBuilder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); - bpBuilder.setConcurrentRequests(1); - bpBuilder.setFlushInterval(TimeValue.timeValueSeconds(5)); - bpBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3)); - - return bpBuilder.build(); - } - - private BulkProcessor.Listener getListener() { - return new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - throw new InternalErrorException(failure); - } - }; + return new BulkProcessor(client, REQUEST_OPTIONS, bulkLimit); } } diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java index 8f1f31aad..f47a42efe 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java @@ -7,7 +7,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -23,6 +22,7 @@ import org.openmbee.mms.core.config.ContextHolder; import org.openmbee.mms.core.dao.NodeIndexDAO; import org.openmbee.mms.core.exceptions.InternalErrorException; +import org.openmbee.mms.elastic.utils.BulkProcessor; import org.openmbee.mms.elastic.utils.Index; import org.openmbee.mms.json.BaseJson; import org.openmbee.mms.json.ElementJson; diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java new file mode 100644 index 000000000..8b8f0fc5c --- /dev/null +++ b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java @@ -0,0 +1,60 @@ +package org.openmbee.mms.elastic.utils; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.openmbee.mms.core.exceptions.InternalErrorException; + +import java.io.IOException; + +public class BulkProcessor { + + protected int bulkLimit; + RestHighLevelClient client; + RequestOptions options; + BulkRequest bulkRequest = new BulkRequest(); + + public BulkProcessor(RestHighLevelClient client, RequestOptions options, int bulkLimit) { + this.client = client; + this.options = options; + this.bulkLimit = bulkLimit; + } + + public void add(IndexRequest action) { + bulkRequest.add(action); + clear(); + } + + public void add(UpdateRequest action) { + bulkRequest.add(action); + clear(); + } + + public void clear() { + if (bulkRequest.numberOfActions() >= bulkLimit) { + bulkBatchRequests(); + } + } + + public void close() { + if (bulkRequest.numberOfActions() > 0) { + bulkBatchRequests(); + } + } + + protected void bulkBatchRequests() { + try { + BulkResponse response = client.bulk(bulkRequest, options); + if (response.hasFailures()) { + String failure = response.buildFailureMessage(); + throw new InternalErrorException(failure); + } + bulkRequest = new BulkRequest(); + } catch (IOException ioe) { + throw new InternalErrorException(ioe); + } + } +}