Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/unreleased/solr-18062.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: CrossDC - support arbitrary Kafka properties
type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other
authors:
- name: Andrzej Bialecki
nick: ab
links:
- name: SOLR-18062
url: https://issues.apache.org/jira/browse/SOLR-18062
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.SuppressForbidden;
Expand All @@ -34,6 +36,9 @@
public class ConfUtil {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String KAFKA_ENV_PREFIX = "KAFKA_";
public static final String KAFKA_PROP_PREFIX = "kafka.";

public static void fillProperties(SolrZkClient solrClient, Map<String, Object> properties) {
// fill in from environment
Map<String, String> env = System.getenv();
Expand All @@ -47,13 +52,30 @@ public static void fillProperties(SolrZkClient solrClient, Map<String, Object> p
properties.put(configKey.getKey(), val);
}
}
// fill in aux Kafka env with prefix
env.forEach(
(key, val) -> {
if (key.startsWith(KAFKA_ENV_PREFIX)) {
properties.put(normalizeKafkaEnvKey(key), val);
}
});

// fill in from system properties
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
String val = System.getProperty(configKey.getKey());
if (val != null) {
properties.put(configKey.getKey(), val);
}
}
// fill in aux Kafka system properties with prefix
System.getProperties()
.forEach(
(key, val) -> {
if (key.toString().startsWith(KAFKA_PROP_PREFIX)) {
properties.put(normalizeKafkaSysPropKey(key.toString()), val);
}
});

Properties zkProps = new Properties();
if (solrClient != null) {
try {
Expand Down Expand Up @@ -90,6 +112,34 @@ public static void fillProperties(SolrZkClient solrClient, Map<String, Object> p
e);
}
}
// normalize any left aux properties by stripping prefixes
if (!properties.isEmpty()) {
Set<String> keys = new HashSet<>(properties.keySet());
keys.forEach(
key -> {
if (key.startsWith(KAFKA_ENV_PREFIX)) {
properties.put(normalizeKafkaEnvKey(key), properties.remove(key));
} else if (key.startsWith(KAFKA_PROP_PREFIX)) {
properties.put(normalizeKafkaSysPropKey(key), properties.remove(key));
}
});
}
}

public static String normalizeKafkaEnvKey(String key) {
if (key.startsWith(KAFKA_ENV_PREFIX)) {
return key.substring(KAFKA_ENV_PREFIX.length()).toLowerCase(Locale.ROOT).replace('_', '.');
} else {
return key;
}
}

public static String normalizeKafkaSysPropKey(String key) {
if (key.startsWith(KAFKA_PROP_PREFIX)) {
return key.substring(KAFKA_PROP_PREFIX.length()).toLowerCase(Locale.ROOT);
} else {
return key;
}
}

public static void verifyProperties(Map<String, Object> properties) {
Expand Down
Loading