Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.metron.elasticsearch.utils;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -58,24 +59,19 @@

import static java.lang.String.format;
import static org.apache.metron.common.Constants.GUID;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLIENT_CLASS;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_CLUSTER;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DATE_FORMAT;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_IP;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_PORT;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_XPACK_PASSWORD_FILE;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_XPACK_USERNAME;

public class ElasticsearchUtils {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient";
private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file";
private static final String USERNAME_CONFIG_KEY = "es.xpack.username";
private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user";

/**
* Defines which message field, the document identifier is set to.
*
* <p>If defined, the value of the specified message field is set as the Elasticsearch doc ID. If
* this field is undefined or blank, then the document identifier is not set.
*/
public static final String DOC_ID_SOURCE_FIELD = "es.document.id";
public static final String DOC_ID_SOURCE_FIELD_DEFAULT = "";

private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
= ThreadLocal.withInitial(() -> new HashMap<>());

Expand All @@ -94,7 +90,7 @@ public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations
}

public static SimpleDateFormat getIndexFormat(Map<String, Object> globalConfig) {
String format = (String) globalConfig.get("es.date.format");
String format = ELASTICSEARCH_DATE_FORMAT.get(globalConfig, String.class);
return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new);
}

Expand Down Expand Up @@ -142,7 +138,10 @@ public static String getBaseIndexName(String indexName) {
*/
public static TransportClient getClient(Map<String, Object> globalConfiguration) {
Set<String> customESSettings = new HashSet<>();
customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
customESSettings.addAll(Arrays.asList(
ELASTICSEARCH_CLIENT_CLASS.getKey(),
ELASTICSEARCH_XPACK_USERNAME.getKey(),
ELASTICSEARCH_XPACK_PASSWORD_FILE.getKey()));
Settings.Builder settingsBuilder = Settings.builder();
Map<String, String> esSettings = getEsSettings(globalConfiguration);
for (Map.Entry<String, String> entry : esSettings.entrySet()) {
Expand All @@ -152,7 +151,7 @@ public static TransportClient getClient(Map<String, Object> globalConfiguration)
settingsBuilder.put(key, value);
}
}
settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
settingsBuilder.put("cluster.name", ELASTICSEARCH_CLUSTER.get(globalConfiguration));
settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s"));
setXPackSecurityOrNone(settingsBuilder, esSettings);

Expand Down Expand Up @@ -182,25 +181,34 @@ private static Map<String, String> getEsSettings(Map<String, Object> config) {
}

/*

* Append Xpack security settings (if any)
*/
private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) {

if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {

if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
throw new IllegalArgumentException("X-pack username is required and cannot be empty");
}
/**
* Define X-Pack security settings, if required.
*
* @param settingsBuilder The X-pack security settings are appended to this.
* @param esSettings The Elasticsearch settings.
*/
private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) {
if(ELASTICSEARCH_XPACK_PASSWORD_FILE.isStringDefined(esSettings)) {
LOG.info("Setting X-Pack security settings");

settingsBuilder.put(
TRANSPORT_CLIENT_USER_KEY,
esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))
);
String username = ELASTICSEARCH_XPACK_USERNAME.getString(esSettings);
String password = getPasswordFromFile(ELASTICSEARCH_XPACK_PASSWORD_FILE.getString(esSettings));
settingsBuilder.put(TRANSPORT_CLIENT_USER_KEY, Joiner.on(":").join(username, password));
}
}

/*
* Single password on first line
/**
* Retrieve the X-Pack password from a file in HDFS.
*
* <p>The file must contain a single password on the first line.
*
* @param hdfsPath The path to the password file in HDFS.
* @return The password stored in the file.
* @throws IllegalArgumentException If unable to read the password from the file.
*/
private static String getPasswordFromFile(String hdfsPath) {
List<String> lines = null;
Expand All @@ -223,12 +231,11 @@ private static String getPasswordFromFile(String hdfsPath) {
* @param esSettings client type to instantiate
* @return client with provided settings
*/
private static TransportClient createTransportClient(Settings settings,
Map<String, String> esSettings) {
String esClientClassName = (String) esSettings
.getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT);
return ReflectionUtils
.createInstance(esClientClassName, new Class[]{Settings.class, Class[].class},
private static TransportClient createTransportClient(Settings settings, Map<String, String> esSettings) {
String esClientClassName = ELASTICSEARCH_CLIENT_CLASS.getString(esSettings);
return ReflectionUtils.createInstance(
esClientClassName,
new Class[]{Settings.class, Class[].class},
new Object[]{settings, new Class[0]});
}

Expand All @@ -242,8 +249,8 @@ public HostnamePort(String hostname, Integer port) {
}

protected static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
Object ipObj = globalConfiguration.get("es.ip");
Object portObj = globalConfiguration.get("es.port");
Object ipObj = ELASTICSEARCH_IP.get(globalConfiguration, Object.class);
Object portObj = ELASTICSEARCH_PORT.get(globalConfiguration, Object.class);
if(ipObj == null) {
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD;
import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD_DEFAULT;
import static org.apache.metron.elasticsearch.writer.ElasticsearchWriterConfig.ELASTICSEARCH_DOC_ID;

/**
* A {@link BulkMessageWriter} that writes messages to Elasticsearch.
Expand Down Expand Up @@ -81,7 +80,7 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura
final String indexPostfix = dateFormat.format(new Date());
final String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
final String docType = sensorType + "_doc";
final String docIdSourceField = (String) configurations.getGlobalConfig().getOrDefault(DOC_ID_SOURCE_FIELD, DOC_ID_SOURCE_FIELD_DEFAULT);
final String docIdSourceField = ELASTICSEARCH_DOC_ID.get(configurations.getGlobalConfig(), String.class);

BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject message: messages) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.metron.elasticsearch.writer;

import org.apache.metron.stellar.common.utils.ConversionUtils;

import java.util.Map;

/**
* Configuration settings that customize the behavior of the {@link ElasticsearchWriter}.
*/
public enum ElasticsearchWriterConfig {

/**
* The name of the Elasticsearch cluster.
*
* <p>This is optional and defaults to 'metron'.
*/
ELASTICSEARCH_CLUSTER("es.clustername", "metron", String.class, false),

/**
* Defines the nodes in the Elasticsearch cluster.
*
* <p>This is a required configuration.
*/
ELASTICSEARCH_IP("es.ip", "", Object.class, true),

/**
* Defines the port to use when connecting with the Elasticsearch cluster.
*
* <p>This is optional and defaults to '9300'.
*/
ELASTICSEARCH_PORT("es.port", "9300", String.class, false),

/**
* The date format to use when constructing the indices.
*
* <p>This is optional and defaults to 'yyyy.MM.dd.HH' which rolls the indices hourly.
*/
ELASTICSEARCH_DATE_FORMAT("es.date.format", "yyyy.MM.dd.HH", String.class, false),

/**
* Defines which message field, the document identifier is set to.
*
* <p>This is optional and defaults to not setting the document ID.
*/
ELASTICSEARCH_DOC_ID("es.document.id", "", String.class, false),

/**
* The class used for the Elasticsearch client.
*
* <p>This is an optional configuration.
*/
ELASTICSEARCH_CLIENT_CLASS("es.client.class", "org.elasticsearch.transport.client.PreBuiltTransportClient", String.class, false),

/**
* Defines the X-Pack username.
*
* <p>This is a required configuration.
*/
ELASTICSEARCH_XPACK_USERNAME("es.xpack.username", "", String.class, true),

/**
* Defines the path in HDFS to a file containing the X-Pack password.
*
* <p>This is a required configuration.
*/
ELASTICSEARCH_XPACK_PASSWORD_FILE("es.xpack.password.file", "", String.class, true);

/**
* The key for the configuration value.
*/
private String key;

/**
* The default value of the configuration, if none other is specified.
*/
private Object defaultValue;

/**
* The type of the configuration value.
*/
private Class<?> valueType;

/**
* If the property is required. False indicates that the property is optional.
*/
private boolean isRequired;

ElasticsearchWriterConfig(String key, Object defaultValue, Class<?> valueType, boolean isRequired) {
this.key = key;
this.defaultValue = defaultValue;
this.valueType = valueType;
}

/**
* Returns the key of the configuration value.
*/
public String getKey() {
return key;
}

/**
* Returns the default value of the configuration.
*/
public Object getDefault() {
return getDefault(valueType);
}

/**
* Returns the default value of the configuration, cast to the expected type.
*
* @param clazz The class of the expected type of the configuration value.
* @param <T> The expected type of the configuration value.
*/
public <T> T getDefault(Class<T> clazz) {
return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz);
}

/**
* Returns the configuration value from a map of configuration values.
*
* @param config A map containing configuration values.
*/
public Object get(Map<String, Object> config) {
return getOrDefault(config, defaultValue);
}

/**
* Returns true if the configuration is defined.
*
* @param config A map containing configuration values.
* @return True, if the configuration is defined. Otherwise, false.
*/
public boolean isDefined(Map<String, Object> config) {
return config != null && config.containsKey(key);
}

/**
* Returns true if the configuration is defined.
*
* @param config A map containing configuration values.
* @return True, if the configuration is defined. Otherwise, false.
*/
public boolean isStringDefined(Map<String, String> config) {
return config != null && config.containsKey(key);
}

/**
* Returns the configuration value from a map of configuration values, cast to the expected type.
*
* @param config A map containing configuration values.
*/
public <T> T get(Map<String, Object> config, Class<T> clazz) {
return getOrDefault(config, defaultValue, clazz);
}

/**
* Returns the configuration value from a map of configuration values, cast to the expected type.
*
* @param config A map containing configuration values.
*/
public String getString(Map<String, String> config) {
return getStringOrDefault(config, defaultValue);
}

/**
* Returns the configuration value from a map of configuration values. If the value is not specified,
* the default value is returned.
*
* @param config A map containing configuration values.
* @param defaultValue The default value to return, if one is not defined.
* @return The configuration value or the specified default, if one is not defined.
*/
private Object getOrDefault(Map<String, Object> config, Object defaultValue) {
return getOrDefault(config, defaultValue, valueType);
}

/**
* Returns the configuration value, cast to the expected type, from a map of configuration values.
* If the value is not specified, the default value is returned.
*
* @param config A map containing configuration values.
* @param defaultValue The default value to return, if one is not defined.
* @param clazz The class of the expected type of the configuration value.
* @param <T> The expected type of the configuration value.
* @return The configuration value or the specified default, if one is not defined.
*/
private <T> T getOrDefault(Map<String, Object> config, Object defaultValue, Class<T> clazz) {
if(isRequired && !config.containsKey(key)) {
throw new IllegalArgumentException("Missing required configuration; " + key);
}
Object value = config.getOrDefault(key, defaultValue);
return value == null ? null : ConversionUtils.convert(value, clazz);
}

/**
* Returns the configuration value, cast to the expected type, from a map of configuration values.
* If the value is not specified, the default value is returned.
*
* @param config A map containing configuration values.
* @param defaultValue The default value to return, if one is not defined.
* @return The configuration value or the specified default, if one is not defined.
*/
private String getStringOrDefault(Map<String, String> config, Object defaultValue) {
if(isRequired && !config.containsKey(key)) {
throw new IllegalArgumentException("Missing required configuration; " + key);
}
Object value = config.getOrDefault(key, defaultValue.toString());
return value == null ? null : ConversionUtils.convert(value, String.class);
}

@Override
public String toString() {
return key;
}
}
Loading