Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,58 @@ config:update
<1> Set the Kafka server IP or hostname by replacing the kafka-server-ip and port of an existing Kafka cluster that you want to connect to.
Add multiple hosts in a comma-separated list; for example, `kafka-server-1:9092,kafka-server-2:9092`


== Per-Topic Kafka Configuration
For advanced deployments, you can configure different Kafka clusters for different types of messages. This allows you to send events to one set of Kafka servers, alarms to another set, and metrics to yet another set.
The following configuration PIDs are available for per-topic configuration:

|===
| Message Type | Configuration PID

| Events
| `org.opennms.features.kafka.producer.client.events`

| Alarms
| `org.opennms.features.kafka.producer.client.alarms`

| Metrics
| `org.opennms.features.kafka.producer.client.metrics`

| Nodes
| `org.opennms.features.kafka.producer.client.nodes`

| Topology (vertices and edges)
| `org.opennms.features.kafka.producer.client.topology`

| Alarm Feedback
| `org.opennms.features.kafka.producer.client.alarmFeedback`
|===

If a topic-specific PID configuration does not exist or does not have `bootstrap.servers` configured, the system will fall back to the global configuration (`org.opennms.features.kafka.producer.client`).

.Configure events to go to a specific Kafka cluster
[source, karaf]
----
config:edit org.opennms.features.kafka.producer.client.events
config:property-set bootstrap.servers events-kafka:9092
config:update
----
.Configure alarms to go to a different Kafka cluster
[source, karaf]
----
config:edit org.opennms.features.kafka.producer.client.alarms
config:property-set bootstrap.servers alarms-kafka:9092
config:update
----

.Configure metrics to use a dedicated Kafka cluster
[source, karaf]
----
config:edit org.opennms.features.kafka.producer.client.metrics
config:property-set bootstrap.servers metrics-kafka:9092
config:update
----

Next, install the `opennms-kafka-producer` feature from that same shell using:

[source, karaf]
Expand All @@ -35,3 +87,5 @@ To ensure that the feature continues to be installed on subsequent restarts, add
----
echo "opennms-kafka-producer" | sudo tee ${OPENNMS_HOME}/etc/featuresBoot.d/kafka-producer.boot
----

NOTE: Whenever configuration is updated at runtime, the feature must be stopped and restarted.
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Licensed to The OpenNMS Group, Inc (TOG) under one or more
* contributor license agreements. See the LICENSE.md file
* distributed with this work for additional information
* regarding copyright ownership.
*
* TOG licenses this file to You under the GNU Affero General
* Public License Version 3 (the "License") or (at your option)
* any later version. You may not use this file except in
* compliance with the License. You may obtain a copy of the
* License at:
*
* https://www.gnu.org/licenses/agpl-3.0.txt
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
* License.
*/
package org.opennms.features.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opennms.core.ipc.common.kafka.Utils;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;
import java.util.Map;
import java.util.Dictionary;
import java.util.Properties;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;

public class KafkaProducerManager {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerManager.class);

public static final String GLOBAL_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client";
public static final String EVENTS_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.events";
public static final String ALARMS_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.alarms";
public static final String METRICS_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.metrics";
public static final String NODES_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.nodes";
public static final String TOPOLOGY_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.topology";
public static final String ALARM_FEEDBACK_KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client.alarmFeedback";
public static final String BOOTSTRAP_SERVER= "bootstrap.servers";
public enum MessageType {
EVENT,
ALARM,
NODE,
METRIC,
TOPOLOGY_VERTEX,
TOPOLOGY_EDGE,
ALARM_FEEDBACK
}

private final ConfigurationAdmin configAdmin;
private final Map<MessageType, Producer<byte[], byte[]>> messageTypeToProducerMap = new ConcurrentHashMap<>();
private final Map<String, Producer<byte[], byte[]>> pidToProducerMap = new ConcurrentHashMap<>();

public KafkaProducerManager(ConfigurationAdmin configAdmin) {
this.configAdmin = Objects.requireNonNull(configAdmin);
}

public void init() {
LOG.info("Initializing KafkaProducerManager");
for (MessageType messageType : MessageType.values()) {
try {
getProducerForMessageType(messageType);
LOG.debug("Successfully initialized producer for message type: {}", messageType);
} catch (Exception e) {
LOG.warn("Failed to initialize producer for message type: {}. It will be initialized lazily.", messageType, e);
}
}
}

public void destroy() {
LOG.info("Destroying KafkaProducerManager");
pidToProducerMap.values().forEach(producer -> {
try {
producer.close();
} catch (Exception e) {
LOG.warn("Error closing Kafka producer", e);
}
});
pidToProducerMap.clear();
messageTypeToProducerMap.clear();
}

public Producer<byte[], byte[]> getProducerForMessageType(MessageType messageType) {
return messageTypeToProducerMap.computeIfAbsent(messageType, type -> {
String effectivePid = getEffectivePidForMessageType(type);
if (effectivePid == null) {
// Return a dummy producer that does nothing
return new NoOpProducer();
}
return getOrCreateProducerForPid(effectivePid);
});
}

private boolean hasValidConfiguration(String pid) {
try {
var config = configAdmin.getConfiguration(pid);
if (config != null && config.getProperties() != null && !config.getProperties().isEmpty()) {
Dictionary<String, Object> properties = config.getProperties();
return properties.get(BOOTSTRAP_SERVER) != null;
}
return false;
} catch (IOException e) {
LOG.warn("Failed to check configuration for PID: {}", pid, e);
return false;
}
}

private String getEffectivePidForMessageType(MessageType messageType) {
String topicSpecificPid = getPidForMessageType(messageType);
if (hasValidConfiguration(topicSpecificPid)) {
LOG.debug("Using topic-specific configuration for {}: {}", messageType, topicSpecificPid);
return topicSpecificPid;
}

if (hasValidConfiguration(GLOBAL_KAFKA_CLIENT_PID)) {
LOG.debug("Falling back to global configuration for {}", messageType);
return GLOBAL_KAFKA_CLIENT_PID;
}

LOG.debug("No configuration available for message type: {}", messageType);

return null;
}

private String getPidForMessageType(MessageType messageType) {
switch (messageType) {
case EVENT:
return EVENTS_KAFKA_CLIENT_PID;
case ALARM:
return ALARMS_KAFKA_CLIENT_PID;
case NODE:
return NODES_KAFKA_CLIENT_PID;
case METRIC:
return METRICS_KAFKA_CLIENT_PID;
case TOPOLOGY_VERTEX:
case TOPOLOGY_EDGE:
return TOPOLOGY_KAFKA_CLIENT_PID;
case ALARM_FEEDBACK:
return ALARM_FEEDBACK_KAFKA_CLIENT_PID;
default:
return GLOBAL_KAFKA_CLIENT_PID;
}
}

private String determinePidForMessageType(MessageType messageType) {
switch (messageType) {
case EVENT:
return getEffectivePid(EVENTS_KAFKA_CLIENT_PID);
case ALARM:
return getEffectivePid(ALARMS_KAFKA_CLIENT_PID);
case NODE:
return getEffectivePid(NODES_KAFKA_CLIENT_PID);
case METRIC:
return getEffectivePid(METRICS_KAFKA_CLIENT_PID);
case TOPOLOGY_VERTEX:
case TOPOLOGY_EDGE:
return getEffectivePid(TOPOLOGY_KAFKA_CLIENT_PID);
case ALARM_FEEDBACK:
return getEffectivePid(ALARM_FEEDBACK_KAFKA_CLIENT_PID);
default:
return GLOBAL_KAFKA_CLIENT_PID;
}
}

private String getEffectivePid(String topicSpecificPid) {
try {
var config = configAdmin.getConfiguration(topicSpecificPid);
if (config != null && config.getProperties() != null && !config.getProperties().isEmpty()) {
Dictionary<String, Object> properties = config.getProperties();
if (properties.get(BOOTSTRAP_SERVER) != null) {
LOG.debug("bootstrap.server found for PID: {}", topicSpecificPid);
return topicSpecificPid;
}
}
} catch (IOException e) {
LOG.warn("Failed to check configuration for PID: {}", topicSpecificPid, e);
}
LOG.debug("Falling back to global configuration for PID: {}", GLOBAL_KAFKA_CLIENT_PID);
return GLOBAL_KAFKA_CLIENT_PID;
}

private Producer<byte[], byte[]> getOrCreateProducerForPid(String pid) {
return pidToProducerMap.computeIfAbsent(pid, this::initializeProducerForPid);
}

private Producer<byte[], byte[]> initializeProducerForPid(String pid) {
try {
final Properties producerConfig = getConfigurationForPid(pid);

producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());


LOG.info("Creating Kafka producer for PID: {} with bootstrap.servers: {}",
pid, producerConfig.getProperty(BOOTSTRAP_SERVER, "not configured"));

return Utils.runWithGivenClassLoader(() -> new KafkaProducer<>(producerConfig),
KafkaProducer.class.getClassLoader());

} catch (Exception e) {
LOG.error("Failed to create Kafka producer for PID: {}", pid, e);
throw new RuntimeException("Failed to create Kafka producer for PID: " + pid, e);
}
}

public Properties getConfigurationForMessageType(MessageType messageType) {
String pid = determinePidForMessageType(messageType);
return getConfigurationForPid(pid);
}

public Properties getConfigurationForPid(String pid) {
try {
final Properties config = new Properties();
final Dictionary<String, Object> properties = configAdmin.getConfiguration(pid).getProperties();

if (properties != null) {
final Enumeration<String> keys = properties.keys();
while (keys.hasMoreElements()) {
final String key = keys.nextElement();
Object value = properties.get(key);
if (value != null) {
config.put(key, value);
}
}
}
return config;
} catch (IOException e) {
LOG.warn("Failed to load configuration for PID: {}, using empty properties", pid, e);
return new Properties();
}
}

public boolean hasConfigurationForMessageType(MessageType messageType) {
String effectivePid = getEffectivePidForMessageType(messageType);
return effectivePid != null && hasValidConfiguration(effectivePid);
}

public ConfigurationAdmin getConfigAdmin() {
return configAdmin;
}
}


Loading