diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index e5d5eb6b1804..17229d2e261f 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -442,14 +442,9 @@ message ParDoPayload { // be placed in the pipeline requirements. map state_specs = 4; - // (Optional) A mapping of local timer names to timer specifications. - // If this is set, the stateful processing requirement should also - // be placed in the pipeline requirements. - map timer_specs = 5; - - // (Optional) A mapping of local timer family names to timer specifications. - // If this is set, the stateful processing requirement should also - // be placed in the pipeline requirements. + // (Optional) A mapping of local timer family names to timer family + // specifications. If this is set, the stateful processing requirement should + // also be placed in the pipeline requirements. map timer_family_specs = 9; // (Optional) Only set when this ParDo contains a splittable DoFn. @@ -507,11 +502,6 @@ message SetStateSpec { string element_coder_id = 1; } -message TimerSpec { - TimeDomain.Enum time_domain = 1; - string timer_coder_id = 2; -} - message TimerFamilySpec { TimeDomain.Enum time_domain = 1; string timer_family_coder_id = 2; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 256a31d744e7..969ae25c517f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -185,7 +185,7 @@ public RunnerApi.PTransform translate( // https://s.apache.org/beam-portability-timers // Add a PCollection and coder for each timer. Also treat them as inputs and outputs. - for (String localTimerName : payload.getTimerSpecsMap().keySet()) { + for (String localTimerName : payload.getTimerFamilySpecsMap().keySet()) { PCollection timerPCollection = PCollection.createPrimitiveOutputInternal( // Create a dummy pipeline since we don't want to modify the current @@ -294,12 +294,14 @@ public Map translateStateSpecs(SdkComponents compon } @Override - public Map translateTimerSpecs(SdkComponents newComponents) { - Map timerSpecs = new HashMap<>(); + public Map translateTimerSpecs( + SdkComponents newComponents) { + Map timerSpecs = new HashMap<>(); for (Map.Entry timer : signature.timerDeclarations().entrySet()) { - RunnerApi.TimerSpec spec = - translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents); + RunnerApi.TimerFamilySpec spec = + translateTimerFamilySpec( + getTimerSpecOrThrow(timer.getValue(), doFn), newComponents); timerSpecs.put(timer.getKey(), spec); } @@ -538,7 +540,8 @@ private static String getMainInputName( return Iterables.getOnlyElement( Sets.difference( ptransform.getInputsMap().keySet(), - Sets.union(payload.getSideInputsMap().keySet(), payload.getTimerSpecsMap().keySet()))); + Sets.union( + payload.getSideInputsMap().keySet(), payload.getTimerFamilySpecsMap().keySet()))); } /** Translate state specs. */ @@ -654,15 +657,6 @@ private static String registerCoderOrThrow(SdkComponents components, Coder coder } } - public static RunnerApi.TimerSpec translateTimerSpec(TimerSpec timer, SdkComponents components) { - return RunnerApi.TimerSpec.newBuilder() - .setTimeDomain(translateTimeDomain(timer.getTimeDomain())) - // TODO: Add support for timer payloads to the SDK - // We currently assume that all payloads are unspecified. - .setTimerCoderId(registerCoderOrThrow(components, Timer.Coder.of(VoidCoder.of()))) - .build(); - } - public static RunnerApi.TimerFamilySpec translateTimerFamilySpec( TimerSpec timer, SdkComponents components) { return RunnerApi.TimerFamilySpec.newBuilder() @@ -756,9 +750,7 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform public static boolean usesStateOrTimers(AppliedPTransform transform) throws IOException { ParDoPayload payload = getParDoPayload(transform); - return payload.getStateSpecsCount() > 0 - || payload.getTimerSpecsCount() > 0 - || payload.getTimerFamilySpecsCount() > 0; + return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0; } public static boolean isSplittable(AppliedPTransform transform) throws IOException { @@ -783,7 +775,7 @@ public interface ParDoLike { Map translateStateSpecs(SdkComponents components) throws IOException; - Map translateTimerSpecs(SdkComponents newComponents); + Map translateTimerSpecs(SdkComponents newComponents); Map translateTimerFamilySpecs(SdkComponents newComponents); @@ -822,7 +814,7 @@ public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents co return ParDoPayload.newBuilder() .setDoFn(parDo.translateDoFn(components)) .putAllStateSpecs(parDo.translateStateSpecs(components)) - .putAllTimerSpecs(parDo.translateTimerSpecs(components)) + .putAllTimerFamilySpecs(parDo.translateTimerSpecs(components)) .putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components)) .putAllSideInputs(parDo.translateSideInputs(components)) .setRequiresStableInput(parDo.isRequiresStableInput()) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 4dab59e1b9bc..86384efd0422 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -32,7 +32,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec; -import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike; import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator; @@ -403,7 +402,7 @@ public Map translateStateSpecs(SdkComponents components) { } @Override - public Map translateTimerSpecs(SdkComponents components) { + public Map translateTimerSpecs(SdkComponents components) { // SDFs don't have timers. return ImmutableMap.of(); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java index 072dbc70416c..66afe7c1aaba 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java @@ -54,7 +54,8 @@ public static Timer of(Instant timestamp, @Nullable T payload) { * Returns the timestamp of when the timer is scheduled to fire. * *

The time is relative to the time domain defined in the {@link - * org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec} that is associated with this timer. + * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this + * timer. */ public abstract Instant getTimestamp(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java index 3d7d4146282f..b3f18a29d51c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java @@ -206,12 +206,13 @@ private static boolean canFuseParDo( try { ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload()); if (Maps.filterKeys( - parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s)) + parDo.getTransform().getInputsMap(), + s -> payload.getTimerFamilySpecsMap().containsKey(s)) .values() .contains(candidate.getId())) { // Allow fusion across timer PCollections because they are a self loop. return true; - } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { + } else if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) { // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for // a key must execute serially. To avoid checking if the rest of the stage is // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage. diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java index c80947bfc95b..34e74a76596c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java @@ -229,7 +229,7 @@ private static void validateParDo( id, sideInputId); } - if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { + if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) { checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)); // TODO: Validate state_specs and timer_specs } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java index 099294de202b..1f5e3811714d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java @@ -414,7 +414,9 @@ private Set getLocalUserStateNames(PTransform transform) { private Set getLocalTimerNames(PTransform transform) { if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) { try { - return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getTimerSpecsMap().keySet(); + return ParDoPayload.parseFrom(transform.getSpec().getPayload()) + .getTimerFamilySpecsMap() + .keySet(); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java index 9a69ed877ed8..6101e6a59ed2 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java @@ -36,7 +36,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; -import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; +import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PTransformTranslation; @@ -68,7 +68,7 @@ public void testRoundTripToFromTransform() throws Exception { .setDoFn(FunctionSpec.newBuilder()) .putSideInputs("side_input", SideInput.getDefaultInstance()) .putStateSpecs("user_state", StateSpec.getDefaultInstance()) - .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) + .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance()) .build() .toByteString())) .setEnvironmentId("foo") diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java index 97b8091e84a6..78cba3129125 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java @@ -43,7 +43,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; -import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; +import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy; import org.apache.beam.runners.core.construction.Environments; @@ -1041,7 +1041,7 @@ public void parDoWithTimerRootsStage() { .setPayload( ParDoPayload.newBuilder() .setDoFn(FunctionSpec.newBuilder()) - .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) + .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance()) .build() .toByteString())) .setEnvironmentId("common") @@ -1100,7 +1100,7 @@ public void parDoWithStateAndTimerRootsStage() { ParDoPayload.newBuilder() .setDoFn(FunctionSpec.newBuilder()) .putStateSpecs("state", StateSpec.getDefaultInstance()) - .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) + .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance()) .build() .toByteString())) .setEnvironmentId("common") diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java index 8dc0f1ed84e5..c60f5e1cf6fb 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java @@ -36,7 +36,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; -import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; +import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PTransformTranslation; @@ -339,7 +339,7 @@ public void materializesWithConsumerWithTimer() { .setPayload( ParDoPayload.newBuilder() .setDoFn(FunctionSpec.newBuilder()) - .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) + .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance()) .build() .toByteString())) .setEnvironmentId("common") diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java index 69336b7d0cba..5d11bfc13b73 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java @@ -62,7 +62,8 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception { .setDoFn(RunnerApi.FunctionSpec.newBuilder()) .putSideInputs("side_input", RunnerApi.SideInput.getDefaultInstance()) .putStateSpecs("user_state", RunnerApi.StateSpec.getDefaultInstance()) - .putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance()) + .putTimerFamilySpecs( + "timer", RunnerApi.TimerFamilySpec.getDefaultInstance()) .build() .toByteString())) .build(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 64a2d4d8b298..e5696bf30a75 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -119,7 +120,7 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() { thrown.expectCause(is(fn.exceptionToThrow)); runner.onTimer( - ThrowingDoFn.TIMER_ID, + TimerDeclaration.PREFIX + ThrowingDoFn.TIMER_ID, "", GlobalWindow.INSTANCE, new Instant(0), @@ -160,7 +161,7 @@ public void testTimerSet() { verify(mockTimerInternals) .setTimer( StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE), - DoFnWithTimers.TIMER_ID, + TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID, "", currentTime.plus(DoFnWithTimers.TIMER_OFFSET), currentTime.plus(DoFnWithTimers.TIMER_OFFSET), @@ -243,7 +244,7 @@ public void testOnTimerCalled() { // Mocking is not easily compatible with annotation analysis, so we manually record // the method call. runner.onTimer( - DoFnWithTimers.TIMER_ID, + TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID, "", GlobalWindow.INSTANCE, currentTime.plus(offset), diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java index 7b148a2c375a..4b9e8829f5f0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java @@ -19,7 +19,6 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerFamilySpec; -import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerSpec; import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow; import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow; @@ -227,13 +226,14 @@ public Map translateStateSpecs(SdkComponents compon } @Override - public Map translateTimerSpecs( + public Map translateTimerSpecs( SdkComponents newComponents) { - Map timerSpecs = new HashMap<>(); + Map timerSpecs = new HashMap<>(); for (Map.Entry timer : signature.timerDeclarations().entrySet()) { - RunnerApi.TimerSpec spec = - translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents); + RunnerApi.TimerFamilySpec spec = + translateTimerFamilySpec( + getTimerSpecOrThrow(timer.getValue(), doFn), newComponents); timerSpecs.put(timer.getKey(), spec); } return timerSpecs; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java index db250002d92b..29ebcb91698a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java @@ -344,8 +344,8 @@ public Node apply(MutableNetwork input) { // Build the necessary components to inform the SDK Harness of the pipeline's // user timers and user state. - for (Map.Entry entry : - parDoPayload.getTimerSpecsMap().entrySet()) { + for (Map.Entry entry : + parDoPayload.getTimerFamilySpecsMap().entrySet()) { timerIds.add(entry.getKey()); } for (Map.Entry entry : diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java index a8c058699445..550115da12e0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java @@ -63,6 +63,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -164,6 +165,7 @@ public void tearDown() throws Exception { @Test public void testSingleTimerScheduling() throws Exception { final String timerId = "timerId"; + final String timerDeclarationId = TimerDeclaration.PREFIX + timerId; Pipeline p = Pipeline.create(); PCollection output = @@ -227,8 +229,8 @@ public void onTimer( } }); - String timerOutputPCollection = timerSpecMap.get(timerId).outputCollectionId(); - String timerInputPCollection = timerSpecMap.get(timerId).inputCollectionId(); + String timerOutputPCollection = timerSpecMap.get(timerDeclarationId).outputCollectionId(); + String timerInputPCollection = timerSpecMap.get(timerDeclarationId).inputCollectionId(); // Arbitrary offset. long testTimerOffset = 123456; @@ -270,6 +272,8 @@ public void onTimer( public void testMultiTimerScheduling() throws Exception { final String timerId1 = "timerId1"; final String timerId2 = "timerId2"; + final String timerDeclarationId1 = TimerDeclaration.PREFIX + timerId1; + final String timerDeclarationId2 = TimerDeclaration.PREFIX + timerId2; Pipeline p = Pipeline.create(); PCollection output = @@ -362,9 +366,11 @@ public void onTimer2( // Simulate the SDK Harness sending a timer element to the Runner Harness. assertTrue( - timerReceiver.receive(timerSpecMap.get(timerId1).outputCollectionId(), windowedTimer1)); + timerReceiver.receive( + timerSpecMap.get(timerDeclarationId1).outputCollectionId(), windowedTimer1)); assertTrue( - timerReceiver.receive(timerSpecMap.get(timerId2).outputCollectionId(), windowedTimer2)); + timerReceiver.receive( + timerSpecMap.get(timerDeclarationId2).outputCollectionId(), windowedTimer2)); // Expect that we get a timer element when we finish. Object expectedTimer1 = @@ -383,12 +389,12 @@ public void onTimer2( Mockito.verify(timerReceiver, Mockito.never()) .fireTimer( - timerSpecMap.get(timerId1).inputCollectionId(), + timerSpecMap.get(timerDeclarationId1).inputCollectionId(), (WindowedValue>) expectedTimer1); Mockito.verify(timerReceiver, Mockito.never()) .fireTimer( - timerSpecMap.get(timerId2).inputCollectionId(), + timerSpecMap.get(timerDeclarationId2).inputCollectionId(), (WindowedValue>) expectedTimer2); @@ -397,12 +403,12 @@ public void onTimer2( timerReceiver.finish(); Mockito.verify(timerReceiver) .fireTimer( - timerSpecMap.get(timerId1).inputCollectionId(), + timerSpecMap.get(timerDeclarationId1).inputCollectionId(), (WindowedValue>) expectedTimer1); Mockito.verify(timerReceiver) .fireTimer( - timerSpecMap.get(timerId2).inputCollectionId(), + timerSpecMap.get(timerDeclarationId2).inputCollectionId(), (WindowedValue>) expectedTimer2); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java index 80b3f61ff01e..5bfee8f2753b 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java @@ -354,7 +354,7 @@ private static Map> forTimerSpecs( RunnerApi.ParDoPayload.parseFrom( timerReference.transform().getTransform().getSpec().getPayload()); RunnerApi.TimeDomain.Enum timeDomain = - payload.getTimerSpecsOrThrow(timerReference.localName()).getTimeDomain(); + payload.getTimerFamilySpecsOrThrow(timerReference.localName()).getTimeDomain(); org.apache.beam.sdk.state.TimerSpec spec; switch (timeDomain) { case EVENT_TIME: @@ -380,13 +380,15 @@ private static Map> forTimerSpecs( timerReference.transform().getTransform().getInputsMap().keySet(), Sets.union( payload.getSideInputsMap().keySet(), - payload.getTimerSpecsMap().keySet())))); + payload.getTimerFamilySpecsMap().keySet())))); String timerCoderId = keyValueCoderId( components .getCodersOrThrow(components.getPcollectionsOrThrow(mainInputName).getCoderId()) .getComponentCoderIds(0), - payload.getTimerSpecsOrThrow(timerReference.localName()).getTimerCoderId(), + payload + .getTimerFamilySpecsOrThrow(timerReference.localName()) + .getTimerFamilyCoderId(), components); RunnerApi.PCollection timerCollectionSpec = components diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 1df70db32851..8a423a200850 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -1135,6 +1135,9 @@ static OnWindowExpirationMethod create( */ @AutoValue public abstract static class TimerDeclaration { + + public static final String PREFIX = "ts-"; + public abstract String id(); public abstract Field field(); @@ -1150,6 +1153,9 @@ static TimerDeclaration create(String id, Field field) { */ @AutoValue public abstract static class TimerFamilyDeclaration { + + public static final String PREFIX = "tfs-"; + public abstract String id(); public abstract Field field(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index cea704bf89eb..31ad8170b5f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -532,7 +532,7 @@ private static DoFnSignature parseSignature(Class> fnClass) HashMap onTimerMethodMap = Maps.newHashMapWithExpectedSize(onTimerMethods.size()); for (Method onTimerMethod : onTimerMethods) { - String id = onTimerMethod.getAnnotation(DoFn.OnTimer.class).value(); + String id = TimerDeclaration.PREFIX + onTimerMethod.getAnnotation(DoFn.OnTimer.class).value(); errors.checkArgument( fnContext.getTimerDeclarations().containsKey(id), "Callback %s is for undeclared timer %s", @@ -560,7 +560,9 @@ private static DoFnSignature parseSignature(Class> fnClass) Maps.newHashMapWithExpectedSize(onTimerFamilyMethods.size()); for (Method onTimerFamilyMethod : onTimerFamilyMethods) { - String id = onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value(); + String id = + TimerFamilyDeclaration.PREFIX + + onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value(); errors.checkArgument( fnContext.getTimerFamilyDeclarations().containsKey(id), "Callback %s is for undeclared timerFamily %s", @@ -1471,14 +1473,14 @@ private static Parameter analyzeExtraParameter( @Nullable private static String getTimerId(List annotations) { - DoFn.TimerId stateId = findFirstOfType(annotations, DoFn.TimerId.class); - return stateId != null ? stateId.value() : null; + DoFn.TimerId timerId = findFirstOfType(annotations, DoFn.TimerId.class); + return timerId != null ? TimerDeclaration.PREFIX + timerId.value() : null; } @Nullable private static String getTimerFamilyId(List annotations) { DoFn.TimerFamily timerFamilyId = findFirstOfType(annotations, DoFn.TimerFamily.class); - return timerFamilyId != null ? timerFamilyId.value() : null; + return timerFamilyId != null ? TimerFamilyDeclaration.PREFIX + timerFamilyId.value() : null; } @Nullable @@ -1761,7 +1763,8 @@ private static ImmutableMap analyzeTimerFamilyDe for (Field field : declaredFieldsWithAnnotation(DoFn.TimerFamily.class, fnClazz, DoFn.class)) { // TimerSpec fields may generally be private, but will be accessed via the signature field.setAccessible(true); - String id = field.getAnnotation(DoFn.TimerFamily.class).value(); + String id = + TimerFamilyDeclaration.PREFIX + field.getAnnotation(DoFn.TimerFamily.class).value(); validateTimerFamilyField(errors, declarations, id, field); declarations.put(id, TimerFamilyDeclaration.create(id, field)); } @@ -1775,7 +1778,9 @@ private static ImmutableMap analyzeTimerDeclarations( for (Field field : declaredFieldsWithAnnotation(DoFn.TimerId.class, fnClazz, DoFn.class)) { // TimerSpec fields may generally be private, but will be accessed via the signature field.setAccessible(true); - String id = field.getAnnotation(DoFn.TimerId.class).value(); + // Add fixed prefix to avoid key collision with TimerFamily. + String id = + DoFnSignature.TimerDeclaration.PREFIX + field.getAnnotation(DoFn.TimerId.class).value(); validateTimerField(errors, declarations, id, field); declarations.put(id, DoFnSignature.TimerDeclaration.create(id, field)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index dc97ec82cce1..52248650584f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -60,6 +60,7 @@ import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator; @@ -123,7 +124,8 @@ private DoFn.ProcessContinuation invokeProcessElement(DoFn fn) { } private void invokeOnTimer(String timerId, DoFn fn) { - DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, "", mockArgumentProvider); + DoFnInvokers.invokerFor(fn) + .invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider); } @Test @@ -269,7 +271,7 @@ public void processElement(ProcessContext c, @StateId(stateId) ValueState { @TimerId(timerId) @@ -1038,7 +1040,7 @@ public void onMyTimer() { SimpleTimerDoFn fn = new SimpleTimerDoFn(); DoFnInvoker invoker = DoFnInvokers.invokerFor(fn); - invoker.invokeOnTimer(timerId, "", mockArgumentProvider); + invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider); assertThat(fn.status, equalTo("OK now")); } @@ -1067,7 +1069,7 @@ public void onMyTimer(IntervalWindow w) { SimpleTimerDoFn fn = new SimpleTimerDoFn(); DoFnInvoker invoker = DoFnInvokers.invokerFor(fn); - invoker.invokeOnTimer(timerId, "", mockArgumentProvider); + invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider); assertThat(fn.window, equalTo(testWindow)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 14143714bdb1..ee5ddc312611 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -72,6 +72,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext; import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -558,6 +559,7 @@ public void foo(ProcessContext context) {} @Test public void testWindowParamOnTimer() throws Exception { final String timerId = "some-timer-id"; + final String timerDeclarationId = TimerDeclaration.PREFIX + timerId; DoFnSignature sig = DoFnSignatures.getSignature( @@ -572,15 +574,16 @@ public void process(ProcessContext c) {} public void onTimer(BoundedWindow w) {} }.getClass()); - assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(1)); + assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(1)); assertThat( - sig.onTimerMethods().get(timerId).extraParameters().get(0), + sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0), instanceOf(WindowParameter.class)); } @Test public void testAllParamsOnTimer() throws Exception { final String timerId = "some-timer-id"; + final String timerDeclarationId = TimerDeclaration.PREFIX + timerId; DoFnSignature sig = DoFnSignatures.getSignature( @@ -596,15 +599,15 @@ public void onTimer( @Timestamp Instant timestamp, TimeDomain timeDomain, BoundedWindow w) {} }.getClass()); - assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(3)); + assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(3)); assertThat( - sig.onTimerMethods().get(timerId).extraParameters().get(0), + sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0), instanceOf(TimestampParameter.class)); assertThat( - sig.onTimerMethods().get(timerId).extraParameters().get(1), + sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(1), instanceOf(TimeDomainParameter.class)); assertThat( - sig.onTimerMethods().get(timerId).extraParameters().get(2), + sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(2), instanceOf(WindowParameter.class)); } @@ -631,7 +634,8 @@ public void testDeclAndUsageOfTimerInSuperclass() throws Exception { assertThat(sig.processElement().extraParameters().size(), equalTo(2)); DoFnSignature.TimerDeclaration decl = - sig.timerDeclarations().get(DoFnOverridingAbstractTimerUse.TIMER_ID); + sig.timerDeclarations() + .get(TimerDeclaration.PREFIX + DoFnOverridingAbstractTimerUse.TIMER_ID); TimerParameter timerParam = (TimerParameter) sig.processElement().extraParameters().get(1); assertThat( @@ -656,9 +660,11 @@ public void testOnTimerDeclaredAndUsedInSuperclass() throws Exception { assertThat(sig.onTimerMethods().size(), equalTo(1)); DoFnSignature.TimerDeclaration decl = - sig.timerDeclarations().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID); + sig.timerDeclarations() + .get(TimerDeclaration.PREFIX + DoFnDeclaringTimerAndAbstractCallback.TIMER_ID); DoFnSignature.OnTimerMethod callback = - sig.onTimerMethods().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID); + sig.onTimerMethods() + .get(TimerDeclaration.PREFIX + DoFnDeclaringTimerAndAbstractCallback.TIMER_ID); assertThat( decl.field(), @@ -727,10 +733,11 @@ public void foo(ProcessContext context) {} public void onFoo() {} }.getClass()); + final String timerDeclarationId = TimerDeclaration.PREFIX + "foo"; assertThat(sig.timerDeclarations().size(), equalTo(1)); - DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo"); + DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId); - assertThat(decl.id(), equalTo("foo")); + assertThat(decl.id(), equalTo(timerDeclarationId)); assertThat(decl.field().getName(), equalTo("bizzle")); } @@ -749,14 +756,15 @@ public void foo(ProcessContext context) {} public void onFoo(OnTimerContext c) {} }.getClass()); + final String timerDeclarationId = TimerDeclaration.PREFIX + "foo"; assertThat(sig.timerDeclarations().size(), equalTo(1)); - DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo"); + DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId); - assertThat(decl.id(), equalTo("foo")); + assertThat(decl.id(), equalTo(timerDeclarationId)); assertThat(decl.field().getName(), equalTo("bizzle")); assertThat( - sig.onTimerMethods().get("foo").extraParameters().get(0), + sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0), equalTo((Parameter) Parameter.onTimerContext())); } @@ -796,10 +804,12 @@ public void onFoo() {} // Test classes at the bottom of the file DoFnSignature sig = DoFnSignatures.signatureForDoFn(new DoFnForTestSimpleTimerIdNamedDoFn()); + final String timerDeclarationId = TimerDeclaration.PREFIX + "foo"; + assertThat(sig.timerDeclarations().size(), equalTo(1)); - DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo"); + DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId); - assertThat(decl.id(), equalTo("foo")); + assertThat(decl.id(), equalTo(timerDeclarationId)); assertThat( decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle"))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java index b4553c9f9115..dbf2e2e34b08 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.junit.Before; import org.junit.Rule; @@ -52,7 +53,8 @@ public void setUp() { } private void invokeOnTimer(DoFn fn, String timerId) { - OnTimerInvokers.forTimer(fn, timerId).invokeOnTimer(mockArgumentProvider); + OnTimerInvokers.forTimer(fn, TimerDeclaration.PREFIX + timerId) + .invokeOnTimer(mockArgumentProvider); } @Test @@ -123,7 +125,8 @@ public void onMyTimer() {} @Test public void testStableName() { OnTimerInvoker invoker = - OnTimerInvokers.forTimer(new StableNameTestDoFn(), StableNameTestDoFn.TIMER_ID); + OnTimerInvokers.forTimer( + new StableNameTestDoFn(), TimerDeclaration.PREFIX + StableNameTestDoFn.TIMER_ID); assertThat( invoker.getClass().getName(), @@ -132,7 +135,7 @@ public void testStableName() { "%s$%s$%s$%s", StableNameTestDoFn.class.getName(), OnTimerInvoker.class.getSimpleName(), - "timeridwithspecialChars" /* alphanum only; human readable but not unique */, - "dGltZXItaWQud2l0aCBzcGVjaWFsQ2hhcnN7fQ" /* base64 encoding of UTF-8 timerId */))); + "tstimeridwithspecialChars" /* alphanum only; human readable but not unique */, + "dHMtdGltZXItaWQud2l0aCBzcGVjaWFsQ2hhcnN7fQ" /* base64 encoding of UTF-8 timerId */))); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index ec26195f7b1d..49aa85087d6e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -203,7 +203,7 @@ static class Factory maybeWindowedValueInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId()); // TODO: Stop passing windowed value coders within PCollections. diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 426bf85faed8..7ed8b825eb47 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -84,6 +84,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -802,11 +803,14 @@ public void testTimers() throws Exception { RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection); String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection); - String eventTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event"; - String eventTimerOutputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event.output"; - String processingTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).processing"; + String eventTimerInputPCollectionId = + "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "event"; + String eventTimerOutputPCollectionId = + "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "event.output"; + String processingTimerInputPCollectionId = + "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "processing"; String processingTimerOutputPCollectionId = - "pTransformId/ParMultiDo(TestTimerful).processing.output"; + "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "processing.output"; RunnerApi.PTransform pTransform = pProto @@ -816,8 +820,8 @@ public void testTimers() throws Exception { .toBuilder() // We need to re-write the "output" PCollections that a runner would have inserted // on the way to a output sink. - .putOutputs("event", eventTimerOutputPCollectionId) - .putOutputs("processing", processingTimerOutputPCollectionId) + .putOutputs(TimerDeclaration.PREFIX + "event", eventTimerOutputPCollectionId) + .putOutputs(TimerDeclaration.PREFIX + "processing", processingTimerOutputPCollectionId) .build(); FakeBeamFnStateClient fakeClient = diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 96789e6c2818..8ccd2d44a1a6 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -253,8 +253,7 @@ def add_requirements(transform_id): if payload.requests_finalization: expected_requirements.add( common_urns.requirements.REQUIRES_BUNDLE_FINALIZATION.urn) - if (payload.state_specs or payload.timer_specs or - payload.timer_family_specs): + if (payload.state_specs or payload.timer_family_specs): expected_requirements.add( common_urns.requirements.REQUIRES_STATEFUL_PROCESSING.urn) if payload.requires_stable_input: diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py index 884fb485546d..83c9315b230d 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py @@ -255,7 +255,7 @@ def executable_stage_transform(self, user_states.append( beam_runner_api_pb2.ExecutableStagePayload.UserStateId( transform_id=transform_id, local_name=tag)) - for tag in payload.timer_specs.keys(): + for tag in payload.timer_family_specs.keys(): timers.append( beam_runner_api_pb2.ExecutableStagePayload.TimerId( transform_id=transform_id, local_name=tag)) @@ -633,7 +633,7 @@ def annotate_stateful_dofns_as_roots(stages, pipeline_context): if transform.spec.urn == common_urns.primitives.PAR_DO.urn: pardo_payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) - if pardo_payload.state_specs or pardo_payload.timer_specs: + if pardo_payload.state_specs or pardo_payload.timer_family_specs: stage.forced_root = True yield stage @@ -1319,7 +1319,7 @@ def inject_timer_pcollections(stages, pipeline_context): if transform.spec.urn in PAR_DO_URNS: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) - for tag, spec in payload.timer_specs.items(): + for tag, spec in payload.timer_family_specs.items(): if len(transform.inputs) > 1: raise NotImplementedError('Timers and side inputs.') input_pcoll = pipeline_context.components.pcollections[next( @@ -1334,7 +1334,9 @@ def inject_timer_pcollections(stages, pipeline_context): beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.FunctionSpec( urn=common_urns.coders.KV.urn), - component_coder_ids=[key_coder_id, spec.timer_coder_id])) + component_coder_ids=[ + key_coder_id, spec.timer_family_coder_id + ])) # Inject the read and write pcollections. timer_read_pcoll = unique_name( pipeline_context.components.pcollections, diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index a963a08cb517..918069529bc3 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -601,7 +601,7 @@ def __init__(self, transform_id, # type: str key_coder, # type: coders.Coder window_coder, # type: coders.Coder - timer_specs # type: Mapping[str, beam_runner_api_pb2.TimerSpec] + timer_family_specs # type: Mapping[str, beam_runner_api_pb2.TimerFamilySpec] ): # type: (...) -> None @@ -612,14 +612,14 @@ def __init__(self, transform_id: The name of the PTransform that this context is associated. key_coder: window_coder: - timer_specs: A list of ``userstate.TimerSpec`` objects specifying the - timers associated with this operation. + timer_family_specs: A list of ``userstate.TimerSpec`` objects specifying + the timers associated with this operation. """ self._state_handler = state_handler self._transform_id = transform_id self._key_coder = key_coder self._window_coder = window_coder - self._timer_specs = timer_specs + self._timer_family_specs = timer_family_specs self._timer_receivers = None # type: Optional[Dict[str, operations.ConsumerSet]] self._all_states = { } # type: Dict[tuple, userstate.AccumulatingRuntimeState] @@ -629,7 +629,7 @@ def update_timer_receivers(self, receivers): """TODO""" self._timer_receivers = {} - for tag in self._timer_specs: + for tag in self._timer_family_specs: self._timer_receivers[tag] = receivers.pop(tag) def get_timer( @@ -1478,7 +1478,7 @@ def _create_pardo_operation( if pardo_proto: other_input_tags = set.union( set(pardo_proto.side_inputs), - set(pardo_proto.timer_specs)) # type: Container[str] + set(pardo_proto.timer_family_specs)) # type: Container[str] else: other_input_tags = () pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items() @@ -1488,12 +1488,12 @@ def _create_pardo_operation( serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing, )) timer_inputs = None # type: Optional[Dict[str, str]] - if pardo_proto and (pardo_proto.timer_specs or pardo_proto.state_specs or - pardo_proto.restriction_coder_id): + if pardo_proto and (pardo_proto.timer_family_specs or pardo_proto.state_specs + or pardo_proto.restriction_coder_id): main_input_coder = None # type: Optional[WindowedValueCoder] timer_inputs = {} for tag, pcoll_id in transform_proto.inputs.items(): - if tag in pardo_proto.timer_specs: + if tag in pardo_proto.timer_family_specs: timer_inputs[tag] = pcoll_id elif tag in pardo_proto.side_inputs: pass @@ -1504,13 +1504,13 @@ def _create_pardo_operation( main_input_coder = factory.get_windowed_coder(pcoll_id) assert main_input_coder is not None - if pardo_proto.timer_specs or pardo_proto.state_specs: + if pardo_proto.timer_family_specs or pardo_proto.state_specs: user_state_context = FnApiUserStateContext( factory.state_handler, transform_id, main_input_coder.key_coder(), main_input_coder.window_coder, - timer_specs=pardo_proto.timer_specs + timer_family_specs=pardo_proto.timer_family_specs ) # type: Optional[FnApiUserStateContext] else: user_state_context = None diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 8ebffbdbf36c..4f502c5114a6 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1325,7 +1325,7 @@ def to_runner_api_parameter(self, context): spec.name: spec.to_runner_api(context) for spec in state_specs }, - timer_specs={ + timer_family_specs={ spec.name: spec.to_runner_api(context) for spec in timer_specs }, diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py index 70cdca227cbe..f3ffa88dd482 100644 --- a/sdks/python/apache_beam/transforms/userstate.py +++ b/sdks/python/apache_beam/transforms/userstate.py @@ -131,8 +131,10 @@ def to_runner_api(self, context): class TimerSpec(object): """Specification for a user stateful DoFn timer.""" + prefix = "ts-" + def __init__(self, name, time_domain): - self.name = name + self.name = self.prefix + name if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME): raise ValueError('Unsupported TimeDomain: %r.' % (time_domain, )) self.time_domain = time_domain @@ -142,10 +144,30 @@ def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.name) def to_runner_api(self, context): - # type: (PipelineContext) -> beam_runner_api_pb2.TimerSpec - return beam_runner_api_pb2.TimerSpec( + # type: (PipelineContext) -> beam_runner_api_pb2.TimerFamilySpec + return beam_runner_api_pb2.TimerFamilySpec( + time_domain=TimeDomain.to_runner_api(self.time_domain), + timer_family_coder_id=context.coders.get_id( + coders._TimerCoder(coders.SingletonCoder(None)))) + + +# TODO(BEAM-9602): Provide support for dynamic timer. +class TimerFamilySpec(object): + prefix = "tfs-" + + def __init__(self, name, time_domain): + self.name = self.prefix + name + if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME): + raise ValueError('Unsupported TimeDomain: %r.' % (time_domain, )) + self.time_domain = time_domain + + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, self.name) + + def to_runner_api(self, context): + return beam_runner_api_pb2.TimerFamilySpec( time_domain=TimeDomain.to_runner_api(self.time_domain), - timer_coder_id=context.coders.get_id( + timer_family_coder_id=context.coders.get_id( coders._TimerCoder(coders.SingletonCoder(None)))) diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 75d9416d28ef..8ad3e95f4299 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -265,7 +265,7 @@ def test_validation_typos(self): # different timer callbacks. with self.assertRaisesRegex( ValueError, - r'Multiple on_timer callbacks registered for TimerSpec\(expiry1\).'): + r'Multiple on_timer callbacks registered for TimerSpec\(.*expiry1\).'): class StatefulDoFnWithTimerWithTypo1(DoFn): # pylint: disable=unused-variable BUFFER_STATE = BagStateSpec('buffer', BytesCoder()) @@ -315,7 +315,7 @@ def __repr__(self): dofn = StatefulDoFnWithTimerWithTypo2() with self.assertRaisesRegex( ValueError, - (r'The on_timer callback for TimerSpec\(expiry1\) is not the ' + (r'The on_timer callback for TimerSpec\(.*expiry1\) is not the ' r'specified .on_expiry_1 method for DoFn ' r'StatefulDoFnWithTimerWithTypo2 \(perhaps it was overwritten\?\).')): validate_stateful_dofn(dofn) @@ -348,7 +348,7 @@ def __repr__(self): with self.assertRaisesRegex( ValueError, (r'DoFn StatefulDoFnWithTimerWithTypo3 has a TimerSpec without an ' - r'associated on_timer callback: TimerSpec\(expiry2\).')): + r'associated on_timer callback: TimerSpec\(.*expiry2\).')): validate_stateful_dofn(dofn)