[Bug]: FnApiRunner does not handle event-time timer correctly. #35168

@shunping

Description

What happened?

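The issue can be reproduced with the code below.

In this example, the event-time timer should never fire: it is set 10 seconds ahead when the first element of each group of 5 arrives, and it is cleared when the fifth element arrives 4 seconds later, before the watermark can reach the timer's timestamp. I verified that Dataflow and Prism (after #35089 and #35106) behave this way.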

import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.coders import coders
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.transforms.userstate import TimerSpec
from apache_beam.transforms.userstate import on_timer
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

logging.basicConfig(level=logging.INFO)

# direct runner option
options = PipelineOptions([
    "--streaming",
    "--runner=DirectRunner"
])

# prism runner option
# options = PipelineOptions([
#     "--streaming",
#     #"--environment_type=LOOPBACK",
#     #"--runner=PrismRunner",
# ])

# dataflow runner option
# options = PipelineOptions([
#     "--streaming",
#     "--runner=DataflowRunner",
#     "--temp_location=<temp-location>",
#     "--staging_location=<staging-location>",
#     "--project=apache-beam-testing",
#     "--region=us-central1",
#     "--sdk_location=dist/apache_beam-2.66.0.dev0.tar.gz",
# ])


class AnotherTimerDoFn(beam.DoFn):
  COUNT = ReadModifyWriteStateSpec('count', coders.VarInt32Coder())
  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

  def __init__(self, n, timer_delay, reset_count=True, clear_timer=True):
    self._n = n
    self._timer_delay = timer_delay
    self._reset_count = reset_count
    self._clear_timer = clear_timer

  def start_bundle(self):
    # logging.info("start bundle")
    pass

  def finish_bundle(self):
    # logging.info("finish bundle")
    pass

  def process(self,
              element_pair,
              t=beam.DoFn.TimestampParam,
              count=beam.DoFn.StateParam(COUNT),
              timer=beam.DoFn.TimerParam(TIMER)):
    local_count = count.read() or 0
    local_count += 1

    logging.info(f"get element {element_pair[1]}, count={local_count}")
    if local_count == 1:
      logging.info(f"set timer to {t+self._timer_delay}")
      timer.set(t + self._timer_delay)

    if local_count == self._n:
      if self._reset_count:
        logging.info("reset count")
        local_count = 0

      # don't need the timer now
      if self._clear_timer:
        logging.info("clear timer")
        timer.clear()

    count.write(local_count)

  @on_timer(TIMER)
  def timer_callback(self, t=beam.DoFn.TimestampParam):
    logging.error("Timer should not fire here")
    logging.info(f"timer callback start (timestamp={t})")
    yield (">>>timer fired")


with beam.Pipeline(options=options) as p:
  _ = (
      p | PeriodicImpulse(
          start_timestamp=Timestamp.now(),
          stop_timestamp=Timestamp.now() + 14,
          fire_interval=1)
      | beam.WithKeys(0)
      | beam.ParDo(AnotherTimerDoFn(5, 10, True, True))
      | beam.Map(print))

The expected output looks like this:

INFO:root:get element 1749138092.53311, count=1
INFO:root:set timer to Timestamp(1749138102.533000)
INFO:root:get element 1749138093.53311, count=2
INFO:root:get element 1749138094.53311, count=3
INFO:root:get element 1749138095.53311, count=4
INFO:root:get element 1749138096.53311, count=5
INFO:root:reset count
INFO:root:clear timer
INFO:root:get element 1749138097.53311, count=1
INFO:root:set timer to Timestamp(1749138107.533000)
INFO:root:get element 1749138098.53311, count=2
INFO:root:get element 1749138099.53311, count=3
INFO:root:get element 1749138100.53311, count=4
INFO:root:get element 1749138101.53311, count=5
INFO:root:reset count
INFO:root:clear timer
INFO:root:get element 1749138102.53311, count=1
INFO:root:set timer to Timestamp(1749138112.533000)
INFO:root:get element 1749138103.53311, count=2
INFO:root:get element 1749138104.53311, count=3
INFO:root:get element 1749138105.53311, count=4
INFO:root:get element 1749138106.53311, count=5
INFO:root:reset count
INFO:root:clear timer

However, FnApiRunner fires the timer right after each time it is set, producing the following output:

INFO:root:get element 1749138208.636279, count=1
INFO:root:set timer to Timestamp(1749138218.636279)
ERROR:root:Timer should not fire here
INFO:root:timer callback start (timestamp=Timestamp(1749138218.636000))
>>>timer fired
INFO:root:get element 1749138209.636279, count=2
INFO:root:get element 1749138210.636279, count=3
INFO:root:get element 1749138211.636279, count=4
INFO:root:get element 1749138212.636279, count=5
INFO:root:reset count
INFO:root:clear timer
INFO:root:get element 1749138213.636279, count=1
INFO:root:set timer to Timestamp(1749138223.636279)
ERROR:root:Timer should not fire here
INFO:root:timer callback start (timestamp=Timestamp(1749138223.636000))
>>>timer fired
INFO:root:get element 1749138214.636279, count=2
INFO:root:get element 1749138215.636279, count=3
INFO:root:get element 1749138216.636279, count=4
INFO:root:get element 1749138217.636279, count=5
INFO:root:reset count
INFO:root:clear timer
INFO:root:get element 1749138218.636279, count=1
INFO:root:set timer to Timestamp(1749138228.636279)
ERROR:root:Timer should not fire here
INFO:root:timer callback start (timestamp=Timestamp(1749138228.636000))
>>>timer fired
INFO:root:get element 1749138219.636279, count=2
INFO:root:get element 1749138220.636279, count=3
INFO:root:get element 1749138221.636279, count=4
INFO:root:get element 1749138222.636279, count=5
INFO:root:reset count
INFO:root:clear timer
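
To make the expected semantics concrete, here is a minimal pure-Python sketch of the set/clear logic for a single key. It is not Beam code and does not reproduce the bug. The names `TimerModel` and `simulate` are hypothetical, timestamps are whole seconds starting at 0, and the model assumes the watermark reaches an element's timestamp just before that element is processed, which simplifies how a real runner advances the watermark.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TimerModel:
    """Toy model of one key's state and timer in AnotherTimerDoFn."""
    n: int                          # elements per cycle before reset and clear
    timer_delay: int                # seconds added to the element timestamp
    count: int = 0                  # stands in for the 'count' state cell
    fire_at: Optional[int] = None   # pending timer timestamp, None if unset
    fired: List[int] = field(default_factory=list)

    def advance_watermark(self, watermark: int) -> None:
        # An event-time timer fires once the watermark reaches its
        # timestamp, provided the timer is still set at that point.
        if self.fire_at is not None and watermark >= self.fire_at:
            self.fired.append(self.fire_at)
            self.fire_at = None

    def process(self, timestamp: int) -> None:
        self.count += 1
        if self.count == 1:
            # Mirrors timer.set(t + timer_delay).
            self.fire_at = timestamp + self.timer_delay
        if self.count == self.n:
            # Mirrors reset_count=True and clear_timer=True.
            self.count = 0
            self.fire_at = None


def simulate(num_elements: int, n: int, timer_delay: int) -> List[int]:
    """Feed one element per second and return the timestamps that fired."""
    model = TimerModel(n=n, timer_delay=timer_delay)
    for ts in range(num_elements):
        # Assumption: the watermark trails the input by exactly one element.
        model.advance_watermark(ts)
        model.process(ts)
    return model.fired


if __name__ == "__main__":
    # Same parameters as the repro: 15 elements, n=5, timer_delay=10.
    print(simulate(15, 5, 10))
    # A delay shorter than the clear interval, for contrast.
    print(simulate(15, 5, 3))
```

With the repro's parameters (`n=5`, `timer_delay=10`) every timer is cleared before the modeled watermark reaches it, so nothing fires, which matches the expected output above. A timer only fires in this model when the delay is shorter than the 4 seconds between setting and clearing it.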

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
