Skip to content

MergeInsert produces duplicated rows #4585

@jackye1995

Description

@jackye1995

When two concurrent processes both write new rows with the same primary key through merge-insert, we currently have no way to detect this as a conflict, because neither write touches any existing fragments.

I think this is "technically" the right behavior under snapshot isolation, given that we do not guarantee exactly one row per `on` column value. But from the perspective of people using merge-insert as a way to ensure one row per primary key, this basically breaks that assumption.

I am able to verify with this test:

    /// Reproduces the duplicate-row bug: two writers that both merge-insert a
    /// brand-new row with the same key will BOTH succeed, because conflict
    /// detection is address-based and new rows have no addresses yet.  The
    /// resulting dataset therefore contains two rows with key=4.
    ///
    /// NOTE(review): this test intentionally asserts the *buggy* behavior
    /// (2 duplicate rows).  Once proper conflict detection for new rows lands,
    /// the expectations here must be inverted (writer 2 should fail).
    #[tokio::test]
    async fn test_concurrent_merge_insert_duplicate_new_rows() {
        let test_dir = tempdir().unwrap();
        let test_uri = test_dir.path().to_str().unwrap();

        // Initial dataset: three rows with keys 1..=3, all tagged "initial".
        let schema = Arc::new(Schema::new(vec![
            Field::new("key", DataType::Int32, false),
            Field::new("value", DataType::Utf8, false),
            Field::new("writer", DataType::Utf8, false),
        ]));

        let initial_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["value1", "value2", "value3"])),
                Arc::new(StringArray::from(vec!["initial", "initial", "initial"])),
            ],
        )
        .unwrap();

        let reader = Box::new(RecordBatchIterator::new(
            vec![Ok(initial_batch)],
            schema.clone(),
        ));

        let dataset = Dataset::write(reader, test_uri, Default::default())
            .await
            .unwrap();
        // Remember the pre-merge version so writer 2 can "start" from it,
        // simulating a writer that began before writer 1 committed.
        let initial_version = dataset.version().version;
        let dataset = Arc::new(dataset);

        // Writer 1: merge-inserts a new row with key=4 against the latest version.
        let writer1_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![4])),
                Arc::new(StringArray::from(vec!["value_from_writer1"])),
                Arc::new(StringArray::from(vec!["writer1"])),
            ],
        )
        .unwrap();

        let job1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["key".to_string()])
            .unwrap()
            .when_matched(WhenMatched::UpdateAll)
            .when_not_matched(WhenNotMatched::InsertAll)
            .try_build()
            .unwrap();

        let reader1 = Box::new(RecordBatchIterator::new(
            vec![Ok(writer1_batch)],
            schema.clone(),
        ));
        let stream1 = reader_to_stream(reader1);

        let (dataset_after_writer1, stats1) = job1.execute(stream1).await.unwrap();
        assert_eq!(stats1.num_inserted_rows, 1, "Writer 1 should insert 1 row");

        // Writer 2: checked out at the initial version, so it has no knowledge
        // of writer 1's commit — the concurrent-writer scenario.
        let dataset_for_writer2 = dataset.checkout_version(initial_version).await.unwrap();

        let writer2_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![4])),
                Arc::new(StringArray::from(vec!["value_from_writer2"])),
                Arc::new(StringArray::from(vec!["writer2"])),
            ],
        )
        .unwrap();

        let job2 = MergeInsertBuilder::try_new(Arc::new(dataset_for_writer2), vec!["key".to_string()])
            .unwrap()
            .when_matched(WhenMatched::UpdateAll)
            .when_not_matched(WhenNotMatched::InsertAll)
            .try_build()
            .unwrap();

        let reader2 = Box::new(RecordBatchIterator::new(
            vec![Ok(writer2_batch)],
            schema.clone(),
        ));
        let stream2 = reader_to_stream(reader2);

        // Writer 2 also inserts key=4.  This succeeds today because new rows
        // carry no addresses, so address-based conflict detection never fires.
        // If conflict detection is ever fixed, this expect() will panic and the
        // test must be updated to assert the new (correct) behavior.
        let (dataset_after_writer2, stats2) = job2
            .execute(stream2)
            .await
            .expect("Writer 2 should succeed: new rows have no addresses for conflict detection");
        assert_eq!(stats2.num_inserted_rows, 1, "Writer 2 should also insert 1 row");

        // Read from whichever commit is most recent.
        let final_dataset = if dataset_after_writer1.version().version
            > dataset_after_writer2.version().version
        {
            dataset_after_writer1
        } else {
            dataset_after_writer2
        };

        let result = final_dataset
            .scan()
            .filter("key = 4")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();

        let num_rows = result.num_rows();

        // The bug: both inserts landed, so key=4 is duplicated.
        assert_eq!(
            num_rows, 2,
            "Expected 2 duplicate rows with key=4 due to concurrent insert of new rows \
             (conflict detection doesn't work for new rows without addresses)"
        );

        // Both writers' payloads must be present among the duplicates.
        let writer_col = result
            .column_by_name("writer")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        let writers: Vec<&str> = (0..num_rows).map(|i| writer_col.value(i)).collect();

        assert!(
            writers.contains(&"writer1") && writers.contains(&"writer2"),
            "Both writers should have successfully inserted their rows"
        );
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions