Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public void testExtractWordsFn() throws Exception {
DoFnTester<String, String> extractWordsFn =
DoFnTester.of(new ExtractWordsFn());

Assert.assertThat(extractWordsFn.processBatch(" some input words "),
Assert.assertThat(extractWordsFn.processBundle(" some input words "),
CoreMatchers.hasItems("some", "input", "words"));
Assert.assertThat(extractWordsFn.processBatch(" "),
Assert.assertThat(extractWordsFn.processBundle(" "),
CoreMatchers.<String>hasItems());
Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"),
Assert.assertThat(extractWordsFn.processBundle(" some ", " input", " words"),
CoreMatchers.hasItems("some", "input", "words"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testExtractTornadoes() throws Exception {
.set("tornado", true);
DoFnTester<TableRow, Integer> extractWordsFn =
DoFnTester.of(new ExtractTornadoesFn());
Assert.assertThat(extractWordsFn.processBatch(row),
Assert.assertThat(extractWordsFn.processBundle(row),
CoreMatchers.hasItems(6));
}

Expand All @@ -56,7 +56,7 @@ public void testNoTornadoes() throws Exception {
.set("tornado", false);
DoFnTester<TableRow, Integer> extractWordsFn =
DoFnTester.of(new ExtractTornadoesFn());
Assert.assertTrue(extractWordsFn.processBatch(row).isEmpty());
Assert.assertTrue(extractWordsFn.processBundle(row).isEmpty());
}

@Test
Expand All @@ -65,12 +65,12 @@ public void testFormatCounts() throws Exception {
DoFnTester<KV<Integer, Long>, TableRow> formatCountsFn =
DoFnTester.of(new FormatCountsFn());
KV empty[] = {};
List<TableRow> results = formatCountsFn.processBatch(empty);
List<TableRow> results = formatCountsFn.processBundle(empty);
Assert.assertTrue(results.size() == 0);
KV input[] = { KV.of(3, 0L),
KV.of(4, Long.MAX_VALUE),
KV.of(5, Long.MIN_VALUE) };
results = formatCountsFn.processBatch(input);
results = formatCountsFn.processBundle(input);
Assert.assertEquals(results.size(), 3);
Assert.assertEquals(results.get(0).get("month"), 3);
Assert.assertEquals(results.get(0).get("tornado_count"), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class CombinePerKeyExamplesTest {
public void testExtractLargeWordsFn() throws Exception {
DoFnTester<TableRow, KV<String, String>> extractLargeWordsFn =
DoFnTester.of(new ExtractLargeWordsFn());
List<KV<String, String>> results = extractLargeWordsFn.processBatch(ROWS_ARRAY);
List<KV<String, String>> results = extractLargeWordsFn.processBundle(ROWS_ARRAY);
Assert.assertThat(results, CoreMatchers.hasItem(tuple1));
Assert.assertThat(results, CoreMatchers.hasItem(tuple2));
Assert.assertThat(results, CoreMatchers.hasItem(tuple3));
Expand All @@ -85,7 +85,7 @@ public void testExtractLargeWordsFn() throws Exception {
public void testFormatShakespeareOutputFn() throws Exception {
DoFnTester<KV<String, String>, TableRow> formatShakespeareOutputFn =
DoFnTester.of(new FormatShakespeareOutputFn());
List<TableRow> results = formatShakespeareOutputFn.processBatch(COMBINED_TUPLES_ARRAY);
List<TableRow> results = formatShakespeareOutputFn.processBundle(COMBINED_TUPLES_ARRAY);
Assert.assertThat(results, CoreMatchers.hasItem(resultRow1));
Assert.assertThat(results, CoreMatchers.hasItem(resultRow2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class FilterExamplesTest {
public void testProjectionFn() throws Exception {
DoFnTester<TableRow, TableRow> projectionFn =
DoFnTester.of(new ProjectionFn());
List<TableRow> results = projectionFn.processBatch(ROWS_ARRAY);
List<TableRow> results = projectionFn.processBundle(ROWS_ARRAY);
Assert.assertThat(results, CoreMatchers.hasItem(outRow1));
Assert.assertThat(results, CoreMatchers.hasItem(outRow2));
Assert.assertThat(results, CoreMatchers.hasItem(outRow3));
Expand All @@ -81,7 +81,7 @@ public void testProjectionFn() throws Exception {
public void testFilterSingleMonthDataFn() throws Exception {
DoFnTester<TableRow, TableRow> filterSingleMonthDataFn =
DoFnTester.of(new FilterSingleMonthDataFn(7));
List<TableRow> results = filterSingleMonthDataFn.processBatch(PROJROWS_ARRAY);
List<TableRow> results = filterSingleMonthDataFn.processBundle(PROJROWS_ARRAY);
Assert.assertThat(results, CoreMatchers.hasItem(outRow2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class JoinExamplesTest {
public void testExtractEventDataFn() throws Exception {
DoFnTester<TableRow, KV<String, String>> extractEventDataFn =
DoFnTester.of(new ExtractEventDataFn());
List<KV<String, String>> results = extractEventDataFn.processBatch(EVENTS);
List<KV<String, String>> results = extractEventDataFn.processBundle(EVENTS);
Assert.assertThat(results, CoreMatchers.hasItem(kv1));
Assert.assertThat(results, CoreMatchers.hasItem(kv2));
}
Expand All @@ -96,7 +96,7 @@ public void testExtractEventDataFn() throws Exception {
public void testExtractCountryInfoFn() throws Exception {
DoFnTester<TableRow, KV<String, String>> extractCountryInfoFn =
DoFnTester.of(new ExtractCountryInfoFn());
List<KV<String, String>> results = extractCountryInfoFn.processBatch(CCS);
List<KV<String, String>> results = extractCountryInfoFn.processBundle(CCS);
Assert.assertThat(results, CoreMatchers.hasItem(kv3));
Assert.assertThat(results, CoreMatchers.hasItem(kv4));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class MaxPerKeyExamplesTest {
public void testExtractTempFn() throws Exception {
DoFnTester<TableRow, KV<Integer, Double>> extractTempFn =
DoFnTester.of(new ExtractTempFn());
List<KV<Integer, Double>> results = extractTempFn.processBatch(TEST_ROWS);
List<KV<Integer, Double>> results = extractTempFn.processBundle(TEST_ROWS);
Assert.assertThat(results, CoreMatchers.hasItem(kv1));
Assert.assertThat(results, CoreMatchers.hasItem(kv2));
Assert.assertThat(results, CoreMatchers.hasItem(kv3));
Expand All @@ -79,7 +79,7 @@ public void testExtractTempFn() throws Exception {
public void testFormatMaxesFn() throws Exception {
DoFnTester<KV<Integer, Double>, TableRow> formatMaxesFnFn =
DoFnTester.of(new FormatMaxesFn());
List<TableRow> results = formatMaxesFnFn.processBatch(TEST_KVS);
List<TableRow> results = formatMaxesFnFn.processBundle(TEST_KVS);
Assert.assertThat(results, CoreMatchers.hasItem(resultRow1));
Assert.assertThat(results, CoreMatchers.hasItem(resultRow2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ public void testExtractTotalFlow() throws Exception {
DoFnTester<String, KV<String, Integer>> extractFlowInfow = DoFnTester
.of(new ExtractFlowInfo());

List<KV<String, Integer>> results = extractFlowInfow.processBatch(INPUT);
List<KV<String, Integer>> results = extractFlowInfow.processBundle(INPUT);
Assert.assertEquals(results.size(), 1);
Assert.assertEquals(results.get(0).getKey(), "94");
Assert.assertEquals(results.get(0).getValue(), new Integer(29));

List<KV<String, Integer>> output = extractFlowInfow.processBatch("");
List<KV<String, Integer>> output = extractFlowInfow.processBundle("");
Assert.assertEquals(output.size(), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testParseEventFn() throws Exception {
DoFnTester<String, GameActionInfo> parseEventFn =
DoFnTester.of(new ParseEventFn());

List<GameActionInfo> results = parseEventFn.processBatch(GAME_EVENTS_ARRAY);
List<GameActionInfo> results = parseEventFn.processBundle(GAME_EVENTS_ARRAY);
Assert.assertEquals(results.size(), 8);
Assert.assertEquals(results.get(0).getUser(), "user0_MagentaKangaroo");
Assert.assertEquals(results.get(0).getTeam(), "MagentaKangaroo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ public void testBatchViewAsSingletonToIsmRecord() throws Exception {
<String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));

assertThat(
doFnTester.processBatch(
doFnTester.processBundle(
ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(
0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))),
contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a"))));
Expand All @@ -978,7 +978,7 @@ public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException

thrown.expect(IllegalStateException.class);
thrown.expectMessage("found for singleton within window");
doFnTester.processBatch(ImmutableList.of(
doFnTester.processBundle(ImmutableList.of(
KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(0,
ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
Expand All @@ -990,7 +990,7 @@ public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception {
DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());

// The order of the output elements is important relative to processing order
assertThat(doFnTester.processBatch(ImmutableList.of("a", "b", "c")), contains(
assertThat(doFnTester.processBundle(ImmutableList.of("a", "b", "c")), contains(
IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")),
IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")),
IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c"))));
Expand Down Expand Up @@ -1028,7 +1028,7 @@ public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception
)));

// The order of the output elements is important relative to processing order
assertThat(doFnTester.processBatch(inputElements), contains(
assertThat(doFnTester.processBundle(inputElements), contains(
IsmRecord.of(ImmutableList.of(windowA, 0L),
WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
IsmRecord.of(ImmutableList.of(windowA, 1L),
Expand Down Expand Up @@ -1100,7 +1100,7 @@ public void testToIsmRecordForMapLikeDoFn() throws Exception {
WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))));

// The order of the output elements is important relative to processing order
assertThat(doFnTester.processBatch(inputElements), contains(
assertThat(doFnTester.processBundle(inputElements), contains(
IsmRecord.of(
ImmutableList.of(1L, windowA, 0L),
WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
Expand Down Expand Up @@ -1188,7 +1188,7 @@ public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() thro

thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unique keys are expected but found key");
doFnTester.processBatch(inputElements);
doFnTester.processBundle(inputElements);
}

@Test
Expand Down Expand Up @@ -1231,7 +1231,7 @@ public void testToIsmMetadataRecordForSizeDoFn() throws Exception {
KV.of(windowC, 9L))));

// The order of the output elements is important relative to processing order
assertThat(doFnTester.processBatch(inputElements), contains(
assertThat(doFnTester.processBundle(inputElements), contains(
IsmRecord.<WindowedValue<Long>>meta(
ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L),
CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)),
Expand Down Expand Up @@ -1286,7 +1286,7 @@ public void testToIsmMetadataRecordForKeyDoFn() throws Exception {
KV.of(windowC, 3L))));

// The order of the output elements is important relative to processing order
assertThat(doFnTester.processBatch(inputElements), contains(
assertThat(doFnTester.processBundle(inputElements), contains(
IsmRecord.<WindowedValue<Long>>meta(
ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L),
CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)),
Expand Down Expand Up @@ -1339,7 +1339,7 @@ public void testToMapDoFn() throws Exception {
List<IsmRecord<WindowedValue<TransformedMap<Long,
WindowedValue<Long>,
Long>>>> output =
doFnTester.processBatch(inputElements);
doFnTester.processBundle(inputElements);
assertEquals(3, output.size());
Map<Long, Long> outputMap;

Expand Down Expand Up @@ -1396,7 +1396,7 @@ public void testToMultimapDoFn() throws Exception {
List<IsmRecord<WindowedValue<TransformedMap<Long,
Iterable<WindowedValue<Long>>,
Iterable<Long>>>>> output =
doFnTester.processBatch(inputElements);
doFnTester.processBundle(inputElements);
assertEquals(3, output.size());
Map<Long, Iterable<Long>> outputMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@
*
* DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn);
*
* // Set arguments shared across all batches:
* // Set arguments shared across all bundles:
* fnTester.setSideInputs(...); // If fn takes side inputs.
* fnTester.setSideOutputTags(...); // If fn writes to side outputs.
*
* // Process a batch containing a single input element:
* // Process a bundle containing a single input element:
* Input testInput = ...;
* List<OutputT> testOutputs = fnTester.processBatch(testInput);
* List<OutputT> testOutputs = fnTester.processBundle(testInput);
* Assert.assertThat(testOutputs,
* JUnitMatchers.hasItems(...));
*
* // Process a bigger batch:
* Assert.assertThat(fnTester.processBatch(i1, i2, ...),
* // Process a bigger bundle:
* Assert.assertThat(fnTester.processBundle(i1, i2, ...),
* JUnitMatchers.hasItems(...));
* } </pre>
*
Expand Down Expand Up @@ -163,7 +163,7 @@ public void setSideOutputTags(TupleTagList sideOutputTags) {
* calls {@link #finishBundle}, then returns the result of
* {@link #takeOutputElements}.
*/
public List<OutputT> processBatch(Iterable <? extends InputT> inputElements) throws Exception {
public List<OutputT> processBundle(Iterable <? extends InputT> inputElements) throws Exception {
startBundle();
for (InputT inputElement : inputElements) {
processElement(inputElement);
Expand All @@ -184,8 +184,8 @@ public List<OutputT> processBatch(Iterable <? extends InputT> inputElements) thr
* </ol>
*/
@SafeVarargs
public final List<OutputT> processBatch(InputT... inputElements) throws Exception {
return processBatch(Arrays.asList(inputElements));
public final List<OutputT> processBundle(InputT... inputElements) throws Exception {
return processBundle(Arrays.asList(inputElements));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ public void processBatch() throws Exception {
CounterDoFn counterDoFn = new CounterDoFn();
DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);

// processBatch() returns all the output like takeOutputElements().
List<String> take = tester.processBatch(1L, 2L, 3L, 4L);
// processBundle() returns all the output like takeOutputElements().
List<String> take = tester.processBundle(1L, 2L, 3L, 4L);

assertThat(take, hasItems("1", "2", "3", "4"));

Expand Down Expand Up @@ -178,7 +178,7 @@ public void processElementWithTimestamp() throws Exception {
public void getAggregatorValuesShouldGetValueOfCounter() throws Exception {
CounterDoFn counterDoFn = new CounterDoFn();
DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
tester.processBatch(1L, 2L, 4L, 8L);
tester.processBundle(1L, 2L, 4L, 8L);

Long aggregatorVal = tester.getAggregatorValue(counterDoFn.agg);

Expand All @@ -189,7 +189,7 @@ public void getAggregatorValuesShouldGetValueOfCounter() throws Exception {
public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception {
CounterDoFn counterDoFn = new CounterDoFn();
DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
tester.processBatch();
tester.processBundle();
Long aggregatorVal = tester.getAggregatorValue(counterDoFn.agg);
// empty bundle
assertThat(aggregatorVal, equalTo(0L));
Expand All @@ -199,7 +199,7 @@ public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception
public void getAggregatorValuesInStartFinishBundleShouldGetValues() throws Exception {
CounterDoFn fn = new CounterDoFn(1L, 2L);
DoFnTester<Long, String> tester = DoFnTester.of(fn);
tester.processBatch(0L, 0L);
tester.processBundle(0L, 0L);

Long aggValue = tester.getAggregatorValue(fn.agg);
assertThat(aggValue, equalTo(1L + 2L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public void testConsumingDoFn() throws Exception {
purchasesTag,
addressesTag,
namesTag))
.processBatch(
.processBundle(
KV.of(1, result1),
KV.of(2, result2),
KV.of(3, result3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public void testExtractWordsFn() throws Exception {
DoFnTester<String, String> extractWordsFn =
DoFnTester.of(new ExtractWordsFn());

Assert.assertThat(extractWordsFn.processBatch(" some input words "),
Assert.assertThat(extractWordsFn.processBundle(" some input words "),
CoreMatchers.hasItems("some", "input", "words"));
Assert.assertThat(extractWordsFn.processBatch(" "),
Assert.assertThat(extractWordsFn.processBundle(" "),
CoreMatchers.<String>hasItems());
Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"),
Assert.assertThat(extractWordsFn.processBundle(" some ", " input", " words"),
CoreMatchers.hasItems("some", "input", "words"));
}

Expand Down