Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ private static void help()
System.out.println("-topics Accelerator - Alarm topics to be logged, they can be defined as a comma separated list");
System.out.println("-es_host localhost - elastic server host");
System.out.println("-es_port 9200 - elastic server port");
System.out.println("-es_urls http://localhost:9200 - comma-separated list of elastic server URLs");
System.out.println("-es_sniff false - elastic server sniff feature");
System.out.println("-es_auth_header header - elastic authorization header");
System.out.println("-es_auth_username username - elastic username");
System.out.println("-es_auth_password password - elastic password");
System.out.println("-bootstrap.servers localhost:9092 - Kafka server address");
System.out.println("-kafka_properties /opt/client.properties - Properties file to load kafka client settings from");
System.out.println("-properties /opt/alarm_logger.properties - Properties file to be used (instead of command line arguments)");
Expand Down Expand Up @@ -123,14 +127,54 @@ public static void main(final String[] original_args) throws Exception {
if (!iter.hasNext())
throw new Exception("Missing -es_host hostname");
iter.remove();
if (!properties.getProperty("es_urls", "").isEmpty())
throw new Exception("es_host must not be specified if es_urls is specified.");
properties.put("es_host",iter.next());
iter.remove();
} else if (cmd.equals("-es_port")) {
if (!iter.hasNext())
throw new Exception("Missing -es_port port number");
iter.remove();
if (!properties.getProperty("es_urls", "").isEmpty())
throw new Exception("es_port must not be specified if es_urls is specified.");
properties.put("es_port",iter.next());
iter.remove();
} else if (cmd.equals("-es_urls")) {
if (!iter.hasNext())
throw new Exception("Missing -es_urls URLs");
if (!properties.getProperty("es_host", "").isEmpty())
throw new Exception("es_urls must not be specified if es_host is specified.");
if (!properties.getProperty("es_port", "").isEmpty())
throw new Exception("es_urls must not be specified if es_port is specified.");
iter.remove();
properties.put("es_urls",iter.next());
iter.remove();
} else if (cmd.equals("-es_auth_header")) {
if (!iter.hasNext())
throw new Exception("Missing -es_auth_header header");
iter.remove();
if (!properties.getProperty("es_auth_username", "").isEmpty())
throw new Exception("es_auth_header must not be specified if es_auth_username is specified.");
if (!properties.getProperty("es_auth_password", "").isEmpty())
throw new Exception("es_auth_header must not be specified if es_auth_password is specified.");
properties.put("es_auth_header",iter.next());
iter.remove();
} else if (cmd.equals("-es_auth_username")) {
if (!iter.hasNext())
throw new Exception("Missing -es_auth_username username");
iter.remove();
if (!properties.getProperty("es_auth_header", "").isEmpty())
throw new Exception("es_auth_username must not be specified if es_auth_header is specified.");
properties.put("es_auth_username",iter.next());
iter.remove();
} else if (cmd.equals("-es_auth_password")) {
if (!iter.hasNext())
throw new Exception("Missing -es_auth_password password");
iter.remove();
if (!properties.getProperty("es_auth_header", "").isEmpty())
throw new Exception("es_auth_password must not be specified if es_auth_header is specified.");
properties.put("es_auth_password",iter.next());
iter.remove();
} else if (cmd.equals("-es_sniff")) {
if (!iter.hasNext())
throw new Exception("Missing -es_sniff sniff feature true/false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.Sniffer;
import org.phoebus.applications.alarm.messages.AlarmCommandMessage;
Expand All @@ -36,6 +41,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

import static org.phoebus.alarm.logging.AlarmLoggingService.logger;
Expand All @@ -53,7 +59,7 @@ public class ElasticClientHelper {
private static ElasticsearchTransport transport;

private static ElasticsearchClient client;
private static ElasticClientHelper instance;
private static AtomicReference<ElasticClientHelper> instance = new AtomicReference<>();
private static Sniffer sniffer;

private static final AtomicBoolean esInitialized = new AtomicBoolean();
Expand Down Expand Up @@ -85,8 +91,37 @@ private ElasticClientHelper() {
}));

// Create the low-level client
restClient = RestClient.builder(
new HttpHost(props.getProperty("es_host"), Integer.parseInt(props.getProperty("es_port")))).build();
final var esHost = props.getProperty("es_host", "");
final var esPort = props.getProperty("es_port", "");
final var esUrls = props.getProperty("es_urls", "");
HttpHost[] esHttpHosts;
if (esUrls.isEmpty()) {
final var http_host = new HttpHost(
esHost.isEmpty() ? "localhost" : esHost,
esPort.isEmpty() ? 9200 : Integer.parseInt(esPort));
esHttpHosts = new HttpHost[] {http_host};
} else {
if (!esHost.isEmpty() || !esPort.isEmpty()) {
logger.warning("Only one of es_urls or es_host and es_port can be specified, ignoring es_host and es_port.");
}
esHttpHosts = Arrays.stream(esUrls.split(",")).map(HttpHost::create).toArray(HttpHost[]::new);
}
final var esAuthHeader = props.getProperty("es_auth_header", "");
final var esAuthUsername = props.getProperty("es_auth_username", "");
final var esAuthPassword = props.getProperty("es_auth_password", "");
final var restClientBuilder = RestClient.builder(esHttpHosts);
if (!esAuthHeader.isEmpty()) {
if (!esAuthUsername.isEmpty() || !esAuthPassword.isEmpty()) {
logger.warning("Only one of es_auth_header or es_auth_username and es_auth_password can be specified. Ignoring es_auth_username and es_auth_password.");
}
restClientBuilder.setDefaultHeaders(
new Header[] {new BasicHeader("Authorization", esAuthHeader)});
} else if (!esAuthUsername.isEmpty() || !esAuthPassword.isEmpty()) {
final var credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esAuthUsername, esAuthPassword));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
restClient = restClientBuilder.build();

mapper.registerModule(new JavaTimeModule());
transport = new RestClientTransport(
Expand Down Expand Up @@ -118,10 +153,26 @@ private ElasticClientHelper() {
}

public static ElasticClientHelper getInstance() {
if (instance == null) {
instance = new ElasticClientHelper();
var helper = instance.get();
if (helper == null) {
// The helper instance is associated with static resources, so we
// want to be certain that it is never created twice. In order to
// ensure this, we have to create it inside a synchronized block,
// but we only do this if we expect that there is no instance yet.
// This looks like the double-checked-locking anti-pattern, but it
// is not an anti-pattern here, because instance is an atomic
// reference, so getting the value establishes a happens-before
// relationship, and we can be sure that we won’t retrieve an
// uninitialized object.
synchronized (instance) {
helper = instance.get();
if (helper == null) {
helper = new ElasticClientHelper();
instance.set(helper);
}
}
}
return instance;
return helper;
}

public ElasticsearchClient getClient() {
Expand Down
14 changes: 12 additions & 2 deletions services/alarm-logger/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ logging.level.root=WARN
alarm_topics=Accelerator

# Location of elastic node/s
es_host=localhost
es_port=9200
#
# Either es_host and es_port or es_urls can be specified.
# If neither is specified, the URL http://localhost:9200 is used.
es_host=
es_port=
# Comma-separated list of elastic node URLs. All nodes must belong to the same cluster.
es_urls=
# Authorization header sent with elastic requests
es_auth_header=
# Username and password sent with elastic requests. Cannot be combined with es_auth_header.
es_auth_username=
es_auth_password=
# Max default size for es queries
es_max_size=1000
# Set to 'true' if sniffing to be enabled to discover other cluster nodes
Expand Down