From 417dab5dd99cc5d58d04b0fc0a2215f706834010 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 23 Jan 2025 15:03:27 -0500 Subject: [PATCH 1/2] =?UTF-8?q?Revert=20"[Dataflow=20Streaming=20Appliance?= =?UTF-8?q?]=20Fix=20per=20key=20commit=20size=20validation=20(#3=E2=80=A6?= =?UTF-8?q?"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit edc476634c66bcf5abebfaa4bd5dd8cbe13632d2. --- .../runners/dataflow/LargeCommitTest.java | 73 ------------------- .../dataflow/worker/OperationalLimits.java | 4 +- 2 files changed, 3 insertions(+), 74 deletions(-) delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java deleted file mode 100644 index 26f1e60b6773..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class LargeCommitTest { - - @Rule public transient TestPipeline p = TestPipeline.create(); - - @Test - @Category({ValidatesRunner.class}) - public void testLargeCommit() { - // 5 50MB values shuffling to a single key - String value = bigString('a', 50 << 20); - KV kv = KV.of("a", value); - PCollection>> result = - p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create()); - - PAssert.that(result) - .satisfies( - kvs -> { - assertTrue(kvs.iterator().hasNext()); - KV> outputKV = kvs.iterator().next(); - assertFalse(kvs.iterator().hasNext()); - assertEquals("a", outputKV.getKey()); - assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value)); - return null; - }); - p.run(); - } - - private static String bigString(char c, int size) { - char[] buf = new char[size]; - for (int i = 0; i < size; i++) { - buf[i] = c; - } - return new String(buf); - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java index 2a42e3cfa3f9..84f41c473fe0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -25,6 +25,8 @@ @Internal public abstract class OperationalLimits { + private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20; + // Maximum size of a commit from a single work item. public abstract long getMaxWorkItemCommitBytes(); // Maximum size of a single output element's serialized key. @@ -46,7 +48,7 @@ public abstract static class Builder { public static OperationalLimits.Builder builder() { return new AutoValue_OperationalLimits.Builder() - .setMaxWorkItemCommitBytes(Long.MAX_VALUE) + .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) .setMaxOutputKeyBytes(Long.MAX_VALUE) .setMaxOutputValueBytes(Long.MAX_VALUE); } From 638c66d2c5df26315b8ab1ac8ca3b15db77ce67f Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 23 Jan 2025 15:08:03 -0500 Subject: [PATCH 2/2] keep main src change --- .../beam/runners/dataflow/worker/OperationalLimits.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java index 84f41c473fe0..2a42e3cfa3f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -25,8 +25,6 @@ @Internal public abstract class OperationalLimits { - private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20; - // Maximum size of a commit from a single work item. public abstract long getMaxWorkItemCommitBytes(); // Maximum size of a single output element's serialized key. @@ -48,7 +46,7 @@ public abstract static class Builder { public static OperationalLimits.Builder builder() { return new AutoValue_OperationalLimits.Builder() - .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) + .setMaxWorkItemCommitBytes(Long.MAX_VALUE) .setMaxOutputKeyBytes(Long.MAX_VALUE) .setMaxOutputValueBytes(Long.MAX_VALUE); }