Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ message ClearSlowLogResponses {
required bool is_cleaned = 1;
}

// Request for AdminService.GetLogFileSizeIfBeingWritten.
message GetLogFileSizeIfBeingWrittenRequest {
// Path of the WAL file whose length is being asked for, as a string.
required string wal_path = 1;
}

// Response for AdminService.GetLogFileSizeIfBeingWritten.
message GetLogFileSizeIfBeingWrittenResponse {
// True when the requested WAL file is currently being written.
required bool is_being_written = 1;
// Length of the WAL file; optional, the region server handler only sets it
// when is_being_written is true.
optional uint64 length = 2;
}

service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
Expand Down Expand Up @@ -399,4 +408,7 @@ service AdminService {
rpc GetLogEntries(LogRequest)
returns(LogEntry);

// Reports whether the WAL at the requested path is being written and, if so,
// its length.
rpc GetLogFileSizeIfBeingWritten(GetLogFileSizeIfBeingWrittenRequest)
returns(GetLogFileSizeIfBeingWrittenResponse);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
Expand Down Expand Up @@ -216,4 +218,10 @@ public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
ExecuteProceduresRequest request) {
return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
}

/**
 * Issues the {@code GetLogFileSizeIfBeingWritten} admin RPC through {@code call}.
 *
 * @param request the request naming the WAL path to query
 * @return a future completed with the response of the RPC
 */
public CompletableFuture<GetLogFileSizeIfBeingWrittenResponse> getLogFileSizeIfBeingWritten(
    GetLogFileSizeIfBeingWrittenRequest request) {
  return call((adminStub, rpcController, callback) ->
    adminStub.getLogFileSizeIfBeingWritten(rpcController, request, callback));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2307,7 +2307,7 @@ public LogRoller getWalRoller() {
return walRoller;
}

/** Returns the {@link WALFactory} stored in this object's {@code walFactory} field. */
WALFactory getWalFactory() {
public WALFactory getWalFactory() {
return walFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -135,6 +136,7 @@
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
Expand Down Expand Up @@ -188,6 +190,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
Expand Down Expand Up @@ -4019,6 +4023,26 @@ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
throw new ServiceException("Invalid request params");
}

/**
 * RPC handler reporting whether a WAL file is currently being written and, if so, its length.
 * <p>
 * The length is looked up through the {@link WALFileLengthProvider} of this region server's WAL
 * provider. The response always carries {@code is_being_written}; {@code length} is only set
 * when the provider returns a value.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request carries the WAL path to look up
 * @return the populated response
 * @throws ServiceException wrapping any exception raised while resolving the length
 */
@Override
public GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten(
    RpcController controller, GetLogFileSizeIfBeingWrittenRequest request) throws ServiceException {
  GetLogFileSizeIfBeingWrittenResponse.Builder response =
    GetLogFileSizeIfBeingWrittenResponse.newBuilder();
  try {
    WALFileLengthProvider lengthProvider =
      this.regionServer.getWalFactory().getWALProvider().getWALFileLengthProvider();
    Path walPath = new Path(request.getWalPath());
    OptionalLong walLength = lengthProvider.getLogFileSizeIfBeingWritten(walPath);
    boolean beingWritten = walLength.isPresent();
    response.setIsBeingWritten(beingWritten);
    if (beingWritten) {
      response.setLength(walLength.getAsLong());
    }
    return response.build();
  } catch (Exception e) {
    throw new ServiceException(e);
  }
}

public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.replication.replicationserver.RemoteWALFileLengthProvider;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -718,7 +720,7 @@ public void startReplicationSource(ServerName producer, String queueId)
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
// init replication source
src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this,
producer, queueId, clusterId, p -> OptionalLong.empty(), metrics);
producer, queueId, clusterId, createWALFileLengthProvider(producer, queueId), metrics);
queueStorage.getWALsInQueue(producer, queueId)
.forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
src.startup();
Expand Down Expand Up @@ -746,4 +748,11 @@ private void abortWhenFail(ReplicationQueueOperation op) {
abort("Failed to operate on replication queue", e);
}
}

/**
 * Chooses the {@link WALFileLengthProvider} handed to a replication source.
 * <p>
 * For a recovered queue the provider always answers with an empty {@code OptionalLong}
 * (presumably because nobody is writing those WALs any more; confirm against the queue
 * semantics). Otherwise the length is requested from the producer through a
 * {@link RemoteWALFileLengthProvider}.
 *
 * @param producer the server name passed to the remote provider
 * @param queueId the id of the replication queue, used to detect recovered queues
 * @return the provider to use for this queue
 */
private WALFileLengthProvider createWALFileLengthProvider(ServerName producer, String queueId) {
  boolean recoveredQueue = new ReplicationQueueInfo(queueId).isQueueRecovered();
  return recoveredQueue
    ? p -> OptionalLong.empty()
    : new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.OptionalLong;

import org.apache.hadoop.fs.Path;
Expand All @@ -33,5 +34,5 @@
@FunctionalInterface
public interface WALFileLengthProvider {

/**
 * Looks up the length of the given WAL file if it is being written.
 *
 * @param path the WAL file to look up
 * @return the length when present; callers in this change treat an empty result as
 *         "not being written"
 * @throws IOException declared for implementations that may fail, e.g. ones that fetch the
 *         length through an RPC
 */
OptionalLong getLogFileSizeIfBeingWritten(Path path);
OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.replicationserver;

import java.io.IOException;
import java.util.OptionalLong;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;

/**
 * {@link WALFileLengthProvider} used by the ReplicationServer when replication offload is
 * enabled.
 * <p>
 * On the ReplicationServer we need to know the length of a WAL that is being written by the
 * RegionServer holding it, so the length is obtained from that RegionServer through an RPC call.
 */
@InterfaceAudience.Private
public class RemoteWALFileLengthProvider implements WALFileLengthProvider {

  private static final Logger LOG = LoggerFactory.getLogger(RemoteWALFileLengthProvider.class);

  /** Connection used to obtain the admin stub of the target RegionServer. */
  private final AsyncClusterConnection conn;

  /** The RegionServer that is asked for the WAL length. */
  private final ServerName rs;

  /**
   * @param conn connection used to reach the RegionServer
   * @param rs the RegionServer that is asked for the WAL length
   */
  public RemoteWALFileLengthProvider(AsyncClusterConnection conn, ServerName rs) {
    this.conn = conn;
    this.rs = rs;
  }

  /**
   * Asks the RegionServer whether the given WAL is being written and, if so, for its length.
   *
   * @param path the WAL file to look up; its string form is sent in the request
   * @return the length reported by the RegionServer when it says the WAL is being written,
   *         otherwise an empty {@code OptionalLong}
   * @throws IOException if the RPC fails; the failure is logged and rethrown unchanged
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException {
    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(rs);
    GetLogFileSizeIfBeingWrittenRequest request =
      GetLogFileSizeIfBeingWrittenRequest.newBuilder().setWalPath(path.toString()).build();
    try {
      // Block on the asynchronous admin call.
      AdminProtos.GetLogFileSizeIfBeingWrittenResponse response =
        FutureUtils.get(rsAdmin.getLogFileSizeIfBeingWritten(request));
      if (response.getIsBeingWritten()) {
        return OptionalLong.of(response.getLength());
      } else {
        return OptionalLong.empty();
      }
    } catch (IOException e) {
      // Pass the exception as the trailing argument so the cause shows up in the log.
      LOG.warn("Exceptionally get the length of wal {} from RS {}", path.getName(), rs, e);
      throw e;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,18 @@ interface AsyncWriter extends WriterBase {
void addWALActionsListener(WALActionsListener listener);

/**
 * Returns a {@link WALFileLengthProvider} that asks every WAL returned by {@code getWALs()} for
 * the size of the given path and yields any present answer, or an empty {@code OptionalLong}
 * when no WAL reports one.
 */
default WALFileLengthProvider getWALFileLengthProvider() {
return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
.filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
return path -> getWALs().stream().map(w -> {
try {
return w.getLogFileSizeIfBeingWritten(path);
} catch (IOException e) {
// Not expected to happen. To support replication offload (HBASE-24737) we introduced
// RemoteWALFileLengthProvider, an implementation of WALFileLengthProvider that is held by
// the ReplicationServer and gets the length of WALs from the RS through RPC, which may
// throw an IOE. That is why WALFileLengthProvider.getLogFileSizeIfBeingWritten has to be
// declared as throwing IOE. It is safe to ignore it here: WALProvider is only used by the
// RS, and getWALs returns WALs that extend WALFileLengthProvider and won't throw IOE.
return OptionalLong.empty();
}
}).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,13 @@ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
return null;
}

/**
 * Stub implementation: performs no lookup and always returns {@code null}.
 */
@Override
public AdminProtos.GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten(
RpcController controller, AdminProtos.GetLogFileSizeIfBeingWrittenRequest request)
throws ServiceException {
return null;
}

@Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
RpcController controller, GetSpaceQuotaSnapshotsRequest request)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.replicationserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ MediumTests.class})
public class TestRemoteWALFileLengthProvider {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRemoteWALFileLengthProvider.class);

private static final Logger LOG = LoggerFactory.getLogger(TestRemoteWALFileLengthProvider.class);

@Rule
public final TestName name = new TestName();

private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

private static final byte[] CF = Bytes.toBytes("C");

@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster();
}

@AfterClass
public static void teardownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}

@Test
public void test() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
Table table = UTIL.createTable(tableName, CF);
UTIL.waitUntilAllRegionsAssigned(tableName);
assertEquals(1, UTIL.getMiniHBaseCluster().getNumLiveRegionServers());

// Find the RS which holds test table regions.
HRegionServer rs =
UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
.filter(s -> !s.getRegions(tableName).isEmpty())
.findFirst().get();
assertNotNull(rs);

// Put some data and request rolling log, make multiple wals.
table.put(new Put(Bytes.toBytes("r1")).addColumn(CF, CF, Bytes.toBytes("v")));
rs.getWalRoller().requestRollAll();
table.put(new Put(Bytes.toBytes("r2")).addColumn(CF, CF, Bytes.toBytes("v")));
UTIL.waitFor(60000, rs::walRollRequestFinished);

WALFileLengthProvider rsLengthProvider =
rs.getWalFactory().getWALProvider().getWALFileLengthProvider();
WALFileLengthProvider remoteLengthProvider =
new RemoteWALFileLengthProvider(UTIL.getAsyncConnection(), rs.getServerName());

// Check that RegionServer and ReplicationServer can get same result whether the wal is being
// written
boolean foundWalIsBeingWritten = false;
List<Path> wals = getRsWalsOnFs(rs);
assertTrue(wals.size() > 1);
for (Path wal : wals) {
Path path = new Path(rs.getWALRootDir(), wal);
OptionalLong rsWalLength = rsLengthProvider.getLogFileSizeIfBeingWritten(path);
OptionalLong remoteLength = remoteLengthProvider.getLogFileSizeIfBeingWritten(path);
assertEquals(rsWalLength.isPresent(), remoteLength.isPresent());
if (rsWalLength.isPresent() && remoteLength.isPresent()) {
foundWalIsBeingWritten = true;
assertEquals(rsWalLength.getAsLong(), remoteLength.getAsLong());
}
}
assertTrue(foundWalIsBeingWritten);
}

private List<Path> getRsWalsOnFs(HRegionServer rs) throws IOException {
FileSystem fs = rs.getFileSystem();
FileStatus[] fileStatuses = fs.listStatus(new Path(rs.getWALRootDir(),
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().toString())));
return Arrays.stream(fileStatuses).map(FileStatus::getPath).collect(Collectors.toList());
}
}