diff --git a/CHANGES.md b/CHANGES.md
index ff931802addf..936602238c09 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -81,6 +81,7 @@
 
 ## Bugfixes
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed flaky DataflowRunnerTest when running in streaming mode (Java) ([#37371](https://github.com/apache/beam/issues/37371)).
 
 ## Security Fixes
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 775e7b91de93..335916a6abca 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -411,8 +411,10 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
       // Replace GCS file paths with local file paths
       dataflowOptions.setFilesToStage(
           replaceGcsFilesWithLocalFiles(dataflowOptions.getFilesToStage()));
-      // The user specifically requested these files, so fail now if they do not exist.
-      // (automatically detected classpath elements are permitted to not exist, so later
+      // The user specifically requested these files, so fail now if they do not
+      // exist.
+      // (automatically detected classpath elements are permitted to not exist, so
+      // later
       // staging will not fail on nonexistent files)
       dataflowOptions.getFilesToStage().stream()
           .forEach(
@@ -446,7 +448,8 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
       }
     }
 
-    // Verify jobName according to service requirements, truncating converting to lowercase if
+    // Verify jobName according to service requirements, truncating converting to
+    // lowercase if
     // necessary.
     String jobName = dataflowOptions.getJobName().toLowerCase();
     checkArgument(
@@ -612,7 +615,8 @@ public final Map, ReplacementOutput> mapOutputs(
 
   private List<PTransformOverride> getOverrides(boolean streaming) {
     ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
-    // Create is implemented in terms of a Read, so it must precede the override to Read in
+    // Create is implemented in terms of a Read, so it must precede the override to
+    // Read in
     // streaming
     overridesBuilder
         .add(
@@ -630,7 +634,8 @@ private List getOverrides(boolean streaming) {
              PTransformMatchers.classEqualTo(Create.Values.class),
              new AlwaysCreateViaRead()));
     }
-    // By default Dataflow runner replaces single-output ParDo with a ParDoSingle override.
+    // By default Dataflow runner replaces single-output ParDo with a ParDoSingle
+    // override.
     // However, we want a different expansion for single-output splittable ParDo.
     overridesBuilder
         .add(
@@ -661,7 +666,8 @@ private List getOverrides(boolean streaming) {
       overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
       overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE);
     } catch (NoClassDefFoundError e) {
-      // Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java
+      // Do nothing. io-kafka is an optional dependency of
+      // runners-google-cloud-dataflow-java
       // and only needed when KafkaIO is used in the pipeline.
     }
 
@@ -687,7 +693,8 @@ private List getOverrides(boolean streaming) {
 
     overridesBuilder
         .add(
-            // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
+            // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read,
+            // and
             // must precede it
             PTransformOverride.of(
                 PTransformMatchers.classEqualTo(Read.Bounded.class),
@@ -729,8 +736,10 @@ private List getOverrides(boolean streaming) {
             PTransformOverride.of(
                 PTransformMatchers.stateOrTimerParDoSingle(),
                 BatchStatefulParDoOverrides.singleOutputOverrideFactory()));
-    // Dataflow Batch runner uses the naive override of the SPLITTABLE_PROCESS_KEYED transform
-    // for now, but eventually (when liquid sharding is implemented) will also override it
+    // Dataflow Batch runner uses the naive override of the SPLITTABLE_PROCESS_KEYED
+    // transform
+    // for now, but eventually (when liquid sharding is implemented) will also
+    // override it
     // natively in the Dataflow service.
     overridesBuilder.add(
         PTransformOverride.of(
@@ -762,20 +771,24 @@ private List getOverrides(boolean streaming) {
               new ReflectiveViewOverrideFactory(
                   BatchViewOverrides.BatchViewAsIterable.class, this)));
     }
-    /* TODO(Beam-4684): Support @RequiresStableInput on Dataflow in a more intelligent way
-    Use Reshuffle might cause an extra and unnecessary shuffle to be inserted. To enable this, we
-    should make sure that we do not add extra shuffles for transforms whose input is already stable.
-    // Uses Reshuffle, so has to be before the Reshuffle override
-    overridesBuilder.add(
-        PTransformOverride.of(
-            PTransformMatchers.requiresStableInputParDoSingle(),
-            RequiresStableInputParDoOverrides.singleOutputOverrideFactory()));
-    // Uses Reshuffle, so has to be before the Reshuffle override
-    overridesBuilder.add(
-        PTransformOverride.of(
-            PTransformMatchers.requiresStableInputParDoMulti(),
-            RequiresStableInputParDoOverrides.multiOutputOverrideFactory()));
-    */
+    /*
+     * TODO(Beam-4684): Support @RequiresStableInput on Dataflow in a more
+     * intelligent way
+     * Use Reshuffle might cause an extra and unnecessary shuffle to be inserted. To
+     * enable this, we
+     * should make sure that we do not add extra shuffles for transforms whose input
+     * is already stable.
+     * // Uses Reshuffle, so has to be before the Reshuffle override
+     * overridesBuilder.add(
+     *     PTransformOverride.of(
+     *         PTransformMatchers.requiresStableInputParDoSingle(),
+     *         RequiresStableInputParDoOverrides.singleOutputOverrideFactory()));
+     * // Uses Reshuffle, so has to be before the Reshuffle override
+     * overridesBuilder.add(
+     *     PTransformOverride.of(
+     *         PTransformMatchers.requiresStableInputParDoMulti(),
+     *         RequiresStableInputParDoOverrides.multiOutputOverrideFactory()));
+     */
     overridesBuilder
         .add(
             PTransformOverride.of(
@@ -948,11 +961,14 @@ public void leaveCompositeTransform(Node node) {
         public Map, ReplacementOutput> mapOutputs(
             Map, PCollection> outputs, PCollectionView newOutput) {
           /*
-          The output of View.AsXYZ is a PCollectionView that expands to the PCollection to be materialized.
-          The PCollectionView itself must have the same tag since that tag may have been embedded in serialized DoFns
-          previously and cannot easily be rewired. The PCollection may differ, so we rewire it, even if the rewiring
-          is a noop.
-          */
+           * The output of View.AsXYZ is a PCollectionView that expands to the PCollection
+           * to be materialized.
+           * The PCollectionView itself must have the same tag since that tag may have
+           * been embedded in serialized DoFns
+           * previously and cannot easily be rewired. The PCollection may differ, so we
+           * rewire it, even if the rewiring
+           * is a noop.
+           */
           return ReplacementOutputs.singleton(outputs, newOutput);
         }
       }
@@ -1062,7 +1078,8 @@ static String normalizeDataflowImageAndTag(String imageAndTag) {
         || imageAndTag.startsWith("/beam_go_")) {
       int tagIdx = imageAndTag.lastIndexOf(":");
       if (tagIdx > 0) {
-        // For release candidates, apache/beam_ images has rc tag while Dataflow does not
+        // For release candidates, apache/beam_ images has rc tag while Dataflow does
+        // not
        String tag = imageAndTag.substring(tagIdx); // e,g, ":2.xx.0rc1"
        int mayRc = tag.toLowerCase().lastIndexOf("rc");
        if (mayRc > 0) {
@@ -1188,7 +1205,8 @@ private List getDefaultArtifacts() {
         options.as(DataflowStreamingPipelineOptions.class).getOverrideWindmillBinary();
     String dataflowWorkerJar = options.getDataflowWorkerJar();
     if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty() && !useUnifiedWorker(options)) {
-      // Put the user specified worker jar at the start of the classpath, to be consistent with the
+      // Put the user specified worker jar at the start of the classpath, to be
+      // consistent with the
       // built-in worker order.
       pathsToStageBuilder.add("dataflow-worker.jar=" + dataflowWorkerJar);
     }
@@ -1243,7 +1261,8 @@ private static boolean includesTransformUpgrades(Pipeline pipeline) {
 
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
-    // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
+    // Multi-language pipelines and pipelines that include upgrades should
+    // automatically be upgraded
     // to Runner v2.
     if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) {
       List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
@@ -1326,15 +1345,18 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL =
         DataflowRunner.getContainerImageForJob(dataflowOptions);
-    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // This incorrectly puns the worker harness container image (which implements
+    // v1beta3 API)
     // with the SDK harness image (which implements Fn API).
     //
-    // The same Environment is used in different and contradictory ways, depending on whether
+    // The same Environment is used in different and contradictory ways, depending
+    // on whether
     // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    // The SdkComponents for portable an non-portable job submission must be kept distinct. Both
+    // The SdkComponents for portable an non-portable job submission must be kept
+    // distinct. Both
     // need the default environment.
     SdkComponents portableComponents = SdkComponents.create();
     portableComponents.registerEnvironment(
@@ -1357,9 +1379,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {
           "Portable pipeline proto:\n{}",
           TextFormat.printer().printToString(portablePipelineProto));
     }
-    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then
+    // update
     // the options on the new job
-    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    // TODO: add an explicit `pipeline` parameter to the submission instead of
+    // pipeline options
     LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
     byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
@@ -1369,6 +1393,22 @@ public DataflowPipelineJob run(Pipeline pipeline) {
 
     if (useUnifiedWorker(options)) {
       LOG.info("Skipping v1 transform replacements since job will run on v2.");
+      if (!options.isStreaming()) {
+        LOG.info("Applying GroupIntoBatches overrides for V2 Batch mode.");
+        List<PTransformOverride> overrides =
+            ImmutableList.<PTransformOverride>builder()
+                .add(
+                    PTransformOverride.of(
+                        PTransformMatchers.classEqualTo(GroupIntoBatches.class),
+                        new GroupIntoBatchesOverride.BatchGroupIntoBatchesOverrideFactory<>(this)))
+                .add(
+                    PTransformOverride.of(
+                        PTransformMatchers.classEqualTo(GroupIntoBatches.WithShardedKey.class),
+                        new GroupIntoBatchesOverride
+                            .BatchGroupIntoBatchesWithShardedKeyOverrideFactory<>(this)))
+                .build();
+        pipeline.replaceAll(overrides);
+      }
     } else {
       // Now rewrite things to be as needed for v1 (mutates the pipeline)
       // This way the job submitted is valid for v1 and v2, simultaneously
@@ -1468,7 +1508,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       newJob.getEnvironment().setFlexResourceSchedulingGoal("FLEXRS_SPEED_OPTIMIZED");
     }
 
-    // Represent the minCpuPlatform pipeline option as an experiment, if not already present.
+    // Represent the minCpuPlatform pipeline option as an experiment, if not already
+    // present.
     if (!isNullOrEmpty(dataflowOptions.getMinCpuPlatform())) {
       List<String> experiments =
           firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList());
@@ -1498,10 +1539,13 @@ public DataflowPipelineJob run(Pipeline pipeline) {
             ImmutableList.copyOf(
                 firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList())));
 
-    // Set the Docker container image that executes Dataflow worker harness, residing in Google
-    // Container Registry. Translator is guaranteed to create a worker pool prior to this point.
+    // Set the Docker container image that executes Dataflow worker harness,
+    // residing in Google
+    // Container Registry. Translator is guaranteed to create a worker pool prior to
+    // this point.
     // For runner_v1, only worker_harness_container is set.
-    // For runner_v2, both worker_harness_container and sdk_harness_container are set to the same
+    // For runner_v2, both worker_harness_container and sdk_harness_container are
+    // set to the same
     // value.
     String containerImage = getContainerImageForJob(options);
     for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
@@ -1541,7 +1585,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
                 + "to runner v2 jobs. Option has been automatically removed.");
       }
 
-    // Upload the job to GCS and remove the graph object from the API call. The graph
+    // Upload the job to GCS and remove the graph object from the API call. The
+    // graph
     // will be downloaded from GCS by the service.
     if (hasExperiment(options, "upload_graph")) {
       DataflowPackage stagedGraph =
@@ -1852,18 +1897,21 @@ public CompositeBehavior enterCompositeTransform(Node node) {
       String rootBigQueryTransform = "";
       if (transform.getClass().equals(StorageApiLoads.class)) {
         StorageApiLoads storageLoads = (StorageApiLoads) transform;
-        // If the storage load is directing exceptions to an error handler, we don't need to
+        // If the storage load is directing exceptions to an error handler, we don't
+        // need to
         // warn for unconsumed rows
         if (!storageLoads.usesErrorHandler()) {
           failedTag = storageLoads.getFailedRowsTag();
         }
-        // For storage API the transform that outputs failed rows is nested one layer below
+        // For storage API the transform that outputs failed rows is nested one layer
+        // below
         // BigQueryIO.
         rootBigQueryTransform = node.getEnclosingNode().getFullName();
       } else if (transform.getClass().equals(StreamingWriteTables.class)) {
         StreamingWriteTables streamingInserts = (StreamingWriteTables) transform;
         failedTag = streamingInserts.getFailedRowsTupleTag();
-        // For streaming inserts the transform that outputs failed rows is nested two layers
+        // For streaming inserts the transform that outputs failed rows is nested two
+        // layers
         // below BigQueryIO.
         rootBigQueryTransform = node.getEnclosingNode().getEnclosingNode().getFullName();
       }
@@ -1898,7 +1946,8 @@ public void visitPrimitiveTransform(Node node) {
 
   /** Outputs a warning about PCollection views without deterministic key coders. */
   private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
-    // We need to wait till this point to determine the names of the transforms since only
+    // We need to wait till this point to determine the names of the transforms
+    // since only
     // at this time do we know the hierarchy of the transforms otherwise we could
     // have just recorded the full names during apply time.
     if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
@@ -2084,10 +2133,14 @@ private static void translateOverriddenPubsubSourceStep(
         stepTranslationContext.addInput(
             PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
-      // In both cases, the transform needs to read PubsubMessage. However, in case it needs
-      // the attributes or messageId, we supply an identity "parse fn" so the worker will
-      // read PubsubMessage's from Windmill and simply pass them around; and in case it
-      // doesn't need attributes, we're already implicitly using a "Coder" that interprets
+      // In both cases, the transform needs to read PubsubMessage. However, in case it
+      // needs
+      // the attributes or messageId, we supply an identity "parse fn" so the worker
+      // will
+      // read PubsubMessage's from Windmill and simply pass them around; and in case
+      // it
+      // doesn't need attributes, we're already implicitly using a "Coder" that
+      // interprets
       // the data as a PubsubMessage's payload.
       if (overriddenTransform.getNeedsAttributes() || overriddenTransform.getNeedsMessageId()) {
         stepTranslationContext.addInput(
@@ -2210,7 +2263,8 @@ private static void translate(
           PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
           byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
 
-      // Using a GlobalWindowCoder as a placeholder because GlobalWindowCoder is known coder.
+      // Using a GlobalWindowCoder as a placeholder because GlobalWindowCoder is known
+      // coder.
       stepContext.addEncodingInput(
           WindowedValues.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE));
       stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
@@ -2389,8 +2443,9 @@ public void translate(
 
   private static class Deduplicate
       extends PTransform>, PCollection> {
-    // Use a finite set of keys to improve bundling. Without this, the key space
-    // will be the space of ids which is potentially very large, which results in much
+    // Use a finite set of keys to improve bundling. Without this, the key space
+    // will be the space of ids which is potentially very large, which results in
+    // much
     // more per-key overhead.
     private static final int NUM_RESHARD_KEYS = 10000;
 
@@ -2402,7 +2457,8 @@ public PCollection expand(PCollection> input) {
                 (ValueWithRecordId value) ->
                     Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS)
             .withKeyType(TypeDescriptors.integers()))
-        // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
+        // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data
+        // through
        // WindmillSink.
        .apply(Reshuffle.of())
        .apply(
@@ -2604,7 +2660,8 @@ static class StreamingShardedWriteFactory
           WriteFilesResult, WriteFiles> {
-    // We pick 10 as a default, as it works well with the default number of workers started
+    // We pick 10 as a default, as it works well with the default number of workers
+    // started
     // by Dataflow.
     static final int DEFAULT_NUM_SHARDS = 10;
     DataflowPipelineWorkerPoolOptions options;
@@ -2621,12 +2678,17 @@ static class StreamingShardedWriteFactory
             WriteFilesResult, WriteFiles> transform) {
-      // By default, if numShards is not set WriteFiles will produce one file per bundle. In
-      // streaming, there are large numbers of small bundles, resulting in many tiny files.
-      // Instead, we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
-      // (current_num_workers * 2 might be a better choice, but that value is not easily available
+      // By default, if numShards is not set WriteFiles will produce one file per
+      // bundle. In
+      // streaming, there are large numbers of small bundles, resulting in many tiny
+      // files.
+      // Instead, we pick max workers * 2 to ensure full parallelism, but prevent
+      // too-many files.
+      // (current_num_workers * 2 might be a better choice, but that value is not
+      // easily available
       // today).
-      // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
+      // If the user does not set either numWorkers or maxNumWorkers, default to 10
+      // shards.
       int numShards;
       if (options.getMaxNumWorkers() > 0) {
         numShards = options.getMaxNumWorkers() * 2;
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ee5a7e1d26c3..31663b840f31 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -206,7 +206,8 @@ *
  * <p>Implements {@link Serializable} because it is caught in closures.
  */
 @RunWith(JUnit4.class)
-// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
+// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of
+// errorprone is
 // released (2.11.0)
 @SuppressWarnings("unused")
 public class DataflowRunnerTest implements Serializable {
@@ -371,7 +372,8 @@ private DataflowPipelineOptions buildPipelineOptions() throws IOException {
     options.setProject(PROJECT_ID);
     options.setTempLocation(VALID_TEMP_BUCKET);
     options.setRegion(REGION_ID);
-    // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
+    // Set FILES_PROPERTY to empty to prevent a default value calculated from
+    // classpath.
     options.setFilesToStage(new ArrayList<>());
     options.setDataflowClient(buildMockDataflow(mockJobs));
     options.setGcsUtil(mockGcsUtil);
@@ -461,7 +463,8 @@ public void testFromOptionsUserAgentFromPipelineInfo() throws Exception {
   public void testSettingOfSdkPipelineOptions() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
 
-    // These options are important only for this test, and need not be global to the test class
+    // These options are important only for this test, and need not be global to the
+    // test class
     options.setAppName(DataflowRunnerTest.class.getSimpleName());
     options.setJobName("some-job-name");
 
@@ -1152,7 +1155,8 @@ public void testNonExistentProfileLocation() throws IOException {
   public void testNoProjectFails() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
 
-    // Explicitly set to null to prevent the default instance factory from reading credentials
+    // Explicitly set to null to prevent the default instance factory from reading
+    // credentials
     // from a user's environment, causing this test to fail.
     options.setProject(null);
 
@@ -1576,8 +1580,10 @@ public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOExcep
     streamingOptions.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(streamingOptions);
 
-    // Instantiation of a runner prior to run() currently has a side effect of mutating the options.
-    // This could be tested by DataflowRunner.fromOptions(streamingOptions) but would not ensure
+    // Instantiation of a runner prior to run() currently has a side effect of
+    // mutating the options.
+    // This could be tested by DataflowRunner.fromOptions(streamingOptions) but
+    // would not ensure
     // that the pipeline itself had the expected options set.
     p.run();
 
@@ -1865,7 +1871,8 @@ public void testApplyIsScopedToExactClass() throws IOException {
 
     CompositeTransformRecorder recorder = new CompositeTransformRecorder();
     p.traverseTopologically(recorder);
-    // The recorder will also have seen a Create.Values composite as well, but we can't obtain that
+    // The recorder will also have seen a Create.Values composite as well, but we
+    // can't obtain that
     // transform.
     assertThat(
         "Expected to have seen CreateTimestamped composite transform.",
@@ -2312,6 +2319,16 @@ public void testBatchGroupIntoBatchesWithShardedKeyOverrideCount() throws IOExce
     verifyGroupIntoBatchesOverrideCount(p, true, true);
   }
 
+  @Test
+  public void testBatchGroupIntoBatchesWithShardedKeyOverrideCountV2() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    options
+        .as(DataflowPipelineOptions.class)
+        .setExperiments(Arrays.asList("use_runner_v2", "use_unified_worker"));
+    Pipeline p = Pipeline.create(options);
+    verifyGroupIntoBatchesOverrideCount(p, true, true);
+  }
+
   @Test
   public void testBatchGroupIntoBatchesWithShardedKeyOverrideBytes() throws IOException {
     PipelineOptions options = buildPipelineOptions();
@@ -2532,7 +2549,8 @@ public void testEnableAllowDuplicatesForRedistributeWithALO() throws IOException
     PCollection> output = input.apply(Redistribute.byKey());
     pipeline.run();
 
-    // The DataflowRedistributeByKey transform translated from Redistribute should have
+    // The DataflowRedistributeByKey transform translated from Redistribute should
+    // have
     // allowDuplicates set to true.
     AtomicBoolean redistributeAllowDuplicates = new AtomicBoolean(false);
     pipeline.traverseTopologically(