
Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

Currently, TableProvider only accepts a table schema and table properties. It should accept table partitioning as well.

This is extracted from #25651, to only keep the API changes and make the diff smaller.
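
For context, a rough sketch of the overloads under discussion, rendered here as a Scala trait (the real TableProvider interface is Java, and the partitioning type is assumed to be the DSv2 Transform array used by TableCatalog.createTable):

import java.util
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Illustrative trait mirroring the proposed TableProvider overloads.
trait ProposedTableProvider {
  // Infer both schema and partitioning from the table properties alone.
  def getTable(properties: util.Map[String, String]): Table

  // Use the given schema; infer the partitioning.
  def getTable(schema: StructType, properties: util.Map[String, String]): Table

  // New in this PR: also accept the table partitioning, e.g. as read back
  // from the Hive metastore for a table stored with a v2 provider.
  def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table
}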

Why are the changes needed?

Although DataFrameReader/DataStreamReader don't support user-specified partitioning, we still need to pass the table partitioning when getting tables from TableProvider if we store tables in the Hive metastore with a v2 provider.

Does this PR introduce any user-facing change?

not yet.

How was this patch tested?

existing tests

@cloud-fan
Contributor Author

This is preferred over #26297, because

  1. This follows the existing API style, so the diff is much smaller.
  2. It's hard to decouple schema and partition inference. For example, file sources need to infer partitioning before reporting their schema, because partition columns are part of the table schema.
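
To illustrate point 2: for a file source, the partition columns discovered from the directory layout are part of the reported schema, so partition inference cannot be deferred until after schema inference. A minimal sketch (the path and column names are made up; assumes a SparkSession named spark):

// Hypothetical partitioned layout on disk:
//   /data/events/date=2019-12-01/part-00000.parquet
//   /data/events/date=2019-12-02/part-00000.parquet
val df = spark.read.parquet("/data/events")

// The inferred schema contains the data columns plus the partition column
// `date`, so the source must run partition discovery before it can report
// its schema at all.
df.printSchema()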

@cloud-fan
Contributor Author

retest this please

@dongjoon-hyun
Member

Hi, @cloud-fan. Could you fix the following two lint-java errors detected by GitHub Actions? Our Jenkins PR builder seems to ignore them.

[ERROR] src/test/java/test/org/apache/spark/sql/connector/JavaSimpleDataSourceV2.java:[43] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java:[73] (sizes) LineLength: Line is longer than 100 characters (found 102).

@SparkQA

SparkQA commented Dec 4, 2019

Test build #114857 has finished for PR 26750 at commit 9fe392b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

@apache deleted a comment from SparkQA Dec 5, 2019
@apache deleted a comment from SparkQA Dec 5, 2019
@apache deleted a comment from SparkQA Dec 5, 2019
@cloud-fan
Contributor Author

cc @rdblue @brkyvz @gengliangwang

@SparkQA

SparkQA commented Dec 5, 2019

Test build #114889 has finished for PR 26750 at commit 7fcee0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

* schema.
*/
Table getTable(CaseInsensitiveStringMap options);
Table getTable(StructType schema, Map<String, String> properties);
Member

So, the main idea of this PR is to remove the case-insensitive requirements from the original java TableProvider DSv2 design?

Member

cc @dbtsai and @aokolnychyi for Iceberg.

Contributor Author

No, the main idea is to add a new overloaded method that accepts user-specified partitioning. Since we need to change the API anyway, we change the options type as well; see https://github.com/apache/spark/pull/26750/files#r354692676
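
In other words (a minimal sketch, not part of this PR): the new overloads take a plain, case-preserving java.util.Map, and an implementation that still wants case-insensitive lookups can wrap it itself, which is what the SimpleTableProvider shim added for the built-in sources does.

import java.util
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper inside a TableProvider implementation: the properties
// map is case-preserving, but wrapping it restores case-insensitive lookup.
def lookupPath(properties: util.Map[String, String]): Option[String] = {
  val options = new CaseInsensitiveStringMap(properties)
  Option(options.get("path")) // matches "path", "PATH", "Path", ...
}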

* Return a {@link Table} instance with specified table properties to do read/write.
* Implementations should infer the table schema and partitioning.
*
* @param properties The specified table properties. It's case preserving (contains exactly what
Contributor

I unfortunately missed most of the discussions. Would this be used as table properties directly? Would it also include read/write options? Or are the read/write options going to be translated to a catalog and identifier, as we discussed some time ago?

I guess a path-based table would have a location table property, which corresponds to the old option "path", correct?

Contributor Author
@cloud-fan Dec 6, 2019

Yes, it's used as table properties directly, not as read/write options. The read/write options are case-insensitive and are passed through Table.newScanBuilder/Table.newWriteBuilder.

Table properties don't have to be case-insensitive, so here I just define them as case-preserving. The implementation is free to interpret them case-sensitively or case-insensitively (Spark can't control that anyway).

If you read a table with DataFrameReader, the options are passed to the data source twice: once via TableProvider.getTable and once via Table.newScanBuilder.

If you create the table first with CREATE TABLE ... USING v2Provider TBLPROPERTIES ... and then read it with DataFrameReader.option(...).table, then the table properties and the read options are different.
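
A rough illustration of the two flows described above (the format name v2Provider and the option/property names are made up; assumes a SparkSession named spark):

// Flow 1: DataFrameReader without a metastore table. The same options end up
// as the table properties passed to TableProvider.getTable and as the read
// options passed to Table.newScanBuilder.
val df1 = spark.read
  .format("v2Provider")
  .option("path", "/data/t")
  .load()

// Flow 2: the table is created first, so the table properties (from CREATE
// TABLE) and the per-read options (from DataFrameReader.option) are
// different maps.
spark.sql("CREATE TABLE t USING v2Provider TBLPROPERTIES ('someProp' = 'x')")
val df2 = spark.read.option("readOpt", "y").table("t")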

Contributor Author

A path-based table would have a location table property, but I don't know why we can't reuse the old name path. cc @rdblue

Contributor
@brkyvz left a comment

I like this a lot. After this, we're a single step away from being able to create/replace v2 tables through the "save" API. I'd love others to also weigh in, since I've missed most V2 discussions in the last month.

One concern I have is the mismatch between path and location as data source options versus Hive metastore table properties.


// A simple version of `TableProvider` which doesn't support specified table schema/partitioning
// and treats table properties case-insensitively. This is private and only used in builtin sources.
private[sql] trait SimpleTableProvider extends TableProvider {
Contributor

If this is private and only used by built-in sources, it should not be in the catalyst connector package. That is the public API package, and this will appear public outside of Scala.

I think this should be in org.apache.spark.sql.execution.datasources.v2.

override def getTable(properties: util.Map[String, String]): Table = {
  getTable(new CaseInsensitiveStringMap(properties))
}
override def getTable(schema: StructType, properties: util.Map[String, String]): Table = {
Contributor

Nit: These methods should have a newline between them.

}

private [connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalogBase[T]]
private[connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalogBase[T]]
Contributor

This isn't a necessary change. If you were already changing this line it would be fine, but as it is this change can cause conflicts and should probably be reverted.

@rdblue
Contributor

rdblue commented Dec 8, 2019

I find this approach a little awkward because it mixes really different use cases into the same API. One is where you have a metastore as the source of truth for schema and partitioning, and the other is where the implementation is the source of truth.

This leads to strange requirements, like throwing an IllegalArgumentException to reject a schema or partitioning. That doesn't make much sense when the source of truth is the metastore. And, the API doesn't distinguish between these cases, so an implementation doesn't know whether the table is being created by a DataFrameWriter (and should reject partitioning that doesn't match) or if it is created from metastore information (and should use the partitioning from the metastore).

That's why I liked the approach of moving the schema and partitioning inference outside of this API. That way, Spark is responsible for determining things like whether schemas "match" and can use more context to make a reasonable choice.

Why abandon the other approach? I thought that we were making progress and that the primary blocker was trying to do too much to be reviewed in a single PR.

@cloud-fan
Contributor Author

That way, Spark is responsible for determining things like whether schemas "match" and can use more context to make a reasonable choice.

We can do that too, e.g. first call getTable(schema, partitioning, properties) and then check that the returned table reports a schema/partitioning compatible with the one passed in.

Even if we had separate inferSchema and inferPartitioning methods, we would still require the getTable method to throw IllegalArgumentException to reject an incompatible schema/partitioning, e.g. when there is a user-provided schema and we pass it to getTable directly.

My main point is: this PR is a natural extension of the existing API. If TableProvider accepts a user-specified schema, why not accept user-specified partitioning as well? The refactoring might be good, but it should be a separate story.

If we all agree that the existing API is wrong (i.e. the way we accept a user-specified schema), then this PR should be rejected, as it extends a wrong API. But that doesn't seem to be the case here.
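
For what it's worth, the check described above could look roughly like this on the Spark side (a sketch only; provider, userSchema, userPartitioning and properties are assumed to be in scope, and exact equality stands in for a real compatibility check):

import org.apache.spark.sql.connector.catalog.Table

// Pass the user/metastore-provided metadata through, then verify that the
// returned table reports something compatible with what was passed in.
val table: Table = provider.getTable(userSchema, userPartitioning, properties)
if (table.schema() != userSchema) {
  throw new IllegalArgumentException(
    s"Table ${table.name()} reports schema ${table.schema()}, " +
      s"which does not match the specified schema $userSchema")
}
if (!table.partitioning().sameElements(userPartitioning)) {
  throw new IllegalArgumentException(
    s"Table ${table.name()} reports partitioning that does not match the specified one")
}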

  throw new UnsupportedOperationException(
    this.getClass().getSimpleName() + " source does not support user-specified schema");
}
Table getTable(
Member

I am a bit curious about the parameter order in these 3 methods:

getTable(properties)
getTable(schema, properties)
getTable(schema, partitioning, properties)

Is it on purpose? Why not:

getTable(properties)
getTable(properties, schema)
getTable(properties, schema, partitioning)

Contributor Author

I just followed the order in TableCatalog.createTable.

Member

Do you mind changing the parameter order? It is a bit weird.
Besides, the previous parameter order was:

getTable(options)
getTable(options, schema)

Contributor Author

I don't have a strong preference, but isn't it better to be consistent with createTable?

Member

Well, I think consistency in the trait TableProvider itself is more important.

Contributor Author

why is this not consistent?

getTable(properties)
getTable(schema, properties)
getTable(schema, partitioning, properties)

Member
@gengliangwang Dec 10, 2019

I think this is common practice. The three methods look neater when each parameter is in a fixed position:

getTable(properties)
getTable(properties, schema)
getTable(properties, schema, partitioning)

@cloud-fan closed this Feb 10, 2020