From b13d97e9bf8254377ab7ab0e249967b878752174 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 21 Jan 2026 12:10:59 -0800 Subject: [PATCH 01/10] SOLR-18080: Initiate Leader election for ShardTerms --- .../solr/cloud/RecoveringCoreTermWatcher.java | 59 ++++- .../cloud/ShardLeaderElectionContext.java | 4 +- .../org/apache/solr/cloud/ShardTerms.java | 22 ++ .../org/apache/solr/cloud/SyncStrategy.java | 39 +++- .../org/apache/solr/cloud/ZkShardTerms.java | 5 + .../org/apache/solr/handler/IndexFetcher.java | 2 +- .../solr/update/SolrCmdDistributor.java | 5 + .../DistributedZkUpdateProcessor.java | 22 +- .../org/apache/solr/cloud/ShardTermsTest.java | 24 +++ .../solr/cloud/ZkShardTermsRecoveryTest.java | 202 ++++++++++++++++++ .../apache/solr/cloud/ZkShardTermsTest.java | 33 +++ .../solr/common/cloud/ZkStateReader.java | 29 ++- .../solr/common/cloud/ZkCoreNodeProps.java | 4 + 13 files changed, 399 insertions(+), 51 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java index 40b340b283ce..515110b95866 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java @@ -19,6 +19,7 @@ import java.lang.invoke.MethodHandles; import java.util.concurrent.atomic.AtomicLong; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; @@ -52,15 +53,55 @@ public boolean onTermChanged(ShardTerms terms) { if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true; String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); - if (terms.haveHighestTermValue(coreNodeName)) return 
true; - if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) { - log.info( - "Start recovery on {} because core's term is less than leader's term", coreNodeName); - lastTermDoRecovery.set(terms.getTerm(coreNodeName)); - solrCore - .getUpdateHandler() - .getSolrCoreState() - .doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor()); + + // If we have the highest term, there is nothing to do + if (terms.haveHighestTermValue(coreNodeName)) { + return true; + } + + long lastRecoveryTerm; + long newTerm; + synchronized (lastTermDoRecovery) { + lastRecoveryTerm = lastTermDoRecovery.get(); + newTerm = terms.getTerm(coreNodeName); + if (lastRecoveryTerm < newTerm) { + lastTermDoRecovery.set(newTerm); + } + } + + if (coreDescriptor.getCloudDescriptor().isLeader()) { + log.warn( + "Removing {} leader as leader, since its term is no longer the highest. This will initiate recovery", + coreNodeName); + coreContainer.getZkController().giveupLeadership(coreDescriptor); + } else if (lastRecoveryTerm < newTerm) { + CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor(); + Replica leaderReplica = + solrCore + .getCoreContainer() + .getZkController() + .getClusterState() + .getCollection(cloudDescriptor.getCollectionName()) + .getSlice(cloudDescriptor.getShardId()) + .getLeader(); + + // Only recover if the leader replica still has the highest term. + // If not, then the leader-election process will take care of recovery. + if (leaderReplica != null && terms.canBecomeLeader(leaderReplica.getName())) { + log.info( + "Start recovery on {} because core's term is less than leader's term", coreNodeName); + solrCore + .getUpdateHandler() + .getSolrCoreState() + .doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor()); + } else { + if (log.isInfoEnabled()) { + log.info( + "Defer recovery on {} because leader-election will happen soon, old leader: {}", + coreNodeName, + leaderReplica == null ? 
null : leaderReplica.getName()); + } + } } } catch (Exception e) { if (log.isInfoEnabled()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java index 16a29f89a586..5b956eac5cfe 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java @@ -507,12 +507,10 @@ private void rejoinLeaderElection(SolrCore core) throws InterruptedException, Ke return; } - log.info("There may be a better leader candidate than us - going back into recovery"); + log.info("There may be a better leader candidate than us - rejoining the election"); cancelElection(); - core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor()); - leaderElector.joinElection(this, true); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ShardTerms.java index 323342c83535..dcc687f237f6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ShardTerms.java @@ -124,6 +124,28 @@ private boolean skipIncreaseTermOf(String key, Set replicasNeedingRecove return replicasNeedingRecovery.contains(key); } + public ShardTerms setHighestTerms(Set highestTermKeys) { + long newMaxTerm = maxTerm + 1; + boolean keyFound = false; + HashMap newValues = new HashMap<>(values); + long nextHighestTerm = -1; + for (String key : values.keySet()) { + if (highestTermKeys.contains(key)) { + newValues.put(key, newMaxTerm); + keyFound = true; + } else { + nextHighestTerm = Math.max(nextHighestTerm, values.get(key)); + } + } + // We only want to update if increasing the maxTerm makes an impact. + // If the nextHighestTerm is already < maxTerm, then upping the maxTerm doesn't do anything. 
+ if (nextHighestTerm == maxTerm && keyFound) { + return new ShardTerms(newValues, version); + } else { + return null; + } + } + /** * Return a new {@link ShardTerms} in which the highest terms are not zero * diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index 1b2789cb3a0e..c58bafd2602c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; @@ -173,7 +174,7 @@ private PeerSync.PeerSyncResult syncWithReplicas( String shardId, boolean peerSyncOnlyWithActive) throws Exception { - List nodes = + List replicas = zkController .getZkStateReader() .getReplicaProps( @@ -186,13 +187,13 @@ private PeerSync.PeerSyncResult syncWithReplicas( return PeerSync.PeerSyncResult.failure(); } - if (nodes == null) { + if (replicas == null) { // I have no replicas return PeerSync.PeerSyncResult.success(); } - List syncWith = new ArrayList<>(nodes.size()); - for (ZkCoreNodeProps node : nodes) { + List syncWith = new ArrayList<>(replicas.size()); + for (Replica node : replicas) { syncWith.add(node.getCoreUrl()); } @@ -230,11 +231,11 @@ private void syncToMe( // sync everyone else // TODO: we should do this in parallel at least - List nodes = + List replicas = zkController .getZkStateReader() .getReplicaProps(collection, shardId, cd.getCloudDescriptor().getCoreNodeName()); - if (nodes == null) { + if (replicas == null) { if (log.isInfoEnabled()) { log.info("{} has no replicas", 
ZkCoreNodeProps.getCoreUrl(leaderProps)); } @@ -242,20 +243,36 @@ private void syncToMe( } ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps); - for (ZkCoreNodeProps node : nodes) { + ZkShardTerms shardTerms = zkController.getShardTerms(collection, shardId); + for (Replica replica : replicas) { try { + if (shardTerms.registered(replica.getName()) + && !shardTerms.canBecomeLeader(replica.getName())) { + if (log.isInfoEnabled()) { + log.info( + "{}: do NOT ask {} to sync, as it is not of the same shardTerm. Issue a recovery instead.", + ZkCoreNodeProps.getCoreUrl(leaderProps), + replica.getCoreUrl()); + } + RecoveryRequest rr = new RecoveryRequest(); + rr.leaderProps = leaderProps; + rr.baseUrl = replica.getBaseUrl(); + rr.coreName = replica.getCoreName(); + recoveryRequests.add(rr); + continue; + } if (log.isInfoEnabled()) { log.info( "{}: try and ask {} to sync", ZkCoreNodeProps.getCoreUrl(leaderProps), - node.getCoreUrl()); + replica.getCoreUrl()); } requestSync( - node.getBaseUrl(), - node.getCoreUrl(), + replica.getBaseUrl(), + replica.getCoreUrl(), zkLeader.getCoreUrl(), - node.getCoreName(), + replica.getCoreName(), nUpdates); } catch (Exception e) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 6ec5b09afd41..ffc444dda134 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -120,6 +120,11 @@ public void ensureTermsIsHigher(String leader, Set replicasNeedingRecove mutate(terms -> terms.increaseTerms(leader, replicasNeedingRecovery)); } + public void ensureHighestTerms(Set mostUpToDateCores) { + if (mostUpToDateCores.isEmpty()) return; + mutate(terms -> terms.setHighestTerms(mostUpToDateCores)); + } + public ShardTerms getShardTerms() { return terms.get(); } diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java 
b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index 704806028075..a5dc2985acdc 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -494,7 +494,7 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreRel // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no // side-car indexes) - IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit(); + IndexCommit commit = solrCore.getDeletionPolicy().getAndSaveLatestCommit(); if (commit == null) { // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't // been updated with commit points diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 839c8a9c6192..40c972aefcbb 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -39,6 +39,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.JavaBinResponseParser; import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; @@ -526,6 +527,10 @@ public StdNode(ZkCoreNodeProps nodeProps) { this(nodeProps, null, null, 0); } + public StdNode(Replica replica, String collection, String shardId) { + this(new ZkCoreNodeProps(replica), collection, shardId); + } + public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) { this(nodeProps, collection, shardId, 0); } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java 
b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index 34532421bf56..b8febecd2856 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -546,7 +546,7 @@ protected void doDistribDeleteByQuery( Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, myShardId); // DBQ forwarded to NRT and TLOG replicas - List replicaProps = + List replicaProps = zkController .getZkStateReader() .getReplicaProps( @@ -558,8 +558,8 @@ protected void doDistribDeleteByQuery( EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); if (replicaProps != null) { final List myReplicas = new ArrayList<>(replicaProps.size()); - for (ZkCoreNodeProps replicaProp : replicaProps) { - myReplicas.add(new SolrCmdDistributor.StdNode(replicaProp, collection, myShardId)); + for (Replica replica : replicaProps) { + myReplicas.add(new SolrCmdDistributor.StdNode(replica, collection, myShardId)); } cmdDistrib.distribDelete( cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker); @@ -620,7 +620,7 @@ private List setupRequestForDBQ() { // TODO: what if we are no longer the leader? 
forwardToLeader = false; - List replicaProps = + List replicas = zkController .getZkStateReader() .getReplicaProps( @@ -630,10 +630,10 @@ private List setupRequestForDBQ() { null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - if (replicaProps != null) { - nodes = new ArrayList<>(replicaProps.size()); - for (ZkCoreNodeProps props : replicaProps) { - nodes.add(new SolrCmdDistributor.StdNode(props, collection, shardId)); + if (replicas != null) { + nodes = new ArrayList<>(replicas.size()); + for (Replica replica : replicas) { + nodes.add(new SolrCmdDistributor.StdNode(replica, collection, shardId)); } } } catch (InterruptedException e) { @@ -1271,14 +1271,14 @@ protected void doDistribFinish() { getLeaderExc); } - List myReplicas = + List myReplicas = zkController .getZkStateReader() .getReplicaProps(collection, cloudDesc.getShardId(), cloudDesc.getCoreNodeName()); boolean foundErrorNodeInReplicaList = false; if (myReplicas != null) { - for (ZkCoreNodeProps replicaProp : myReplicas) { - if (((Replica) replicaProp.getNodeProps()) + for (Replica replica : myReplicas) { + if (replica .getName() .equals(((Replica) stdNode.getNodeProps().getNodeProps()).getName())) { foundErrorNodeInReplicaList = true; diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java index 586706a92fba..579285d7207d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.solr.SolrTestCase; import org.junit.Test; @@ -43,4 +44,27 @@ public void testIncreaseTerms() { assertEquals(2L, terms.getTerm("leader").longValue()); assertEquals(1L, terms.getTerm("dead-replica").longValue()); } + + @Test + public void testSetHighestTerms() { + Map map = new HashMap<>(); + map.put("leader", 
0L); + ShardTerms terms = new ShardTerms(map, 0); + terms = terms.setHighestTerms(Set.of("leader")); + assertNull(terms); + + map.put("leader", 2L); + map.put("live-replica", 2L); + map.put("another-replica", 2L); + map.put("bad-replica", 2L); + map.put("dead-replica", 1L); + terms = new ShardTerms(map, 0); + + terms = terms.setHighestTerms(Set.of("live-replica", "another-replica")); + assertEquals(3L, terms.getTerm("live-replica").longValue()); + assertEquals(3L, terms.getTerm("another-replica").longValue()); + assertEquals(2L, terms.getTerm("leader").longValue()); + assertEquals(2L, terms.getTerm("bad-replica").longValue()); + assertEquals(1L, terms.getTerm("dead-replica").longValue()); + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java new file mode 100644 index 000000000000..4d78d28c7543 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.cloud; + +import static org.hamcrest.Matchers.in; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.embedded.JettySolrRunner; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ZkShardTermsRecoveryTest extends SolrCloudTestCase { + private static final String COLLECTION = "collection1"; + private static final int NUM_SHARDS = 2; + private static final int NUM_REPLICAS = 5; + private static int NUM_DOCS = 0; + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(3).addConfig("conf", configset("cloud-minimal")).configure(); + assertEquals( + 0, + CollectionAdminRequest.createCollection(COLLECTION, "conf", NUM_SHARDS, NUM_REPLICAS) + .process(cluster.getSolrClient()) + .getStatus()); + cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); + + UpdateRequest up = new UpdateRequest(); + for (int i = 0; i < 200; i++) { + up.add("id", "id-" + i); + } + up.commit(cluster.getSolrClient(), COLLECTION); + NUM_DOCS += 200; + } + + @Before + public void waitForActiveState() { + cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); + } + + @Test + public void testShardTermsInducedReplication() throws Exception { + String shard = "shard2"; + if (random().nextBoolean()) { + // Add uncommitted documents, to 
test that part of the recovery + UpdateRequest up = new UpdateRequest(); + for (int i = 0; i < 1000; i++) { + up.add("id", "id2-" + i); + } + up.process(cluster.getSolrClient(), COLLECTION); + NUM_DOCS += 1000; + } + + DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION); + JettySolrRunner jetty = cluster.getRandomJetty(random()); + + Slice shard1 = docCollection.getSlice(shard); + Replica leader = shard1.getLeader(); + Replica replica = shard1.getReplicas(r -> !r.isLeader()).getFirst(); + List recoveryReplicas = + shard1.getReplicas(r -> r.getType().leaderEligible && (r != leader && r != replica)); + + ZkShardTerms shardTerms = + jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard); + // Increase the leader and another replica's shardTerms + shardTerms.ensureHighestTerms(Set.of(leader.getName(), replica.getName())); + + waitForState( + "Waiting for replicas to go into recovery", + COLLECTION, + 5, + TimeUnit.SECONDS, + state -> { + Slice shardState = state.getSlice(shard); + for (Replica r : recoveryReplicas) { + if (shardState.getReplica(r.name).getState() != Replica.State.RECOVERING) { + return false; + } + } + return true; + }); + shardTerms.refreshTerms(); + for (Replica r : recoveryReplicas) { + assertTrue(shardTerms.isRecovering(r.getName())); + } + + // Recovery should succeed relatively quickly + cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); + shardTerms.refreshTerms(); + long maxTerm = shardTerms.getHighestTerm(); + for (Replica r : recoveryReplicas) { + assertFalse(shardTerms.isRecovering(r.getName())); + assertEquals(maxTerm, shardTerms.getTerm(r.getName())); + } + + new UpdateRequest().commit(cluster.getSolrClient(), COLLECTION); + waitForNumDocsInAllReplicas(NUM_DOCS, shard1.getReplicas(), "*:*"); + } + + @Test + public void testShardTermsInducedLeaderElection() throws IOException, SolrServerException { + String shard = "shard1"; + if 
(random().nextBoolean()) { + // Add uncommitted documents, to test that part of the recovery + UpdateRequest up = new UpdateRequest(); + for (int i = 0; i < 1000; i++) { + up.add("id", "id3-" + i); + } + up.process(cluster.getSolrClient(), COLLECTION); + NUM_DOCS += 1000; + } + + DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION); + JettySolrRunner jetty = cluster.getRandomJetty(random()); + + // Raise the shardTerms of two non-leader replicas, so the current leader no longer has the highest term + Slice shard1 = docCollection.getSlice(shard); + Set<String> replicasToSetHighest = + shard1.getReplicas(r -> !r.isLeader()).subList(1, 3).stream() + .map(Replica::getName) + .collect(Collectors.toSet()); + List<Replica> recoveryReplicas = + shard1.getReplicas(r -> !replicasToSetHighest.contains(r.getName())); + ZkShardTerms shardTerms = + jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard); + shardTerms.ensureHighestTerms(replicasToSetHighest); + waitForState( + "Wait for leadership to be given up", COLLECTION, dc -> dc.getLeader(shard) == null); + waitForState( + "Waiting for replicas to go into recovery", + COLLECTION, + 5, + TimeUnit.SECONDS, + state -> { + Slice shardState = state.getSlice(shard); + for (Replica r : recoveryReplicas) { + if (shardState.getReplica(r.name).getState() != Replica.State.RECOVERING) { + return false; + } + } + return true; + }); + waitForState("Wait for leadership to be taken", COLLECTION, dc -> dc.getLeader(shard) != null); + cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); + // Make sure that a leader election took place + assertThat( + cluster.getZkStateReader().getCollection(COLLECTION).getLeader(shard).getName(), + in(replicasToSetHighest)); + shardTerms.refreshTerms(); + long maxTerm = shardTerms.getHighestTerm(); + for (Replica r : recoveryReplicas) { + assertFalse(shardTerms.isRecovering(r.getName())); + assertEquals(maxTerm, shardTerms.getTerm(r.getName())); + } + + new 
UpdateRequest().commit(cluster.getSolrClient(), COLLECTION); + waitForNumDocsInAllReplicas(NUM_DOCS, shard1.getReplicas(), "*:*"); + } + + private void waitForNumDocsInAllReplicas(int numDocs, Collection replicas, String query) + throws IOException, SolrServerException { + for (Replica r : replicas) { + if (!r.isActive(cluster.getSolrClient().getClusterState().getLiveNodes())) { + continue; + } + try (SolrClient replicaClient = getHttpSolrClient(r)) { + assertEquals( + "Replica " + r.getName() + " not up to date", + numDocs, + replicaClient.query(new SolrQuery(query)).getResults().getNumFound()); + } + } + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java index ead1a49e6ea6..99d365f230cb 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -329,6 +330,38 @@ public void testSetTermEqualsToLeader() throws InterruptedException { replicaTerms.close(); } + public void testSetHighestTerms() throws InterruptedException { + String collection = "setHighestTerms"; + ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + ZkShardTerms replica1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + ZkShardTerms replica2Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + ZkShardTerms replica3Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient()); + leaderTerms.registerTerm("leader"); + replica1Terms.registerTerm("replica1"); + replica2Terms.registerTerm("replica2"); + replica3Terms.registerTerm("replica3"); + + leaderTerms.ensureHighestTerms(Set.of("leader", 
"replica1")); + waitFor(true, () -> leaderTerms.canBecomeLeader("leader")); + waitFor(true, () -> replica1Terms.canBecomeLeader("replica1")); + waitFor(false, () -> replica2Terms.canBecomeLeader("replica2")); + waitFor(false, () -> replica3Terms.canBecomeLeader("replica3")); + waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica1")); + waitFor(true, () -> leaderTerms.skipSendingUpdatesTo("replica2")); + waitFor(true, () -> leaderTerms.skipSendingUpdatesTo("replica3")); + + leaderTerms.ensureHighestTerms(Set.of("replica2", "replica3")); + waitFor(false, () -> leaderTerms.canBecomeLeader("leader")); + waitFor(false, () -> replica1Terms.canBecomeLeader("replica1")); + waitFor(true, () -> replica2Terms.canBecomeLeader("replica2")); + waitFor(true, () -> replica3Terms.canBecomeLeader("replica3")); + + leaderTerms.close(); + replica1Terms.close(); + replica2Terms.close(); + replica3Terms.close(); + } + private void waitFor(T expected, Supplier supplier) throws InterruptedException { TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource()); while (!timeOut.hasTimedOut()) { diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 2f60c7aad9b7..306fb3c44d7e 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1047,8 +1047,7 @@ public static String getShardLeadersElectPath(String collection, String shardId) + (shardId != null ? 
("/" + shardId + "/" + ELECTION_NODE) : ""); } - public List getReplicaProps( - String collection, String shardId, String thisCoreNodeName) { + public List getReplicaProps(String collection, String shardId, String thisCoreNodeName) { // TODO: It's odd that the default is to return replicas of type TLOG and NRT only return getReplicaProps( collection, @@ -1059,7 +1058,7 @@ public List getReplicaProps( EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)); } - public List getReplicaProps( + public List getReplicaProps( String collection, String shardId, String thisCoreNodeName, @@ -1078,39 +1077,37 @@ public List getReplicaProps( } Map slices = docCollection.getSlicesMap(); - Slice replicas = slices.get(shardId); - if (replicas == null) { + Slice shard = slices.get(shardId); + if (shard == null) { throw new ZooKeeperException( ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId); } - Map shardMap = replicas.getReplicasMap(); - List nodes = new ArrayList<>(shardMap.size()); + Map shardMap = shard.getReplicasMap(); + List replicas = new ArrayList<>(shardMap.size()); for (Entry entry : shardMap.entrySet().stream() .filter((e) -> acceptReplicaType.contains(e.getValue().getType())) .collect(Collectors.toList())) { - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); + Replica replica = entry.getValue(); String coreNodeName = entry.getValue().getName(); - if (clusterState.liveNodesContain(nodeProps.getNodeName()) + if (clusterState.liveNodesContain(replica.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) { - if (mustMatchStateFilter == null - || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) { - if (mustNotMatchStateFilter == null - || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) { - nodes.add(nodeProps); + if (mustMatchStateFilter == null || mustMatchStateFilter == replica.getState()) { + if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != replica.getState()) { + 
replicas.add(replica); } } } } - if (nodes.size() == 0) { + if (replicas.size() == 0) { // no replicas return null; } - return nodes; + return replicas; } public SolrZkClient getZkClient() { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java index 34da8d239a02..788cb3866f17 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java @@ -47,6 +47,10 @@ public String getCoreName() { return nodeProps.getStr(ReplicaStateProps.CORE_NAME); } + public String getCoreNodeName() { + return nodeProps.getStr(ReplicaStateProps.CORE_NODE_NAME); + } + private static String getBaseUrl(ZkNodeProps nodeProps) { // if storing baseUrl in ZK is enabled, and it's stored, just use what's stored, i.e. no // self-healing here From 7522caa0affd61c306d944991c7cce131a058f38 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 21 Jan 2026 12:11:49 -0800 Subject: [PATCH 02/10] Add changelog entry --- .../solr-18080-shard-term-induce-leader-election.yml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 changelog/unreleased/solr-18080-shard-term-induce-leader-election.yml diff --git a/changelog/unreleased/solr-18080-shard-term-induce-leader-election.yml b/changelog/unreleased/solr-18080-shard-term-induce-leader-election.yml new file mode 100644 index 000000000000..49c3f8f4db3e --- /dev/null +++ b/changelog/unreleased/solr-18080-shard-term-induce-leader-election.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: ShardTerms can now induce a leader election if needed +type: other # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: Houston Putman + nick: HoustonPutman +links: + - name: SOLR-18080 + url: https://issues.apache.org/jira/browse/SOLR-18080 From 
776c28b7d1580d3b831a57be592d3bb9ef3a7694 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 21 Jan 2026 12:44:52 -0800 Subject: [PATCH 03/10] Fix precommit issues --- .../test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java index 4d78d28c7543..4e337cdfb572 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java @@ -86,7 +86,8 @@ public void testShardTermsInducedReplication() throws Exception { Replica leader = shard1.getLeader(); Replica replica = shard1.getReplicas(r -> !r.isLeader()).getFirst(); List recoveryReplicas = - shard1.getReplicas(r -> r.getType().leaderEligible && (r != leader && r != replica)); + shard1.getReplicas( + r -> r.getType().leaderEligible && !(r.equals(leader) || r.equals(replica))); ZkShardTerms shardTerms = jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard); From e2b0e84a5b98671f06564c93ebda71e030bef6f7 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 23 Jan 2026 17:00:09 -0800 Subject: [PATCH 04/10] Fix for empty versions, still fails with uncommitted docs --- .../cloud/ShardLeaderElectionContext.java | 34 ++----------------- .../cloud/ShardLeaderElectionContextBase.java | 17 ++++++++-- .../org/apache/solr/cloud/SyncStrategy.java | 30 +++++++++++++--- .../handler/admin/RequestSyncShardOp.java | 4 +-- .../solr/cloud/ZkShardTermsRecoveryTest.java | 13 ++++--- 5 files changed, 52 insertions(+), 46 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java index 5b956eac5cfe..384d0436de20 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java +++ 
b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java @@ -205,41 +205,13 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup } } - PeerSync.PeerSyncResult result = null; boolean success = false; try { - result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement); + PeerSync.PeerSyncResult result = + syncStrategy.sync(zkController, core, leaderProps, weAreReplacement, true); success = result.isSuccess(); } catch (Exception e) { log.error("Exception while trying to sync", e); - result = PeerSync.PeerSyncResult.failure(); - } - - UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - - if (!success) { - boolean hasRecentUpdates = false; - if (ulog != null) { - // TODO: we could optimize this if necessary - try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { - hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty(); - } - } - - if (!hasRecentUpdates) { - // we failed sync, but we have no versions - we can't sync in that case - // - we were active - // before, so become leader anyway if no one else has any versions either - if (result.getOtherHasVersions().orElse(false)) { - log.info( - "We failed sync, but we have no versions - we can't sync in that case. 
But others have some versions, so we should not become leader"); - success = false; - } else { - log.info( - "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway"); - success = true; - } - } } // solrcloud_debug @@ -250,7 +222,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup try { if (log.isDebugEnabled()) { log.debug( - "{} synched {}", + "{} synced {}", core.getCoreContainer().getZkController().getNodeName(), searcher.count(new MatchAllDocsQuery())); } diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java index b91733845954..d50dd58d10cd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java @@ -19,6 +19,8 @@ import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.solr.cloud.overseer.OverseerAction; @@ -182,8 +184,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup .getClusterState() .getCollection(collection) .getSlice(shardId) - .getReplicas() - .size() + .getNumLeaderReplicas() < 2) { Replica leader = zkStateReader.getLeader(collection, shardId); if (leader != null @@ -239,6 +240,18 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup prs) .persist(coll.getZNode(), zkStateReader.getZkClient()); } + try { + zkStateReader.waitForState( + collection, + 10, + TimeUnit.SECONDS, + state -> { + Replica leader = state.getLeader(shardId); + return leader != null && id.equals(leader.getName()); + }); + } catch 
(TimeoutException e) { + throw new RuntimeException(e); + } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index c58bafd2602c..051d5c95b556 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -41,6 +41,7 @@ import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardResponse; import org.apache.solr.update.PeerSync; +import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateShardHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,14 +81,15 @@ private static class ShardCoreRequest extends ShardRequest { public PeerSync.PeerSyncResult sync( ZkController zkController, SolrCore core, ZkNodeProps leaderProps) { - return sync(zkController, core, leaderProps, false); + return sync(zkController, core, leaderProps, false, false); } public PeerSync.PeerSyncResult sync( ZkController zkController, SolrCore core, ZkNodeProps leaderProps, - boolean peerSyncOnlyWithActive) { + boolean peerSyncOnlyWithActive, + boolean ignoreNoVersionsFailure) { if (SKIP_AUTO_RECOVERY) { return PeerSync.PeerSyncResult.success(); } @@ -108,14 +110,16 @@ public PeerSync.PeerSyncResult sync( return PeerSync.PeerSyncResult.failure(); } - return syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive); + return syncReplicas( + zkController, core, leaderProps, peerSyncOnlyWithActive, ignoreNoVersionsFailure); } private PeerSync.PeerSyncResult syncReplicas( ZkController zkController, SolrCore core, ZkNodeProps leaderProps, - boolean peerSyncOnlyWithActive) { + boolean peerSyncOnlyWithActive, + boolean ignoreNoVersionsFailure) { if (isClosed) { log.info("We have been closed, won't sync with replicas"); return PeerSync.PeerSyncResult.failure(); @@ -133,6 +137,24 @@ private PeerSync.PeerSyncResult syncReplicas( result = syncWithReplicas( 
zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive); + + if (!result.isSuccess() && ignoreNoVersionsFailure) { + UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); + boolean hasRecentUpdates = false; + if (ulog != null) { + // TODO: we could optimize this if necessary + try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { + hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty(); + } + } + // we failed sync, but we have no versions - we can't sync in that case + // - we were active before, so continue if no one else has any versions either + if (!hasRecentUpdates && !result.getOtherHasVersions().orElse(false)) { + log.info( + "We failed sync, but we have no versions - we can't sync in that case - so continue"); + result = PeerSync.PeerSyncResult.success(); + } + } success = result.isSuccess(); } catch (Exception e) { log.error("Sync Failed", e); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java index d781834e3e79..815a3cc76246 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java @@ -65,7 +65,7 @@ public void execute(CallInfo it) throws Exception { zkController.getZkStateReader().getBaseUrlForNodeName(zkController.getNodeName())); boolean success = - syncStrategy.sync(zkController, core, new ZkNodeProps(props), true).isSuccess(); + syncStrategy.sync(zkController, core, new ZkNodeProps(props), true, false).isSuccess(); // solrcloud_debug if (log.isDebugEnabled()) { try { @@ -74,7 +74,7 @@ public void execute(CallInfo it) throws Exception { try { if (log.isDebugEnabled()) { log.debug( - "{} synched {}", + "{} synced {}", core.getCoreContainer().getZkController().getNodeName(), searcher.count(new MatchAllDocsQuery())); } diff --git 
a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java index 4e337cdfb572..c1d7bf2b74bf 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java @@ -52,13 +52,6 @@ public static void setupCluster() throws Exception { .process(cluster.getSolrClient()) .getStatus()); cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); - - UpdateRequest up = new UpdateRequest(); - for (int i = 0; i < 200; i++) { - up.add("id", "id-" + i); - } - up.commit(cluster.getSolrClient(), COLLECTION); - NUM_DOCS += 200; } @Before @@ -77,6 +70,9 @@ public void testShardTermsInducedReplication() throws Exception { } up.process(cluster.getSolrClient(), COLLECTION); NUM_DOCS += 1000; + if (random().nextBoolean()) { + cluster.getSolrClient().commit(COLLECTION); + } } DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION); @@ -137,6 +133,9 @@ public void testShardTermsInducedLeaderElection() throws IOException, SolrServer } up.process(cluster.getSolrClient(), COLLECTION); NUM_DOCS += 1000; + if (random().nextBoolean()) { + cluster.getSolrClient().commit(COLLECTION); + } } DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION); From 9f102cbf63658f53e451581190ba9a446e56faca Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jan 2026 16:01:15 -0800 Subject: [PATCH 05/10] Fix no uLog error case --- .../java/org/apache/solr/cloud/SyncStrategy.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index 051d5c95b556..289cd6e67007 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ 
b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -105,11 +105,6 @@ public PeerSync.PeerSyncResult sync( log.info("Sync replicas to {}", ZkCoreNodeProps.getCoreUrl(leaderProps)); } - if (core.getUpdateHandler().getUpdateLog() == null) { - log.error("No UpdateLog found - cannot sync"); - return PeerSync.PeerSyncResult.failure(); - } - return syncReplicas( zkController, core, leaderProps, peerSyncOnlyWithActive, ignoreNoVersionsFailure); } @@ -134,9 +129,14 @@ private PeerSync.PeerSyncResult syncReplicas( // first sync ourselves - we are the potential leader after all try { - result = - syncWithReplicas( - zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive); + if (core.getUpdateHandler().getUpdateLog() == null) { + log.error("No UpdateLog found - cannot sync"); + result = PeerSync.PeerSyncResult.failure(); + } else { + result = + syncWithReplicas( + zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive); + } if (!result.isSuccess() && ignoreNoVersionsFailure) { UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); From d68acf30eb5d4381e45325173e4cfd5ce41a7a1c Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jan 2026 16:36:52 -0800 Subject: [PATCH 06/10] Fix test --- .../solr/opentelemetry/TestDistributedTracing.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/solr/modules/opentelemetry/src/test/org/apache/solr/opentelemetry/TestDistributedTracing.java b/solr/modules/opentelemetry/src/test/org/apache/solr/opentelemetry/TestDistributedTracing.java index 97916c007ca9..6391b8d7db9b 100644 --- a/solr/modules/opentelemetry/src/test/org/apache/solr/opentelemetry/TestDistributedTracing.java +++ b/solr/modules/opentelemetry/src/test/org/apache/solr/opentelemetry/TestDistributedTracing.java @@ -228,7 +228,15 @@ private void verifyCollectionCreation(String collection) throws Exception { // db.instance=testInternalCollectionApiCommands_shard2_replica_n1 // 
db.instance=testInternalCollectionApiCommands_shard1_replica_n6 // - // 7..8 (2 times) name=post:/{core}/get + // 7..8 (2 times) name=post:/{core}/get (FingerPrinting to get versions from non-leaders) + // db.instance=testInternalCollectionApiCommands_shard2_replica_n1 + // db.instance=testInternalCollectionApiCommands_shard1_replica_n6 + // + // 9..10 (2 times) name=post:/{core}/get (PeerSync request to non-leaders) + // db.instance=testInternalCollectionApiCommands_shard2_replica_n1 + // db.instance=testInternalCollectionApiCommands_shard1_replica_n6 + // + // 11..12 (2 times) name=post:/{core}/get (FingerPrinting to get versions from leaders PeerSync) // db.instance=testInternalCollectionApiCommands_shard2_replica_n4 // db.instance=testInternalCollectionApiCommands_shard1_replica_n2 @@ -238,7 +246,7 @@ private void verifyCollectionCreation(String collection) throws Exception { assertEquals("create:/admin/collections", s0.getName()); Map ops = new HashMap<>(); - assertEquals(7, finishedSpans.size()); + assertEquals(11, finishedSpans.size()); var parentTraceId = getRootTraceId(finishedSpans); for (var span : finishedSpans) { if (isRootSpan(span)) { @@ -251,7 +259,7 @@ private void verifyCollectionCreation(String collection) throws Exception { ops.put(span.getName(), ops.getOrDefault(span.getName(), 0) + 1); } var expectedOps = - Map.of("CreateCollectionCmd", 1, "post:/admin/cores", 4, "post:/{core}/get", 2); + Map.of("CreateCollectionCmd", 1, "post:/admin/cores", 4, "post:/{core}/get", 6); assertEquals(expectedOps, ops); } From b60d971e13536cb4b89994ff5ed84012cd3e0509 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 28 Jan 2026 12:42:50 -0800 Subject: [PATCH 07/10] Make some fixes, primarily indexFetcher checking files that are still open --- .../cloud/ShardLeaderElectionContextBase.java | 14 ---- .../org/apache/solr/handler/IndexFetcher.java | 83 ++++++++----------- solr/core/src/test-files/log4j2.xml | 2 +- .../solr/cloud/ZkShardTermsRecoveryTest.java | 
4 +- 4 files changed, 39 insertions(+), 64 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java index d50dd58d10cd..4ed392b3a852 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java @@ -19,8 +19,6 @@ import java.lang.invoke.MethodHandles; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.solr.cloud.overseer.OverseerAction; @@ -240,18 +238,6 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup prs) .persist(coll.getZNode(), zkStateReader.getZkClient()); } - try { - zkStateReader.waitForState( - collection, - 10, - TimeUnit.SECONDS, - state -> { - Replica leader = state.getLeader(shardId); - return leader != null && id.equals(leader.getName()); - }); - } catch (TimeoutException e) { - throw new RuntimeException(e); - } } } diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index a5dc2985acdc..265928edb616 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -494,7 +494,7 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreRel // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. 
no // side-car indexes) - IndexCommit commit = solrCore.getDeletionPolicy().getAndSaveLatestCommit(); + IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit(); if (commit == null) { // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't // been updated with commit points @@ -1300,11 +1300,11 @@ protected static CompareResult compareFile( Directory indexDir, String filename, Long backupIndexFileLen, Long backupIndexFileChecksum) { CompareResult compareResult = new CompareResult(); try { - try (final IndexInput indexInput = indexDir.openInput(filename, IOContext.READONCE)) { - long indexFileLen = indexInput.length(); - long indexFileChecksum = 0; - - if (backupIndexFileChecksum != null) { + long indexFileLen; + long indexFileChecksum = 0; + if (backupIndexFileChecksum != null) { + try (final IndexInput indexInput = indexDir.openInput(filename, IOContext.READONCE)) { + indexFileLen = indexInput.length(); try { indexFileChecksum = CodecUtil.retrieveChecksum(indexInput); compareResult.checkSummed = true; @@ -1312,47 +1312,49 @@ protected static CompareResult compareFile( log.warn("Could not retrieve checksum from file.", e); } } + } else { + indexFileLen = indexDir.fileLength(filename); + } - if (!compareResult.checkSummed) { - // we don't have checksums to compare - - if (indexFileLen == backupIndexFileLen) { - compareResult.equal = true; - return compareResult; - } else { - log.info( - "File {} did not match. expected length is {} and actual length is {}", - filename, - backupIndexFileLen, - indexFileLen); - compareResult.equal = false; - return compareResult; - } - } - - // we have checksums to compare + if (!compareResult.checkSummed) { + // we don't have checksums to compare - if (indexFileLen == backupIndexFileLen && indexFileChecksum == backupIndexFileChecksum) { + if (indexFileLen == backupIndexFileLen) { compareResult.equal = true; return compareResult; } else { - log.warn( - "File {} did not match. 
expected checksum is {} and actual is checksum {}. " - + "expected length is {} and actual length is {}", + log.info( + "File {} did not match. expected length is {} and actual length is {}", filename, - backupIndexFileChecksum, - indexFileChecksum, backupIndexFileLen, indexFileLen); compareResult.equal = false; return compareResult; } } + + // we have checksums to compare + + if (indexFileLen == backupIndexFileLen && indexFileChecksum == backupIndexFileChecksum) { + compareResult.equal = true; + return compareResult; + } else { + log.warn( + "File {} did not match. expected checksum is {} and actual is checksum {}. " + + "expected length is {} and actual length is {}", + filename, + backupIndexFileChecksum, + indexFileChecksum, + backupIndexFileLen, + indexFileLen); + compareResult.equal = false; + return compareResult; + } } catch (NoSuchFileException | FileNotFoundException e) { compareResult.equal = false; return compareResult; } catch (IOException e) { - log.error("Could not read file {}. Downloading it again", filename, e); + log.error("Could not read file {}. Assuming it is out of date", filename, e); compareResult.equal = false; return compareResult; } @@ -1383,22 +1385,9 @@ private boolean isIndexStale(Directory dir) throws IOException { String filename = (String) file.get(NAME); Long length = (Long) file.get(SIZE); Long checksum = (Long) file.get(CHECKSUM); - if (slowFileExists(dir, filename)) { - if (checksum != null) { - if (!(compareFile(dir, filename, length, checksum).equal)) { - // file exists and size or checksum is different, therefore we must download it again - return true; - } - } else { - if (length != dir.fileLength(filename)) { - log.warn( - "File {} did not match. 
expected length is {} and actual length is {}", - filename, - length, - dir.fileLength(filename)); - return true; - } - } + if (!(compareFile(dir, filename, length, checksum).equal)) { + // file exists and size or checksum is different, therefore we must download it again + return true; } } return false; diff --git a/solr/core/src/test-files/log4j2.xml b/solr/core/src/test-files/log4j2.xml index 9078571bce0d..d42c102b1ac4 100644 --- a/solr/core/src/test-files/log4j2.xml +++ b/solr/core/src/test-files/log4j2.xml @@ -22,7 +22,7 @@ %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{ - =>%ex{short}}}{10240}%n + =>%ex}}{10240}%n diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java index c1d7bf2b74bf..4fb5bbfda56c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java @@ -63,7 +63,7 @@ public void waitForActiveState() { public void testShardTermsInducedReplication() throws Exception { String shard = "shard2"; if (random().nextBoolean()) { - // Add uncommitted documents, to test that part of the recovery + // Add uncommitted/committed documents, to test that part of the recovery UpdateRequest up = new UpdateRequest(); for (int i = 0; i < 1000; i++) { up.add("id", "id2-" + i); @@ -126,7 +126,7 @@ public void testShardTermsInducedReplication() throws Exception { public void testShardTermsInducedLeaderElection() throws IOException, SolrServerException { String shard = "shard1"; if (random().nextBoolean()) { - // Add uncommitted documents, to test that part of the recovery + // Add uncommitted/committed documents, to test that part of the recovery UpdateRequest up = new UpdateRequest(); for (int i = 0; i < 1000; i++) { 
up.add("id", "id3-" + i); From 94463123788150dd6dbeeba4d2950d4cfe3bbc1d Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 29 Jan 2026 17:08:18 -0800 Subject: [PATCH 08/10] Undo logging change --- solr/core/src/test-files/log4j2.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/test-files/log4j2.xml b/solr/core/src/test-files/log4j2.xml index d42c102b1ac4..9078571bce0d 100644 --- a/solr/core/src/test-files/log4j2.xml +++ b/solr/core/src/test-files/log4j2.xml @@ -22,7 +22,7 @@ %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{ - =>%ex}}{10240}%n + =>%ex{short}}}{10240}%n From 34cba84e4bad063fb20349cb68668be86a1e8fa2 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 29 Jan 2026 18:03:51 -0800 Subject: [PATCH 09/10] Make test resilient to possible different recovery speeds --- .../solr/cloud/ZkShardTermsRecoveryTest.java | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java index 4fb5bbfda56c..40ef1a133fca 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java @@ -78,36 +78,22 @@ public void testShardTermsInducedReplication() throws Exception { DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION); JettySolrRunner jetty = cluster.getRandomJetty(random()); - Slice shard1 = docCollection.getSlice(shard); - Replica leader = shard1.getLeader(); - Replica replica = shard1.getReplicas(r -> !r.isLeader()).getFirst(); + Slice shard2 = docCollection.getSlice(shard); + Replica leader = shard2.getLeader(); + Replica replica = shard2.getReplicas(r -> !r.isLeader()).getFirst(); List 
recoveryReplicas = - shard1.getReplicas( + shard2.getReplicas( r -> r.getType().leaderEligible && !(r.equals(leader) || r.equals(replica))); ZkShardTerms shardTerms = jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard); // Increase the leader and another replica's shardTerms shardTerms.ensureHighestTerms(Set.of(leader.getName(), replica.getName())); - - waitForState( - "Waiting for replicas to go into recovery", - COLLECTION, - 5, - TimeUnit.SECONDS, - state -> { - Slice shardState = state.getSlice(shard); - for (Replica r : recoveryReplicas) { - if (shardState.getReplica(r.name).getState() != Replica.State.RECOVERING) { - return false; - } - } - return true; - }); - shardTerms.refreshTerms(); + ShardTerms shardTermsSnapshot = shardTerms.getShardTerms(); for (Replica r : recoveryReplicas) { - assertTrue(shardTerms.isRecovering(r.getName())); + assertFalse(shardTermsSnapshot.canBecomeLeader(r.getName())); } + waitForReplicasToGoIntoRecovery(shard, recoveryReplicas); // Recovery should succeed relatively quickly cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); @@ -119,7 +105,7 @@ public void testShardTermsInducedReplication() throws Exception { } new UpdateRequest().commit(cluster.getSolrClient(), COLLECTION); - waitForNumDocsInAllReplicas(NUM_DOCS, shard1.getReplicas(), "*:*"); + waitForNumDocsInAllReplicas(NUM_DOCS, shard2.getReplicas(), "*:*"); } @Test @@ -152,23 +138,15 @@ public void testShardTermsInducedLeaderElection() throws IOException, SolrServer ZkShardTerms shardTerms = jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard); shardTerms.ensureHighestTerms(replicasToSetHighest); + ShardTerms shardTermsSnapshot = shardTerms.getShardTerms(); + for (Replica r : recoveryReplicas) { + assertFalse(shardTermsSnapshot.canBecomeLeader(r.getName())); + } + waitForState( "Wait for leadership to be given up", COLLECTION, dc -> dc.getLeader(shard) == null); - waitForState( - 
"Waiting for replicas to go into recovery", - COLLECTION, - 5, - TimeUnit.SECONDS, - state -> { - Slice shardState = state.getSlice(shard); - for (Replica r : recoveryReplicas) { - if (shardState.getReplica(r.name).getState() != Replica.State.RECOVERING) { - return false; - } - } - return true; - }); waitForState("Wait for leadership to be taken", COLLECTION, dc -> dc.getLeader(shard) != null); + waitForReplicasToGoIntoRecovery(shard, recoveryReplicas); cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS); // Make sure that a leader election took place assertThat( @@ -199,4 +177,24 @@ private void waitForNumDocsInAllReplicas(int numDocs, Collection replic } } } + + private void waitForReplicasToGoIntoRecovery(String shard, Collection recoveryReplicas) { + Set remainingReplicasToRecover = + recoveryReplicas.stream().map(Replica::getName).collect(Collectors.toSet()); + + waitForState( + "Waiting for replicas to go into recovery", + COLLECTION, + 5, + TimeUnit.SECONDS, + state -> { + Slice shardState = state.getSlice(shard); + for (Replica r : recoveryReplicas) { + if (shardState.getReplica(r.getName()).getState() == Replica.State.RECOVERING) { + remainingReplicasToRecover.remove(r.getName()); + } + } + return remainingReplicasToRecover.isEmpty(); + }); + } } From 68475bc43172ad3c3dfd8a672cb347450ed7c2fb Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 29 Jan 2026 18:18:21 -0800 Subject: [PATCH 10/10] Tidy --- .../test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java index 40ef1a133fca..21b1577c76c9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java @@ -142,7 +142,7 @@ public void 
testShardTermsInducedLeaderElection() throws IOException, SolrServer for (Replica r : recoveryReplicas) { assertFalse(shardTermsSnapshot.canBecomeLeader(r.getName())); } - + waitForState( "Wait for leadership to be given up", COLLECTION, dc -> dc.getLeader(shard) == null); waitForState("Wait for leadership to be taken", COLLECTION, dc -> dc.getLeader(shard) != null);