Skip to content

[prism] Java and Python SplittableDoFnTests - invalid stream header (likely coders) #32004

@lostluck

Description

@lostluck

Six Java Splittable DoFn tests have stream corrupted coder issues, and Python has three SDF tests that are failing with the same,

First, this means that Java and Python are doing something different than the Go SDK (which doesn't run into this issue) WRT to dealing with coders for SplittableDoFns.

It's a side change to fix the Go SDK's implementation, since Go's SDF does work on other portable runners. This just means Prism is doing something that works for Go, but not the other SDKs. The goal is to make them work for both.

Python error:

ValueError: Error decoding input stream with coder WindowedValueCoder[TupleCoder[TupleCoder[StrUtf8Coder, TupleCoder[LengthPrefixCoder[FastPrimitivesCoder], LengthPrefixCoder[FastPrimitivesCoder]]], FloatCoder]]

Sample Java Error:

java.lang.RuntimeException: The Runner experienced the following error during execution:
bundle inst005 stage-005 failed:java.io.StreamCorruptedException: invalid stream header: 05737200
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
	at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:199)
	at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:57)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:83)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:78)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:83)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:136)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:550)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

	at org.apache.beam.runners.portability.JobServicePipelineResult.propagateErrors(JobServicePipelineResult.java:176)
	at org.apache.beam.runners.portability.JobServicePipelineResult.waitUntilFinish(JobServicePipelineResult.java:117)
	at org.apache.beam.runners.portability.testing.TestUniversalRunner.run(TestUniversalRunner.java:83)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:404)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:343)
	at org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpoint(SplittableDoFnTest.java:326)
	at org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointBounded(SplittableDoFnTest.java:310)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:750)

SplittableDoFns require than an SDK is able to sub element split while a bundle is in process. In that situation, the SDK needs to return encoded windowed values of the SDF processSizedElementAndRestriction type back to the runner over the FnAPI.

The Go SDK defers that encoding to the DataSource code, and uses the root input coder:

func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {

It's likely that Java and Python are using the coder on the input PCollection to the transform itself, rather than the one specified on the DataSource:

https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L401

This is likely the same for Python.

In particular, the issue is likely due to not replacing / wrapping all the coders properly, in particular on SDF components. In particular if a length prefix ends up being needed. This would lead to corrupt decodes like we've been seeing, since length prefix coders are result type transparent.

This insight is brought to you by my hobby re-implementation of the Go SDK with generics, and having just reached trying to implement splittableDoFns, which just ran into the same issue.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions