diff --git a/docs/modules/operation/pages/deep-dive/kafka-producer/enable-kafka.adoc b/docs/modules/operation/pages/deep-dive/kafka-producer/enable-kafka.adoc
index 37c84a5a9acc..1b9f06d1c16e 100644
--- a/docs/modules/operation/pages/deep-dive/kafka-producer/enable-kafka.adoc
+++ b/docs/modules/operation/pages/deep-dive/kafka-producer/enable-kafka.adoc
@@ -23,6 +23,58 @@ config:update
 <1> Set the Kafka server IP or hostname by replacing the kafka-server-ip and port of an existing Kafka cluster that you want to connect to.
 Add multiple hosts in a comma-separated list; for example, `kafka-server-1:9092,kafka-server-2:9092`
+
+== Per-Topic Kafka Configuration
+
+For advanced deployments, you can configure different Kafka clusters for different types of messages; for example, you can send events to one Kafka cluster, alarms to a second, and metrics to a third.
+The following configuration PIDs are available for per-topic configuration:
+
+|===
+| Message Type | Configuration PID
+
+| Events
+| `org.opennms.features.kafka.producer.client.events`
+
+| Alarms
+| `org.opennms.features.kafka.producer.client.alarms`
+
+| Metrics
+| `org.opennms.features.kafka.producer.client.metrics`
+
+| Nodes
+| `org.opennms.features.kafka.producer.client.nodes`
+
+| Topology (vertices and edges)
+| `org.opennms.features.kafka.producer.client.topology`
+
+| Alarm Feedback
+| `org.opennms.features.kafka.producer.client.alarmFeedback`
+|===
+
+If a topic-specific PID does not exist, or does not have `bootstrap.servers` configured, the system falls back to the global configuration (`org.opennms.features.kafka.producer.client`).
+
+.Configure events to go to a specific Kafka cluster
+[source, karaf]
+----
+config:edit org.opennms.features.kafka.producer.client.events
+config:property-set bootstrap.servers events-kafka:9092
+config:update
+----
+
+.Configure alarms to go to a different Kafka cluster
+[source, karaf]
+----
+config:edit org.opennms.features.kafka.producer.client.alarms
+config:property-set bootstrap.servers alarms-kafka:9092
+config:update
+----
+
+.Configure metrics to use a dedicated Kafka cluster
+[source, karaf]
+----
+config:edit org.opennms.features.kafka.producer.client.metrics
+config:property-set bootstrap.servers metrics-kafka:9092
+config:update
+----
+
 Next, install the `opennms-kafka-producer` feature from that same shell using:
 
 [source, karaf]
@@ -35,3 +87,5 @@ To ensure that the feature continues to be installed on subsequent restarts, add
 ----
 echo "opennms-kafka-producer" | sudo tee ${OPENNMS_HOME}/etc/featuresBoot.d/kafka-producer.boot
 ----
+
+NOTE: Whenever you update this configuration at runtime, you must restart the feature for the changes to take effect.
\ No newline at end of file
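A quick way to confirm which per-topic client PIDs exist, and what each one carries, is to query them from the same Karaf shell (a verification sketch; it assumes only the standard Karaf `config` commands):

[source, karaf]
----
config:list "(service.pid=org.opennms.features.kafka.producer.client*)"
----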
diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/KafkaProducerManager.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/KafkaProducerManager.java
new file mode 100644
index 000000000000..3e848cc8bf68
--- /dev/null
+++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/KafkaProducerManager.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to The OpenNMS Group, Inc (TOG) under one or more
+ * contributor license agreements.  See the LICENSE.md file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
+ *
+ * TOG licenses this file to You under the GNU Affero General
+ * Public License Version 3 (the "License") or (at your option)
+ * any later version.  You may not use this file except in
+ * compliance with the License.  You may obtain a copy of the
+ * License at:
+ *
+ *      https://www.gnu.org/licenses/agpl-3.0.txt
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.  See the License for the specific
+ * language governing permissions and limitations under the
+ * License.
+ */
+package org.opennms.features.kafka.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.opennms.core.ipc.common.kafka.Utils;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class KafkaProducerManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerManager.class);
+
+    public static final String GLOBAL_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client";
+    public static final String EVENTS_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.events";
+    public static final String ALARMS_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.alarms";
+    public static final String METRICS_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.metrics";
+    public static final String NODES_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.nodes";
+    public static final String TOPOLOGY_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.topology";
+    public static final String ALARM_FEEDBACK_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.alarmFeedback";
+    public static final String BOOTSTRAP_SERVER = "bootstrap.servers";
+
+    public enum MessageType {
+        EVENT,
+        ALARM,
+        NODE,
+        METRIC,
+        TOPOLOGY_VERTEX,
+        TOPOLOGY_EDGE,
+        ALARM_FEEDBACK
+    }
+
+    private final ConfigurationAdmin configAdmin;
+    // Producers are cached per message type and per PID, so message types that resolve to the
+    // same PID share a single producer instance.
+    private final Map<MessageType, Producer<byte[], byte[]>> messageTypeToProducerMap = new ConcurrentHashMap<>();
+    private final Map<String, Producer<byte[], byte[]>> pidToProducerMap = new ConcurrentHashMap<>();
+
+    public KafkaProducerManager(ConfigurationAdmin configAdmin) {
+        this.configAdmin = Objects.requireNonNull(configAdmin);
+    }
+
+    public void init() {
+        LOG.info("Initializing KafkaProducerManager");
+        for (MessageType messageType : MessageType.values()) {
+            try {
+                getProducerForMessageType(messageType);
+                LOG.debug("Successfully initialized producer for message type: {}", messageType);
+            } catch (Exception e) {
+                LOG.warn("Failed to initialize producer for message type: {}. It will be initialized lazily.", messageType, e);
+            }
+        }
+    }
+
+    public void destroy() {
+        LOG.info("Destroying KafkaProducerManager");
+        pidToProducerMap.values().forEach(producer -> {
+            try {
+                producer.close();
+            } catch (Exception e) {
+                LOG.warn("Error closing Kafka producer", e);
+            }
+        });
+        pidToProducerMap.clear();
+        messageTypeToProducerMap.clear();
+    }
+
+    public Producer<byte[], byte[]> getProducerForMessageType(MessageType messageType) {
+        return messageTypeToProducerMap.computeIfAbsent(messageType, type -> {
+            String effectivePid = getEffectivePidForMessageType(type);
+            if (effectivePid == null) {
+                // Return a dummy producer that does nothing
+                return new NoOpProducer<>();
+            }
+            return getOrCreateProducerForPid(effectivePid);
+        });
+    }
+
+    private boolean hasValidConfiguration(String pid) {
+        try {
+            var config = configAdmin.getConfiguration(pid);
+            if (config != null && config.getProperties() != null && !config.getProperties().isEmpty()) {
+                Dictionary<String, Object> properties = config.getProperties();
+                return properties.get(BOOTSTRAP_SERVER) != null;
+            }
+            return false;
+        } catch (IOException e) {
+            LOG.warn("Failed to check configuration for PID: {}", pid, e);
+            return false;
+        }
+    }
+
+    private String getEffectivePidForMessageType(MessageType messageType) {
+        String topicSpecificPid = getPidForMessageType(messageType);
+        if (hasValidConfiguration(topicSpecificPid)) {
+            LOG.debug("Using topic-specific configuration for {}: {}", messageType, topicSpecificPid);
+            return topicSpecificPid;
+        }
+
+        if (hasValidConfiguration(GLOBAL_KAFKA_CLIENT_PID)) {
+            LOG.debug("Falling back to global configuration for {}", messageType);
+            return GLOBAL_KAFKA_CLIENT_PID;
+        }
+
+        LOG.debug("No configuration available for message type: {}", messageType);
+        return null;
+    }
+
+    private String getPidForMessageType(MessageType messageType) {
+        switch (messageType) {
+            case EVENT:
+                return EVENTS_KAFKA_CLIENT_PID;
+            case ALARM:
+                return ALARMS_KAFKA_CLIENT_PID;
+            case NODE:
+                return NODES_KAFKA_CLIENT_PID;
+            case METRIC:
+                return METRICS_KAFKA_CLIENT_PID;
+            case TOPOLOGY_VERTEX:
+            case TOPOLOGY_EDGE:
+                return TOPOLOGY_KAFKA_CLIENT_PID;
+            case ALARM_FEEDBACK:
+                return ALARM_FEEDBACK_KAFKA_CLIENT_PID;
+            default:
+                return GLOBAL_KAFKA_CLIENT_PID;
+        }
+    }
+
+    // Configuration-lookup twin of getEffectivePidForMessageType(): unlike that method, this one
+    // never returns null and always falls back to the global PID.
+    private String determinePidForMessageType(MessageType messageType) {
+        switch (messageType) {
+            case EVENT:
+                return getEffectivePid(EVENTS_KAFKA_CLIENT_PID);
+            case ALARM:
+                return getEffectivePid(ALARMS_KAFKA_CLIENT_PID);
+            case NODE:
+                return getEffectivePid(NODES_KAFKA_CLIENT_PID);
+            case METRIC:
+                return getEffectivePid(METRICS_KAFKA_CLIENT_PID);
+            case TOPOLOGY_VERTEX:
+            case TOPOLOGY_EDGE:
+                return getEffectivePid(TOPOLOGY_KAFKA_CLIENT_PID);
+            case ALARM_FEEDBACK:
+                return getEffectivePid(ALARM_FEEDBACK_KAFKA_CLIENT_PID);
+            default:
+                return GLOBAL_KAFKA_CLIENT_PID;
+        }
+    }
+
+    private String getEffectivePid(String topicSpecificPid) {
+        try {
+            var config = configAdmin.getConfiguration(topicSpecificPid);
+            if (config != null && config.getProperties() != null && !config.getProperties().isEmpty()) {
+                Dictionary<String, Object> properties = config.getProperties();
+                if (properties.get(BOOTSTRAP_SERVER) != null) {
+                    LOG.debug("bootstrap.servers found for PID: {}", topicSpecificPid);
+                    return topicSpecificPid;
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to check configuration for PID: {}", topicSpecificPid, e);
+        }
+        LOG.debug("Falling back to global configuration for PID: {}", GLOBAL_KAFKA_CLIENT_PID);
+        return GLOBAL_KAFKA_CLIENT_PID;
+    }
+
+    private Producer<byte[], byte[]> getOrCreateProducerForPid(String pid) {
+        return pidToProducerMap.computeIfAbsent(pid, this::initializeProducerForPid);
+    }
+
+    private Producer<byte[], byte[]> initializeProducerForPid(String pid) {
+        try {
+            final Properties producerConfig = getConfigurationForPid(pid);
+
+            // Overwrite the serializers, since we rely on these
+            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+
+            LOG.info("Creating Kafka producer for PID: {} with bootstrap.servers: {}",
+                    pid, producerConfig.getProperty(BOOTSTRAP_SERVER, "not configured"));
+
+            // Class-loader hack for accessing the Kafka classes when initializing the producer.
+            return Utils.runWithGivenClassLoader(() -> new KafkaProducer<>(producerConfig),
+                    KafkaProducer.class.getClassLoader());
+        } catch (Exception e) {
+            LOG.error("Failed to create Kafka producer for PID: {}", pid, e);
+            throw new RuntimeException("Failed to create Kafka producer for PID: " + pid, e);
+        }
+    }
+
+    public Properties getConfigurationForMessageType(MessageType messageType) {
+        String pid = determinePidForMessageType(messageType);
+        return getConfigurationForPid(pid);
+    }
+
+    public Properties getConfigurationForPid(String pid) {
+        try {
+            final Properties config = new Properties();
+            final Dictionary<String, Object> properties = configAdmin.getConfiguration(pid).getProperties();
+
+            if (properties != null) {
+                final Enumeration<String> keys = properties.keys();
+                while (keys.hasMoreElements()) {
+                    final String key = keys.nextElement();
+                    Object value = properties.get(key);
+                    if (value != null) {
+                        config.put(key, value);
+                    }
+                }
+            }
+            return config;
+        } catch (IOException e) {
+            LOG.warn("Failed to load configuration for PID: {}, using empty properties", pid, e);
+            return new Properties();
+        }
+    }
+
+    public boolean hasConfigurationForMessageType(MessageType messageType) {
+        String effectivePid = getEffectivePidForMessageType(messageType);
+        return effectivePid != null && hasValidConfiguration(effectivePid);
+    }
+
+    public ConfigurationAdmin getConfigAdmin() {
+        return configAdmin;
+    }
+}
diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/NoOpProducer.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/NoOpProducer.java
new file mode 100644
index 000000000000..11616d0cd7b0
--- /dev/null
+++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/NoOpProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to The OpenNMS Group, Inc (TOG) under one or more
+ * contributor license agreements.  See the LICENSE.md file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
+ *
+ * TOG licenses this file to You under the GNU Affero General
+ * Public License Version 3 (the "License") or (at your option)
+ * any later version.  You may not use this file except in
+ * compliance with the License.  You may obtain a copy of the
+ * License at:
+ *
+ *      https://www.gnu.org/licenses/agpl-3.0.txt
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.  See the License for the specific
+ * language governing permissions and limitations under the
+ * License.
+ */
+package org.opennms.features.kafka.producer;
+
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+public class NoOpProducer<K, V> implements Producer<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(NoOpProducer.class);
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+        return send(record, null);
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        LOG.debug("No-op producer: Not sending message to topic {}", record.topic());
+
+        RecordMetadata dummyMetadata = dummyMetadata(record.topic());
+        if (callback != null) {
+            callback.onCompletion(dummyMetadata, null);
+        }
+
+        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+        future.complete(dummyMetadata);
+        return future;
+    }
+
+    // Synthetic metadata: partition -1 and negative offsets signal that nothing was actually sent.
+    private RecordMetadata dummyMetadata(String topic) {
+        return new RecordMetadata(
+                new TopicPartition(topic, -1),
+                -1L,
+                -1L,
+                System.currentTimeMillis(),
+                -1L,
+                -1,
+                -1
+        );
+    }
+
+    @Override
+    public void flush() {}
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void close(Duration duration) {}
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void initTransactions() {
+        throw new UnsupportedOperationException("No-op producer does not support transactions");
+    }
+
+    @Override
+    public void beginTransaction() {
+        throw new UnsupportedOperationException("No-op producer does not support transactions");
+    }
+
+    @Override
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
+        throw new UnsupportedOperationException("No-op producer does not support transactions");
+    }
+
+    @Override
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
+        throw new UnsupportedOperationException("No-op producer does not support transactions");
+    }
+
+    @Override
+    public void commitTransaction() {
+        throw new UnsupportedOperationException("No-op producer does not support transactions");
+    }
+
+    @Override
+    public void abortTransaction() {
+        throw new UnsupportedOperationException("No-op producer does not support transactions");
+    }
+}
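// Usage sketch (illustrative, not part of the patch): how the manager, the per-type lookup, and the
// NoOpProducer fallback compose. It assumes an OSGi-provided ConfigurationAdmin named "configAdmin".
//
//     KafkaProducerManager manager = new KafkaProducerManager(configAdmin);
//     manager.init();
//     Producer<byte[], byte[]> eventProducer =
//             manager.getProducerForMessageType(KafkaProducerManager.MessageType.EVENT);
//     // With neither the events PID nor the global PID carrying bootstrap.servers, this is a
//     // NoOpProducer: the send completes locally with dummy metadata and nothing leaves the JVM.
//     eventProducer.send(new ProducerRecord<>("events", "payload".getBytes()));
//     manager.destroy();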
diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/OpennmsKafkaProducer.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/OpennmsKafkaProducer.java
index 736012a9eb74..e8203d808568 100644
--- a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/OpennmsKafkaProducer.java
+++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/OpennmsKafkaProducer.java
@@ -24,12 +24,9 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.Callable;
@@ -42,12 +39,10 @@
 import java.util.function.Consumer;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.opennms.core.ipc.common.kafka.Utils;
 import org.opennms.features.kafka.producer.datasync.KafkaAlarmDataSync;
 import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
 import org.opennms.features.situationfeedback.api.AlarmFeedback;
@@ -68,7 +63,6 @@
 import org.opennms.netmgt.topologies.service.api.OnmsTopologyVertex;
 import org.opennms.netmgt.topologies.service.api.TopologyVisitor;
 import org.opennms.netmgt.xml.event.Event;
-import org.osgi.service.cm.ConfigurationAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.expression.Expression;
@@ -94,7 +88,7 @@ public class OpennmsKafkaProducer implements AlarmLifecycleListener, EventListen
     private final ProtobufMapper protobufMapper;
     private final NodeCache nodeCache;
-    private final ConfigurationAdmin configAdmin;
+    private final KafkaProducerManager kafkaProducerManager;
     private final EventSubscriptionService eventSubscriptionService;
 
     private KafkaAlarmDataSync dataSync;
@@ -120,7 +114,6 @@ public class OpennmsKafkaProducer implements AlarmLifecycleListener, EventListen
     private final CountDownLatch forwardedTopologyVertexMessage = new CountDownLatch(1);
     private final CountDownLatch forwardedTopologyEdgeMessage = new CountDownLatch(1);
 
-    private KafkaProducer<byte[], byte[]> producer;
 
     private final Map<String, OnmsAlarm> outstandingAlarms = new ConcurrentHashMap<>();
     private final AlarmEqualityChecker alarmEqualityChecker =
@@ -137,33 +130,19 @@ public class OpennmsKafkaProducer implements AlarmLifecycleListener, EventListen
     private String encoding = "UTF8";
     private int numEventListenerThreads = 4;
 
-    public OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache,
-                                ConfigurationAdmin configAdmin, EventSubscriptionService eventSubscriptionService,
+    public OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache, KafkaProducerManager kafkaProducerManager,
+                                EventSubscriptionService eventSubscriptionService,
                                 OnmsTopologyDao topologyDao, int nodeAsyncUpdateThreads) {
         this.protobufMapper = Objects.requireNonNull(protobufMapper);
         this.nodeCache = Objects.requireNonNull(nodeCache);
-        this.configAdmin = Objects.requireNonNull(configAdmin);
+        this.kafkaProducerManager = Objects.requireNonNull(kafkaProducerManager);
         this.eventSubscriptionService = Objects.requireNonNull(eventSubscriptionService);
         this.topologyDao = Objects.requireNonNull(topologyDao);
         this.nodeUpdateExecutor = Executors.newFixedThreadPool(nodeAsyncUpdateThreads, nodeUpdateThreadFactory);
     }
 
     public void init() throws IOException {
-        // Create the Kafka producer
-        final Properties producerConfig = new Properties();
-        final Dictionary<String, Object> properties = configAdmin.getConfiguration(KAFKA_CLIENT_PID).getProperties();
-        if (properties != null) {
-            final Enumeration<String> keys = properties.keys();
-            while (keys.hasMoreElements()) {
-                final String key = keys.nextElement();
-                producerConfig.put(key, properties.get(key));
-            }
-        }
-        // Overwrite the serializers, since we rely on these
-        producerConfig.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
-        producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());
-        // Class-loader hack for accessing the kafka classes when initializing producer.
-        producer = Utils.runWithGivenClassLoader(() -> new KafkaProducer<>(producerConfig), KafkaProducer.class.getClassLoader());
+        // Producers are now created lazily, per message type, by the KafkaProducerManager.
 
         // Start processing records that have been queued for sending
         if (kafkaSendQueueCapacity <= 0) {
             kafkaSendQueueCapacity = 1000;
@@ -184,10 +163,7 @@ public void destroy() {
         kafkaSendQueueExecutor.shutdownNow();
         nodeUpdateExecutor.shutdownNow();
 
-        if (producer != null) {
-            producer.close();
-            producer = null;
-        }
+        kafkaProducerManager.destroy();
 
         if (forwardEvents) {
             eventSubscriptionService.removeEventListener(this);
@@ -218,7 +194,7 @@ private void forwardTopologyMessage(OnmsTopologyMessage message) {
     }
 
     private void forwardTopologyEdgeMessage(byte[] refid, byte[] message) {
-        sendRecord(() -> {
+        sendRecord(KafkaProducerManager.MessageType.TOPOLOGY_EDGE, () -> {
             return new ProducerRecord<>(topologyEdgeTopic, refid, message);
         }, recordMetadata -> {
             // We've got an ACK from the server that the event was forwarded
@@ -253,7 +229,7 @@ private void forwardEvent(Event event) {
         }
 
         // Forward!
-        sendRecord(() -> {
+        sendRecord(KafkaProducerManager.MessageType.EVENT, () -> {
             final OpennmsModelProtos.Event mappedEvent = protobufMapper.toEvent(event).build();
             LOG.debug("Sending event with UEI: {}", mappedEvent.getUei());
             return new ProducerRecord<>(eventTopic, mappedEvent.toByteArray());
@@ -304,7 +280,7 @@ private void updateAlarm(String reductionKey, OnmsAlarm alarm) {
             outstandingAlarms.remove(reductionKey);
 
             // The alarm was deleted, push a null record to the reduction key
-            sendRecord(() -> {
+            sendRecord(KafkaProducerManager.MessageType.ALARM, () -> {
                 LOG.debug("Deleting alarm with reduction key: {}", reductionKey);
                 return new ProducerRecord<>(alarmTopic, reductionKey.getBytes(encoding), null);
             }, recordMetadata -> {
@@ -330,7 +306,7 @@ private void updateAlarm(String reductionKey, OnmsAlarm alarm) {
         }
 
         // Forward!
-        sendRecord(() -> {
+        sendRecord(KafkaProducerManager.MessageType.ALARM, () -> {
             final OpennmsModelProtos.Alarm mappedAlarm = protobufMapper.toAlarm(alarm).build();
             LOG.debug("Sending alarm with reduction key: {}", reductionKey);
             return new ProducerRecord<>(alarmTopic, reductionKey.getBytes(encoding), mappedAlarm.toByteArray());
@@ -362,14 +338,14 @@ private void maybeUpdateNode(long nodeId) {
 
             if (node == null) {
                 // The node was deleted, push a null record
-                sendRecord(() -> {
+                sendRecord(KafkaProducerManager.MessageType.NODE, () -> {
                     LOG.debug("Deleting node with criteria: {}", nodeCriteria);
                     return new ProducerRecord<>(nodeTopic, nodeCriteria.getBytes(encoding), null);
                 });
                 return;
             }
 
-            sendRecord(() -> {
+            sendRecord(KafkaProducerManager.MessageType.NODE, () -> {
                 final OpennmsModelProtos.Node mappedNode = protobufMapper.toNode(node).build();
                 LOG.debug("Sending node with criteria: {}", nodeCriteria);
                 return new ProducerRecord<>(nodeTopic, nodeCriteria.getBytes(encoding), mappedNode.toByteArray());
@@ -381,12 +357,17 @@ private void maybeUpdateNode(long nodeId) {
         });
     }
 
-    private void sendRecord(Callable<ProducerRecord<byte[], byte[]>> callable) {
-        sendRecord(callable, null);
+    private void sendRecord(KafkaProducerManager.MessageType messageType, Callable<ProducerRecord<byte[], byte[]>> callable) {
+        sendRecord(messageType, callable, null);
     }
 
-    private void sendRecord(Callable<ProducerRecord<byte[], byte[]>> callable, Consumer<RecordMetadata> callback) {
-        if (producer == null) {
+    private void sendRecord(KafkaProducerManager.MessageType messageType, Callable<ProducerRecord<byte[], byte[]>> callable, Consumer<RecordMetadata> callback) {
+        if (kafkaProducerManager == null) {
+            return;
+        }
+
+        if (!kafkaProducerManager.hasConfigurationForMessageType(messageType)) {
+            LOG.debug("Skipping message of type {} as no Kafka configuration is available", messageType);
             return;
         }
@@ -398,12 +379,20 @@ record = callable.call();
             throw new RuntimeException(e);
         }
 
+        // Get the appropriate producer for the message type
+        Producer<byte[], byte[]> topicProducer = kafkaProducerManager.getProducerForMessageType(messageType);
+        if (topicProducer == null) {
+            LOG.debug("No producer available for message type: {}", messageType);
+            return;
+        }
+
         // Rather than attempt to send, we instead queue the record to avoid blocking since KafkaProducer's send()
         // method can block if Kafka is not available when metadata is attempted to be retrieved
         // Any offer that fails due to capacity overflow will simply be dropped and will have to wait until the next
         // sync to be processed so this is just a best effort attempt
-        if (!kafkaSendDeque.offer(new KafkaRecord(record, callback))) {
+        if (!kafkaSendDeque.offer(new KafkaRecord(record, callback, topicProducer))) {
             RATE_LIMITED_LOGGER.warn("Dropped a Kafka record due to queue capacity being full.");
         }
     }
@@ -415,7 +404,7 @@ private void processKafkaSendQueue() {
                 KafkaRecord kafkaRecord = kafkaSendDeque.take();
                 ProducerRecord<byte[], byte[]> producerRecord = kafkaRecord.getProducerRecord();
                 Consumer<RecordMetadata> consumer = kafkaRecord.getConsumer();
-
+                Producer<byte[], byte[]> producer = kafkaRecord.getProducer();
                 try {
                     producer.send(producerRecord, (recordMetadata, e) -> {
                         if (e != null) {
@@ -564,7 +553,7 @@ public void handleAlarmFeedback(List<AlarmFeedback> alarmFeedback) {
         }
 
         // NOTE: This will currently block while waiting for Kafka metadata if Kafka is not available.
-        alarmFeedback.forEach(feedback -> sendRecord(() -> {
+        alarmFeedback.forEach(feedback -> sendRecord(KafkaProducerManager.MessageType.ALARM_FEEDBACK, () -> {
             LOG.debug("Sending alarm feedback with key: {}", feedback.getAlarmKey());
             return new ProducerRecord<>(alarmFeedbackTopic,
                     feedback.getAlarmKey().getBytes(encoding),
@@ -621,10 +610,14 @@ public int getNumThreads() {
     private static final class KafkaRecord {
         private final ProducerRecord<byte[], byte[]> producerRecord;
         private final Consumer<RecordMetadata> consumer;
+        private final Producer<byte[], byte[]> producer;
 
-        KafkaRecord(ProducerRecord<byte[], byte[]> producerRecord, Consumer<RecordMetadata> consumer) {
+        KafkaRecord(ProducerRecord<byte[], byte[]> producerRecord,
+                    Consumer<RecordMetadata> consumer,
+                    Producer<byte[], byte[]> producer) {
             this.producerRecord = producerRecord;
             this.consumer = consumer;
+            this.producer = producer;
         }
 
         ProducerRecord<byte[], byte[]> getProducerRecord() {
@@ -634,6 +627,10 @@ ProducerRecord<byte[], byte[]> getProducerRecord() {
         Consumer<RecordMetadata> getConsumer() {
             return consumer;
         }
+
+        Producer<byte[], byte[]> getProducer() {
+            return producer;
+        }
     }
 
     public CountDownLatch getForwardedTopologyVertexMessage() {
diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersister.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersister.java
index 40b27b3ad09a..4a54f4020ea3 100644
--- a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersister.java
+++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersister.java
@@ -24,6 +24,7 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.opennms.features.kafka.producer.model.CollectionSetProtos;
 import org.opennms.features.kafka.producer.model.CollectionSetProtos.CollectionSetResource;
@@ -50,7 +51,7 @@ public class KafkaPersister implements Persister {
 
     private final ServiceParameters m_params;
 
-    private KafkaProducer<String, byte[]> producer;
+    private Producer<String, byte[]> producer;
 
     private String topicName = "metrics";
 
@@ -210,7 +211,7 @@ public void setTopicName(String topicName) {
         }
     }
 
-    public void setProducer(KafkaProducer<String, byte[]> producer) {
+    public void setProducer(Producer<String, byte[]> producer) {
         this.producer = producer;
     }
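// Illustration (assumption: test-style wiring, not part of the patch): widening setProducer() from
// the concrete KafkaProducer to the Producer interface lets callers inject the NoOpProducer or a
// mock, so KafkaPersister can be exercised without a running broker, e.g.:
//
//     persister.setProducer(new NoOpProducer<>());    // metrics are silently dropped
//     persister.setProducer(mock(Producer.class));    // or captured and verified with Mockito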
diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersisterFactory.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersisterFactory.java
index 6aa912888547..bbf3fd4f4f22 100644
--- a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersisterFactory.java
+++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/collection/KafkaPersisterFactory.java
@@ -22,10 +22,13 @@
 package org.opennms.features.kafka.producer.collection;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.opennms.core.ipc.common.kafka.Utils;
+import org.opennms.features.kafka.producer.KafkaProducerManager;
+import org.opennms.features.kafka.producer.NoOpProducer;
 import org.opennms.features.kafka.producer.OpennmsKafkaProducer;
 import org.opennms.netmgt.collection.api.Persister;
 import org.opennms.netmgt.collection.api.PersisterFactory;
@@ -39,13 +42,14 @@
 import java.util.Dictionary;
 import java.util.Enumeration;
 import java.util.Properties;
+import static org.opennms.features.kafka.producer.KafkaProducerManager.BOOTSTRAP_SERVER;
 
 public class KafkaPersisterFactory implements PersisterFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaPersisterFactory.class);
 
     private CollectionSetMapper collectionSetMapper;
-    private KafkaProducer<String, byte[]> producer;
+    private Producer<String, byte[]> producer;
     private ConfigurationAdmin configAdmin;
     private String topicName;
     private boolean disableMetricsSplitting = false;
@@ -67,23 +71,44 @@ public Persister createPersister(ServiceParameters params, RrdRepository reposit
     }
 
     public void init() throws IOException {
-        // Create the Kafka producer
+        String metricsPid = KafkaProducerManager.METRICS_KAFKA_CLIENT_PID;
+        String globalPid = OpennmsKafkaProducer.KAFKA_CLIENT_PID;
+
         final Properties producerConfig = new Properties();
-        final Dictionary<String, Object> properties = configAdmin
-                .getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties();
-        if (properties != null) {
+        Dictionary<String, Object> properties = configAdmin.getConfiguration(metricsPid).getProperties();
+        if (hasValidConfiguration(properties)) {
+            LOG.info("Using metrics-specific Kafka configuration");
+        } else {
+            LOG.debug("Metrics-specific config not valid, falling back to global");
+            properties = configAdmin.getConfiguration(globalPid).getProperties();
+        }
+
+        // Load properties
+        if (hasValidConfiguration(properties)) {
             final Enumeration<String> keys = properties.keys();
             while (keys.hasMoreElements()) {
                 final String key = keys.nextElement();
                 producerConfig.put(key, properties.get(key));
             }
+
+            // Overwrite the serializers
+            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
+            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+            // Class-loader hack for accessing the kafka classes when initializing producer.
+            producer = Utils.runWithGivenClassLoader(() -> new KafkaProducer<>(producerConfig), KafkaProducer.class.getClassLoader());
+            LOG.info("Kafka producer initialized with {}", producerConfig);
+        } else {
+            producer = new NoOpProducer<>();
+            LOG.warn("No Kafka configuration found for metrics. Using NoOpProducer.");
         }
-        // Overwrite the serializers
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-        // Class-loader hack for accessing the kafka classes when initializing producer.
-        producer = Utils.runWithGivenClassLoader(() -> new KafkaProducer<>(producerConfig), KafkaProducer.class.getClassLoader());
-        LOG.info(" kafka producer initialized with {} ", producerConfig);
+    }
+
+    private boolean hasValidConfiguration(Dictionary<String, Object> properties) {
+        return properties != null &&
+                !properties.isEmpty() &&
+                properties.get(BOOTSTRAP_SERVER) != null;
     }
 
     public void destroy() {
diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java
index dcbeed0f0fe7..ab511534ec6c 100644
--- a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java
+++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java
@@ -39,9 +39,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
@@ -60,6 +58,7 @@
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.opennms.core.ipc.common.kafka.Utils;
 import org.opennms.features.kafka.producer.AlarmEqualityChecker;
+import org.opennms.features.kafka.producer.KafkaProducerManager;
 import org.opennms.features.kafka.producer.OpennmsKafkaProducer;
 import org.opennms.features.kafka.producer.ProtobufMapper;
 import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
@@ -79,8 +78,7 @@ public class KafkaAlarmDataSync implements AlarmDataStore, Runnable {
     private static final String ALARM_STORE_NAME = "alarm_store";
     public static final String KAFKA_STREAMS_PID = "org.opennms.features.kafka.producer.streams";
-
-    private final ConfigurationAdmin configAdmin;
+    private final KafkaProducerManager kafkaProducerManager;
     private final OpennmsKafkaProducer kafkaProducer;
     private final ProtobufMapper protobufMapper;
     private final AtomicBoolean closed = new AtomicBoolean(true);
@@ -98,8 +96,8 @@ public class KafkaAlarmDataSync implements AlarmDataStore, Runnable {
             AlarmEqualityChecker.with(AlarmEqualityChecker.Exclusions::defaultExclusions);
     private boolean suppressIncrementalAlarms;
 
-    public KafkaAlarmDataSync(ConfigurationAdmin configAdmin, OpennmsKafkaProducer kafkaProducer, ProtobufMapper protobufMapper) {
-        this.configAdmin = Objects.requireNonNull(configAdmin);
+    public KafkaAlarmDataSync(KafkaProducerManager kafkaProducerManager, OpennmsKafkaProducer kafkaProducer, ProtobufMapper protobufMapper) {
+        this.kafkaProducerManager = Objects.requireNonNull(kafkaProducerManager);
         this.kafkaProducer = Objects.requireNonNull(kafkaProducer);
         this.protobufMapper = Objects.requireNonNull(protobufMapper);
     }
@@ -116,7 +114,17 @@ public void init() throws IOException {
             return;
         }
 
+        if (!kafkaProducerManager.hasConfigurationForMessageType(KafkaProducerManager.MessageType.ALARM)) {
+            LOG.warn("No Kafka configuration found for alarms. Alarm synchronization will not be initialized.");
+            return;
+        }
+
         final Properties streamProperties = loadStreamsProperties();
+        if (streamProperties == null || !streamProperties.containsKey("bootstrap.servers")) {
+            LOG.warn("No bootstrap.servers configured for alarm synchronization. Skipping initialization.");
+            return;
+        }
+
         final StreamsBuilder builder = new StreamsBuilder();
         final GlobalKTable<String, byte[]> alarmBytesKtable = builder.globalTable(alarmTopic,
                 Consumed.with(Serdes.String(), Serdes.ByteArray()), Materialized.as(ALARM_STORE_NAME));
@@ -291,16 +299,21 @@ private Properties loadStreamsProperties() throws IOException {
         streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, kafkaDir.toString());
         // Copy common properties from client configuration, which should save the user from having to configure
         // properties for the stream client 99% of time
-        final Dictionary<String, Object> clientProperties = configAdmin.getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties();
-        if (clientProperties != null) {
-            copyPropIfNonNull(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clientProperties, streamsProperties);
-            copyPropIfNonNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clientProperties, streamsProperties);
-            copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, clientProperties, streamsProperties);
-            copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, clientProperties, streamsProperties);
+        Properties clientProperties = kafkaProducerManager.getConfigurationForMessageType(KafkaProducerManager.MessageType.ALARM);
+
+        if (clientProperties == null || clientProperties.isEmpty()) {
+            LOG.warn("No Kafka configuration found for alarms. Cannot load streams properties.");
+            return null;
         }
+
+        copyPropertyIfSet(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clientProperties, streamsProperties);
+        copyPropertyIfSet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clientProperties, streamsProperties);
+        copyPropertyIfSet(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, clientProperties, streamsProperties);
+        copyPropertyIfSet(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, clientProperties, streamsProperties);
+
         // Now add all of the stream properties, overriding any of the properties inherited from the producer config
-        final Dictionary<String, Object> properties = configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties();
+        final Dictionary<String, Object> properties = getConfigurationAdmin().getConfiguration(KAFKA_STREAMS_PID).getProperties();
         if (properties != null) {
             final Enumeration<String> keys = properties.keys();
             while (keys.hasMoreElements()) {
@@ -311,9 +324,18 @@ private Properties loadStreamsProperties() throws IOException {
         // Override the deserializers unconditionally
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+
+        if (!streamsProperties.containsKey("bootstrap.servers")) {
+            LOG.warn("No bootstrap.servers configured in alarm synchronization properties.");
+            return null;
+        }
         return streamsProperties;
     }
 
+    private ConfigurationAdmin getConfigurationAdmin() {
+        return kafkaProducerManager.getConfigAdmin();
+    }
+
     private static void copyPropIfNonNull(String propName, Dictionary<String, Object> sourceMap, Properties targetMap) {
         Object propValue = sourceMap.get(propName);
         if (propValue != null) {
@@ -321,6 +343,14 @@
             targetMap.put(propName, propValue);
         }
     }
+
+    // Properties-to-Properties variant used above. (Body reconstructed: the original hunk content
+    // was lost in extraction; the signature is inferred from the call sites earlier in this file.)
+    private static void copyPropertyIfSet(String propName, Properties source, Properties target) {
+        Object propValue = source.get(propName);
+        if (propValue != null) {
+            target.put(propName, propValue);
+        }
+    }
[The next file's diff — only the bare hunk markers "- + + + + + - +" and the header "@@ -96,7 +102,7 @@" survive — lost its element content during extraction; it appears to update the feature's OSGi blueprint wiring for the new KafkaProducerManager bean.]
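// Sketch (illustrative, not part of the patch) of the copy-through that loadStreamsProperties()
// performs: the effective ALARM client configuration seeds the streams configuration, after which
// entries from KAFKA_STREAMS_PID override anything inherited.
//
//     Properties client = kafkaProducerManager.getConfigurationForMessageType(KafkaProducerManager.MessageType.ALARM);
//     Properties streams = new Properties();
//     copyPropertyIfSet(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, client, streams);
//     // streams now carries bootstrap.servers from whichever PID (alarms-specific or global) was effective.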
diff --git a/features/kafka/producer/src/test/java/org/opennms/features/kafka/producer/KafkaForwarderIT.java b/features/kafka/producer/src/test/java/org/opennms/features/kafka/producer/KafkaForwarderIT.java
index ec8a643e485a..b2608a49fcb2 100644
--- a/features/kafka/producer/src/test/java/org/opennms/features/kafka/producer/KafkaForwarderIT.java
+++ b/features/kafka/producer/src/test/java/org/opennms/features/kafka/producer/KafkaForwarderIT.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -63,6 +64,7 @@
 import org.opennms.netmgt.model.OnmsSeverity;
 import org.opennms.netmgt.topologies.service.api.OnmsTopologyDao;
 import org.opennms.test.JUnitConfigurationEnvironment;
+import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,6 +95,7 @@
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -105,6 +108,7 @@
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -134,6 +138,7 @@
 public class KafkaForwarderIT implements TemporaryDatabaseAware<MockDatabase> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaForwarderIT.class);
 
+    private static final String BOOTSTRAP_SERVER = "bootstrap.servers";
     private static final String EVENT_TOPIC_NAME = "events";
     private static final String ALARM_TOPIC_NAME = "test-alarms";
     private static final String NODE_TOPIC_NAME = "test-nodes";
@@ -173,6 +178,8 @@ public class KafkaForwarderIT implements TemporaryDatabaseAware<MockDatabase> {
 
     private OpennmsKafkaProducer kafkaProducer;
 
+    private KafkaProducerManager kafkaProducerManager;
+
     private KafkaAlarmDataSync kafkaAlarmaDataStore;
 
     private ExecutorService executor;
@@ -238,10 +245,22 @@ public void onShutdown(DatabasePopulator populator, HwEntityDao dao) {
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, data.getAbsolutePath());
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfig.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 1000);
-        when(configAdmin.getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties()).thenReturn(producerConfig);
-        when(configAdmin.getConfiguration(KafkaAlarmDataSync.KAFKA_STREAMS_PID).getProperties()).thenReturn(streamsConfig);
+        Configuration producerConfiguration = mock(Configuration.class);
+        when(producerConfiguration.getProperties()).thenReturn(producerConfig);
+        when(configAdmin.getConfiguration(eq(OpennmsKafkaProducer.KAFKA_CLIENT_PID))).thenReturn(producerConfiguration);
+
+        Configuration streamsConfiguration = mock(Configuration.class);
+        when(streamsConfiguration.getProperties()).thenReturn(streamsConfig);
+        when(configAdmin.getConfiguration(eq(KafkaAlarmDataSync.KAFKA_STREAMS_PID))).thenReturn(streamsConfiguration);
+
+        kafkaProducerManager = new KafkaProducerManager(configAdmin);
 
-        kafkaProducer = new OpennmsKafkaProducer(protobufMapper, nodeCache, configAdmin, eventdIpcMgr, onmsTopologyDao, 5);
+        kafkaProducer = new OpennmsKafkaProducer(protobufMapper, nodeCache, kafkaProducerManager, eventdIpcMgr, onmsTopologyDao, 5);
         kafkaProducer.setEventTopic(EVENT_TOPIC_NAME);
         // Don't forward newSuspect events
         kafkaProducer.setEventFilter("!getUei().equals(\"" + EventConstants.NEW_SUSPECT_INTERFACE_EVENT_UEI + "\")");
@@ -251,8 +270,9 @@ public void onShutdown(DatabasePopulator populator, HwEntityDao dao) {
         kafkaProducer.setAlarmFilter(null);
         kafkaProducer.setNodeTopic(NODE_TOPIC_NAME);
         kafkaProducer.init();
+        kafkaProducerManager.init();
 
-        kafkaAlarmaDataStore = new KafkaAlarmDataSync(configAdmin, kafkaProducer, protobufMapper);
+        kafkaAlarmaDataStore = new KafkaAlarmDataSync(kafkaProducerManager, kafkaProducer, protobufMapper);
         kafkaAlarmaDataStore.setAlarmTopic(ALARM_TOPIC_NAME);
         kafkaAlarmaDataStore.setAlarmSync(true);
         kafkaAlarmaDataStore.init();
@@ -313,13 +333,11 @@ public void canProduceAndConsumeMessages() throws Exception {
         // Wait for alarm feedback to be consumed
         await().atMost(1, TimeUnit.MINUTES).until(() -> kafkaConsumer.getAlarmFeedback(), not(empty()));
 
-        // Events, nodes and alarms were forwarded and consumed!
-        // Verify the alarm feedback consumed
+
         OpennmsModelProtos.AlarmFeedback consumedAlarmFeedback = kafkaConsumer.getAlarmFeedback().get(0);
         assertThat(consumedAlarmFeedback.getSituationKey(), is(equalTo(alarmFeedback.getSituationKey())));
-
-        // Ensure that we have some events with a fs:fid
+
         List<OpennmsModelProtos.Event> eventsWithFsAndFid = kafkaConsumer.getEvents().stream()
                 .filter(e -> !Strings.isNullOrEmpty(e.getNodeCriteria().getForeignId())
@@ -392,6 +410,104 @@ public void testProducerSuppression() throws Exception {
         Thread.sleep(10000);
         assertEquals(1, kafkaConsumer.getAlarms().size());
     }
+
+    @Test
+    public void testFallbackToGlobalConfiguration() throws Exception {
+        Hashtable<String, Object> globalConfigProps = new Hashtable<>();
+        globalConfigProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.getKafkaConnectString());
+        globalConfigProps.put("client.id", "global-producer");
+
+        ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class);
+        Configuration globalConfig = mock(Configuration.class);
+        when(globalConfig.getProperties()).thenReturn(globalConfigProps);
+
+        Configuration emptyConfig = mock(Configuration.class);
+        when(emptyConfig.getProperties()).thenReturn(null);
+
+        // Stub the catch-all matcher first so that the more specific eq() stubs below take
+        // precedence; with Mockito, the most recently defined matching stub wins.
+        when(configAdmin.getConfiguration(anyString())).thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.EVENTS_KAFKA_CLIENT_PID))).thenReturn(emptyConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.ALARMS_KAFKA_CLIENT_PID))).thenReturn(emptyConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.NODES_KAFKA_CLIENT_PID))).thenReturn(emptyConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID))).thenReturn(globalConfig);
+
+        KafkaProducerManager testManager = new KafkaProducerManager(configAdmin);
+        testManager.init();
+
+        for (KafkaProducerManager.MessageType messageType : KafkaProducerManager.MessageType.values()) {
+            Properties config = testManager.getConfigurationForMessageType(messageType);
+            assertThat("use global config when topic-specific is missing",
+                    config.getProperty("client.id"), is("global-producer"));
+        }
+
+        testManager.destroy();
+        LOG.info("Fallback to global configuration test completed successfully");
+    }
+
+    @Test
+    public void testEffectivePidSelectionLogic() throws Exception {
+        Hashtable<String, Object> configWithBootstrap = new Hashtable<>();
+        configWithBootstrap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.getKafkaConnectString());
+
+        Hashtable<String, Object> configWithoutBootstrap = new Hashtable<>();
+        configWithoutBootstrap.put("othertest.property", "value");
+
+        ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class);
+
+        Configuration eventsConfig = mock(Configuration.class);
+        when(eventsConfig.getProperties()).thenReturn(configWithBootstrap);
+
+        Configuration alarmsConfig = mock(Configuration.class);
+        when(alarmsConfig.getProperties()).thenReturn(configWithoutBootstrap);
+
+        Configuration globalConfig = mock(Configuration.class);
+        when(globalConfig.getProperties()).thenReturn(configWithBootstrap);
+
+        // Catch-all stub first so that the specific eq() stubs below take precedence.
+        when(configAdmin.getConfiguration(anyString())).thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.EVENTS_KAFKA_CLIENT_PID))).thenReturn(eventsConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.ALARMS_KAFKA_CLIENT_PID))).thenReturn(alarmsConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID))).thenReturn(globalConfig);
+
+        KafkaProducerManager testManager = new KafkaProducerManager(configAdmin);
+        testManager.init();
+
+        Properties eventsResult = testManager.getConfigurationForMessageType(KafkaProducerManager.MessageType.EVENT);
+        Properties alarmsResult = testManager.getConfigurationForMessageType(KafkaProducerManager.MessageType.ALARM);
+
+        assertThat("Events config should have bootstrap servers",
+                eventsResult.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), not(nullValue()));
+        assertThat("Alarms config should have bootstrap servers (from global fallback)",
+                alarmsResult.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), not(nullValue()));
+
+        testManager.destroy();
+        LOG.info("Effective PID selection logic test completed");
+    }
 
     @Test
     public void testSyncSuppression() {
@@ -544,6 +660,8 @@ public static class KafkaMessageConsumerRunner implements Runnable {
         private List<CollectionSetProtos.CollectionSet> collectionSetValues = new ArrayList<>();
         private Map<String, OpennmsModelProtos.Alarm> alarmsByReductionKey = new LinkedHashMap<>();
         private AtomicInteger numRecordsConsumed = new AtomicInteger(0);
+        private String groupId;
+        private List<String> topics;
 
         private AtomicInteger numOfMetricRecords = new AtomicInteger(0);
 
         public KafkaMessageConsumerRunner(String kafkaConnectString) {
             this.kafkaConnectString = Objects.requireNonNull(kafkaConnectString);
         }
 
+        public KafkaMessageConsumerRunner(String kafkaConnectString, String topic, String groupId) {
+            this(kafkaConnectString, Collections.singletonList(topic), groupId);
+        }
+
+        public KafkaMessageConsumerRunner(String kafkaConnectString, List<String> topics, String groupId) {
+            this.kafkaConnectString = Objects.requireNonNull(kafkaConnectString);
+            this.topics = Objects.requireNonNull(topics);
+            this.groupId = Objects.requireNonNull(groupId);
+        }
+
         @Override
         public void run() {
             Properties props = new Properties();
@@ -753,6 +882,146 @@ public void tearDown() throws Exception {
         databasePopulator.resetDatabase();
     }
 
+    @Test
+    public void testMessagesSentToDifferentKafkaClusters() throws Exception {
+        TemporaryFolder globalTempFolder = new TemporaryFolder();
+        TemporaryFolder eventsTempFolder = new TemporaryFolder();
+        globalTempFolder.create();
+        eventsTempFolder.create();
+
+        JUnitKafkaServer globalKafkaServer = new JUnitKafkaServer(globalTempFolder);
+        JUnitKafkaServer eventsKafkaServer = new JUnitKafkaServer(eventsTempFolder);
+
+        globalKafkaServer.before();
+        eventsKafkaServer.before();
+
+        String globalConnectString = globalKafkaServer.getKafkaConnectString();
+        String eventsConnectString = eventsKafkaServer.getKafkaConnectString();
+
+        ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class, RETURNS_DEEP_STUBS);
+
+        Hashtable<String, Object> globalConfig = new Hashtable<>();
+        globalConfig.put(BOOTSTRAP_SERVER, globalConnectString);
+        globalConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000);
+        globalConfig.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+        globalConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
+
+        Hashtable<String, Object> eventsConfig = new Hashtable<>();
+        eventsConfig.put(BOOTSTRAP_SERVER, eventsConnectString);
+        eventsConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000);
+        eventsConfig.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+        eventsConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
+
+        // Mock all configurations via deep stubs (mock created with RETURNS_DEEP_STUBS above)
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.EVENTS_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(eventsConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.ALARMS_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.NODES_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.METRICS_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.TOPOLOGY_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(eq(KafkaProducerManager.ALARM_FEEDBACK_KAFKA_CLIENT_PID)).getProperties())
+                .thenReturn(globalConfig);
+
+        // Create a unique state directory for this test
+        File uniqueStreamsDir = tempFolder.newFolder("streams-" + System.currentTimeMillis());
+        Hashtable<String, Object> streamsConfig = new Hashtable<>();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, uniqueStreamsDir.getAbsolutePath());
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfig.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 1000);
+        // Add a unique application ID to avoid conflicts
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-datasync-" + System.currentTimeMillis());
+
+        when(configAdmin.getConfiguration(eq(KafkaAlarmDataSync.KAFKA_STREAMS_PID)).getProperties())
+                .thenReturn(streamsConfig);
+
+        // Mock the individual configuration for the event-specific PID
+        Configuration eventsPidConfig = mock(Configuration.class);
+        when(eventsPidConfig.getProperties()).thenReturn(eventsConfig);
+        when(configAdmin.getConfiguration(KafkaProducerManager.EVENTS_KAFKA_CLIENT_PID, null))
+                .thenReturn(eventsPidConfig);
+
+        // Mock the global configuration separately
+        Configuration globalPidConfig = mock(Configuration.class);
+        when(globalPidConfig.getProperties()).thenReturn(globalConfig);
+        when(configAdmin.getConfiguration(KafkaProducerManager.GLOBAL_KAFKA_CLIENT_PID, null))
+                .thenReturn(globalPidConfig);
+
+        KafkaProducerManager multiClusterManager = new KafkaProducerManager(configAdmin);
+        multiClusterManager.init();
+
+        Properties eventConfigResult = multiClusterManager.getConfigurationForMessageType(
+                KafkaProducerManager.MessageType.EVENT);
+        Properties alarmConfigResult = multiClusterManager.getConfigurationForMessageType(
+                KafkaProducerManager.MessageType.ALARM);
+
+        assertThat("Events should use events Kafka cluster",
+                eventConfigResult.getProperty(BOOTSTRAP_SERVER),
+                equalTo(eventsConnectString));
+        assertThat("Alarms should fall back to global Kafka cluster",
+                alarmConfigResult.getProperty(BOOTSTRAP_SERVER),
+                equalTo(globalConnectString));
+
+        // Destroy the producer wired up in setUp() before registering the multi-cluster one
+        if (kafkaProducer != null) {
+            kafkaProducer.destroy();
+        }
+
+        OpennmsKafkaProducer multiClusterProducer = new OpennmsKafkaProducer(
+                protobufMapper, nodeCache, multiClusterManager, eventdIpcMgr, onmsTopologyDao, 5);
+        multiClusterProducer.setEventTopic(EVENT_TOPIC_NAME);
+        multiClusterProducer.setEventFilter("!getUei().equals(\"" + EventConstants.NEW_SUSPECT_INTERFACE_EVENT_UEI + "\")");
+        multiClusterProducer.setAlarmTopic(ALARM_TOPIC_NAME);
+        multiClusterProducer.setAlarmFeedbackTopic(ALARM_FEEDBACK_TOPIC_NAME);
+        multiClusterProducer.setAlarmFilter(null);
+        multiClusterProducer.setNodeTopic(NODE_TOPIC_NAME);
+        multiClusterProducer.setEncoding("UTF-8");
+        multiClusterProducer.init();
+
+        // Unregister the old producer and register the new one
+        alarmLifecycleListenerManager.onListenerUnregistered(kafkaProducer, Collections.emptyMap());
+        alarmLifecycleListenerManager.onListenerRegistered(multiClusterProducer, Collections.emptyMap());
+
+        String eventsGroupId = "events-consumer-group-" + UUID.randomUUID();
+        String globalGroupId = "global-consumer-group-" + UUID.randomUUID();
+
+        KafkaMessageConsumerRunner eventsConsumer = new KafkaMessageConsumerRunner(
+                eventsConnectString, EVENT_TOPIC_NAME, eventsGroupId);
+        KafkaMessageConsumerRunner globalConsumer = new KafkaMessageConsumerRunner(
+                globalConnectString, EVENT_TOPIC_NAME, globalGroupId);
+
+        ExecutorService consumerExecutor = Executors.newFixedThreadPool(2);
+        consumerExecutor.execute(eventsConsumer);
+        consumerExecutor.execute(globalConsumer);
+
+        await().atMost(2, TimeUnit.MINUTES).until(() -> {
+            eventdIpcMgr.sendNow(
+                    MockEventUtil.createNodeDownEventBuilder("test", databasePopulator.getNode1()).getEvent());
+            return eventsConsumer.getEvents().size() >= 1;
+        });
+
+        List<String> eventUeis = eventsConsumer.getEvents().stream()
+                .map(OpennmsModelProtos.Event::getUei)
+                .collect(Collectors.toList());
+        assertThat(eventUeis, hasItem(EventConstants.NODE_DOWN_EVENT_UEI));
+        assertThat("Event should NOT be in global cluster",
+                globalConsumer.getEvents(), empty());
+    }
+
     @Override
     public void setTemporaryDatabase(MockDatabase database) {
         mockDatabase = database;