[Bug] When streaming-read-overwrite is true, a streaming read of an overwrite fails with an exception if it encounters retract records #4696

@liming30

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

1.0-SNAPSHOT

Compute Engine

flink-1.20

Minimal reproduce step

CREATE TABLE `test` (
    currency STRING,
    rate BIGINT,
    dt STRING,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'bucket' = '1',
    'streaming-read-overwrite' = 'true'
);

SELECT * FROM `test`; /** streaming mode */

INSERT INTO `test` VALUES ('US Dollar', 102, '2022-01-01'), ('Yen', 1, '2022-01-02'), ('Euro', 119, '2022-01-02');

DELETE FROM `test` WHERE currency = 'Euro';

INSERT OVERWRITE `test` VALUES ('US Dollar', 100, '2022-01-02'), ('Yen', 10, '2022-01-01');

What doesn't meet your expectations?

Caused by: java.lang.IllegalStateException: In reverse reader, the value kind of records cannot be UPDATE_BEFORE or DELETE.
	at org.apache.paimon.operation.ReverseReader$1.next(ReverseReader.java:58)
	at org.apache.paimon.operation.ReverseReader$1.next(ReverseReader.java:50)
	at org.apache.paimon.table.source.ResetRowKindRecordIterator.nextKeyValue(ResetRowKindRecordIterator.java:50)
	at org.apache.paimon.table.source.ValueContentRowDataRecordIterator.next(ValueContentRowDataRecordIterator.java:36)
	at org.apache.paimon.table.source.ValueContentRowDataRecordIterator.next(ValueContentRowDataRecordIterator.java:28)
	at org.apache.paimon.flink.source.FileStoreSourceSplitReader$FileStoreRecordIterator.next(FileStoreSourceSplitReader.java:278)
	at org.apache.paimon.flink.source.FlinkRecordsWithSplitIds.emitRecord(FlinkRecordsWithSplitIds.java:126)
	at org.apache.paimon.flink.source.FileStoreSourceReader.lambda$new$1(FileStoreSourceReader.java:58)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
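
To make the failure above easier to follow, here is a minimal, self-contained Java sketch of the kind of check the exception message describes. It is not Paimon's source code: the class `ReverseReaderSketch`, its nested `RowKind` enum, and the `reverse` and `reverseAll` methods are hypothetical names introduced only for illustration. The sketch assumes that, when an overwrite is read in streaming mode, each record of the overwritten data is re-emitted as a DELETE, and that a record which is already a retraction (UPDATE_BEFORE or DELETE) is rejected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReverseReaderSketch {

    /** Hypothetical stand-in for a row kind enum; not Paimon's or Flink's class. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Assumed behavior: a record from the overwritten data is turned into a DELETE
     * so that downstream consumers retract it. Records that are already
     * retractions are rejected with the message seen in the stack trace.
     */
    static RowKind reverse(RowKind kind) {
        if (kind == RowKind.UPDATE_BEFORE || kind == RowKind.DELETE) {
            throw new IllegalStateException(
                    "In reverse reader, the value kind of records cannot be UPDATE_BEFORE or DELETE.");
        }
        return RowKind.DELETE;
    }

    /** Applies reverse() to every record kind, failing on the first retraction. */
    static List<RowKind> reverseAll(List<RowKind> kinds) {
        List<RowKind> reversed = new ArrayList<>();
        for (RowKind kind : kinds) {
            reversed.add(reverse(kind));
        }
        return reversed;
    }

    public static void main(String[] args) {
        // Models the three rows written by INSERT INTO: each one can be reversed.
        System.out.println(
                reverseAll(Arrays.asList(RowKind.INSERT, RowKind.INSERT, RowKind.INSERT)));

        // Models the record written by DELETE FROM: reversing it is rejected.
        try {
            reverseAll(Arrays.asList(RowKind.DELETE));
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Under these assumptions, the record produced by the DELETE statement in the reproduce steps is the one that would plausibly trip the check once the INSERT OVERWRITE is consumed by the streaming read.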

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: bug (Something isn't working)
