diff --git a/crud/src/main/java/org/openmbee/mms/crud/services/DefaultNodeService.java b/crud/src/main/java/org/openmbee/mms/crud/services/DefaultNodeService.java index 482e052fa..48f28e4c1 100644 --- a/crud/src/main/java/org/openmbee/mms/crud/services/DefaultNodeService.java +++ b/crud/src/main/java/org/openmbee/mms/crud/services/DefaultNodeService.java @@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.*; @@ -133,6 +135,7 @@ public void readAsStream(String projectId, String refId, .getBytes(StandardCharsets.UTF_8)); } catch (IOException ioe) { logger.error("Error writing to stream", ioe); + throw new InternalErrorException("Error writing to stream."); } }); if (!"application/x-ndjson".equals(accept)) { @@ -234,7 +237,7 @@ public void commitChanges(NodeChangeInfo info) { }); this.nodeRepository.saveAll(new ArrayList<>(nodes.values())); } catch (Exception e) { - logger.error("commitChanges error: {}", e.getMessage()); + logger.error("Error in commitChanges: ", e); throw new InternalErrorException("Error committing changes: " + e.getMessage()); } eventPublisher.forEach((pub) -> pub.publish( diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java index 9d90260e0..f83e20224 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/BaseElasticDAOImpl.java @@ -52,7 +52,7 @@ public abstract class BaseElasticDAOImpl> { protected static int readTimeout = 1000000000; protected RestHighLevelClient client; - private static final RequestOptions REQUEST_OPTIONS; + protected static final RequestOptions REQUEST_OPTIONS; static { RequestOptions.Builder requestBuilder = RequestOptions.DEFAULT.toBuilder(); // TODO: Should be configureable diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/CommitElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/CommitElasticDAOImpl.java index 2f13e8a2d..9cb2ce422 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/CommitElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/CommitElasticDAOImpl.java @@ -42,7 +42,6 @@ public void index(BaseJson json) { public void index(CommitJson json) { int commitCount = getCommitSize(json); - List broken = new ArrayList<>(); if (commitCount > commitLimit) { List> allActions = new ArrayList<>(); allActions.addAll(json.getAdded().stream().peek(toAdd -> toAdd.put("action", "added")).collect(Collectors.toList())); @@ -71,10 +70,8 @@ public void index(CommitJson json) { break; } } while(getCommitSize(currentCommitCopy) < commitLimit && !allActions.isEmpty()); - broken.add(currentCommitCopy); - + this.index(getIndex(), currentCommitCopy); } - this.indexAll(broken); } else { this.index(getIndex(), json); @@ -213,7 +210,7 @@ private SearchHits getCommitResults(QueryBuilder query) throws IOException { sourceBuilder.size(this.resultLimit); // TODO handle paging requests sourceBuilder.sort(new FieldSortBuilder(CommitJson.CREATED).order(SortOrder.DESC)); searchRequest.source(sourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + SearchResponse searchResponse = client.search(searchRequest, REQUEST_OPTIONS); return searchResponse.getHits(); } diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java b/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java index f47a42efe..15d82b963 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/NodeElasticDAOImpl.java @@ -74,7 +74,7 @@ public Optional getByCommitId(String commitId, String nodeId) { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(query); searchRequest.source(sourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + SearchResponse searchResponse = client.search(searchRequest, REQUEST_OPTIONS); if (searchResponse.getHits().getTotalHits().value == 0) { return Optional.empty(); } @@ -136,7 +136,7 @@ public Optional getElementLessThanOrEqualTimestamp(String nodeId, searchSourceBuilder.size(1); searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + SearchResponse searchResponse = client.search(searchRequest, REQUEST_OPTIONS); SearchHit[] searchHits = searchResponse.getHits().getHits(); if (searchHits != null && searchHits.length > 0) { ElementJson elementJson = newInstance(); diff --git a/elastic/src/main/java/org/openmbee/mms/elastic/config/ElasticsearchConfig.java b/elastic/src/main/java/org/openmbee/mms/elastic/config/ElasticsearchConfig.java index 89bd9ff9f..7a8866c50 100644 --- a/elastic/src/main/java/org/openmbee/mms/elastic/config/ElasticsearchConfig.java +++ b/elastic/src/main/java/org/openmbee/mms/elastic/config/ElasticsearchConfig.java @@ -1,12 +1,12 @@ package org.openmbee.mms.elastic.config; import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.beans.factory.annotation.Value; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.RestClient; @Configuration public class ElasticsearchConfig { @@ -20,7 +20,9 @@ public class ElasticsearchConfig { @Bean(name = "clientElastic", destroyMethod = "close") public RestHighLevelClient restClient() { + RestClientBuilder builder = RestClient.builder(new HttpHost(elasticsearchHost, elasticsearchPort, elasticsearchHttp)); + builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(10000).setSocketTimeout(1000000)); RestHighLevelClient client = new RestHighLevelClient(builder); return client; }