diff --git a/changelog/unreleased/SOLR-18094-zk-quorum-noderole.yml b/changelog/unreleased/SOLR-18094-zk-quorum-noderole.yml new file mode 100644 index 000000000000..08e8c319f3d6 --- /dev/null +++ b/changelog/unreleased/SOLR-18094-zk-quorum-noderole.yml @@ -0,0 +1,10 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: Capability for Solr to run embedded ZooKeeper in a quorum/ensemble mode, allowing multiple Solr nodes to form a distributed ZooKeeper ensemble within their own processes. Controlled by a new solr node-role. +type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: Eric Pugh + - name: Jason Gerlowski + - name: Jan Høydahl +links: + - name: SOLR-18094 + url: https://issues.apache.org/jira/browse/SOLR-18094 diff --git a/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java b/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java index 3b18f2ac7a99..856b1606bd12 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java +++ b/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java @@ -42,26 +42,58 @@ public class SolrZkServer { public static final String ZK_WHITELIST_PROPERTY = "zookeeper.4lw.commands.whitelist"; - boolean zkRun = false; String zkHost; int solrPort; Properties props; SolrZkServerProps zkProps; - private Thread zkThread; // the thread running a zookeeper server, only if zkRun is true + private Thread zkThread; // the thread running a zookeeper server, only if zkServerEnabled is true private Path dataHome; // o.a.zookeeper.**.QuorumPeerConfig needs a File not a Path private String confHome; - public SolrZkServer(boolean zkRun, String zkHost, Path dataHome, String confHome, int solrPort) { - this.zkRun = zkRun; + public SolrZkServer(String zkHost, Path dataHome, String confHome, int solrPort) { this.zkHost = zkHost; this.dataHome = dataHome; this.confHome = confHome; this.solrPort = solrPort; } + /** + * Creates and initializes a 
SolrZkServer instance for standalone (non-quorum) mode. + * + * @param zkHost the ZooKeeper host string (chroot will be stripped) + * @param solrHome the Solr home directory path + * @param solrHostPort the Solr host port + * @return initialized and started SolrZkServer instance + */ + public static SolrZkServer createAndStart(String zkHost, Path solrHome, int solrHostPort) { + String zkDataHome = + EnvUtils.getProperty( + "solr.zookeeper.server.datadir", solrHome.resolve("zoo_data").toString()); + String zkConfHome = EnvUtils.getProperty("solr.zookeeper.server.confdir", solrHome.toString()); + + String strippedZkHost = stripChroot(zkHost); + SolrZkServer zkServer = + new SolrZkServer(strippedZkHost, Path.of(zkDataHome), zkConfHome, solrHostPort); + zkServer.parseConfig(); + zkServer.start(); + + return zkServer; + } + + /** + * Strips the chroot portion from a ZooKeeper host string. + * + * @param zkRun the ZooKeeper host string (e.g., "localhost:2181/solr") + * @return the host string without chroot (e.g., "localhost:2181") + */ + private static String stripChroot(String zkRun) { + if (zkRun == null || zkRun.trim().isEmpty() || zkRun.lastIndexOf('/') < 0) return zkRun; + return zkRun.substring(0, zkRun.lastIndexOf('/')); + } + public String getClientString() { if (zkHost != null) { return zkHost; @@ -71,11 +103,6 @@ public String getClientString() { return null; } - // if the string wasn't passed as zkHost, then use the standalone server we started - if (!zkRun) { - return null; - } - InetSocketAddress addr = zkProps.getClientPortAddress(); String hostName; // We cannot advertise 0.0.0.0, so choose the best host to advertise @@ -94,7 +121,6 @@ public void parseConfig() { // set default data dir // TODO: use something based on IP+port??? support ensemble all from same solr home? 
zkProps.setDataDir(dataHome); - zkProps.zkRun = zkRun; zkProps.solrPort = Integer.toString(solrPort); } @@ -113,7 +139,7 @@ public void parseConfig() { try { props = SolrZkServerProps.getProperties(zooCfgPath); - SolrZkServerProps.injectServers(props, zkRun, zkHost); + SolrZkServerProps.injectServers(props, zkHost); // This is the address that the embedded Zookeeper will bind to. Like Solr, it defaults to // "127.0.0.1". props.setProperty( @@ -123,9 +149,8 @@ public void parseConfig() { } zkProps.parseProperties(props); } catch (QuorumPeerConfig.ConfigException | IOException e) { - if (zkRun) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); - } + + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } } @@ -134,9 +159,6 @@ public Map getServers() { } public void start() { - if (!zkRun) { - return; - } if (System.getProperty(ZK_WHITELIST_PROPERTY) == null) { System.setProperty(ZK_WHITELIST_PROPERTY, "ruok, mntr, conf"); @@ -163,20 +185,11 @@ public void start() { }, "embeddedZkServer"); - if (zkProps.getServers().size() > 1) { - if (log.isInfoEnabled()) { - log.info( - "STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port {}, listening on host {}", - zkProps.getClientPortAddress().getPort(), - zkProps.getClientPortAddress().getAddress().getHostAddress()); - } - } else { - if (log.isInfoEnabled()) { - log.info( - "STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port {}, listening on host {}", - zkProps.getClientPortAddress().getPort(), - zkProps.getClientPortAddress().getAddress().getHostAddress()); - } + if (log.isInfoEnabled()) { + log.info( + "STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port {}, listening on host {}", + zkProps.getClientPortAddress().getPort(), + zkProps.getClientPortAddress().getAddress().getHostAddress()); } zkThread.setDaemon(true); @@ -203,9 +216,7 @@ public void start() { } public void stop() { - if (!zkRun) { - return; - } + zkThread.interrupt(); } } @@ -216,7 +227,6 @@ class SolrZkServerProps extends 
QuorumPeerConfig { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); String solrPort; // port that Solr is listening on - boolean zkRun; /** * Parse a ZooKeeper configuration file @@ -245,10 +255,10 @@ public static Properties getProperties(Path configPath) throws ConfigException { // Given zkHost=localhost:1111,localhost:2222 this will inject // server.0=localhost:1112:1113 // server.1=localhost:2223:2224 - public static void injectServers(Properties props, boolean zkRun, String zkHost) { + public static void injectServers(Properties props, String zkHost) { // if clientPort not already set, use zkRun - if (zkRun && props.getProperty("clientPort") == null) { + if (props.getProperty("clientPort") == null) { // int portIdx = zkRun.lastIndexOf(':'); int portIdx = "".lastIndexOf(':'); if (portIdx > 0) { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 50dd516f5e39..16ec6c5b9615 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -290,7 +290,7 @@ public JerseyAppHandlerCache getJerseyAppHandlerCache() { private final ObjectCache objectCache = new ObjectCache(); - public final NodeRoles nodeRoles = new NodeRoles(System.getProperty(NodeRoles.NODE_ROLES_PROP)); + public final NodeRoles nodeRoles = new NodeRoles(EnvUtils.getProperty(NodeRoles.NODE_ROLES_PROP)); private final ExecutorService indexSearcherExecutor; diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java index ad69e6d90f4b..0e779935d397 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java +++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java @@ -236,11 +236,16 @@ public static NodeConfig loadNodeConfig(Path solrHome, Properties nodeProperties initModules(loader, null); nodeProperties = 
SolrXmlConfig.wrapAndSetZkHostFromSysPropIfNeeded(nodeProperties); - // TODO: Only job of this block is to - // delay starting a solr core to satisfy - // ZkFailoverTest test case... String zkHost = nodeProperties.getProperty(SolrXmlConfig.ZK_HOST); - if (StrUtils.isNotNullOrEmpty(zkHost)) { + NodeRoles nodeRoles = new NodeRoles(EnvUtils.getProperty(NodeRoles.NODE_ROLES_PROP)); + boolean zookeeperQuorumNode = + NodeRoles.MODE_ON.equals(nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM)); + + // This block demonstrates how we pause and wait for a ZooKeeper to be available before + // continuing. + // See the ZkFailoverTest to see how changing solr.cloud.wait.for.zk.seconds impacts this + // capability. + if (StrUtils.isNotNullOrEmpty(zkHost) && !zookeeperQuorumNode) { int startUpZkTimeOut = 1000 * Integer.getInteger( diff --git a/solr/core/src/java/org/apache/solr/core/NodeRoles.java b/solr/core/src/java/org/apache/solr/core/NodeRoles.java index c38c92297c76..00c5c3d57ef5 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeRoles.java +++ b/solr/core/src/java/org/apache/solr/core/NodeRoles.java @@ -113,6 +113,18 @@ public String modeWhenRoleIsAbsent() { public Set supportedModes() { return Set.of(MODE_ON, MODE_OFF); } + }, + + ZOOKEEPER_QUORUM("zookeeper_quorum") { + @Override + public Set supportedModes() { + return Set.of(MODE_ON, MODE_OFF); + } + + @Override + public String modeWhenRoleIsAbsent() { + return MODE_OFF; + } }; public final String roleName; diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index 29efbf888ba7..3175e1a9d7b1 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -20,12 +20,16 @@ import static org.apache.solr.common.cloud.ZkStateReader.HTTPS_PORT_PROP; import io.opentelemetry.api.common.Attributes; +import java.io.FileReader; import java.io.IOException; import 
java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; @@ -47,6 +51,7 @@ import org.apache.solr.metrics.SolrMetricsContext; import org.apache.solr.metrics.otel.OtelUnit; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +68,12 @@ public class ZkContainer { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected ZkController zkController; + + // zkServer (and SolrZkServer) wrap a ZooKeeperServerMain if standalone mode, but in quorum we + // just use ZooKeeperServerEmbedded + // directly! Why? Can we use ZooKeeperServerEmbedded in one node directly instead? private SolrZkServer zkServer; + private ZooKeeperServerEmbedded zkServerEmbedded; private ExecutorService coreZkRegister = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("coreZkRegister")); @@ -75,42 +85,105 @@ public class ZkContainer { public ZkContainer() {} public void initZooKeeper(final CoreContainer cc, CloudConfig config) { - boolean zkRun = EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false); + // zkServerEnabled is set whenever in solrCloud mode ('-c') but no explicit zkHost/ZK_HOST is + // provided. 
+ final boolean zkServerEnabled = + EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false); + boolean zkQuorumNode = false; + if (NodeRoles.MODE_ON.equals(cc.nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM))) { + zkQuorumNode = true; + log.info("Starting node in ZooKeeper Quorum role."); + } - if (zkRun && config == null) + if (zkServerEnabled && config == null) { throw new SolrException( SolrException.ErrorCode.SERVER_ERROR, "Cannot start Solr in cloud mode - no cloud config provided"); + } + + if (config == null) { + log.info("Solr is running in standalone mode"); + return; + } - if (config == null) return; // not in zk mode + final boolean runAsQuorum = config.getZkHost() != null && zkQuorumNode; String zookeeperHost = config.getZkHost(); + final var solrHome = cc.getSolrHome(); + if (zkServerEnabled) { + if (!runAsQuorum) { + // Old school ZooKeeperServerMain being used under the covers. + zkServer = + SolrZkServer.createAndStart(config.getZkHost(), solrHome, config.getSolrHostPort()); + + // set client from server config if not already set + if (zookeeperHost == null) { + zookeeperHost = zkServer.getClientString(); + } + } else { + // ZooKeeperServerEmbedded being used under the covers. 
+ // Figure out where to put zoo-data + final var zkHomeDir = solrHome.resolve("zoo_home"); + final var zkDataDir = zkHomeDir.resolve("data"); + + // Populate a zoo.cfg + final String zooCfgTemplate = + "" + + "tickTime=2000\n" + + "initLimit=10\n" + + "syncLimit=5\n" + + "dataDir=@@DATA_DIR@@\n" + + "4lw.commands.whitelist=mntr,conf,ruok\n" + + "admin.enableServer=false\n" + + "clientPort=@@ZK_CLIENT_PORT@@\n"; + + final int zkPort = config.getSolrHostPort() + 1000; + String zooCfgContents = + zooCfgTemplate + .replace("@@DATA_DIR@@", zkDataDir.toString()) + .replace("@@ZK_CLIENT_PORT@@", String.valueOf(zkPort)); + final String[] zkHosts = config.getZkHost().split(","); + int myId = -1; + final String targetConnStringSection = config.getHost() + ":" + zkPort; + if (log.isInfoEnabled()) { + log.info( + "Trying to match {} against zkHostString {} to determine myid", + targetConnStringSection, + config.getZkHost()); + } + for (int i = 0; i < zkHosts.length; i++) { + final String host = zkHosts[i]; + if (targetConnStringSection.equals(zkHosts[i])) { + myId = (i + 1); + } + final var hostComponents = host.split(":"); + final var zkServer = hostComponents[0]; + final var zkClientPort = Integer.valueOf(hostComponents[1]); + final var zkQuorumPort = zkClientPort - 4000; + final var zkLeaderPort = zkClientPort - 3000; + final String configEntry = + "server." + (i + 1) + "=" + zkServer + ":" + zkQuorumPort + ":" + zkLeaderPort + "\n"; + zooCfgContents = zooCfgContents + configEntry; + } + + if (myId == -1) { + throw new IllegalStateException( + "Unable to determine ZK 'myid' for target " + targetConnStringSection); + } - // zookeeper in quorum mode currently causes a failure when trying to - // register log4j mbeans. 
See SOLR-2369 - // TODO: remove after updating to an slf4j based zookeeper - System.setProperty("zookeeper.jmx.log4j.disable", "true"); - - Path solrHome = cc.getSolrHome(); - if (zkRun) { - String zkDataHome = - EnvUtils.getProperty( - "solr.zookeeper.server.datadir", solrHome.resolve("zoo_data").toString()); - String zkConfHome = - EnvUtils.getProperty("solr.zookeeper.server.confdir", solrHome.toString()); - zkServer = - new SolrZkServer( - zkRun, - stripChroot(config.getZkHost()), - Path.of(zkDataHome), - zkConfHome, - config.getSolrHostPort()); - zkServer.parseConfig(); - zkServer.start(); - - // set client from server config if not already set - if (zookeeperHost == null) { - zookeeperHost = zkServer.getClientString(); + try { + Files.createDirectories(zkHomeDir); + Files.writeString(zkHomeDir.resolve("zoo.cfg"), zooCfgContents); + Files.createDirectories(zkDataDir); + Files.writeString(zkDataDir.resolve("myid"), String.valueOf(myId)); + // Run ZKSE + startZooKeeperServerEmbedded(zkPort, zkHomeDir.toString()); + } catch (Exception e) { + throw new ZooKeeperException( + SolrException.ErrorCode.SERVER_ERROR, + "IOException bootstrapping zk quorum instance", + e); + } } } @@ -121,9 +194,13 @@ public void initZooKeeper(final CoreContainer cc, CloudConfig config) { // we are ZooKeeper enabled try { // If this is an ensemble, allow for a long connect time for other servers to come up - if (zkRun && zkServer.getServers().size() > 1) { + if (zkServerEnabled && zkServer != null && zkServer.getServers().size() > 1) { zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded ensemble log.info("Zookeeper client={} Waiting for a quorum.", zookeeperHost); + } else if (zkServerEnabled && runAsQuorum) { + // Quorum mode also needs long timeout for other nodes to start + zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded quorum + log.info("Zookeeper client={} (quorum mode) Waiting for a quorum.", zookeeperHost); } else { log.info("Zookeeper 
client={}", zookeeperHost); } @@ -138,7 +215,7 @@ public void initZooKeeper(final CoreContainer cc, CloudConfig config) { ZkController zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config); - if (zkRun) { + if (zkServerEnabled) { if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) { // Embedded ZK and probably running with SSL new ClusterProperties(zkController.getZkClient()) @@ -255,9 +332,17 @@ public SolrMetricsContext getSolrMetricsContext() { } } - private String stripChroot(String zkRun) { - if (zkRun == null || zkRun.trim().isEmpty() || zkRun.lastIndexOf('/') < 0) return zkRun; - return zkRun.substring(0, zkRun.lastIndexOf('/')); + private void startZooKeeperServerEmbedded(int port, String zkHomeDir) throws Exception { + Properties p = new Properties(); + try (FileReader fr = new FileReader(zkHomeDir + "/zoo.cfg", StandardCharsets.UTF_8)) { + p.load(fr); + } + p.setProperty("clientPort", String.valueOf(port)); + + zkServerEmbedded = + ZooKeeperServerEmbedded.builder().baseDir(Path.of(zkHomeDir)).configuration(p).build(); + zkServerEmbedded.start(); + log.info("Started embedded ZooKeeper server in quorum mode on port {}", port); } public static volatile Predicate testing_beforeRegisterInZk; @@ -327,8 +412,19 @@ public void close() { zkController.close(); } } finally { - if (zkServer != null) { - zkServer.stop(); + try { + if (zkServer != null) { + zkServer.stop(); + } + } finally { + if (zkServerEmbedded != null) { + try { + zkServerEmbedded.close(); + log.info("Closed embedded ZooKeeper server in quorum mode"); + } catch (Exception e) { + log.error("Error closing embedded ZooKeeper server", e); + } + } } } IOUtils.closeQuietly(toClose); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java b/solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java new file mode 100644 index 000000000000..92ab78efef2a --- /dev/null +++ 
b/solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Path; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.embedded.JettySolrRunner; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test embedded ZooKeeper running in quorum mode within Solr nodes. + * + *

<p>This test verifies that: + *
<ul> + *
  <li>Multiple Solr nodes can start with embedded ZK in quorum mode + *
  <li>The ZK quorum forms correctly + *
  <li>Collections can be created and used + *
  <li>Documents can be indexed and queried + *
  <li>All resources are properly closed on shutdown + *
</ul>
+ */ +@SolrTestCaseJ4.SuppressSSL +public class TestEmbeddedZkQuorum extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String COLLECTION_NAME = "test_quorum_collection"; + private static final int NUM_NODES = 3; + + @BeforeClass + public static void setupCluster() throws Exception { + // Disable ZooKeeper JMX to avoid MBean registration conflicts during beasting + System.setProperty("zookeeper.jmx.log4j.disable", "true"); + + // Get path to a test config + Path configPath = TEST_PATH().resolve("collection1").resolve("conf"); + + // Configure cluster with 3 nodes, each running embedded ZK + cluster = + configureCluster(NUM_NODES).addConfig("conf1", configPath).withEmbeddedZkQuorum().build(); + cluster.waitForAllNodes(60); + } + + @Test + public void testBasicQuorumFunctionality() + throws IOException, InterruptedException, TimeoutException { + for (int i = 0; i < NUM_NODES; i++) { + JettySolrRunner node = cluster.getJettySolrRunner(i); + assertTrue("Node " + i + " should be running", node.isRunning()); + assertNotNull("Node " + i + " should have a NodeName", node.getNodeName()); + } + } + + @Test + public void testCollectionIndexing() throws Exception { + try (CloudSolrClient client = cluster.getSolrClient(COLLECTION_NAME)) { + CollectionAdminRequest.Create createCmd = + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 1, 3); + createCmd.process(client); + cluster.waitForActiveCollection(COLLECTION_NAME, 1, 3); + + // Index some documents + for (int i = 0; i < 10; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", i); + doc.addField("title_s", "Test Document " + i); + doc.addField("content_t", "This is test content for document " + i); + client.add(doc); + } + client.commit(); + + // Query the documents + SolrQuery query = new SolrQuery("*:*"); + query.setRows(100); + QueryResponse response = client.query(query); + 
SolrDocumentList results = response.getResults(); + + // Verify results + assertEquals("Should have 10 documents", 10, results.getNumFound()); + + CollectionAdminRequest.Delete deleteCmd = + CollectionAdminRequest.deleteCollection(COLLECTION_NAME); + deleteCmd.process(client); + } + } + + /** + * Tests ZK quorum resilience when a single node fails and recovers. + * + *

<p>This test verifies that: + * + *
<ul> + *
  <li>A 3-node ZK quorum can lose 1 node and maintain quorum (2/3) + *
  <li>The cluster continues to accept writes with 2 nodes + *
  <li>A failed node can rejoin the quorum using the same ports + *
  <li>All data is preserved after node recovery + *
</ul> + * + *
<p>
This test creates its own private cluster to avoid interfering with other tests. + */ + @Test + public void testQuorumResilienceWithNodeFailure() throws Exception { + final String collectionName = "quorum_resilience"; + final int initialDocs = 5; + final int docsWhileDown = 5; + final int docsAfterRecovery = 5; + + // Create a private cluster for this test + Path configPath = TEST_PATH().resolve("collection1").resolve("conf"); + MiniSolrCloudCluster privateCluster = + configureCluster(NUM_NODES).addConfig("conf1", configPath).withEmbeddedZkQuorum().build(); + + try { + privateCluster.waitForAllNodes(60); + + // Create collection with replica on each node + CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 3) + .process(privateCluster.getSolrClient()); + privateCluster.waitForActiveCollection(collectionName, 1, 3); + + try (CloudSolrClient client = privateCluster.getSolrClient(collectionName)) { + // Index initial documents and verify + indexDocuments(client, 0, initialDocs, "initial"); + privateCluster.waitForDocCount( + collectionName, initialDocs, "initial documents", 120, TimeUnit.SECONDS); + + // Stop one node (quorum maintained with 2/3 nodes) + JettySolrRunner stoppedNode = privateCluster.getJettySolrRunner(2); + String stoppedNodeName = stoppedNode.getNodeName(); + if (log.isInfoEnabled()) { + log.info("Stopping node to test quorum resilience: {}", stoppedNodeName); + } + privateCluster.stopJettySolrRunner(stoppedNode); + + // Wait for ZK to detect node loss and verify cluster still operational + privateCluster.waitForLiveNodes(2, 120); + indexDocuments(client, initialDocs, docsWhileDown, "during_failure"); + privateCluster.waitForDocCount( + collectionName, + initialDocs + docsWhileDown, + "documents while node down", + 120, + TimeUnit.SECONDS); + if (log.isInfoEnabled()) { + log.info("Starting node {} again and testing functionality", stoppedNodeName); + } + + privateCluster.startJettySolrRunner(stoppedNode, true); + 
privateCluster.waitForNode(stoppedNode, 120); + + // Wait for cluster to stabilize and verify all nodes running + privateCluster.waitForLiveNodes(3, 120); + + // CRITICAL: Wait for collection to become active (replicas up, leader elected) + // before attempting to index documents + privateCluster.waitForActiveCollection(collectionName, 120, TimeUnit.SECONDS, 1, 3); + + privateCluster.waitForDocCount( + collectionName, + initialDocs + docsWhileDown, + "documents after recovery", + 120, + TimeUnit.SECONDS); + + // Verify full cluster functionality by adding more documents + indexDocuments(client, initialDocs + docsWhileDown, docsAfterRecovery, "after_recovery"); + privateCluster.waitForDocCount( + collectionName, + initialDocs + docsWhileDown + docsAfterRecovery, + "all documents", + 120, + TimeUnit.SECONDS); + } + } finally { + CollectionAdminRequest.deleteCollection(collectionName) + .process(privateCluster.getSolrClient()); + privateCluster.shutdown(); + } + } + + /** + * Tests ZK quorum loss and recovery when majority of nodes fail. + * + *

<p>This test verifies that: + * + *
<ul> + *
  <li>A 3-node ZK quorum loses quorum when 2 nodes are down (1/3 remaining) + *
  <li>The surviving node maintains its replica but cannot process updates without quorum + *
  <li>Both failed nodes can be restarted to restore quorum + *
  <li>The cluster becomes operational again (can query and index documents) + *
  <li>Note: After catastrophic failure, some replicas may need time or manual intervention to + * fully recover + *
</ul> + * + *
<p>
This test creates its own private cluster to avoid interfering with other tests. Hard to + * make this test pass + */ + @AwaitsFix(bugUrl = "https://example.com/foo") + @Test + public void testQuorumLossAndRecovery() throws Exception { + final String collectionName = "quorum_loss"; + + // Create a private cluster for this test + Path configPath = TEST_PATH().resolve("collection1").resolve("conf"); + MiniSolrCloudCluster privateCluster = + configureCluster(NUM_NODES).addConfig("conf1", configPath).withEmbeddedZkQuorum().build(); + + try { + privateCluster.waitForAllNodes(60); + + // Create collection with 3 replicas (one on each node) to ensure at least + // one replica survives when we stop 2 nodes + CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 3) + .process(privateCluster.getSolrClient()); + privateCluster.waitForActiveCollection(collectionName, 1, 3); + + try (CloudSolrClient client = privateCluster.getSolrClient(collectionName)) { + indexDocuments(client, 0, 1, "before_loss"); + privateCluster.waitForDocCount( + collectionName, 1, "initial document", 120, TimeUnit.SECONDS); + + // Stop 2 out of 3 nodes to lose quorum + JettySolrRunner node1 = privateCluster.getJettySolrRunner(1); + JettySolrRunner node2 = privateCluster.getJettySolrRunner(2); + String node1Name = node1.getNodeName(); + String node2Name = node2.getNodeName(); + + if (log.isInfoEnabled()) { + log.info("Stopping 2 nodes to lose quorum: {}, {}", node1Name, node2Name); + } + privateCluster.stopJettySolrRunner(node1); + privateCluster.stopJettySolrRunner(node2); + + // Wait for ZK to detect quorum loss + privateCluster.waitForLiveNodes(1, 120); + + // Restart both nodes to restore quorum + if (log.isInfoEnabled()) { + log.info("Restarting nodes to restore quorum"); + } + privateCluster.startJettySolrRunner(node1, true); + privateCluster.startJettySolrRunner(node2, true); + + // Wait for both nodes to register with ZK (they should appear in live_nodes) + // but we don't require 
them to be fully recovered immediately + privateCluster.waitForNode(node1, 120); + privateCluster.waitForNode(node2, 120); + privateCluster.waitForLiveNodes(3, 120); + + // CRITICAL: Wait for collection to become active (replicas up, leader elected) + // After catastrophic failure, we need to ensure at least one replica is active + // before attempting operations + privateCluster.waitForActiveCollection(collectionName, 120, TimeUnit.SECONDS, 1, 1); + + // After catastrophic failure, the cluster should be operational with quorum restored + // even if not all replicas are immediately active + try { + privateCluster.waitForDocCount( + collectionName, 1, "document after recovery", 120, TimeUnit.SECONDS); + + // Verify cluster accepts writes + indexDocuments(client, 1, 1, "after_recovery"); + privateCluster.waitForDocCount( + collectionName, 2, "all documents after recovery", 120, TimeUnit.SECONDS); + + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Cluster failed to become operational after quorum restoration"); + } + throw e; + } + } + } finally { + // Clean up collection and cluster + CollectionAdminRequest.deleteCollection(collectionName) + .process(privateCluster.getSolrClient()); + privateCluster.shutdown(); + } + } + + // Helper methods for improved test clarity and reusability + + /** + * Index a batch of documents with a specific phase tag. 
+ * + * @param client the CloudSolrClient to use + * @param startId starting document ID + * @param count number of documents to index + * @param phase phase tag to add to documents + */ + private void indexDocuments(CloudSolrClient client, int startId, int count, String phase) + throws Exception { + for (int i = 0; i < count; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", startId + i); + doc.addField("phase_s", phase); + doc.addField( + "content_t", String.format(Locale.ROOT, "Document %d in phase %s", startId + i, phase)); + client.add(doc); + } + client.commit(); + } +} diff --git a/solr/packaging/build.gradle b/solr/packaging/build.gradle index 0e260f6c9ee6..bf3f8db70c1d 100644 --- a/solr/packaging/build.gradle +++ b/solr/packaging/build.gradle @@ -303,6 +303,7 @@ task integrationTests(type: BatsTask) { environment SOLR2_PORT: solrPort + 1 environment SOLR3_PORT: solrPort + 2 environment ZK_PORT: solrPort + 1000 + environment SOLR_EXPORTER_PORT: solrPort + 100 environment SOLR_LOGS_DIR: "$solrHome/logs" environment TEST_OUTPUT_DIR: integrationTestOutput environment TEST_FAILURE_DIR: solrTestFailuresDir diff --git a/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java b/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java index 298b89d38da7..5ee1008cf936 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java @@ -88,6 +88,11 @@ public boolean reject(Thread t) { return true; } + // ZooKeeper quorum threads that persist after embedded ZK shutdown + if (threadName.startsWith("WorkerSender") || threadName.startsWith("WorkerReceiver")) { + return true; + } + if (threadName.startsWith("closeThreadPool")) { return true; } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java 
b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index ee8ced5a367d..45b7ffe2b909 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.PrintStream; import java.lang.invoke.MethodHandles; +import java.net.ServerSocket; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -56,11 +57,14 @@ import java.util.function.Consumer; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.apache.CloudLegacySolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.jetty.SSLConfig; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.ConfigSetAdminRequest; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; @@ -160,6 +164,7 @@ public class MiniSolrCloudCluster { private final JettyConfig jettyConfig; private final String solrXml; private final boolean trackJettyMetrics; + private final String zkHost; // ZK connection string (used in quorum mode when zkServer is null) private final AtomicInteger nodeIds = new AtomicInteger(); private final Map solrClientByCollection = new ConcurrentHashMap<>(); @@ -294,6 +299,7 @@ public MiniSolrCloudCluster( } } this.zkServer = zkTestServer; + this.zkHost = null; // Not used in standard mode try (SolrZkClient zkClient = new SolrZkClient.Builder() @@ -341,6 +347,274 @@ public MiniSolrCloudCluster( } } + /** + * Create a MiniSolrCloudCluster with embedded ZooKeeper 
quorum mode. Each Solr node runs its own + * embedded ZooKeeper server, and together they form a quorum. + * + * @param numServers number of Solr servers (must be at least 3 for quorum) + * @param baseDir base directory that the mini cluster should be run from + * @param solrXml solr.xml file content + * @param jettyConfig Jetty configuration + * @param securityJson Optional security.json configuration + * @param trackJettyMetrics whether to track Jetty metrics + * @throws Exception if there was an error starting the cluster + */ + MiniSolrCloudCluster( + int numServers, + Path baseDir, + String solrXml, + JettyConfig jettyConfig, + Optional securityJson, + boolean trackJettyMetrics, + boolean useEmbeddedZkQuorum) + throws Exception { + + if (!useEmbeddedZkQuorum) { + throw new IllegalArgumentException("This constructor is only for embedded ZK quorum mode"); + } + if (numServers < 3) { + throw new IllegalArgumentException( + "ZooKeeper quorum requires at least 3 nodes, got: " + numServers); + } + + Objects.requireNonNull(securityJson); + this.baseDir = Objects.requireNonNull(baseDir); + this.jettyConfig = Objects.requireNonNull(jettyConfig); + this.solrXml = solrXml == null ? 
DEFAULT_CLOUD_SOLR_XML : solrXml; + this.trackJettyMetrics = trackJettyMetrics; + this.externalZkServer = true; // No ZkTestServer in quorum mode + this.zkServer = null; // No single ZK server + + log.info("Starting cluster of {} servers with embedded ZK quorum in {}", numServers, baseDir); + Files.createDirectories(baseDir); + + // Phase 1: Reserve random ports for all nodes + int[] ports = reservePortPairs(numServers); + + // Build the zkHost string with all ZK ports (Solr port + 1000) + StringBuilder zkHostBuilder = new StringBuilder(); + for (int i = 0; i < numServers; i++) { + if (i > 0) { + zkHostBuilder.append(","); + } + int zkPort = ports[i] + 1000; + zkHostBuilder.append("127.0.0.1:").append(zkPort); + } + this.zkHost = zkHostBuilder.toString(); // Save for later use + + if (log.isInfoEnabled()) { + log.info("Reserved ports for {} nodes: {}", numServers, java.util.Arrays.toString(ports)); + log.info("ZK connection string: {}", this.zkHost); + } + + // Set system properties for embedded ZK quorum mode + System.setProperty("solr.zookeeper.server.enabled", "true"); + System.setProperty("solr.security.manager.enabled", "false"); + System.setProperty("solr.node.roles", "data:on,overseer:allowed,zookeeper_quorum:on"); + System.setProperty("solr.test.sys.prop1", "propone"); + System.setProperty("solr.test.sys.prop2", "proptwo"); + System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes + + // Phase 2: Start all nodes in parallel + List> startups = new ArrayList<>(numServers); + for (int i = 0; i < numServers; i++) { + final int solrPort = ports[i]; + final String nodeName = newNodeName(); + startups.add( + () -> { + Path runnerPath = createInstancePath(nodeName); + Files.write(runnerPath.resolve("solr.xml"), solrXml.getBytes(StandardCharsets.UTF_8)); + + Properties nodeProps = new Properties(); + nodeProps.setProperty("zkHost", this.zkHost); + nodeProps.setProperty("hostPort", String.valueOf(solrPort)); + + JettyConfig newConfig = 
JettyConfig.builder(jettyConfig).setPort(solrPort).build(); + + JettySolrRunner jetty = + !trackJettyMetrics + ? new JettySolrRunner(runnerPath.toString(), nodeProps, newConfig) + : new JettySolrRunnerWithMetrics(runnerPath.toString(), nodeProps, newConfig); + + int zkPort = solrPort + 1000; + log.info("Starting {} on port {} with ZK on port {}", nodeName, solrPort, zkPort); + jetty.start(); + log.info("Node {} started successfully", nodeName); + + jettys.add(jetty); + synchronized (startupWait) { + startupWait.notifyAll(); + } + return jetty; + }); + } + + final ExecutorService executorLauncher = + ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("jetty-launcher")); + Collection> futures = executorLauncher.invokeAll(startups); + ExecutorUtil.shutdownAndAwaitTermination(executorLauncher); + Exception startupError = + checkForExceptions( + "Error starting up MiniSolrCloudCluster with embedded ZK quorum", futures); + if (startupError != null) { + try { + this.shutdown(); + } catch (Throwable t) { + startupError.addSuppressed(t); + } + throw startupError; + } + + log.info("All {} nodes started, waiting for quorum formation...", numServers); + Thread.sleep(10000); // Wait for ZK quorum to fully form + + // Initialize ZK paths and security (if provided) + try (SolrZkClient zkClient = + new SolrZkClient.Builder() + .withUrl(this.zkHost) + .withTimeout(60000, TimeUnit.MILLISECONDS) + .build()) { + if (!zkClient.exists("/solr")) { + zkClient.makePath("/solr", true); + } + + if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) { + zkClient.makePath( + "/solr" + ZkStateReader.CLUSTER_PROPS, + "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8), + true); + } + if (securityJson.isPresent()) { + zkClient.makePath( + "/solr/security.json", securityJson.get().getBytes(Charset.defaultCharset()), true); + } + } + + solrClient = buildSolrClientForQuorum(this.zkHost); + + if (numServers > 0) { + waitForAllNodes(numServers, 60); + } + + 
log.info("Embedded ZK quorum cluster started successfully with {} nodes", numServers); + } + + /** + * Reserves port pairs for embedded ZK quorum mode. For each node, we need both a Solr port and a + * ZK port (Solr port + 1000). This method ensures both ports in each pair are available before + * returning. + * + *

The method keeps all ServerSockets open during the search to prevent race conditions where + * another process might grab a port between our check and actual usage. + * + * @param numPairs the number of port pairs to reserve + * @return array of Solr ports (ZK ports are Solr port + 1000) + * @throws IOException if unable to find enough available port pairs + */ + private int[] reservePortPairs(int numPairs) throws IOException { + List solrSockets = new ArrayList<>(); + List zkSockets = new ArrayList<>(); + int[] ports = new int[numPairs]; + + try { + int pairsFound = 0; + int maxAttempts = numPairs * 100; // Reasonable limit to avoid infinite loops + int attempts = 0; + + while (pairsFound < numPairs && attempts < maxAttempts) { + attempts++; + ServerSocket solrSocket = null; + ServerSocket zkSocket = null; + + try { + // Try to get a random available port for Solr + solrSocket = new ServerSocket(0); + int solrPort = solrSocket.getLocalPort(); + int zkPort = solrPort + 1000; + + // Check if ZK port would exceed the valid port range (0-65535) + if (zkPort > 65535) { + solrSocket.close(); + continue; // Skip this port and try again + } + + // Verify the corresponding ZK port is also available + zkSocket = new ServerSocket(zkPort); + + // Both ports are available - keep the sockets and record the port + solrSockets.add(solrSocket); + zkSockets.add(zkSocket); + ports[pairsFound] = solrPort; + pairsFound++; + + if (log.isDebugEnabled()) { + log.debug( + "Reserved port pair {}/{}: Solr={}, ZK={}", pairsFound, numPairs, solrPort, zkPort); + } + + } catch (IOException | IllegalArgumentException e) { + // ZK port was not available or invalid, close sockets and try again + if (solrSocket != null) { + try { + solrSocket.close(); + } catch (IOException ignored) { + } + } + if (zkSocket != null) { + try { + zkSocket.close(); + } catch (IOException ignored) { + } + } + } + } + + if (pairsFound < numPairs) { + throw new IOException( + "Unable to find " + numPairs + " available 
port pairs after " + attempts + " attempts"); + } + return ports; + + } finally { + // Close all sockets now that we've recorded the ports + // The ports will remain available for immediate reuse + for (ServerSocket socket : solrSockets) { + try { + socket.close(); + } catch (IOException e) { + log.warn("Error closing Solr socket", e); + } + } + for (ServerSocket socket : zkSockets) { + try { + socket.close(); + } catch (IOException e) { + log.warn("Error closing ZK socket", e); + } + } + } + } + + /** + * Get the ZK connection string. Works for both standard mode (using zkServer) and quorum mode + * (using zkHost field). + * + * @return ZK connection string + */ + private String getZkAddress() { + if (zkHost != null) { + return zkHost; // Quorum mode + } + return zkServer.getZkAddress(); // Standard mode + } + + private CloudSolrClient buildSolrClientForQuorum(String zkHost) { + return new CloudLegacySolrClient.Builder(Collections.singletonList(zkHost), Optional.empty()) + .withSocketTimeout(90000, TimeUnit.MILLISECONDS) + .withConnectionTimeout(15000, TimeUnit.MILLISECONDS) + .build(); + } + private void waitForAllNodes(int numServers, int timeoutSeconds) throws InterruptedException, TimeoutException { log.info("waitForAllNodes: numServers={}", numServers); @@ -390,6 +664,87 @@ public void waitForNode(JettySolrRunner jetty, int timeoutSeconds) timeoutSeconds, TimeUnit.SECONDS, (o, n) -> n != null && n.contains(nodeName)); } + /** + * Wait for the expected number of live nodes in the cluster. 
+ * + * @param expectedCount expected number of live nodes + * @param timeoutSeconds timeout in seconds + * @throws InterruptedException if interrupted while waiting + * @throws TimeoutException if the expected count is not reached within the timeout + */ + public void waitForLiveNodes(int expectedCount, int timeoutSeconds) + throws InterruptedException, TimeoutException { + TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + long runningNodes = jettys.stream().filter(JettySolrRunner::isRunning).count(); + if (runningNodes == expectedCount) { + log.info("Verified {} live nodes", runningNodes); + return; + } + Thread.sleep(200); + } + // Final check after timeout + long actualCount = jettys.stream().filter(JettySolrRunner::isRunning).count(); + throw new TimeoutException( + "Live node count mismatch: expected " + expectedCount + " but got " + actualCount); + } + + /** + * Wait for the document count in a collection to reach the expected value. 
+ * + * @param collectionName name of the collection to check + * @param expectedCount expected number of documents + * @param description description for logging + * @param timeoutValue timeout value in seconds + * @param timeoutUnit timeout unit + * @throws InterruptedException if interrupted while waiting + * @throws TimeoutException if the expected count is not reached within the timeout + */ + public void waitForDocCount( + String collectionName, + long expectedCount, + String description, + int timeoutValue, + TimeUnit timeoutUnit) + throws InterruptedException, TimeoutException { + TimeOut timeout = new TimeOut(timeoutValue, timeoutUnit, TimeSource.NANO_TIME); + SolrClient client = getSolrClient(collectionName); + while (!timeout.hasTimedOut()) { + try { + QueryResponse response = client.query(new SolrQuery("*:*").setRows(0)); + long actualCount = response.getResults().getNumFound(); + if (actualCount == expectedCount) { + log.info("Verified {}: {} documents", description, actualCount); + return; + } + Thread.sleep(100); + } catch (Exception e) { + // Cluster might be temporarily unavailable during recovery + Thread.sleep(500); + } + } + // Final check after timeout + try { + QueryResponse response = client.query(new SolrQuery("*:*").setRows(0)); + long actualCount = response.getResults().getNumFound(); + throw new TimeoutException( + "Document count mismatch for: " + + description + + ". Expected " + + expectedCount + + " but got " + + actualCount); + } catch (Exception e) { + throw new TimeoutException( + "Document count check failed for: " + + description + + ". Expected " + + expectedCount + + " but query failed: " + + e.getMessage()); + } + } + /** * This method wait till all Solr JVMs ( Jettys ) are running . It waits up to the timeout (in * seconds) for the JVMs to be up before throwing IllegalStateException. 
This is called @@ -483,7 +838,7 @@ public JettySolrRunner getJettySolrRunner(int index) { public JettySolrRunner startJettySolrRunner(String name, JettyConfig config, String solrXml) throws Exception { final Properties nodeProps = new Properties(); - nodeProps.setProperty("zkHost", zkServer.getZkAddress()); + nodeProps.setProperty("zkHost", getZkAddress()); Path runnerPath = createInstancePath(name); if (solrXml == null) { @@ -573,7 +928,7 @@ public JettySolrRunner stopJettySolrRunner(JettySolrRunner jetty) throws Excepti public void uploadConfigSet(Path configDir, String configName) throws IOException { try (SolrZkClient zkClient = new SolrZkClient.Builder() - .withUrl(zkServer.getZkAddress()) + .withUrl(getZkAddress()) .withTimeout(AbstractZkTestCase.TIMEOUT, TimeUnit.MILLISECONDS) .withConnTimeOut(AbstractZkTestCase.TIMEOUT, TimeUnit.MILLISECONDS) .build()) { @@ -679,7 +1034,9 @@ public void shutdown() throws Exception { throw shutdownError; } } finally { - if (!externalZkServer) { + // Only shut down zkServer if it exists (not null) and we created it (!externalZkServer) + // In quorum mode, zkServer is null and each node's embedded ZK is shut down with the node + if (!externalZkServer && zkServer != null) { zkServer.shutdown(); } resetRecordingFlag(); @@ -707,7 +1064,7 @@ public CloudSolrClient getSolrClient(String collectionName) { k -> { CloudSolrClient solrClient = new CloudLegacySolrClient.Builder( - Collections.singletonList(zkServer.getZkAddress()), Optional.empty()) + Collections.singletonList(getZkAddress()), Optional.empty()) .withDefaultCollection(collectionName) .withSocketTimeout(90000) .withConnectionTimeout(15000) @@ -743,7 +1100,7 @@ public void zkSetData(String path, byte[] data) throws InterruptedException { protected CloudSolrClient buildSolrClient() { return new CloudLegacySolrClient.Builder( - Collections.singletonList(getZkServer().getZkAddress()), Optional.empty()) + Collections.singletonList(getZkAddress()), Optional.empty()) 
.withSocketTimeout(90000, TimeUnit.MILLISECONDS) .withConnectionTimeout(15000, TimeUnit.MILLISECONDS) .build(); // we choose 90 because we run in some harsh envs @@ -757,7 +1114,7 @@ protected CloudSolrClient buildSolrClient() { */ public CloudLegacySolrClient.Builder basicSolrClientBuilder() { return new CloudLegacySolrClient.Builder( - Collections.singletonList(getZkServer().getZkAddress()), Optional.empty()) + Collections.singletonList(getZkAddress()), Optional.empty()) .withSocketTimeout(90000) // we choose 90 because we run in some harsh envs .withConnectionTimeout(15000); } @@ -996,6 +1353,7 @@ public static class Builder { EnvUtils.getPropertyAsBool("solr.cloud.overseer.enabled", true); private boolean formatZkServer = true; private boolean disableTraceIdGeneration = false; + private boolean useEmbeddedZkQuorum = false; /** * Create a builder @@ -1114,6 +1472,27 @@ public Builder formatZkServer(boolean formatZkServer) { return this; } + /** + * Configure cluster to use embedded ZooKeeper quorum mode where each Solr node runs its own + * ZooKeeper server. + * + *

When enabled, instead of using a separate {@link ZkTestServer}, each Solr node will run an + * embedded ZooKeeper server, and together they form a quorum. This tests the embedded ZK quorum + * functionality. + * + *

Requires at least 3 nodes for a valid quorum. + * + * @return this Builder + */ + public Builder withEmbeddedZkQuorum() { + if (nodeCount < 3) { + throw new IllegalArgumentException( + "ZooKeeper quorum requires at least 3 nodes, got: " + nodeCount); + } + this.useEmbeddedZkQuorum = true; + return this; + } + /** * Configure and run the {@link MiniSolrCloudCluster} * @@ -1137,16 +1516,33 @@ public MiniSolrCloudCluster build() throws Exception { } JettyConfig jettyConfig = jettyConfigBuilder.build(); - MiniSolrCloudCluster cluster = - new MiniSolrCloudCluster( - nodeCount, - baseDir, - solrXml, - jettyConfig, - null, - securityJson, - trackJettyMetrics, - formatZkServer); + MiniSolrCloudCluster cluster; + + if (useEmbeddedZkQuorum) { + // Use embedded ZK quorum mode constructor + cluster = + new MiniSolrCloudCluster( + nodeCount, + baseDir, + solrXml, + jettyConfig, + securityJson, + trackJettyMetrics, + true); // useEmbeddedZkQuorum = true + } else { + // Use standard constructor with ZkTestServer + cluster = + new MiniSolrCloudCluster( + nodeCount, + baseDir, + solrXml, + jettyConfig, + null, + securityJson, + trackJettyMetrics, + formatZkServer); + } + for (Config config : configs) { cluster.uploadConfigSet(config.path, config.name); }