[Bug]: UnboundedSourceAsSDFWrapperFn caches readers per DoFn instance #32968

@sjvanrossum

What happened?

Stumbled on the following in UnboundedSourceAsSDFWrapperFn while working on #32928:

private @Nullable Cache<Object, UnboundedReader<OutputT>> cachedReaders;
...
@Setup
public void setUp() throws Exception {
  restrictionCoder = restrictionCoder();
  cachedReaders =
      CacheBuilder.newBuilder()
          .expireAfterWrite(1, TimeUnit.MINUTES)
          .maximumSize(100)
...

This initializes one reader cache per bundle processor (which may remain cached for up to 1 minute), and these reader caches are not shared among splits. Decoupling the reader cache from the DoFn's lifetime may improve reuse of readers across bundle processors.
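
To make the difference concrete, here is a rough JDK-only sketch rather than a proposed patch. Every name in it (`SharedReaderCacheSketch`, `FakeReader`, `WrapperFn`, `acquire`, `release`) is hypothetical and merely stands in for the real Beam types. It assumes a reader is taken out of the cache while in use and put back afterwards, and it leaves out the expiry and size limits that the real `CacheBuilder` configuration applies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedReaderCacheSketch {

  /** Hypothetical stand-in for UnboundedReader<OutputT>. */
  static final class FakeReader {
    final String key;

    FakeReader(String key) {
      this.key = key;
    }
  }

  /** Process-wide cache: tied to the class, not to any single DoFn instance. */
  static final Map<String, FakeReader> SHARED = new ConcurrentHashMap<>();

  /** Hypothetical stand-in for one DoFn instance (one per bundle processor). */
  static final class WrapperFn {
    private final Map<String, FakeReader> perInstance = new ConcurrentHashMap<>();
    private final boolean useShared;

    WrapperFn(boolean useShared) {
      this.useShared = useShared;
    }

    private Map<String, FakeReader> cache() {
      return useShared ? SHARED : perInstance;
    }

    /** Removes the cached reader so only one caller owns it; creates one on a miss. */
    FakeReader acquire(String key) {
      FakeReader cached = cache().remove(key);
      return cached != null ? cached : new FakeReader(key);
    }

    /** Hands the reader back so a later resume of the same restriction can find it. */
    void release(FakeReader reader) {
      cache().put(reader.key, reader);
    }
  }

  /** Returns true if a second instance resumes with the reader the first one released. */
  static boolean readerReusedAcrossInstances(boolean useShared, String key) {
    WrapperFn first = new WrapperFn(useShared);
    WrapperFn second = new WrapperFn(useShared);
    FakeReader opened = first.acquire(key);
    first.release(opened);
    FakeReader resumed = second.acquire(key);
    return opened == resumed;
  }

  public static void main(String[] args) {
    System.out.println("per-instance reuse: " + readerReusedAcrossInstances(false, "split-0"));
    System.out.println("shared reuse: " + readerReusedAcrossInstances(true, "split-0"));
  }
}
```

With the per-instance map, the second instance never sees the reader released by the first and has to open a new one. With the shared map, it picks up the same reader. A real change would still need eviction, closing of evicted readers, and a key that is unique across sources, none of which this sketch attempts.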

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
