Skip to content

Conversation

@ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented Feb 3, 2023

What changes were proposed in this pull request?

This pr makes metadata output consistent during analysis by checking the output and reuse these if exists.

This pr also deduplicates the metadata output when merging into the output.

Why are the changes needed?

Let's say a process of resolving metadata:

Project (_metadata.file_size)
  File (_metadata.file_size > 0)
    Relation
  1. ResolveReferences resolves _metadata.file_size for Filter
  2. ResolveReferences can not resolve _metadata.file_size for Project, due to Filter is not resolved (data type does not match)
  3. then AddMetadataColumns will merge metadata output into output
  4. the next round of ResolveReferences can not resolve _metadata.file_size for Project since we filter not the confict names(output already contains the metadata output), see code:
        def isOutputColumn(col: MetadataColumn): Boolean = {
          outputNames.exists(name => resolve(col.name, name))
        }
        // filter out metadata columns that have names conflicting with output columns. if the table
        // has a column "line" and the table can produce a metadata column called "line", then the
        // data column should be returned, not the metadata column.
        hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
    
    And we also can not skip metadata column during filter confict name, otherwise the new generated metadata attribute will have different expr id with previous.

One failed example:

SELECT _metadata.row_index  FROM t WHERE _metadata.row_index >= 0;

Does this PR introduce any user-facing change?

yes, bug fix

How was this patch tested?

add test for v1, v2 and streaming relation

@ulysses-you
Copy link
Contributor Author

cc @Yaohua628 @cloud-fan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused, why do we care about metadata output anymore if it has been added to the output? The column in project will just be resolved as a normal attribute, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but there is a conflict in AddMetadataColumns. We will add a new project(original output), see

val newNode = addMetadataCol(node, metaCols.map(_.exprId).toSet)
// We should not change the output schema of the plan. We should project away the extra
// metadata columns if necessary.
if (newNode.sameOutput(node)) {
newNode
} else {
Project(node.output, newNode)
}

then with this case, it will be:

Project (_metadata.file_size)
  Project(c)
    File (_metadata.file_size > 0)
      Relation c, metadataoutput

I guess the original idea is we should always use children.metadataoutputs to do resolving to prevent exposing unnecessary metadata columns, isn't it ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #39895

Copy link
Contributor Author

@ulysses-you ulysses-you Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's acutally the different issue, my test cases still fail with your pr.

After addMetadataCol, the output of newNode is always different with original node(metadata output merges into output), so an extra project can not be avoid. (Your pr fixs a special case of hiddenOutputTag with NaturalJoin)
Then, the only way to resolve rest metadata columns is using metadata output. But before this pr, the metadata output is lost once call addMetadataCol.

Copy link
Contributor

@cloud-fan cloud-fan Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the problem here is we reference metadata col twice in two different nodes. I think the issue is we add the extra project too early. We should only do it once for the root node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's safe to add a project for root node. One issue is we do not know the output of root node because it's not resolved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be safe, let's only do it under case hasMeta: SupportsMetadataColumns

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add some comments to explain when it happens

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, move it under case relation: HadoopFsRelation =>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, and add comments as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a util method in object FileFormat to reduce code duplication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

// has a column "line" and the table can produce a metadata column called "line", then the
// data column should be returned, not the metadata column.
hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
metadataOutputWithOutConflicts(hasMeta.metadataColumns.toAttributes)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small difference is we call toAttributes before filter out conflicts. Seems fine.

@ulysses-you
Copy link
Contributor Author

@cloud-fan any comments ?

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.4!

@cloud-fan cloud-fan closed this in 5705436 Feb 13, 2023
cloud-fan pushed a commit that referenced this pull request Feb 13, 2023
### What changes were proposed in this pull request?

This pr makes metadata output consistent during analysis by checking the output and reuse these if exists.

This pr also deduplicates the metadata output when merging into the output.

### Why are the changes needed?

Let's say a process of resolving metadata:
```
Project (_metadata.file_size)
  File (_metadata.file_size > 0)
    Relation
```

1. `ResolveReferences` resolves _metadata.file_size for `Filter`
2. `ResolveReferences` can not resolve _metadata.file_size for `Project`, due to Filter is not resolved (data type does not match)
3. then `AddMetadataColumns` will merge metadata output into output
4. the next round of `ResolveReferences` can not resolve _metadata.file_size for `Project` since we filter not the confict names(output already contains the metadata output), see code:
    ```
        def isOutputColumn(col: MetadataColumn): Boolean = {
          outputNames.exists(name => resolve(col.name, name))
        }
        // filter out metadata columns that have names conflicting with output columns. if the table
        // has a column "line" and the table can produce a metadata column called "line", then the
        // data column should be returned, not the metadata column.
        hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
   ```
   And we also can not skip metadata column during filter confict name, otherwise the new generated metadata attribute will have different expr id with previous.

One failed example:
```scala
SELECT _metadata.row_index  FROM t WHERE _metadata.row_index >= 0;
```

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

add test for v1, v2 and streaming relation

Closes #39870 from ulysses-you/SPARK-42331.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5705436)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@ulysses-you
Copy link
Contributor Author

thank you @cloud-fan !

@ulysses-you ulysses-you deleted the SPARK-42331 branch February 13, 2023 11:22
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
### What changes were proposed in this pull request?

This pr makes metadata output consistent during analysis by checking the output and reuse these if exists.

This pr also deduplicates the metadata output when merging into the output.

### Why are the changes needed?

Let's say a process of resolving metadata:
```
Project (_metadata.file_size)
  File (_metadata.file_size > 0)
    Relation
```

1. `ResolveReferences` resolves _metadata.file_size for `Filter`
2. `ResolveReferences` can not resolve _metadata.file_size for `Project`, due to Filter is not resolved (data type does not match)
3. then `AddMetadataColumns` will merge metadata output into output
4. the next round of `ResolveReferences` can not resolve _metadata.file_size for `Project` since we filter not the confict names(output already contains the metadata output), see code:
    ```
        def isOutputColumn(col: MetadataColumn): Boolean = {
          outputNames.exists(name => resolve(col.name, name))
        }
        // filter out metadata columns that have names conflicting with output columns. if the table
        // has a column "line" and the table can produce a metadata column called "line", then the
        // data column should be returned, not the metadata column.
        hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
   ```
   And we also can not skip metadata column during filter confict name, otherwise the new generated metadata attribute will have different expr id with previous.

One failed example:
```scala
SELECT _metadata.row_index  FROM t WHERE _metadata.row_index >= 0;
```

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

add test for v1, v2 and streaming relation

Closes apache#39870 from ulysses-you/SPARK-42331.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5705436)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants