From a3262187f46a3689e7f92dd64775c41fcc9cef49 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Wed, 28 Jan 2026 16:27:04 +0100 Subject: [PATCH 1/3] SOLR-18062: CrossDC - support arbitrary Kafka properties. --- changelog/unreleased/solr-18062.yml | 9 + .../apache/solr/crossdc/common/ConfUtil.java | 50 +++ .../solr/crossdc/common/ConfUtilTest.java | 355 ++++++++++++++++++ 3 files changed, 414 insertions(+) create mode 100644 changelog/unreleased/solr-18062.yml create mode 100644 solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java diff --git a/changelog/unreleased/solr-18062.yml b/changelog/unreleased/solr-18062.yml new file mode 100644 index 000000000000..30ef0381bb8e --- /dev/null +++ b/changelog/unreleased/solr-18062.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: CrossDC - support arbitrary Kafka properties +type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: Andrzej Bialecki + nick: ab +links: + - name: SOLR-18062 + url: https://issues.apache.org/jira/browse/SOLR-18062 diff --git a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java index 2fde6a6d270d..83f00db38904 100644 --- a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java +++ b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/ConfUtil.java @@ -21,9 +21,11 @@ import java.io.ByteArrayInputStream; import java.lang.invoke.MethodHandles; +import java.util.HashSet; import java.util.Locale; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.util.SuppressForbidden; @@ -34,6 +36,9 @@ public class ConfUtil { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String KAFKA_ENV_PREFIX = "KAFKA_"; + public static final String KAFKA_PROP_PREFIX = "kafka."; + public static void fillProperties(SolrZkClient solrClient, Map properties) { // fill in from environment Map env = System.getenv(); @@ -47,6 +52,14 @@ public static void fillProperties(SolrZkClient solrClient, Map p properties.put(configKey.getKey(), val); } } + // fill in aux Kafka env with prefix + env.forEach( + (key, val) -> { + if (key.startsWith(KAFKA_ENV_PREFIX)) { + properties.put(normalizeKafkaEnvKey(key), val); + } + }); + // fill in from system properties for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) { String val = System.getProperty(configKey.getKey()); @@ -54,6 +67,15 @@ public static void fillProperties(SolrZkClient solrClient, Map p properties.put(configKey.getKey(), val); } } + // fill in aux Kafka system properties with prefix + System.getProperties() + .forEach( + (key, val) -> { + if (key.toString().startsWith(KAFKA_PROP_PREFIX)) { + properties.put(normalizeKafkaSysPropKey(key.toString()), val); + } + }); + Properties zkProps = new Properties(); if (solrClient != null) { try { @@ -90,6 +112,34 @@ public static void fillProperties(SolrZkClient solrClient, Map p e); } } + // normalize any left aux properties by stripping prefixes + if (!properties.isEmpty()) { + Set keys = new HashSet<>(properties.keySet()); + keys.forEach( + key -> { + if (key.startsWith(KAFKA_ENV_PREFIX)) { + properties.put(normalizeKafkaEnvKey(key), properties.remove(key)); + } else if (key.startsWith(KAFKA_PROP_PREFIX)) { + properties.put(normalizeKafkaSysPropKey(key), properties.remove(key)); + } + }); + } + } + + public static String normalizeKafkaEnvKey(String key) { + if (key.startsWith(KAFKA_ENV_PREFIX)) { + return key.substring(KAFKA_ENV_PREFIX.length()).toLowerCase(Locale.ROOT).replace('_', '.'); + } else { + return key; + } + } + + public static String normalizeKafkaSysPropKey(String key) { + if (key.startsWith(KAFKA_PROP_PREFIX)) { + return key.substring(KAFKA_PROP_PREFIX.length()).toLowerCase(Locale.ROOT); + } else { + return key; + } } public static void verifyProperties(Map properties) { diff --git a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java new file mode 100644 index 000000000000..75af0748142f --- /dev/null +++ b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.common; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.SolrZkClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Comprehensive unit tests for ConfUtil utility class. Tests configuration property resolution from + * multiple sources: - Environment variables - System properties - ZooKeeper And validates the + * correct priority and handling of custom Kafka properties. + */ +public class ConfUtilTest extends SolrTestCaseJ4 { + + @Mock private SolrZkClient mockZkClient; + + private AutoCloseable mocks; + private Properties originalSysProps; + + @Before + public void setUp() throws Exception { + super.setUp(); + assumeWorkingMockito(); + mocks = MockitoAnnotations.openMocks(this); + + // Save original system properties + originalSysProps = new Properties(); + originalSysProps.putAll(System.getProperties()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + // Restore original system properties + System.setProperties(originalSysProps); + + if (mocks != null) { + mocks.close(); + } + } + + @Test + public void testFillProperties_WithStandardConfigKeys() { + Map properties = new HashMap<>(); + + // Set system properties with standard config keys + System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092"); + System.setProperty(KafkaCrossDcConf.TOPIC_NAME, "test-topic"); + System.setProperty(KafkaCrossDcConf.GROUP_ID, "test-group"); + + ConfUtil.fillProperties(null, properties); + + assertEquals("localhost:9092", properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + assertEquals("test-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME)); + assertEquals("test-group", properties.get(KafkaCrossDcConf.GROUP_ID)); + } + + @Test + public void testFillProperties_WithKafkaPrefixSystemProperties() { + Map properties = new HashMap<>(); + + // Set custom Kafka properties with kafka. prefix + System.setProperty("kafka.max.request.size", "2097152"); + System.setProperty("kafka.compression.type", "gzip"); + System.setProperty("kafka.acks", "all"); + + ConfUtil.fillProperties(null, properties); + + // Verify custom Kafka properties are added with correct keys (lowercase, dots) + assertEquals("2097152", properties.get("max.request.size")); + assertEquals("gzip", properties.get("compression.type")); + assertEquals("all", properties.get("acks")); + } + + @Test + public void testFillProperties_WithMixedCaseKafkaPrefix() { + Map properties = new HashMap<>(); + + // Test kafka. prefix handles case conversion correctly + System.setProperty("kafka.SSL.Protocol", "TLSv1.2"); + System.setProperty("kafka.security.protocol", "SSL"); + + ConfUtil.fillProperties(null, properties); + + // Properties should be converted to lowercase + assertEquals("TLSv1.2", properties.get("ssl.protocol")); + assertEquals("SSL", properties.get("security.protocol")); + } + + @Test + public void testFillProperties_FromZooKeeper() throws Exception { + Map properties = new HashMap<>(); + + // Mock ZooKeeper data + Properties zkProps = new Properties(); + zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092"); + zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic"); + zkProps.setProperty("custom.zk.property", "zk-value"); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + zkProps.store(baos, null); + byte[] zkData = baos.toByteArray(); + + when(mockZkClient.exists(anyString())).thenReturn(true); + when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(zkData); + + ConfUtil.fillProperties(mockZkClient, properties); + + // Verify ZooKeeper properties are loaded + assertEquals("zk-kafka:9092", properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME)); + assertEquals("zk-value", properties.get("custom.zk.property")); + } + + @Test + public void testFillProperties_PriorityOrder() throws Exception { + Map properties = new HashMap<>(); + + // Set up ZooKeeper with lowest priority + Properties zkProps = new Properties(); + zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092"); + zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic"); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + zkProps.store(baos, null); + byte[] zkData = baos.toByteArray(); + + when(mockZkClient.exists(anyString())).thenReturn(true); + when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(zkData); + + // Set system property with higher priority + System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "sys-kafka:9092"); + + ConfUtil.fillProperties(mockZkClient, properties); + + // System property should override ZooKeeper value + assertEquals("sys-kafka:9092", properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + // ZooKeeper value should be used when no system property is set + assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME)); + } + + @Test + public void testFillProperties_CustomKafkaPropertiesFromSystemProps() { + Map properties = new HashMap<>(); + + // Set various custom Kafka properties + System.setProperty("kafka.batch.size", "16384"); + System.setProperty("kafka.linger.ms", "10"); + System.setProperty("kafka.buffer.memory", "33554432"); + System.setProperty("kafka.retries", "3"); + + ConfUtil.fillProperties(null, properties); + + // Verify all custom properties are present with correct transformation + assertEquals("16384", properties.get("batch.size")); + assertEquals("10", properties.get("linger.ms")); + assertEquals("33554432", properties.get("buffer.memory")); + assertEquals("3", properties.get("retries")); + } + + @Test + public void testFillProperties_NullZkClient() { + Map properties = new HashMap<>(); + + System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092"); + System.setProperty("kafka.compression.type", "snappy"); + + // Should not throw exception with null ZK client + ConfUtil.fillProperties(null, properties); + + assertEquals("localhost:9092", properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + assertEquals("snappy", properties.get("compression.type")); + } + + @Test(expected = SolrException.class) + public void testFillProperties_ZkDataNull() throws Exception { + Map properties = new HashMap<>(); + + when(mockZkClient.exists(anyString())).thenReturn(true); + when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(null); + + // Should throw SolrException when ZK data is null + ConfUtil.fillProperties(mockZkClient, properties); + } + + @Test(expected = SolrException.class) + public void testFillProperties_ZkInterrupted() throws Exception { + Map properties = new HashMap<>(); + + when(mockZkClient.exists(anyString())).thenReturn(true); + when(mockZkClient.getData(anyString(), isNull(), isNull())) + .thenThrow(new InterruptedException("Test interrupt")); + + // Should throw SolrException and set interrupt flag + ConfUtil.fillProperties(mockZkClient, properties); + } + + @Test(expected = SolrException.class) + public void testFillProperties_ZkException() throws Exception { + Map properties = new HashMap<>(); + + when(mockZkClient.exists(anyString())).thenReturn(true); + when(mockZkClient.getData(anyString(), isNull(), isNull())) + .thenThrow(new RuntimeException("Test exception")); + + // Should throw SolrException wrapping the original exception + ConfUtil.fillProperties(mockZkClient, properties); + } + + @Test + public void testVerifyProperties_ValidConfiguration() { + Map properties = new HashMap<>(); + properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092"); + properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic"); + + // Should not throw exception + ConfUtil.verifyProperties(properties); + } + + @Test(expected = SolrException.class) + public void testVerifyProperties_MissingBootstrapServers() { + Map properties = new HashMap<>(); + properties.put(KafkaCrossDcConf.TOPIC_NAME, "test-topic"); + + // Should throw SolrException due to missing bootstrapServers + ConfUtil.verifyProperties(properties); + } + + @Test(expected = SolrException.class) + public void testVerifyProperties_MissingTopicName() { + Map properties = new HashMap<>(); + properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092"); + + // Should throw SolrException due to missing topicName + ConfUtil.verifyProperties(properties); + } + + @Test(expected = SolrException.class) + public void testVerifyProperties_BothMissing() { + Map properties = new HashMap<>(); + + // Should throw SolrException due to missing both required properties + ConfUtil.verifyProperties(properties); + } + + @Test + public void testFillProperties_EmptyProperties() { + Map properties = new HashMap<>(); + + // No system properties or environment variables set + ConfUtil.fillProperties(null, properties); + + // Should complete without error, properties may be empty or contain defaults + assertNotNull(properties); + } + + @Test + public void testFillProperties_SecurityProperties() throws Exception { + Map properties = new HashMap<>(); + + // Set security-related properties + System.setProperty("kafka.ssl.truststore.location", "/path/to/truststore"); + System.setProperty("kafka.ssl.keystore.location", "/path/to/keystore"); + System.setProperty("kafka.security.protocol", "SSL"); + + ConfUtil.fillProperties(null, properties); + + // Verify security properties are correctly transformed + assertEquals("/path/to/truststore", properties.get("ssl.truststore.location")); + assertEquals("/path/to/keystore", properties.get("ssl.keystore.location")); + assertEquals("SSL", properties.get("security.protocol")); + } + + @Test + public void testFillProperties_ComplexScenario() throws Exception { + Map properties = new HashMap<>(); + + // Create a complex scenario with multiple sources + Properties zkProps = new Properties(); + zkProps.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "zk-kafka:9092"); + zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic"); + zkProps.setProperty(KafkaCrossDcConf.GROUP_ID, "zk-group"); + zkProps.setProperty("zk.only.property", "zk-value"); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + zkProps.store(baos, null); + byte[] zkData = baos.toByteArray(); + + when(mockZkClient.exists(anyString())).thenReturn(true); + when(mockZkClient.getData(anyString(), isNull(), isNull())).thenReturn(zkData); + + // Set system properties - should override ZK + System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "sys-kafka:9092"); + System.setProperty("kafka.max.poll.records", "1000"); + System.setProperty("kafka.enable.auto.commit", "false"); + + ConfUtil.fillProperties(mockZkClient, properties); + + // Verify priority: system props > ZK + assertEquals("sys-kafka:9092", properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + assertEquals("zk-topic", properties.get(KafkaCrossDcConf.TOPIC_NAME)); + assertEquals("zk-group", properties.get(KafkaCrossDcConf.GROUP_ID)); + + // Verify custom Kafka properties from system props + assertEquals("1000", properties.get("max.poll.records")); + assertEquals("false", properties.get("enable.auto.commit")); + + // Verify ZK-only property + assertEquals("zk-value", properties.get("zk.only.property")); + } + + // we can't easily modify envvars, test just the key conversion in properties + @Test + public void testUnderscoreToDotsConversion() { + Map properties = new HashMap<>(); + + // Test underscore to dot conversion in kafka. prefix env + properties.put("KAFKA_MAX_POLL_RECORDS", "500"); + properties.put("KAFKA_FETCH_MAX_WAIT_MS", "1000"); + + ConfUtil.fillProperties(null, properties); + + // Verify underscores are converted to dots + assertEquals("500", properties.get("max.poll.records")); + assertEquals("1000", properties.get("fetch.max.wait.ms")); + } +} From 0eb69875877f6234caba0cf7a2dd5b3301b1ea4c Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Wed, 28 Jan 2026 16:42:38 +0100 Subject: [PATCH 2/3] Missing overrides. --- .../src/test/org/apache/solr/crossdc/common/ConfUtilTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java index 75af0748142f..5120ced02f84 100644 --- a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java +++ b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java @@ -46,6 +46,7 @@ public class ConfUtilTest extends SolrTestCaseJ4 { private Properties originalSysProps; @Before + @Override public void setUp() throws Exception { super.setUp(); assumeWorkingMockito(); @@ -57,6 +58,7 @@ public void setUp() throws Exception { } @After + @Override public void tearDown() throws Exception { super.tearDown(); // Restore original system properties From 2a1c2b5db53fbfe708b47ca84074544bffb48f4f Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Thu, 29 Jan 2026 12:41:25 +0100 Subject: [PATCH 3/3] Forbidden APIs. --- .../apache/solr/crossdc/common/ConfUtilTest.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java index 5120ced02f84..e56dbb604837 100644 --- a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java +++ b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/common/ConfUtilTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -128,7 +129,9 @@ public void testFillProperties_FromZooKeeper() throws Exception { zkProps.setProperty("custom.zk.property", "zk-value"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - zkProps.store(baos, null); + OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8"); + zkProps.store(writer, null); + writer.close(); byte[] zkData = baos.toByteArray(); when(mockZkClient.exists(anyString())).thenReturn(true); @@ -152,7 +155,9 @@ public void testFillProperties_PriorityOrder() throws Exception { zkProps.setProperty(KafkaCrossDcConf.TOPIC_NAME, "zk-topic"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - zkProps.store(baos, null); + OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8"); + zkProps.store(writer, null); + writer.close(); byte[] zkData = baos.toByteArray(); when(mockZkClient.exists(anyString())).thenReturn(true); @@ -313,7 +318,9 @@ public void testFillProperties_ComplexScenario() throws Exception { zkProps.setProperty("zk.only.property", "zk-value"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - zkProps.store(baos, null); + OutputStreamWriter writer = new OutputStreamWriter(baos, "UTF-8"); + zkProps.store(writer, null); + writer.close(); byte[] zkData = baos.toByteArray(); when(mockZkClient.exists(anyString())).thenReturn(true);