Skip to content

[Bug]: Python event-time timers clearing behavior not consistent on different bundle sizes #35176

@shunping

Description

@shunping

What happened?

Assuming the first event's event time is T, the following code sets an event-time timer to T+1 after receiving the first event. Then on the third event, it clears the timer.

The code behaves differently on different bundle sizes of different runners.

  • On prism, the bundle size seems to be small. So the timer setting operation is on a different bundle than the timer clearing operation. As a result, timer is triggered before the timer clearing operation.
  • On dataflow, the bundle size seems to be bigger. Both setting and clearing are in the same bundle. In the current python implementation, the timer will not be fired because it is cleared before the bundle ends.
class AnotherTimerDoFn(beam.DoFn):
  COUNT = ReadModifyWriteStateSpec('count', coders.VarInt32Coder())
  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

  def __init__(self):
    pass

  def start_bundle(self):
    logging.info("start bundle")
    pass

  def finish_bundle(self):
    logging.info("finish bundle")
    pass

  def process(self,
              element_pair,
              t=beam.DoFn.TimestampParam,
              count=beam.DoFn.StateParam(COUNT),
              timer=beam.DoFn.TimerParam(TIMER)):
    local_count = count.read() or 0
    local_count += 1

    logging.info(f"get element {element_pair[1]}, count={local_count}")
    if local_count == 1:
      logging.info(f"set timer to {t+1}")
      timer.set(t+1)

    if local_count == 3: # change this to 5 so the clear is on a different bundle
      logging.info("clear timer")
      timer.clear()

    count.write(local_count)

  @on_timer(TIMER)
  def timer_callback(self, t=beam.DoFn.TimestampParam):
    logging.error("Timer should not fire here")
    logging.info(f"timer callback start (timestamp={t})")
    yield (">>>timer fired")


with beam.Pipeline(options=options) as p:
  _ = (
      p | PeriodicImpulse(
          start_timestamp=Timestamp.now(),
          stop_timestamp=Timestamp.now() + 14,
          fire_interval=1)
      | beam.WithKeys(0)
      | beam.ParDo(AnotherTimerDoFn())
      | beam.Map(print))

A workaround of this is to add a state to keep track of the last set timestamp, and only clear the timer when the event time when clearing is triggered is smaller than or equal to the set timestamp.

class AnotherTimerDoFn(beam.DoFn):
  COUNT = ReadModifyWriteStateSpec('count', coders.VarInt32Coder())
  LAST_TIMESTAMP = ReadModifyWriteStateSpec('ts', coders.TimestampCoder())
  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

  def __init__(self):
    pass

  def start_bundle(self):
    logging.info("start bundle")
    pass

  def finish_bundle(self):
    logging.info("finish bundle")
    pass

  def process(self,
              element_pair,
              t=beam.DoFn.TimestampParam,
              count=beam.DoFn.StateParam(COUNT),
              last_ts=beam.DoFn.StateParam(LAST_TIMESTAMP),
              timer=beam.DoFn.TimerParam(TIMER)):
    local_count = count.read() or 0
    local_count += 1

    logging.info(f"get element {element_pair[1]}, count={local_count}")
    if local_count == 1:
      logging.info(f"set timer to {t+1}")
      timer.set(t+1)
      last_ts.write(t+1)

    if local_count == 3:
      logging.info("clear timer")
      ts = last_ts.read() or 0
      if t <= ts:
        timer.clear()

    count.write(local_count)

  @on_timer(TIMER)
  def timer_callback(self, t=beam.DoFn.TimestampParam):
    logging.error("Timer should not fire here")
    logging.info(f"timer callback start (timestamp={t})")
    yield (">>>timer fired")

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions