From bc3d64441e1ef1c9c60946fe6ca5dd152a1d48a7 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 3 Mar 2022 13:09:53 -0500 Subject: [PATCH 1/7] Chunk commit requests dynamically This ensures they do not materialize all data at once for large requests --- .../worker/windmill/GrpcWindmillServer.java | 73 ++++++++++++------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 48526b80ee2f..77e436ba6179 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -20,6 +20,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintWriter; import java.io.SequenceInputStream; import java.net.URI; @@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map requests) { } } - private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) { - Preconditions.checkNotNull(pendingRequest.computation); - final ByteString serializedCommit = pendingRequest.request.toByteString(); + private class ChunkingByteStream extends OutputStream { + private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE); + private final Consumer chunkWriter; - synchronized (this) { - pending.put(id, pendingRequest); - for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) { - int end = i + COMMIT_STREAM_CHUNK_SIZE; - ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size())); - - StreamingCommitRequestChunk.Builder chunkBuilder = - StreamingCommitRequestChunk.newBuilder() - .setRequestId(id) - .setSerializedWorkItemCommit(chunk) - .setComputationId(pendingRequest.computation) - .setShardingKey(pendingRequest.request.getShardingKey()); - int remaining = serializedCommit.size() - end; - if (remaining > 0) { - chunkBuilder.setRemainingBytesForWorkItem(remaining); - } + ChunkingByteStream(Consumer chunkWriter) { + this.chunkWriter = chunkWriter; + } - StreamingCommitWorkRequest requestChunk = - StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); - try { - send(requestChunk); - } catch (IllegalStateException e) { - // Stream was broken, request will be retried when stream is reopened. - break; - } + @Override + public void close() { + flushBytes(); + } + + @Override + public void write(int b) throws IOException { + output.write(b); + if (output.size() == COMMIT_STREAM_CHUNK_SIZE) { + flushBytes(); + } + } + + private void flushBytes() { + chunkWriter.accept(output.toByteString()); + output.reset(); + } + } + + private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) { + Preconditions.checkNotNull(pendingRequest.computation); + AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize()); + Consumer chunkWriter = chunk -> { + StreamingCommitRequestChunk.Builder chunkBuilder = + StreamingCommitRequestChunk.newBuilder() + .setRequestId(id) + .setSerializedWorkItemCommit(chunk) + .setComputationId(pendingRequest.computation) + .setShardingKey(pendingRequest.request.getShardingKey()); + if (remaining.addAndGet(-chunk.size()) > 0) { + chunkBuilder.setRemainingBytesForWorkItem(remaining.get()); } + StreamingCommitWorkRequest requestChunk = + StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); + send(requestChunk); + }; + try (ChunkingByteStream s = new ChunkingByteStream(chunkWriter)) { + pendingRequest.request.writeTo(s); + } catch (IllegalStateException|IOException e) { + LOG.info("Stream was broken, request will be retried when stream is reopened.", e); } } } From 7a9c32c55e81a9435d8b2a63518f4aaff69f4edd Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 3 Mar 2022 13:11:24 -0500 Subject: [PATCH 2/7] Chunk commit requests dynamically This ensures they do not materialize all data at once for large requests --- .../worker/windmill/GrpcWindmillServer.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 77e436ba6179..cb47901ed003 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -1493,23 +1493,24 @@ private void flushBytes() { private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) { Preconditions.checkNotNull(pendingRequest.computation); AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize()); - Consumer chunkWriter = chunk -> { - StreamingCommitRequestChunk.Builder chunkBuilder = - StreamingCommitRequestChunk.newBuilder() - .setRequestId(id) - .setSerializedWorkItemCommit(chunk) - .setComputationId(pendingRequest.computation) - .setShardingKey(pendingRequest.request.getShardingKey()); - if (remaining.addAndGet(-chunk.size()) > 0) { - chunkBuilder.setRemainingBytesForWorkItem(remaining.get()); - } - StreamingCommitWorkRequest requestChunk = - StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); - send(requestChunk); - }; + Consumer chunkWriter = + chunk -> { + StreamingCommitRequestChunk.Builder chunkBuilder = + StreamingCommitRequestChunk.newBuilder() + .setRequestId(id) + .setSerializedWorkItemCommit(chunk) + .setComputationId(pendingRequest.computation) + .setShardingKey(pendingRequest.request.getShardingKey()); + if (remaining.addAndGet(-chunk.size()) > 0) { + chunkBuilder.setRemainingBytesForWorkItem(remaining.get()); + } + StreamingCommitWorkRequest requestChunk = + StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); + send(requestChunk); + }; try (ChunkingByteStream s = new ChunkingByteStream(chunkWriter)) { pendingRequest.request.writeTo(s); - } catch (IllegalStateException|IOException e) { + } catch (IllegalStateException | IOException e) { LOG.info("Stream was broken, request will be retried when stream is reopened.", e); } } From 86b5b3d28211df830df431e041c4ec669d172329 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 3 Mar 2022 13:36:22 -0500 Subject: [PATCH 3/7] Chunk commit requests dynamically This ensures they do not materialize all data at once for large requests --- .../worker/windmill/GrpcWindmillServer.java | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index cb47901ed003..dc8bdb8563d9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -1463,6 +1463,10 @@ private void issueBatchedRequest(Map requests) { } } + // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE + // before calling the chunkWriter on each. + // + // This avoids materializing the whole serialized request in the case it is large. private class ChunkingByteStream extends OutputStream { private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE); private final Consumer chunkWriter; @@ -1484,7 +1488,22 @@ public void write(int b) throws IOException { } } + @Override + public void write(byte b[], int off, int len) throws IOException { + // Fast path for larger writes that don't make the chunk too large. + if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) { + output.write(b, off, len); + return; + } + for (int i = 0; i < len; i++) { + write(b[off + i]); + } + } + private void flushBytes() { + if (output.size() == 0) { + return; + } chunkWriter.accept(output.toByteString()); output.reset(); } @@ -1492,21 +1511,26 @@ private void flushBytes() { private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) { Preconditions.checkNotNull(pendingRequest.computation); - AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize()); Consumer chunkWriter = - chunk -> { - StreamingCommitRequestChunk.Builder chunkBuilder = - StreamingCommitRequestChunk.newBuilder() - .setRequestId(id) - .setSerializedWorkItemCommit(chunk) - .setComputationId(pendingRequest.computation) - .setShardingKey(pendingRequest.request.getShardingKey()); - if (remaining.addAndGet(-chunk.size()) > 0) { - chunkBuilder.setRemainingBytesForWorkItem(remaining.get()); + new Consumer() { + private long remaining = pendingRequest.request.getSerializedSize(); + + @Override + public void accept(ByteString byteString) { + StreamingCommitRequestChunk.Builder chunkBuilder = + StreamingCommitRequestChunk.newBuilder() + .setRequestId(id) + .setSerializedWorkItemCommit(chunk) + .setComputationId(pendingRequest.computation) + .setShardingKey(pendingRequest.request.getShardingKey()); + remaining -= chunk.size(); + if (remaining > 0) { + chunkBuilder.setRemainingBytesForWorkItem(remaining.get()); + } + StreamingCommitWorkRequest requestChunk = + StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); + send(requestChunk); } - StreamingCommitWorkRequest requestChunk = - StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); - send(requestChunk); }; try (ChunkingByteStream s = new ChunkingByteStream(chunkWriter)) { pendingRequest.request.writeTo(s); From 839c0fc39fbcf2fd1cac224524a94dd0d23fe911 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 3 Mar 2022 15:38:01 -0500 Subject: [PATCH 4/7] Chunk commit requests dynamically This ensures they do not materialize all data at once for large requests --- .../worker/windmill/GrpcWindmillServer.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index dc8bdb8563d9..fb5162616237 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -1489,14 +1489,16 @@ public void write(int b) throws IOException { } @Override - public void write(byte b[], int off, int len) throws IOException { - // Fast path for larger writes that don't make the chunk too large. - if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) { - output.write(b, off, len); - return; + public void write(byte b[], int currentOffset, int len) throws IOException { + final int endOffset = currentOffset + len; + while ((endOffset - currentOffset) + output.size() >= COMMIT_STREAM_CHUNK_SIZE) { + int writeSize = COMMIT_STREAM_CHUNK_SIZE - output.size(); + output.write(b, currentOffset, writeSize); + currentOffset += writeSize; + flushBytes(); } - for (int i = 0; i < len; i++) { - write(b[off + i]); + if (currentOffset != endOffset) { + output.write(b, currentOffset, endOffset - currentOffset); } } @@ -1516,7 +1518,7 @@ private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest private long remaining = pendingRequest.request.getSerializedSize(); @Override - public void accept(ByteString byteString) { + public void accept(ByteString chunk) { StreamingCommitRequestChunk.Builder chunkBuilder = StreamingCommitRequestChunk.newBuilder() .setRequestId(id) From 2e6e34ffcd1336032dfe7e3208b02a14e8f5929f Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 3 Mar 2022 15:53:41 -0500 Subject: [PATCH 5/7] Chunk commit requests dynamically This ensures they do not materialize all data at once for large requests --- .../runners/dataflow/worker/windmill/GrpcWindmillServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index fb5162616237..a5b0f2098553 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -1527,7 +1527,7 @@ public void accept(ByteString chunk) { .setShardingKey(pendingRequest.request.getShardingKey()); remaining -= chunk.size(); if (remaining > 0) { - chunkBuilder.setRemainingBytesForWorkItem(remaining.get()); + chunkBuilder.setRemainingBytesForWorkItem(remaining); } StreamingCommitWorkRequest requestChunk = StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); From 00a83cb5095903b54d34c3d27f734fe4d859bc60 Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Mon, 7 Mar 2022 12:55:15 -0500 Subject: [PATCH 6/7] Add precondition check --- .../runners/dataflow/worker/windmill/GrpcWindmillServer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index a5b0f2098553..d11428b8755c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -1525,6 +1525,7 @@ public void accept(ByteString chunk) { .setSerializedWorkItemCommit(chunk) .setComputationId(pendingRequest.computation) .setShardingKey(pendingRequest.request.getShardingKey()); + Preconditions.checkState(remaining >= chunk.size()); remaining -= chunk.size(); if (remaining > 0) { chunkBuilder.setRemainingBytesForWorkItem(remaining); From 5f163a8437404228292e5aa3b0adbacbd280f02a Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Wed, 16 Mar 2022 09:46:10 -0400 Subject: [PATCH 7/7] Noop commit to rerun CI --- .../runners/dataflow/worker/windmill/GrpcWindmillServer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index d11428b8755c..781ee596708a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -120,8 +120,8 @@ public class GrpcWindmillServer extends WindmillServerStub { private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServer.class); - // If a connection cannot be established, gRPC will fail fast so this deadline can be relatively - // high. + // If a connection cannot be established, gRPC will fail fast so this deadline can be + // relatively high. private static final long DEFAULT_UNARY_RPC_DEADLINE_SECONDS = 300; private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;