Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,9 @@ message ParDoPayload {
// be placed in the pipeline requirements.
map<string, StateSpec> state_specs = 4;

// (Optional) A mapping of local timer names to timer specifications.
// If this is set, the stateful processing requirement should also
// be placed in the pipeline requirements.
map<string, TimerSpec> timer_specs = 5;

// (Optional) A mapping of local timer family names to timer specifications.
// If this is set, the stateful processing requirement should also
// be placed in the pipeline requirements.
// (Optional) A mapping of local timer family names to timer family
// specifications. If this is set, the stateful processing requirement should
// also be placed in the pipeline requirements.
map<string, TimerFamilySpec> timer_family_specs = 9;

// (Optional) Only set when this ParDo contains a splittable DoFn.
Expand Down Expand Up @@ -507,11 +502,6 @@ message SetStateSpec {
string element_coder_id = 1;
}

message TimerSpec {
TimeDomain.Enum time_domain = 1;
string timer_coder_id = 2;
}

message TimerFamilySpec {
TimeDomain.Enum time_domain = 1;
string timer_family_coder_id = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public RunnerApi.PTransform translate(

// https://s.apache.org/beam-portability-timers
// Add a PCollection and coder for each timer. Also treat them as inputs and outputs.
for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
for (String localTimerName : payload.getTimerFamilySpecsMap().keySet()) {
PCollection<?> timerPCollection =
PCollection.createPrimitiveOutputInternal(
// Create a dummy pipeline since we don't want to modify the current
Expand Down Expand Up @@ -294,12 +294,14 @@ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents compon
}

@Override
public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
public Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(
SdkComponents newComponents) {
Map<String, RunnerApi.TimerFamilySpec> timerSpecs = new HashMap<>();
for (Map.Entry<String, TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {
RunnerApi.TimerSpec spec =
translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
RunnerApi.TimerFamilySpec spec =
translateTimerFamilySpec(
getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
timerSpecs.put(timer.getKey(), spec);
}

Expand Down Expand Up @@ -538,7 +540,8 @@ private static String getMainInputName(
return Iterables.getOnlyElement(
Sets.difference(
ptransform.getInputsMap().keySet(),
Sets.union(payload.getSideInputsMap().keySet(), payload.getTimerSpecsMap().keySet())));
Sets.union(
payload.getSideInputsMap().keySet(), payload.getTimerFamilySpecsMap().keySet())));
}

/** Translate state specs. */
Expand Down Expand Up @@ -654,15 +657,6 @@ private static String registerCoderOrThrow(SdkComponents components, Coder coder
}
}

public static RunnerApi.TimerSpec translateTimerSpec(TimerSpec timer, SdkComponents components) {
return RunnerApi.TimerSpec.newBuilder()
.setTimeDomain(translateTimeDomain(timer.getTimeDomain()))
// TODO: Add support for timer payloads to the SDK
// We currently assume that all payloads are unspecified.
.setTimerCoderId(registerCoderOrThrow(components, Timer.Coder.of(VoidCoder.of())))
.build();
}

public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
TimerSpec timer, SdkComponents components) {
return RunnerApi.TimerFamilySpec.newBuilder()
Expand Down Expand Up @@ -756,9 +750,7 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform

public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
ParDoPayload payload = getParDoPayload(transform);
return payload.getStateSpecsCount() > 0
|| payload.getTimerSpecsCount() > 0
|| payload.getTimerFamilySpecsCount() > 0;
return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
}

public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException {
Expand All @@ -783,7 +775,7 @@ public interface ParDoLike {
Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
throws IOException;

Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents);
Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(SdkComponents newComponents);

Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(SdkComponents newComponents);

Expand Down Expand Up @@ -822,7 +814,7 @@ public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents co
return ParDoPayload.newBuilder()
.setDoFn(parDo.translateDoFn(components))
.putAllStateSpecs(parDo.translateStateSpecs(components))
.putAllTimerSpecs(parDo.translateTimerSpecs(components))
.putAllTimerFamilySpecs(parDo.translateTimerSpecs(components))
.putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
.putAllSideInputs(parDo.translateSideInputs(components))
.setRequiresStableInput(parDo.isRequiresStableInput())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
Expand Down Expand Up @@ -403,7 +402,7 @@ public Map<String, StateSpec> translateStateSpecs(SdkComponents components) {
}

@Override
public Map<String, TimerSpec> translateTimerSpecs(SdkComponents components) {
public Map<String, TimerFamilySpec> translateTimerSpecs(SdkComponents components) {
// SDFs don't have timers.
return ImmutableMap.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
* Returns the timestamp of when the timer is scheduled to fire.
*
* <p>The time is relative to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec} that is associated with this timer.
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
* timer.
*/
public abstract Instant getTimestamp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,13 @@ private static boolean canFuseParDo(
try {
ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
if (Maps.filterKeys(
parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
parDo.getTransform().getInputsMap(),
s -> payload.getTimerFamilySpecsMap().containsKey(s))
.values()
.contains(candidate.getId())) {
// Allow fusion across timer PCollections because they are a self loop.
return true;
} else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
} else if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) {
// Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
// a key must execute serially. To avoid checking if the rest of the stage is
// key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private static void validateParDo(
id,
sideInputId);
}
if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) {
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
// TODO: Validate state_specs and timer_specs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ private Set<String> getLocalUserStateNames(PTransform transform) {
private Set<String> getLocalTimerNames(PTransform transform) {
if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) {
try {
return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getTimerSpecsMap().keySet();
return ParDoPayload.parseFrom(transform.getSpec().getPayload())
.getTimerFamilySpecsMap()
.keySet();
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testRoundTripToFromTransform() throws Exception {
.setDoFn(FunctionSpec.newBuilder())
.putSideInputs("side_input", SideInput.getDefaultInstance())
.putStateSpecs("user_state", StateSpec.getDefaultInstance())
.putTimerSpecs("timer", TimerSpec.getDefaultInstance())
.putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.runners.core.construction.Environments;
Expand Down Expand Up @@ -1041,7 +1041,7 @@ public void parDoWithTimerRootsStage() {
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(FunctionSpec.newBuilder())
.putTimerSpecs("timer", TimerSpec.getDefaultInstance())
.putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("common")
Expand Down Expand Up @@ -1100,7 +1100,7 @@ public void parDoWithStateAndTimerRootsStage() {
ParDoPayload.newBuilder()
.setDoFn(FunctionSpec.newBuilder())
.putStateSpecs("state", StateSpec.getDefaultInstance())
.putTimerSpecs("timer", TimerSpec.getDefaultInstance())
.putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("common")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
Expand Down Expand Up @@ -339,7 +339,7 @@ public void materializesWithConsumerWithTimer() {
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(FunctionSpec.newBuilder())
.putTimerSpecs("timer", TimerSpec.getDefaultInstance())
.putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("common")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception {
.setDoFn(RunnerApi.FunctionSpec.newBuilder())
.putSideInputs("side_input", RunnerApi.SideInput.getDefaultInstance())
.putStateSpecs("user_state", RunnerApi.StateSpec.getDefaultInstance())
.putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance())
.putTimerFamilySpecs(
"timer", RunnerApi.TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() {
thrown.expectCause(is(fn.exceptionToThrow));

runner.onTimer(
ThrowingDoFn.TIMER_ID,
TimerDeclaration.PREFIX + ThrowingDoFn.TIMER_ID,
"",
GlobalWindow.INSTANCE,
new Instant(0),
Expand Down Expand Up @@ -160,7 +161,7 @@ public void testTimerSet() {
verify(mockTimerInternals)
.setTimer(
StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE),
DoFnWithTimers.TIMER_ID,
TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID,
"",
currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
Expand Down Expand Up @@ -243,7 +244,7 @@ public void testOnTimerCalled() {
// Mocking is not easily compatible with annotation analysis, so we manually record
// the method call.
runner.onTimer(
DoFnWithTimers.TIMER_ID,
TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID,
"",
GlobalWindow.INSTANCE,
currentTime.plus(offset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerFamilySpec;
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerSpec;
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow;
Expand Down Expand Up @@ -227,13 +226,14 @@ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents compon
}

@Override
public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(
public Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(
SdkComponents newComponents) {
Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
Map<String, RunnerApi.TimerFamilySpec> timerSpecs = new HashMap<>();
for (Map.Entry<String, DoFnSignature.TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {
RunnerApi.TimerSpec spec =
translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
RunnerApi.TimerFamilySpec spec =
translateTimerFamilySpec(
getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
timerSpecs.put(timer.getKey(), spec);
}
return timerSpecs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ public Node apply(MutableNetwork<Node, Edge> input) {

// Build the necessary components to inform the SDK Harness of the pipeline's
// user timers and user state.
for (Map.Entry<String, RunnerApi.TimerSpec> entry :
parDoPayload.getTimerSpecsMap().entrySet()) {
for (Map.Entry<String, RunnerApi.TimerFamilySpec> entry :
parDoPayload.getTimerFamilySpecsMap().entrySet()) {
timerIds.add(entry.getKey());
}
for (Map.Entry<String, RunnerApi.StateSpec> entry :
Expand Down
Loading