-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Implement the routine load process of Kafka on Backend #671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement the routine load process of Kafka on Backend #671
Conversation
| } | ||
|
|
||
| // delete TopicPartition finally | ||
| auto tp_deleter = [] (const std::vector<RdKafka::TopicPartition*>& vec) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| auto tp_deleter = [] (const std::vector<RdKafka::TopicPartition*>& vec) { | |
| auto tp_deleter = [&topic_partitions] () { |
| std::for_each(vec.begin(), vec.end(), | ||
| [](RdKafka::TopicPartition* tp1) { delete tp1; }); | ||
| }; | ||
| DeferOp delete_tp(std::bind<void>(tp_deleter, topic_partitions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to use std::bind
| return; | ||
| } | ||
|
|
||
| #define HANDLE_ERROR(stmt, err_msg) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use macro here, you can create another function do following work, and this function call that function, and checks its return and handler error
| DeferOp delete_conf(std::bind<void>(conf_deleter, conf)); | ||
|
|
||
| std::string errstr; | ||
| #define SET_KAFKA_CONF(conf_key, conf_val) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use lambda instead
| public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { | ||
|
|
||
| public enum RoutineLoadType { | ||
| public enum LoadSourceType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a LoadDataSourceTypeenum in package routineload. Using 'LoadDataSourceType' is better
Extract the common part of stream load utilities from stream_load.cpp, such as StreamLoadContext, StreamLoadExecutor, move it to
runtime/stream_load/Implement the routine load task executor, and can consuming data via kafka client. All implementations are in
runtime/routine_load/