
Conversation

@chenjunjiedada (Collaborator):

This fixes #715.

@chenjunjiedada force-pushed the namemapping branch 2 times, most recently from dd7fb25 to b3d2f4e on March 8, 2020 12:50.
if (filter != null) {
  statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
  dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
  NameMapping nameMapping = MappingUtil.create(ParquetSchemaUtil.convert(fileSchema));
Contributor:

I think the nameMapping should be taken as a parameter in Parquet as part of the builder and passed in from there

Contributor:

Also, if the nameMapping is not passed in and the file schema does not have IDs, we can infer the nameMapping, similar to what we've done here. We did this for Avro in #580.

Collaborator Author:

It makes sense to build the name mapping from the expected schema, but here I need to build it from the file schema so that we use the metrics correctly.

ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
ParquetSchemaUtil.pruneColumnsByName(fileSchema, expectedSchema);
@rdsr (Contributor), Mar 8, 2020:

For Avro, we assigned IDs to fileSchema based on the name mapping and the projected IDs, which we derived from the expected schema [Set<Integer> projectedIds = TypeUtil.getProjectedIds(expectedSchema)]. This was piggybacked on the column pruning code for Avro; see AvroSchemaUtil.pruneColumns.

I'm unsure whether the same general approach can be applied here as well. @rdblue Thoughts?
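
For illustration, a rough sketch of that Avro flow using the names from the comment above (the exact pruneColumns signature here is an assumption):

Set<Integer> projectedIds = TypeUtil.getProjectedIds(expectedSchema);
org.apache.avro.Schema prunedFileSchema =
    AvroSchemaUtil.pruneColumns(fileSchema, projectedIds, nameMapping);  // assigns IDs while pruning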

Collaborator Author:

Here it works the reverse way: I build the name mapping from fileSchema and then project IDs onto the expected schema according to names. The reason is that the metrics are built from the Parquet footer, so we have to bind the expression to fileSchema instead of the expected schema in ParquetMetricsRowGroupFilter and ParquetDictionaryRowGroupFilter.

@chenjunjiedada (Collaborator Author):

@rdsr, I thought again about your comments; that should be doable. I will submit an update later.

@chenjunjiedada (Collaborator Author):

@rdsr, ready for another review.

@chenjunjiedada (Collaborator Author):

@rdsr, any new comments? The new commit uses the approach you suggested.

@rdblue, could you please help to take a look at your convenience as well?

@rdsr (Contributor), Mar 13, 2020:

@chenjunjiedada Been a little busy lately. I'll try to find time to look into it next week. Hope that's ok.

@chenjunjiedada (Collaborator Author):

@rdsr no problem, take your time :)

  struct = schema.asStruct();
}

this.expr = Binder.bind(struct, Expressions.rewriteNot(expr), caseSensitive);
Contributor:

This class doesn't need to know about the name mapping. The mapping should be used to add IDs to the file schema so that most classes don't need to add specific support.

Look at how the other fallback happens using ParquetSchemaUtil.addFallbackIds. I think this should mimic that fallback in all cases.
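
For illustration, one possible shape of that fallback, using only the ParquetSchemaUtil methods named in this thread (the ordering of the checks is an assumption):

MessageType typeWithIds;
if (ParquetSchemaUtil.hasIds(fileSchema)) {
  typeWithIds = fileSchema;  // trust the file's own IDs
} else if (nameMapping != null) {
  typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);  // name-based
} else {
  typeWithIds = ParquetSchemaUtil.addFallbackIds(fileSchema);  // position-based fallback
}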

Collaborator Author:

The added unit test fails if we only use the ID fallback strategy. How about keeping both the original logic and the name mapping logic?

Contributor:

We need to keep both ID assignment strategies, but we don't need to change these filters. Just pass the file schema with assigned IDs into this like before.

Collaborator Author:

I changed it to pass typeWithIds.

Contributor:

Why is the name mapping still passed in here?

Collaborator Author:

The evaluation builds the metadata mapping from the BlockMetaData read from the footer, which may contain columns that are not in the file schema passed in, so I used the name mapping here to filter them out. The new commit uses a try/catch block instead.

Contributor:

The Parquet schema will match exactly because the IDs were added to the file schema. Adding IDs doesn't change the names, so meta.getPath() will be the same one used by fileSchema.getType(...).
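
For illustration, the lookup being described, in Parquet API terms (a sketch, not code from this PR):

ColumnPath path = meta.getPath();  // column path from the footer's chunk metadata
PrimitiveType colType = fileSchema.getType(path.toArray()).asPrimitiveType();  // same names, same path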


public ParquetMetricsRowGroupFilter withNameMapping(NameMapping newNameMapping) {
  this.nameMapping = newNameMapping;
  return this;
Contributor:

This file also should not be modified.

MessageType projection = ParquetSchemaUtil.hasIds(fileSchema) ?
ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
ParquetSchemaUtil.pruneColumnsByName(fileSchema, expectedSchema, MappingUtil.create(expectedSchema));
Contributor:

This should not replace the existing fallback strategy. If the mapping is present, it should be used. Otherwise, the existing position-based fallback strategy should be used.

Collaborator Author:

Agreed.

private final boolean reuseContainers;
@Nullable
private final Integer batchSize;
@Nullable
Contributor:

No need for this annotation. Looks like we should have removed it in the other PR as well.

Collaborator Author:

OK.


boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
this.nameMapping = nameMapping == null ? MappingUtil.create(expectedSchema) : nameMapping;
Contributor:

This is not the right place to infer a name mapping. This class should apply the name mapping if it exists, and use a position-based fallback otherwise.

Collaborator Author:

Agreed.

statsFilter.shouldRead(typeWithIds, rowGroup) &&
dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
statsFilter.shouldRead(fileSchema, rowGroup) &&
dictFilter.shouldRead(fileSchema, rowGroup, reader.getDictionaryReader(rowGroup)));
Contributor:

What was the purpose of this change?

Collaborator Author:

I reverted this back to typeWithIds

@rdblue (Contributor), Apr 7, 2020:

Thanks for working on this, @chenjunjiedada!

A few high-level things that I think need to be fixed:

  • This should not replace the existing fallback that assigns IDs based on position. If the name mapping is configured, that should be used instead. Both are options. Feel free to do this by adding a name mapping to the existing ParquetSchemaUtil methods. We can do the fallback choice there.
  • This also needs to update ParquetSchemaUtil.hasIds so that any ID causes it to return true, instead of any missing ID (see the sketch after this list). This should be okay since files either have IDs or do not, so it is not a behavior change.
  • As few classes should change as possible. Fallback ID assignment already works with the row group filters, so I don't think that we need to alter them. (Unless I'm missing some test case where it's failing!)
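
For illustration, a minimal sketch of the hasIds semantics described in the second bullet above: any field ID anywhere in the schema means the file has IDs. This is an assumed implementation, not the PR's actual visitor:

static boolean hasIds(org.apache.parquet.schema.Type type) {
  if (type.getId() != null) {
    return true;  // a single ID anywhere is enough to trust the file's IDs
  }
  if (!type.isPrimitive()) {
    for (org.apache.parquet.schema.Type field : type.asGroupType().getFields()) {
      if (hasIds(field)) {
        return true;
      }
    }
  }
  return false;
}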

@chenjunjiedada (Collaborator Author):

@rdblue, I addressed some of the comments. The remaining ones need a double check.

The ID fallback approach cannot work when the Iceberg table schema is different from the Parquet file schema, as the original issue mentions. Also, the use case in the unit test (importing partial data of existing Parquet files as a new Iceberg table) leads to the following exception, which comes from the metrics row group filter. Therefore I removed the ID fallback approach. I'm not sure whether that is a common use case, but if the name mapping approach handles more cases correctly, it would be better to keep it unless there is a case it cannot handle. Does that make sense to you?

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.CharSequence
	at org.apache.iceberg.types.Comparators$CharSeqComparator.compare(Comparators.java:180)
	at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:146)
	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.lt(ParquetMetricsRowGroupFilter.java:202)
	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.lt(ParquetMetricsRowGroupFilter.java:85)
	at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:122)
	at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:321)
	at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:336)
	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:108)
	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.access$000(ParquetMetricsRowGroupFilter.java:85)
	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:79)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:96)
	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:64)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:74)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:155)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:117)
	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

}

private int getId(org.apache.parquet.schema.Type type) {
protected int getId(org.apache.parquet.schema.Type type) {
Contributor:

Why was this changed?

Collaborator Author:

This was changed so that we can override getId and use it to detect whether a schema has IDs or not.

Contributor:

I think it might be better to make a new visitor for this, rather than reusing MessageTypeToType. That maintains independence between classes that don't have the same purpose. It was easy before to use MessageTypeToType without modification, but there's no need to do that if it is no longer easy.

@chenjunjiedada (Collaborator Author), Apr 9, 2020:

OK, I will add a new one.

protected int getId(org.apache.parquet.schema.Type type) {
  org.apache.parquet.schema.Type.ID id = type.getId();
  if (id != null) {
    throw new IllegalStateException("at least one ID exists");
Contributor:

It is fine if not everything has an ID. Columns without IDs will be ignored and should not cause a read to fail.

@chenjunjiedada (Collaborator Author), Apr 8, 2020:

Did I misunderstand the comment? I thought you meant a file schema either has all columns with IDs or none. Do you mean that a file schema can have some columns with IDs and some without?

This also needs to update ParquetSchemaUtil.hasIds so that any ID causes it to return true, instead of any missing ID. This should be okay since files either have IDs or do not, so it is not a behavior change.

Contributor:

Yes, we expect schemas to either have all IDs or none, but we don't need to fail if that assumption is violated. If we detect any ID in the schema, then we should trust the file's IDs. If there are no IDs, then we should infer them using a name mapping or position-based fallback, whatever was configured.

@rdblue (Contributor), Apr 7, 2020:

The remaining ones need a double check.

I don't see replies to my comments. Can you reply so I know what you want me to double-check and why?

The ID fallback approach cannot work when the Iceberg table schema is different from the Parquet file schema, as the original issue mentions.

Non-Iceberg table schemas are maintained either by position (like CSV) or by name (like JSON). The fallback strategy needs to match how schemas have been maintained for a table. We implemented position-based fallback because before Iceberg we maintained by position so we could rename fields and add fields, but not drop or reorder. Support for name mapping supports the name-based approach for maintaining a schema. Both approaches are valid.

You're right that some files are incompatible with position-based, but that just shows that you have to maintain a schema consistently. You can't use both position-based and name-based in a table.
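
For reference, a name mapping is plain JSON parsed with NameMappingParser (used elsewhere in this PR). A minimal illustrative example with two hypothetical columns:

NameMapping mapping = NameMappingParser.fromJson(
    "[ {\"field-id\": 1, \"names\": [\"id\"]},"
    + " {\"field-id\": 2, \"names\": [\"data\"]} ]");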

@chenjunjiedada (Collaborator Author):

@rdblue, thanks for the detail; I meant the comments about the ID fallback approach. I didn't take the CSV and JSON formats into consideration. Now I understand the idea and will add the ID fallback strategy back.

@rdblue (Contributor), Apr 8, 2020:

Great, thank you @chenjunjiedada!

@chenjunjiedada force-pushed the namemapping branch 3 times, most recently from 7af5eda to 6190877 on April 8, 2020 16:53.
@chenjunjiedada (Collaborator Author):

@rdblue, I saw your comments while committing. You might want to see the new commit (with some rebase commits). I will address your latest comments tomorrow.

}

public ReadBuilder nameMapping(NameMapping newNameMapping) {
public ReadBuilder withNameMapping(NameMapping newNameMapping) {
Contributor:

@rdsr, just want to confirm that you're okay with this change? If it is going to cause you problems because you've already deployed it, we can go with the original.

@rdblue (Contributor), Apr 9, 2020:

@chenjunjiedada, it looks like lots of places have been changed to use the original file schema, rather than the file schema with fallback IDs added. I don't think that is correct.

Can you revert those changes, or help me understand why they are needed?

@chenjunjiedada (Collaborator Author):

@rdblue, I addressed the latest two comments about using the passed-in name mapping and allowing Parquet types without IDs. Would you please take another look? Thanks for your patience!

@chenjunjiedada (Collaborator Author):

@rdblue, this is ready for another review. Thanks!

@chenjunjiedada (Collaborator Author):

Merged again. @rdblue, would you please take another look?

Function<MessageType, VectorizedReader<?>> readerFunc,
Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
InputFile input, Schema expectedSchema, ParquetReadOptions options, Function<MessageType,
VectorizedReader<?>> readerFunc, NameMapping nameMapping, Expression filter, boolean reuseContainers,
Contributor:

Please don't split types across lines. I think moving Function<MessageType to the previous line should be reverted.

.asStructType().fields());
NameMapping nameMapping = MappingUtil.create(schema);
MessageType messageType = ParquetSchemaUtil.convert(schema, "complex_schema");
MessageType typeWithIdsFromNameMapping = ParquetSchemaUtil.applyNameMapping(messageType, nameMapping);
Contributor:

There's a problem here: applyNameMapping could be a noop and this would still work because the conversion to Parquet and back preserves IDs. For Avro, there is a test utility to remove IDs from the schema so that we can test adding them back with the name mapping. I think it would make sense to take the Avro name mapping tests and adapt them for Parquet as well. That can be done in a follow-up, but this test does need to be fixed before committing.
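
A hypothetical helper along the lines of the Avro test utility mentioned above (the name and approach are assumptions, not part of this PR) that strips field IDs so applyNameMapping has real work to do:

private static org.apache.parquet.schema.Type withoutIds(org.apache.parquet.schema.Type type) {
  if (type.isPrimitive()) {
    org.apache.parquet.schema.PrimitiveType p = type.asPrimitiveType();
    // rebuild the primitive with a null ID so the field ID is dropped
    return new org.apache.parquet.schema.PrimitiveType(
        p.getRepetition(), p.getPrimitiveTypeName(), p.getTypeLength(), p.getName(),
        p.getOriginalType(), p.getDecimalMetadata(), null);
  } else {
    org.apache.parquet.schema.GroupType g = type.asGroupType();
    java.util.List<org.apache.parquet.schema.Type> fields = new java.util.ArrayList<>();
    for (org.apache.parquet.schema.Type field : g.getFields()) {
      fields.add(withoutIds(field));
    }
    // for the message root, wrap the stripped fields in new MessageType(g.getName(), fields) instead
    return new org.apache.parquet.schema.GroupType(g.getRepetition(), g.getName(), g.getOriginalType(), fields);
  }
}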

Collaborator Author:

Makes sense to me; let me file an issue to track this.

hasChange = true;
Integer fieldId = getId(originalField);
if (fieldId != null) {
  if (selectedIds.contains(fieldId)) {
Contributor:

Why is this not if (fieldId != null && selectedIds.contains(fieldId))?

The else case is used when a sub-field is projected by ID. So the question is whether a sub-field can be projected if its parents aren't mapped. I think we should allow it because it would be confusing to have a value mapped, but still get nulls because a parent is not.

Collaborator Author:

Makes sense to me. Updated.

filteredFields.add(originalField);
hasChange = true;
Integer fieldId = getId(originalField);
if (fieldId != null) {
Contributor:

Same logic as above applies here.

Contributor:

And in the rest of the updates for this file.

Collaborator Author:

Done.

int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
}
Contributor:

Looks correct to me.

    .reuseContainers();

iter = nameMapping != null ?
    builder.withNameMapping(NameMappingParser.fromJson(nameMapping)).build() : builder.build();
Contributor:

I'd prefer to keep the name mapping and building separate. There's no need to mix these together.

if (nameMapping != null) {
  builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}

iter = builder.build();

}

@Test
public void testImportWithNameMapping() throws Exception {
@rdblue (Contributor), Jun 16, 2020:

Can you add a test for name mapping with the vectorized read path? Just set the property on the table when you add the mapping and it should take that path.

Collaborator Author:

Done; I also updated the vectorized reader builder. Thanks for catching this!

    .caseSensitive(caseSensitive);

return nameMapping != null ?
    builder.withNameMapping(NameMappingParser.fromJson(nameMapping)).build() : builder.build();
Contributor:

Same here, let's separate the name mapping from build.

Collaborator Author:

Fixed.

}

protected int nextId() {
private int nextId() {
Contributor:

I'd also prefer to revert this change. There may not be a use of it as a protected method any more, but we don't need to change this file and risk git conflicts.

Collaborator Author:

OK


@Override
public void beforeRepeatedElement(Type element) {
}
Contributor:

Can you add comments to these methods to explain why they are here?

Collaborator Author:

OK, let me add the comments back.

@rdblue (Contributor), Jun 16, 2020:

@chenjunjiedada, looks like there are blockers with the new logic in PruneColumns and the test in TestParquetSchemaUtil. Otherwise, I think we can get this in. Please also fix the minor issues and style. Thanks!

@rdblue (Contributor), Jun 17, 2020:

Looks good! I would still like to fix the tests that don't remove IDs, but I'm going to merge this since it is a big commit to keep updating and to review. Thanks for all your work on this, @chenjunjiedada!

@rdblue merged commit 5a3cd22 into apache:master on Jun 17, 2020.
@chenjunjiedada deleted the namemapping branch on June 17, 2020 23:22.
@rdblue added this to the Java 0.9.0 Release milestone on Jul 10, 2020.
cmathiesen pushed a commit to ExpediaGroup/iceberg that referenced this pull request Aug 19, 2020
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023
Linked issue: Convert error while query imported spark table