Conversation

@arkadius
Contributor

This PR extracts an IcebergSinkBuilder interface from #11219. The interface will be used to avoid code duplication in tests and to keep the interfaces of both sink implementations similar, for an easier transition between them.
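
A minimal sketch of the idea is below. This is my own illustration: the method names, generics, and exact signatures are assumptions, not necessarily what this PR adds. It shows a common builder surface that both FlinkSink.Builder and IcebergSink.Builder could implement, so tests and callers can be written once against the shared type.

import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Illustrative sketch only: a shared builder abstraction over the two sink implementations.
public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<T>> {
  T table(Table newTable);

  T tableLoader(TableLoader newTableLoader);

  T equalityFieldColumns(List<String> columns);

  T overwrite(boolean newOverwrite);

  T setAll(Map<String, String> properties);

  // Builds the sink and attaches it to the stream, returning the terminal sink node.
  DataStreamSink<?> append();
}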

…f operations on FlinkSink and IcebergSink Builders
Contributor

@rodmeneses left a comment


LGTM, only minor comments. Thanks

@rodmeneses
Contributor

@arkadius please take a look as the CI is broken

@arkadius
Contributor Author

> @arkadius please take a look as the CI is broken

Do you have an option to retry this build stage? It seems very unlikely that extracting an interface could cause a test to fail. I think we have a flaky test (TestFlinkIcebergSinkRangeDistributionBucketing > testBucketNumberHigherThanWriterParallelismNotDivisible). I've just run it locally on my branch and it passed.

@rodmeneses
Contributor

> @arkadius please take a look as the CI is broken

> Do you have an option to retry this build stage? It seems very unlikely that extracting an interface could cause a test to fail. I think we have a flaky test (TestFlinkIcebergSinkRangeDistributionBucketing > testBucketNumberHigherThanWriterParallelismNotDivisible). I've just run it locally on my branch and it passed.

I do not, but @pvary or @stevenzwu should be able to do it

@arkadius
Contributor Author

> @arkadius please take a look as the CI is broken

> Do you have an option to retry this build stage? It seems very unlikely that extracting an interface could cause a test to fail. I think we have a flaky test (TestFlinkIcebergSinkRangeDistributionBucketing > testBucketNumberHigherThanWriterParallelismNotDivisible). I've just run it locally on my branch and it passed.

> I do not, but @pvary or @stevenzwu should be able to do it

@pvary or @stevenzwu please retry the failing build stage

@pvary
Contributor

pvary commented Oct 15, 2024

@stevenzwu: The failure for TestFlinkIcebergSinkRangeDistributionBucketing > testBucketNumberHigherThanWriterParallelismNotDivisible() should not be related. Do we know if it is still flaky?

@arkadius
Contributor Author

> @stevenzwu: The failure for TestFlinkIcebergSinkRangeDistributionBucketing > testBucketNumberHigherThanWriterParallelismNotDivisible() should not be related. Do we know if it is still flaky?

I've run this test in a loop many times, but without successfully reproducing the failure. However, I have a pretty fast Ryzen, so my machine may not be a representative unit. I can reproduce a similar error in the testBucketNumberHigherThanWriterParallelismDivisible test with ROW_COUNT_PER_CHECKPOINT = 10000, but I don't know whether it is related.

@stevenzwu
Contributor

@pvary it looks like the simple count-based assertion is flaky.

      for (Snapshot snapshot : rangePartitionedCycles) {
        List<DataFile> addedDataFiles =
            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
        assertThat(addedDataFiles)
            .hasSizeLessThanOrEqualTo(maxAddedDataFilesPerCheckpoint(parallelism));
      }
    }
  }

  /**
   * Traffic is not perfectly balanced across all buckets in the small sample size. The range
   * distribution of the bucket id may cross a subtask boundary. Hence the number of committed
   * data files per checkpoint may be larger than the writer parallelism or the number of buckets,
   * but it should not be more than the sum of those two. Without range distribution, the number
   * of data files per commit can be 4x the parallelism (as the number of buckets is 4).
   */
  private int maxAddedDataFilesPerCheckpoint(int parallelism) {
    return NUM_BUCKETS + parallelism;
  }

From the exception, it seems that there were 2 data files for the same bucket from the same subtask, which normally shouldn't happen; I'm not sure why it could. Note that the file name is in the format <subtaskId>-<attemptId>-<uuid>-<fileCounter>, e.g. 00002-0-fdca1895-4698-4c56-8b40-2dbcec41352e-00018. I think we need to change the assertion to a more sophisticated one that checks the ranges don't overlap (see the sketch after the file listing below).

        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=3/00002-0-fdca1895-4698-4c56-8b40-2dbcec41352e-00018.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=3}, record_count=15, file_size_in_bytes=1295, column_sizes=org.apache.iceberg.util.SerializableMap@1d0, value_counts=org.apache.iceberg.util.SerializableMap@27, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@11e0aa58, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@de87e38d, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=3/00002-0-fdca1895-4698-4c56-8b40-2dbcec41352e-00019.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=3}, record_count=2, file_size_in_bytes=1034, column_sizes=org.apache.iceberg.util.SerializableMap@cb, value_counts=org.apache.iceberg.util.SerializableMap@4, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@2ad8d1c4, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@b07e8329, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7}]
      [GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=2/00002-0-fdca1895-4698-4c56-8b40-2dbcec41352e-00017.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=2}, record_count=6, file_size_in_bytes=1119, column_sizes=org.apache.iceberg.util.SerializableMap@11a, value_counts=org.apache.iceberg.util.SerializableMap@10, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f3c0445f, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@3fd2d32a, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},

        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=2/00001-0-c7968b11-96c1-4bbb-a63f-e56d47feeef3-00017.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=2}, record_count=10, file_size_in_bytes=1199, column_sizes=org.apache.iceberg.util.SerializableMap@170, value_counts=org.apache.iceberg.util.SerializableMap@1c, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@bf77a452, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@8c8e0666, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=2/00001-0-c7968b11-96c1-4bbb-a63f-e56d47feeef3-00019.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=2}, record_count=1, file_size_in_bytes=1021, column_sizes=org.apache.iceberg.util.SerializableMap@97, value_counts=org.apache.iceberg.util.SerializableMap@5, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@731c7f34, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@731c7f34, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=1/00001-0-c7968b11-96c1-4bbb-a63f-e56d47feeef3-00018.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=1}, record_count=8, file_size_in_bytes=1159, column_sizes=org.apache.iceberg.util.SerializableMap@148, value_counts=org.apache.iceberg.util.SerializableMap@1e, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@7ec5775a, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@1515b535, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},

        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=1/00000-0-52445ce7-8fd1-4367-9433-5e8bc792886d-00017.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=1}, record_count=17, file_size_in_bytes=1336, column_sizes=org.apache.iceberg.util.SerializableMap@1f9, value_counts=org.apache.iceberg.util.SerializableMap@35, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@6739cb65, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@19de48bf, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-5157340844762714796/3664f021-0e50-4a7a-8048-93cdf3c3e236/default/t/data/ts_hour=2024-10-14-16/uuid_bucket=0/00000-0-52445ce7-8fd1-4367-9433-5e8bc792886d-00018.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=480256, uuid_bucket=0}, record_count=18, file_size_in_bytes=1353, column_sizes=org.apache.iceberg.util.SerializableMap@20e, value_counts=org.apache.iceberg.util.SerializableMap@34, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@95e6e88b, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@80a19270, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=7, file_sequence_number=7},
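
One possible shape for such a check, as a helper inside the test class, is sketched below. This is a rough sketch under my own assumptions (the helper name and the "at most two writer subtasks per bucket" rule are mine, not from this PR or from any eventual fix): it parses the writer subtask id out of the file name and asserts that each bucket's files come from at most two distinct subtasks, since a bucket's range should straddle at most one subtask boundary.

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;

  // Rough sketch, not a committed fix: under range distribution of the bucket id,
  // a single bucket should straddle at most one subtask boundary, so its data files
  // should come from at most two distinct writer subtasks.
  private static void assertBucketsDoNotOverlapAcrossSubtasks(List<DataFile> addedDataFiles) {
    Map<Integer, Set<Integer>> subtasksPerBucket = new HashMap<>();
    for (DataFile dataFile : addedDataFiles) {
      // partition fields in this test are (ts_hour, uuid_bucket); position 1 is the bucket id
      int bucket = dataFile.partition().get(1, Integer.class);
      // file name format: <subtaskId>-<attemptId>-<uuid>-<fileCounter>
      String path = dataFile.path().toString();
      String fileName = path.substring(path.lastIndexOf('/') + 1);
      int subtaskId = Integer.parseInt(fileName.substring(0, fileName.indexOf('-')));
      subtasksPerBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(subtaskId);
    }

    subtasksPerBucket
        .values()
        .forEach(subtasks -> assertThat(subtasks).hasSizeLessThanOrEqualTo(2));
  }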

@rodmeneses
Contributor

Hi @stevenzwu @pvary, what is needed so we can merge this change? I understand there's a flaky test, but that's already in main. Could we move forward with this PR? I need it for my RANGE distribution PR for the IcebergSink.
Thanks a lot for your time.
cc: @arkadius

@stevenzwu
Contributor

@pvary I have created a PR to disable the flaky test for now. #11347
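
For context, disabling a flaky JUnit 5 test is usually just an annotation on the test method. The following is a hypothetical illustration, not necessarily how apache#11347 does it (the real test may use @TestTemplate and parameterized extensions rather than a plain @Test).

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class RangeDistributionBucketingFlakyTestExample {
  // Hypothetical illustration: @Disabled makes JUnit 5 skip the test while keeping
  // the code in place until the flakiness is understood and fixed.
  @Disabled("Flaky: may commit more data files per checkpoint than the count-based assertion allows")
  @Test
  void testBucketNumberHigherThanWriterParallelismNotDivisible() {
    // test body omitted
  }
}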

@pvary
Contributor

pvary commented Oct 18, 2024

@arkadius: The flaky test will be handled. In the meantime, could you please retrigger the tests, so we can have a green run?

@arkadius
Contributor Author

> @arkadius: The flaky test will be handled. In the meantime, could you please retrigger the tests, so we can have a green run?

It looks like I don't have the rights to do it - the button isn't visible in the place where I see it in my own repos. @pvary, can you press it for me?

@pvary merged commit d4f0d7e into apache:main on Oct 21, 2024
@pvary
Contributor

pvary commented Oct 21, 2024

Thanks for the patience @arkadius. Merged the PR. Thanks for the review @rodmeneses

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
…f operations on FlinkSink and IcebergSink Builders (apache#11305)
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025
…f operations on FlinkSink and IcebergSink Builders (apache#11305)

(cherry picked from commit d4f0d7e)