
Conversation

@cloud-fan
Contributor

@cloud-fan cloud-fan commented Sep 2, 2019

What changes were proposed in this pull request?

Currently Data Source V2 has two major use cases:

  1. Users plug in a custom catalog that is tightly coupled with its own data. For example, users can plug in a Cassandra catalog and use Spark to read/write Cassandra tables directly.
  2. Users read/write external data as a table directly via DataFrameReader/Writer, or register it as a table in Spark.

Use case 1 is newly introduced in the master branch, and it greatly improves the user experience when interacting with external storage systems that have catalogs, e.g. Cassandra, JDBC, etc.

Use case 2 is the main use case of Data Source V1, which works well when the external storage system doesn't have a catalog, e.g. Parquet files on S3.

However, use case 2 is not well supported by Data Source V2. For example:

class MyTableProvider extends TableProvider ...
sql("CREATE TABLE t USING com.abc.MyTableProvider")

This fails with AnalysisException: com.abc.MyTableProvider is not a valid Spark SQL Data Source, because the session catalog always treats the table provider as a V1 source.

To support it, this PR updates TableProvider#getTable to accept additional table metadata. The expected behaviors are defined in https://docs.google.com/document/d/1oaS0eIVL1WsCjr4CqIpRv6CGkS5EoMQrngn3FsY1d-Q/edit?usp=sharing
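For illustration, the intended direction can be sketched roughly as follows. This is only a Scala-style sketch: the trait name, parameter names, and import paths (taken from the Spark 3.0 connector packages) are assumptions, not the final API in this patch.

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait TableProviderSketch {
  // Existing behavior: the provider infers schema/partitioning from the options alone.
  def getTable(options: CaseInsensitiveStringMap): Table

  // Proposed addition: a catalog (e.g. the session catalog handling CREATE TABLE ... USING)
  // can hand the provider the table metadata it already knows about.
  def getTable(
      options: CaseInsensitiveStringMap,
      schema: StructType,
      partitioning: Array[Transform]): Table
}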

Why are the changes needed?

It makes Data Source V2 support the use case that is already supported by Data Source V1.

Does this PR introduce any user-facing change?

Yes, it's a new feature.

How was this patch tested?

A new test suite.

* @throws UnsupportedOperationException
*/
default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
default Table getTable(
Contributor Author

I'll refine the classdoc of this interface after we reach an agreement on the proposal.


@SparkQA

SparkQA commented Sep 2, 2019

Test build #110014 has finished for PR 25651 at commit 7fd7d23.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 3, 2019

Test build #110027 has finished for PR 25651 at commit 0da5453.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

retest this please.

@SparkQA

SparkQA commented Sep 3, 2019

Test build #110035 has finished for PR 25651 at commit 0da5453.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

private[sql] object V2SessionCatalog {
case class SchemaChangedException(
Member

How about just SchemaException?

@gengliangwang
Member

+1 to the proposal.

val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val table = userSpecifiedSchema match {
case Some(schema) => provider.getTable(dsOptions, schema)
case Some(schema) => provider.getTable(dsOptions, schema, Array.empty)
Contributor

This isn't correct. The DataFrameReader does not know that the table is unpartitioned.

Contributor

Yeah, this is one of the most annoying and confusing behaviors of Data Source V1: being able to provide a schema but not partitioning information leads to minutes of partition schema inference.

override def getTable(
    options: CaseInsensitiveStringMap,
    schema: StructType,
    partitions: Array[Transform]): Table = {
Contributor

Why is this not identical to the metadata passed to createTable? Is there a reason not to pass the table properties as well as the read options?

Contributor

I agree, this should look similar to createTable.

Contributor Author

Read options should be passed to Table.newScanBuilder. The options here are the table properties.

Contributor Author

But we do have a problem here. Table properties are case-sensitive while scan options are case-insensitive.

Think about two cases:

  1. spark.read.format("myFormat").options(...).schema(...).load().
    We need to get the table with the user-specified options and schema. When scanning the table, we need to use the user-specified options as scan options. The problem is, DataFrameReader.options specifies both table properties and scan options in this case.
  2. CREATE TABLE t USING myFormat TABLEPROP ... and then spark.read.options(...).table("t")
    In this case, DataFrameReader.options only specifies scan options.

Ideally, TableProvider.getTable takes table properties, which should be case-sensitive. However, DataFrameReader.options also specifies scan options, which should be case-insensitive.

I don't have a good solution now. Maybe it's OK to treat this as a special table that accepts case-insensitive table properties.
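To make the mismatch concrete, here is a small standalone Scala sketch (not from this patch; the option names are made up) showing that CaseInsensitiveStringMap lookups ignore case while a plain Java map does not:

import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object CaseSensitivityDemo {
  def main(args: Array[String]): Unit = {
    // Options as a user might pass them to DataFrameReader.options(...).
    val userOptions = Map("Path" -> "/data/t", "MergeSchema" -> "true")

    // Treated as scan options: lookups are case-insensitive, so "path" finds "Path".
    val scanOptions = new CaseInsensitiveStringMap(userOptions.asJava)
    assert(scanOptions.get("path") == "/data/t")

    // Treated as table properties in a plain (case-sensitive) map: "path" is not found.
    val tableProps: java.util.Map[String, String] = userOptions.asJava
    assert(tableProps.get("path") == null)
  }
}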

Contributor Author

Or we can make table properties case-insensitive.

Contributor

This interface should pass the table properties. There is no need to pass read or write options at this point, unless they can't be separated from table properties (as in the DataFrameReader case). The read options and write options should be passed to the logical plan -- this is added in #25681: https://github.com/apache/spark/pull/25681/files#diff-94fbd986b04087223f53697d4b6cab24R275

I propose passing table properties as a string map (java.util) through this interface. When the properties come from the metastore, this is fine. When the properties come from DataFrameReader.option (or the write equivalent), the original case-sensitive map should be passed. Then the read options should additionally be passed to the correct plan node so that the physical plan can push them into the scan or the write.

val partitions = new mutable.ArrayBuffer[Transform]()

v1Table.partitionColumnNames.foreach { col =>
  partitions += LogicalExpressions.identity(col)
Contributor

Nit: This parses the column's name as a multi-part identifier, which is subtly incorrect. (It'll cause issues if the column name contains special characters like ':'.)

Contributor

Aren't column names with special characters supported? I thought you could escape any identifier using back-ticks.

@rdblue
Contributor

rdblue commented Sep 4, 2019

@cloud-fan, can you update the PR title and description? The USING clause is not the problem. That is passed to all catalogs. The problem is that generic catalogs can't pass table information to Table instances created by TableProvider. I think the title should be "Support passing all Table metadata in TableProvider".

I think that clarifying the problem statement will also help clean up the proposed changes. For example, this passes some -- but not all -- Table metadata. It should probably pass all of the fields needed to create a Table instance that behaves like V1Table.

@cloud-fan cloud-fan changed the title from [SPARK-28948][SQL] support data source v2 in CREATE TABLE USING to [SPARK-28948][SQL] Support passing all Table metadata in TableProvider on Sep 12, 2019
throw new NoSuchTableException(ident)
}

tryResolveTableProvider(V1Table(catalogTable))
Contributor Author

@cloud-fan cloud-fan Sep 18, 2019

This is the core change of this PR.

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110919 has finished for PR 25651 at commit 2333585.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111290 has finished for PR 25651 at commit 40e2894.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CatalogExtensionForTableProvider extends DelegatingCatalogExtension

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111293 has finished for PR 25651 at commit 3a6d13d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CatalogExtensionForTableProvider extends DelegatingCatalogExtension

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111298 has finished for PR 25651 at commit 0f9faca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CatalogExtensionForTableProvider extends DelegatingCatalogExtension

@rdblue
Contributor

rdblue commented Sep 25, 2019

@cloud-fan, thanks for working on this. I plan to review it tomorrow. Looks like this is huge and touches about 50 files. Is there a way to make it smaller?

@cloud-fan
Contributor Author

@rdblue yeah, it's possible. In this PR, I tried to adopt your suggestion to make it clear that TableProvider.getTable should take all the table metadata, so the method signature becomes

def getTable(schema: StructType, partitions: Array[Transform], properties: Map[String, String])

TableProvider has another getTable method that needs to infer schema/partitioning, and previously its signature was

def getTable(options: CaseInsensitiveStringMap)

To make it consistent, I changed it to use properties: Map[String, String], and also renamed it to loadTable since we need to touch many files anyway.

We can still keep the old method signature with a TODO to change it later, so that this PR can be much smaller.
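Put side by side, the shape described above looks roughly like the following sketch (a reading of this comment, not the committed code; the trait name is made up and whether properties should be a Scala or Java map is left open):

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

trait TableProviderRevised {
  // Infers schema and partitioning from the table properties alone.
  def loadTable(properties: Map[String, String]): Table

  // Takes all the table metadata, mirroring what a catalog's createTable receives.
  def getTable(
      schema: StructType,
      partitions: Array[Transform],
      properties: Map[String, String]): Table
}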

@SparkQA

SparkQA commented Oct 8, 2019

Test build #111905 has finished for PR 25651 at commit 1124b47.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Oct 14, 2019

Test build #112044 has finished for PR 25651 at commit 1124b47.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 15, 2019

Test build #112108 has finished for PR 25651 at commit 1235d78.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Oct 15, 2019

I should have time to review this on Thursday if it is ready. Until then, I commented on some of the open threads.

@cloud-fan
Contributor Author

cloud-fan commented Oct 21, 2019

@rdblue, when I try to have separate methods inferSchema and inferPartitioning, problems keep popping up (mostly from file source V2).

The major problems hit so far:

  1. inferSchema/inferPartitioning need to list files, and we should only do file listing once when we scan a directory without a user-specified schema. This can be resolved by using a static cache or simply caching the listed files in the FileDataSourceV2 instance.
  2. When writing to a directory, no schema/partition inference should be done. It looks to me like we need two separate methods, getTableToRead and getTableToWrite, where getTableToWrite does not take schema/partitioning and we don't need to call inferSchema/inferPartitioning. But this makes the API ugly.

I feel that the existing API (getTable with several overloads) is more flexible and allows implementations to have their own special logic. For example, it allows the file source to do schema inference lazily, so it won't be triggered at all during writes.
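For context, the alternative API style being debated here -- separate inference methods plus a single metadata-taking getTable -- would look roughly like the sketch below. The method names follow the discussion; everything else (trait name, default implementation, property map type) is an assumption.

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait TableProviderWithInference {
  // Called only when Spark does not already know the schema,
  // e.g. spark.read.format(...).load() without a user-specified schema.
  def inferSchema(options: CaseInsensitiveStringMap): StructType

  // Called only when Spark does not already know the partitioning.
  def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = Array.empty

  // Always called with complete metadata, whether it came from the metastore,
  // from the user, or from the inference methods above.
  def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: java.util.Map[String, String]): Table
}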

@SparkQA

SparkQA commented Oct 21, 2019

Test build #112405 has finished for PR 25651 at commit cfbe0a7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Oct 21, 2019

inferSchema/inferPartitioning need to list files, and we should only do file listing once when we scan a directory without a user-specified schema

Why should this internal concern of one source affect the API? Partitioning inference does not require listing all the files in a table, and I doubt that it is a good idea to do that for schema inference either. If a table is small, it doesn't matter if this work is done twice (before it is fixed); and if a table is really large, then it isn't a good idea to do this for schema inference anyway.

when writing to a directory, no schema/partition inference should be done.

This statement makes assumptions about the behavior of path-based tables and that behavior hasn't been clearly defined yet. Can you be more specific about the case and how you think path-based tables will behave?

I disagree that no schema or partition inference should be done for writing. Maybe it isn't done today, but if there is existing data, Spark shouldn't allow writing new data that will break a table by using an incompatible schema or partition layout. In that case, we would want to infer the schema and partitioning.

Also, if it isn't necessary to infer schema and partitioning, then this information still needs to be passed to the table. When running a CTAS operation, Spark might be called with partitionBy. In that case, if Spark doesn't call inferPartitioning then what is the problem?

@cloud-fan
Contributor Author

cloud-fan commented Oct 22, 2019

I was not trying to define the behavior, but to describe the existing behavior. df.write.mode("append").parquet("path_with_existing_data") does not do schema/partition inference, and just appends the data even if the schema is incompatible. A subsequent read will then fail during schema inference. I think this is reasonable behavior, as there is no "user-specified schema" in DataFrameWriter to skip schema inference.

@cloud-fan
Contributor Author

If you really think that schema or partition inference should be done for writing, we should disable file source v2 by default to not surprise users.

@rdblue
Contributor

rdblue commented Oct 22, 2019

If you really think that schema or partition inference should be done for writing, we should disable file source v2 by default to not surprise users.

I'm not saying that it is what we should do. That should be covered by a design doc for path-based tables. My point is that the claim that it won't be done is not necessarily true and makes assumptions about how these tables will behave.

@cloud-fan
Contributor Author

Do you mean we should block this PR until we figure out the behavior of path-based tables? This PR simply makes TableProvider accept user-specified partitioning, while keeping the API style and existing file source behavior unchanged. I think we've gone too far in proposing a new API style for TableProvider.

I'm OK with adopting the new API style if it doesn't break the existing behavior. But it seems we are now unable to keep the file source skipping schema/partition inference during writes. Shall we discuss the new API style later?

@rdblue
Contributor

rdblue commented Oct 23, 2019

Do you mean we should block this PR until we figure out the behavior of path-based tables?

No, and sorry for the misunderstanding! My point is that your claim that inference won't be used in the write path is not necessarily correct and depends on the behavior we decide for path-based tables.

But it seems we are now unable to keep the file source skipping schema/partition inference during writes.

I think it's an exaggeration to say "unable". Partition inference in particular can be done much more easily and efficiently than depending on a recursive directory listing to find all data files. Granted, the current implementation would need to change, but do you really think that "unable" is an accurate description?

The problem is that this needs to be decided because it affects the API that will go into Spark 3.0. I think we should go with what we agreed was a good solution for the API -- adding the inferSchema and inferPartitioning methods -- because I haven't heard a very strong argument against it. Let's talk about this in the next v2 sync to get more opinions.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 19, 2020
@cloud-fan cloud-fan removed the Stale label Mar 19, 2020
@SparkQA

SparkQA commented May 18, 2020

Test build #122766 has finished for PR 25651 at commit cfbe0a7.

  • This patch fails build dependency tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Aug 27, 2020
@cloud-fan cloud-fan removed the Stale label Aug 27, 2020
@github-actions

github-actions bot commented Dec 6, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 6, 2020
@github-actions github-actions bot closed this Dec 7, 2020