From d2f3cfdde5d34a5d6017547342aafbe1d64d13fc Mon Sep 17 00:00:00 2001 From: Istvan Fajth Date: Tue, 11 Jan 2022 12:25:26 +0100 Subject: [PATCH 1/2] HDDS-5954. EC: Review the TODOs in GRPC Xceiver client and fix them. --- .../org/apache/hadoop/hdds/scm/README.gRPC.md | 236 ++++++++++++++++++ .../hadoop/hdds/scm/XceiverClientGrpc.java | 106 ++++---- .../apache/hadoop/hdds/scm/package-info.java | 20 +- 3 files changed, 298 insertions(+), 64 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md new file mode 100644 index 000000000000..0f6e8915fdc1 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md @@ -0,0 +1,236 @@ +# The gRPC client internals This document is about the current gRPC client internals. It aims to give enough +context about gRPC and the Ratis client, with which it shares its interface, to +understand the current behaviour and implementation better. +Besides this, at the end one can find information about possibilities that gRPC +can provide, and why we are not using them at the moment. + + +## Responsibilities of client code using the client +There are two types of requests in the client, sync and async. These requests +are considered to be synced from the caller's point of view, and they simply block +the thread in which the caller sends the request. +If the client code is using the same client from multiple threads, the requests +are sent via multiple message streams on the same channel. + +### When client code uses synced calls +There are no additional responsibilities on the client code for single +requests; a request is sent and the thread is blocked until the response +arrives.
+However, if there are multiple dependent calls that are sent from multiple +threads, the client code has to externally synchronize the dependent calls +between its threads. + +### When client code uses async calls +There are two async calls, `WriteChunk` and `PutBlock`, that can be sent via +the `ContainerProtocolCalls` class, though nothing prevents client code from sending +other types of commands in an asynchronous fashion by calling the gRPC client's +sendCommandAsync directly. +When using async calls, the client code has to ensure ordering between requests +if they need to be ordered. +For example, `PutBlock` requires all related `WriteChunk` calls to finish successfully +before it is called, in order to have consistent data and metadata stored on the +DataNode in all cases. The gRPC client itself does not take any measures to +guarantee such an ordering. More details about the reasons will be shared later +in this doc. In a nutshell, such a guarantee brings in performance and complexity penalties +with no real gains. + +Retries are neither handled nor provided for async calls; for these calls, the +client code has to implement the retry logic on its own, as that may involve +re-sending multiple requests, and may affect request ordering as well. + + +## How clients can be created +The `XCeiverClientManager` class manages the creation of the client instances, +as defined in the `XCeiverClientFactory` interface. +A user of this class can acquire a client with the help of the acquire methods, and +then release it when it is not using it anymore. + +It is important to note that the lifecycle of the clients is up to the user, +and if a client is not released it may leak, as the clients are reference +counted and evicted from the cache only if they have not been referenced for more than +`scm.container.client.idle.threshold` milliseconds. The default idle +threshold is 10 seconds.
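The reference counting and idle eviction described above can be sketched in plain Java. This is an illustrative model only, not the real `XCeiverClientManager` implementation (which also creates and closes actual connections); all names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a reference-counted client cache with idle eviction,
// mirroring the behaviour described for XCeiverClientManager.
public class RefCountedCacheSketch {

  static class Entry {
    int refCount;            // incremented on acquire, decremented on release
    long lastReleaseMillis;  // used to decide idle eviction
  }

  private final Map<String, Entry> cache = new HashMap<>();
  private final long idleThresholdMillis; // scm.container.client.idle.threshold

  RefCountedCacheSketch(long idleThresholdMillis) {
    this.idleThresholdMillis = idleThresholdMillis;
  }

  synchronized Entry acquire(String key) {
    Entry e = cache.computeIfAbsent(key, k -> new Entry());
    e.refCount++;
    return e;
  }

  synchronized void release(String key, long nowMillis) {
    Entry e = cache.get(key);
    e.refCount--;
    e.lastReleaseMillis = nowMillis;
  }

  // Eviction only removes entries that are unreferenced AND idle; a client
  // that was acquired but never released keeps a positive refCount and
  // leaks, exactly as the text above warns.
  synchronized boolean evictIfIdle(String key, long nowMillis) {
    Entry e = cache.get(key);
    if (e != null && e.refCount == 0
        && nowMillis - e.lastReleaseMillis > idleThresholdMillis) {
      cache.remove(key);
      return true;
    }
    return false;
  }

  synchronized boolean contains(String key) {
    return cache.containsKey(key);
  }

  public static void main(String[] args) {
    RefCountedCacheSketch cache = new RefCountedCacheSketch(10_000);
    cache.acquire("pipeline-1");
    // Still referenced: never evicted, regardless of elapsed time.
    System.out.println(cache.evictIfIdle("pipeline-1", 60_000)); // false
    cache.release("pipeline-1", 0);
    // Released and idle for longer than the threshold: evicted.
    System.out.println(cache.evictIfIdle("pipeline-1", 60_000)); // true
  }
}
```

The point of the sketch is the leak scenario: a forgotten `release` keeps `refCount` above zero forever, so the entry never qualifies for eviction.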
+ +The client connects to the DataNode upon creation, and at every +request it checks whether the connection is still up and ready. In case the +connection was terminated for any reason, the client simply tries to reconnect +to the DataNode. + + +## Client to server communication in a nutshell +In gRPC there are three things that are part of the communication: the stub, the channel, and the +messages. + +The stub provides the server-side API on the client side. +The channel manages the communication properties and the socket-level connection. +Messages in our case are sent via bi-directional streaming with the help of +StreamObservers. + +When we initiate a connection we create a stub and a channel, and when we send +a request (a message) we create the bi-directional stream, send in a +request proto message, get back a response proto message, and then close +the stream. +We never send multiple requests via the same stream, although we could; the +reasons will be discussed in more detail later on. + +The server side of the communication processes the requests in the same thread in +which they arrive, via `HDDSDispatcher` in `GrpcXceiverService`, and this also +means that the requests are processed in multiple threads, as the gRPC server side +provides multi-threaded processing across streams. + + +## Client to server communication in more detail + +### How requests are handled on client and server side +The gRPC client, also known as the standalone client, is used only for +reads in the current environment, but during Erasure Coded storage +development the standalone client was also selected for writes. That effort +initiated the analysis of the client side, and this writeup. +The plan is to continue to use the gRPC client only for reads and for EC writes; +if that changes in the future, this document may need an update as well. + +We have two types of requests, sync and async, from the client's point of +view.
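The sync/async split described here boils down to the sync path blocking the caller's thread on the async result. A minimal sketch with hypothetical request/response types (this is not the real `XceiverClientGrpc` API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: a "sync" command is just the async command with the
// calling thread blocked on the future until the response arrives.
public class SyncOverAsyncSketch {

  // Stands in for the proto response type.
  static final class Response {
    final String payload;
    Response(String payload) { this.payload = payload; }
  }

  // Async path: returns immediately with a future that completes later.
  static CompletableFuture<Response> sendCommandAsync(String request) {
    return CompletableFuture.supplyAsync(() -> new Response("reply-to-" + request));
  }

  // Sync path: same call, but the caller's thread waits for the result.
  static Response sendCommand(String request)
      throws ExecutionException, InterruptedException {
    return sendCommandAsync(request).get(); // blocks until the response arrives
  }

  public static void main(String[] args) throws Exception {
    System.out.println(sendCommand("GetBlock").payload); // reply-to-GetBlock
  }
}
```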
+There are only two types of requests the client may - by design - send in an async +way, `PutBlock` and `WriteChunk`. +All the other requests are sent in a synced way from the client side, and the client +is blocked until the response arrives from the server side. (The implementation +of the requests in `ContainerProtocolCalls` ensures this.) + +The one-request-per-message-stream communication strategy ensures that even though +clients are loaded from a cache, and multiple clients or multiple threads can +use the same stub and channel to talk to one server, all the requests sent from +one client to the same server run in parallel on the server side. + +### Client side caching of gRPC clients +In Ozone there is the `XCeiverClientManager` class, which has an internal cache +for the actual clients that connect to DataNodes. The cache has a key +structure that consists of the `PipeLineID`, the Pipeline's `ReplicationType`, +and, for reads in topology-aware configurations, the hostname of the closest host +in the PipeLine. + +For Erasure Coded writes, the cache key also contains the hostname of the node +and the server port. For testing it was necessary to distinguish between +different DataNodes running locally, but in a real environment this will not +affect the number of clients cached, as the Pipelines used have different IDs +per host:port, but the same ID for the same host:port during a write. + + +## The problem with out-of-order `WriteChunk` requests +Let's say we process `PutBlock` first, and then for the block we process a +`WriteChunk` after `PutBlock` has finished. In this case `PutBlock` will persist +metadata of the block, for example the length of the block. Now if an out-of-order +`WriteChunk` fails, there can be inconsistencies between the metadata and the +block data itself, as `PutBlock` happened before all data was written.
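The ordering responsibility this implies for client code — every `WriteChunk` must complete before `PutBlock` is issued — can be sketched with plain `CompletableFuture` composition (the names are illustrative, not the real Ozone API):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of the client-side ordering responsibility: PutBlock is only
// issued once every async WriteChunk has completed successfully.
public class WriteOrderingSketch {

  // Records the order in which the "server" observed the requests.
  static final List<String> serverLog = new CopyOnWriteArrayList<>();

  static CompletableFuture<Void> writeChunkAsync(int chunkIndex) {
    return CompletableFuture.runAsync(() -> serverLog.add("WriteChunk-" + chunkIndex));
  }

  static CompletableFuture<Void> putBlockAsync() {
    return CompletableFuture.runAsync(() -> serverLog.add("PutBlock"));
  }

  public static void main(String[] args) {
    CompletableFuture<Void> allChunks = CompletableFuture.allOf(
        writeChunkAsync(0), writeChunkAsync(1), writeChunkAsync(2));

    // PutBlock is chained after ALL WriteChunk futures, so the metadata can
    // never be persisted ahead of the data it describes.
    allChunks.thenCompose(ignored -> putBlockAsync()).join();

    System.out.println(serverLog.get(serverLog.size() - 1)); // PutBlock
  }
}
```

The chunks themselves may still complete in any order relative to each other; the only guarantee client code needs to enforce is that `PutBlock` comes last.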
+ +In order to avoid this problem, the initial implementation ensures that even the +`WriteChunk` and `PutBlock` requests are sent synchronized from the client +side, by synchronizing these calls internally as well. Until Erasure Coded +storage, it was not a priority to deal with this, as all write operations were +redirected to the Ratis client prior to that. + +Another edge case is when there is a successful out-of-order `WriteChunk`, but +the recorded length of the block in the metadata is less, as it does not account +for the out-of-order `WriteChunk`. In this case there won't be a problem seen +from the DataNode side, as the metadata is consistent, but the additional data +will not be visible to readers, and will be treated as garbage at the end of +the block file. From the client's point of view, though, data will be missing and +the block file won't be complete. + + +## Experiment with really async calls +In order to avoid the ordering problem of `WriteChunk` and `PutBlock` requests, +it seems tempting to rely on gRPC's guarantees about the processing order +of messages sent via the same StreamObserver pair, so experiments with this +have happened as part of HDDS-5954. + +### Promising first results +During an experiment, a comparison was run by creating a simple server side that does nothing +but send back a default response, and a simple client side that sends in +simple default requests. +The comparison was between creating a StreamObserver pair per request +and re-using the same StreamObserver pair to send in the same amount of +requests. + +The measurements showed that there is a significant time difference between the two +ways of sending 10k messages via gRPC: re-using the StreamObserver pair is +two orders of magnitude faster. It is just as fast if one uses multiple +StreamObserver pairs from multiple threads, and in this case the ordering +guarantees held on a per-thread basis. + +### The reality a.k.a.
facepalm +After going further, creating a mock server side, and writing a test against it +using the real client implementation, reality kicked in hard. +The server side implementation was prepared to inject customizable delays on a +per-request-type basis, and suddenly the gain from reusing the same +StreamObserver pair became a bottleneck in a multi-threaded environment, and +an insignificant gain in a single-threaded environment. + +Here is what happens when requests are sent from multiple threads: the +StreamObserver ensures ordering of requests, so the requests become synced +and single-threaded within the same pair of StreamObservers, even though they +were sent by multiple threads. Not to mention the necessity of locking around +request sending, as if multiple threads write to the same stream, messages can +get overlapped. This means that via the same StreamObserver pair, messages can +be sent and responses can arrive in order, but the processing time of all +requests is an accumulation of the processing times of the single requests. + +### Conclusion from the experiment +Even though we may re-use StreamObservers, we lose the ability to send data +in parallel via the client, and we do not gain from re-use. Not to mention the +already synced requests: when they arrive from different threads, they are currently +processed in those threads without waiting for other requests, but they would be +waiting for each other if we started to use the same StreamObserver pairs for +them. + + +## So how to solve the ordering problem? +On the gRPC client level this problem cannot be solved, as it would be far +more complex and complicated to handle different requests via different +StreamObserver pairs, and caching the StreamObservers could become a nightmare.
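The accumulation effect observed in the experiment above can be reproduced with a toy simulation, where a single-threaded executor stands in for one shared ordered stream and a thread pool stands in for per-request streams (no real gRPC involved; all names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model: processing times accumulate on one shared ordered "stream",
// while per-request "streams" let the delays overlap.
public class SharedStreamLatencySketch {

  // Submits `requests` tasks, each "processing" for delayMillis, and
  // returns the wall-clock time in milliseconds until all have finished.
  static long runRequests(ExecutorService server, int requests, long delayMillis)
      throws InterruptedException {
    long start = System.nanoTime();
    CountDownLatch done = new CountDownLatch(requests);
    for (int i = 0; i < requests; i++) {
      server.submit(() -> {
        try {
          Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        done.countDown();
      });
    }
    done.await();
    return (System.nanoTime() - start) / 1_000_000;
  }

  // Returns {timeOnSharedStream, timeOnPerRequestStreams} in milliseconds.
  static long[] demo() throws InterruptedException {
    ExecutorService shared = Executors.newSingleThreadExecutor();
    ExecutorService perRequest = Executors.newFixedThreadPool(4);
    try {
      long serial = runRequests(shared, 4, 50);       // delays accumulate
      long parallel = runRequests(perRequest, 4, 50); // delays overlap
      return new long[] {serial, parallel};
    } finally {
      shared.shutdown();
      perRequest.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long[] t = demo();
    System.out.println("shared ordered stream: ~" + t[0]
        + " ms, per-request streams: ~" + t[1] + " ms");
  }
}
```

With four 50 ms requests, the shared executor needs roughly the sum of the delays (about 200 ms), while the pool finishes in roughly one delay, mirroring the bottleneck seen with a reused StreamObserver pair.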
+Also, even if, let's say, we cache the StreamObservers, and we use one message +stream to do the `WriteChunk`s and then the `PutBlock` call during writing a block, we +would lose the possibly async nature of these calls because of the synchronization +inside gRPC that ensures the ordering. By the way, that synchronization +comes from the fact that the server side processes the request on the same +thread where it arrived; if we moved that to a background thread and +sent the response when it is ready, we would lose the ordering guarantee of gRPC... + +So, all in all, the solution that hurts us the least is to ensure that +all the client code that uses the gRPC client is aware of the need for ordering, is responsible +for it, and ensures that all the `WriteChunk` requests have finished +successfully before issuing a `PutBlock` request. + + +## A possible solution +During experiments, there was a solution that started to show its shape slowly, +and got frightening at the end. Here is what we could end up with. + +We have one StreamObserver pair, or a pool of a few pairs, which we can use to +send sync requests over. As sync requests are mostly coming from the main client +thread, one pair of StreamObservers might be enough; for parallel use we might +supply a pool, noting the fact that the `onNext(request)` method has to be guarded +by a lock, as multiple messages cannot be sent properly on the same channel at the same time. + +We have a separate way of posting async requests, where we create a +StreamObserver pair for every request, and complete it upon sending the request. +In this case the client has to return an empty `CompletableFuture` object, to +which later on it has to set the response for the request. + +This sounds easy, but when it comes to implementation it turns out that it is
Either we have request IDs, and the server side response also +carries the request ID, so we can pair up the response with the request, or we +use some other technics (like sliding window) to ensure the responses are sent +back in the order of requests even though they were processed asynchronously, or +even both technics together. + +### Why we just do not engage in this +This whole question came up as part of Erasure Coded storage development, and +in the EC write path, we have to synchronize at multiple points of a write. +Once an EC data block group is written, we need to ensure all the data and +parity blocks were written successfully, before we do a `PutBlock` operation, +this is to avoid a coslty and enormously complex recovery after write algorithm. +So that EC writes does not really need anything to be implemented at the moment +as the external synchronization of the async calls are already coded into the +logic behind `ECKeyOutputStream`. \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index f19853c1aa49..3a840e2da5ea 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -25,13 +25,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.function.SupplierWithIOException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -70,28 +70,41 @@ import 
static org.apache.hadoop.hdds.HddsUtils.processForDebug; /** - * A Client for the storageContainer protocol for read object data. + * {@link XceiverClientSpi} implementation, the standalone client. + * + * This class can be used to connect to a DataNode and use the + * DatanodeClientProtocol to read and write data. + * Writes via this client do not go through the Ratis protocol and are not + * replicated to any other nodes; usage of this client implementation to + * write replicated data is ill-advised. + * + * When using the sendCommandAsync method, the client code needs to ensure + * ordering of requests and response processing, or synchronize whenever it is + * necessary. + * When using the sendCommand method, note that it blocks the + * current thread while waiting for the response. */ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private final Pipeline pipeline; private final ConfigurationSource config; - private Map asyncStubs; - private XceiverClientMetrics metrics; - private Map channels; + private final Map asyncStubs; + private final XceiverClientMetrics metrics; + private final Map channels; private final Semaphore semaphore; - private boolean closed = false; private final long timeout; - private SecurityConfig secConfig; + private final SecurityConfig secConfig; private final boolean topologyAwareRead; - private List caCerts; + private final List caCerts; // Cache the DN which returned the GetBlock command so that the ReadChunk // command can be sent to the same DN. - private Map getBlockDNcache; + private final Map getBlockDNcache; + + private boolean closed = false; /** * Constructs a client that can communicate with the Container framework on - * data nodes. + * data nodes via DatanodeClientProtocol. * * @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config @@ -122,7 +135,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, /** * Constructs a client that can communicate with the Container framework on - * data nodes. + * data nodes via DatanodeClientProtocol. * * @param pipeline - Pipeline that defines the machines. * @param config -- Ozone Config @@ -132,7 +145,8 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config) { } /** - * To be used when grpc token is not enabled. + * Sets up the connection to a DataNode. Initializes the gRPC server stub, and + * opens the gRPC channel to be used to send requests to the server. */ @Override public void connect() throws Exception { @@ -145,16 +159,12 @@ public void connect() throws Exception { } /** - * Passed encoded token to GRPC header when security is enabled. + * Token based auth is not currently supported, so this method works the same + * way as {@link #connect()}. */ @Override public void connect(String encodedToken) throws Exception { - // connect to the closest node, if closest node doesn't exist, delegate to - // first node, which is usually the leader in the pipeline. - DatanodeDetails dn = topologyAwareRead ? this.pipeline.getClosestNode() : - this.pipeline.getFirstNode(); - // just make a connection to the picked datanode at the beginning - connectToDatanode(dn); + connect(); } private synchronized void connectToDatanode(DatanodeDetails dn) @@ -200,7 +210,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn) } /** - * Returns if the xceiver client connects to all servers in the pipeline. + * Checks if the client has a live connection channel to the specified + * Datanode. * * @return True if the connection is alive, false otherwise. */ @@ -213,6 +224,14 @@ private boolean isConnected(ManagedChannel channel) { return channel != null && !channel.isTerminated() && !channel.isShutdown(); } + /** + * Closes all the communication channels of the client one-by-one. 
+ * When a channel is closed, no further requests can be sent via the channel, + * and the method waits to finish all ongoing communication. + * + * Note: the method waits at most 1 hour per channel, and if that is not enough + * to finish ongoing communication, it interrupts the connection anyway. + */ @Override public synchronized void close() { closed = true; @@ -329,9 +348,6 @@ private XceiverClientReply sendCommandWithRetry( // In case of an exception or an error, we will try to read from the // datanodes in the pipeline in a round robin fashion. - - // TODO: cache the correct leader info in here, so that any subsequent calls - // should first go to leader XceiverClientReply reply = new XceiverClientReply(null); List datanodeList = null; @@ -407,11 +423,9 @@ private XceiverClientReply sendCommandWithRetry( } ioException = new IOException(e); - responseProto = null; } catch (InterruptedException e) { LOG.error("Command execution was interrupted ", e); Thread.currentThread().interrupt(); - responseProto = null; } } @@ -419,27 +433,13 @@ private XceiverClientReply sendCommandWithRetry( reply.setResponse(CompletableFuture.completedFuture(responseProto)); return reply; } else { - Preconditions.checkNotNull(ioException); + Objects.requireNonNull(ioException); LOG.error("Failed to execute command {} on the pipeline {}.", processForDebug(request), pipeline); throw ioException; } } - // TODO: for a true async API, once the waitable future while executing - // the command on one channel fails, it should be retried asynchronously - // on the future Task for all the remaining datanodes. - - // Note: this Async api is not used currently used in any active I/O path. - // In case it gets used, the asynchronous retry logic needs to be plugged - // in here. - /** - * Sends a given command to server gets a waitable future back.
- * - * @param request Request - * @return Response to the command - * @throws IOException - */ @Override public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request) @@ -448,22 +448,13 @@ public XceiverClientReply sendCommandAsync( Span span = GlobalTracer.get() .buildSpan("XceiverClientGrpc." + request.getCmdType().name()).start(); - try (Scope scope = GlobalTracer.get().activateSpan(span)) { + try (Scope ignored = GlobalTracer.get().activateSpan(span)) { ContainerCommandRequestProto finalPayload = ContainerCommandRequestProto.newBuilder(request) .setTraceID(TracingUtil.exportCurrentSpan()) .build(); - XceiverClientReply asyncReply = - sendCommandAsync(finalPayload, pipeline.getFirstNode()); - // TODO : for now make this API sync in nature as async requests are - // served out of order over XceiverClientGrpc. This needs to be fixed - // if this API is to be used for I/O path. Currently, this is not - // used for Read/Write Operation but for tests. - if (!HddsUtils.isReadOnly(request)) { - asyncReply.getResponse().get(); - } - return asyncReply; + return sendCommandAsync(finalPayload, pipeline.getFirstNode()); } finally { span.finish(); @@ -485,11 +476,8 @@ public XceiverClientReply sendCommandAsync( semaphore.acquire(); long requestTime = System.currentTimeMillis(); metrics.incrPendingContainerOpsMetrics(request.getCmdType()); - // create a new grpc stream for each non-async call. - // TODO: for async calls, we should reuse StreamObserver resources. - // set the grpc dealine here so as if the response is not received - // in the configured time, the rpc will fail with DEADLINE_EXCEEDED here + // create a new grpc message stream pair for each call. 
final StreamObserver requestObserver = asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS) .send(new StreamObserver() { @@ -562,7 +550,7 @@ private void reconnect(DatanodeDetails dn) throw new IOException("Error while connecting", e); } - if (channel == null || !isConnected(channel)) { + if (!isConnected(channel)) { throw new IOException("This channel is not connected."); } } @@ -579,11 +567,7 @@ public XceiverClientReply watchForCommit(long index) public long getReplicatedMinCommitIndex() { return 0; } - /** - * Returns pipeline Type. - * - * @return - Stand Alone as the type. - */ + @Override public HddsProtos.ReplicationType getPipelineType() { return HddsProtos.ReplicationType.STAND_ALONE; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java index 9390bc102034..9e5ba6a256e5 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java @@ -16,8 +16,22 @@ * limitations under the License. */ -package org.apache.hadoop.hdds.scm; - /** * Classes for different type of container service client. - */ \ No newline at end of file + * + * In Ozone there are two types of clients, the Ratis client and the Standalone + * client. + * + * The Ratis client is used for writing data, and it uses Ratis communication + * facilities to connect to the server side. + * + * The Standalone client is used for reading data via protobuf messages over + * gRPC facilities. For more information on how the gRPC client works, and why, + * one can check the README file in this package. + * + * We are using a caching mechanism to cache the created clients inside the + * {@link org.apache.hadoop.hdds.scm.XceiverClientManager}, and the client + * interface is defined by the + * {@link org.apache.hadoop.hdds.scm.XceiverClientSpi} interface.
+ */ +package org.apache.hadoop.hdds.scm; \ No newline at end of file From ad45c924866af934fa413712e7fc6708f74c2b1a Mon Sep 17 00:00:00 2001 From: Istvan Fajth Date: Tue, 11 Jan 2022 23:59:40 +0100 Subject: [PATCH 2/2] Fix rat. --- .../org/apache/hadoop/hdds/scm/README.gRPC.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md index 0f6e8915fdc1..39f2b626b084 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/README.gRPC.md @@ -1,3 +1,20 @@ + + # The gRPC client internals This document is about the current gRPC client internals. It aims to give enough context about gRPC and the Ratis client with which it shares its interface to