diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java index d6aa744200..d35c03076a 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java @@ -52,7 +52,11 @@ private static void help() System.out.println("-topics Accelerator - Alarm topics to be logged, they can be defined as a comma separated list"); System.out.println("-es_host localhost - elastic server host"); System.out.println("-es_port 9200 - elastic server port"); + System.out.println("-es_urls http://localhost:9200 - comma-separated list of elastic server URLs"); System.out.println("-es_sniff false - elastic server sniff feature"); + System.out.println("-es_auth_header header - elastic authorization header"); + System.out.println("-es_auth_username username - elastic username"); + System.out.println("-es_auth_password password - elastic password"); System.out.println("-bootstrap.servers localhost:9092 - Kafka server address"); System.out.println("-kafka_properties /opt/client.properties - Properties file to load kafka client settings from"); System.out.println("-properties /opt/alarm_logger.properties - Properties file to be used (instead of command line arguments)"); @@ -123,14 +127,54 @@ public static void main(final String[] original_args) throws Exception { if (!iter.hasNext()) throw new Exception("Missing -es_host hostname"); iter.remove(); + if (!properties.getProperty("es_urls", "").isEmpty()) + throw new Exception("es_host must not be specified if es_urls is specified."); properties.put("es_host",iter.next()); iter.remove(); } else if (cmd.equals("-es_port")) { if (!iter.hasNext()) throw new Exception("Missing -es_port port number"); iter.remove(); + if (!properties.getProperty("es_urls", "").isEmpty()) + throw new Exception("es_port must not be specified if es_urls is specified."); properties.put("es_port",iter.next()); iter.remove(); + } else if (cmd.equals("-es_urls")) { + if (!iter.hasNext()) + throw new Exception("Missing -es_urls URLs"); + if (!properties.getProperty("es_host", "").isEmpty()) + throw new Exception("es_urls must not be specified if es_host is specified."); + if (!properties.getProperty("es_port", "").isEmpty()) + throw new Exception("es_urls must not be specified if es_port is specified."); + iter.remove(); + properties.put("es_urls",iter.next()); + iter.remove(); + } else if (cmd.equals("-es_auth_header")) { + if (!iter.hasNext()) + throw new Exception("Missing -es_auth_header header"); + iter.remove(); + if (!properties.getProperty("es_auth_username", "").isEmpty()) + throw new Exception("es_auth_header must not be specified if es_auth_username is specified."); + if (!properties.getProperty("es_auth_password", "").isEmpty()) + throw new Exception("es_auth_header must not be specified if es_auth_password is specified."); + properties.put("es_auth_header",iter.next()); + iter.remove(); + } else if (cmd.equals("-es_auth_username")) { + if (!iter.hasNext()) + throw new Exception("Missing -es_auth_username username"); + iter.remove(); + if (!properties.getProperty("es_auth_header", "").isEmpty()) + throw new Exception("es_auth_username must not be specified if es_auth_header is specified."); + properties.put("es_auth_username",iter.next()); + iter.remove(); + } else if (cmd.equals("-es_auth_password")) { + if (!iter.hasNext()) + throw new Exception("Missing -es_auth_password password"); + iter.remove(); + if (!properties.getProperty("es_auth_header", "").isEmpty()) + throw new Exception("es_auth_password must not be specified if es_auth_header is specified."); + properties.put("es_auth_password",iter.next()); + iter.remove(); } else if (cmd.equals("-es_sniff")) { if (!iter.hasNext()) throw new Exception("Missing -es_sniff sniff feature true/false"); diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/ElasticClientHelper.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/ElasticClientHelper.java index af8b1b147f..c4230bdc04 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/ElasticClientHelper.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/ElasticClientHelper.java @@ -15,7 +15,12 @@ import co.elastic.clients.transport.rest_client.RestClientTransport; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.sniff.Sniffer; import org.phoebus.applications.alarm.messages.AlarmCommandMessage; @@ -36,6 +41,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import static org.phoebus.alarm.logging.AlarmLoggingService.logger; @@ -53,7 +59,7 @@ public class ElasticClientHelper { private static ElasticsearchTransport transport; private static ElasticsearchClient client; - private static ElasticClientHelper instance; + private static AtomicReference instance = new AtomicReference<>(); private static Sniffer sniffer; private static final AtomicBoolean esInitialized = new AtomicBoolean(); @@ -85,8 +91,37 @@ private ElasticClientHelper() { })); // Create the low-level client - restClient = RestClient.builder( - new HttpHost(props.getProperty("es_host"), Integer.parseInt(props.getProperty("es_port")))).build(); + final var esHost = props.getProperty("es_host", ""); + final var esPort = props.getProperty("es_port", ""); + final var esUrls = props.getProperty("es_urls", ""); + HttpHost[] esHttpHosts; + if (esUrls.isEmpty()) { + final var http_host = new HttpHost( + esHost.isEmpty() ? "localhost" : esHost, + esPort.isEmpty() ? 9200 : Integer.parseInt(esPort)); + esHttpHosts = new HttpHost[] {http_host}; + } else { + if (!esHost.isEmpty() || !esPort.isEmpty()) { + logger.warning("Only one of es_urls or es_host and es_port can be specified, ignoring es_host and es_port."); + } + esHttpHosts = Arrays.stream(esUrls.split(",")).map(HttpHost::create).toArray(HttpHost[]::new); + } + final var esAuthHeader = props.getProperty("es_auth_header", ""); + final var esAuthUsername = props.getProperty("es_auth_username", ""); + final var esAuthPassword = props.getProperty("es_auth_password", ""); + final var restClientBuilder = RestClient.builder(esHttpHosts); + if (!esAuthHeader.isEmpty()) { + if (!esAuthUsername.isEmpty() || !esAuthPassword.isEmpty()) { + logger.warning("Only one of es_auth_header or es_auth_username and es_auth_password can be specified. Ignoring es_auth_username and es_auth_password."); + } + restClientBuilder.setDefaultHeaders( + new Header[] {new BasicHeader("Authorization", esAuthHeader)}); + } else if (!esAuthUsername.isEmpty() || !esAuthPassword.isEmpty()) { + final var credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esAuthUsername, esAuthPassword)); + restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + restClient = restClientBuilder.build(); mapper.registerModule(new JavaTimeModule()); transport = new RestClientTransport( @@ -118,10 +153,26 @@ private ElasticClientHelper() { } public static ElasticClientHelper getInstance() { - if (instance == null) { - instance = new ElasticClientHelper(); + var helper = instance.get(); + if (helper == null) { + // The helper instance is associated with static resources, so we + // want to be certain that it is never created twice. In order to + // ensure this, we have to create it inside a synchronized block, + // but we only do this if we expect that there is no instance yet. + // This looks like the double-checked-locking anti-pattern, but it + // is not an anti-pattern here, because instance is an atomic + // reference, so getting the value establishes a happens-before + // relationship, and we can be sure that we won’t retrieve an + // uninitialized object. + synchronized (instance) { + helper = instance.get(); + if (helper == null) { + helper = new ElasticClientHelper(); + instance.set(helper); + } + } } - return instance; + return helper; } public ElasticsearchClient getClient() { diff --git a/services/alarm-logger/src/main/resources/application.properties b/services/alarm-logger/src/main/resources/application.properties index e5da002c53..6993a94bc7 100644 --- a/services/alarm-logger/src/main/resources/application.properties +++ b/services/alarm-logger/src/main/resources/application.properties @@ -16,8 +16,18 @@ logging.level.root=WARN alarm_topics=Accelerator # Location of elastic node/s -es_host=localhost -es_port=9200 +# +# Either es_host and es_port or es_urls can be specified. +# If neither is specified, the URL http://localhost:9200 is used. +es_host= +es_port= +# Comma-separated list of elastic node URLs. All nodes must belong to the same cluster. +es_urls= +# Authorization header sent with elastic requests +es_auth_header= +# Username and password sent with elastic requests. Cannot be combined with es_auth_header. +es_auth_username= +es_auth_password= # Max default size for es queries es_max_size=1000 # Set to 'true' if sniffing to be enabled to discover other cluster nodes