From a99d6b984fbb1e2e3f6d7725e61b5a22c55fe21b Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 22 Aug 2023 11:08:21 -0400 Subject: [PATCH 1/5] ensure connection attributes are not corrupted by released byte buffers --- .../org/apache/hadoop/hbase/ipc/RpcCall.java | 4 +-- .../apache/hadoop/hbase/ipc/ServerCall.java | 6 ++-- .../hadoop/hbase/ipc/ServerRpcConnection.java | 7 +++++ .../TestRequestAndConnectionAttributes.java | 29 +++++++++++++++---- .../namequeues/TestNamedQueueRecorder.java | 3 +- .../region/TestRegionProcedureStore.java | 3 +- 6 files changed, 39 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index cc97a39c7ee4..82c9117716c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; @@ -27,7 +28,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; /** @@ -83,7 +83,7 @@ public interface RpcCall extends RpcCallContext { /** Returns The request header of this call. */ RequestHeader getHeader(); - ConnectionHeader getConnectionHeader(); + Map getConnectionAttributes(); /** Returns Port of remote address in this call */ int getRemotePort(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index f3568a36f144..0747612dd27f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.CellScanner; @@ -49,7 +50,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -209,8 +209,8 @@ public RequestHeader getHeader() { } @Override - public RPCProtos.ConnectionHeader getConnectionHeader() { - return this.connection.connectionHeader; + public Map getConnectionAttributes() { + return this.connection.connectionAttributes; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index b09f33c47f9a..3383255b5efe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -31,8 +31,10 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; +import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.stream.Collectors; import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandomFactory; @@ -75,6 +77,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; @@ -103,6 +106,7 @@ abstract class ServerRpcConnection implements Closeable { protected int remotePort; protected InetAddress addr; protected ConnectionHeader connectionHeader; + protected Map connectionAttributes; /** * Codec the client asked use. @@ -405,6 +409,9 @@ private CodedInputStream createCis(ByteBuff buf) { // Reads the connection header following version private void processConnectionHeader(ByteBuff buf) throws IOException { this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); + this.connectionAttributes = connectionHeader.getAttributeList().stream() + .collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, + nameBytesPair -> nameBytesPair.getValue().toByteArray())); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) { throw new EmptyServiceNameException(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java index b376bfc18557..c914eb42f551 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -84,6 +86,8 @@ public class TestRequestAndConnectionAttributes { @BeforeClass public static void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtil(); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(ALLOCATOR_POOL_ENABLED_KEY, true); TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE, new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE, @@ -101,15 +105,19 @@ public void setup() { } @Test - public void testConnectionAttributes() throws IOException { + public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException { TableName tableName = TableName.valueOf("testConnectionAttributes"); - TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, - HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + byte[] cf = Bytes.toBytes("0"); + TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE, + AttributesCoprocessor.class.getName()); Configuration conf = TEST_UTIL.getConfiguration(); try (Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { + + ensureBuffersAreOverwritten(table, cf); Result result = table.get(new Get(Bytes.toBytes(0))); + assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey())); @@ -118,6 +126,15 @@ public void testConnectionAttributes() throws IOException { } } + private void ensureBuffersAreOverwritten(Table table, byte[] cf) throws IOException { + // this will cause unread connection attributes on the + Put put = new Put(Bytes.toBytes(UUID.randomUUID().toString())); + byte[] bytes = new byte[100]; + new Random().nextBytes(bytes); + put.addColumn(cf, bytes, bytes); + table.put(put); + } + @Test public void testRequestAttributesGet() throws IOException { addRandomRequestAttributes(); @@ -275,10 +292,10 @@ public void preGetOp(ObserverContext c, Get get, .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName())) .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); } - for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) { + for (Map.Entry attr : rpcCall.getConnectionAttributes().entrySet()) { result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) - .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName())) - .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey())) + .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build()); } result.sort(CellComparator.getInstance()); c.bypass(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 7a3ca0b7cf9f..7f48bca86827 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -695,7 +696,7 @@ public RPCProtos.RequestHeader getHeader() { } @Override - public RPCProtos.ConnectionHeader getConnectionHeader() { + public Map getConnectionAttributes() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index dd49d00ac3a1..9f807ee70788 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.hadoop.hbase.CellScanner; @@ -222,7 +223,7 @@ public RPCProtos.RequestHeader getHeader() { } @Override - public RPCProtos.ConnectionHeader getConnectionHeader() { + public Map getConnectionAttributes() { return null; } From 8de34c2a542e281eb6491f5739096dc91a5c47ea Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 22 Aug 2023 16:09:01 -0400 Subject: [PATCH 2/5] pr feedback, general cleanup, add request attributes lazy eval --- .../org/apache/hadoop/hbase/ipc/RpcCall.java | 13 +++++++ .../apache/hadoop/hbase/ipc/ServerCall.java | 20 +++++++++++ .../hadoop/hbase/ipc/ServerRpcConnection.java | 16 ++++++--- .../TestRequestAndConnectionAttributes.java | 35 +++++++------------ .../namequeues/TestNamedQueueRecorder.java | 5 +++ .../region/TestRegionProcedureStore.java | 5 +++ 6 files changed, 67 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index 82c9117716c6..c4f21a84cb2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -19,8 +19,11 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -83,8 +86,18 @@ public interface RpcCall extends RpcCallContext { /** Returns The request header of this call. */ RequestHeader getHeader(); + /** + * Returns the map of attributes specified when building the Connection See the Map argument on + * {@link org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration, ExecutorService, User, Map)} + */ Map getConnectionAttributes(); + /** + * Returns the map of attributes specified when building the request See + * {@link org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])} + */ + Map getRequestAttributes(); + /** Returns Port of remote address in this call */ int getRemotePort(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 0747612dd27f..0dba4683e022 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -24,10 +24,12 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseServerException; @@ -49,6 +51,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; @@ -99,6 +102,7 @@ public abstract class ServerCall implements RpcCa // cumulative size of serialized exceptions private long exceptionSize = 0; private final boolean retryImmediatelySupported; + private Map requestAttributes; // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and // the rest of the bits are for WAL reference count. We can only call release if all of them are @@ -213,6 +217,22 @@ public Map getConnectionAttributes() { return this.connection.connectionAttributes; } + @Override + public Map getRequestAttributes() { + if (this.requestAttributes == null) { + if (header.getAttributeList().isEmpty()) { + this.requestAttributes = Collections.emptyMap(); + } else { + this.requestAttributes = Maps.newHashMapWithExpectedSize(header.getAttributeList().size()); + for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) { + this.requestAttributes.put(nameBytesPair.getName(), + nameBytesPair.getValue().toByteArray()); + } + } + } + return this.requestAttributes; + } + @Override public int getPriority() { return this.header.getPriority(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index 3383255b5efe..3529dcfc72dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -31,13 +31,14 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Properties; -import java.util.stream.Collectors; import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandomFactory; +import org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.VersionInfoUtil; @@ -409,9 +410,16 @@ private CodedInputStream createCis(ByteBuff buf) { // Reads the connection header following version private void processConnectionHeader(ByteBuff buf) throws IOException { this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); - this.connectionAttributes = connectionHeader.getAttributeList().stream() - .collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, - nameBytesPair -> nameBytesPair.getValue().toByteArray())); + if (connectionHeader.getAttributeList().isEmpty()) { + this.connectionAttributes = Collections.emptyMap(); + } else { + this.connectionAttributes = + Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size()); + for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) { + this.connectionAttributes.put(nameBytesPair.getName(), + nameBytesPair.getValue().toByteArray()); + } + } String serviceName = connectionHeader.getServiceName(); if (serviceName == null) { throw new EmptyServiceNameException(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java index c914eb42f551..728b877a32b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.io.ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -61,8 +60,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; - @Category({ ClientTests.class, MediumTests.class }) public class TestRequestAndConnectionAttributes { @@ -86,8 +83,6 @@ public class TestRequestAndConnectionAttributes { @BeforeClass public static void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtil(); - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setBoolean(ALLOCATOR_POOL_ENABLED_KEY, true); TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE, new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE, @@ -115,8 +110,11 @@ public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException try (Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { - ensureBuffersAreOverwritten(table, cf); - Result result = table.get(new Get(Bytes.toBytes(0))); + // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection + // header + byte[] bytes = new byte[300]; + new Random().nextBytes(bytes); + Result result = table.get(new Get(bytes)); assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { @@ -126,15 +124,6 @@ public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException } } - private void ensureBuffersAreOverwritten(Table table, byte[] cf) throws IOException { - // this will cause unread connection attributes on the - Put put = new Put(Bytes.toBytes(UUID.randomUUID().toString())); - byte[] bytes = new byte[100]; - new Random().nextBytes(bytes); - put.addColumn(cf, bytes, bytes); - table.put(put); - } - @Test public void testRequestAttributesGet() throws IOException { addRandomRequestAttributes(); @@ -287,10 +276,10 @@ public void preGetOp(ObserverContext c, Get get, // for connection attrs test RpcCall rpcCall = RpcServer.getCurrentCall().get(); - for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) { + for (Map.Entry attr : rpcCall.getRequestAttributes().entrySet()) { result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) - .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName())) - .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey())) + .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build()); } for (Map.Entry attr : rpcCall.getConnectionAttributes().entrySet()) { result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) @@ -316,15 +305,15 @@ public void prePut(ObserverContext c, Put put, WAL private void validateRequestAttributes() { RpcCall rpcCall = RpcServer.getCurrentCall().get(); - List attrs = rpcCall.getHeader().getAttributeList(); + Map attrs = rpcCall.getRequestAttributes(); if (attrs.size() != REQUEST_ATTRIBUTES.size()) { return; } - for (HBaseProtos.NameBytesPair attr : attrs) { - if (!REQUEST_ATTRIBUTES.containsKey(attr.getName())) { + for (Map.Entry attr : attrs.entrySet()) { + if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) { return; } - if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getName()), attr.getValue().toByteArray())) { + if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) { return; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 7f48bca86827..c24b364a2277 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -700,6 +700,11 @@ public Map getConnectionAttributes() { return null; } + @Override + public Map getRequestAttributes() { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index 9f807ee70788..83f788ba1518 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -227,6 +227,11 @@ public Map getConnectionAttributes() { return null; } + @Override + public Map getRequestAttributes() { + return null; + } + @Override public int getRemotePort() { return 0; From 953a28b9e73d5d3b2ddb94e2af9447d7eebe2752 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 22 Aug 2023 16:53:50 -0400 Subject: [PATCH 3/5] thread safety for request attributes, doc updates --- .../main/java/org/apache/hadoop/hbase/ipc/RpcCall.java | 9 +++++---- .../java/org/apache/hadoop/hbase/ipc/ServerCall.java | 9 +++++---- .../org/apache/hadoop/hbase/ipc/ServerRpcConnection.java | 3 +++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index c4f21a84cb2c..c9eaa2560527 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -87,14 +87,15 @@ public interface RpcCall extends RpcCallContext { RequestHeader getHeader(); /** - * Returns the map of attributes specified when building the Connection See the Map argument on - * {@link org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration, ExecutorService, User, Map)} + * Returns the map of attributes specified when building the Connection. + * @see org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration, + * ExecutorService, User, Map) */ Map getConnectionAttributes(); /** - * Returns the map of attributes specified when building the request See - * {@link org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])} + * Returns the map of attributes specified when building the request. + * @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[]) */ Map getRequestAttributes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 0dba4683e022..6da217333364 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -102,7 +102,7 @@ public abstract class ServerCall implements RpcCa // cumulative size of serialized exceptions private long exceptionSize = 0; private final boolean retryImmediatelySupported; - private Map requestAttributes; + private volatile Map requestAttributes; // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and // the rest of the bits are for WAL reference count. We can only call release if all of them are @@ -223,11 +223,12 @@ public Map getRequestAttributes() { if (header.getAttributeList().isEmpty()) { this.requestAttributes = Collections.emptyMap(); } else { - this.requestAttributes = Maps.newHashMapWithExpectedSize(header.getAttributeList().size()); + Map requestAttributes = + Maps.newHashMapWithExpectedSize(header.getAttributeList().size()); for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) { - this.requestAttributes.put(nameBytesPair.getName(), - nameBytesPair.getValue().toByteArray()); + requestAttributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray()); } + this.requestAttributes = requestAttributes; } } return this.requestAttributes; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index 3529dcfc72dc..5beb273d327f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -410,6 +410,9 @@ private CodedInputStream createCis(ByteBuff buf) { // Reads the connection header following version private void processConnectionHeader(ByteBuff buf) throws IOException { this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); + + // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted + // eventually if (connectionHeader.getAttributeList().isEmpty()) { this.connectionAttributes = Collections.emptyMap(); } else { From a665364c9f213c119170023a7922e9b9adfe5463 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 22 Aug 2023 20:35:12 -0400 Subject: [PATCH 4/5] fix banned import --- .../src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java | 2 +- .../java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 6da217333364..66a2e44fac19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseServerException; @@ -45,6 +44,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index 5beb273d327f..e0f69e4b84c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -38,7 +38,6 @@ import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandomFactory; -import org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.VersionInfoUtil; @@ -68,6 +67,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; From 4fa6c4a00e2875912c46719aa9af154bc29bfd56 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 23 Aug 2023 07:54:51 -0400 Subject: [PATCH 5/5] rm imports only for javadoc --- .../src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index c9eaa2560527..0555202f88b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -19,11 +19,9 @@ import java.io.IOException; import java.util.Map; -import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability;