Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,15 @@ private void initTermOfLeaderSCM() {
* Always record the latest term that has been seen.
*/
private void updateTermOfLeaderSCM(SCMCommand<?> command) {
  // Pull the term out of the command, then hand it to the long-based
  // overload, which decides whether the recorded term needs to advance.
  final long termOfCommand = command.getTerm();
  updateTermOfLeaderSCM(termOfCommand);
}

public void updateTermOfLeaderSCM(final long newTerm) {
if (!termOfLeaderSCM.isPresent()) {
return;
}

final long currentTerm = termOfLeaderSCM.getAsLong();
final long newTerm = command.getTerm();
if (currentTerm < newTerm) {
setTermOfLeaderSCM(newTerm);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
Preconditions.checkState(response.getDatanodeUUID()
.equalsIgnoreCase(datanodeDetails.getUuid()),
"Unexpected datanode ID in the response.");
if (response.hasTerm()) {
context.updateTermOfLeaderSCM(response.getTerm());
}
// Verify the response is indeed for this datanode.
for (SCMCommandProto commandResponseProto : response.getCommandsList()) {
switch (commandResponseProto.getCommandType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.UUID;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand Down Expand Up @@ -107,6 +108,7 @@ public void handlesReconstructContainerCommand() throws Exception {

@Test
public void testheartbeatWithoutReports() throws Exception {
final long termInSCM = 42;
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
Expand All @@ -118,16 +120,25 @@ public void testheartbeatWithoutReports() throws Exception {
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.setTerm(termInSCM)
.build());

HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
context.setTermOfLeaderSCM(1);
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assertions.assertTrue(heartbeat.hasDatanodeDetails());
Assertions.assertFalse(heartbeat.hasNodeReport());
Assertions.assertFalse(heartbeat.hasContainerReport());
Assertions.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
Assertions.assertFalse(heartbeat.hasContainerActions());
OptionalLong termInDatanode = context.getTermOfLeaderSCM();
Assertions.assertTrue(termInDatanode.isPresent());
Assertions.assertEquals(termInSCM, termInDatanode.getAsLong());
}

@Test
Expand Down Expand Up @@ -314,22 +325,6 @@ public void testheartbeatWithAllReports() throws Exception {
}
}

/**
* Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
*
* @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
*
* @return HeartbeatEndpointTask
*/
private HeartbeatEndpointTask getHeartbeatEndpointTask(
    StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
  // Build a default configuration and a RUNNING state context backed by a
  // mocked state machine, then defer to the three-argument overload.
  final OzoneConfiguration configuration = new OzoneConfiguration();
  final DatanodeStateMachine mockedStateMachine =
      Mockito.mock(DatanodeStateMachine.class);
  final StateContext stateContext = new StateContext(
      configuration, DatanodeStates.RUNNING, mockedStateMachine);
  return getHeartbeatEndpointTask(configuration, stateContext, proxy);
}

/**
* Creates HeartbeatEndpointTask with the given conf, context and
* StorageContainerManager client side proxy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ message CommandQueueReportProto {
// Response sent back for a datanode heartbeat.
message SCMHeartbeatResponseProto {
// UUID of the datanode this response is addressed to; the receiver checks
// it against its own UUID before processing the response.
required string datanodeUUID = 1;
// Commands carried back to the datanode in this response.
repeated SCMCommandProto commands = 2;

// Same as term in SCMCommandProto.
// Optional: readers must check for presence before using it, since the
// sender may leave it unset.
optional int64 term = 3;
}

message SCMNodeAddressList {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

Expand All @@ -48,6 +49,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
Expand Down Expand Up @@ -80,6 +82,7 @@
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.ratis.protocol.exceptions.NotLeaderException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -104,6 +107,7 @@
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -130,9 +134,12 @@ public class SCMDatanodeProtocolServer implements
private final EventPublisher eventPublisher;
private ProtocolMessageMetrics<ProtocolMessageEnum> protocolMessageMetrics;

private final SCMContext scmContext;

public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
OzoneStorageContainerManager scm,
EventPublisher eventPublisher)
EventPublisher eventPublisher,
SCMContext scmContext)
throws IOException {

// This constructor has broken down to smaller methods so that Recon's
Expand All @@ -142,6 +149,7 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf,

this.scm = scm;
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;

heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(
scm.getScmNodeManager(), eventPublisher);
Expand Down Expand Up @@ -274,14 +282,19 @@ public SCMHeartbeatResponseProto sendHeartbeat(
for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
cmdResponses.add(getCommandResponse(cmd, scm));
}
final OptionalLong term = getTermIfLeader();
boolean auditSuccess = true;
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("datanodeUUID", heartbeat.getDatanodeDetails().getUuid());
auditMap.put("command", flatten(cmdResponses.toString()));
term.ifPresent(t -> auditMap.put("term", String.valueOf(t)));
try {
return SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
.addAllCommands(cmdResponses).build();
SCMHeartbeatResponseProto.Builder builder =
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
.addAllCommands(cmdResponses);
term.ifPresent(builder::setTerm);
return builder.build();
} catch (Exception ex) {
auditSuccess = false;
AUDIT.logWriteFailure(
Expand All @@ -297,6 +310,17 @@ public SCMHeartbeatResponseProto sendHeartbeat(
}
}

private OptionalLong getTermIfLeader() {
  // No context available, or this node is not the leader: report no term.
  if (scmContext == null || !scmContext.isLeader()) {
    return OptionalLong.empty();
  }
  try {
    final long leaderTerm = scmContext.getTermOfLeader();
    return OptionalLong.of(leaderTerm);
  } catch (NotLeaderException ignored) {
    // only leader should distribute current term
    return OptionalLong.empty();
  }
}

/**
* Returns a SCMCommandResponse from the SCM Command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ private StorageContainerManager(OzoneConfiguration conf,
scmAdmins = new OzoneAdmins(scmAdminUsernames, scmAdminGroups);

datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this,
eventQueue);
eventQueue, scmContext);
Copy link
Contributor

@szetszwo szetszwo May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adoroszlai , scmContext is null since it is not yet initialized. Found this problem when reviewing #4683 .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it is not true since initializeSystemManagers will initialize it.

blockProtocolServer = new SCMBlockProtocolServer(conf, this);
clientProtocolServer = new SCMClientProtocolServer(conf, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ReconDatanodeProtocolServer(OzoneConfiguration conf,
OzoneStorageContainerManager scm,
EventPublisher eventPublisher)
throws IOException {
super(conf, scm, eventPublisher);
super(conf, scm, eventPublisher, null);
}

@Override
Expand Down