diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java index d8f4fec398..bd2a59a2b8 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -18,10 +18,17 @@ package org.apache.ratis.server.storage; import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.ReflectionUtils; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; /** The storage of a raft server. */ public interface RaftStorage extends Closeable { @@ -35,4 +42,77 @@ public interface RaftStorage extends Closeable { /** @return the corruption policy for raft log. */ CorruptionPolicy getLogCorruptionPolicy(); + + static Builder newBuilder() { + return new Builder(); + } + + enum StartupOption { + /** Format the storage. */ + FORMAT, + RECOVER + } + + class Builder { + + private static final Method NEW_RAFT_STORAGE_METHOD = initNewRaftStorageMethod(); + + private static Method initNewRaftStorageMethod() { + final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils"; + //final String className = "org.apache.ratis.server.storage.RaftStorageImpl"; + final Class[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class }; + try { + final Class clazz = ReflectionUtils.getClassByName(className); + return clazz.getMethod("newRaftStorage", argClasses); + } catch (Exception e) { + throw new IllegalStateException("Failed to initNewRaftStorageMethod", e); + } + } + + private static RaftStorage newRaftStorage(File dir, CorruptionPolicy logCorruptionPolicy, + StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException { + try { + return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null, + dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize()); + } catch (IllegalAccessException e) { + throw new IllegalStateException("Failed to build " + dir, e); + } catch (InvocationTargetException e) { + Throwable t = e.getTargetException(); + if (t.getCause() instanceof IOException) { + throw IOUtils.asIOException(t.getCause()); + } + throw IOUtils.asIOException(e.getCause()); + } + } + + + private File directory; + private CorruptionPolicy logCorruptionPolicy; + private StartupOption option; + private SizeInBytes storageFreeSpaceMin; + + public Builder setDirectory(File directory) { + this.directory = directory; + return this; + } + + public Builder setLogCorruptionPolicy(CorruptionPolicy logCorruptionPolicy) { + this.logCorruptionPolicy = logCorruptionPolicy; + return this; + } + + public Builder setOption(StartupOption option) { + this.option = option; + return this; + } + + public Builder setStorageFreeSpaceMin(SizeInBytes storageFreeSpaceMin) { + this.storageFreeSpaceMin = storageFreeSpaceMin; + return this; + } + + public RaftStorage build() throws IOException { + return newRaftStorage(directory, logCorruptionPolicy, option, storageFreeSpaceMin); + } + } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index b66ba7336a..37c6fcd3ba 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -112,8 +112,12 @@ class ServerState implements Closeable { // use full uuid string to create a subdirectory File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString()); try { - storage = new RaftStorageImpl(dir, RaftServerConfigKeys.Log.corruptionPolicy(prop), - RaftServerConfigKeys.storageFreeSpaceMin(prop).getSize()); + storage = (RaftStorageImpl) RaftStorage.newBuilder() + .setDirectory(dir) + .setOption(RaftStorage.StartupOption.RECOVER) + .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop)) + .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop)) + .build(); storageFound = true; break; } catch (IOException e) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java index 0bbf902e27..fdf1b0b184 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java @@ -36,22 +36,12 @@ /** The storage of a {@link org.apache.ratis.server.RaftServer}. */ public class RaftStorageImpl implements RaftStorage { - public enum StartupOption { - /** Format the storage. */ - FORMAT - } - // TODO support multiple storage directories private final RaftStorageDirectoryImpl storageDir; private final StorageState state; private final CorruptionPolicy logCorruptionPolicy; private volatile RaftStorageMetadataFileImpl metaFile; - public RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, - long storageFeeSpaceMin) throws IOException { - this(dir, logCorruptionPolicy, null, storageFeeSpaceMin); - } - RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option, long storageFeeSpaceMin) throws IOException { this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java new file mode 100644 index 0000000000..10296657b4 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public final class StorageImplUtils { + + private StorageImplUtils() { + //Never constructed + } + + /** Create a {@link RaftStorageImpl}. */ + public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy, + RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException { + RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin); + + final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); + final RaftStorageImpl raftStorage; + try { + // attempt multiple times to avoid temporary bind exception + raftStorage = JavaUtils.attemptRepeatedly( + () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin), + 5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException( + "Interrupted when creating RaftStorage " + dir, e); + } catch (IOException e) { + throw new RuntimeException(e); + } + return raftStorage; + } +} diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java index d8c48ff672..d22bc5c4e7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java @@ -21,6 +21,7 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RATIS_LOG_WORKER_METRICS; import org.apache.ratis.metrics.RatisMetrics; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.RaftLogBase; @@ -33,7 +34,11 @@ public interface RaftStorageTestUtils { static RaftStorage newRaftStorage(File dir) throws IOException { - return new RaftStorageImpl(dir, null, 0L); + return RaftStorage.newBuilder() + .setDirectory(dir) + .setOption(RaftStorage.StartupOption.RECOVER) + .setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT) + .build(); } static String getLogFlushTimeMetric(String memberId) { diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index 8dcff758b4..16e11f7d30 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -125,9 +125,13 @@ void runTestServerRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exc // the raft server proxy created earlier. Raft server proxy should close // the rpc server on failure. RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId))); - testFailureCase("start a new server with the same address", - () -> newRaftServer(cluster, leaderId, stateMachine, p).start(), - IOException.class, OverlappingFileLockException.class); + try { + LOG.info("start a new server with the same address"); + newRaftServer(cluster, leaderId, stateMachine, p).start(); + } catch (IOException e) { + Assert.assertTrue(e.getCause() instanceof OverlappingFileLockException); + Assert.assertTrue(e.getMessage().contains("directory is already locked")); + } // Try to start a raft server rpc at the leader address. cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId)); } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java index 89e2ccece8..5a34264cc2 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -22,11 +22,13 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.SnapshotRetentionPolicy; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.SizeInBytes; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -47,7 +49,11 @@ */ public class TestRaftStorage extends BaseTest { static RaftStorageImpl newRaftStorage(File dir) throws IOException { - return new RaftStorageImpl(dir, null, 0); + return (RaftStorageImpl) RaftStorage.newBuilder() + .setDirectory(dir) + .setOption(RaftStorage.StartupOption.RECOVER) + .setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT) + .build(); } private File storageDir; @@ -65,7 +71,11 @@ public void tearDown() throws Exception { } static RaftStorageImpl formatRaftStorage(File dir) throws IOException { - return new RaftStorageImpl(dir, null, RaftStorageImpl.StartupOption.FORMAT, 0); + return (RaftStorageImpl) RaftStorage.newBuilder() + .setDirectory(dir) + .setOption(RaftStorage.StartupOption.FORMAT) + .setStorageFreeSpaceMin(SizeInBytes.valueOf(0)) + .build(); } @Test