Conversation

@openinx (Member) commented Jan 11, 2021

Provides a switch in org.apache.flink.sink.FlinkSink to shuffle by partition key, so that each partition/bucket will be written by only one task. This greatly reduces the number of small files produced by the partitioned fanout write policy in the Flink sink.
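
A minimal sketch of the switch's effect (the method name maybeShuffle and the helper partitionValueOf are illustrative, not the PR's actual API):

// Sketch only: when the switch is on, key rows by their partition value so that
// each partition is written by exactly one writer subtask.
DataStream<RowData> maybeShuffle(DataStream<RowData> rows, boolean shuffleByPartition) {
  if (!shuffleByPartition) {
    return rows;
  }
  // partitionValueOf is a hypothetical helper that derives the partition key string for a row.
  return rows.keyBy(row -> partitionValueOf(row));
}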

@kbendick (Contributor) left a comment

This looks great and will be really useful.

SimpleDataUtil.createRecord(3, "ccc")
));

Assert.assertEquals("Should 1 data file in partition 'aaa'", 1, partitionFiles(tableName, "aaa").size());

Contributor

Small nit: consider "There should be [only] 1 data file in partition 'aaa'", and similarly for the other files.

WRITE_SHUFFLE_BY_PARTITION,
WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
} else {
return shuffleByPartition;

Contributor

Somewhat unrelated question: is it possible to set these values at the cluster level (like with Flink's properties), or can they only be set as a code-level property and then a table property?

Member Author

I think providing this option at the job level and the table level is flexible enough; shuffling by partition for every job in a cluster seems too coarse-grained.

Contributor

That's a fair assessment.

I ask because we typically use job clusters at my work and try to specify as much configuration as possible in the cluster's config. This mostly provides one easy place to track configuration, and it is definitely tied to how our build, deployment, and configuration systems are set up internally.

Outside of job clusters, I would agree that it's too coarse grained. And having the possibility to set it as a job config or a table config is good enough. I'm not even sure if Flink would accept arbitrary configurations at the cluster level (that it isn't aware of).

public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;

public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Contributor

I think we should make sure all query engines are aligned with this.
In my view, we should support the following cases:

  • local sort using the table sort order
  • repartition using partition spec and local sort by the table sort order
  • global sort using the table sort order

Contributor

In Spark, our plan was to support the following commands:

-- global
ALTER TABLE WRITE
ORDERED BY p1, bucket(id, 128), c1, c2

-- hash + local sort
ALTER TABLE WRITE
DISTRIBUTED BY p1, bucket(id, 128)
LOCALLY ORDERED BY p1, bucket(id, 128), c1, c2

-- local sort
ALTER TABLE WRITE
LOCALLY ORDERED BY p1, bucket(id, 128), c1, c2

@stevenzwu (Contributor) commented Jan 12, 2021

+1 on more generalized semantics. The current option only works if the data is relatively evenly distributed across table partitions. Otherwise, heavy data skew can be problematic for the writers. The other problem is that the effective writer parallelism is now limited by the number of partition values. Let's say the writer parallelism is 100 but there are only 10 unique partition values; then only 10 writer subtasks will receive data.

I will add some notes on the streaming write mode. In a streaming job, it is probably impossible to do true sorting. Instead, what can be useful is some sort of "groupBy/bucketing" shuffle in the streaming sink. It can help reduce the number of concurrently open files per writer and improve read performance (predicate pushdown) through better data locality.

E.g., a table is partitioned by (event_date, country). Without the shuffle, each writer task can write to ~200 files/countries. However, a simple keyBy is also problematic, as it can produce heavy data skew for countries like the US. Instead, we should calculate stats for each bucket/country and distribute the data based on the weight of each bucket. E.g., we may allocate 100 downstream subtasks for the US while allocating 1 downstream subtask for multiple small countries (like bin packing).

This can also be extended to non-partition columns (as logical partitioning), which can improve read performance through filtering. Similar to the above example, but with the tweak that country is no longer a partition column; a groupBy/bucketing shuffle can still help improve data locality.

I was thinking about a groupBy/orderBy operator where each subtask (running in a taskmanager) constantly reports local statistics to the operator coordinator (running in the jobmanager), which then does the global aggregation and notifies the subtasks with the globally aggregated stats.
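
A rough sketch of the weighted assignment idea described above (all class and method names are hypothetical; this is not part of the PR):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of weight-based routing: hot keys (e.g. "US") get a proportional
// block of writer subtasks, while unseen or tiny keys fall back to plain hashing.
// A real implementation would normalize weights and handle overlapping ranges.
class WeightedKeyRouter {
  private final Map<String, int[]> keyToRange = new HashMap<>(); // key -> {startSubtask, slotCount}

  // Rebuild the routing table from globally aggregated key weights (fractions summing to ~1).
  void update(Map<String, Double> keyWeights, int parallelism) {
    keyToRange.clear();
    int next = 0;
    for (Map.Entry<String, Double> entry : keyWeights.entrySet()) {
      // Give each key at least one subtask, and hot keys a proportional share.
      int slots = Math.max(1, (int) Math.round(entry.getValue() * parallelism));
      keyToRange.put(entry.getKey(), new int[] {next, slots});
      next = (next + slots) % parallelism;
    }
  }

  // Choose a subtask for one record: a random slot within the key's assigned block.
  int route(String key, int parallelism) {
    int[] range = keyToRange.get(key);
    if (range == null) {
      return Math.floorMod(key.hashCode(), parallelism); // fall back to plain hashing
    }
    return (range[0] + ThreadLocalRandom.current().nextInt(range[1])) % parallelism;
  }
}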

Contributor

I have the same concern as @stevenzwu that a hash distribution by partition spec would co-locate all entries for the same partition in the same task, potentially leading to having too much data in a task. The global sort in Spark would be a better option here for batch jobs as it will do skew estimation and the sort order can be used to split data for the same partition across multiple tasks.

To sum up, I think we should be flexible and support 3 modes to cover different use cases.

@aokolnychyi (Contributor) commented Jan 12, 2021

cc @jacques-n @omalley @rdblue as it is related to the discussion we had during the last sync.

Contributor

FYI @electrum. You may be interested in this discussion for recommended write behavior from table config.

Member Author

> Flink may eventually provide a way to order within data files, but I think that is less important than clustering data across files so that data files can be skipped in queries.

Agreed. Sorting within a data file would be really helpful for page skipping, but it would introduce more cost for a streaming job. Range distribution by sort keys is coarser-grained, yet it is good enough for a streaming job to cluster keys for filtering among data files; I think it's the better-balanced choice when trading off write efficiency against read performance.

It also makes sense to me to rewrite those range-distributed data files into row-ordered files if there are heavy reads that depend on them.

Contributor

@rdblue thanks for the pointer. Here are my thoughts on how this would work for Trino (formerly Presto SQL).

Trino does streaming execution between stages -- there is no materialized shuffle phase. This means that global sorting would only be possible using a fixed range, not based on statistics, so it would be vulnerable to skew. I'd like to understand the use case for global "sort" compared to "partition".

For local sorting, I see two choices:

  1. Write arbitrarily large files. Use a fixed size in-memory buffer, sort when full, write to temporary file, then merge files at end. There may be multiple merge passes in order to limit the number of files read at once during the merge. This is what we do for Hive bucketed-sorted tables, since sorting per bucket is required.
  2. Write multiple size-limited files. Use a fixed size in-memory buffer, sort when full, write final output file. Repeat until all input data for writer has been consumed.

I would prefer the second option as it is simpler and uses fewer resources. It satisfies the property that each file is sorted and helps with compression and within-file filtering. The downside is that there are more files, but if they are of sufficient size, it shouldn't affect reads as we split files anyway when reading.
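
A minimal sketch of that second option (the writer and flusher interfaces here are hypothetical, not Trino code):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of option 2: buffer rows, sort the buffer when full, and flush each
// sorted buffer as its own size-limited file.
class SortedFileWriter<T> {
  private final int maxBufferedRows;
  private final Comparator<T> sortOrder;
  private final FileFlusher<T> flusher;        // hypothetical: writes one file per call
  private final List<T> buffer = new ArrayList<>();

  SortedFileWriter(int maxBufferedRows, Comparator<T> sortOrder, FileFlusher<T> flusher) {
    this.maxBufferedRows = maxBufferedRows;
    this.sortOrder = sortOrder;
    this.flusher = flusher;
  }

  void write(T row) {
    buffer.add(row);
    if (buffer.size() >= maxBufferedRows) {
      flush();                                 // each flushed file is individually sorted
    }
  }

  void close() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    buffer.sort(sortOrder);
    flusher.writeFile(buffer);                 // produces one sorted, size-limited file
    buffer.clear();
  }

  interface FileFlusher<T> {
    void writeFile(List<T> sortedRows);
  }
}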

Another option is to sort data using a fixed size buffer before writing each batch of rows. This would help with compression and within-file filtering, but wouldn't provide a guarantee on sorting for readers.

Contributor

@electrum, as far as what a "local sort" means, I think option 2 sounds good to me for a task-level sort. If that sort is needlessly expensive, then it is okay for Trino to skip it. But I think that if a table has a defined sort order, the right thing would be for Trino to apply it.

For data distribution, it sounds like Trino will only support none and hash modes in the short term. That's reasonable given that you can't stage data and use it twice. Even with shuffle data reuse, global sort in Spark is quite expensive in some cases (doing a large join twice, for example). Eventually, we want to get to where the table metadata has a sketch of the data distribution so you can use that to get ranges for a global ordering.

Member Author

> Eventually, we want to get to where the table metadata has a sketch of the data distribution so you can use that to get ranges for a global ordering.

I was also thinking about how to partition (-oo, +oo) into several even key ranges (partition key ranges or sort key ranges) for Flink. This idea seems similar to @stevenzwu's list-of-values column stats from #2064 (comment). Yes, that would help a lot if we had such fine-grained column range stats.
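
For illustration, a sketch of deriving key ranges from sampled sort keys (the class and method names are hypothetical; nothing here is part of this PR):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: derive numRanges-1 split points from a sample of sort keys so that
// each writer subtask receives a roughly even share of the key space (-oo, +oo).
class RangeBoundaries {
  static <K extends Comparable<K>> List<K> fromSample(List<K> sampledKeys, int numRanges) {
    Collections.sort(sampledKeys);
    List<K> boundaries = new ArrayList<>();
    for (int i = 1; i < numRanges; i++) {
      // Evenly spaced quantiles of the sample become the range split points.
      int idx = (int) ((long) i * sampledKeys.size() / numRanges);
      boundaries.add(sampledKeys.get(Math.min(idx, sampledKeys.size() - 1)));
    }
    return boundaries;
  }

  // Route a key to the range whose upper boundary is the first split point >= key.
  static <K extends Comparable<K>> int rangeFor(K key, List<K> boundaries) {
    int idx = Collections.binarySearch(boundaries, key);
    return idx >= 0 ? idx : -idx - 1;
  }
}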

The github-actions bot added the API label on Jan 19, 2021
@openinx changed the title from "Flink: Add option to shuffle by partition key in iceberg sink." to "Flink: Support write.distribution-mode." on Jan 19, 2021
* suitable for the scenarios where rows are located into different partitions with skew distribution.
*/
public enum DistributionMode {
NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Contributor

As I noted in the comment thread, I think that we should use "partition" to describe only table partitions. Otherwise, we are going to create confusion. We can use "hash" and "range" here if there is consensus that "partition" and "sort" are not clear, but I don't think that we should use the term "partition" to refer to distribution within a processing engine.

@stevenzwu (Contributor) commented Jan 19, 2021

"Partition" and "sort" aren't very clear to me. Both "hash partition" and "range partition" are "partitions". But in the table, they are listed as "partition" and "sort".

The general concepts that @rdblue defined in the table above are still very good guidance for us to think about those dimensions. But if Flink and Spark are going to support different behaviors, maybe it is better for them to define different values that more accurately describe the behavior.

@aokolnychyi (Contributor) commented Jan 19, 2021

+1 on not using the term "partition" when talking about distribution.

W.r.t. naming, I did ask myself what the best names would be here. I tend to like "hash" and "range" a bit more, as it may not be clear that "partition" refers to the table's partition spec.

I guess the real question here is what this table property controls. Are we allowing users to control whether to use hash or range distribution, or do we control whether the distribution is based on the partition spec or the sort order?

Contributor

I think it controls whether we use hash distribution or range distribution. I agree that's more clear from a developer's perspective. My concern is that users won't know what hash and range are, but they do understand what a partition is and what sorting is.

Let's go with hash and range for now. I think we can explain it well enough in docs, and we can also add aliases that are more clear if needed.

Contributor

Sounds good to me.

Contributor

What if we named the config write.shuffle-mode? Would that make the hash vs. range distinction clearer to users?

Member Author

@stevenzwu, I think write.shuffle-mode is enough to express the write behavior of Flink, but not the write behavior of Spark, because Spark will also distribute those records with a local sort or a global sort.

+1 on keeping write.distribution-mode and using the hash and range values for now (though range does not fully express the sort semantics from Spark, I cannot think of a better word that captures the exact meaning for both Flink and Spark).

return name;
}

public static DistributionMode fromName(String name) {

Contributor

If we used hash and range, then this would just need to use valueOf(name.toUpperCase(Locale.ROOT)) (with a null check, of course).
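
A minimal sketch of that suggestion (assuming the enum constants become NONE, HASH, and RANGE):

import java.util.Locale;

// Sketch only: resolve a case-insensitive mode name to the enum constant, with a null check.
public static DistributionMode fromName(String modeName) {
  if (modeName == null) {
    throw new IllegalArgumentException("Invalid distribution mode: null");
  }
  return DistributionMode.valueOf(modeName.toUpperCase(Locale.ROOT));
}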


RowType flinkSchema;
case RANGE:
throw new UnsupportedOperationException("The write.distribution-mode=range is not supported in flink now");

Contributor

By throwing an exception here, users could break jobs by setting the distribution mode. Is that okay? I guess it wouldn't affect running jobs because they are already configured.

Member Author

There are two cases:
Case 1: people configure the distribution mode at the job level to RANGE; since we don't support it yet, we'd better throw an UnsupportedOperationException for now.

Case 2: people change an existing table's properties from NONE to RANGE. Running Flink jobs won't be affected until they restart, but any newly started Flink job would be forced to use NONE or HASH. It's not friendly to break all existing jobs on restart, so let me add a warn log and just keep the default NONE behavior.
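
A rough sketch of that fallback in the sink builder (the method name and LOG are assumed; PartitionKeySelector and the arguments follow the diff in this PR):

// Sketch: job-level RANGE fails fast elsewhere; a table-level RANGE logs a warning and
// falls back to the NONE behavior so restarted jobs keep running.
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                                                 DistributionMode mode,
                                                 PartitionSpec partitionSpec,
                                                 Schema iSchema,
                                                 RowType flinkRowType) {
  switch (mode) {
    case NONE:
      return input;
    case HASH:
      return partitionSpec.isUnpartitioned()
          ? input
          : input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
    case RANGE:
      LOG.warn("Falling back to 'none' distribution mode because 'range' is not supported in Flink now");
      return input;
    default:
      throw new IllegalArgumentException("Unrecognized distribution mode: " + mode);
  }
}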

config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
config.put(CatalogProperties.HIVE_URI, getURI(hiveConf));
}
config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));

Contributor

Is this change related? This looks like a fix for something else.

Member Author

It's not a fix, just for abstraction, so that we could get all data files under the given partition here: https://github.com/apache/iceberg/pull/2064/files#diff-0aaa93576853d5b379da121bc5d6161eb888fe15b88e3597374ed894d8c94917R275

@rdblue (Contributor) commented Jan 19, 2021

This looks nearly ready. Mainly, I would like to get consensus on the config values.

if (partitionSpec.isUnpartitioned()) {
return input;
} else {
return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));

Contributor

I still have concerns about supporting this hash distribution via keyBy on the partition key, due to the data skew problem mentioned in the discussion thread.

Contributor

This isn't going to cover all cases, but I think it is a necessary first step. Data skew is going to require range distribution.

Contributor

that is fair

@rdblue (Contributor) commented Jan 20, 2021

I think there is consensus around using "none", "hash", and "range" for the distribution mode. Once that's implemented, I think this is ready to commit. I also had some other minor comments.

@rdblue merged commit c75ac35 into apache:master on Jan 20, 2021
@rdblue (Contributor) commented Jan 20, 2021

Looks great, thanks for working on this @openinx!

And thanks to everyone that helped discuss the configuration!
