Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
package org.apache.ratis.server.storage;

import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.ReflectionUtils;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** The storage of a raft server. */
public interface RaftStorage extends Closeable {
Expand All @@ -35,4 +42,77 @@ public interface RaftStorage extends Closeable {

/** @return the corruption policy for raft log. */
CorruptionPolicy getLogCorruptionPolicy();

static Builder newBuilder() {
return new Builder();
}

enum StartupOption {
/** Format the storage. */
FORMAT,
RECOVER
}

class Builder {

private static final Method NEW_RAFT_STORAGE_METHOD = initNewRaftStorageMethod();

private static Method initNewRaftStorageMethod() {
final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils";
//final String className = "org.apache.ratis.server.storage.RaftStorageImpl";
final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class };
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftStorage", argClasses);
} catch (Exception e) {
throw new IllegalStateException("Failed to initNewRaftStorageMethod", e);
}
}

private static RaftStorage newRaftStorage(File dir, CorruptionPolicy logCorruptionPolicy,
StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException {
try {
return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null,
dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + dir, e);
} catch (InvocationTargetException e) {
Throwable t = e.getTargetException();
if (t.getCause() instanceof IOException) {
throw IOUtils.asIOException(t.getCause());
}
throw IOUtils.asIOException(e.getCause());
}
}


private File directory;
private CorruptionPolicy logCorruptionPolicy;
private StartupOption option;
private SizeInBytes storageFreeSpaceMin;

public Builder setDirectory(File directory) {
this.directory = directory;
return this;
}

public Builder setLogCorruptionPolicy(CorruptionPolicy logCorruptionPolicy) {
this.logCorruptionPolicy = logCorruptionPolicy;
return this;
}

public Builder setOption(StartupOption option) {
this.option = option;
return this;
}

public Builder setStorageFreeSpaceMin(SizeInBytes storageFreeSpaceMin) {
this.storageFreeSpaceMin = storageFreeSpaceMin;
return this;
}

public RaftStorage build() throws IOException {
return newRaftStorage(directory, logCorruptionPolicy, option, storageFreeSpaceMin);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ class ServerState implements Closeable {
// use full uuid string to create a subdirectory
File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString());
try {
storage = new RaftStorageImpl(dir, RaftServerConfigKeys.Log.corruptionPolicy(prop),
RaftServerConfigKeys.storageFreeSpaceMin(prop).getSize());
storage = (RaftStorageImpl) RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.RECOVER)
.setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop))
.setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop))
.build();
storageFound = true;
break;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,12 @@
/** The storage of a {@link org.apache.ratis.server.RaftServer}. */
public class RaftStorageImpl implements RaftStorage {

public enum StartupOption {
/** Format the storage. */
FORMAT
}

// TODO support multiple storage directories
private final RaftStorageDirectoryImpl storageDir;
private final StorageState state;
private final CorruptionPolicy logCorruptionPolicy;
private volatile RaftStorageMetadataFileImpl metaFile;

public RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy,
long storageFeeSpaceMin) throws IOException {
this(dir, logCorruptionPolicy, null, storageFeeSpaceMin);
}

RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option,
long storageFeeSpaceMin) throws IOException {
this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.storage;

import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public final class StorageImplUtils {

private StorageImplUtils() {
//Never constructed
}

/** Create a {@link RaftStorageImpl}. */
public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy,
RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException {
RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin);

final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
final RaftStorageImpl raftStorage;
try {
// attempt multiple times to avoid temporary bind exception
raftStorage = JavaUtils.attemptRepeatedly(
() -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin),
5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(
"Interrupted when creating RaftStorage " + dir, e);
} catch (IOException e) {
throw new RuntimeException(e);
}
return raftStorage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RATIS_LOG_WORKER_METRICS;

import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogBase;
Expand All @@ -33,7 +34,11 @@

public interface RaftStorageTestUtils {
static RaftStorage newRaftStorage(File dir) throws IOException {
return new RaftStorageImpl(dir, null, 0L);
return RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.RECOVER)
.setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT)
.build();
}

static String getLogFlushTimeMetric(String memberId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,13 @@ void runTestServerRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exc
// the raft server proxy created earlier. Raft server proxy should close
// the rpc server on failure.
RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId)));
testFailureCase("start a new server with the same address",
() -> newRaftServer(cluster, leaderId, stateMachine, p).start(),
IOException.class, OverlappingFileLockException.class);
try {
LOG.info("start a new server with the same address");
newRaftServer(cluster, leaderId, stateMachine, p).start();
} catch (IOException e) {
Assert.assertTrue(e.getCause() instanceof OverlappingFileLockException);
Assert.assertTrue(e.getMessage().contains("directory is already locked"));
}
// Try to start a raft server rpc at the leader address.
cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.SizeInBytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -47,7 +49,11 @@
*/
public class TestRaftStorage extends BaseTest {
static RaftStorageImpl newRaftStorage(File dir) throws IOException {
return new RaftStorageImpl(dir, null, 0);
return (RaftStorageImpl) RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.RECOVER)
.setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT)
.build();
}

private File storageDir;
Expand All @@ -65,7 +71,11 @@ public void tearDown() throws Exception {
}

static RaftStorageImpl formatRaftStorage(File dir) throws IOException {
return new RaftStorageImpl(dir, null, RaftStorageImpl.StartupOption.FORMAT, 0);
return (RaftStorageImpl) RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.FORMAT)
.setStorageFreeSpaceMin(SizeInBytes.valueOf(0))
.build();
}

@Test
Expand Down