Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -415,20 +415,13 @@ private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
@Setup
public void setup() {
invoker = DoFnInvokers.invokerFor(fn);
invoker.invokeSetup();
}

@ProcessElement
public void processElement(ProcessContext context) {
context.output(
KV.of(context.element(), invoker.invokeGetInitialRestriction(context.element())));
}

@Teardown
public void tearDown() {
invoker.invokeTeardown();
invoker = null;
}
}

/** Splits the restriction using the given {@link SplitRestriction} method. */
Expand All @@ -446,7 +439,6 @@ private static class SplitRestrictionFn<InputT, RestrictionT>
@Setup
public void setup() {
invoker = DoFnInvokers.invokerFor(splittableFn);
invoker.invokeSetup();
}

@ProcessElement
Expand All @@ -467,11 +459,5 @@ public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
}
});
}

@Teardown
public void tearDown() {
invoker.invokeTeardown();
invoker = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -513,19 +513,18 @@ private enum State {

private State state = State.BEFORE_SETUP;

@ProcessElement
public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
assertEquals(State.INSIDE_BUNDLE, state);
assertTrue(tracker.tryClaim(0L));
c.output(c.element());
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(String value) {
assertEquals(State.OUTSIDE_BUNDLE, state);
return new OffsetRange(0, 1);
}

@SplitRestriction
public void splitRestriction(
String value, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
assertEquals(State.OUTSIDE_BUNDLE, state);
receiver.output(range);
}

@Setup
public void setUp() {
assertEquals(State.BEFORE_SETUP, state);
Expand All @@ -538,13 +537,6 @@ public void startBundle() {
state = State.INSIDE_BUNDLE;
}

@ProcessElement
public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
assertEquals(State.INSIDE_BUNDLE, state);
assertTrue(tracker.tryClaim(0L));
c.output(c.element());
}

@FinishBundle
public void finishBundle() {
assertEquals(State.INSIDE_BUNDLE, state);
Expand All @@ -561,9 +553,12 @@ public void tearDown() {
@Test
@Category({ValidatesRunner.class, UsesSplittableParDo.class})
public void testLifecycleMethods() throws Exception {

PCollection<String> res =
p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));

PAssert.that(res).containsInAnyOrder("a", "b", "c");

p.run();
}

Expand Down