Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ public class TestCoprocessorEndpointTracing {
})
.build();
private static final ConnectionRule connectionRule =
new ConnectionRule(miniclusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniclusterRule::createAsyncConnection);

private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtil util = miniclusterRule.getTestingUtility();
final AsyncConnection connection = connectionRule.getConnection();
final AsyncConnection connection = connectionRule.getAsyncConnection();
final AsyncAdmin admin = connection.getAdmin();
final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build();
Expand All @@ -149,7 +149,7 @@ protected void before() throws Throwable {

@Test
public void traceAsyncTableEndpoint() {
final AsyncConnection connection = connectionRule.getConnection();
final AsyncConnection connection = connectionRule.getAsyncConnection();
final AsyncTable<?> table = connection.getTable(TEST_TABLE);
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final CompletableFuture<Map<byte[], String>> future = new CompletableFuture<>();
Expand Down Expand Up @@ -228,7 +228,7 @@ public void onError(Throwable error) {

@Test
public void traceSyncTableEndpointCall() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final RpcController controller = new ServerRpcController();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
Expand Down Expand Up @@ -280,7 +280,7 @@ public void traceSyncTableEndpointCall() throws Exception {

@Test
public void traceSyncTableEndpointCallAndCallback() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final RpcController controller = new ServerRpcController();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
Expand Down Expand Up @@ -336,7 +336,7 @@ public void traceSyncTableEndpointCallAndCallback() throws Exception {

@Test
public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final EchoResponseProto response = TraceUtil.trace(() -> {
Expand Down Expand Up @@ -376,7 +376,7 @@ public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception {

@Test
public void traceSyncTableBatchEndpoint() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final Descriptors.MethodDescriptor descriptor =
TestProtobufRpcProto.getDescriptor().findMethodByName("echo");
Expand Down Expand Up @@ -423,7 +423,7 @@ public void traceSyncTableBatchEndpoint() throws Exception {

@Test
public void traceSyncTableBatchEndpointCallback() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) {
final Descriptors.MethodDescriptor descriptor =
TestProtobufRpcProto.getDescriptor().findMethodByName("echo");
Expand Down Expand Up @@ -472,7 +472,7 @@ public void traceSyncTableBatchEndpointCallback() throws Exception {

@Test
public void traceAsyncAdminEndpoint() throws Exception {
final AsyncConnection connection = connectionRule.getConnection();
final AsyncConnection connection = connectionRule.getAsyncConnection();
final AsyncAdmin admin = connection.getAdmin();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final ServiceCaller<TestProtobufRpcProto, EchoResponseProto> callback =
Expand Down Expand Up @@ -504,7 +504,7 @@ public void traceAsyncAdminEndpoint() throws Exception {

@Test
public void traceSyncAdminEndpoint() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection();
final Connection connection = connectionRule.getConnection();
try (final Admin admin = connection.getAdmin()) {
final TestProtobufRpcProto.BlockingInterface service =
TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
Expand Down Expand Up @@ -537,7 +537,7 @@ public void traceSyncAdminEndpoint() throws Exception {
}

private void waitForAndLog(Matcher<SpanData> spanMatcher) {
final Configuration conf = connectionRule.getConnection().getConfiguration();
final Configuration conf = connectionRule.getAsyncConnection().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
otelClassRule::getSpans, hasItem(spanMatcher)));
final List<SpanData> spans = otelClassRule.getSpans();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TestShellExecEndpointCoprocessor {

@Rule
public final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);

@Test
public void testShellExecUnspecified() {
Expand All @@ -69,7 +69,7 @@ public void testShellExecForeground() {
}

private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
final AsyncConnection conn = connectionRule.getConnection();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin();

final String command = "echo -n \"hello world\"";
Expand All @@ -87,7 +87,7 @@ private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> co

@Test
public void testShellExecBackground() throws IOException {
final AsyncConnection conn = connectionRule.getConnection();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin();

final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility());
Expand Down Expand Up @@ -121,7 +121,7 @@ private static File ensureTestDataDirExists(
final Path testDataDir = Optional.of(testingUtility)
.map(HBaseTestingUtil::getDataTestDir)
.map(Object::toString)
.map(val -> Paths.get(val))
.map(Paths::get)
.orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
final File testDataDirFile = Files.createDirectories(testDataDir).toFile();
assertTrue(testDataDirFile.exists());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
Expand All @@ -35,7 +36,7 @@
* public class TestMyClass {
* private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
* private static final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection);
*
* @ClassRule
* public static final TestRule rule = RuleChain
Expand All @@ -44,32 +45,90 @@
* }
* }</pre>
*/
public class ConnectionRule extends ExternalResource {
public final class ConnectionRule extends ExternalResource {

private final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier;
private AsyncConnection connection;
private final Supplier<Connection> connectionSupplier;
private final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier;

public ConnectionRule(final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier) {
private Connection connection;
private AsyncConnection asyncConnection;

public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier
) {
return new ConnectionRule(connectionSupplier, null);
}

public static ConnectionRule createAsyncConnectionRule(
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(null, asyncConnectionSupplier);
}

public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(connectionSupplier, asyncConnectionSupplier);
}

private ConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
this.connectionSupplier = connectionSupplier;
this.asyncConnectionSupplier = asyncConnectionSupplier;
}

public AsyncConnection getConnection() {
public Connection getConnection() {
if (connection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with a synchronous connection.");
}
return connection;
}

public AsyncConnection getAsyncConnection() {
if (asyncConnection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with an asynchronous connection.");
}
return asyncConnection;
}

@Override
protected void before() throws Throwable {
this.connection = connectionSupplier.get().join();
if (connectionSupplier != null) {
this.connection = connectionSupplier.get();
}
if (asyncConnectionSupplier != null) {
this.asyncConnection = asyncConnectionSupplier.get().join();
}
if (connection == null && asyncConnection != null) {
this.connection = asyncConnection.toConnection();
}
}

@Override
protected void after() {
if (this.connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
CompletableFuture<Void> closeConnection = CompletableFuture.runAsync(() -> {
if (this.connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
});
CompletableFuture<Void> closeAsyncConnection = CompletableFuture.runAsync(() -> {
if (this.asyncConnection != null) {
try {
asyncConnection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
CompletableFuture.allOf(closeConnection, closeAsyncConnection).join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.ClassRule;
import org.junit.Rule;
Expand All @@ -44,7 +46,7 @@
*
* &#64;Rule
* public final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
* }
* }
* </pre>
Expand Down Expand Up @@ -108,11 +110,23 @@ public HBaseTestingUtil getTestingUtility() {
return testingUtility;
}

/**
* Create a {@link Connection} to the managed {@link SingleProcessHBaseCluster}. It's up to
* the caller to {@link Connection#close() close()} the connection when finished.
*/
public Connection createConnection() {
try {
return createAsyncConnection().get().toConnection();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

/**
* Create a {@link AsyncConnection} to the managed {@link SingleProcessHBaseCluster}. It's up to
* the caller to {@link AsyncConnection#close() close()} the connection when finished.
*/
public CompletableFuture<AsyncConnection> createConnection() {
public CompletableFuture<AsyncConnection> createAsyncConnection() {
if (miniCluster == null) {
throw new IllegalStateException("test cluster not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public class TestApiV1ClusterMetricsResource {
})
.build();
private static final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
private static final ClassSetup classRule = new ClassSetup(connectionRule::getConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private static final ClassSetup classRule = new ClassSetup(connectionRule::getAsyncConnection);

private static final class ClassSetup extends ExternalResource {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public class TestMetaBrowser {
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();

private final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
new ClearUserNamespacesAndTablesRule(connectionRule::getAsyncConnection);

@Rule
public TestRule rule = RuleChain.outerRule(connectionRule)
Expand All @@ -84,7 +84,7 @@ public class TestMetaBrowser {

@Before
public void before() {
connection = connectionRule.getConnection();
connection = connectionRule.getAsyncConnection();
admin = connection.getAdmin();
}

Expand Down