
Conversation

@wypoon (Contributor) commented Oct 2, 2024

This fixes #11221.

There is a bug in VectorizedDictionaryEncodedParquetValuesReader.BaseDictEncodedReader::nextBatch: nextVal of the BaseDictEncodedReader subclass is called with an incorrect index for certain subclasses (in particular, FixedSizeBinaryDictEncodedReader), so the value gets set at the wrong index in the FieldVector used to hold the values. E.g., for a Decimal column that requires 16 bytes of storage, the values are stored in 16-byte fixed-length byte arrays and the type width is 16; FixedSizeBinaryDictEncodedReader::nextVal is then called with index 0, 16, 32, 48, etc. instead of 0, 1, 2, 3, etc.
The fix is to not premultiply the index by the type width before calling nextVal, and instead to account for the type width as appropriate in each nextVal method.
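To make the indexing concrete, here is a condensed sketch (illustrative only, not the exact source; numValuesToRead, startOffset, and dictionaryEncodedValue are stand-ins for the reader's actual state) of the batch loop before and after the change:

    // Sketch of the element loop in BaseDictEncodedReader::nextBatch (simplified).
    for (int i = 0; i < numValuesToRead; i++) {
      // Before the fix: the element index was premultiplied by the type width, so
      // FixedSizeBinaryDictEncodedReader saw 0, 16, 32, 48, ... for a 16-byte decimal.
      // int index = (startOffset + i) * typeWidth;

      // After the fix: the plain element index is passed, and each nextVal
      // implementation applies typeWidth only where a byte offset is actually needed.
      int index = startOffset + i;
      nextVal(vector, dict, index, dictionaryEncodedValue, typeWidth);
    }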

A test is included that fails without the fix and passes with it.

@github-actions github-actions bot added the arrow label Oct 2, 2024
@github-actions github-actions bot added the spark label Oct 5, 2024
@wypoon wypoon changed the title [DRAFT] Fix indexing in dictionary encoded Parquet readers Spark: Fix indexing in dictionary encoded Parquet readers Oct 5, 2024
@wypoon wypoon changed the title Spark: Fix indexing in dictionary encoded Parquet readers Arrow: Fix indexing in dictionary encoded Parquet readers Oct 5, 2024
@wypoon wypoon changed the title Arrow: Fix indexing in dictionary encoded Parquet readers Arrow: Fix indexing in Parquet dictionary encoded values readers Oct 5, 2024
@wypoon (Contributor, Author) commented Oct 5, 2024

The bug occurs when reading a Parquet column chunk with multiple pages where some, but not all, of the pages are dictionary encoded. In particular, Impala tries to use dictionary encoding where possible when writing Parquet data, and switches to plain encoding once the number of values in the dictionary exceeds 40,000. The data file uploaded in #11221 is such a file, written by Impala.
I wanted to write data programmatically from the test, and I tried to get the Java (parquet-mr) Parquet writer to write such data, but I was not able to. With the v1 writer, I was not able to get it to write dictionary encoded data at all for decimal values stored as fixed length byte arrays. With the v2 writer, I was able to get it to write some pages with RLE dictionary encoding and some not dictionary encoded, but those other pages use a delta encoding, which Iceberg does not support (and thus I cannot use it to reproduce the bug).
Eventually, I used pyarrow's C++ writer to write a small data file (less than 4 KB) with 2 pages, each with 200 rows, one RLE dictionary encoded and one plain encoded. I think this is acceptably small to be checked in.

@wypoon (Contributor, Author) commented Oct 5, 2024

CI failed with

Execution failed for task ':iceberg-flink:iceberg-flink-1.20:compileJmhJava'.
> Java heap space

I have seen such Java heap space failures in CI before; they are unrelated to my change.

@wypoon (Contributor, Author) commented Oct 5, 2024

@nastra @rymurr the bug was introduced in 2842f0b (#2746).
Assuming the code was correct before that refactoring, an analysis of the refactoring will show that my change is correct.
Before that refactoring, there was a lot of duplicated code that began

    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);

and then, in many (but not all) cases, calls of the form vector.getDataBuffer().setXXX(idx * typeWidth, ...) are made.
For those, we want to use idx * typeWidth. However, there are cases where we want to use idx in setting the value in the vector. E.g., in readBatchOfDictionaryEncodedFixedSizeBinary, we do

           ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);

What the refactoring did was to use index = idx * typeWidth and call overridden nextVal methods with that index, so that in this case, we end up doing ((FixedSizeBinaryVector) vector).set(index, vectorBytes);.

My change is to not premultiply idx by typeWidth before making the nextVal calls, and instead to account for typeWidth (which is passed to nextVal anyway) within each nextVal implementation.
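For illustration, here is a paraphrased sketch (not the exact diff; signatures are abbreviated) of two nextVal implementations after the change. Readers that write through the raw Arrow data buffer compute the byte offset themselves, while readers such as FixedSizeBinaryDictEncodedReader use the element index directly:

    // Writes through the raw data buffer, so the byte offset is computed here:
    class LongDictEncodedReader extends BaseDictEncodedReader {
      @Override
      protected void nextVal(
          FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
        vector.getDataBuffer().setLong((long) idx * typeWidth, dict.decodeToLong(currentVal));
      }
    }

    // Sets the value through the typed vector API, so the element index is used as-is:
    class FixedSizeBinaryDictEncodedReader extends BaseDictEncodedReader {
      @Override
      protected void nextVal(
          FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
        byte[] bytes = dict.decodeToBinary(currentVal).getBytes();
        byte[] vectorBytes = new byte[typeWidth];
        System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth);
        ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
      }
    }

Either way, idx now consistently means the row position within the vector.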

cc @rdblue

@nastra nastra self-requested a review October 7, 2024 06:59

public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {

  protected static SparkSession spark = null;
Contributor commented:

can we test this without having to start up spark? Also it seems like IntBackedDecimalDictEncodedReader / LongBackedDecimalDictEncodedReader / VarWidthBinaryDictEncodedReader / FixedLengthDecimalDictEncodedReader have been affected, so maybe there should be tests for these too?

@wypoon (Contributor, Author) replied Oct 7, 2024:

Using the Spark DataFrameReader to read the Parquet file is the easiest way to verify the test.

IIUC, if a decimal can be represented in 4 bytes, we use IntBackedDecimalDictEncodedReader, and if it cannot be represented in 4 bytes but can be in 8 bytes, we use LongBackedDecimalDictEncodedReader; but can you please tell me what types use FixedLengthDecimalDictEncodedReader?

@wypoon (Contributor, Author) replied:

Hmm, afaics, VectorizedParquetDefinitionLevelReader.FixedLengthDecimalReader is never used, and thus neither is VectorizedDictionaryEncodedParquetValuesReader.FixedLengthDecimalDictEncodedReader.

@wypoon (Contributor, Author) replied:

Hmm, even VectorizedParquetDefinitionLevelReader.IntBackedDecimalReader and VectorizedParquetDefinitionLevelReader.LongBackedDecimalReader are never used, afaict.
In VectorizedParquetDefinitionLevelReader, the methods fixedLengthDecimalReader(), intBackedDecimalReader() and longBackedDecimalReader() are never called.

I see now that VectorizedPageIterator.IntBackedDecimalPageReader, VectorizedPageIterator.LongBackedDecimalPageReader, VectorizedPageIterator::intBackedDecimalPageReader(), and VectorizedPageIterator::longBackedDecimalPageReader() were deprecated in #3249 and subsequently all removed in Iceberg 1.4.0. So we have some dead code now.

Member commented:

Shouldn't we remove the dead code then instead of fixing it?

Contributor commented:

can we test this without having to add a Parquet file?

@wypoon (Contributor, Author) replied:

I have explained in a comment why I used a Parquet file. If you can tell me how to generate the Parquet file with the required conditions using the Java Parquet writer, I can generate it from the test.

@wypoon (Contributor, Author) replied:

I do not know the parquet-java code, but looking around it a bit, I see, e.g., https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java#L79, which suggests that the v1 writer does not support dictionary encoding for fixed_len_byte_array. This accords with my empirical experience. I already explained the issue with the v2 writer.

Contributor commented:

@wypoon you need to use the one from iceberg-parquet as Iceberg has its own Parquet reader/writer

@wypoon (Contributor, Author) replied:

@nastra the code in iceberg-parquet still has to use the parquet-java code underneath to do the actual writing, and when using v1, dictionary encoding does not appear to be supported for fixed_len_byte_array.
When I use

    Schema schema =
        new Schema(Types.NestedField.required(1, "dec_38_0", Types.DecimalType.of(38, 0)));
    File parquetFile = File.createTempFile("junit", null, temp.toFile());
    assertThat(parquetFile.delete()).as("Delete should succeed").isTrue();

    Iterable<GenericData.Record> records = RandomData.generate(schema, 500, 0L, 0.0f);
    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .set(PARQUET_DICT_SIZE_BYTES, "2048")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .build()) {
      writer.addAll(records);
    }

that writes a Parquet file with

Column: dec_38_0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-0    data  G _  100     16.00 B    1.563 kB                    
  0-1    data  G _  100     16.00 B    1.563 kB                    
  0-2    data  G _  100     16.00 B    1.563 kB                    
  0-3    data  G _  100     16.00 B    1.563 kB                    
  0-4    data  G _  100     16.00 B    1.563 kB                    

while if I use v2:

    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .set(PARQUET_DICT_SIZE_BYTES, "2048")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .writerVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
            .build()) {
      writer.addAll(records);
    }

that writes a Parquet file with

Column: dec_38_0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  G _  96      16.00 B    1.500 kB  
  0-1    data  _ R  100     0.93 B     93 B       100      0       
  0-2    data  _ D  100     15.56 B    1.520 kB   100      0       
  0-3    data  _ D  100     14.76 B    1.441 kB   100      0       
  0-4    data  _ D  100     15.47 B    1.511 kB   100      0       
  0-5    data  _ D  100     15.06 B    1.471 kB   100      0       

As you can see, I am using the APIs in iceberg-parquet, generating the same data in both cases, using the same dictionary size and page row limit; in the v1 case, plain encoding is used for all the pages, while in the v2 case, one page is written with dictionary encoding (unfortunately the other pages are written with DELTA_BYTE_ARRAY encoding).

@wypoon (Contributor, Author) replied Oct 10, 2024:

I also tried

    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .createWriterFunc(ParquetAvroWriter::buildWriter)
            .set(PARQUET_DICT_SIZE_BYTES, "2048")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .build()) {
      writer.addAll(records);
    }

and debugged what happens in writer.
The following chain is called to get the ValuesWriter for the decimal column:
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java#L84
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java#L167
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java#L52
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java#L55
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java#L80
which is the code I cited previously:

  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
    // dictionary encoding was not enabled in PARQUET 1.0
    return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
  }

I am using the iceberg-parquet APIs to create a writer, but eventually it calls parquet-java code.

@nastra (Contributor) left a comment:

the changes look correct to me, but it would be great if we could generally improve testing coverage for the affected readers

@wypoon (Contributor, Author) commented Oct 7, 2024

@nastra thank you for reviewing.

I have added a test that exercises VectorizedDictionaryEncodedParquetValuesReader.VarWidthBinaryDictEncodedReader::nextVal. This test actually passes both before and after the fix, since this is a case where typeWidth == -1.

The int-backed, long-backed, and fixed-length decimal readers are all dead code (never called). I'll put up a separate PR to remove the dead code.
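For reference, here is a rough sketch of how such a file can be generated for a string column (adapted from the decimal snippet earlier in this thread; the row count and property values are illustrative, and the actual test may differ). With many distinct values and a small dictionary size, the v1 writer starts with RLE dictionary encoding and falls back to plain encoding once the dictionary fills, producing the mixed-encoding column chunk that the EAGER decode path needs:

    Schema schema = new Schema(Types.NestedField.required(1, "str", Types.StringType.get()));
    File parquetFile = File.createTempFile("junit", null, temp.toFile());
    assertThat(parquetFile.delete()).as("Delete should succeed").isTrue();

    Iterable<GenericData.Record> records = RandomData.generate(schema, 500, 0L, 0.0f);
    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .set(PARQUET_DICT_SIZE_BYTES, "2048") // small dictionary so it fills up quickly
            .set(PARQUET_PAGE_ROW_LIMIT, "100")   // several small pages in the column chunk
            .build()) {
      writer.addAll(records);
    }
    // Early pages end up RLE dictionary encoded; once the dictionary overflows,
    // later pages fall back to plain encoding.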

@wypoon (Contributor, Author) commented Oct 8, 2024

@nastra please see #11276 for the dead code removal.

Comment on lines -63 to -65:

          if (typeWidth == -1) {
            index = idx;
          }
@wypoon (Contributor, Author) commented:

This happens in the case of var width binary, so the call to VarWidthBinaryDictEncodedReader::nextVal works correctly in that case.
Of course, it still works correctly with the change.

}
// After this, parquetFile contains one column chunk of binary data in five pages,
// the first two RLE dictionary encoded, and the remaining three plain encoded.

Contributor commented:

Suggested change

            .build()) {
      writer.addAll(records);
    }
    // After this, parquetFile contains one column chunk of binary data in five pages,
Contributor commented:

nit: please add a newline before this

@wypoon (Contributor, Author) replied Oct 11, 2024:

Added a blank line before and removed the blank line after.
The reason I did it that way is that the comment applies to the code before it rather than the code following it (which is more usual).
I tweaked the text of the comment slightly so that "After this" is not read as applying to the code following the comment.

@nastra (Contributor) left a comment:

LGTM, thanks @wypoon for finding and fixing this.

@amogh-jahagirdar or @RussellSpitzer could you also take a look please?

@amogh-jahagirdar amogh-jahagirdar self-requested a review October 13, 2024 15:16
@wypoon (Contributor, Author) commented Oct 16, 2024

@amogh-jahagirdar I see that you plan to review; can you please do so?

@amogh-jahagirdar (Contributor) commented:

@wypoon Sure, I will take a look tomorrow!

Comment on lines +157 to +162
    Path path =
        Paths.get(
            getClass()
                .getClassLoader()
                .getResource("decimal_dict_and_plain_encoding.parquet")
                .toURI());
Contributor commented:

Is there any way we can generate this parquet file in the test locally via existing writers instead of uploading this resource?

Contributor replied:

Just saw this thread: #11247 (comment). Let me go through it.

@wypoon (Contributor, Author) replied:

@amogh-jahagirdar I have already explained this at length and convinced @nastra.

Member commented:

OK, so if I read through those threads, the issue is that only a Parquet V2 writer can produce the file you want, but we don't actually support reading V2 Parquet files in our vectorized readers in Iceberg. I know Spark now supports this in its readers, so we should probably fix up our read code as well.

I think it's fine to keep this file as a temporary test fixture, but long term we should fix our V2 writers/readers so that we can have this all self-contained.

Contributor replied:

Thanks for the summary @RussellSpitzer , I think that's right. I'm all good with including this test file as a temporary solution

@wypoon (Contributor, Author) commented Oct 21, 2024

@amogh-jahagirdar to recap:

This change fixes a bug introduced by a refactoring (see comment). An analysis of the refactor will show the correctness of this change. This is where your second pair of eyes would be helpful.

The fix affects the readers in VectorizedDictionaryEncodedParquetValuesReader. On the face of it, with this change the logic appears to have changed for a number of readers: FixedLengthDecimalDictEncodedReader, VarWidthBinaryDictEncodedReader, IntBackedDecimalDictEncodedReader, LongBackedDecimalDictEncodedReader, and FixedSizeBinaryDictEncodedReader.

FixedLengthDecimalDictEncodedReader, IntBackedDecimalDictEncodedReader and LongBackedDecimalDictEncodedReader are dead code and are removed in #11276. That leaves VarWidthBinaryDictEncodedReader and FixedSizeBinaryDictEncodedReader. I added tests for these two. In actuality, the logic has not changed for VarWidthBinaryDictEncodedReader because this is the case covered before the change by

          int index = idx * typeWidth;
          if (typeWidth == -1) {
            index = idx;
          }

The readers in VectorizedDictionaryEncodedParquetValuesReader are only called when dictionaryDecodeMode in VectorizedPageIterator is EAGER, which means the dictionary encoded values are decoded and the decoded values are set in the Arrow vector used to build the Spark ColumnarBatch. The DictionaryDecodeMode is EAGER if and only if some, but not all, pages in the column chunk are dictionary encoded. I have explained in this thread why I am not able to write such a Parquet data file programmatically from Java for the fixed length byte array type.
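To summarize that decision in code form, here is a paraphrased sketch of the per-page decode-mode selection (the constant names other than EAGER, and the variable names, are assumptions and may differ from the actual source):

    // Paraphrased: the EAGER dictionary-decode path is taken only when the column chunk
    // mixes dictionary-encoded and non-dictionary-encoded pages.
    DictionaryDecodeMode mode;
    if (!pageIsDictionaryEncoded) {
      mode = DictionaryDecodeMode.NONE;   // this page is not dictionary encoded
    } else if (allPagesDictEncoded) {
      mode = DictionaryDecodeMode.LAZY;   // every page is dictionary encoded; keep dictionary ids
    } else {
      mode = DictionaryDecodeMode.EAGER;  // decode values into the Arrow vector now
    }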

@RussellSpitzer RussellSpitzer added this to the Iceberg 1.7.0 milestone Oct 21, 2024
@RussellSpitzer (Member) left a comment:

This looks right to me. As I noted, I think there are two important follow-ups:

  1. We need to fully support all parquet v2 written files #11371
  2. We need to drop the dead code from our vectorized read path #11370

@RussellSpitzer RussellSpitzer merged commit d0a7ff9 into apache:main Oct 21, 2024
@wypoon (Contributor, Author) commented Oct 21, 2024

@RussellSpitzer thanks for merging this.

We need to drop the dead code from our vectorized read path #11370

FYI, this is already done in #11276 (I mentioned it in my recap).

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024