[FLINK-35237] Allow Sink to Choose HashFunction in PrePartitionOperator #3414
Conversation
lvyanquan left a comment
Thanks for this great contribution, LGTM. Left some minor comments about the Javadoc.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunction.java
lvyanquan left a comment
LGTM.
And CC @PatrickRen @leonardBang
What's more, considering that the number of buckets and the parallelism may not be consistent, should we remove the constraint on EventPartitioner?
Although the number of buckets and the parallelism will differ, we can only distribute based on parallelism rather than buckets, right? We have already distributed the hash values across the parallel subtasks here, so I think there is no need to change anything.
Got it, there is indeed no need for adjustment.
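To illustrate the point above, here is a minimal sketch (hypothetical names, not the actual EventPartitioner code) of routing purely by downstream parallelism, regardless of how many buckets the sink table has:

public class ChannelSelectionSketch {
    // The pre-partition step can only route by downstream parallelism, so the
    // event's hash value is mapped onto the range [0, parallelism).
    static int selectChannel(int eventHash, int downstreamParallelism) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(eventHash, downstreamParallelism);
    }

    public static void main(String[] args) {
        // e.g. an event hash for table "db.table" routed across 4 parallel subtasks
        System.out.println(selectChannel("db.table".hashCode(), 4));
    }
}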
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java
Can this PR be merged before the other PR? Both PRs are marked for inclusion in version 3.2, but the other PR depends on this one. I will need some time to make the necessary adjustments.
@leonardBang @PatrickRen can you help to review and merge this?
@yuxiqian CC.
yuxiqian left a comment
Thanks for @dingxin-tech's contribution. I wonder if the DefaultHashFunctionProvider implementation could be improved when migrating from the existing PrePartitionOperator#HashFunction.
...k-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java
...k-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java
...k-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunction.java
// --------------------------------------------------------------------------------------------
default void open() {}

default void close() {}
Why does a simple Provider interface need these lifecycle methods? Do we use them in any implementation classes?
Since we provide the getHashFunction method based on TableId, connector implementers might use the TableId to obtain the actual schema from the database and perform some caching. We can establish connections and initialize caches in the open method. These two lifecycle methods were added following @lvyanquan's suggestion, and he might respond with further additions.
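For illustration only, a hypothetical sketch of the kind of implementation described here (the JDBC URL, method names, and signatures are assumptions, not the actual connector code): a provider that opens a connection in open(), caches per-table metadata, and hands out hash functions that depend only on the cached data.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

// Hypothetical provider illustrating why open()/close() lifecycle methods were proposed.
public class CachingHashFunctionProviderSketch {
    private Connection connection;
    private final Map<String, List<String>> primaryKeyCache = new ConcurrentHashMap<>();

    public void open() throws SQLException {
        // Establish the (expensive) connection once, up front.
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pwd");
    }

    public ToIntFunction<String> getHashFunction(String tableId) {
        List<String> primaryKeys = primaryKeyCache.computeIfAbsent(tableId, this::loadPrimaryKeys);
        // The returned function depends only on cached metadata, not on the live connection.
        return event -> Math.abs((tableId + ":" + primaryKeys + ":" + event).hashCode());
    }

    private List<String> loadPrimaryKeys(String tableId) {
        // In a real connector this would query the schema via `connection`; a constant is
        // returned here only to keep the sketch self-contained.
        return List.of("id");
    }

    public void close() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }
}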
Usually, it will need to obtain partition or bucket information based on the TableId. I wonder if it is possible to cache the catalog or connection here to reuse objects or connections.
A function with lifecycle management makes sense to me, but why should the factory care about resource management? Could we push the logic that uses TableId to obtain partition or bucket info into HashFunction itself? Having HashFunction own open/close may be better; what do you think?
Actually, HashFunctionProvider plays a role similar to a catalog, and HashFunction plays a role similar to a table, and the open/close methods are called on the catalog-like role.
Well, a Catalog is used as a metadata manager instance rather than a table factory; tables are not constructed by that instance. A metadata manager having its own lifecycle makes sense to me, but a factory owning a lifecycle confuses me a little. Coming back to the function itself, Flink also has many functions as well as function factories, and I don't see why a function factory needs to manage runtime resources, whereas a function managing its own resources is pretty common.
You can also look at the Flink code for:
- RichFunction & SourceFunctionProvider
- InputFormat & InputFormatProvider
So the key point is whether what we need is a metadata manager or a function factory. I agree that what we need is a function factory, though it may need the assistance of a specific metadata manager.
And the concern about resource reuse is indeed overthinking, because we can release the database connection after extracting the parameters needed for the calculation.
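A hypothetical sketch of that idea (the names, the JDBC URL, and the constant bucket count are assumptions for illustration): the factory borrows a connection only long enough to extract the parameters and then returns a pure function, so no provider-level lifecycle is needed.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.function.ToIntFunction;

// Hypothetical: the provider is just a function factory; any connection it needs is
// opened and released inside getHashFunction.
public class ShortLivedConnectionProviderSketch {
    public ToIntFunction<String> getHashFunction(String tableId) {
        int bucketCount = readBucketCount(tableId);
        // The returned function captures only the extracted parameter, not the connection.
        return event -> Math.floorMod((tableId + "#" + event).hashCode(), bucketCount);
    }

    private int readBucketCount(String tableId) {
        // try-with-resources releases the connection as soon as the parameter is read.
        try (Connection conn =
                DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pwd")) {
            // In a real connector this would query the table's bucket/partition metadata;
            // a constant keeps the sketch self-contained.
            return 16;
        } catch (SQLException e) {
            throw new RuntimeException("Failed to read bucket count for " + tableId, e);
        }
    }
}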
Yeah, that's what I want to propose.
In my opinion, HashFunction should not have lifecycle methods. It is merely a hash function, not a Flink operator; it will be cached and then discarded automatically, so after creation it can be treated as a kind of "constant", and we should not concern ourselves with its lifecycle.
Based on this viewpoint, whether HashFunctionProvider should have lifecycle methods depends solely on whether we need to reuse resources created at runtime when constructing a HashFunction.
In fact, I also think the connectors currently have nothing that needs to be done in lifecycle methods, so I have removed them for now.
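As a rough sketch of the resulting shape under this reasoning (the generics and signatures here are assumed for illustration and may differ from the merged code):

// Illustrative only: the provider reduced to a plain function factory, and a hash
// function that is effectively a constant after creation.
interface HashFunctionProviderSketch<T> {
    // No open()/close() lifecycle methods; the provider only creates functions.
    HashFunctionSketch<T> getHashFunction(String tableId);
}

@FunctionalInterface
interface HashFunctionSketch<T> {
    // Maps an event to a hash used by the pre-partition operator for channel selection.
    int hashcode(T event);
}

Under this shape, the pre-partition operator would simply request one function per table from the provider and cache it for the lifetime of the operator.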
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java
Hi @leonardBang, can you help to review again and merge this?
9084d60 to 5222dfa
5222dfa to 7ac39fd
I appended one commit to polish the interface and package. Could you also take a look?
Sure, and it looks good to me.
https://issues.apache.org/jira/browse/FLINK-35237