Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@
* APIs to support group management operations such as add, remove, list and info to a particular server.
*/
public interface GroupManagementApi {
/** Add a new group. */
RaftClientReply add(RaftGroup newGroup) throws IOException;
/**
* Add a new group.
* @param format Should it format the storage?
*/
RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException;

/** The same as add(newGroup, true). */
default RaftClientReply add(RaftGroup newGroup) throws IOException {
return add(newGroup, true);
}

/** Remove a group. */
RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,9 @@ static GroupManagementRequest toGroupManagementRequest(GroupManagementRequestPro
final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
switch(p.getOpCase()) {
case GROUPADD:
final GroupAddRequestProto add = p.getGroupAdd();
return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat());
case GROUPREMOVE:
final GroupRemoveRequestProto remove = p.getGroupRemove();
return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
Expand Down Expand Up @@ -609,8 +610,10 @@ static GroupManagementRequestProto toGroupManagementRequestProto(GroupManagement
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
final GroupManagementRequest.Add add = request.getAdd();
if (add != null) {
b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup(
ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build());
b.setGroupAdd(GroupAddRequestProto.newBuilder()
.setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup()))
.setFormat(add.isFormat())
.build());
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi {
}

@Override
public RaftClientReply add(RaftGroup newGroup) throws IOException {
public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException {
Objects.requireNonNull(newGroup, "newGroup == null");

final long callId = CallId.getAndIncrement();
client.getClientRpc().addRaftPeers(newGroup.getPeers());
return client.io().sendRequestWithRetry(
() -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup));
() -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ public abstract static class Op {

public static class Add extends Op {
private final RaftGroup group;
private final boolean format;

public Add(RaftGroup group) {
public Add(RaftGroup group, boolean format) {
this.group = group;
this.format = format;
}

@Override
Expand All @@ -40,6 +42,10 @@ public RaftGroup getGroup() {
return group;
}

public boolean isFormat() {
return format;
}

@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup();
Expand Down Expand Up @@ -79,8 +85,9 @@ public String toString() {
}
}

public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) {
return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId,
RaftGroup group, boolean format) {
return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format));
}

public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.apache.ratis.util.function.CheckedSupplier;

import java.util.Objects;

/**
* A memoized supplier is a {@link CheckedSupplier}
* which gets a value by invoking its initializer once.
* and then keeps returning the same value as its supplied results.
*
* This class is thread safe.
*
* @param <RETURN> The return type of the supplier.
* @param <THROW> The throwable type of the supplier.
*/
public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
implements CheckedSupplier<RETURN, THROW> {
/**
* @param supplier to supply at most one non-null value.
* @return a {@link MemoizedCheckedSupplier} with the given supplier.
*/
public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN, THROW> valueOf(
CheckedSupplier<RETURN, THROW> supplier) {
return supplier instanceof MemoizedCheckedSupplier ?
(MemoizedCheckedSupplier<RETURN, THROW>) supplier : new MemoizedCheckedSupplier<>(supplier);
}

private final CheckedSupplier<RETURN, THROW> initializer;
private volatile RETURN value = null;

/**
* Create a memoized supplier.
* @param initializer to supply at most one non-null value.
*/
private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) {
Objects.requireNonNull(initializer, "initializer == null");
this.initializer = initializer;
}

/** @return the lazily initialized object. */
@Override
public RETURN get() throws THROW {
RETURN v = value;
if (v == null) {
synchronized (this) {
v = value;
if (v == null) {
v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null");
}
}
}
return v;
}

/**
* @return the already initialized object.
* @throws NullPointerException if the object is uninitialized.
*/
public RETURN getUnchecked() {
return Objects.requireNonNull(value, "value == null");
}

/** @return is the object initialized? */
public boolean isInitialized() {
return value != null;
}

@Override
public String toString() {
return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

Expand Down Expand Up @@ -113,6 +114,10 @@ static <T> T assertInstanceOf(Object object, Class<T> clazz) {
return clazz.cast(object);
}

static <K, V> void assertEmpty(Map<K, V> map, Object name) {
assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map);
}

static <T> void assertUnique(Iterable<T> first) {
assertUnique(first, Collections.emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Size which may be constructed with a {@link TraditionalBinaryPrefix}.
*/
public final class SizeInBytes {
public static final SizeInBytes ZERO = valueOf(0);
public static final SizeInBytes ONE_KB = valueOf("1k");
public static final SizeInBytes ONE_MB = valueOf("1m");

Expand Down
1 change: 1 addition & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ message StartLeaderElectionReplyProto {
// A request to add a new group
message GroupAddRequestProto {
RaftGroupProto group = 1; // the group to be added.
bool format = 2; // Should it format the storage?
}

message GroupRemoveRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ class Builder {

private static Method initNewRaftServerMethod() {
final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
RaftProperties.class, Parameters.class};
final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class,
StateMachine.Registry.class, RaftProperties.class, Parameters.class};
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftServer", argClasses);
Expand All @@ -177,12 +177,12 @@ private static Method initNewRaftServerMethod() {
}
}

private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option,
StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
throws IOException {
try {
return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
serverId, group, stateMachineRegistry, properties, parameters);
serverId, group, option, stateMachineRegistry, properties, parameters);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + serverId, e);
} catch (InvocationTargetException e) {
Expand All @@ -193,6 +193,7 @@ private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
private RaftPeerId serverId;
private StateMachine.Registry stateMachineRegistry ;
private RaftGroup group = null;
private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER;
private RaftProperties properties;
private Parameters parameters;

Expand All @@ -201,6 +202,7 @@ public RaftServer build() throws IOException {
return newRaftServer(
serverId,
group,
option,
Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
"is initialized."),
Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
Expand Down Expand Up @@ -230,6 +232,12 @@ public Builder setGroup(RaftGroup group) {
return this;
}

/** Set the startup option for the group. */
public Builder setOption(RaftStorage.StartupOption option) {
this.option = option;
return this;
}

/** Set {@link RaftProperties}. */
public Builder setProperties(RaftProperties properties) {
this.properties = properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ class Builder {

private static Method initNewRaftStorageMethod() {
final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils";
//final String className = "org.apache.ratis.server.storage.RaftStorageImpl";
final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class };
final Class<?>[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class};
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftStorage", argClasses);
Expand All @@ -76,7 +75,7 @@ private static RaftStorage newRaftStorage(File dir, CorruptionPolicy logCorrupti
StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException {
try {
return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null,
dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize());
dir, storageFreeSpaceMin, option, logCorruptionPolicy);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + dir, e);
} catch (InvocationTargetException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ public long[] getFollowerNextIndices() {

private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);

RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
this.lifeCycle = new LifeCycle(id);
Expand All @@ -202,7 +203,7 @@ public long[] getFollowerNextIndices() {
this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
this.proxy = proxy;

this.state = new ServerState(id, group, properties, this, stateMachine);
this.state = new ServerState(id, group, stateMachine, this, option, properties);
this.retryCache = new RetryCacheImpl(properties);
this.dataStreamMap = new DataStreamMapImpl(id);

Expand Down Expand Up @@ -575,8 +576,8 @@ public Collection<CommitInfoProto> getCommitInfos() {
}

GroupInfoReply getGroupInfo(GroupInfoRequest request) {
return new GroupInfoReply(request, getCommitInfos(),
getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy());
}

RoleInfoProto getRoleInfoProto() {
Expand Down
Loading