Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -31,7 +32,6 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -79,13 +79,16 @@ abstract class TestContainerStateMachine {
.setNameFormat("ChunkWriter-" + i + "-%d")
.build())).collect(Collectors.toList());
private final boolean isLeader;
private static final String CONTAINER_DATA = "Test Data";

TestContainerStateMachine(boolean isLeader) {
this.isLeader = isLeader;
}

@BeforeEach
public void setup() throws IOException {
conf.setTimeDuration(HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
1000_000_000, TimeUnit.NANOSECONDS);
dispatcher = mock(ContainerDispatcher.class);
ContainerController controller = mock(ContainerController.class);
XceiverServerRatis ratisServer = mock(XceiverServerRatis.class);
Expand Down Expand Up @@ -129,52 +132,26 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
TransactionContext trx = mock(TransactionContext.class);
ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
when(trx.getStateMachineContext()).thenReturn(context);
if (failWithException) {
when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException());
} else {
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
.setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
.build());
}

when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
.setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
.setContainerID(1)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
Function<Throwable, ? extends Message> throwableSetter = t -> {
throwable.set(t);
return null;
};
stateMachine.write(entry, trx).exceptionally(throwableSetter).get();
setUpMockDispatcherReturn(failWithException);
setUpMockRequestProtoReturn(context, 1, 1);

ThrowableCatcher catcher = new ThrowableCatcher();

stateMachine.write(entry, trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
reset(dispatcher);
assertNotNull(throwable.get());
if (failWithException) {
assertInstanceOf(RuntimeException.class, throwable.get());
} else {
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
}
assertNotNull(catcher.getReceived());
assertResults(failWithException, catcher.getCaught());

// Writing data to another container(containerId 2) should also fail.
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
.setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build())
.setContainerID(2)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
stateMachine.write(entryNext, trx).exceptionally(throwableSetter).get();
setUpMockRequestProtoReturn(context, 2, 1);
stateMachine.write(entryNext, trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertInstanceOf(StorageContainerException.class, catcher.getReceived());
StorageContainerException sce = (StorageContainerException) catcher.getReceived();
assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
}

Expand All @@ -189,66 +166,39 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
when(trx.getLogEntry()).thenReturn(entry);
when(trx.getStateMachineContext()).thenReturn(context);
if (failWithException) {
when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException());
} else {
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
.setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
.build());
}

setUpMockDispatcherReturn(failWithException);
// Failing apply transaction on congtainer 1.
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
.setContainerID(1)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
Function<Throwable, ? extends Message> throwableSetter = t -> {
throwable.set(t);
return null;
};
setUpLogProtoReturn(context, 1, 1);
ThrowableCatcher catcher = new ThrowableCatcher();
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
// failure on container 1.
stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
reset(dispatcher);
assertNotNull(throwable.get());
if (failWithException) {
assertInstanceOf(RuntimeException.class, throwable.get());
} else {
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
}
assertNotNull(catcher.getCaught());
assertResults(failWithException, catcher.getCaught());
// Another apply transaction on same container 1 should fail because the previous apply transaction failed.
stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertInstanceOf(StorageContainerException.class, catcher.getReceived());
StorageContainerException sce = (StorageContainerException) catcher.getReceived();
assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());

// Another apply transaction on a different container 2 shouldn't fail because the previous apply transaction
// failure was only on container 1.
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build())
.setContainerID(2)
.setDatanodeUuid(UUID.randomUUID().toString()).build());

setUpLogProtoReturn(context, 2, 1);
reset(dispatcher);
throwable.set(null);
catcher.getCaught().set(null);
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk).setResult(ContainerProtos.Result.SUCCESS)
.build());
Message succcesfulTransaction = stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
Message succcesfulTransaction = stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
assertNull(throwable.get());
assertNull(catcher.getReceived());
ContainerProtos.ContainerCommandResponseProto resp =
ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent());
assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
Expand All @@ -275,32 +225,87 @@ public void testWriteTimout() throws Exception {
return null;
}).when(dispatcher).dispatch(any(), any());

when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
.setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
.setContainerID(1)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
Function<Throwable, ? extends Message> throwableSetter = t -> {
throwable.set(t);
return null;
};
Field writeChunkWaitMaxNs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs");
writeChunkWaitMaxNs.setAccessible(true);
writeChunkWaitMaxNs.set(stateMachine, 1000_000_000);
setUpMockRequestProtoReturn(context, 1, 1);
ThrowableCatcher catcher = new ThrowableCatcher();

CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
Thread.sleep(2000);
CompletableFuture<Message> secondWrite = stateMachine.write(entryNext, trx);
firstWrite.exceptionally(throwableSetter).get();
assertNotNull(throwable.get());
assertInstanceOf(InterruptedException.class, throwable.get());

secondWrite.exceptionally(throwableSetter).get();
assertNotNull(throwable.get());
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
firstWrite.exceptionally(catcher.asSetter()).get();
assertNotNull(catcher.getCaught());
assertInstanceOf(InterruptedException.class, catcher.getReceived());

secondWrite.exceptionally(catcher.asSetter()).get();
assertNotNull(catcher.getReceived());
assertInstanceOf(StorageContainerException.class, catcher.getReceived());
StorageContainerException sce = (StorageContainerException) catcher.getReceived();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
}

private void setUpMockDispatcherReturn(boolean failWithException) {
if (failWithException) {
when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException());
} else {
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
.setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
.build());
}
}

private void setUpMockRequestProtoReturn(ContainerStateMachine.Context context,
int containerId, int localId) {
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8(CONTAINER_DATA))
.setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(containerId)
.setLocalID(localId).build()).build())
.setContainerID(containerId)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
}

private void assertResults(boolean failWithException, AtomicReference<Throwable> throwable) {
if (failWithException) {
assertInstanceOf(RuntimeException.class, throwable.get());
} else {
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
}
}

private void setUpLogProtoReturn(ContainerStateMachine.Context context, int containerId, int localId) {
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().
setContainerID(containerId).setLocalID(localId).build()).build())
.setContainerID(containerId)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
}

private static class ThrowableCatcher {

private final AtomicReference<Throwable> caught = new AtomicReference<>(null);

public Function<Throwable, ? extends Message> asSetter() {
return t -> {
caught.set(t);
return null;
};
}

public AtomicReference<Throwable> getCaught() {
return caught;
}

public Throwable getReceived() {
return caught.get();
}

public void reset() {
caught.set(null);
}
}
}