@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: ShardTerms can now induce a leader election if needed
type: other # added, changed, fixed, deprecated, removed, dependency_update, security, other
authors:
- name: Houston Putman
nick: HoustonPutman
links:
- name: SOLR-18080
url: https://issues.apache.org/jira/browse/SOLR-18080
@@ -19,6 +19,7 @@

import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@@ -52,15 +53,55 @@ public boolean onTermChanged(ShardTerms terms) {
if (solrCore.getCoreDescriptor() == null
|| solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (terms.haveHighestTermValue(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info(
"Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));
solrCore
.getUpdateHandler()
.getSolrCoreState()
.doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());

// If we have the highest term, there is nothing to do
if (terms.haveHighestTermValue(coreNodeName)) {
return true;
}

long lastRecoveryTerm;
long newTerm;
synchronized (lastTermDoRecovery) {
lastRecoveryTerm = lastTermDoRecovery.get();
newTerm = terms.getTerm(coreNodeName);
if (lastRecoveryTerm < newTerm) {
lastTermDoRecovery.set(newTerm);
Contributor:

lastTermDoRecovery is set here, but it's possible that recovery is deferred below because of the new leader-election path. Is that right? The old logic set it and then did recovery regardless. Reading this, it seems possible that lastTermDoRecovery gets set to the new term while the actual recovery further down is skipped, so the value it was set to is incorrect (going by the name) whenever recovery is skipped?

Contributor Author:

Yeah, lastTermDoRecovery might be a bad name, but after leader election, recovery is guaranteed for these replicas at this term value. So while recovery is not explicitly being done here, we know that leader election will do the recovery. lastTermDoRecovery is still technically correct, just assuming the leader election succeeds.
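
A code comment capturing that invariant would help future readers. A possible wording (a suggestion only, not part of this diff):

```java
// lastTermDoRecovery records the highest term for which recovery has been
// arranged, not necessarily performed right here: recovery is either started
// directly below, or deferred to the leader election that this term change
// triggers, and that election is guaranteed to recover this replica at this
// term value.
```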

}
}

if (solrCore.getCoreDescriptor().getCloudDescriptor().isLeader()) {
log.warn(
"Removing {} as leader, since its term is no longer the highest. This will initiate recovery",
coreNodeName);
solrCore.getCoreContainer().getZkController().giveupLeadership(solrCore.getCoreDescriptor());
} else if (lastRecoveryTerm < newTerm) {
CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica =
solrCore
.getCoreContainer()
.getZkController()
.getClusterState()
.getCollection(cloudDescriptor.getCollectionName())
.getSlice(cloudDescriptor.getShardId())
.getLeader();

// Only recover if the leader replica still has the highest term.
// If not, then the leader-election process will take care of recovery.
if (leaderReplica != null && terms.canBecomeLeader(leaderReplica.getName())) {
log.info(
"Start recovery on {} because core's term is less than leader's term", coreNodeName);
solrCore
.getUpdateHandler()
.getSolrCoreState()
.doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
} else {
if (log.isInfoEnabled()) {
log.info(
"Defer recovery on {} because leader-election will happen soon, old leader: {}",
coreNodeName,
leaderReplica == null ? null : leaderReplica.getName());
}
}
}
} catch (Exception e) {
if (log.isInfoEnabled()) {
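Taken together, the new onTermChanged flow has four outcomes. A minimal restatement as a pure function (a sketch with a hypothetical helper, not part of the diff):

```java
// Sketch only: the four outcomes of the new onTermChanged logic.
enum TermAction { NOTHING, GIVE_UP_LEADERSHIP, RECOVER_NOW, DEFER_TO_ELECTION }

static TermAction decide(
    boolean haveHighestTerm, boolean isLeader, boolean termRose, boolean leaderEligible) {
  if (haveHighestTerm) return TermAction.NOTHING; // already up to date
  if (isLeader) return TermAction.GIVE_UP_LEADERSHIP; // the election recovers us
  if (termRose && leaderEligible) return TermAction.RECOVER_NOW; // recover directly
  if (termRose) return TermAction.DEFER_TO_ELECTION; // imminent election recovers us
  return TermAction.NOTHING; // this term was already handled
}
```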
@@ -205,41 +205,13 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
}
}

PeerSync.PeerSyncResult result = null;
boolean success = false;
try {
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
PeerSync.PeerSyncResult result =
syncStrategy.sync(zkController, core, leaderProps, weAreReplacement, true);
success = result.isSuccess();
} catch (Exception e) {
log.error("Exception while trying to sync", e);
result = PeerSync.PeerSyncResult.failure();
}

UpdateLog ulog = core.getUpdateHandler().getUpdateLog();

if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}

if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway if no one else has any versions either
if (result.getOtherHasVersions().orElse(false)) {
log.info(
"We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
success = false;
} else {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
}

// solrcloud_debug
@@ -250,7 +222,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
try {
if (log.isDebugEnabled()) {
log.debug(
"{} synched {}",
"{} synced {}",
core.getCoreContainer().getZkController().getNodeName(),
searcher.count(new MatchAllDocsQuery()));
}
@@ -507,12 +479,10 @@ private void rejoinLeaderElection(SolrCore core) throws InterruptedException, Ke
return;
}

log.info("There may be a better leader candidate than us - going back into recovery");
log.info("There may be a better leader candidate than us - rejoining the election");

cancelElection();

core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());

leaderElector.joinElection(this, true);
}
}
@@ -182,8 +182,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
.getClusterState()
.getCollection(collection)
.getSlice(shardId)
.getReplicas()
.size()
.getNumLeaderReplicas()
< 2) {
Replica leader = zkStateReader.getLeader(collection, shardId);
if (leader != null
22 changes: 22 additions & 0 deletions solr/core/src/java/org/apache/solr/cloud/ShardTerms.java
@@ -124,6 +124,28 @@ private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecove
return replicasNeedingRecovery.contains(key);
}

public ShardTerms setHighestTerms(Set<String> highestTermKeys) {
Contributor:

The whole "term" algorithm makes some pretty strict assumptions about who can update term values, and under what conditions. From the class Javadocs on ZkShardTerms:

 * <p>Terms can only updated in two strict ways:
 *
 * <ul>
 *   <li>A replica sets its term equals to leader's term
 *   <li>The leader increase its term and some other replicas by 1
 * </ul>

This method seems to fit under the latter provision, which is good. But could we add Javadocs here to indicate that this method should only be called by current shard leaders? Or, if it is safe for non-leaders to call in certain situations, add Javadocs describing what those situations are and why.

Just trying to defend against the possibility of someone coming back through here in a month or two and thinking: "Hey, this doesn't fit with the documented algorithm at all."

Contributor Author:

So this ultimately doesn't happen from the leader, and the leader's term is not guaranteed to be increased. As the new test class shows, leader election can be triggered because of this new API.

I think the easiest thing to do here is to insist that the collection is in a read-only state when this API is called. I'm not sure yet how I'll do that, but it will definitely guard against any issues with missing updates or anything like that.
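
For example, a Javadoc along these lines might capture the contract discussed above (a sketch only; the read-only precondition is the author's stated intent and is not yet enforced in this diff):

```java
/**
 * Returns a new {@link ShardTerms} in which the given replicas hold the new
 * highest term (maxTerm + 1), leaving every other replica strictly below it.
 *
 * <p>Note: unlike {@link #increaseTerms}, this is not invoked by the current
 * shard leader and may strip the leader of its eligibility, triggering a
 * leader election. Callers should ensure the collection is read-only while
 * this runs, so no updates can be lost between the term bump and the
 * resulting election and recovery.
 *
 * @return the new terms, or null if bumping the max term would have no effect
 */
```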

long newMaxTerm = maxTerm + 1;
boolean keyFound = false;
HashMap<String, Long> newValues = new HashMap<>(values);
long nextHighestTerm = -1;
for (String key : values.keySet()) {
if (highestTermKeys.contains(key)) {
newValues.put(key, newMaxTerm);
keyFound = true;
} else {
nextHighestTerm = Math.max(nextHighestTerm, values.get(key));
}
}
// We only want to update if increasing the maxTerm makes an impact.
// If the nextHighestTerm is already < maxTerm, then upping the maxTerm doesn't do anything.
if (nextHighestTerm == maxTerm && keyFound) {
return new ShardTerms(newValues, version);
} else {
return null;
}
}
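
A quick worked example of the no-op check above (hypothetical replica names):

```java
// values = {core_node1=3, core_node2=3, core_node3=2}, maxTerm = 3.
// setHighestTerms(Set.of("core_node1")) bumps core_node1 to 4; the next
// highest term (core_node2 at 3) equals maxTerm, so the bump changes who can
// lead and the new ShardTerms is returned.
//
// values = {core_node1=3, core_node2=1, core_node3=1}, maxTerm = 3.
// setHighestTerms(Set.of("core_node1")) would bump core_node1 to 4, but
// core_node1 already strictly holds the highest term (nextHighestTerm == 1
// < maxTerm), so the bump changes nothing and null is returned, signalling
// that no update is needed.
```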

/**
* Return a new {@link ShardTerms} in which the highest terms are not zero
*
85 changes: 62 additions & 23 deletions solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -28,6 +28,7 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -40,6 +41,7 @@
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,14 +81,15 @@ private static class ShardCoreRequest extends ShardRequest {

public PeerSync.PeerSyncResult sync(
ZkController zkController, SolrCore core, ZkNodeProps leaderProps) {
return sync(zkController, core, leaderProps, false);
return sync(zkController, core, leaderProps, false, false);
}

public PeerSync.PeerSyncResult sync(
ZkController zkController,
SolrCore core,
ZkNodeProps leaderProps,
boolean peerSyncOnlyWithActive) {
boolean peerSyncOnlyWithActive,
boolean ignoreNoVersionsFailure) {
if (SKIP_AUTO_RECOVERY) {
return PeerSync.PeerSyncResult.success();
}
@@ -102,19 +105,16 @@ public PeerSync.PeerSyncResult sync(
log.info("Sync replicas to {}", ZkCoreNodeProps.getCoreUrl(leaderProps));
}

if (core.getUpdateHandler().getUpdateLog() == null) {
log.error("No UpdateLog found - cannot sync");
return PeerSync.PeerSyncResult.failure();
}

return syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive);
return syncReplicas(
zkController, core, leaderProps, peerSyncOnlyWithActive, ignoreNoVersionsFailure);
}

private PeerSync.PeerSyncResult syncReplicas(
ZkController zkController,
SolrCore core,
ZkNodeProps leaderProps,
boolean peerSyncOnlyWithActive) {
boolean peerSyncOnlyWithActive,
boolean ignoreNoVersionsFailure) {
if (isClosed) {
log.info("We have been closed, won't sync with replicas");
return PeerSync.PeerSyncResult.failure();
@@ -129,9 +129,32 @@ private PeerSync.PeerSyncResult syncReplicas(

// first sync ourselves - we are the potential leader after all
try {
result =
syncWithReplicas(
zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive);
if (core.getUpdateHandler().getUpdateLog() == null) {
log.error("No UpdateLog found - cannot sync");
result = PeerSync.PeerSyncResult.failure();
} else {
result =
syncWithReplicas(
zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive);
}

if (!result.isSuccess() && ignoreNoVersionsFailure) {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}
// we failed sync, but we have no versions - we can't sync in that case
// - we were active before, so continue if no one else has any versions either
if (!hasRecentUpdates && !result.getOtherHasVersions().orElse(false)) {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - so continue");
result = PeerSync.PeerSyncResult.success();
}
}
success = result.isSuccess();
} catch (Exception e) {
log.error("Sync Failed", e);
@@ -173,7 +196,7 @@ private PeerSync.PeerSyncResult syncWithReplicas(
String shardId,
boolean peerSyncOnlyWithActive)
throws Exception {
List<ZkCoreNodeProps> nodes =
List<Replica> replicas =
zkController
.getZkStateReader()
.getReplicaProps(
@@ -186,13 +209,13 @@ private PeerSync.PeerSyncResult syncWithReplicas(
return PeerSync.PeerSyncResult.failure();
}

if (nodes == null) {
if (replicas == null) {
// I have no replicas
return PeerSync.PeerSyncResult.success();
}

List<String> syncWith = new ArrayList<>(nodes.size());
for (ZkCoreNodeProps node : nodes) {
List<String> syncWith = new ArrayList<>(replicas.size());
for (Replica node : replicas) {
syncWith.add(node.getCoreUrl());
}

@@ -230,32 +253,48 @@ private void syncToMe(

// sync everyone else
// TODO: we should do this in parallel at least
List<ZkCoreNodeProps> nodes =
List<Replica> replicas =
zkController
.getZkStateReader()
.getReplicaProps(collection, shardId, cd.getCloudDescriptor().getCoreNodeName());
if (nodes == null) {
if (replicas == null) {
if (log.isInfoEnabled()) {
log.info("{} has no replicas", ZkCoreNodeProps.getCoreUrl(leaderProps));
}
return;
}

ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
for (ZkCoreNodeProps node : nodes) {
ZkShardTerms shardTerms = zkController.getShardTerms(collection, shardId);
for (Replica replica : replicas) {
try {
if (shardTerms.registered(replica.getName())
&& !shardTerms.canBecomeLeader(replica.getName())) {
if (log.isInfoEnabled()) {
log.info(
"{}: do NOT ask {} to sync, as it is not of the same shardTerm. Issue a recovery instead.",
ZkCoreNodeProps.getCoreUrl(leaderProps),
replica.getCoreUrl());
}
RecoveryRequest rr = new RecoveryRequest();
rr.leaderProps = leaderProps;
rr.baseUrl = replica.getBaseUrl();
rr.coreName = replica.getCoreName();
recoveryRequests.add(rr);
continue;
}
if (log.isInfoEnabled()) {
log.info(
"{}: try and ask {} to sync",
ZkCoreNodeProps.getCoreUrl(leaderProps),
node.getCoreUrl());
replica.getCoreUrl());
}

requestSync(
node.getBaseUrl(),
node.getCoreUrl(),
replica.getBaseUrl(),
replica.getCoreUrl(),
zkLeader.getCoreUrl(),
node.getCoreName(),
replica.getCoreName(),
nUpdates);

} catch (Exception e) {
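The RecoveryRequest entries queued above are dispatched after the sync loop; that dispatch code is outside this diff, so the following is only a sketch of its presumed shape:

```java
// Presumed dispatch after the sync loop (outside this diff; sketch only):
for (RecoveryRequest rr : recoveryRequests) {
  try {
    requestRecovery(rr.leaderProps, rr.baseUrl, rr.coreName);
  } catch (Exception e) {
    log.error("Could not request recovery of {}", rr.coreName, e);
  }
}
```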
5 changes: 5 additions & 0 deletions solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -120,6 +120,11 @@ public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecove
mutate(terms -> terms.increaseTerms(leader, replicasNeedingRecovery));
}

public void ensureHighestTerms(Set<String> mostUpToDateCores) {
Contributor:

Ditto, re: my previous comment on ShardTerms.setHighestTerms. We should add some Javadocs to make sure this is only called by leaders, or, if it is actually safe to call elsewhere, describe where and why.
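
For illustration, a hypothetical caller (replica names invented) could look like:

```java
// Mark the replicas known to hold the freshest data as having the highest
// term. Any replica left out loses leader eligibility and will be recovered
// once the resulting leader election completes.
ZkShardTerms shardTerms = zkController.getShardTerms(collection, shardId);
shardTerms.ensureHighestTerms(Set.of("core_node3", "core_node5"));
```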

if (mostUpToDateCores.isEmpty()) return;
mutate(terms -> terms.setHighestTerms(mostUpToDateCores));
}

public ShardTerms getShardTerms() {
return terms.get();
}