From 115c17354c20489df981dea9ed38dfa406c225cd Mon Sep 17 00:00:00 2001 From: Jason Han Date: Thu, 3 Sep 2020 14:30:33 -0700 Subject: [PATCH 1/4] Adding new bulk processor for elasticsearch implementation --- Dockerfile | 4 +- .../sdvc/elastic/BaseElasticDAOImpl.java | 63 ++++++++++++++++--- example/example.gradle | 1 - .../example/config/ExampleSecurityConfig.java | 5 -- gradle.properties | 2 +- .../TwcRevisionMmsCommitMapService.java | 2 +- 6 files changed, 57 insertions(+), 20 deletions(-) diff --git a/Dockerfile b/Dockerfile index b1e42e79b..70d3db842 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ FROM openjdk:11.0.8-jdk COPY . /mms WORKDIR /mms -RUN ./gradlew --no-daemon build -x test +RUN ./gradlew --no-daemon bootJar RUN cp /mms/example/build/libs/example*.jar /app.jar -ENTRYPOINT ["java", "--add-opens", "java.base/java.lang=ALL-UNNAMED","-jar", "/app.jar"] +ENTRYPOINT ["java", "-Djdk.tls.client.protocols=TLSv1", "--add-opens", "java.base/java.lang=ALL-UNNAMED", "-jar", "/app.jar"] EXPOSE 8080 diff --git a/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java index 179070f03..49c9e6b1d 100644 --- a/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java @@ -7,8 +7,14 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -21,8 +27,12 @@ import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.openmbee.sdvc.core.exceptions.SdvcException; import org.openmbee.sdvc.elastic.utils.Index; import org.openmbee.sdvc.json.BaseJson; import org.springframework.beans.factory.annotation.Autowired; @@ -31,6 +41,8 @@ public abstract class BaseElasticDAOImpl> { + private final Logger logger = LogManager.getLogger(getClass()); + @Value("${elasticsearch.limit.result}") protected int resultLimit; @Value("${elasticsearch.limit.term}") @@ -39,10 +51,10 @@ public abstract class BaseElasticDAOImpl> { protected RestHighLevelClient client; private static final RequestOptions REQUEST_OPTIONS; static { - RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + RequestOptions.Builder requestBuilder = RequestOptions.DEFAULT.toBuilder(); // TODO: Should be configureable - builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(1024 * 1024 * 1024)); - REQUEST_OPTIONS = builder.build(); + requestBuilder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(1024 * 1024 * 1024)); + REQUEST_OPTIONS = requestBuilder.build(); } @Autowired @@ -133,18 +145,18 @@ public List findAllById(String index, Set docIds) { } public void indexAll(String index, Collection jsons) { + BulkProcessor bulkProcessor = getBulkProcessor(client); try { - BulkRequest bulkIndex = new BulkRequest(); - for (BaseJson json : jsons) { - bulkIndex.add(new IndexRequest(index).id(json.getDocId()).source(json)); + if(!bulkProcessor.awaitClose(1200L, TimeUnit.SECONDS)) { + logger.error("Timed out in bulk processing"); } - client.bulk(bulkIndex, REQUEST_OPTIONS); - } catch (IOException e) { - throw new RuntimeException(e); + } catch (InterruptedException e) { + logger.error(e); } + } - public void index(String index, BaseJson json) { + public void index(String index, BaseJson json) { try { client.index(new IndexRequest(index).id(json.getDocId()).source(json), REQUEST_OPTIONS); @@ -174,4 +186,35 @@ public E update(String index, BaseJson json) { } return response; } + + private static BulkProcessor getBulkProcessor(RestHighLevelClient client) { + return getBulkProcessor(client, null); + } + + private static BulkProcessor getBulkProcessor(RestHighLevelClient client, BulkProcessor.Listener listener) { + if (listener == null) { + listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + } + }; + } + BulkProcessor.Builder bpBuilder = BulkProcessor.builder((request, bulkListener) -> client + .bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); + bpBuilder.setBulkActions(5000); + bpBuilder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + bpBuilder.setConcurrentRequests(1); + bpBuilder.setFlushInterval(TimeValue.timeValueSeconds(5)); + bpBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3)); + + return bpBuilder.build(); + } } diff --git a/example/example.gradle b/example/example.gradle index 8aefd1c54..946eebb4f 100644 --- a/example/example.gradle +++ b/example/example.gradle @@ -24,7 +24,6 @@ dependencies { project(':jupyter'), project(':permissions'), project(':webhooks'), - project(':twc'), project(':search'), project(':artifacts'), 'org.springframework.boot:spring-boot-starter-web', diff --git a/example/src/main/java/org/openmbee/sdvc/example/config/ExampleSecurityConfig.java b/example/src/main/java/org/openmbee/sdvc/example/config/ExampleSecurityConfig.java index 9aea0ce22..f42a5657c 100644 --- a/example/src/main/java/org/openmbee/sdvc/example/config/ExampleSecurityConfig.java +++ b/example/src/main/java/org/openmbee/sdvc/example/config/ExampleSecurityConfig.java @@ -1,7 +1,6 @@ package org.openmbee.sdvc.example.config; import org.openmbee.sdvc.authenticator.config.AuthSecurityConfig; -import org.openmbee.sdvc.twc.config.TwcAuthSecurityConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -36,15 +35,11 @@ public class ExampleSecurityConfig extends WebSecurityConfigurerAdapter implemen @Autowired AuthSecurityConfig authSecurityConfig; - @Autowired - TwcAuthSecurityConfig twcAuthSecurityConfig; - @Override public void configure(HttpSecurity http) throws Exception { http.csrf().disable().authorizeRequests().anyRequest().permitAll().and().httpBasic(); http.headers().cacheControl(); http.addFilterAfter(corsFilter(), ExceptionTranslationFilter.class); - twcAuthSecurityConfig.setAuthConfig(http); authSecurityConfig.setAuthConfig(http); } diff --git a/gradle.properties b/gradle.properties index 8eb2def0d..9906d3482 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,4 +7,4 @@ springSecurityVersion=5.3.1.RELEASE springDataVersion=2.2.6.RELEASE jacksonVersion=2.10.3 log4jVersion=2.13.1 -elasticVersion=7.1.1 +elasticVersion=7.9.0 diff --git a/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java b/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java index de07f4a79..bd7f6ca30 100644 --- a/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java +++ b/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java @@ -36,7 +36,7 @@ public void setBranchRepository(BranchDAO branchRepository) { */ public CommitsResponse updateTwcRevisionID(String projectId, String commitId, String revisionId) { CommitsResponse commitsResponse = new CommitsResponse(); - if (revisionId.isEmpty() || revisionId.isBlank()) { + if (revisionId.isEmpty()) { return commitsResponse.addMessage("Revision id can not be empty"); } ContextHolder.setContext(projectId); From 381230aafba6d9e4fc0894ce368f5884c879d17e Mon Sep 17 00:00:00 2001 From: Jason Han Date: Tue, 15 Sep 2020 17:58:06 -0700 Subject: [PATCH 2/4] Add back IndexRequest adding --- .../java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java index 49c9e6b1d..cc3152c51 100644 --- a/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java @@ -146,6 +146,9 @@ public List findAllById(String index, Set docIds) { public void indexAll(String index, Collection jsons) { BulkProcessor bulkProcessor = getBulkProcessor(client); + for (BaseJson json : jsons) { + bulkProcessor.add(new IndexRequest(index).id(json.getDocId()).source(json)); + } try { if(!bulkProcessor.awaitClose(1200L, TimeUnit.SECONDS)) { logger.error("Timed out in bulk processing"); From 7607bbafe30622efdbec2d61b795185cea564bac Mon Sep 17 00:00:00 2001 From: Jason Han Date: Fri, 18 Sep 2020 13:10:09 -0700 Subject: [PATCH 3/4] Adding null check and logging. Remove ldap from example --- .../org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java | 11 ++++++----- example/example.gradle | 1 - gradle.properties | 2 +- .../twc/services/TwcRevisionMmsCommitMapService.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java index cc3152c51..77864b63a 100644 --- a/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/sdvc/elastic/BaseElasticDAOImpl.java @@ -8,8 +8,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; @@ -32,16 +30,17 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; -import org.openmbee.sdvc.core.exceptions.SdvcException; import org.openmbee.sdvc.elastic.utils.Index; import org.openmbee.sdvc.json.BaseJson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; public abstract class BaseElasticDAOImpl> { - private final Logger logger = LogManager.getLogger(getClass()); + private final Logger logger = LoggerFactory.getLogger(getClass()); @Value("${elasticsearch.limit.result}") protected int resultLimit; @@ -154,7 +153,7 @@ public void indexAll(String index, Collection jsons) { logger.error("Timed out in bulk processing"); } } catch (InterruptedException e) { - logger.error(e); + logger.error("Index all interrupted: ", e); } } @@ -197,6 +196,7 @@ private static BulkProcessor getBulkProcessor(RestHighLevelClient client) { private static BulkProcessor getBulkProcessor(RestHighLevelClient client, BulkProcessor.Listener listener) { if (listener == null) { listener = new BulkProcessor.Listener() { + private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public void beforeBulk(long executionId, BulkRequest request) { } @@ -207,6 +207,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + logger.error("Error in bulk processing: ", failure); } }; } diff --git a/example/example.gradle b/example/example.gradle index 946eebb4f..bd0bf45ef 100644 --- a/example/example.gradle +++ b/example/example.gradle @@ -18,7 +18,6 @@ dependencies { implementation( project(':authenticator'), project(':localuser'), - project(':ldap'), project(':cameo'), project(':elastic'), project(':jupyter'), diff --git a/gradle.properties b/gradle.properties index 9906d3482..268a966a9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,4 +7,4 @@ springSecurityVersion=5.3.1.RELEASE springDataVersion=2.2.6.RELEASE jacksonVersion=2.10.3 log4jVersion=2.13.1 -elasticVersion=7.9.0 +elasticVersion=7.7.1 diff --git a/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java b/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java index bd7f6ca30..04ae42101 100644 --- a/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java +++ b/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java @@ -36,7 +36,7 @@ public void setBranchRepository(BranchDAO branchRepository) { */ public CommitsResponse updateTwcRevisionID(String projectId, String commitId, String revisionId) { CommitsResponse commitsResponse = new CommitsResponse(); - if (revisionId.isEmpty()) { + if (revisionId != null && revisionId.isEmpty()) { return commitsResponse.addMessage("Revision id can not be empty"); } ContextHolder.setContext(projectId); From c7be98a5c6172573c912271241992d74a408dfbf Mon Sep 17 00:00:00 2001 From: Jason Han Date: Fri, 18 Sep 2020 14:45:05 -0700 Subject: [PATCH 4/4] Fix comp --- .../sdvc/twc/services/TwcRevisionMmsCommitMapService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java b/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java index 04ae42101..9548e32dc 100644 --- a/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java +++ b/twc/src/main/java/org/openmbee/sdvc/twc/services/TwcRevisionMmsCommitMapService.java @@ -36,7 +36,7 @@ public void setBranchRepository(BranchDAO branchRepository) { */ public CommitsResponse updateTwcRevisionID(String projectId, String commitId, String revisionId) { CommitsResponse commitsResponse = new CommitsResponse(); - if (revisionId != null && revisionId.isEmpty()) { + if (revisionId == null || revisionId.isEmpty()) { return commitsResponse.addMessage("Revision id can not be empty"); } ContextHolder.setContext(projectId);