diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java index 1382612bcfb4..ff117dc8f49a 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java @@ -52,11 +52,11 @@ public void testExtractWordsFn() throws Exception { DoFnTester extractWordsFn = DoFnTester.of(new ExtractWordsFn()); - Assert.assertThat(extractWordsFn.processBatch(" some input words "), + Assert.assertThat(extractWordsFn.processBundle(" some input words "), CoreMatchers.hasItems("some", "input", "words")); - Assert.assertThat(extractWordsFn.processBatch(" "), + Assert.assertThat(extractWordsFn.processBundle(" "), CoreMatchers.hasItems()); - Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"), + Assert.assertThat(extractWordsFn.processBundle(" some ", " input", " words"), CoreMatchers.hasItems("some", "input", "words")); } diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java index 36be5680796d..b986c0bdfa3b 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java @@ -45,7 +45,7 @@ public void testExtractTornadoes() throws Exception { .set("tornado", true); DoFnTester extractWordsFn = DoFnTester.of(new ExtractTornadoesFn()); - Assert.assertThat(extractWordsFn.processBatch(row), + Assert.assertThat(extractWordsFn.processBundle(row), CoreMatchers.hasItems(6)); } @@ -56,7 +56,7 @@ public void testNoTornadoes() throws Exception { .set("tornado", false); DoFnTester extractWordsFn = DoFnTester.of(new ExtractTornadoesFn()); - Assert.assertTrue(extractWordsFn.processBatch(row).isEmpty()); + Assert.assertTrue(extractWordsFn.processBundle(row).isEmpty()); } @Test @@ -65,12 +65,12 @@ public void testFormatCounts() throws Exception { DoFnTester, TableRow> formatCountsFn = DoFnTester.of(new FormatCountsFn()); KV empty[] = {}; - List results = formatCountsFn.processBatch(empty); + List results = formatCountsFn.processBundle(empty); Assert.assertTrue(results.size() == 0); KV input[] = { KV.of(3, 0L), KV.of(4, Long.MAX_VALUE), KV.of(5, Long.MIN_VALUE) }; - results = formatCountsFn.processBatch(input); + results = formatCountsFn.processBundle(input); Assert.assertEquals(results.size(), 3); Assert.assertEquals(results.get(0).get("month"), 3); Assert.assertEquals(results.get(0).get("tornado_count"), 0L); diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java index 5d6456a9233f..6d0b16793865 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java @@ -75,7 +75,7 @@ public class CombinePerKeyExamplesTest { public void testExtractLargeWordsFn() throws Exception { DoFnTester> extractLargeWordsFn = DoFnTester.of(new ExtractLargeWordsFn()); - List> results = extractLargeWordsFn.processBatch(ROWS_ARRAY); + List> results = extractLargeWordsFn.processBundle(ROWS_ARRAY); Assert.assertThat(results, CoreMatchers.hasItem(tuple1)); Assert.assertThat(results, CoreMatchers.hasItem(tuple2)); Assert.assertThat(results, CoreMatchers.hasItem(tuple3)); @@ -85,7 +85,7 @@ public void testExtractLargeWordsFn() throws Exception { public void testFormatShakespeareOutputFn() throws Exception { DoFnTester, TableRow> formatShakespeareOutputFn = DoFnTester.of(new FormatShakespeareOutputFn()); - List results = formatShakespeareOutputFn.processBatch(COMBINED_TUPLES_ARRAY); + List results = formatShakespeareOutputFn.processBundle(COMBINED_TUPLES_ARRAY); Assert.assertThat(results, CoreMatchers.hasItem(resultRow1)); Assert.assertThat(results, CoreMatchers.hasItem(resultRow2)); } diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java index 13beab067380..2598a971dd2f 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java @@ -71,7 +71,7 @@ public class FilterExamplesTest { public void testProjectionFn() throws Exception { DoFnTester projectionFn = DoFnTester.of(new ProjectionFn()); - List results = projectionFn.processBatch(ROWS_ARRAY); + List results = projectionFn.processBundle(ROWS_ARRAY); Assert.assertThat(results, CoreMatchers.hasItem(outRow1)); Assert.assertThat(results, CoreMatchers.hasItem(outRow2)); Assert.assertThat(results, CoreMatchers.hasItem(outRow3)); @@ -81,7 +81,7 @@ public void testProjectionFn() throws Exception { public void testFilterSingleMonthDataFn() throws Exception { DoFnTester filterSingleMonthDataFn = DoFnTester.of(new FilterSingleMonthDataFn(7)); - List results = filterSingleMonthDataFn.processBatch(PROJROWS_ARRAY); + List results = filterSingleMonthDataFn.processBundle(PROJROWS_ARRAY); Assert.assertThat(results, CoreMatchers.hasItem(outRow2)); } } diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java index 22fe6a1fc59f..9b0466740755 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java @@ -87,7 +87,7 @@ public class JoinExamplesTest { public void testExtractEventDataFn() throws Exception { DoFnTester> extractEventDataFn = DoFnTester.of(new ExtractEventDataFn()); - List> results = extractEventDataFn.processBatch(EVENTS); + List> results = extractEventDataFn.processBundle(EVENTS); Assert.assertThat(results, CoreMatchers.hasItem(kv1)); Assert.assertThat(results, CoreMatchers.hasItem(kv2)); } @@ -96,7 +96,7 @@ public void testExtractEventDataFn() throws Exception { public void testExtractCountryInfoFn() throws Exception { DoFnTester> extractCountryInfoFn = DoFnTester.of(new ExtractCountryInfoFn()); - List> results = extractCountryInfoFn.processBatch(CCS); + List> results = extractCountryInfoFn.processBundle(CCS); Assert.assertThat(results, CoreMatchers.hasItem(kv3)); Assert.assertThat(results, CoreMatchers.hasItem(kv4)); } diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java index 9e129a15748d..1d5bcf473c1c 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java @@ -69,7 +69,7 @@ public class MaxPerKeyExamplesTest { public void testExtractTempFn() throws Exception { DoFnTester> extractTempFn = DoFnTester.of(new ExtractTempFn()); - List> results = extractTempFn.processBatch(TEST_ROWS); + List> results = extractTempFn.processBundle(TEST_ROWS); Assert.assertThat(results, CoreMatchers.hasItem(kv1)); Assert.assertThat(results, CoreMatchers.hasItem(kv2)); Assert.assertThat(results, CoreMatchers.hasItem(kv3)); @@ -79,7 +79,7 @@ public void testExtractTempFn() throws Exception { public void testFormatMaxesFn() throws Exception { DoFnTester, TableRow> formatMaxesFnFn = DoFnTester.of(new FormatMaxesFn()); - List results = formatMaxesFnFn.processBatch(TEST_KVS); + List results = formatMaxesFnFn.processBundle(TEST_KVS); Assert.assertThat(results, CoreMatchers.hasItem(resultRow1)); Assert.assertThat(results, CoreMatchers.hasItem(resultRow2)); } diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java index cddce7ff2aa2..6f58389df683 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java @@ -101,12 +101,12 @@ public void testExtractTotalFlow() throws Exception { DoFnTester> extractFlowInfow = DoFnTester .of(new ExtractFlowInfo()); - List> results = extractFlowInfow.processBatch(INPUT); + List> results = extractFlowInfow.processBundle(INPUT); Assert.assertEquals(results.size(), 1); Assert.assertEquals(results.get(0).getKey(), "94"); Assert.assertEquals(results.get(0).getValue(), new Integer(29)); - List> output = extractFlowInfow.processBatch(""); + List> output = extractFlowInfow.processBundle(""); Assert.assertEquals(output.size(), 0); } diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java index 6f581142cb05..cc3e7fa59c42 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java @@ -89,7 +89,7 @@ public void testParseEventFn() throws Exception { DoFnTester parseEventFn = DoFnTester.of(new ParseEventFn()); - List results = parseEventFn.processBatch(GAME_EVENTS_ARRAY); + List results = parseEventFn.processBundle(GAME_EVENTS_ARRAY); Assert.assertEquals(results.size(), 8); Assert.assertEquals(results.get(0).getUser(), "user0_MagentaKangaroo"); Assert.assertEquals(results.get(0).getTeam(), "MagentaKangaroo"); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java index f7068b019aaf..38d4c96cb377 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java @@ -961,7 +961,7 @@ public void testBatchViewAsSingletonToIsmRecord() throws Exception { (GlobalWindow.Coder.INSTANCE)); assertThat( - doFnTester.processBatch( + doFnTester.processBundle( ImmutableList.of(KV.>>>of( 0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))), contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a")))); @@ -978,7 +978,7 @@ public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException thrown.expect(IllegalStateException.class); thrown.expectMessage("found for singleton within window"); - doFnTester.processBatch(ImmutableList.of( + doFnTester.processBundle(ImmutableList.of( KV.>>>of(0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")), KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b")))))); @@ -990,7 +990,7 @@ public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception { DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn()); // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBatch(ImmutableList.of("a", "b", "c")), contains( + assertThat(doFnTester.processBundle(ImmutableList.of("a", "b", "c")), contains( IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")), IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")), IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c")))); @@ -1028,7 +1028,7 @@ public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception ))); // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBatch(inputElements), contains( + assertThat(doFnTester.processBundle(inputElements), contains( IsmRecord.of(ImmutableList.of(windowA, 0L), WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), IsmRecord.of(ImmutableList.of(windowA, 1L), @@ -1100,7 +1100,7 @@ public void testToIsmRecordForMapLikeDoFn() throws Exception { WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))))); // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBatch(inputElements), contains( + assertThat(doFnTester.processBundle(inputElements), contains( IsmRecord.of( ImmutableList.of(1L, windowA, 0L), WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), @@ -1188,7 +1188,7 @@ public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() thro thrown.expect(IllegalStateException.class); thrown.expectMessage("Unique keys are expected but found key"); - doFnTester.processBatch(inputElements); + doFnTester.processBundle(inputElements); } @Test @@ -1231,7 +1231,7 @@ public void testToIsmMetadataRecordForSizeDoFn() throws Exception { KV.of(windowC, 9L)))); // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBatch(inputElements), contains( + assertThat(doFnTester.processBundle(inputElements), contains( IsmRecord.>meta( ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L), CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)), @@ -1286,7 +1286,7 @@ public void testToIsmMetadataRecordForKeyDoFn() throws Exception { KV.of(windowC, 3L)))); // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBatch(inputElements), contains( + assertThat(doFnTester.processBundle(inputElements), contains( IsmRecord.>meta( ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L), CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)), @@ -1339,7 +1339,7 @@ public void testToMapDoFn() throws Exception { List, Long>>>> output = - doFnTester.processBatch(inputElements); + doFnTester.processBundle(inputElements); assertEquals(3, output.size()); Map outputMap; @@ -1396,7 +1396,7 @@ public void testToMultimapDoFn() throws Exception { List>, Iterable>>>> output = - doFnTester.processBatch(inputElements); + doFnTester.processBundle(inputElements); assertEquals(3, output.size()); Map> outputMap; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 332ea1380cfd..f4a223655314 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -56,18 +56,18 @@ * * DoFnTester fnTester = DoFnTester.of(fn); * - * // Set arguments shared across all batches: + * // Set arguments shared across all bundles: * fnTester.setSideInputs(...); // If fn takes side inputs. * fnTester.setSideOutputTags(...); // If fn writes to side outputs. * - * // Process a batch containing a single input element: + * // Process a bundle containing a single input element: * Input testInput = ...; - * List testOutputs = fnTester.processBatch(testInput); + * List testOutputs = fnTester.processBundle(testInput); * Assert.assertThat(testOutputs, * JUnitMatchers.hasItems(...)); * - * // Process a bigger batch: - * Assert.assertThat(fnTester.processBatch(i1, i2, ...), + * // Process a bigger bundle: + * Assert.assertThat(fnTester.processBundle(i1, i2, ...), * JUnitMatchers.hasItems(...)); * } * @@ -163,7 +163,7 @@ public void setSideOutputTags(TupleTagList sideOutputTags) { * calls {@link #finishBundle}, then returns the result of * {@link #takeOutputElements}. */ - public List processBatch(Iterable inputElements) throws Exception { + public List processBundle(Iterable inputElements) throws Exception { startBundle(); for (InputT inputElement : inputElements) { processElement(inputElement); @@ -184,8 +184,8 @@ public List processBatch(Iterable inputElements) thr * */ @SafeVarargs - public final List processBatch(InputT... inputElements) throws Exception { - return processBatch(Arrays.asList(inputElements)); + public final List processBundle(InputT... inputElements) throws Exception { + return processBundle(Arrays.asList(inputElements)); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index ec22251b671d..c6e97a57aef5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -123,8 +123,8 @@ public void processBatch() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester tester = DoFnTester.of(counterDoFn); - // processBatch() returns all the output like takeOutputElements(). - List take = tester.processBatch(1L, 2L, 3L, 4L); + // processBundle() returns all the output like takeOutputElements(). + List take = tester.processBundle(1L, 2L, 3L, 4L); assertThat(take, hasItems("1", "2", "3", "4")); @@ -178,7 +178,7 @@ public void processElementWithTimestamp() throws Exception { public void getAggregatorValuesShouldGetValueOfCounter() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester tester = DoFnTester.of(counterDoFn); - tester.processBatch(1L, 2L, 4L, 8L); + tester.processBundle(1L, 2L, 4L, 8L); Long aggregatorVal = tester.getAggregatorValue(counterDoFn.agg); @@ -189,7 +189,7 @@ public void getAggregatorValuesShouldGetValueOfCounter() throws Exception { public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester tester = DoFnTester.of(counterDoFn); - tester.processBatch(); + tester.processBundle(); Long aggregatorVal = tester.getAggregatorValue(counterDoFn.agg); // empty bundle assertThat(aggregatorVal, equalTo(0L)); @@ -199,7 +199,7 @@ public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception public void getAggregatorValuesInStartFinishBundleShouldGetValues() throws Exception { CounterDoFn fn = new CounterDoFn(1L, 2L); DoFnTester tester = DoFnTester.of(fn); - tester.processBatch(0L, 0L); + tester.processBundle(0L, 0L); Long aggValue = tester.getAggregatorValue(fn.agg); assertThat(aggValue, equalTo(1L + 2L)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index d99c536ca466..10a2a7e4b10e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -438,7 +438,7 @@ public void testConsumingDoFn() throws Exception { purchasesTag, addressesTag, namesTag)) - .processBatch( + .processBundle( KV.of(1, result1), KV.of(2, result2), KV.of(3, result3), diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java index 4b2619874029..debfc78623a2 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java @@ -52,11 +52,11 @@ public void testExtractWordsFn() throws Exception { DoFnTester extractWordsFn = DoFnTester.of(new ExtractWordsFn()); - Assert.assertThat(extractWordsFn.processBatch(" some input words "), + Assert.assertThat(extractWordsFn.processBundle(" some input words "), CoreMatchers.hasItems("some", "input", "words")); - Assert.assertThat(extractWordsFn.processBatch(" "), + Assert.assertThat(extractWordsFn.processBundle(" "), CoreMatchers.hasItems()); - Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"), + Assert.assertThat(extractWordsFn.processBundle(" some ", " input", " words"), CoreMatchers.hasItems("some", "input", "words")); }