diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java index 1d3bc00b12..5587470487 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java @@ -29,8 +29,16 @@ * APIs to support group management operations such as add, remove, list and info to a particular server. */ public interface GroupManagementApi { - /** Add a new group. */ - RaftClientReply add(RaftGroup newGroup) throws IOException; + /** + * Add a new group. + * @param format Should it format the storage? + */ + RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException; + + /** The same as add(newGroup, true). */ + default RaftClientReply add(RaftGroup newGroup) throws IOException { + return add(newGroup, true); + } /** Remove a group. */ RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException; diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 859e1d4f01..1ac8258503 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -571,8 +571,9 @@ static GroupManagementRequest toGroupManagementRequest(GroupManagementRequestPro final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId()); switch(p.getOpCase()) { case GROUPADD: + final GroupAddRequestProto add = p.getGroupAdd(); return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(), - ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup())); + ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat()); case GROUPREMOVE: final GroupRemoveRequestProto remove = p.getGroupRemove(); return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(), @@ -609,8 +610,10 @@ static GroupManagementRequestProto toGroupManagementRequestProto(GroupManagement .setRpcRequest(toRaftRpcRequestProtoBuilder(request)); final GroupManagementRequest.Add add = request.getAdd(); if (add != null) { - b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup( - ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build()); + b.setGroupAdd(GroupAddRequestProto.newBuilder() + .setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())) + .setFormat(add.isFormat()) + .build()); } final GroupManagementRequest.Remove remove = request.getRemove(); if (remove != null) { diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java index 27e0bbffce..9501bc2eaf 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java @@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi { } @Override - public RaftClientReply add(RaftGroup newGroup) throws IOException { + public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException { Objects.requireNonNull(newGroup, "newGroup == null"); final long callId = CallId.getAndIncrement(); client.getClientRpc().addRaftPeers(newGroup.getPeers()); return client.io().sendRequestWithRetry( - () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup)); + () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format)); } @Override diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java index d370dfc4ca..2783d2c658 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java @@ -26,9 +26,11 @@ public abstract static class Op { public static class Add extends Op { private final RaftGroup group; + private final boolean format; - public Add(RaftGroup group) { + public Add(RaftGroup group, boolean format) { this.group = group; + this.format = format; } @Override @@ -40,6 +42,10 @@ public RaftGroup getGroup() { return group; } + public boolean isFormat() { + return format; + } + @Override public String toString() { return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup(); @@ -79,8 +85,9 @@ public String toString() { } } - public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) { - return new GroupManagementRequest(clientId, serverId, callId, new Add(group)); + public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, + RaftGroup group, boolean format) { + return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format)); } public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java new file mode 100644 index 0000000000..cf2d060239 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.util.function.CheckedSupplier; + +import java.util.Objects; + +/** + * A memoized supplier is a {@link CheckedSupplier} + * which gets a value by invoking its initializer once. + * and then keeps returning the same value as its supplied results. + * + * This class is thread safe. + * + * @param The return type of the supplier. + * @param The throwable type of the supplier. + */ +public final class MemoizedCheckedSupplier + implements CheckedSupplier { + /** + * @param supplier to supply at most one non-null value. + * @return a {@link MemoizedCheckedSupplier} with the given supplier. + */ + public static MemoizedCheckedSupplier valueOf( + CheckedSupplier supplier) { + return supplier instanceof MemoizedCheckedSupplier ? + (MemoizedCheckedSupplier) supplier : new MemoizedCheckedSupplier<>(supplier); + } + + private final CheckedSupplier initializer; + private volatile RETURN value = null; + + /** + * Create a memoized supplier. + * @param initializer to supply at most one non-null value. + */ + private MemoizedCheckedSupplier(CheckedSupplier initializer) { + Objects.requireNonNull(initializer, "initializer == null"); + this.initializer = initializer; + } + + /** @return the lazily initialized object. */ + @Override + public RETURN get() throws THROW { + RETURN v = value; + if (v == null) { + synchronized (this) { + v = value; + if (v == null) { + v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null"); + } + } + } + return v; + } + + /** + * @return the already initialized object. + * @throws NullPointerException if the object is uninitialized. + */ + public RETURN getUnchecked() { + return Objects.requireNonNull(value, "value == null"); + } + + /** @return is the object initialized? */ + public boolean isInitialized() { + return value != null; + } + + @Override + public String toString() { + return isInitialized()? "Memoized:" + value: "UNINITIALIZED"; + } +} diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java index e9a002612c..902c1f5e6c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -113,6 +114,10 @@ static T assertInstanceOf(Object object, Class clazz) { return clazz.cast(object); } + static void assertEmpty(Map map, Object name) { + assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map); + } + static void assertUnique(Iterable first) { assertUnique(first, Collections.emptyList()); } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java index 25667a378e..683f0da628 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java @@ -23,6 +23,7 @@ * Size which may be constructed with a {@link TraditionalBinaryPrefix}. */ public final class SizeInBytes { + public static final SizeInBytes ZERO = valueOf(0); public static final SizeInBytes ONE_KB = valueOf("1k"); public static final SizeInBytes ONE_MB = valueOf("1m"); diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index c571f0c737..9fe2494bf0 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -469,6 +469,7 @@ message StartLeaderElectionReplyProto { // A request to add a new group message GroupAddRequestProto { RaftGroupProto group = 1; // the group to be added. + bool format = 2; // Should it format the storage? } message GroupRemoveRequestProto { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index e9719b96c4..8d00d29db3 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -167,8 +167,8 @@ class Builder { private static Method initNewRaftServerMethod() { final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils"; - final Class[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class, - RaftProperties.class, Parameters.class}; + final Class[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class, + StateMachine.Registry.class, RaftProperties.class, Parameters.class}; try { final Class clazz = ReflectionUtils.getClassByName(className); return clazz.getMethod("newRaftServer", argClasses); @@ -177,12 +177,12 @@ private static Method initNewRaftServerMethod() { } } - private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, + private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) throws IOException { try { return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null, - serverId, group, stateMachineRegistry, properties, parameters); + serverId, group, option, stateMachineRegistry, properties, parameters); } catch (IllegalAccessException e) { throw new IllegalStateException("Failed to build " + serverId, e); } catch (InvocationTargetException e) { @@ -193,6 +193,7 @@ private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, private RaftPeerId serverId; private StateMachine.Registry stateMachineRegistry ; private RaftGroup group = null; + private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER; private RaftProperties properties; private Parameters parameters; @@ -201,6 +202,7 @@ public RaftServer build() throws IOException { return newRaftServer( serverId, group, + option, Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " + "is initialized."), Objects.requireNonNull(properties, "The 'properties' field is not initialized."), @@ -230,6 +232,12 @@ public Builder setGroup(RaftGroup group) { return this; } + /** Set the startup option for the group. */ + public Builder setOption(RaftStorage.StartupOption option) { + this.option = option; + return this; + } + /** Set {@link RaftProperties}. */ public Builder setProperties(RaftProperties properties) { this.properties = properties; diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java index dde3c31bce..59d87c37ae 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -62,8 +62,7 @@ class Builder { private static Method initNewRaftStorageMethod() { final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils"; - //final String className = "org.apache.ratis.server.storage.RaftStorageImpl"; - final Class[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class }; + final Class[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class}; try { final Class clazz = ReflectionUtils.getClassByName(className); return clazz.getMethod("newRaftStorage", argClasses); @@ -76,7 +75,7 @@ private static RaftStorage newRaftStorage(File dir, CorruptionPolicy logCorrupti StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException { try { return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null, - dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize()); + dir, storageFreeSpaceMin, option, logCorruptionPolicy); } catch (IllegalAccessException e) { throw new IllegalStateException("Failed to build " + dir, e); } catch (InvocationTargetException e) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 2ee191c567..258e94d9fa 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -189,7 +189,8 @@ public long[] getFollowerNextIndices() { private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); - RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException { + RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) + throws IOException { final RaftPeerId id = proxy.getId(); LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine); this.lifeCycle = new LifeCycle(id); @@ -202,7 +203,7 @@ public long[] getFollowerNextIndices() { this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties); this.proxy = proxy; - this.state = new ServerState(id, group, properties, this, stateMachine); + this.state = new ServerState(id, group, stateMachine, this, option, properties); this.retryCache = new RetryCacheImpl(properties); this.dataStreamMap = new DataStreamMapImpl(id); @@ -575,8 +576,8 @@ public Collection getCommitInfos() { } GroupInfoReply getGroupInfo(GroupInfoRequest request) { - return new GroupInfoReply(request, getCommitInfos(), - getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy()); + final RaftStorageDirectory dir = state.getStorage().getStorageDir(); + return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy()); } RoleInfoProto getRoleInfoProto() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index ad4d988ab7..ad488f53bd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -38,6 +38,7 @@ import org.apache.ratis.server.DataStreamServerRpc; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.ServerFactory; +import org.apache.ratis.server.storage.RaftStorage.StartupOption; import org.apache.ratis.util.ConcurrentUtils; import org.apache.ratis.util.JvmPauseMonitor; import org.apache.ratis.server.RaftServerConfigKeys; @@ -80,7 +81,7 @@ class ImplMap implements Closeable { private final ConcurrentMap> map = new ConcurrentHashMap<>(); private boolean isClosed = false; - synchronized CompletableFuture addNew(RaftGroup group) { + synchronized CompletableFuture addNew(RaftGroup group, StartupOption option) { if (isClosed) { return JavaUtils.completeExceptionally(new AlreadyClosedException( getId() + ": Failed to add " + group + " since the server is already closed")); @@ -90,7 +91,7 @@ synchronized CompletableFuture addNew(RaftGroup group) { getId() + ": Failed to add " + group + " since the group already exists in the map.")); } final RaftGroupId groupId = group.getGroupId(); - final CompletableFuture newImpl = newRaftServerImpl(group); + final CompletableFuture newImpl = newRaftServerImpl(group, option); final CompletableFuture previous = map.put(groupId, newImpl); Preconditions.assertNull(previous, "previous"); LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl)); @@ -230,7 +231,7 @@ private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold } /** Check the storage dir and add groups*/ - void initGroups(RaftGroup group) { + void initGroups(RaftGroup group, StartupOption option) { final Optional raftGroup = Optional.ofNullable(group); final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null); final Predicate shouldAdd = gid -> gid != null && !gid.equals(raftGroupId); @@ -241,7 +242,7 @@ void initGroups(RaftGroup group) { .filter(File::isDirectory) .forEach(sub -> initGroupDir(sub, shouldAdd)), executor).join(); - raftGroup.ifPresent(this::addGroup); + raftGroup.ifPresent(g -> addGroup(g, option)); } private void initGroupDir(File sub, Predicate shouldAdd) { @@ -255,7 +256,7 @@ private void initGroupDir(File sub, Predicate shouldAdd) { " ignoring it. ", getId(), sub.getAbsolutePath()); } if (shouldAdd.test(groupId)) { - addGroup(RaftGroup.valueOf(groupId)); + addGroup(RaftGroup.valueOf(groupId), StartupOption.RECOVER); } } catch (Exception e) { LOG.warn(getId() + ": Failed to initialize the group directory " @@ -269,11 +270,11 @@ void addRaftPeers(Collection peers) { getDataStreamServerRpc().addRaftPeers(others); } - private CompletableFuture newRaftServerImpl(RaftGroup group) { + private CompletableFuture newRaftServerImpl(RaftGroup group, StartupOption option) { return CompletableFuture.supplyAsync(() -> { try { addRaftPeers(group.getPeers()); - return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this); + return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this, option); } catch(IOException e) { throw new CompletionException(getId() + ": Failed to initialize server for " + group, e); } @@ -341,8 +342,8 @@ public DataStreamServerRpc getDataStreamServerRpc() { return dataStreamServerRpc; } - private CompletableFuture addGroup(RaftGroup group) { - return impls.addNew(group); + private CompletableFuture addGroup(RaftGroup group, StartupOption option) { + return impls.addNew(group, option); } private CompletableFuture getImplFuture(RaftGroupId groupId) { @@ -466,7 +467,7 @@ public CompletableFuture groupManagementAsync(GroupManagementRe } final GroupManagementRequest.Add add = request.getAdd(); if (add != null) { - return groupAddAsync(request, add.getGroup()); + return groupAddAsync(request, add.getGroup(), add.isFormat()); } final GroupManagementRequest.Remove remove = request.getRemove(); if (remove != null) { @@ -477,12 +478,13 @@ public CompletableFuture groupManagementAsync(GroupManagementRe getId() + ": Request not supported " + request)); } - private CompletableFuture groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) { + private CompletableFuture groupAddAsync( + GroupManagementRequest request, RaftGroup newGroup, boolean format) { if (!request.getRaftGroupId().equals(newGroup.getGroupId())) { return JavaUtils.completeExceptionally(new GroupMismatchException( getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup)); } - return impls.addNew(newGroup) + return impls.addNew(newGroup, format? StartupOption.FORMAT: StartupOption.RECOVER) .thenApplyAsync(newImpl -> { LOG.debug("{}: newImpl = {}", getId(), newImpl); try { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 6777b90936..6e1ddd548a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; @@ -45,7 +46,7 @@ private ServerImplUtils() { /** Create a {@link RaftServerProxy}. */ public static RaftServerProxy newRaftServer( - RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, + RaftPeerId id, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) throws IOException { RaftServer.LOG.debug("newRaftServer: {}, {}", id, group); if (group != null && !group.getPeers().isEmpty()) { @@ -53,7 +54,7 @@ public static RaftServerProxy newRaftServer( Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group); } final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters); - proxy.initGroups(group); + proxy.initGroups(group, option); return proxy; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 1ee2cab5e6..e92f9b9112 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -36,20 +36,14 @@ import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedCheckedSupplier; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; -import java.io.Closeable; -import java.io.File; import java.io.IOException; -import java.nio.channels.OverlappingFileLockException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -63,7 +57,7 @@ /** * Common states of a raft peer. Protected by RaftServer's lock. */ -class ServerState implements Closeable { +class ServerState { private final RaftGroupMemberId memberId; private final RaftServerImpl server; /** Raft log */ @@ -73,7 +67,7 @@ class ServerState implements Closeable { /** The thread that applies committed log entries to the state machine */ private final MemoizedSupplier stateMachineUpdater; /** local storage for log and snapshot */ - private RaftStorageImpl storage; + private final MemoizedCheckedSupplier raftStorage; private final SnapshotManager snapshotManager; private volatile Timestamp lastNoLeaderTime; private final TimeDuration noLeaderTimeout; @@ -100,9 +94,8 @@ class ServerState implements Closeable { */ private final AtomicReference latestInstalledSnapshot = new AtomicReference<>(); - ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop, - RaftServerImpl server, StateMachine stateMachine) - throws IOException { + ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server, + RaftStorage.StartupOption option, RaftProperties prop) { this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId()); this.server = server; Collection followerPeers = group.getPeers().stream() @@ -117,36 +110,11 @@ class ServerState implements Closeable { configurationManager = new ConfigurationManager(initialConf); LOG.info("{}: {}", getMemberId(), configurationManager); - boolean storageFound = false; - List directories = RaftServerConfigKeys.storageDir(prop); - while (!directories.isEmpty()) { - // use full uuid string to create a subdirectory - File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString()); - try { - storage = (RaftStorageImpl) RaftStorage.newBuilder() - .setDirectory(dir) - .setOption(RaftStorage.StartupOption.RECOVER) - .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop)) - .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop)) - .build(); - storageFound = true; - break; - } catch (IOException e) { - if (e.getCause() instanceof OverlappingFileLockException) { - throw e; - } - LOG.warn("Failed to init RaftStorage under {} for {}: {}", - dir.getParent(), group.getGroupId().getUuid().toString(), e); - directories.removeIf(d -> d.getAbsolutePath().equals(dir.getParent())); - } - } - - if (!storageFound) { - throw new IOException("No healthy directories found for RaftStorage among: " + - RaftServerConfigKeys.storageDir(prop)); - } + final String storageDirName = group.getGroupId().getUuid().toString(); + this.raftStorage = MemoizedCheckedSupplier.valueOf( + () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop)); - snapshotManager = new SnapshotManager(storage, id); + this.snapshotManager = StorageImplUtils.newSnapshotManager(id); // On start the leader is null, start the clock now this.leaderId = null; @@ -163,7 +131,8 @@ class ServerState implements Closeable { } void initialize(StateMachine stateMachine) throws IOException { - storage.initialize(); + // initialize raft storage + final RaftStorageImpl storage = raftStorage.get(); // read configuration from the storage Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf); @@ -180,32 +149,8 @@ RaftGroupMemberId getMemberId() { return memberId; } - static File chooseStorageDir(List volumes, String targetSubDir) throws IOException { - final Map numberOfStorageDirPerVolume = new HashMap<>(); - final File[] empty = {}; - final List resultList = new ArrayList<>(); - volumes.stream().flatMap(volume -> { - final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(empty); - numberOfStorageDirPerVolume.put(volume, dirs.length); - return Arrays.stream(dirs); - }).filter(dir -> targetSubDir.equals(dir.getName())) - .forEach(resultList::add); - - if (resultList.size() > 1) { - throw new IOException("More than one directories found for " + targetSubDir + ": " + resultList); - } - if (resultList.size() == 1) { - return resultList.get(0); - } - return numberOfStorageDirPerVolume.entrySet().stream() - .min(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey) - .map(v -> new File(v, targetSubDir)) - .orElseThrow(() -> new IOException("No storage directory found.")); - } - void writeRaftConfiguration(LogEntryProto conf) { - storage.writeRaftConfiguration(conf); + getStorage().writeRaftConfiguration(conf); } void start() { @@ -214,7 +159,8 @@ void start() { private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) { try { - return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop); + return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf, + getSnapshotIndexFromStateMachine, prop); } catch (IOException e) { throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e); } @@ -259,10 +205,6 @@ RaftPeerId getLeaderId() { return leaderId; } - boolean hasLeader() { - return leaderId != null; - } - /** * Become a candidate and start leader election */ @@ -434,7 +376,7 @@ void setRaftConf(RaftConfiguration conf) { void updateConfiguration(List entries) { if (entries != null && !entries.isEmpty()) { configurationManager.removeConfigurations(entries.get(0).getIndex()); - entries.stream().forEach(this::setRaftConf); + entries.forEach(this::setRaftConf); } } @@ -455,29 +397,45 @@ void reloadStateMachine(long lastIndexInSnapshot) { getStateMachineUpdater().reloadStateMachine(); } - @Override - public void close() throws IOException { + void close() { + try { + if (stateMachineUpdater.isInitialized()) { + getStateMachineUpdater().stopAndJoin(); + } + } catch (Throwable e) { + LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e); + } + LOG.info("{}: applyIndex: {}", getMemberId(), getLastAppliedIndex()); + try { - getStateMachineUpdater().stopAndJoin(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e); + if (log.isInitialized()) { + getLog().close(); + } + } catch (Throwable e) { + LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e); } - LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex()); - getLog().close(); - storage.close(); + try { + if (raftStorage.isInitialized()) { + getStorage().close(); + } + } catch (Throwable e) { + LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e); + } } - RaftStorage getStorage() { - return storage; + RaftStorageImpl getStorage() { + if (!raftStorage.isInitialized()) { + throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized."); + } + return raftStorage.getUnchecked(); } void installSnapshot(InstallSnapshotRequestProto request) throws IOException { // TODO: verify that we need to install the snapshot StateMachine sm = server.getStateMachine(); sm.pause(); // pause the SM to prepare for install snapshot - snapshotManager.installSnapshot(sm, request); + snapshotManager.installSnapshot(request, sm, getStorage().getStorageDir()); updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex())); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java index a7fa13b53e..a86cdf56bc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java @@ -19,6 +19,7 @@ import org.apache.ratis.util.AtomicFileOutputStream; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.SizeInBytes; import java.io.File; import java.io.IOException; @@ -51,13 +52,13 @@ enum StorageState { private final File root; // root directory private FileLock lock; // storage lock - private long freeSpaceMin; + private final SizeInBytes freeSpaceMin; /** * Constructor * @param dir directory corresponding to the storage */ - RaftStorageDirectoryImpl(File dir, long freeSpaceMin) { + RaftStorageDirectoryImpl(File dir, SizeInBytes freeSpaceMin) { this.root = dir; this.lock = null; this.freeSpaceMin = freeSpaceMin; @@ -177,7 +178,7 @@ public boolean isHealthy() { } private boolean hasEnoughSpace() { - return root.getFreeSpace() > freeSpaceMin; + return root.getFreeSpace() >= freeSpaceMin.getSize(); } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java index 6513efd30e..56972c3f70 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java @@ -23,6 +23,7 @@ import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.SizeInBytes; import java.io.File; import java.io.FileNotFoundException; @@ -34,17 +35,16 @@ /** The storage of a {@link org.apache.ratis.server.RaftServer}. */ public class RaftStorageImpl implements RaftStorage { - - // TODO support multiple storage directories private final RaftStorageDirectoryImpl storageDir; private final StartupOption startupOption; private final CorruptionPolicy logCorruptionPolicy; private volatile StorageState state = StorageState.UNINITIALIZED; private volatile RaftStorageMetadataFileImpl metaFile; - RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option, - long storageFeeSpaceMin) { - this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin); + RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) { + LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}", + dir, freeSpaceMin, option, logCorruptionPolicy); + this.storageDir = new RaftStorageDirectoryImpl(dir, freeSpaceMin); this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault); this.startupOption = option; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index aaa62a7837..17294b572f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -54,21 +54,17 @@ public class SnapshotManager { private static final String CORRUPT = ".corrupt"; private static final String TMP = ".tmp"; - private final RaftStorage storage; private final RaftPeerId selfId; private final Supplier digester = JavaUtils.memoize(MD5Hash::getDigester); - public SnapshotManager(RaftStorage storage, RaftPeerId selfId) { - this.storage = storage; + SnapshotManager(RaftPeerId selfId) { this.selfId = selfId; } - public void installSnapshot(StateMachine stateMachine, - InstallSnapshotRequestProto request) throws IOException { - final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = - request.getSnapshotChunk(); + public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir) + throws IOException { + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); - final RaftStorageDirectory dir = storage.getStorageDir(); // create a unique temporary directory final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId()); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java index aeff601481..865e2b2b16 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java @@ -17,38 +17,158 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.util.IOUtils; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.server.RaftServerConfigKeys.Log; +import org.apache.ratis.server.storage.RaftStorage.StartupOption; +import org.apache.ratis.util.SizeInBytes; import java.io.File; import java.io.IOException; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.ratis.server.RaftServer.Division.LOG; public final class StorageImplUtils { + private static final File[] EMPTY_FILE_ARRAY = {}; private StorageImplUtils() { //Never constructed } + public static SnapshotManager newSnapshotManager(RaftPeerId id) { + return new SnapshotManager(id); + } + /** Create a {@link RaftStorageImpl}. */ - public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy, - RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException { - RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin); - - final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); - final RaftStorageImpl raftStorage; - try { - // attempt multiple times to avoid temporary bind exception - raftStorage = JavaUtils.attemptRepeatedly( - () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin), - 5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw IOUtils.toInterruptedIOException( - "Interrupted when creating RaftStorage " + dir, e); + public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin, + RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) { + return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy); + } + + /** @return a list of existing subdirectories matching the given storage directory name from the given volumes. */ + static List getExistingStorageSubs(List volumes, String targetSubDir, Map dirsPerVol) { + return volumes.stream().flatMap(volume -> { + final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(EMPTY_FILE_ARRAY); + Optional.ofNullable(dirsPerVol).ifPresent(map -> map.put(volume, dirs.length)); + return Arrays.stream(dirs); + }).filter(dir -> targetSubDir.equals(dir.getName())) + .collect(Collectors.toList()); + } + + /** @return a volume with the min dirs. */ + static File chooseMin(Map dirsPerVol) throws IOException { + return dirsPerVol.entrySet().stream() + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .orElseThrow(() -> new IOException("No storage directory found.")); + } + + /** + * Choose a {@link RaftStorage} for the given storage directory name from the given configuration properties + * and then try to call {@link RaftStorage#initialize()}. + *

+ * {@link StartupOption#FORMAT}: + * - When there are more than one existing directories, throw an exception. + * - When there is an existing directory, throw an exception. + * - When there is no existing directory, try to initialize a new directory from the list specified + * in the configuration properties until a directory succeeded or all directories failed. + *

+ * {@link StartupOption#RECOVER}: + * - When there are more than one existing directories, throw an exception. + * - When there is an existing directory, if it fails to initialize, throw an exception but not try a new directory. + * - When there is no existing directory, throw an exception. + * + * @param storageDirName the storage directory name + * @param option the startup option + * @param properties the configuration properties + * @return the chosen storage, which is initialized successfully. + */ + public static RaftStorageImpl initRaftStorage(String storageDirName, StartupOption option, + RaftProperties properties) throws IOException { + return new Op(storageDirName, option, properties).run(); + } + + private static class Op { + private final String storageDirName; + private final StartupOption option; + + private final SizeInBytes freeSpaceMin; + private final Log.CorruptionPolicy logCorruptionPolicy; + private final List dirsInConf; + + private final List existingSubs; + private final Map dirsPerVol = new HashMap<>(); + + Op(String storageDirName, StartupOption option, RaftProperties properties) { + this.storageDirName = storageDirName; + this.option = option; + + this.freeSpaceMin = RaftServerConfigKeys.storageFreeSpaceMin(properties); + this.logCorruptionPolicy = RaftServerConfigKeys.Log.corruptionPolicy(properties); + this.dirsInConf = RaftServerConfigKeys.storageDir(properties); + + this.existingSubs = getExistingStorageSubs(dirsInConf, this.storageDirName, dirsPerVol); + } + + RaftStorageImpl run() throws IOException { + if (option == StartupOption.FORMAT) { + return format(); + } else if (option == StartupOption.RECOVER) { + return recover(); + } else { + throw new IllegalArgumentException("Illegal option: " + option); + } + } + + private RaftStorageImpl format() throws IOException { + if (!existingSubs.isEmpty()) { + throw new IOException("Failed to " + option + ": One or more existing directories found " + existingSubs + + " for " + storageDirName); + } + + for (; !dirsPerVol.isEmpty(); ) { + final File vol = chooseMin(dirsPerVol); + final File dir = new File(vol, storageDirName); + try { + final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy); + storage.initialize(); + return storage; + } catch (Throwable e) { + LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e); + dirsPerVol.remove(vol); + } + } + throw new IOException("Failed to FORMAT a new storage dir for " + storageDirName + " from " + dirsInConf); + } + + private RaftStorageImpl recover() throws IOException { + final int size = existingSubs.size(); + if (size > 1) { + throw new IOException("Failed to " + option + ": More than one existing directories found " + + existingSubs + " for " + storageDirName); + } else if (size == 0) { + throw new IOException("Failed to " + option + ": Storage directory not found for " + + storageDirName + " from " + dirsInConf); + } + + final File dir = existingSubs.get(0); + try { + final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy); + storage.initialize(); + return storage; + } catch (Throwable e) { + if (e instanceof IOException) { + throw e; + } + throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e); + } } - return raftStorage; } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 1f40475245..dfed969525 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -40,6 +40,7 @@ import org.apache.ratis.server.ServerFactory; import org.apache.ratis.server.raftlog.memory.MemoryRaftLog; import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.util.CollectionUtils; @@ -385,8 +386,9 @@ private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean fo } final RaftProperties prop = new RaftProperties(properties); RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir)); - return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop, - setPropertiesAndInitParameters(id, group, prop)); + return ServerImplUtils.newRaftServer(id, group, + format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER, + getStateMachineRegistry(prop), prop, setPropertiesAndInitParameters(id, group, prop)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index e09ca19d15..5e6353f925 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -365,7 +365,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { // start the two new peers LOG.info("Start new peers"); for (RaftPeer np : c1.newPeers) { - cluster.restartServer(np.getId(), false); + cluster.restartServer(np.getId(), true); } Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess()); } @@ -504,7 +504,7 @@ void runTestKillLeaderDuringReconf(CLUSTER cluster) throws Exception { LOG.info("start new peers: {}", Arrays.asList(c1.newPeers)); for (RaftPeer np : c1.newPeers) { - cluster.restartServer(np.getId(), false); + cluster.restartServer(np.getId(), true); } try { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java index ffc8de597b..7027bd8eab 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -114,7 +114,7 @@ public void testNotExistent() throws IOException { */ @Test public void testStorage() throws Exception { - final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0); + final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO); try { StorageState state = sd.analyzeStorage(true); Assert.assertEquals(StorageState.NOT_FORMATTED, state); @@ -171,7 +171,7 @@ public void testCleanMetaTmpFile() throws Exception { Assert.assertEquals(StorageState.NORMAL, storage.getState()); storage.close(); - final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0); + final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO); File metaFile = sd.getMetaFile(); FileUtils.move(metaFile, sd.getMetaTmpFile()); @@ -286,7 +286,7 @@ public void testNotEnoughSpace() throws IOException { File mockStorageDir = Mockito.spy(storageDir); Mockito.when(mockStorageDir.getFreeSpace()).thenReturn(100L); // 100B - final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, 104857600); // 100MB + final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, SizeInBytes.valueOf("100M")); StorageState state = sd.analyzeStorage(false); Assert.assertEquals(StorageState.NO_SPACE, state); } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java similarity index 82% rename from ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java rename to ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java index 75aef53a11..ff38a6bd94 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.ratis.server.impl; +package org.apache.ratis.server.storage; import org.apache.ratis.BaseTest; import org.apache.ratis.util.FileUtils; @@ -28,7 +28,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; @@ -37,13 +39,20 @@ /** * Test cases to verify ServerState. */ -public class TestServerState { +public class TestStorageImplUtils { private static final Supplier rootTestDir = JavaUtils.memoize( () -> new File(BaseTest.getRootTestDir(), - JavaUtils.getClassSimpleName(TestServerState.class) + + JavaUtils.getClassSimpleName(TestStorageImplUtils.class) + Integer.toHexString(ThreadLocalRandom.current().nextInt()))); + static File chooseNewStorageDir(List volumes, String sub) throws IOException { + final Map numDirPerVolume = new HashMap<>(); + StorageImplUtils.getExistingStorageSubs(volumes, sub, numDirPerVolume); + final File vol = StorageImplUtils.chooseMin(numDirPerVolume); + return new File(vol, sub); + } + @AfterClass public static void tearDown() throws IOException { FileUtils.deleteFully(rootTestDir.get()); @@ -60,8 +69,8 @@ public void testChooseStorageDirWithOneVolume() throws IOException { List directories = Collections.singletonList(testDir); String subDirOne = UUID.randomUUID().toString(); String subDirTwo = UUID.randomUUID().toString(); - File storageDirOne = ServerState.chooseStorageDir(directories, subDirOne); - File storageDirTwo = ServerState.chooseStorageDir(directories, subDirTwo); + final File storageDirOne = chooseNewStorageDir(directories, subDirOne); + final File storageDirTwo = chooseNewStorageDir(directories, subDirTwo); File expectedOne = new File(testDir, subDirOne); File expectedTwo = new File(testDir, subDirTwo); Assert.assertEquals(expectedOne.getCanonicalPath(), @@ -100,7 +109,7 @@ public void testChooseStorageDirWithMultipleVolumes() throws IOException { } }); String subDir = UUID.randomUUID().toString(); - File storageDirectory = ServerState.chooseStorageDir(directories, subDir); + final File storageDirectory = chooseNewStorageDir(directories, subDir); File expected = new File(directories.get(6), subDir); Assert.assertEquals(expected.getCanonicalPath(), storageDirectory.getCanonicalPath()); @@ -108,19 +117,15 @@ public void testChooseStorageDirWithMultipleVolumes() throws IOException { /** * Tests choosing of storage directory when only no volume is configured. - * - * @throws IOException in case of exception. */ @Test public void testChooseStorageDirWithNoVolume() { try { - ServerState.chooseStorageDir( - Collections.emptyList(), UUID.randomUUID().toString()); + chooseNewStorageDir(Collections.emptyList(), UUID.randomUUID().toString()); Assert.fail(); } catch (IOException ex) { String expectedErrMsg = "No storage directory found."; Assert.assertEquals(expectedErrMsg, ex.getMessage()); } } - } \ No newline at end of file