diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 31503faa1e..c06f1c236f 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -17,6 +17,7 @@ */ package org.apache.metron.elasticsearch.utils; +import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -58,24 +59,19 @@ import static java.lang.String.format; import static org.apache.metron.common.Constants.GUID; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLIENT_CLASS; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_XPACK_PASSWORD_FILE; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_XPACK_USERNAME; public class ElasticsearchUtils { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient"; - private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file"; - private static final String USERNAME_CONFIG_KEY = "es.xpack.username"; private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user"; - /** - * Defines which message field, the document identifier is set to. - * - *

If defined, the value of the specified message field is set as the Elasticsearch doc ID. If - * this field is undefined or blank, then the document identifier is not set. - */ - public static final String DOC_ID_SOURCE_FIELD = "es.document.id"; - public static final String DOC_ID_SOURCE_FIELD_DEFAULT = ""; - private static ThreadLocal> DATE_FORMAT_CACHE = ThreadLocal.withInitial(() -> new HashMap<>()); @@ -94,7 +90,7 @@ public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations } public static SimpleDateFormat getIndexFormat(Map globalConfig) { - String format = (String) globalConfig.get("es.date.format"); + String format = ELASTICSEARCH_DATE_FORMAT.get(globalConfig, String.class); return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new); } @@ -142,7 +138,10 @@ public static String getBaseIndexName(String indexName) { */ public static TransportClient getClient(Map globalConfiguration) { Set customESSettings = new HashSet<>(); - customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY)); + customESSettings.addAll(Arrays.asList( + ELASTICSEARCH_CLIENT_CLASS.getKey(), + ELASTICSEARCH_XPACK_USERNAME.getKey(), + ELASTICSEARCH_XPACK_PASSWORD_FILE.getKey())); Settings.Builder settingsBuilder = Settings.builder(); Map esSettings = getEsSettings(globalConfiguration); for (Map.Entry entry : esSettings.entrySet()) { @@ -152,7 +151,7 @@ public static TransportClient getClient(Map globalConfiguration) settingsBuilder.put(key, value); } } - settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); + settingsBuilder.put("cluster.name", ELASTICSEARCH_CLUSTER.get(globalConfiguration)); settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s")); setXPackSecurityOrNone(settingsBuilder, esSettings); @@ -182,25 +181,34 @@ private static Map getEsSettings(Map config) { } /* + * Append Xpack security settings (if any) */ - private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map esSettings) { - if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) { - - if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) { - throw new IllegalArgumentException("X-pack username is required and cannot be empty"); - } + /** + * Define X-Pack security settings, if required. + * + * @param settingsBuilder The X-pack security settings are appended to this. + * @param esSettings The Elasticsearch settings. + */ + private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map esSettings) { + if(ELASTICSEARCH_XPACK_PASSWORD_FILE.isStringDefined(esSettings)) { + LOG.info("Setting X-Pack security settings"); - settingsBuilder.put( - TRANSPORT_CLIENT_USER_KEY, - esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY)) - ); + String username = ELASTICSEARCH_XPACK_USERNAME.getString(esSettings); + String password = getPasswordFromFile(ELASTICSEARCH_XPACK_PASSWORD_FILE.getString(esSettings)); + settingsBuilder.put(TRANSPORT_CLIENT_USER_KEY, Joiner.on(":").join(username, password)); } } - /* - * Single password on first line + /** + * Retrieve the X-Pack password from a file in HDFS. + * + *

The file must contain a single password on the first line. + * + * @param hdfsPath The path to the password file in HDFS. + * @return The password stored in the file. + * @throws IllegalArgumentException If unable to read the password from the file. */ private static String getPasswordFromFile(String hdfsPath) { List lines = null; @@ -223,12 +231,11 @@ private static String getPasswordFromFile(String hdfsPath) { * @param esSettings client type to instantiate * @return client with provided settings */ - private static TransportClient createTransportClient(Settings settings, - Map esSettings) { - String esClientClassName = (String) esSettings - .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT); - return ReflectionUtils - .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class}, + private static TransportClient createTransportClient(Settings settings, Map esSettings) { + String esClientClassName = ELASTICSEARCH_CLIENT_CLASS.getString(esSettings); + return ReflectionUtils.createInstance( + esClientClassName, + new Class[]{Settings.class, Class[].class}, new Object[]{settings, new Class[0]}); } @@ -242,8 +249,8 @@ public HostnamePort(String hostname, Integer port) { } protected static List getIps(Map globalConfiguration) { - Object ipObj = globalConfiguration.get("es.ip"); - Object portObj = globalConfiguration.get("es.port"); + Object ipObj = ELASTICSEARCH_IP.get(globalConfiguration, Object.class); + Object portObj = ELASTICSEARCH_PORT.get(globalConfiguration, Object.class); if(ipObj == null) { return Collections.emptyList(); } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 9a18e8cbdc..f3eacd3086 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -45,8 +45,7 @@ import java.util.List; import java.util.Map; -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD; -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD_DEFAULT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DOC_ID; /** * A {@link BulkMessageWriter} that writes messages to Elasticsearch. @@ -81,7 +80,7 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura final String indexPostfix = dateFormat.format(new Date()); final String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); final String docType = sensorType + "_doc"; - final String docIdSourceField = (String) configurations.getGlobalConfig().getOrDefault(DOC_ID_SOURCE_FIELD, DOC_ID_SOURCE_FIELD_DEFAULT); + final String docIdSourceField = ELASTICSEARCH_DOC_ID.get(configurations.getGlobalConfig(), String.class); BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject message: messages) { diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java new file mode 100644 index 0000000000..b92cf0dac6 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterConfig.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.elasticsearch.writer; + +import org.apache.metron.stellar.common.utils.ConversionUtils; + +import java.util.Map; + +/** + * Configuration settings that customize the behavior of the {@link ElasticsearchWriter}. + */ +public enum ElasticsearchWriterConfig { + + /** + * The name of the Elasticsearch cluster. + * + *

This is optional and defaults to 'metron'. + */ + ELASTICSEARCH_CLUSTER("es.clustername", "metron", String.class, false), + + /** + * Defines the nodes in the Elasticsearch cluster. + * + *

This is a required configuration. + */ + ELASTICSEARCH_IP("es.ip", "", Object.class, true), + + /** + * Defines the port to use when connecting with the Elasticsearch cluster. + * + *

This is optional and defaults to '9300'. + */ + ELASTICSEARCH_PORT("es.port", "9300", String.class, false), + + /** + * The date format to use when constructing the indices. + * + *

This is optional and defaults to 'yyyy.MM.dd.HH' which rolls the indices hourly. + */ + ELASTICSEARCH_DATE_FORMAT("es.date.format", "yyyy.MM.dd.HH", String.class, false), + + /** + * Defines which message field, the document identifier is set to. + * + *

This is optional and defaults to not setting the document ID. + */ + ELASTICSEARCH_DOC_ID("es.document.id", "", String.class, false), + + /** + * The class used for the Elasticsearch client. + * + *

This is an optional configuration. + */ + ELASTICSEARCH_CLIENT_CLASS("es.client.class", "org.elasticsearch.transport.client.PreBuiltTransportClient", String.class, false), + + /** + * Defines the X-Pack username. + * + *

This is a required configuration. + */ + ELASTICSEARCH_XPACK_USERNAME("es.xpack.username", "", String.class, true), + + /** + * Defines the path in HDFS to a file containing the X-Pack password. + * + *

This is a required configuration. + */ + ELASTICSEARCH_XPACK_PASSWORD_FILE("es.xpack.password.file", "", String.class, true); + + /** + * The key for the configuration value. + */ + private String key; + + /** + * The default value of the configuration, if none other is specified. + */ + private Object defaultValue; + + /** + * The type of the configuration value. + */ + private Class valueType; + + /** + * If the property is required. False indicates that the property is optional. + */ + private boolean isRequired; + + ElasticsearchWriterConfig(String key, Object defaultValue, Class valueType, boolean isRequired) { + this.key = key; + this.defaultValue = defaultValue; + this.valueType = valueType; + } + + /** + * Returns the key of the configuration value. + */ + public String getKey() { + return key; + } + + /** + * Returns the default value of the configuration. + */ + public Object getDefault() { + return getDefault(valueType); + } + + /** + * Returns the default value of the configuration, cast to the expected type. + * + * @param clazz The class of the expected type of the configuration value. + * @param The expected type of the configuration value. + */ + public T getDefault(Class clazz) { + return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values. + * + * @param config A map containing configuration values. + */ + public Object get(Map config) { + return getOrDefault(config, defaultValue); + } + + /** + * Returns true if the configuration is defined. + * + * @param config A map containing configuration values. + * @return True, if the configuration is defined. Otherwise, false. + */ + public boolean isDefined(Map config) { + return config != null && config.containsKey(key); + } + + /** + * Returns true if the configuration is defined. + * + * @param config A map containing configuration values. + * @return True, if the configuration is defined. Otherwise, false. + */ + public boolean isStringDefined(Map config) { + return config != null && config.containsKey(key); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param config A map containing configuration values. + */ + public T get(Map config, Class clazz) { + return getOrDefault(config, defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param config A map containing configuration values. + */ + public String getString(Map config) { + return getStringOrDefault(config, defaultValue); + } + + /** + * Returns the configuration value from a map of configuration values. If the value is not specified, + * the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. + */ + private Object getOrDefault(Map config, Object defaultValue) { + return getOrDefault(config, defaultValue, valueType); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @param clazz The class of the expected type of the configuration value. + * @param The expected type of the configuration value. + * @return The configuration value or the specified default, if one is not defined. + */ + private T getOrDefault(Map config, Object defaultValue, Class clazz) { + if(isRequired && !config.containsKey(key)) { + throw new IllegalArgumentException("Missing required configuration; " + key); + } + Object value = config.getOrDefault(key, defaultValue); + return value == null ? null : ConversionUtils.convert(value, clazz); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. + */ + private String getStringOrDefault(Map config, Object defaultValue) { + if(isRequired && !config.containsKey(key)) { + throw new IllegalArgumentException("Missing required configuration; " + key); + } + Object value = config.getOrDefault(key, defaultValue.toString()); + return value == null ? null : ConversionUtils.convert(value, String.class); + } + + @Override + public String toString() { + return key; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index df5e96a8f8..a51afa99c6 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -18,7 +18,6 @@ package org.apache.metron.elasticsearch.integration; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.field.DeDotFieldNameConverter; import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; @@ -39,6 +38,10 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; + public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTest { private String indexDir = "target/elasticsearch"; @@ -114,9 +117,9 @@ public ProcessorResult>> getResult() { @Override public void setAdditionalProperties(Properties topologyProperties) { - topologyProperties.setProperty("es.clustername", "metron"); - topologyProperties.setProperty("es.port", "9300"); - topologyProperties.setProperty("es.ip", "localhost"); + topologyProperties.setProperty(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + topologyProperties.setProperty(ELASTICSEARCH_PORT.getKey(), "9300"); + topologyProperties.setProperty(ELASTICSEARCH_IP.getKey(), "localhost"); topologyProperties.setProperty("ra_indexing_writer_class_name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter"); topologyProperties.setProperty("ra_indexing_kafka_start", "UNCOMMITTED_EARLIEST"); topologyProperties.setProperty("ra_indexing_workers", "1"); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java index c05efc1195..3f909f7fdd 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java @@ -19,28 +19,7 @@ package org.apache.metron.elasticsearch.integration; -import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; - import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - import com.google.common.collect.ImmutableList; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; @@ -50,7 +29,6 @@ import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertIntegrationTest; import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.GetRequest; @@ -66,6 +44,31 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; + @RunWith(Parameterized.class) public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationTest { @@ -143,10 +146,10 @@ public void setup() throws IOException { AccessConfig accessConfig = new AccessConfig(); Map globalConfig = new HashMap() { { - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", DATE_FORMAT); + put(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + put(ELASTICSEARCH_PORT.getKey(), "9300"); + put(ELASTICSEARCH_IP.getKey(), "localhost"); + put(ELASTICSEARCH_DATE_FORMAT.getKey(), DATE_FORMAT); } }; accessConfig.setMaxSearchResults(1000); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index df41cbae49..f6e07e7ede 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -17,15 +17,6 @@ */ package org.apache.metron.elasticsearch.integration; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; @@ -52,6 +43,19 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; + public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { private static String indexDir = "target/elasticsearch_search"; @@ -76,10 +80,10 @@ protected static IndexDao createDao() { config.setMaxSearchGroups(100); config.setGlobalConfigSupplier( () -> new HashMap() {{ - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); + put(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + put(ELASTICSEARCH_PORT.getKey(), "9300"); + put(ELASTICSEARCH_IP.getKey(), "localhost"); + put(ELASTICSEARCH_DATE_FORMAT.getKey(), dateFormat); }} ); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java index c5c0bc1509..26efbc05e1 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java @@ -19,13 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Iterables; -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.metron.common.utils.JSONUtils; @@ -44,6 +37,19 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP; +import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT; + public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { private static final String SENSOR_NAME= "test"; private static String indexDir = "target/elasticsearch_mutation"; @@ -103,10 +109,10 @@ public static void teardown() { protected static Map createGlobalConfig() { return new HashMap() {{ - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); + put(ELASTICSEARCH_CLUSTER.getKey(), "metron"); + put(ELASTICSEARCH_PORT.getKey(), "9300"); + put(ELASTICSEARCH_IP.getKey(), "localhost"); + put(ELASTICSEARCH_DATE_FORMAT.getKey(), dateFormat); }}; }