Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class SCMRatisServerImpl implements SCMRatisServer {
final StorageContainerManager scm, final SCMHADBTransactionBuffer buffer)
throws IOException {
this.scm = scm;
this.stateMachine = new SCMStateMachine(scm, this, buffer);
final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
LOG.info("starting Raft server for scm:{}", scm.getScmId());
// During SCM startup, the bootstrapped node will be started just with
Expand All @@ -100,9 +99,13 @@ public class SCMRatisServerImpl implements SCMRatisServer {
Parameters parameters = createSCMServerTlsParameters(grpcTlsConfig);

this.server = newRaftServer(scm.getScmId(), conf)
.setStateMachine(stateMachine)
.setStateMachineRegistry((gId) -> new SCMStateMachine(scm, buffer))
.setGroup(RaftGroup.valueOf(groupId))
.setParameters(parameters).build();

this.stateMachine =
(SCMStateMachine) server.getDivision(groupId).getStateMachine();

this.division = server.getDivision(groupId);
}

Expand All @@ -111,7 +114,9 @@ public static void initialize(String clusterId, String scmId,
final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
RaftServer server = null;
try {
server = newRaftServer(scmId, conf).setGroup(group).build();
server = newRaftServer(scmId, conf).setGroup(group)
.setStateMachineRegistry((groupId -> new SCMStateMachine()))
.build();
server.start();
waitForLeaderToBeReady(server, conf, group);
} finally {
Expand Down Expand Up @@ -158,8 +163,7 @@ private static RaftServer.Builder newRaftServer(final String scmId,
final RaftProperties serverProperties =
RatisUtil.newRaftProperties(haConf, conf);
return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
.setProperties(serverProperties)
.setStateMachine(new SCMStateMachine());
.setProperties(serverProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupId;
Expand All @@ -59,8 +58,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.SCM_NOT_INITIALIZED;

/**
* The SCMStateMachine is the state machine for SCMRatisServer. It is
* responsible for applying ratis committed transactions to
Expand All @@ -84,20 +81,16 @@ public class SCMStateMachine extends BaseStateMachine {
private DBCheckpoint installingDBCheckpoint = null;

public SCMStateMachine(final StorageContainerManager scm,
final SCMRatisServer ratisServer, SCMHADBTransactionBuffer buffer)
throws SCMException {
SCMHADBTransactionBuffer buffer) {
this.scm = scm;
this.handlers = new EnumMap<>(RequestType.class);
this.transactionBuffer = buffer;
TransactionInfo latestTrxInfo = this.transactionBuffer.getLatestTrxInfo();
if (!latestTrxInfo.isDefault() &&
!updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
latestTrxInfo.getTransactionIndex())) {
throw new SCMException(
String.format("Failed to update LastAppliedTermIndex " +
"in StateMachine to term:{} index:{}",
latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
), SCM_NOT_INITIALIZED);
if (!latestTrxInfo.isDefault()) {
updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
latestTrxInfo.getTransactionIndex());
LOG.info("Updated lastAppliedTermIndex {} with transactionInfo term and" +
"Index", latestTrxInfo);
}
this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
isInitialized = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,19 +946,36 @@ public static boolean scmInit(OzoneConfiguration conf,
scmStorageConfig.setClusterId(clusterId);
}

if (SCMHAUtils.isSCMHAEnabled(conf)) {
SCMRatisServerImpl.initialize(scmStorageConfig.getClusterID(),
scmStorageConfig.getScmId(), haDetails.getLocalNodeDetails(),
conf);
scmStorageConfig.setSCMHAFlag(true);
}

if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
HASecurityUtils.initializeSecurity(scmStorageConfig, conf,
getScmAddress(haDetails, conf), true);
}

// Ensure SCMRatisServerImpl#initialize() is called after SCM storage
// config initialization. If the SCM version file has been created,
// a subsequent scm init should use the clusterID from the version file.
// So, scmStorageConfig#initialize() should happen before Ratis server
// initialization. This way, we do not leave the Ratis storage directory
// with multiple raft group directories in a failure scenario.

// The order of init should be
// 1. SCM storage config initialize to create version file.
// 2. Initialize Ratis server.

scmStorageConfig.setPrimaryScmNodeId(scmStorageConfig.getScmId());
scmStorageConfig.initialize();

if (SCMHAUtils.isSCMHAEnabled(conf)) {
SCMRatisServerImpl.initialize(scmStorageConfig.getClusterID(),
scmStorageConfig.getScmId(), haDetails.getLocalNodeDetails(),
conf);
scmStorageConfig = new SCMStorageConfig(conf);
scmStorageConfig.setSCMHAFlag(true);
// Do force initialize to persist SCM_HA flag.
scmStorageConfig.forceInitialize();
}

LOG.info("SCM initialization succeeded. Current cluster id for sd={}"
+ "; cid={}; layoutVersion={}; scmId={}",
scmStorageConfig.getStorageDir(), scmStorageConfig.getClusterID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,31 @@ public void testSCMReinitialization() throws Exception {
}
}


/**
 * Runs scmInit twice against the same metadata directory: first with the
 * HA key not explicitly set, then with it set to true. Checks that the
 * persisted storage config reports HA as disabled after the first init
 * and enabled after the second, and that the Ratis group can be validated
 * for the cluster id.
 */
@Test
public void testSCMReinitializationWithHAUpgrade() throws Exception {
  final OzoneConfiguration conf = new OzoneConfiguration();

  // Use a uniquely named temp location so this test gets its own
  // SCM metadata directory.
  final String tempRoot =
      GenericTestUtils.getTempPath(UUID.randomUUID().toString());
  final Path metaDir = Paths.get(tempRoot, "scm-meta");
  conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());

  // Cluster id handed to both scmInit invocations below.
  final String expectedClusterId = UUID.randomUUID().toString();

  // First init: the HA key has not been set on this configuration.
  StorageContainerManager.scmInit(conf, expectedClusterId);
  final SCMStorageConfig storeBeforeUpgrade = new SCMStorageConfig(conf);
  Assert.assertEquals(expectedClusterId, storeBeforeUpgrade.getClusterID());
  Assert.assertFalse(storeBeforeUpgrade.isSCMHAEnabled());

  // Second init: same metadata dir and cluster id, HA key now true.
  conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
  StorageContainerManager.scmInit(conf, expectedClusterId);

  // Re-read the storage config and verify the HA flag is now reported.
  final SCMStorageConfig storeAfterUpgrade = new SCMStorageConfig(conf);
  Assert.assertTrue(storeAfterUpgrade.isSCMHAEnabled());
  validateRatisGroupExists(conf, expectedClusterId);
}

@VisibleForTesting
public static void validateRatisGroupExists(OzoneConfiguration conf,
String clusterId) throws IOException {
Expand Down