Skip to content

[Bug]: apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithDisabledCaching:test_reshuffle is flaky #24178

@tvalentyn

Description

@tvalentyn

What happened?

Saw in #24106 (CI run: https://github.com/apache/beam/actions/runs/3471900531/jobs/5802006567).

@damccorm I haven't seen this error before. Could you please check whether this is a new error by trying to reproduce it across several runs?

Note that this test uses the slow (pure-Python) coders, as the stack trace shows. To reproduce it, we may need to install apache-beam from source in an environment without Cython.

If we can't reproduce it and the cause isn't obvious, we can wait and see whether it recurs.

Thanks

______________ FnApiRunnerTestWithDisabledCaching.test_reshuffle _______________
[gw2] darwin -- Python 3.8.14 /Users/runner/work/beam/beam/sdks/python/target/.tox/py38/bin/python

self = <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithDisabledCaching testMethod=test_reshuffle>

    def test_reshuffle(self):
      with self.create_pipeline() as p:
>       assert_that(
            p | beam.Create([1, 2, 3]) | beam.Reshuffle(), equal_to([1, 2, 3]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1052: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:201: in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:222: in run_via_runner_api
    return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:453: in run_stages
    bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:781: in _execute_bundle
    self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1010: in _run_bundle
    result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1378: in process_bundle
    self.bundle_context_manager.get_buffer(
apache_beam/runners/portability/fn_api_runner/execution.py:244: in append
    windowed_key_value = coder_impl.decode_from_stream(input_stream, True)
apache_beam/coders/coder_impl.py:1472: in decode_from_stream
    value = self._value_coder.decode_from_stream(in_stream, nested)
apache_beam/coders/coder_impl.py:1015: in decode_from_stream
    return self._construct_from_components([
apache_beam/coders/coder_impl.py:1016: in <listcomp>
    c.decode_from_stream(
apache_beam/coders/coder_impl.py:625: in decode_from_stream
    return in_stream.read_all(nested)
apache_beam/coders/slow_stream.py:140: in read_all
    return self.read(self.read_var_int64() if nested else self.size())
apache_beam/coders/slow_stream.py:151: in read_var_int64
    byte = self.read_byte()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.coders.slow_stream.InputStream object at 0x12ff52d00>

    def read_byte(self):
      # type: () -> int
      self.pos += 1
>     return self.data[self.pos - 1]
E     IndexError: index out of range

apache_beam/coders/slow_stream.py:145: IndexError
----------------------------- Captured stderr call -----------------------------
Traceback (most recent call last):
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 648, in process_bundle
    self.bundle_processor_cache.release(instruction_id)
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 528, in release
    self.active_bundle_processors.pop(instruction_id))
KeyError: 'bundle_681'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
    response = task()
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 652, in process_bundle
    self.bundle_processor_cache.discard(instruction_id)
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 507, in discard
    processor = self.active_bundle_processors[instruction_id][1]
KeyError: 'bundle_681'

------------------------------ Captured log call -------------------------------
ERROR    apache_beam.runners.worker.sdk_worker:sdk_worker.py:291 Error processing instruction bundle_681. Original traceback is
Traceback (most recent call last):
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 648, in process_bundle
    self.bundle_processor_cache.release(instruction_id)
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 528, in release
    self.active_bundle_processors.pop(instruction_id))
KeyError: 'bundle_681'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
    response = task()
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 652, in process_bundle
    self.bundle_processor_cache.discard(instruction_id)
  File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 507, in discard
    processor = self.active_bundle_processors[instruction_id][1]
KeyError: 'bundle_681'

Issue Priority

Priority: 1

Issue Component

Component: sdk-py-core

Metadata

Metadata

Assignees

Labels

P1, bug, core, done & done (Issue has been reviewed after it was closed for verification, followups, etc.), flake, python

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions