From aed1272b023239ed0d1c80ba86187f972290af70 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Fri, 7 May 2021 16:12:00 +0800 Subject: [PATCH 01/10] HDDS-5111. DataNode should not always report full information in heartbeat --- .../common/report/ReportPublisher.java | 8 +- .../common/statemachine/StateContext.java | 114 ++++++++++++------ .../endpoint/HeartbeatEndpointTask.java | 5 +- .../server/ratis/XceiverServerRatis.java | 4 +- .../container/ozoneimpl/OzoneContainer.java | 2 +- .../common/report/TestReportPublisher.java | 3 +- .../common/statemachine/TestStateContext.java | 111 ++++++----------- .../endpoint/TestHeartbeatEndpointTask.java | 14 ++- .../commandhandler/TestBlockDeletion.java | 3 +- 9 files changed, 137 insertions(+), 127 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java index 5d181eca10d7..a08fc281eb96 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -80,7 +81,12 @@ public void run() { */ private void publishReport() { try { - context.addReport(getReport()); + GeneratedMessage report = getReport(); + if (report instanceof CommandStatusReportsProto) { + context.addIncrementalReport(report); + } else { + context.refreshFullReport(report); + } } catch (IOException 
e) { LOG.error("Exception while publishing report.", e); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 3051638a0796..752715713d5f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -40,7 +41,6 @@ import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; import com.google.protobuf.Descriptors.Descriptor; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; @@ -62,10 +62,11 @@ import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import static java.lang.Math.min; -import org.apache.commons.collections.CollectionUtils; - import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval; import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval; + +import org.apache.commons.collections.CollectionUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,10 +90,6 @@ public class StateContext { @VisibleForTesting static final String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME = IncrementalContainerReportProto.getDescriptor().getFullName(); - // Accepted types of 
reports that can be queued to incrementalReportsQueue - private static final Set ACCEPTED_INCREMENTAL_REPORT_TYPE_SET = - Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME, - INCREMENTAL_CONTAINER_REPORT_PROTO_NAME); static final Logger LOG = LoggerFactory.getLogger(StateContext.class); @@ -116,6 +113,11 @@ public class StateContext { private boolean shutdownOnError = false; private boolean shutdownGracefully = false; private final AtomicLong threadPoolNotAvailableCount; + private Map> fullReportSendIndicator; + + private List fullReportTypeList; + private Map> type2Reports; /** * term of latest leader SCM, extract from SCMCommand. @@ -161,6 +163,22 @@ public StateContext(ConfigurationSource conf, lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); threadPoolNotAvailableCount = new AtomicLong(0); + fullReportSendIndicator = new HashMap<>(); + fullReportTypeList = new ArrayList<>(); + type2Reports = new HashMap<>(); + initReportTypeCollection(); + } + + /** + * init related ReportType Collections. 
+ */ + private void initReportTypeCollection(){ + fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME); + type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports); + fullReportTypeList.add(NODE_REPORT_PROTO_NAME); + type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport); + fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME); + type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports); } /** @@ -248,7 +266,7 @@ public boolean getShutdownOnError() { * * @param report report to be added */ - public void addReport(GeneratedMessage report) { + public void addIncrementalReport(GeneratedMessage report) { if (report == null) { return; } @@ -256,21 +274,38 @@ public void addReport(GeneratedMessage report) { Preconditions.checkState(descriptor != null); final String reportType = descriptor.getFullName(); Preconditions.checkState(reportType != null); - if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) { - containerReports.set(report); - } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) { - nodeReport.set(report); - } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) { - pipelineReports.set(report); - } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) { - synchronized (incrementalReportsQueue) { - for (InetSocketAddress endpoint : endpoints) { - incrementalReportsQueue.get(endpoint).add(report); - } + // in some case, we want to add a fullReportType message + // as an incremental message. + // see XceiverServerRatis#sendPipelineReport + synchronized (incrementalReportsQueue) { + for (InetSocketAddress endpoint : endpoints) { + incrementalReportsQueue.get(endpoint).add(report); } - } else { + } + } + + /** + * refresh Full report. 
+ * + * @param report report to be refreshed + */ + public void refreshFullReport(GeneratedMessage report) { + if (report == null) { + return; + } + final Descriptor descriptor = report.getDescriptorForType(); + Preconditions.checkState(descriptor != null); + final String reportType = descriptor.getFullName(); + Preconditions.checkState(reportType != null); + if (!fullReportTypeList.contains(reportType)) { throw new IllegalArgumentException( - "Unidentified report message type: " + reportType); + "not full report message type: " + reportType); + } + type2Reports.get(reportType).set(report); + if (null != fullReportSendIndicator){ + for (Map mp : fullReportSendIndicator.values()){ + mp.get(reportType).set(true); + } } } @@ -293,10 +328,6 @@ public void putBackReports(List reportsToPutBack, Preconditions.checkState(descriptor != null); final String reportType = descriptor.getFullName(); Preconditions.checkState(reportType != null); - if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) { - throw new IllegalArgumentException( - "Unaccepted report message type: " + reportType); - } } synchronized (incrementalReportsQueue) { if (incrementalReportsQueue.containsKey(endpoint)){ @@ -332,19 +363,21 @@ List getIncrementalReports( return reportsToReturn; } - List getNonIncrementalReports() { + List getNonIncrementalReports( + InetSocketAddress endpoint) { + Map mp = fullReportSendIndicator.get(endpoint); List nonIncrementalReports = new LinkedList<>(); - GeneratedMessage report = containerReports.get(); - if (report != null) { - nonIncrementalReports.add(report); - } - report = nodeReport.get(); - if (report != null) { - nonIncrementalReports.add(report); - } - report = pipelineReports.get(); - if (report != null) { - nonIncrementalReports.add(report); + if (null != mp){ + for (Map.Entry kv : mp.entrySet()) { + if (kv.getValue().get()) { + String reportType = kv.getKey(); + GeneratedMessage msg = type2Reports.get(reportType).get(); + if (null != msg) { + 
nonIncrementalReports.add(msg); + mp.get(reportType).set(false); + } + } + } } return nonIncrementalReports; } @@ -360,7 +393,7 @@ public List getReports(InetSocketAddress endpoint, if (maxLimit < 0) { throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit); } - List reports = getNonIncrementalReports(); + List reports = getNonIncrementalReports(endpoint); if (maxLimit <= reports.size()) { return reports.subList(0, maxLimit); } else { @@ -788,6 +821,11 @@ public void addEndpoint(InetSocketAddress endpoint) { this.containerActions.put(endpoint, new LinkedList<>()); this.pipelineActions.put(endpoint, new LinkedList<>()); this.incrementalReportsQueue.put(endpoint, new LinkedList<>()); + Map mp = new HashMap<>(); + fullReportTypeList.forEach(e ->{ + mp.putIfAbsent(e, new AtomicBoolean(true)); + }); + this.fullReportSendIndicator.putIfAbsent(endpoint, mp); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 006c62647515..0ca3404717ae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -150,7 +150,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception { } catch (IOException ex) { Preconditions.checkState(requestBuilder != null); // put back the reports which failed to be sent - putBackReports(requestBuilder); + putBackIncrementalReports(requestBuilder); rpcEndpoint.logIfNeeded(ex); } finally { rpcEndpoint.unlock(); @@ -159,7 +159,8 @@ public EndpointStateMachine.EndPointStates call() throws Exception { } // TODO: Make it generic. 
- private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) { + private void putBackIncrementalReports( + SCMHeartbeatRequestProto.Builder requestBuilder) { List reports = new LinkedList<>(); // We only put back CommandStatusReports and IncrementalContainerReport // because those are incremental. Container/Node/PipelineReport are diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 6fd2706a8b34..2683e640e60e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -916,8 +916,8 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, private void sendPipelineReport() { if (context != null) { - // TODO: Send IncrementalPipelineReport instead of full PipelineReport - context.addReport(context.getParent().getContainer().getPipelineReport()); + context.addIncrementalReport( + context.getParent().getContainer().getPipelineReport()); context.getParent().triggerHeartbeat(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 41779812495a..78d7904d5e7d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -127,7 +127,7 @@ public OzoneContainer( .newBuilder() .addReport(containerReplicaProto) .build(); - 
context.addReport(icr); + context.addIncrementalReport(icr); context.getParent().triggerHeartbeat(); }; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index aa1337f6198b..c0a87b5e40f6 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -121,8 +121,7 @@ public void testPublishReport() throws InterruptedException { Thread.sleep(150); executorService.shutdown(); Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount); - verify(dummyContext, times(1)).addReport(null); - + verify(dummyContext, times(1)).refreshFullReport(null); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 586d171fff50..d26233f4c82b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import 
java.util.concurrent.ExecutorService; @@ -106,28 +106,7 @@ public void testPutBackReports() { // getReports dequeues incremental reports expectedReportCount.clear(); - // Case 2: Attempt to put back a full report - - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - - // Case 3: Put back mixed types of incremental reports + // Case 2: Put back mixed types of incremental reports ctx.putBackReports(Arrays.asList( newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME), @@ -146,30 +125,6 @@ public void testPutBackReports() { checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); // getReports dequeues incremental reports expectedReportCount.clear(); - - // Case 4: Attempt to put back mixed types of full reports - - try { - ctx.putBackReports(Arrays.asList( - newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME), - newMockReport(StateContext.NODE_REPORT_PROTO_NAME), - newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME) - ), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - - // Case 5: Attempt to put back mixed full and incremental reports - - try { - ctx.putBackReports(Arrays.asList( - newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME), - 
newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME), - newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME) - ), scm2); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } } @Test @@ -191,35 +146,35 @@ public void testReportQueueWithAddReports() { Map expectedReportCount = new HashMap<>(); // Add a bunch of ContainerReports - batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); + batchRefreshfullReports(ctx, + StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); // Should only keep the latest one expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1); checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // every time getAllAvailableReports is called , if we want to get a full + // report of a certain type, we must call "batchRefreshfullReports" for + // this type to refresh. 
+ expectedReportCount.remove(StateContext.CONTAINER_REPORTS_PROTO_NAME); // Add a bunch of NodeReport - batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); + batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); // Should only keep the latest one expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1); checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + expectedReportCount.remove(StateContext.NODE_REPORT_PROTO_NAME); // Add a bunch of PipelineReports - batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); - // Should only keep the latest one - expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1); - checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); - checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); - - // Add a bunch of PipelineReports - batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); + batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); // Should only keep the latest one expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1); checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + expectedReportCount.remove(StateContext.PIPELINE_REPORTS_PROTO_NAME); // Add a bunch of CommandStatusReports - batchAddReports(ctx, + batchAddIncrementalReport(ctx, StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128); expectedReportCount.put( StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128); @@ -231,7 +186,7 @@ public void testReportQueueWithAddReports() { StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME); // Add a bunch of IncrementalContainerReport - batchAddReports(ctx, + batchAddIncrementalReport(ctx, StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); // Should keep all of them expectedReportCount.put( @@ -243,9 
+198,16 @@ public void testReportQueueWithAddReports() { StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME); } - void batchAddReports(StateContext ctx, String reportName, int count) { + void batchRefreshfullReports(StateContext ctx, String reportName, int count) { + for (int i = 0; i < count; i++) { + ctx.refreshFullReport(newMockReport(reportName)); + } + } + + void batchAddIncrementalReport(StateContext ctx, + String reportName, int count) { for (int i = 0; i < count; i++) { - ctx.addReport(newMockReport(reportName)); + ctx.addIncrementalReport(newMockReport(reportName)); } } @@ -276,7 +238,7 @@ public void testContainerNodePipelineReportAPIs() { assertNull(context1.getPipelineReports()); GeneratedMessage containerReports = newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME); - context1.addReport(containerReports); + context1.refreshFullReport(containerReports); assertNotNull(context1.getContainerReports()); assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME, @@ -288,7 +250,7 @@ public void testContainerNodePipelineReportAPIs() { StateContext context2 = newStateContext(conf, datanodeStateMachineMock); GeneratedMessage nodeReport = newMockReport(StateContext.NODE_REPORT_PROTO_NAME); - context2.addReport(nodeReport); + context2.refreshFullReport(nodeReport); assertNull(context2.getContainerReports()); assertNotNull(context2.getNodeReport()); @@ -300,7 +262,7 @@ public void testContainerNodePipelineReportAPIs() { StateContext context3 = newStateContext(conf, datanodeStateMachineMock); GeneratedMessage pipelineReports = newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME); - context3.addReport(pipelineReports); + context3.refreshFullReport(pipelineReports); assertNull(context3.getContainerReports()); assertNull(context3.getNodeReport()); @@ -347,7 +309,7 @@ public void testReportAPIs() { "hadoop.hdds.CommandStatusReportsProto"); // Try to add report with zero endpoint. Should not be stored. 
- stateContext.addReport(generatedMessage); + stateContext.addIncrementalReport(generatedMessage); assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty()); // Add 2 scm endpoints. @@ -355,7 +317,7 @@ public void testReportAPIs() { stateContext.addEndpoint(scm2); // Add report. Should be added to all endpoints. - stateContext.addReport(generatedMessage); + stateContext.addIncrementalReport(generatedMessage); List allAvailableReports = stateContext.getAllAvailableReports(scm1); assertEquals(1, allAvailableReports.size()); @@ -498,9 +460,9 @@ public void testIsThreadPoolAvailable() throws Exception { // task num greater than pool size for (int i = 0; i < threadPoolSize; i++) { - executorService.submit(() -> futureOne.get()); + executorService.submit((Callable) futureOne::get); } - executorService.submit(() -> futureTwo.get()); + executorService.submit((Callable) futureTwo::get); Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService)); @@ -519,8 +481,8 @@ public void doesNotAwaitWithoutExecute() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); CompletableFuture future = new CompletableFuture<>(); - executorService.submit(() -> future.get()); - executorService.submit(() -> future.get()); + executorService.submit((Callable) future::get); + executorService.submit((Callable) future::get); StateContext subject = new StateContext(new OzoneConfiguration(), DatanodeStates.INIT, mock(DatanodeStateMachine.class)) { @@ -585,10 +547,11 @@ public void testGetReports() { Map expectedReportCount = new HashMap<>(); // Add a bunch of ContainerReports - batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); - batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); - batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); - batchAddReports(ctx, + batchRefreshfullReports(ctx, + StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); + batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 
128); + batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); + batchAddIncrementalReport(ctx, StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); // Should only keep the latest one diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index 9b238a188eb6..0f215f4238c9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -96,7 +96,7 @@ public void testheartbeatWithNodeReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(NodeReportProto.getDefaultInstance()); + context.refreshFullReport(NodeReportProto.getDefaultInstance()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assert.assertTrue(heartbeat.hasDatanodeDetails()); @@ -128,7 +128,7 @@ public void testheartbeatWithContainerReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(ContainerReportsProto.getDefaultInstance()); + context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assert.assertTrue(heartbeat.hasDatanodeDetails()); @@ -160,7 +160,8 @@ public void testheartbeatWithCommandStatusReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - 
context.addReport(CommandStatusReportsProto.getDefaultInstance()); + context.addIncrementalReport( + CommandStatusReportsProto.getDefaultInstance()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assert.assertTrue(heartbeat.hasDatanodeDetails()); @@ -224,9 +225,10 @@ public void testheartbeatWithAllReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(NodeReportProto.getDefaultInstance()); - context.addReport(ContainerReportsProto.getDefaultInstance()); - context.addReport(CommandStatusReportsProto.getDefaultInstance()); + context.refreshFullReport(NodeReportProto.getDefaultInstance()); + context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); + context.addIncrementalReport( + CommandStatusReportsProto.getDefaultInstance()); context.addContainerAction(getContainerAction()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index c8811c17d634..5443b3c542e7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -392,7 +392,8 @@ private void verifyPendingDeleteEvent() logCapturer.clearOutput(); cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContext().addReport(dummyReport); + .getDatanodeStateMachine().getContext(). 
+ addIncrementalReport(dummyReport); cluster.getHddsDatanodes().get(0) .getDatanodeStateMachine().triggerHeartbeat(); // wait for event to be handled by event handler From 30baf9fd75eedb36ebbd0520241927f9c7837907 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 15:33:49 +0800 Subject: [PATCH 02/10] Update hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java add Code Comments Co-authored-by: Siyao Meng <50227127+smengcl@users.noreply.github.com> --- .../container/common/statemachine/StateContext.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 752715713d5f..e5190b05b45b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -113,11 +113,14 @@ public class StateContext { private boolean shutdownOnError = false; private boolean shutdownGracefully = false; private final AtomicLong threadPoolNotAvailableCount; - private Map ReportType -> Boolean of whether the full report should be + // queued in getFullReports call. + private final Map> fullReportSendIndicator; - - private List fullReportTypeList; - private Map> type2Reports; + // List of supported full report types. + private final List fullReportTypeList; + // ReportType -> Report. + private final Map> type2Reports; /** * term of latest leader SCM, extract from SCMCommand. 
From ec37453069def2816f5e6a76f1b774ba07906f2e Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 16:10:49 +0800 Subject: [PATCH 03/10] Update hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java Co-authored-by: Siyao Meng <50227127+smengcl@users.noreply.github.com> --- .../ozone/container/common/statemachine/StateContext.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index e5190b05b45b..e449ba04b202 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -305,8 +305,8 @@ public void refreshFullReport(GeneratedMessage report) { "not full report message type: " + reportType); } type2Reports.get(reportType).set(report); - if (null != fullReportSendIndicator){ - for (Map mp : fullReportSendIndicator.values()){ + if (fullReportSendIndicator != null) { + for (Map mp : fullReportSendIndicator.values()) { mp.get(reportType).set(true); } } From 3c0f641562948e1e3bb0d4c3a2ecf4eb334305d1 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 16:11:18 +0800 Subject: [PATCH 04/10] Update hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java Co-authored-by: Siyao Meng <50227127+smengcl@users.noreply.github.com> --- .../ozone/container/common/statemachine/StateContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index e449ba04b202..7704d86e3dc6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -825,7 +825,7 @@ public void addEndpoint(InetSocketAddress endpoint) { this.pipelineActions.put(endpoint, new LinkedList<>()); this.incrementalReportsQueue.put(endpoint, new LinkedList<>()); Map mp = new HashMap<>(); - fullReportTypeList.forEach(e ->{ + fullReportTypeList.forEach(e -> { mp.putIfAbsent(e, new AtomicBoolean(true)); }); this.fullReportSendIndicator.putIfAbsent(endpoint, mp); From c7fb3edf2d0722d1476307cf9b6b560dadf97be7 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 16:49:59 +0800 Subject: [PATCH 05/10] update --- .../container/common/statemachine/StateContext.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 7704d86e3dc6..896b46e6ce72 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -366,7 +366,7 @@ List getIncrementalReports( return reportsToReturn; } - List getNonIncrementalReports( + List getFullReports( InetSocketAddress endpoint) { Map mp = fullReportSendIndicator.get(endpoint); List nonIncrementalReports = new LinkedList<>(); @@ -375,10 +375,12 @@ List getNonIncrementalReports( if (kv.getValue().get()) { String reportType = kv.getKey(); GeneratedMessage msg = 
type2Reports.get(reportType).get(); - if (null != msg) { - nonIncrementalReports.add(msg); - mp.get(reportType).set(false); + if (null == msg) { + throw new NullPointerException( "Error on getting report, Type: " + + reportType); } + nonIncrementalReports.add(msg); + mp.get(reportType).set(false); } } } @@ -396,7 +398,7 @@ public List getReports(InetSocketAddress endpoint, if (maxLimit < 0) { throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit); } - List reports = getNonIncrementalReports(endpoint); + List reports = getFullReports(endpoint); if (maxLimit <= reports.size()) { return reports.subList(0, maxLimit); } else { From ee825a95946f13bfc7f1f4398187030cdad29187 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 19:50:04 +0800 Subject: [PATCH 06/10] fix checkstyle --- .../ozone/container/common/statemachine/StateContext.java | 2 +- .../ozone/container/common/statemachine/TestStateContext.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index cdc54b8d4e0d..b1754c6c53cf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -384,7 +384,7 @@ List getFullReports( String reportType = kv.getKey(); GeneratedMessage msg = type2Reports.get(reportType).get(); if (null == msg) { - throw new NullPointerException( "Error on getting report, Type: " + throw new NullPointerException("Error on getting report, Type: " + reportType); } nonIncrementalReports.add(msg); diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index d93f36adacfb..6d0ad16f0406 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -547,7 +547,8 @@ public void testGetReports() { StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); - batchRefreshfullReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128); + batchRefreshfullReports(ctx, + StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128); batchAddIncrementalReport(ctx, StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); From 927e094ad12573d7c92671ffde81eab2ee2c7f38 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 20:27:43 +0800 Subject: [PATCH 07/10] fix unit test --- .../ozone/container/common/statemachine/StateContext.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index b1754c6c53cf..1ec496ca7f9f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -383,12 +383,10 @@ List getFullReports( if (kv.getValue().get()) { String reportType = kv.getKey(); GeneratedMessage msg = 
type2Reports.get(reportType).get(); - if (null == msg) { - throw new NullPointerException("Error on getting report, Type: " - + reportType); + if (msg != null) { + nonIncrementalReports.add(msg); + mp.get(reportType).set(false); } - nonIncrementalReports.add(msg); - mp.get(reportType).set(false); } } } From be283146c32fc5787c14cc91b9cccda6ad40869c Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Thu, 3 Jun 2021 23:15:36 +0800 Subject: [PATCH 08/10] trigger CI for TestSCMInstallSnapshot.setup From 44f3887aa50605ec5614dd6dbf604bfad71ca625 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Sat, 5 Jun 2021 18:21:28 +0800 Subject: [PATCH 09/10] Update hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java check before AtomicReference#get Co-authored-by: Siyao Meng <50227127+smengcl@users.noreply.github.com> --- .../ozone/container/common/statemachine/StateContext.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 1ec496ca7f9f..ca734681d77f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -382,7 +382,13 @@ List getFullReports( for (Map.Entry kv : mp.entrySet()) { if (kv.getValue().get()) { String reportType = kv.getKey(); - GeneratedMessage msg = type2Reports.get(reportType).get(); + final AtomicReference ref = + type2Reports.get(reportType); + if (ref == null) { + throw new RuntimeException(reportType + " is not a valid full " + + "report type!"); + } + final GeneratedMessage msg = ref.get(); if (msg != null) { nonIncrementalReports.add(msg); 
mp.get(reportType).set(false); From 4cc0399e9d7cbd63d0aac17099e0d1226c775cf0 Mon Sep 17 00:00:00 2001 From: Jackson Yao Date: Sat, 5 Jun 2021 19:52:02 +0800 Subject: [PATCH 10/10] trigger ci for TestSCMUpdateServiceGrpcServer.testClientUpdateWithRestart