From 23a96ccc0786d31dfe2190fe470b0afe2b52c936 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 1 Oct 2018 18:39:03 -0400 Subject: [PATCH 01/16] Can change source field used for document ID. Unable to /findOne in Alerts UI --- .../writer/ElasticsearchWriter.java | 37 +++-- .../writer/ElasticsearchWriterConfig.java | 136 ++++++++++++++++++ 2 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 4b8dd083e9..9cb423349f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -17,6 +17,8 @@ */ package org.apache.metron.elasticsearch.writer; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.field.FieldNameConverter; @@ -43,6 +45,8 @@ import java.util.List; import java.util.Map; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.DOC_ID_SOURCE_FIELD; + /** * A {@link BulkMessageWriter} that writes messages to Elasticsearch. */ @@ -72,10 +76,13 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura @Override public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable tuples, List messages) throws Exception { - // fetch the field name converter for this sensor type + // writer settings FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); - final String indexPostfix = dateFormat.format(new Date()); + final String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); + final String docType = sensorType + "_doc"; + final String docIdSourceField = DOC_ID_SOURCE_FIELD.get(configurations.getGlobalConfig(), String.class); + BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { @@ -84,23 +91,33 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura copyField(k.toString(), message, esDoc, fieldNameConverter); } - String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); - IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, sensorType + "_doc"); - indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString()); - String guid = (String)esDoc.get(Constants.GUID); - if(guid != null) { - indexRequestBuilder.setId(guid); + IndexRequestBuilder indexRequestBuilder = client + .prepareIndex(indexName, docType) + .setSource(esDoc.toJSONString()); + + // set the document identifier + if(StringUtils.isNotBlank(docIdSourceField)) { + String docId = (String) esDoc.get(docIdSourceField); + if(docId != null) { + indexRequestBuilder.setId(docId); + + } else { + LOG.warn("Message is missing document ID source field; document ID not set; sourceField={}", docIdSourceField); + } } - Object ts = esDoc.get("timestamp"); + // set the document timestamp, if one exists + Object ts = esDoc.get(Constants.Fields.TIMESTAMP.getName()); if(ts != null) { - indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString()); + indexRequestBuilder.setTimestamp(ts.toString()); } bulkRequest.add(indexRequestBuilder); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + LOG.info("Wrote {} message(s) to Elasticsearch; sensorType={}, index={}, docType={}, took={}", + ArrayUtils.getLength(bulkResponse.getItems()), sensorType, indexName, docType, bulkResponse.getTook().format()); return buildWriteReponse(tuples, bulkResponse); } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java new file mode 100644 index 0000000000..90621ee056 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.elasticsearch.writer; + +import org.apache.metron.common.Constants; +import org.apache.metron.stellar.common.utils.ConversionUtils; + +import java.util.Map; + +/** + * Configuration settings that customize the behavior of the {@link ElasticsearchWriter}. + */ +public enum ElasticsearchWriterConfig { + + /** + * Defines which message field, the document identifier is set to. + * + *

If defined, the value of the specified message field is set as the Elasticsearch doc ID. If + * this field is undefined or blank, then the document identifier is not set. + * + *

By default, the document identifier is set to the message GUID. + */ + DOC_ID_SOURCE_FIELD("es.doc.id.source.field", Constants.GUID, String.class); + + /** + * The key for the configuration value. + */ + private String key; + + /** + * The default value of the configuration, if none other is specified. + */ + private Object defaultValue; + + /** + * The type of the configuration value. + */ + private Class valueType; + + ElasticsearchWriterConfig(String key, Object defaultValue, Class valueType) { + this.key = key; + this.defaultValue = defaultValue; + this.valueType = valueType; + } + + /** + * Returns the key of the configuration value. + */ + public String getKey() { + return key; + } + + /** + * Returns the default value of the configuration. + */ + public Object getDefault() { + return getDefault(valueType); + } + + /** + * Returns the default value of the configuration, cast to the expected type. + * + * @param clazz The class of the expected type of the configuration value. + * @param The expected type of the configuration value. + */ + public T getDefault(Class clazz) { + return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values. + * + * @param config A map containing configuration values. + */ + public Object get(Map config) { + return getOrDefault(config, defaultValue); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param config A map containing configuration values. + */ + public T get(Map config, Class clazz) { + return getOrDefault(config, defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values. If the value is not specified, + * the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. + */ + private Object getOrDefault(Map config, Object defaultValue) { + return getOrDefault(config, defaultValue, valueType); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @param clazz The class of the expected type of the configuration value. + * @param The expected type of the configuration value. + * @return The configuration value or the specified default, if one is not defined. + */ + private T getOrDefault(Map config, Object defaultValue, Class clazz) { + Object value = config.getOrDefault(key, defaultValue.toString()); + return value == null ? null : ConversionUtils.convert(value, clazz); + } + + @Override + public String toString() { + return key; + } +} From 2d0f478327981403eb2463b7efefe3d871441db0 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 2 Oct 2018 11:09:02 -0400 Subject: [PATCH 02/16] Cannot assume that ES doc ID == Metron GUID --- .../dao/ElasticsearchRetrieveLatestDao.java | 146 ++++++++++++------ 1 file changed, 95 insertions(+), 51 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java index f6bfedafaa..aa33cf7f8c 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java @@ -20,27 +20,35 @@ import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.function.Function; +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.Constants; import org.apache.metron.indexing.dao.RetrieveLatestDao; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.termsQuery; public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private TransportClient transportClient; public ElasticsearchRetrieveLatestDao(TransportClient transportClient) { @@ -49,7 +57,7 @@ public ElasticsearchRetrieveLatestDao(TransportClient transportClient) { @Override public Document getLatest(String guid, String sensorType) { - Optional doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); + Optional doc = searchByGuid(guid, sensorType, hit -> toDocument(hit)); return doc.orElse(null); } @@ -61,26 +69,19 @@ public Iterable getAllLatest(List getRequests) { guids.add(getRequest.getGuid()); sensorTypes.add(getRequest.getSensorType()); } - List documents = searchByGuids( - guids, - sensorTypes, - hit -> { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); - try { - return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } - - ); - return documents; + return searchByGuids(guids, sensorTypes, hit -> toDocument(hit)); } - Optional searchByGuid(String guid, String sensorType, - Function> callback) { + /** + * Search for a Metron GUID. + * + * @param guid The Metron GUID to search for. + * @param sensorType The sensor type to include in the search. + * @param callback A callback that transforms a search hit to a search result. + * @param The type of search results expected. + * @return All documents with the given Metron GUID and sensor type. + */ + Optional searchByGuid(String guid, String sensorType, Function> callback) { Collection sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; List results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); if (results.size() > 0) { @@ -91,48 +92,69 @@ Optional searchByGuid(String guid, String sensorType, } /** - * Return the search hit based on the UUID and sensor type. - * A callback can be specified to transform the hit into a type T. - * If more than one hit happens, the first one will be returned. + * Search for one or more Metron GUIDs. + * + * @param guids The Metron GUIDs to search by. + * @param sensorTypes The sensor types to include in the search. + * @param callback A callback that transforms a search hit to a search result. + * @param The type of expected results. + * @return All documents with the given Metron GUID and sensor type. */ - List searchByGuids(Collection guids, Collection sensorTypes, - Function> callback) { + List searchByGuids(Collection guids, + Collection sensorTypes, + Function> callback) { if (guids == null || guids.isEmpty()) { return Collections.emptyList(); } - QueryBuilder query = null; - IdsQueryBuilder idsQuery; - if (sensorTypes != null) { - String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc") - .toArray(String[]::new); - idsQuery = QueryBuilders.idsQuery(types); - } else { - idsQuery = QueryBuilders.idsQuery(); - } - for (String guid : guids) { - query = idsQuery.addIds(guid); + // search by metron's GUID field + QueryBuilder query = boolQuery().must(termsQuery(Constants.GUID, guids)); + SearchRequestBuilder request = transportClient + .prepareSearch() + .setQuery(query) + .setSize(guids.size()); + + if(sensorTypes != null) { + request.setTypes(toDocTypes(sensorTypes)); } - SearchRequestBuilder request = transportClient.prepareSearch() - .setQuery(query) - .setSize(guids.size()); - org.elasticsearch.action.search.SearchResponse response = request.get(); - SearchHits hits = response.getHits(); + // transform the search hits to results + SearchHits hits = request.get().getHits(); + return getResults(hits, callback); + } + + /** + * Retrieve search results. + * + * @param hits The search hit. + * @param callback A callback function that transforms search hits to search results. + * @param The execpted type of search result. + * @return The search results. + */ + private List getResults(SearchHits hits, Function> callback) { List results = new ArrayList<>(); + for (SearchHit hit : hits) { Optional result = callback.apply(hit); if (result.isPresent()) { results.add(result.get()); } } + return results; } - private Optional toDocument(final String guid, SearchHit hit) { + /** + * Transforms a {@link SearchHit} to a {@link Document}. + * + * @param hit The result of a search. + * @return An optional {@link Document}. + */ + private Optional toDocument(SearchHit hit) { Long ts = 0L; String doc = hit.getSourceAsString(); String sourceType = toSourceType(hit.getType()); + String guid = (String) hit.getSource().get(Constants.GUID); try { return Optional.of(new Document(doc, guid, sourceType, ts)); } catch (IOException e) { @@ -148,4 +170,26 @@ private Optional toDocument(final String guid, SearchHit hit) { private String toSourceType(String docType) { return Iterables.getFirst(Splitter.on("_doc").split(docType), null); } + + /** + * Returns the doc types for a given collection of sensor types. + * + * @param sensorTypes The sensor types. + * @return The doc types associated with each sensor. + */ + private String[] toDocTypes(Collection sensorTypes) { + String[] result; + + if(sensorTypes != null && sensorTypes.size() > 0) { + result = sensorTypes + .stream() + .map(sensorType -> sensorType + "_doc") + .toArray(String[]::new); + + } else { + result = new String[0]; + } + + return result; + } } From d66c839eda55d5efd22f56f8f5920937e2703d14 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 2 Oct 2018 11:10:13 -0400 Subject: [PATCH 03/16] Removed unnecessary dependencies --- metron-platform/metron-elasticsearch/pom.xml | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index 9cc974d3fa..cd9fe617c4 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -206,24 +206,7 @@ test-jar test - - org.apache.logging.log4j - log4j-api - ${global_log4j_core_version} - - - org.apache.logging.log4j - log4j-core - ${global_log4j_core_version} - - - com.google.guava - guava-testlib - ${global_guava_version} - test - - From 36c59211181480db571619b811456105b80d7b0d Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 2 Oct 2018 15:33:04 -0400 Subject: [PATCH 04/16] Search results need to use Metron GUID as ID, not the doc ID --- .../dao/ElasticsearchRetrieveLatestDao.java | 3 +- .../dao/ElasticsearchSearchDao.java | 28 +++++++--- .../utils/ElasticsearchUtils.java | 56 +++++++++++++++---- .../dao/ElasticsearchDaoTest.java | 10 +++- .../indexing/dao/SearchIntegrationTest.java | 18 +++--- 5 files changed, 85 insertions(+), 30 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java index aa33cf7f8c..2f00b6805f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.RetrieveLatestDao; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.update.Document; @@ -154,7 +155,7 @@ private Optional toDocument(SearchHit hit) { Long ts = 0L; String doc = hit.getSourceAsString(); String sourceType = toSourceType(hit.getType()); - String guid = (String) hit.getSource().get(Constants.GUID); + String guid = ElasticsearchUtils.getGUID(hit); try { return Optional.of(new Document(doc, guid, sourceType, ts)); } catch (IOException e) { diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java index 5cd0a4d477..c944578f80 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java @@ -33,6 +33,8 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; + +import org.apache.metron.common.Constants; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.FieldType; @@ -74,6 +76,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.metron.common.Constants.GUID; + public class ElasticsearchSearchDao implements SearchDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -284,19 +288,29 @@ private String[] wildcardIndices(List indices) { .toArray(value -> new String[indices.size()]); } - private SearchResult getSearchResult(SearchHit searchHit, List fields) { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); + /** + * Transforms a {@link SearchHit} to a {@link SearchResult}. + * + * @param searchHit The result of a search request. + * @param fieldsToKeep The source fields to keep. All others are excluded. If null, all source fields are kept. + * @return A search result. + */ + private SearchResult getSearchResult(SearchHit searchHit, List fieldsToKeep) { + Map source; - if (fields != null) { - Map resultSourceAsMap = searchHit.getSourceAsMap(); + if (fieldsToKeep != null) { source = new HashMap<>(); - fields.forEach(field -> { - source.put(field, resultSourceAsMap.get(field)); + // must always keep the GUID + fieldsToKeep.add(Constants.GUID); + fieldsToKeep.forEach(field -> { + source.put(field, searchHit.getSourceAsMap().get(field)); }); } else { source = searchHit.getSource(); } + + SearchResult searchResult = new SearchResult(); + searchResult.setId(ElasticsearchUtils.getGUID(searchHit)); searchResult.setSource(source); searchResult.setScore(searchHit.getScore()); searchResult.setIndex(searchHit.getIndex()); diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 98dc66d813..84990f8dd7 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -18,6 +18,7 @@ package org.apache.metron.elasticsearch.utils; import static java.lang.String.format; +import static org.apache.metron.common.Constants.GUID; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -53,6 +54,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -373,16 +375,48 @@ public static SearchResponse queryAllResults(TransportClient transportClient, * @param searchResponse An Elasticsearch SearchHit to be converted. * @return The list of SearchResults for the SearchHit */ - protected static List getSearchResults( - org.elasticsearch.action.search.SearchResponse searchResponse) { - return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); - searchResult.setSource(searchHit.getSource()); - searchResult.setScore(searchHit.getScore()); - searchResult.setIndex(searchHit.getIndex()); - return searchResult; - } - ).collect(Collectors.toList()); + protected static List getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) { + SearchHit[] searchHits = searchResponse.getHits().getHits(); + return Arrays.stream(searchHits) + .map(hit -> toSearchResult(hit)) + .collect(Collectors.toList()); + } + + /** + * Transforms a {@link SearchHit} to a {@link SearchResult}. + * + * @param searchHit The search hit to transform. + * @return A {@link SearchResult} representing the {@link SearchHit}. + */ + protected static SearchResult toSearchResult(SearchHit searchHit) { + SearchResult searchResult = new SearchResult(); + searchResult.setId(getGUID(searchHit)); + searchResult.setSource(searchHit.getSource()); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + + /** + * Retrieves the Metron GUID from a {@link SearchHit}. + * + * @param searchHit The search hit containing a Metron GUID. + * @return The Metron GUID. + */ + public static String getGUID(SearchHit searchHit) { + String guid; + if(searchHit.hasSource() && searchHit.getSource().containsKey(GUID)) { + guid = (String) searchHit.getSource().get(GUID); + + } else if(!searchHit.hasSource()) { + String template = "No source found, has it been disabled in the mapping? index=%s, docId=%s"; + throw new IllegalStateException(String.format(template, searchHit.getIndex(), searchHit.getId())); + + } else { + String template = "Missing expected field; field=%s, index=%s, docId=%s, fields=%s"; + throw new IllegalStateException(String.format(template, GUID, searchHit.getIndex(), searchHit.getId(), searchHit.getSource().keySet())); + } + + return guid; } } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java index 6c3c3270da..a2c8de5a68 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.metron.common.Constants; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.FieldType; @@ -57,16 +59,20 @@ private void setup(RestStatus status, int maxSearchResults, Map() {{ put("field", "value1"); + put(Constants.GUID, "id1"); }}); when(hit1.getScore()).thenReturn(0.1f); SearchHit hit2 = mock(SearchHit.class); - when(hit2.getId()).thenReturn("id2"); + when(hit2.getId()).thenReturn("docId2"); + when(hit2.hasSource()).thenReturn(true); when(hit2.getSource()).thenReturn(new HashMap() {{ put("field", "value2"); + put(Constants.GUID, "id2"); }}); when(hit2.getScore()).thenReturn(0.2f); diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 2e1968ac0a..7b4c68a4e3 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Optional; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; @@ -901,16 +902,15 @@ public void queries_fields() throws Exception { SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class); SearchResponse response = getIndexDao().search(request); Assert.assertEquals(10, response.getTotal()); + List results = response.getResults(); - for (int i = 0; i < 5; ++i) { - Map source = results.get(i).getSource(); - Assert.assertEquals(1, source.size()); - Assert.assertNotNull(source.get("ip_src_addr")); - } - for (int i = 5; i < 10; ++i) { + Assert.assertEquals(10, response.getResults().size()); + + // validate the source fields contained in the search response + for (int i = 0; i < 10; ++i) { Map source = results.get(i).getSource(); - Assert.assertEquals(1, source.size()); - Assert.assertNotNull(source.get("ip_src_addr")); + Assert.assertNotNull(source.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertNotNull(source.get(Constants.GUID)); } } @@ -923,7 +923,7 @@ public void sort_by_guid() throws Exception { for (int i = 0; i < 5; ++i) { Map source = results.get(i).getSource(); Assert.assertEquals(1, source.size()); - Assert.assertEquals(source.get("guid"), "bro_" + (i + 1)); + Assert.assertEquals(source.get(Constants.GUID), "bro_" + (i + 1)); } } From 4ebb8000522540b6ff2a30e2d5ffeffcf6016bb4 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 2 Oct 2018 17:31:58 -0400 Subject: [PATCH 05/16] Small rename --- .../metron/elasticsearch/writer/ElasticsearchWriter.java | 3 +-- .../metron/elasticsearch/writer/ElasticsearchWriterConfig.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 9cb423349f..546d6c1433 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -75,7 +75,6 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura @Override public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable tuples, List messages) throws Exception { - // writer settings FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); final String indexPostfix = dateFormat.format(new Date()); @@ -86,6 +85,7 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { + // clone the message to use as the document that will be indexed JSONObject esDoc = new JSONObject(); for(Object k : message.keySet()){ copyField(k.toString(), message, esDoc, fieldNameConverter); @@ -100,7 +100,6 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura String docId = (String) esDoc.get(docIdSourceField); if(docId != null) { indexRequestBuilder.setId(docId); - } else { LOG.warn("Message is missing document ID source field; document ID not set; sourceField={}", docIdSourceField); } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java index 90621ee056..1ac0665e58 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java @@ -37,7 +37,7 @@ public enum ElasticsearchWriterConfig { * *

By default, the document identifier is set to the message GUID. */ - DOC_ID_SOURCE_FIELD("es.doc.id.source.field", Constants.GUID, String.class); + DOC_ID_SOURCE_FIELD("es.document.id", Constants.GUID, String.class); /** * The key for the configuration value. From 13d698df8cb46a3cf8ef747f7f62acc298ef8e4b Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 2 Oct 2018 17:38:00 -0400 Subject: [PATCH 06/16] Removed unncessary part of error msg --- .../apache/metron/elasticsearch/utils/ElasticsearchUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 84990f8dd7..6dc10757ba 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -413,8 +413,8 @@ public static String getGUID(SearchHit searchHit) { throw new IllegalStateException(String.format(template, searchHit.getIndex(), searchHit.getId())); } else { - String template = "Missing expected field; field=%s, index=%s, docId=%s, fields=%s"; - throw new IllegalStateException(String.format(template, GUID, searchHit.getIndex(), searchHit.getId(), searchHit.getSource().keySet())); + String template = "Missing expected field; field=%s, index=%s, docId=%s"; + throw new IllegalStateException(String.format(template, GUID, searchHit.getIndex(), searchHit.getId())); } return guid; From 5711c89acf49e993022e4fa1bc367beebcad6d21 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Wed, 3 Oct 2018 08:05:16 -0400 Subject: [PATCH 07/16] Elasticsearch always returns a GUID, while Solr does not for now. Needed to fix-up the integration tests due to this. --- .../ElasticsearchSearchIntegrationTest.java | 20 +++++++++++++++++-- .../indexing/dao/SearchIntegrationTest.java | 17 ---------------- .../SolrSearchIntegrationTest.java | 19 ++++++++++++++++++ 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index 8071e68fc9..36f87512c2 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -266,8 +266,6 @@ public void bad_facet_query_throws_exception() throws Exception { dao.search(request); } - - @Override public void returns_column_metadata_for_specified_indices() throws Exception { // getColumnMetadata with only bro @@ -352,6 +350,24 @@ public void different_type_filter_query() throws Exception { Assert.assertEquals("data 1", results.get(0).getSource().get("ttl")); } + @Test + public void queries_fields() throws Exception { + SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class); + SearchResponse response = getIndexDao().search(request); + Assert.assertEquals(10, response.getTotal()); + + List results = response.getResults(); + Assert.assertEquals(10, response.getResults().size()); + + // validate the source fields contained in the search response + for (int i = 0; i < 10; ++i) { + Map source = results.get(i).getSource(); + Assert.assertNotNull(source); + Assert.assertNotNull(source.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertNotNull(source.get(Constants.GUID)); + } + } + @Override protected String getSourceTypeField() { return Constants.SENSOR_TYPE.replace('.', ':'); diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 7b4c68a4e3..743bbbd0ea 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -897,23 +897,6 @@ public void group_by_returns_results_in_sorted_groups() throws Exception { Assert.assertNull(trueIpSrcAddrGroup4.getGroupResults()); } - @Test - public void queries_fields() throws Exception { - SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class); - SearchResponse response = getIndexDao().search(request); - Assert.assertEquals(10, response.getTotal()); - - List results = response.getResults(); - Assert.assertEquals(10, response.getResults().size()); - - // validate the source fields contained in the search response - for (int i = 0; i < 10; ++i) { - Map source = results.get(i).getSource(); - Assert.assertNotNull(source.get(Constants.Fields.SRC_ADDR.getName())); - Assert.assertNotNull(source.get(Constants.GUID)); - } - } - @Test public void sort_by_guid() throws Exception { SearchRequest request = JSONUtils.INSTANCE.load(sortByGuidQuery, SearchRequest.class); diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java index 4390fd1fc0..1ab2d7b9ed 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; @@ -33,6 +34,7 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.solr.dao.SolrDao; import org.apache.metron.solr.integration.components.SolrComponent; @@ -223,6 +225,23 @@ public void returns_column_data_for_multiple_indices() throws Exception { Assert.assertEquals(null, fieldTypes.get("fake.field")); } + @Test + public void queries_fields() throws Exception { + SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class); + SearchResponse response = getIndexDao().search(request); + Assert.assertEquals(10, response.getTotal()); + + List results = response.getResults(); + Assert.assertEquals(10, response.getResults().size()); + + // validate the source fields contained in the search response + for (int i = 0; i < 10; ++i) { + Map source = results.get(i).getSource(); + Assert.assertNotNull(source); + Assert.assertNotNull(source.get(Constants.Fields.SRC_ADDR.getName())); + } + } + @Test public void different_type_filter_query() throws Exception { thrown.expect(InvalidSearchException.class); From 1766cb387db6c1caa67fd4c3fbfdb003c5ffcaeb Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Wed, 3 Oct 2018 10:43:13 -0400 Subject: [PATCH 08/16] Added Mpack support --- .../METRON/CURRENT/configuration/metron-env.xml | 9 +++++++++ .../METRON/CURRENT/package/scripts/metron_service.py | 5 +++++ .../CURRENT/package/scripts/params/params_linux.py | 1 + .../METRON/CURRENT/themes/metron_theme.json | 10 ++++++++++ 4 files changed, 25 insertions(+) diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml index cdef7cfaf6..47a0ea7d4f 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml @@ -121,6 +121,15 @@ true + + es_document_id + Message field containing the Elasticsearch document ID + guid + Elasticsearch Document ID Source Field + + true + + solr_zookeeper_url {{zookeeper_quorum}} diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py index 9d15e93f63..9b9d99b889 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py @@ -83,6 +83,11 @@ def elasticsearch_global_config_patches(): "op": "add", "path": "/es.date.format", "value": "{{es_date_format}}" + }, + { + "op": "add", + "path": "/es.document.id", + "value": "{{es_document_id}}" } """ diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index dbad44d68a..adc548fba5 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -101,6 +101,7 @@ es_http_port = config['configurations']['metron-env']['es_http_port'] es_http_url = es_host_list[0] + ":" + es_http_port es_date_format = config['configurations']['metron-env']['es_date_format'] +es_document_id = config['configurations']['metron-env']['es_document_id'] # hadoop params stack_root = Script.get_stack_root() diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index 2b64f8f91b..17853201fe 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -447,6 +447,10 @@ "config": "metron-env/es_date_format", "subsection-name": "subsection-index-settings" }, + { + "config": "metron-env/es_document_id", + "subsection-name": "subsection-index-settings" + }, { "config": "metron-env/solr_zookeeper_url", "subsection-name": "subsection-index-settings" @@ -872,6 +876,12 @@ "type": "text-field" } }, + { + "config": "metron-env/es_document_id", + "widget": { + "type": "text-field" + } + }, { "config": "metron-env/solr_zookeeper_url", "widget": { From 6fa5b5b36d6ead95ebd44c6010d0bfcaa17c35e8 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Wed, 3 Oct 2018 11:03:27 -0400 Subject: [PATCH 09/16] Added simple gitignore --- metron-deployment/ansible/callback_plugins/.gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 metron-deployment/ansible/callback_plugins/.gitignore diff --git a/metron-deployment/ansible/callback_plugins/.gitignore b/metron-deployment/ansible/callback_plugins/.gitignore new file mode 100644 index 0000000000..0d20b6487c --- /dev/null +++ b/metron-deployment/ansible/callback_plugins/.gitignore @@ -0,0 +1 @@ +*.pyc From dbb83ad8280a22a04cf7433255e0d43e3cfff285 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Wed, 3 Oct 2018 11:03:51 -0400 Subject: [PATCH 10/16] Added documentation for es.document.id --- metron-platform/metron-elasticsearch/README.md | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md index d889e27186..2bcfca548e 100644 --- a/metron-platform/metron-elasticsearch/README.md +++ b/metron-platform/metron-elasticsearch/README.md @@ -59,6 +59,19 @@ For instance, an `es.date.format` of `yyyy.MM.dd.HH` would have the consequence roll hourly, whereas an `es.date.format` of `yyyy.MM.dd` would have the consequence that the indices would roll daily. +### `es.document.id` + +This property defines the message field that is used to define the document ID when a message is indexed by Elasticsearch. This allows a user to customize the identifier that is used by Elasticsearch when indexing documents. + +* By default, Metron's GUID field will be used as the source of the document ID. Using a randomized UUID like Java's UUID.randomUUID() can negatively impact Elasticsearch performance. + +* To allow Elasticsearch to define its own document id, this property should be set to a blank or empty string. In this case, the document ID will not be set by the client and Elasticsearch will define its own. + +* If a user wants to set their own custom document ID, they can create an enrichment that defines a new message field; for example called `my_document_id`. They should then use this new message field to set the Elasticsearch document ID. + ``` + es.document.id = my_document_id + ``` + ## Upgrading to 5.6.2 Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 to 5.6.2. There are a number of template changes, most notably around @@ -303,7 +316,7 @@ We'll want to put the template back into Elasticsearch: curl -XPUT "http://${ELASTICSEARCH}:9200/_template/${SENSOR}_index" -d @${SENSOR}.template ``` -To update existing indexes, update Elasticsearch mappings with the new field for each sensor. +To update existing indexes, update Elasticsearch mappings with the new field for each sensor. ``` curl -XPUT "http://${ELASTICSEARCH}:9200/${SENSOR}_index*/_mapping/${SENSOR}_doc" -d ' @@ -322,7 +335,7 @@ rm ${SENSOR}.template The stock set of Elasticsearch templates for bro, snort, yaf, error index and meta index are installed automatically during the first time install and startup of Metron Indexing service. -It is possible that Elasticsearch service is not available when the Metron Indexing Service startup, in that case the Elasticsearch template will not be installed. +It is possible that Elasticsearch service is not available when the Metron Indexing Service startup, in that case the Elasticsearch template will not be installed. For such a scenario, an Admin can have the template installed in two ways: From af36ebf55ce3d95d368d2f876fe6862c6d20a8e5 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Wed, 3 Oct 2018 13:21:32 -0400 Subject: [PATCH 11/16] Changed default behavior to use Elasticsearch generated doc ID, which is the most performant option --- Upgrading.md | 23 +++++++++++++++---- .../CURRENT/configuration/metron-env.xml | 2 +- .../metron-elasticsearch/README.md | 16 +++++++++---- .../writer/ElasticsearchWriterConfig.java | 4 +--- 4 files changed, 32 insertions(+), 13 deletions(-) diff --git a/Upgrading.md b/Upgrading.md index 2124ac51b8..c3b2a0f5fe 100644 --- a/Upgrading.md +++ b/Upgrading.md @@ -16,9 +16,22 @@ See the License for the specific language governing permissions and limitations under the License. --> # Upgrading + This document constitutes a per-version listing of changes of configuration which are non-backwards compatible. +## 0.6.0 to 0.6.1 + +### [METRON-1801 Allow Customization of Elasticsearch Document ID](https://issues.apache.org/jira/browse/METRON-1801) + +A global property named `es.document.id` was added to define the field from which the document ID is set when a message is indexed by Elasticsearch. To allow Elasticsearch to define its own document id, this property should be set to a blank or empty string. The client will not set the document ID and Elasticsearch will define its own. In most cases allowing Elasticsearch to define the document ID is the most performant option. This is now the default behavior. + +Metron versions 0.6.0 and earlier defined the document ID using the Metron GUID, which is a randomized UUID using Java's `UUID.randomUUID()`. Using a randomized UUID can negatively impact Elasticsearch indexing performance. To maintain backwards compatibility with legacy versions of Metron use the following global property setting. + + ``` + es.document.id = guid + ``` + ## 0.4.2 to 0.5.0 ### [METRON-941: native PaloAlto parser corrupts message when having a comma in the payload](https://issues.apache.org/jira/browse/METRON-941) @@ -89,7 +102,7 @@ For a more detailed description, please see metron-platform/metron-elasticsearch ### Description -In the 0.4.2 release, +In the 0.4.2 release, ## 0.3.1 to 0.4.0 @@ -107,7 +120,7 @@ This effectively limits the build environment to Docker supported [platforms](ht #### Description As of 0.3.0 the indexing configuration -* Is held in the enrichment configuration for a sensor +* Is held in the enrichment configuration for a sensor * Has properties which control every writers (i.e. HDFS, solr or elasticsearch). In the 0.3.1 release, this configuration has been broken out @@ -136,7 +149,7 @@ You would create a file to configure each writer for sensor `foo` called `$METRO "batchSize" : 100, "enabled" : true }, - "hdfs" : { + "hdfs" : { "index" : "foo", "batchSize" : 100, "enabled" : true @@ -151,7 +164,7 @@ You would create a file to configure each writer for sensor `foo` called `$METRO As of 0.3.0, threat triage rules were defined as a simple Map associating a Stellar expression with a score. As of 0.3.1, due to the fact that there may be many threat triage rules, we have made the rules more complex. To help organize these, we have made the threat triage objects in their own right that contain optional name and optional comment fields. - + This essentially makes the risk level rules slightly more complex. The format goes from: ``` "riskLevelRules" : { @@ -169,7 +182,7 @@ to: } ] ``` - + #### Migration For every sensor enrichment configuration, you will need to migrate the `riskLevelRules` section diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml index 47a0ea7d4f..6925bdb58b 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml @@ -124,7 +124,7 @@ es_document_id Message field containing the Elasticsearch document ID - guid + Elasticsearch Document ID Source Field true diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md index 2bcfca548e..abb9e9d19b 100644 --- a/metron-platform/metron-elasticsearch/README.md +++ b/metron-platform/metron-elasticsearch/README.md @@ -61,17 +61,25 @@ roll daily. ### `es.document.id` -This property defines the message field that is used to define the document ID when a message is indexed by Elasticsearch. This allows a user to customize the identifier that is used by Elasticsearch when indexing documents. +This property sets the message field that is used to define the document ID when a message is indexed by Elasticsearch. -* By default, Metron's GUID field will be used as the source of the document ID. Using a randomized UUID like Java's UUID.randomUUID() can negatively impact Elasticsearch performance. +* To allow Elasticsearch to define its own document id, this property should be set to a blank or empty string. The client will not set the document ID and Elasticsearch will define its own. -* To allow Elasticsearch to define its own document id, this property should be set to a blank or empty string. In this case, the document ID will not be set by the client and Elasticsearch will define its own. +* In most cases allowing Elasticsearch to define the document ID is the most performant option. This is the default behavior. -* If a user wants to set their own custom document ID, they can create an enrichment that defines a new message field; for example called `my_document_id`. They should then use this new message field to set the Elasticsearch document ID. +* Metron versions 0.6.0 and earlier defined the document ID using the Metron GUID, which is a randomized UUID using Java's `UUID.randomUUID()`. Using a randomized UUID can negatively impact Elasticsearch indexing performance. To maintain backwards compatibility with legacy versions of Metron use the following setting. + + ``` + es.document.id = guid + ``` + +* To use a custom document ID, create an enrichment that defines a new message field; for example one called `my_document_id`. Then use this field to set the document ID as follows. This will set the document ID to the value of the message field `my_document_id`. ``` es.document.id = my_document_id ``` +* If a message does not contain the `es.document.id` field, a warning is issued and no document ID is set by the client. + ## Upgrading to 5.6.2 Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 to 5.6.2. There are a number of template changes, most notably around diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java index 1ac0665e58..3f41f2fd3f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java @@ -34,10 +34,8 @@ public enum ElasticsearchWriterConfig { * *

If defined, the value of the specified message field is set as the Elasticsearch doc ID. If * this field is undefined or blank, then the document identifier is not set. - * - *

By default, the document identifier is set to the message GUID. */ - DOC_ID_SOURCE_FIELD("es.document.id", Constants.GUID, String.class); + DOC_ID_SOURCE_FIELD("es.document.id", "", String.class); /** * The key for the configuration value. From 6b79429175919d41fe3c263a95fcd4b75f1e3f9a Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 8 Oct 2018 18:16:49 -0400 Subject: [PATCH 12/16] Improve description of `es.document.id` --- .../metron-elasticsearch/README.md | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md index abb9e9d19b..71e88a3d14 100644 --- a/metron-platform/metron-elasticsearch/README.md +++ b/metron-platform/metron-elasticsearch/README.md @@ -61,23 +61,34 @@ roll daily. ### `es.document.id` -This property sets the message field that is used to define the document ID when a message is indexed by Elasticsearch. +This property sets the message field that is used to define the document ID when a message is indexed by Elasticsearch. By default, the client does not set the document ID and document ID generation is deferred to Elasticsearch. -* To allow Elasticsearch to define its own document id, this property should be set to a blank or empty string. The client will not set the document ID and Elasticsearch will define its own. +#### Option 1: Defer to Elasticsearch -* In most cases allowing Elasticsearch to define the document ID is the most performant option. This is the default behavior. +Value: (Undefined, blank or empty string) -* Metron versions 0.6.0 and earlier defined the document ID using the Metron GUID, which is a randomized UUID using Java's `UUID.randomUUID()`. Using a randomized UUID can negatively impact Elasticsearch indexing performance. To maintain backwards compatibility with legacy versions of Metron use the following setting. +* This option allows Elasticsearch to generate the document ID. +* The client will not set a document ID. +* In most cases this is the most performant option. +#### Option 2: Legacy Compatibility + +Value: guid + +* Metron versions 0.6.0 and earlier defined the document ID using the Metron GUID, which is a randomized UUID using Java's `UUID.randomUUID()`. +* Using a randomized UUID can negatively impact Elasticsearch indexing performance. +* To maintain backwards compatibility with legacy versions of Metron, set the value to `guid`. ``` es.document.id = guid ``` -* To use a custom document ID, create an enrichment that defines a new message field; for example one called `my_document_id`. Then use this field to set the document ID as follows. This will set the document ID to the value of the message field `my_document_id`. +#### Option 3: Custom Document ID + +* Advanced users can define a custom document ID. +* Create an enrichment that defines a new message field; for example one called `my_document_id`. Use this field to set the document ID. This will set the document ID to the value of the message field `my_document_id`. ``` es.document.id = my_document_id ``` - * If a message does not contain the `es.document.id` field, a warning is issued and no document ID is set by the client. ## Upgrading to 5.6.2 From 5a793ce7c3785ec6972dbd68116e2ce4d23d2b7c Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 8 Oct 2018 18:18:14 -0400 Subject: [PATCH 13/16] Better define default --- metron-platform/metron-elasticsearch/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md index 71e88a3d14..8d40eba9c5 100644 --- a/metron-platform/metron-elasticsearch/README.md +++ b/metron-platform/metron-elasticsearch/README.md @@ -70,6 +70,7 @@ Value: (Undefined, blank or empty string) * This option allows Elasticsearch to generate the document ID. * The client will not set a document ID. * In most cases this is the most performant option. +* This is the default behavior. #### Option 2: Legacy Compatibility From f3fe193b74c60fe943e1dba6737855d6419a8eea Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 11 Oct 2018 13:53:44 -0400 Subject: [PATCH 14/16] Removed the ElasticsearchWriterConfig class to maintain consistency --- .../utils/ElasticsearchUtils.java | 46 +++--- .../writer/ElasticsearchWriter.java | 5 +- .../writer/ElasticsearchWriterConfig.java | 134 ------------------ 3 files changed, 30 insertions(+), 155 deletions(-) delete mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 6dc10757ba..31503faa1e 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -17,27 +17,9 @@ */ package org.apache.metron.elasticsearch.utils; -import static java.lang.String.format; -import static org.apache.metron.common.Constants.GUID; - import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.HDFSUtils; @@ -55,10 +37,28 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.metron.common.Constants.GUID; + public class ElasticsearchUtils { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -67,6 +67,14 @@ public class ElasticsearchUtils { private static final String USERNAME_CONFIG_KEY = "es.xpack.username"; private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user"; + /** + * Defines which message field, the document identifier is set to. + * + *

If defined, the value of the specified message field is set as the Elasticsearch doc ID. If + * this field is undefined or blank, then the document identifier is not set. + */ + public static final String DOC_ID_SOURCE_FIELD = "es.document.id"; + public static final String DOC_ID_SOURCE_FIELD_DEFAULT = ""; private static ThreadLocal> DATE_FORMAT_CACHE = ThreadLocal.withInitial(() -> new HashMap<>()); diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 546d6c1433..9a18e8cbdc 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -45,7 +45,8 @@ import java.util.List; import java.util.Map; -import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.DOC_ID_SOURCE_FIELD; +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD; +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD_DEFAULT; /** * A {@link BulkMessageWriter} that writes messages to Elasticsearch. @@ -80,7 +81,7 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura final String indexPostfix = dateFormat.format(new Date()); final String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); final String docType = sensorType + "_doc"; - final String docIdSourceField = DOC_ID_SOURCE_FIELD.get(configurations.getGlobalConfig(), String.class); + final String docIdSourceField = (String) configurations.getGlobalConfig().getOrDefault(DOC_ID_SOURCE_FIELD, DOC_ID_SOURCE_FIELD_DEFAULT); BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java deleted file mode 100644 index 3f41f2fd3f..0000000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.elasticsearch.writer; - -import org.apache.metron.common.Constants; -import org.apache.metron.stellar.common.utils.ConversionUtils; - -import java.util.Map; - -/** - * Configuration settings that customize the behavior of the {@link ElasticsearchWriter}. - */ -public enum ElasticsearchWriterConfig { - - /** - * Defines which message field, the document identifier is set to. - * - *

If defined, the value of the specified message field is set as the Elasticsearch doc ID. If - * this field is undefined or blank, then the document identifier is not set. - */ - DOC_ID_SOURCE_FIELD("es.document.id", "", String.class); - - /** - * The key for the configuration value. - */ - private String key; - - /** - * The default value of the configuration, if none other is specified. - */ - private Object defaultValue; - - /** - * The type of the configuration value. - */ - private Class valueType; - - ElasticsearchWriterConfig(String key, Object defaultValue, Class valueType) { - this.key = key; - this.defaultValue = defaultValue; - this.valueType = valueType; - } - - /** - * Returns the key of the configuration value. - */ - public String getKey() { - return key; - } - - /** - * Returns the default value of the configuration. - */ - public Object getDefault() { - return getDefault(valueType); - } - - /** - * Returns the default value of the configuration, cast to the expected type. - * - * @param clazz The class of the expected type of the configuration value. - * @param The expected type of the configuration value. - */ - public T getDefault(Class clazz) { - return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz); - } - - /** - * Returns the configuration value from a map of configuration values. - * - * @param config A map containing configuration values. - */ - public Object get(Map config) { - return getOrDefault(config, defaultValue); - } - - /** - * Returns the configuration value from a map of configuration values, cast to the expected type. - * - * @param config A map containing configuration values. - */ - public T get(Map config, Class clazz) { - return getOrDefault(config, defaultValue, clazz); - } - - /** - * Returns the configuration value from a map of configuration values. If the value is not specified, - * the default value is returned. - * - * @param config A map containing configuration values. - * @param defaultValue The default value to return, if one is not defined. - * @return The configuration value or the specified default, if one is not defined. - */ - private Object getOrDefault(Map config, Object defaultValue) { - return getOrDefault(config, defaultValue, valueType); - } - - /** - * Returns the configuration value, cast to the expected type, from a map of configuration values. - * If the value is not specified, the default value is returned. - * - * @param config A map containing configuration values. - * @param defaultValue The default value to return, if one is not defined. - * @param clazz The class of the expected type of the configuration value. - * @param The expected type of the configuration value. - * @return The configuration value or the specified default, if one is not defined. - */ - private T getOrDefault(Map config, Object defaultValue, Class clazz) { - Object value = config.getOrDefault(key, defaultValue.toString()); - return value == null ? null : ConversionUtils.convert(value, clazz); - } - - @Override - public String toString() { - return key; - } -} From d98a81d2a4650b62d87a4c3382b11c0858803df3 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 11 Oct 2018 14:08:21 -0400 Subject: [PATCH 15/16] No need to fix in this PR. Others have already fixed this elsewhere in master --- metron-deployment/ansible/callback_plugins/.gitignore | 1 - 1 file changed, 1 deletion(-) delete mode 100644 metron-deployment/ansible/callback_plugins/.gitignore diff --git a/metron-deployment/ansible/callback_plugins/.gitignore b/metron-deployment/ansible/callback_plugins/.gitignore deleted file mode 100644 index 0d20b6487c..0000000000 --- a/metron-deployment/ansible/callback_plugins/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.pyc From 57cc1408d50891be04824cfade64ebcfcfd69ebe Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 11 Oct 2018 16:37:12 -0400 Subject: [PATCH 16/16] Refactor Elasticsearch configuration settings --- .../utils/ElasticsearchUtils.java | 77 +++--- .../writer/ElasticsearchWriter.java | 5 +- .../writer/ElasticsearchWriterConfig.java | 234 ++++++++++++++++++ .../ElasticsearchIndexingIntegrationTest.java | 11 +- ...ElasticsearchMetaAlertIntegrationTest.java | 55 ++-- .../ElasticsearchSearchIntegrationTest.java | 27 +- .../ElasticsearchUpdateIntegrationTest.java | 28 ++- 7 files changed, 347 insertions(+), 90 deletions(-) create mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 31503faa1e..c06f1c236f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -17,6 +17,7 @@ */ package org.apache.metron.elasticsearch.utils; +import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -58,24 +59,19 @@ import static java.lang.String.format; import static org.apache.metron.common.Constants.GUID; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLIENT_CLASS; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_XPACK_PASSWORD_FILE; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_XPACK_USERNAME; public class ElasticsearchUtils { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient"; - private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file"; - private static final String USERNAME_CONFIG_KEY = "es.xpack.username"; private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user"; - /** - * Defines which message field, the document identifier is set to. - * - *

If defined, the value of the specified message field is set as the Elasticsearch doc ID. If - * this field is undefined or blank, then the document identifier is not set. - */ - public static final String DOC_ID_SOURCE_FIELD = "es.document.id"; - public static final String DOC_ID_SOURCE_FIELD_DEFAULT = ""; - private static ThreadLocal> DATE_FORMAT_CACHE = ThreadLocal.withInitial(() -> new HashMap<>()); @@ -94,7 +90,7 @@ public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations } public static SimpleDateFormat getIndexFormat(Map globalConfig) { - String format = (String) globalConfig.get("es.date.format"); + String format = ELASTICSEARCH_DATE_FORMAT.get(globalConfig, String.class); return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new); } @@ -142,7 +138,10 @@ public static String getBaseIndexName(String indexName) { */ public static TransportClient getClient(Map globalConfiguration) { Set customESSettings = new HashSet<>(); - customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY)); + customESSettings.addAll(Arrays.asList( + ELASTICSEARCH_CLIENT_CLASS.getKey(), + ELASTICSEARCH_XPACK_USERNAME.getKey(), + ELASTICSEARCH_XPACK_PASSWORD_FILE.getKey())); Settings.Builder settingsBuilder = Settings.builder(); Map esSettings = getEsSettings(globalConfiguration); for (Map.Entry entry : esSettings.entrySet()) { @@ -152,7 +151,7 @@ public static TransportClient getClient(Map globalConfiguration) settingsBuilder.put(key, value); } } - settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); + settingsBuilder.put("cluster.name", ELASTICSEARCH_CLUSTER.get(globalConfiguration)); settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s")); setXPackSecurityOrNone(settingsBuilder, esSettings); @@ -182,25 +181,34 @@ private static Map getEsSettings(Map config) { } /* + * Append Xpack security settings (if any) */ - private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map esSettings) { - if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) { - - if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) { - throw new IllegalArgumentException("X-pack username is required and cannot be empty"); - } + /** + * Define X-Pack security settings, if required. + * + * @param settingsBuilder The X-pack security settings are appended to this. + * @param esSettings The Elasticsearch settings. + */ + private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map esSettings) { + if(ELASTICSEARCH_XPACK_PASSWORD_FILE.isStringDefined(esSettings)) { + LOG.info("Setting X-Pack security settings"); - settingsBuilder.put( - TRANSPORT_CLIENT_USER_KEY, - esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY)) - ); + String username = ELASTICSEARCH_XPACK_USERNAME.getString(esSettings); + String password = getPasswordFromFile(ELASTICSEARCH_XPACK_PASSWORD_FILE.getString(esSettings)); + settingsBuilder.put(TRANSPORT_CLIENT_USER_KEY, Joiner.on(":").join(username, password)); } } - /* - * Single password on first line + /** + * Retrieve the X-Pack password from a file in HDFS. + * + *

The file must contain a single password on the first line. + * + * @param hdfsPath The path to the password file in HDFS. + * @return The password stored in the file. + * @throws IllegalArgumentException If unable to read the password from the file. */ private static String getPasswordFromFile(String hdfsPath) { List lines = null; @@ -223,12 +231,11 @@ private static String getPasswordFromFile(String hdfsPath) { * @param esSettings client type to instantiate * @return client with provided settings */ - private static TransportClient createTransportClient(Settings settings, - Map esSettings) { - String esClientClassName = (String) esSettings - .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT); - return ReflectionUtils - .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class}, + private static TransportClient createTransportClient(Settings settings, Map esSettings) { + String esClientClassName = ELASTICSEARCH_CLIENT_CLASS.getString(esSettings); + return ReflectionUtils.createInstance( + esClientClassName, + new Class[]{Settings.class, Class[].class}, new Object[]{settings, new Class[0]}); } @@ -242,8 +249,8 @@ public HostnamePort(String hostname, Integer port) { } protected static List getIps(Map globalConfiguration) { - Object ipObj = globalConfiguration.get("es.ip"); - Object portObj = globalConfiguration.get("es.port"); + Object ipObj = ELASTICSEARCH_IP.get(globalConfiguration, Object.class); + Object portObj = ELASTICSEARCH_PORT.get(globalConfiguration, Object.class); if(ipObj == null) { return Collections.emptyList(); } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 9a18e8cbdc..f3eacd3086 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -45,8 +45,7 @@ import java.util.List; import java.util.Map; -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD; -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD_DEFAULT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DOC_ID; /** * A {@link BulkMessageWriter} that writes messages to Elasticsearch. @@ -81,7 +80,7 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura final String indexPostfix = dateFormat.format(new Date()); final String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); final String docType = sensorType + "_doc"; - final String docIdSourceField = (String) configurations.getGlobalConfig().getOrDefault(DOC_ID_SOURCE_FIELD, DOC_ID_SOURCE_FIELD_DEFAULT); + final String docIdSourceField = ELASTICSEARCH_DOC_ID.get(configurations.getGlobalConfig(), String.class); BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java new file mode 100644 index 0000000000..b92cf0dac6 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.elasticsearch.writer; + +import org.apache.metron.stellar.common.utils.ConversionUtils; + +import java.util.Map; + +/** + * Configuration settings that customize the behavior of the {@link ElasticsearchWriter}. + */ +public enum ElasticsearchWriterConfig { + + /** + * The name of the Elasticsearch cluster. + * + *

This is optional and defaults to 'metron'. + */ + ELASTICSEARCH_CLUSTER("es.clustername", "metron", String.class, false), + + /** + * Defines the nodes in the Elasticsearch cluster. + * + *

This is a required configuration. + */ + ELASTICSEARCH_IP("es.ip", "", Object.class, true), + + /** + * Defines the port to use when connecting with the Elasticsearch cluster. + * + *

This is optional and defaults to '9300'. + */ + ELASTICSEARCH_PORT("es.port", "9300", String.class, false), + + /** + * The date format to use when constructing the indices. + * + *

This is optional and defaults to 'yyyy.MM.dd.HH' which rolls the indices hourly. + */ + ELASTICSEARCH_DATE_FORMAT("es.date.format", "yyyy.MM.dd.HH", String.class, false), + + /** + * Defines which message field, the document identifier is set to. + * + *

This is optional and defaults to not setting the document ID. + */ + ELASTICSEARCH_DOC_ID("es.document.id", "", String.class, false), + + /** + * The class used for the Elasticsearch client. + * + *

This is an optional configuration. + */ + ELASTICSEARCH_CLIENT_CLASS("es.client.class", "org.elasticsearch.transport.client.PreBuiltTransportClient", String.class, false), + + /** + * Defines the X-Pack username. + * + *

This is a required configuration. + */ + ELASTICSEARCH_XPACK_USERNAME("es.xpack.username", "", String.class, true), + + /** + * Defines the path in HDFS to a file containing the X-Pack password. + * + *

This is a required configuration. + */ + ELASTICSEARCH_XPACK_PASSWORD_FILE("es.xpack.password.file", "", String.class, true); + + /** + * The key for the configuration value. + */ + private String key; + + /** + * The default value of the configuration, if none other is specified. + */ + private Object defaultValue; + + /** + * The type of the configuration value. + */ + private Class valueType; + + /** + * If the property is required. False indicates that the property is optional. + */ + private boolean isRequired; + + ElasticsearchWriterConfig(String key, Object defaultValue, Class valueType, boolean isRequired) { + this.key = key; + this.defaultValue = defaultValue; + this.valueType = valueType; + } + + /** + * Returns the key of the configuration value. + */ + public String getKey() { + return key; + } + + /** + * Returns the default value of the configuration. + */ + public Object getDefault() { + return getDefault(valueType); + } + + /** + * Returns the default value of the configuration, cast to the expected type. + * + * @param clazz The class of the expected type of the configuration value. + * @param The expected type of the configuration value. + */ + public T getDefault(Class clazz) { + return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values. + * + * @param config A map containing configuration values. + */ + public Object get(Map config) { + return getOrDefault(config, defaultValue); + } + + /** + * Returns true if the configuration is defined. + * + * @param config A map containing configuration values. + * @return True, if the configuration is defined. Otherwise, false. + */ + public boolean isDefined(Map config) { + return config != null && config.containsKey(key); + } + + /** + * Returns true if the configuration is defined. + * + * @param config A map containing configuration values. + * @return True, if the configuration is defined. Otherwise, false. + */ + public boolean isStringDefined(Map config) { + return config != null && config.containsKey(key); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param config A map containing configuration values. + */ + public T get(Map config, Class clazz) { + return getOrDefault(config, defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param config A map containing configuration values. + */ + public String getString(Map config) { + return getStringOrDefault(config, defaultValue); + } + + /** + * Returns the configuration value from a map of configuration values. If the value is not specified, + * the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. + */ + private Object getOrDefault(Map config, Object defaultValue) { + return getOrDefault(config, defaultValue, valueType); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @param clazz The class of the expected type of the configuration value. + * @param The expected type of the configuration value. + * @return The configuration value or the specified default, if one is not defined. + */ + private T getOrDefault(Map config, Object defaultValue, Class clazz) { + if(isRequired && !config.containsKey(key)) { + throw new IllegalArgumentException("Missing required configuration; " + key); + } + Object value = config.getOrDefault(key, defaultValue); + return value == null ? null : ConversionUtils.convert(value, clazz); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. + */ + private String getStringOrDefault(Map config, Object defaultValue) { + if(isRequired && !config.containsKey(key)) { + throw new IllegalArgumentException("Missing required configuration; " + key); + } + Object value = config.getOrDefault(key, defaultValue.toString()); + return value == null ? null : ConversionUtils.convert(value, String.class); + } + + @Override + public String toString() { + return key; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index df5e96a8f8..a51afa99c6 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -18,7 +18,6 @@ package org.apache.metron.elasticsearch.integration; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.field.DeDotFieldNameConverter; import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; @@ -39,6 +38,10 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; + public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTest { private String indexDir = "target/elasticsearch"; @@ -114,9 +117,9 @@ public ProcessorResult>> getResult() { @Override public void setAdditionalProperties(Properties topologyProperties) { - topologyProperties.setProperty("es.clustername", "metron"); - topologyProperties.setProperty("es.port", "9300"); - topologyProperties.setProperty("es.ip", "localhost"); + topologyProperties.setProperty(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + topologyProperties.setProperty(ELASTICSEARCH_PORT.getKey(), "9300"); + topologyProperties.setProperty(ELASTICSEARCH_IP.getKey(), "localhost"); topologyProperties.setProperty("ra_indexing_writer_class_name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter"); topologyProperties.setProperty("ra_indexing_kafka_start", "UNCOMMITTED_EARLIEST"); topologyProperties.setProperty("ra_indexing_workers", "1"); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java index c05efc1195..3f909f7fdd 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java @@ -19,28 +19,7 @@ package org.apache.metron.elasticsearch.integration; -import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; - import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - import com.google.common.collect.ImmutableList; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; @@ -50,7 +29,6 @@ import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertIntegrationTest; import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.GetRequest; @@ -66,6 +44,31 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; + @RunWith(Parameterized.class) public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationTest { @@ -143,10 +146,10 @@ public void setup() throws IOException { AccessConfig accessConfig = new AccessConfig(); Map globalConfig = new HashMap() { { - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", DATE_FORMAT); + put(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + put(ELASTICSEARCH_PORT.getKey(), "9300"); + put(ELASTICSEARCH_IP.getKey(), "localhost"); + put(ELASTICSEARCH_DATE_FORMAT.getKey(), DATE_FORMAT); } }; accessConfig.setMaxSearchResults(1000); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index 36f87512c2..ddf3114adb 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -17,13 +17,6 @@ */ package org.apache.metron.elasticsearch.integration; - -import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; @@ -51,6 +44,18 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; + public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { private static String indexDir = "target/elasticsearch_search"; @@ -199,10 +204,10 @@ protected static IndexDao createDao() { config.setMaxSearchGroups(100); config.setGlobalConfigSupplier( () -> new HashMap() {{ - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); + put(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + put(ELASTICSEARCH_PORT.getKey(), "9300"); + put(ELASTICSEARCH_IP.getKey(), "localhost"); + put(ELASTICSEARCH_DATE_FORMAT.getKey(), dateFormat); }} ); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java index c5c0bc1509..26efbc05e1 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java @@ -19,13 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Iterables; -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.metron.common.utils.JSONUtils; @@ -44,6 +37,19 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; + public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { private static final String SENSOR_NAME= "test"; private static String indexDir = "target/elasticsearch_mutation"; @@ -103,10 +109,10 @@ public static void teardown() { protected static Map createGlobalConfig() { return new HashMap() {{ - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); + put(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + put(ELASTICSEARCH_PORT.getKey(), "9300"); + put(ELASTICSEARCH_IP.getKey(), "localhost"); + put(ELASTICSEARCH_DATE_FORMAT.getKey(), dateFormat); }}; }