diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java index a0c543d72325..f021ca3ffd6d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -31,7 +32,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; -import java.lang.reflect.Field; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -79,6 +79,7 @@ abstract class TestContainerStateMachine { .setNameFormat("ChunkWriter-" + i + "-%d") .build())).collect(Collectors.toList()); private final boolean isLeader; + private static final String CONTAINER_DATA = "Test Data"; TestContainerStateMachine(boolean isLeader) { this.isLeader = isLeader; @@ -86,6 +87,8 @@ abstract class TestContainerStateMachine { @BeforeEach public void setup() throws IOException { + conf.setTimeDuration(HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, + 1000_000_000, TimeUnit.NANOSECONDS); dispatcher = mock(ContainerDispatcher.class); ContainerController controller = mock(ContainerController.class); XceiverServerRatis ratisServer = mock(XceiverServerRatis.class); @@ -129,52 +132,26 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio TransactionContext trx = mock(TransactionContext.class); ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class); when(trx.getStateMachineContext()).thenReturn(context); - if (failWithException) { - when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException()); - } else { - when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto - .newBuilder().setCmdType(ContainerProtos.Type.WriteChunk) - .setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR) - .build()); - } - when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( - ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data")) - .setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build()) - .setContainerID(1) - .setDatanodeUuid(UUID.randomUUID().toString()).build()); - AtomicReference throwable = new AtomicReference<>(null); - Function throwableSetter = t -> { - throwable.set(t); - return null; - }; - stateMachine.write(entry, trx).exceptionally(throwableSetter).get(); + setUpMockDispatcherReturn(failWithException); + setUpMockRequestProtoReturn(context, 1, 1); + + ThrowableCatcher catcher = new ThrowableCatcher(); + + stateMachine.write(entry, trx).exceptionally(catcher.asSetter()).get(); verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class), any(DispatcherContext.class)); reset(dispatcher); - assertNotNull(throwable.get()); - if (failWithException) { - assertInstanceOf(RuntimeException.class, throwable.get()); - } else { - assertInstanceOf(StorageContainerException.class, throwable.get()); - StorageContainerException sce = (StorageContainerException) throwable.get(); - assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult()); - } + assertNotNull(catcher.getReceived()); + assertResults(failWithException, catcher.getCaught()); + // Writing data to another container(containerId 2) should also fail. - when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( - ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data")) - .setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build()) - .setContainerID(2) - .setDatanodeUuid(UUID.randomUUID().toString()).build()); - stateMachine.write(entryNext, trx).exceptionally(throwableSetter).get(); + setUpMockRequestProtoReturn(context, 2, 1); + stateMachine.write(entryNext, trx).exceptionally(catcher.asSetter()).get(); verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class), any(DispatcherContext.class)); - assertInstanceOf(StorageContainerException.class, throwable.get()); - StorageContainerException sce = (StorageContainerException) throwable.get(); + assertInstanceOf(StorageContainerException.class, catcher.getReceived()); + StorageContainerException sce = (StorageContainerException) catcher.getReceived(); assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult()); } @@ -189,66 +166,39 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class); when(trx.getLogEntry()).thenReturn(entry); when(trx.getStateMachineContext()).thenReturn(context); - if (failWithException) { - when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException()); - } else { - when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto - .newBuilder().setCmdType(ContainerProtos.Type.WriteChunk) - .setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR) - .build()); - } + + setUpMockDispatcherReturn(failWithException); // Failing apply transaction on congtainer 1. - when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( - ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build()) - .setContainerID(1) - .setDatanodeUuid(UUID.randomUUID().toString()).build()); - AtomicReference throwable = new AtomicReference<>(null); - Function throwableSetter = t -> { - throwable.set(t); - return null; - }; + setUpLogProtoReturn(context, 1, 1); + ThrowableCatcher catcher = new ThrowableCatcher(); //apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first // failure on container 1. - stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get(); + stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get(); verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class), any(DispatcherContext.class)); reset(dispatcher); - assertNotNull(throwable.get()); - if (failWithException) { - assertInstanceOf(RuntimeException.class, throwable.get()); - } else { - assertInstanceOf(StorageContainerException.class, throwable.get()); - StorageContainerException sce = (StorageContainerException) throwable.get(); - assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult()); - } + assertNotNull(catcher.getCaught()); + assertResults(failWithException, catcher.getCaught()); // Another apply transaction on same container 1 should fail because the previous apply transaction failed. - stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get(); + stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get(); verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class), any(DispatcherContext.class)); - assertInstanceOf(StorageContainerException.class, throwable.get()); - StorageContainerException sce = (StorageContainerException) throwable.get(); + assertInstanceOf(StorageContainerException.class, catcher.getReceived()); + StorageContainerException sce = (StorageContainerException) catcher.getReceived(); assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult()); // Another apply transaction on a different container 2 shouldn't fail because the previous apply transaction // failure was only on container 1. - when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( - ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build()) - .setContainerID(2) - .setDatanodeUuid(UUID.randomUUID().toString()).build()); - + setUpLogProtoReturn(context, 2, 1); reset(dispatcher); - throwable.set(null); + catcher.getCaught().set(null); when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto .newBuilder().setCmdType(ContainerProtos.Type.WriteChunk).setResult(ContainerProtos.Result.SUCCESS) .build()); - Message succcesfulTransaction = stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get(); + Message succcesfulTransaction = stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get(); verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class), any(DispatcherContext.class)); - assertNull(throwable.get()); + assertNull(catcher.getReceived()); ContainerProtos.ContainerCommandResponseProto resp = ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent()); assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult()); @@ -275,32 +225,87 @@ public void testWriteTimout() throws Exception { return null; }).when(dispatcher).dispatch(any(), any()); - when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( - ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data")) - .setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build()) - .setContainerID(1) - .setDatanodeUuid(UUID.randomUUID().toString()).build()); - AtomicReference throwable = new AtomicReference<>(null); - Function throwableSetter = t -> { - throwable.set(t); - return null; - }; - Field writeChunkWaitMaxNs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs"); - writeChunkWaitMaxNs.setAccessible(true); - writeChunkWaitMaxNs.set(stateMachine, 1000_000_000); + setUpMockRequestProtoReturn(context, 1, 1); + ThrowableCatcher catcher = new ThrowableCatcher(); + CompletableFuture firstWrite = stateMachine.write(entry, trx); Thread.sleep(2000); CompletableFuture secondWrite = stateMachine.write(entryNext, trx); - firstWrite.exceptionally(throwableSetter).get(); - assertNotNull(throwable.get()); - assertInstanceOf(InterruptedException.class, throwable.get()); - - secondWrite.exceptionally(throwableSetter).get(); - assertNotNull(throwable.get()); - assertInstanceOf(StorageContainerException.class, throwable.get()); - StorageContainerException sce = (StorageContainerException) throwable.get(); + firstWrite.exceptionally(catcher.asSetter()).get(); + assertNotNull(catcher.getCaught()); + assertInstanceOf(InterruptedException.class, catcher.getReceived()); + + secondWrite.exceptionally(catcher.asSetter()).get(); + assertNotNull(catcher.getReceived()); + assertInstanceOf(StorageContainerException.class, catcher.getReceived()); + StorageContainerException sce = (StorageContainerException) catcher.getReceived(); assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult()); } + + private void setUpMockDispatcherReturn(boolean failWithException) { + if (failWithException) { + when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException()); + } else { + when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto + .newBuilder().setCmdType(ContainerProtos.Type.WriteChunk) + .setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR) + .build()); + } + } + + private void setUpMockRequestProtoReturn(ContainerStateMachine.Context context, + int containerId, int localId) { + when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( + ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8(CONTAINER_DATA)) + .setBlockID( + ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(containerId) + .setLocalID(localId).build()).build()) + .setContainerID(containerId) + .setDatanodeUuid(UUID.randomUUID().toString()).build()); + } + + private void assertResults(boolean failWithException, AtomicReference throwable) { + if (failWithException) { + assertInstanceOf(RuntimeException.class, throwable.get()); + } else { + assertInstanceOf(StorageContainerException.class, throwable.get()); + StorageContainerException sce = (StorageContainerException) throwable.get(); + assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult()); + } + } + + private void setUpLogProtoReturn(ContainerStateMachine.Context context, int containerId, int localId) { + when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( + ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID( + ContainerProtos.DatanodeBlockID.newBuilder(). + setContainerID(containerId).setLocalID(localId).build()).build()) + .setContainerID(containerId) + .setDatanodeUuid(UUID.randomUUID().toString()).build()); + } + + private static class ThrowableCatcher { + + private final AtomicReference caught = new AtomicReference<>(null); + + public Function asSetter() { + return t -> { + caught.set(t); + return null; + }; + } + + public AtomicReference getCaught() { + return caught; + } + + public Throwable getReceived() { + return caught.get(); + } + + public void reset() { + caught.set(null); + } + } }