
Conversation


@cloud-fan cloud-fan commented Oct 29, 2019

What changes were proposed in this pull request?

Instead of having several overloads of the getTable method in TableProvider, it's better to have two explicit methods, inferSchema and inferPartitioning, plus a single getTable method that takes everything: schema, partitioning and properties.

Spark supports: 1) inferring both schema and partitioning, 2) specifying the schema and inferring the partitioning, 3) specifying both schema and partitioning. So the inferPartitioning method takes the schema as input, as the schema must be known before inferring the partitioning.
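
A minimal sketch of the refined contract, written here as a Scala trait under a hypothetical name (the actual TableProvider is a Java interface, and the exact signatures in this PR may differ):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical Scala rendering of the refined contract described above.
trait RefinedTableProvider {
  // Infer the table schema from the options, e.g. by listing files under a path.
  def inferSchema(options: CaseInsensitiveStringMap): StructType

  // Infer the partitioning. The schema is already known at this point:
  // either user-specified or the result of inferSchema.
  def inferPartitioning(schema: StructType, options: CaseInsensitiveStringMap): Array[Transform]

  // Single entry point that takes everything: schema, partitioning and properties.
  def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table
}
```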

A lot of changes are made to file source v2, to move the schema inference from FileTable to FileDataSourceV2.

Why are the changes needed?

This is inspired by the discussion in #25651 (comment)

It's better to let the APIs have explicit meanings.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.


SparkQA commented Oct 29, 2019

Test build #112859 has finished for PR 26297 at commit f8999a1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaTable(override val schema: StructType) extends Table
  • implicit class PartitionColNameHelper(partColNames: Seq[String])
  • class NoopDataSource extends TableProvider with SupportsDirectWrite with DataSourceRegister
  • trait FileDataSourceV2 extends TableProvider with SupportsDirectWrite with DataSourceRegister
  • abstract class FileTable extends Table with SupportsRead with SupportsWrite
  • class TextSocketTable(


SparkQA commented Oct 29, 2019

Test build #112860 has finished for PR 26297 at commit 25aeebf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaTable(override val schema: StructType) extends Table
  • implicit class PartitionColNameHelper(partColNames: Seq[String])
  • trait FileDataSourceV2 extends TableProvider with SupportsDirectWrite with DataSourceRegister
  • abstract class FileTable extends Table with SupportsRead with SupportsWrite
  • class TextSocketTable(

  public static Transform bucket(int numBuckets, String... columns) {
-   return LogicalExpressions.bucket(numBuckets,
-       JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
+   NamedReference[] references = Arrays.stream(columns)
Contributor Author

I'll send another PR for these related changes.

// For file source, it's expensive to infer the schema/partitioning at each write. Here we pass
// the schema of the input query and the user-specified partitioning to `getTable`. If the
// query schema is not compatible with the existing data, the write can still succeed, but
// subsequent reads would fail.
Contributor Author

We can introduce a general API for it, but I don't think it's worth it. It only applies to the file source + DataFrameWriter, not to file source tables or DataFrameWriterV2.
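
A rough sketch of that write-path idea, reusing the hypothetical RefinedTableProvider trait sketched earlier; the helper and its parameters are illustrative, not the actual code in this PR:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative helper: on the DataFrameWriter path for file sources, skip schema/partition
// inference and hand the query's schema plus the user-specified partitioning to getTable.
def tableForWrite(
    provider: RefinedTableProvider,   // hypothetical trait from the earlier sketch
    df: DataFrame,
    partitionColumns: Seq[String],    // columns given via partitionBy(...)
    options: CaseInsensitiveStringMap): Table = {
  val partitioning: Array[Transform] =
    partitionColumns.map(c => Expressions.identity(c)).toArray
  provider.getTable(df.schema, partitioning, options.asCaseSensitiveMap())
}
```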

@cloud-fan cloud-fan changed the title [DO NOT MERGE] [SPARK-29665][SQL] refine the TableProvider interface Oct 30, 2019

SparkQA commented Oct 30, 2019

Test build #112945 has finished for PR 26297 at commit aec3b6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaTable(override val schema: StructType) extends Table
  • implicit class PartitionColNameHelper(partColNames: Seq[String])
  • abstract class FileTable extends Table with SupportsRead with SupportsWrite
  • class TextSocketTable(


SparkQA commented Oct 30, 2019

Test build #112947 has finished for PR 26297 at commit e583dab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaTable(override val schema: StructType) extends Table
  • implicit class PartitionColNameHelper(partColNames: Seq[String])
  • abstract class FileTable extends Table with SupportsRead with SupportsWrite
  • class TextSocketTable(

StructType(fields)
}

override def inferPartitioning(
Contributor

This is the only one I see that uses schema, and that is just to create the file index.

* @param options The options that can identify a table, e.g. file path, Kafka topic name, etc.
* It's an immutable case-insensitive string-to-string map.
*/
Transform[] inferPartitioning(StructType schema, CaseInsensitiveStringMap options);
Contributor

I don't think the schema is actually needed. Partitioning and schemas are mostly orthogonal. If anything, you could argue that identity partitions should be in the schema and that inferSchema could accept the result of inferPartitioning.

Also, none of the implementations actually use it besides the one that uses it to create a file index. It seems to me like this is more of a convenience for that implementation than something that is generally needed. Can we remove it from the API?

Contributor Author

We can remove the schema parameter and make the API more flexible, but I'm not sure we need such flexibility.

As I mentioned in the PR description, Spark only supports

  1. inferring both schema and partitioning,
  2. specifying the schema and inferring the partitioning,
  3. specifying both schema and partitioning.

It seems very weird if we allow users to specify partitioning and infer schema. Since partitioning depends on the schema (e.g. you can't pick a non-existent column as a partition column), I think it generally makes sense to have the schema parameter in inferPartitioning.
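
To make the ordering concrete, a hedged sketch of how the caller side could drive the three supported cases, again using the hypothetical RefinedTableProvider trait from the earlier sketch (the real resolution logic in Spark is more involved):

```scala
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative caller-side flow: the schema is always resolved first, so
// inferPartitioning can rely on it being known.
def resolveSchemaAndPartitioning(
    provider: RefinedTableProvider,       // hypothetical trait from the earlier sketch
    userSchema: Option[StructType],
    userPartitioning: Option[Array[Transform]],
    options: CaseInsensitiveStringMap): (StructType, Array[Transform]) = {
  // Cases 2 and 3: a user-specified schema wins; case 1: infer it.
  val schema = userSchema.getOrElse(provider.inferSchema(options))
  // Case 3: user-specified partitioning wins; cases 1 and 2: infer it with the schema in hand.
  val partitioning = userPartitioning.getOrElse(provider.inferPartitioning(schema, options))
  (schema, partitioning)
}
```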

Contributor

We've also had bugs in the past where the inference picks a different data type than what you want, therefore it's safer, if a user provides a schema, to use the data types from the provided schema for the partition columns.

Contributor

It seems very weird if we allow users to specify partitioning and infer schema.

This isn't what I'm suggesting. We can set up rules for when schema and partition inference are called that restrict them to just those 3 cases.

What I'm suggesting is that schema inference and partition inference are independent so we don't need to pass a schema in to inferPartition. The schema isn't actually used by file sources, and file sources are why we are making these changes.

you can't pick a non-existing column as partition column

There's no reason why this must be the case. Another partition column could be added to the schema. Data files don't usually store partition columns, so the schema is usually the union of all file schemas plus whatever is inferred for the partition schema. That means schema depends on partitioning, not the other way around.

We've also had bugs in the past where the inference picks a different data type than what you want.

I see what you mean here, but I think it is better to reconcile the differences in Spark instead of in the source. If the source infers that a partition is a string, but the user supplies a schema with an integer type, then all the source would do is throw an exception. Spark can do that once the partitioning is passed back, couldn't it?
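
A minimal sketch of what that Spark-side check could look like; the helper name is hypothetical, and identity transforms are matched by name for simplicity:

```scala
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Illustrative only: after the source hands back its inferred partitioning, Spark itself
// can check identity-partition columns against the user-provided schema and take each
// partition column's data type from that schema rather than asking the source to do it.
def checkPartitionColumns(userSchema: StructType, inferred: Array[Transform]): Unit = {
  inferred.filter(_.name == "identity").foreach { transform =>
    transform.references().foreach { ref =>
      val colName = ref.fieldNames().mkString(".")
      require(userSchema.fieldNames.contains(ref.fieldNames().head),
        s"Partition column '$colName' not found in the user-specified schema")
    }
  }
}
```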


rdblue commented Nov 14, 2019

@cloud-fan, I'll take a look at this one next. Is it possible to make these changes in multiple PRs? 57 files is a lot to go through for a single review.

@cloud-fan
Contributor Author

This changes the API, so all the implementations need to be updated. Not sure how to make it smaller.


SparkQA commented Nov 27, 2019

Test build #114540 has finished for PR 26297 at commit 89bc758.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaTable(override val schema: StructType) extends Table
  • abstract class FileTable extends Table with SupportsRead with SupportsWrite
  • class TextSocketTable(


SparkQA commented Nov 27, 2019

Test build #114542 has finished for PR 26297 at commit 95b62b6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaTable(override val schema: StructType) extends Table
  • abstract class FileTable extends Table with SupportsRead with SupportsWrite
  • class TextSocketTable(

@cloud-fan
Contributor Author

Can we move forward with this PR? The only argument I see is: why have the schema parameter in inferPartitioning?

First, we can guarantee that inferSchema is called before inferPartitioning at compile time. It's better than documentation.

Second, if the schema is specified and the partitioning needs to be inferred, we can save the time of inferring the partition columns' data types by simply picking them from the specified schema. This is needed for the file source.
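
A hedged sketch of that second point for a file source; the helper is hypothetical (directory-layout parsing elided), not the actual FileDataSourceV2 code:

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative only: discover partition column *names* from the directory layout
// (e.g. ".../year=2019/month=10/...") and, when a schema was specified, take each
// partition column's data type from that schema instead of re-inferring it from
// the path values.
def inferPartitioningFromLayout(
    specifiedSchema: StructType,
    partitionDirNames: Seq[String]): (Array[Transform], StructType) = {
  val partitionSchema = StructType(partitionDirNames.map { name =>
    specifiedSchema
      .find(_.name == name)                      // cheap: reuse the user-specified type
      .getOrElse(StructField(name, StringType))  // fallback when the column is unknown
  })
  val transforms = partitionDirNames.map(name => Expressions.identity(name)).toArray
  (transforms, partitionSchema)
}
```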

If we can't reach a consensus, I'd propose keeping the existing API style and simply adding a new method getTable(schema, partitioning, properties) to accept the specified partitioning and unblock #25651


rdblue commented Nov 27, 2019

I think this needs to be split into multiple commits to avoid problems. I think it can be done by adding the interface changes, along with a stub implementation on the file source base classes.

@cloud-fan
Contributor Author

It's hard to write a stub implementation; the API change is too big. Previously, file source v2 implemented schema/partition inference in XYZFileTable; now it needs to be moved to XYZDataSourceV2.

I've opened #26750 to give up the refactor and only do the necessary change to accept user-specified partitioning in TableProvider. Please take a look, thanks!
