Skip to content

[Bug]: Reshuffle failed to use custom coders in non-global window scenario #33356

@shunping

Description

@shunping

What happened?

Code to reproduce:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

class _Unpicklable(object):

  def __init__(self, value):
    self.value = value

  def __getstate__(self):
    raise NotImplementedError()

  def __setstate__(self, state):
    raise NotImplementedError()

class _UnpicklableCoder(beam.coders.Coder):

  def encode(self, value):
    return str(value.value).encode()

  def decode(self, encoded):
    return _Unpicklable(int(encoded.decode()))

  def to_type_hint(self):
    return _Unpicklable

  def is_deterministic(self):
    return True

beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)

def pipeline_fn(root):
  values = [_Unpicklable(i) for i in range(5)]
  #return root | beam.Create(values) | beam.Reshuffle() | beam.Map(lambda x: x.value*2)
  return root | beam.Create(values).with_output_types(_Unpicklable) \
      | beam.WindowInto(window.SlidingWindows(size=3, period=1)) \
      | beam.Reshuffle().with_output_types(_Unpicklable) \
      | beam.Map(lambda x: x.value*2)

options = PipelineOptions(runner='DirectRunner', direct_num_workers=1)
pipeline = beam.Pipeline(options=options)
out = pipeline_fn(pipeline)
result = pipeline.run()
result.wait_until_finish()

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions