From 5d5f2ad961d9a2c842fb12712f24ea140977630d Mon Sep 17 00:00:00 2001 From: Jason Han Date: Wed, 17 Feb 2021 10:27:04 -0800 Subject: [PATCH 1/4] Bump version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 88dfda6dc..efead23bc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=4.0.1 +version=4.0.2 group=org.openmbee.mms springBootVersion=2.2.6.RELEASE From 550ce361d29dd5c18748ff26731f00edb9725688 Mon Sep 17 00:00:00 2001 From: Jason Han Date: Wed, 17 Feb 2021 16:38:58 -0800 Subject: [PATCH 2/4] Add custom bulk processor --- .../mms/elastic/BaseElasticDAOImpl.java | 47 +------------- .../mms/elastic/NodeElasticDAOImpl.java | 2 +- .../mms/elastic/utils/BulkProcessor.java | 65 +++++++++++++++++++ 3 files changed, 69 insertions(+), 45 deletions(-) create mode 100644 elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java index 1ef83b76c..9d90260e0 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java @@ -8,10 +8,8 @@ import java.util.Optional; import java.util.Set; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; + import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -24,12 +22,10 @@ import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.openmbee.mms.core.exceptions.InternalErrorException; +import org.openmbee.mms.elastic.utils.BulkProcessor; import org.openmbee.mms.elastic.utils.Index; import org.openmbee.mms.json.BaseJson; import org.slf4j.Logger; @@ -209,43 +205,6 @@ public E update(String index, BaseJson json) { } protected BulkProcessor getBulkProcessor(RestHighLevelClient client) { - BulkProcessor.Builder bpBuilder = BulkProcessor.builder((request, bulkListener) -> { - try { - BulkResponse response = client.bulk(request, RequestOptions.DEFAULT); - if (response.hasFailures()) { - String failure = response.buildFailureMessage(); - logger.error("Bulk response error: {}", failure); - throw new InternalErrorException(failure); - } - } catch (IOException ioe) { - logger.error(ioe.getMessage(), ioe); - throw new InternalErrorException(ioe); - } - }, getListener()); - bpBuilder.setBulkActions(bulkLimit); - bpBuilder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); - bpBuilder.setConcurrentRequests(1); - bpBuilder.setFlushInterval(TimeValue.timeValueSeconds(5)); - bpBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3)); - - return bpBuilder.build(); - } - - private BulkProcessor.Listener getListener() { - return new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - throw new InternalErrorException(failure); - } - }; + return new BulkProcessor(client, REQUEST_OPTIONS, bulkLimit); } } diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java index 8f1f31aad..f47a42efe 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java @@ -7,7 +7,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -23,6 +22,7 @@ import org.openmbee.mms.core.config.ContextHolder; import org.openmbee.mms.core.dao.NodeIndexDAO; import org.openmbee.mms.core.exceptions.InternalErrorException; +import org.openmbee.mms.elastic.utils.BulkProcessor; import org.openmbee.mms.elastic.utils.Index; import org.openmbee.mms.json.BaseJson; import org.openmbee.mms.json.ElementJson; diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java new file mode 100644 index 000000000..849d5bdff --- /dev/null +++ b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java @@ -0,0 +1,65 @@ +package org.openmbee.mms.elastic.utils; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.openmbee.mms.core.exceptions.InternalErrorException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +public class BulkProcessor { + + protected int bulkLimit; + RestHighLevelClient client; + RequestOptions options; + List> allRequests = new ArrayList<>(); + + public BulkProcessor(RestHighLevelClient client, RequestOptions options, int bulkLimit) { + this.client = client; + this.options = options; + this.bulkLimit = bulkLimit; + } + + public void add(IndexRequest action) { + allRequests.add(action); + clear(); + } + + public void add(UpdateRequest action) { + allRequests.add(action); + clear(); + } + + public void clear() { + if (allRequests.size() > bulkLimit) { + bulkBatchRequests(allRequests); + allRequests = new ArrayList<>(); + } + } + + public void close() { + batches(allRequests, bulkLimit).forEach(this::bulkBatchRequests); + } + + protected static Stream> batches(List source, int length) { + return IntStream.iterate(0, i -> i < source.size(), i -> i + length) + .mapToObj(i -> source.subList(i, Math.min(i + length, source.size()))); + } + + protected void bulkBatchRequests(List> actionRequest) { + BulkRequest bulkRequest = new BulkRequest(); + actionRequest.forEach(bulkRequest::add); + try { + client.bulk(bulkRequest, options); + } catch (IOException ioe) { + throw new InternalErrorException(ioe); + } + } +} From 52d20e177c9592213013e81b72df4429dbf60ab6 Mon Sep 17 00:00:00 2001 From: Jason Han Date: Wed, 17 Feb 2021 17:01:04 -0800 Subject: [PATCH 3/4] Optimize --- .../org/openmbee/mms/elastic/utils/BulkProcessor.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java index 849d5bdff..d5c5da870 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java @@ -11,8 +11,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.stream.IntStream; -import java.util.stream.Stream; public class BulkProcessor { @@ -38,19 +36,14 @@ public void add(UpdateRequest action) { } public void clear() { - if (allRequests.size() > bulkLimit) { + if (allRequests.size() >= bulkLimit) { bulkBatchRequests(allRequests); allRequests = new ArrayList<>(); } } public void close() { - batches(allRequests, bulkLimit).forEach(this::bulkBatchRequests); - } - - protected static Stream> batches(List source, int length) { - return IntStream.iterate(0, i -> i < source.size(), i -> i + length) - .mapToObj(i -> source.subList(i, Math.min(i + length, source.size()))); + bulkBatchRequests(allRequests); } protected void bulkBatchRequests(List> actionRequest) { From d81f716d2143e129cf0fd932f9a78238e2c07c1c Mon Sep 17 00:00:00 2001 From: Jason Han Date: Wed, 17 Feb 2021 17:19:18 -0800 Subject: [PATCH 4/4] Remove redundant list and simplify --- .../mms/elastic/utils/BulkProcessor.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java index d5c5da870..8b8f0fc5c 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/utils/BulkProcessor.java @@ -1,7 +1,7 @@ package org.openmbee.mms.elastic.utils; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; @@ -9,15 +9,13 @@ import org.openmbee.mms.core.exceptions.InternalErrorException; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; public class BulkProcessor { protected int bulkLimit; RestHighLevelClient client; RequestOptions options; - List> allRequests = new ArrayList<>(); + BulkRequest bulkRequest = new BulkRequest(); public BulkProcessor(RestHighLevelClient client, RequestOptions options, int bulkLimit) { this.client = client; @@ -26,31 +24,35 @@ public BulkProcessor(RestHighLevelClient client, RequestOptions options, int bul } public void add(IndexRequest action) { - allRequests.add(action); + bulkRequest.add(action); clear(); } public void add(UpdateRequest action) { - allRequests.add(action); + bulkRequest.add(action); clear(); } public void clear() { - if (allRequests.size() >= bulkLimit) { - bulkBatchRequests(allRequests); - allRequests = new ArrayList<>(); + if (bulkRequest.numberOfActions() >= bulkLimit) { + bulkBatchRequests(); } } public void close() { - bulkBatchRequests(allRequests); + if (bulkRequest.numberOfActions() > 0) { + bulkBatchRequests(); + } } - protected void bulkBatchRequests(List> actionRequest) { - BulkRequest bulkRequest = new BulkRequest(); - actionRequest.forEach(bulkRequest::add); + protected void bulkBatchRequests() { try { - client.bulk(bulkRequest, options); + BulkResponse response = client.bulk(bulkRequest, options); + if (response.hasFailures()) { + String failure = response.buildFailureMessage(); + throw new InternalErrorException(failure); + } + bulkRequest = new BulkRequest(); } catch (IOException ioe) { throw new InternalErrorException(ioe); }