From 34adf89e9d59ad43ddc91ad08dea821e1853d193 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 17 May 2021 13:19:16 +0800 Subject: [PATCH] first second thrid 4 5 6 self review doc2 fix alter fix bug fix ut fix docs fix doc --- be/src/runtime/routine_load/data_consumer.cpp | 43 +++ be/src/runtime/routine_load/data_consumer.h | 4 + .../routine_load_task_executor.cpp | 50 ++- .../routine_load/routine_load_task_executor.h | 5 + be/src/service/internal_service.cpp | 43 ++- .../load-data/routine-load-manual.md | 21 ++ .../Data Manipulation/ROUTINE LOAD.md | 25 ++ .../load-data/routine-load-manual.md | 20 + .../Data Manipulation/ROUTINE LOAD.md | 23 ++ fe/fe-core/src/main/cup/sql_parser.cup | 3 +- .../doris/analysis/AlterRoutineLoadStmt.java | 15 +- .../doris/analysis/CreateRoutineLoadStmt.java | 216 ++--------- .../RoutineLoadDataSourceProperties.java | 309 ++++++++++++++-- .../apache/doris/common/util/KafkaUtil.java | 74 ++++ .../apache/doris/common/util/TimeUtils.java | 16 +- .../load/routineload/KafkaRoutineLoadJob.java | 72 +++- .../load/routineload/RoutineLoadJob.java | 10 +- .../load/routineload/RoutineLoadManager.java | 2 +- .../analysis/AlterRoutineLoadStmtTest.java | 22 +- .../RoutineLoadDataSourcePropertiesTest.java | 347 ++++++++++++++++++ .../routineload/KafkaRoutineLoadJobTest.java | 20 +- .../AlterRoutineLoadOperationLogTest.java | 12 +- gensrc/proto/internal_service.proto | 11 + 23 files changed, 1086 insertions(+), 277 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 301ad246b0b1d6..ea712c9d496552 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -23,6 +23,7 @@ #include #include "common/status.h" +#include "gen_cpp/internal_service.pb.h" #include "gutil/strings/split.h" #include "runtime/small_file_mgr.h" #include "service/backend_options.h" @@ -293,6 +294,48 @@ Status KafkaDataConsumer::get_partition_meta(std::vector* partition_ids return Status::OK(); } +// get offsets of each partition for times. +// The input parameter "times" holds +// The output parameter "offsets" returns +// +// The returned offset for each partition is the earliest offset whose +// timestamp is greater than or equal to the given timestamp in the +// corresponding partition. 
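+// (Judging from the loop below: "times" holds partition-id/timestamp pairs, and
+// "offsets" is filled with the resulting partition-id/offset pairs.)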
+// See librdkafka/rdkafkacpp.h##offsetsForTimes() +Status KafkaDataConsumer::get_offsets_for_times(const std::vector& times, + std::vector* offsets) { + // create topic partition + std::vector topic_partitions; + for (const auto& entry : times) { + RdKafka::TopicPartition* tp1 = + RdKafka::TopicPartition::create(_topic, entry.key(), entry.val()); + topic_partitions.push_back(tp1); + } + // delete TopicPartition finally + Defer delete_tp{[&topic_partitions]() { + std::for_each(topic_partitions.begin(), topic_partitions.end(), + [](RdKafka::TopicPartition* tp1) { delete tp1; }); + }}; + + // get offsets for times + RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 5000); + if (err != RdKafka::ERR_NO_ERROR) { + std::stringstream ss; + ss << "failed to get offsets for times: " << RdKafka::err2str(err); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + for (const auto& topic_partition : topic_partitions) { + PIntegerPair pair; + pair.set_key(topic_partition->partition()); + pair.set_val(topic_partition->offset()); + offsets->push_back(pair); + } + + return Status::OK(); +} + Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) { std::unique_lock l(_lock); if (!_init) { diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index f0c1c6eb811cc4..fcbd01774fa2f0 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -77,6 +77,7 @@ class DataConsumer { time_t _last_visit_time; }; +class PIntegerPair; class KafkaEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event& event) { @@ -141,6 +142,9 @@ class KafkaDataConsumer : public DataConsumer { // get the partitions ids of the topic Status get_partition_meta(std::vector* partition_ids); + // get offsets for times + Status get_offsets_for_times(const std::vector& times, + std::vector* offsets); private: std::string _brokers; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 8bcbfa999d9fc5..f8f9c71bbe8aea 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -63,15 +63,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() { _task_map.clear(); } -Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRequest& request, - std::vector* partition_ids) { - DCHECK(request.has_kafka_info()); - - // This context is meaningless, just for unifing the interface - StreamLoadContext ctx(_exec_env); - ctx.load_type = TLoadType::ROUTINE_LOAD; - ctx.load_src_type = TLoadSourceType::KAFKA; - ctx.label = "NaN"; +// Create a temp StreamLoadContext and set some kafka connection info in it. +// So that we can use this ctx to get kafka data consumer instance. 
+Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& request, + StreamLoadContext* ctx) { + ctx->load_type = TLoadType::ROUTINE_LOAD; + ctx->load_src_type = TLoadSourceType::KAFKA; + ctx->label = "NaN"; // convert PKafkaInfo to TKafkaLoadInfo TKafkaLoadInfo t_info; @@ -84,8 +82,18 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe } t_info.__set_properties(std::move(properties)); - ctx.kafka_info.reset(new KafkaLoadInfo(t_info)); - ctx.need_rollback = false; + ctx->kafka_info.reset(new KafkaLoadInfo(t_info)); + ctx->need_rollback = false; + return Status::OK(); +} + +Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRequest& request, + std::vector* partition_ids) { + CHECK(request.has_kafka_info()); + + // This context is meaningless, just for unifing the interface + StreamLoadContext ctx(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, &ctx)); std::shared_ptr consumer; RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer)); @@ -98,6 +106,26 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe return st; } +Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request, + std::vector* partition_offsets) { + CHECK(request.has_kafka_info()); + + // This context is meaningless, just for unifing the interface + StreamLoadContext ctx(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, &ctx)); + + std::shared_ptr consumer; + RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer)); + + Status st = std::static_pointer_cast(consumer)->get_offsets_for_times( + std::vector(request.offset_times().begin(), request.offset_times().end()), + partition_offsets); + if (st.ok()) { + _data_consumer_pool.return_consumer(consumer); + } + return st; +} + Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { std::unique_lock l(_lock); if (_task_map.find(task.id) != _task_map.end()) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index ffc8b2c38cc693..e5c7939cc9ec31 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -52,6 +52,9 @@ class RoutineLoadTaskExecutor { Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request, std::vector* partition_ids); + Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request, + std::vector* partition_offsets); + private: // execute the task void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb); @@ -60,6 +63,8 @@ class RoutineLoadTaskExecutor { // for test only Status _execute_plan_for_test(StreamLoadContext* ctx); + // create a dummy StreamLoadContext for PKafkaMetaProxyRequest + Status _prepare_ctx(const PKafkaMetaProxyRequest& request, StreamLoadContext* ctx); private: ExecEnv* _exec_env; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 62e917f9c105fe..8023751c2cdc4a 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -187,19 +187,42 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controll const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { brpc::ClosureGuard closure_guard(done); + // PProxyRequest is defined in gensrc/proto/internal_service.proto + // Currently it supports 2 kinds of requests: + // 
1. get all kafka partition ids for given topic + // 2. get all kafka partition offsets for given topic and timestamp. if (request->has_kafka_meta_request()) { - std::vector partition_ids; - Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( - request->kafka_meta_request(), &partition_ids); - if (st.ok()) { - PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); - for (int32_t id : partition_ids) { - kafka_result->add_partition_ids(id); + const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); + if (!kafka_request.offset_times().empty()) { + // if offset_times() has elements, which means this request is to get offset by timestamp. + std::vector partition_offsets; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } + st.to_protobuf(response->mutable_status()); + return; + } else { + // get partition ids of topic + std::vector partition_ids; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( + request->kafka_meta_request(), &partition_ids); + if (st.ok()) { + PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); + for (int32_t id : partition_ids) { + kafka_result->add_partition_ids(id); + } + } + st.to_protobuf(response->mutable_status()); + return; } - st.to_protobuf(response->mutable_status()); - return; - } + } Status::OK().to_protobuf(response->mutable_status()); } diff --git a/docs/en/administrator-guide/load-data/routine-load-manual.md b/docs/en/administrator-guide/load-data/routine-load-manual.md index 592ebe51d62436..ab299015af072b 100644 --- a/docs/en/administrator-guide/load-data/routine-load-manual.md +++ b/docs/en/administrator-guide/load-data/routine-load-manual.md @@ -273,11 +273,32 @@ The user can control the stop, pause and restart of the job by the three command * If the broker of the user kafka cluster has `auto.create.topics.enable = false` set, topic will not be created automatically, and the routine will be paused before any data is read, with the status `PAUSED`. So, if the user wants to be automatically created by the routine when the kafka topic does not exist, just set the broker in the kafka cluster** of the user's side to set auto.create.topics.enable = true` . + 5. Problems that may occur in the some environment In some environments, there are isolation measures for network segment and domain name resolution. So should pay attention to: 1. The broker list specified in the routine load task must be accessible on the doris environment. 2. If `advertised.listeners` is configured in kafka, The addresses in `advertised.listeners` need to be accessible on the doris environment. +6. About specified Partition and Offset + + Doris supports specifying Partition and Offset to start consumption. The new version also supports the consumption function at a specified time point. The configuration relationship of the corresponding parameters is explained here. + + There are three relevant parameters: + + * `kafka_partitions`: Specify the list of partitions to be consumed, such as: "0, 1, 2, 3". 
+ * `kafka_offsets`: Specify the starting offset of each partition, which must correspond to the number of `kafka_partitions` lists. Such as: "1000, 1000, 2000, 2000" + * `property.kafka_default_offset`: Specify the default starting offset of the partition. + + When creating an routine load job, these three parameters can have the following combinations: + + | Combinations | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offset` | Behavior | + |---|---|---|---|---| + |1| No | No | No | The system will automatically find all the partitions corresponding to the topic and start consumption from OFFSET_END | + |2| No | No | Yes | The system will automatically find all the partitions corresponding to the topic and start consumption from the position specified by the default offset | + |3| Yes | No | No | The system will start consumption from the OFFSET_END of the specified partition | + |4| Yes | Yes | No | The system will start consumption from the specified offset of the specified partition | + |5| Yes | No | Yes | The system will start consumption from the specified partition and the location specified by the default offset | + ## Related parameters Some system configuration parameters can affect the use of routine loads. diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index ea6487f68db6ae..ee0e775ec34d84 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -243,6 +243,9 @@ FROM data_source 2) OFFSET_END: Subscribe from the end. + 3) Timestamp, the format must be like: "2021-05-11 10:00:00", the system will automatically locate the offset of the first message greater than or equal to the timestamp. + Note that the offset of the timestamp format cannot be mixed with the number type, only one of them can be selected. + If not specified, all partitions under topic are subscribed by default fromSET_END. Example: @@ -250,6 +253,9 @@ FROM data_source ``` "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END" + + "kafka_partitions" = "0,1", + "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00" ``` 4. property @@ -305,9 +311,12 @@ FROM data_source 2) OFFSET_END: Subscribe from the end. + 3) Timestamp, the format is the same as kafka_offsets + Example: `"property.kafka_default_offsets" = "OFFSET_BEGINNING"` + `"property.kafka_default_offsets" = "2021-05-11 10:00:00"` 8. load data format sample @@ -552,6 +561,22 @@ FROM data_source "kafka_offsets" = "101,0,0,200" ); + 9. Start consumption from the specified point in time + + CREATE ROUTINE LOAD example_db.test_job ON example_tbl + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "30", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "property.kafka_default_offsets" = "2021-10-10 11:00:00" + ); + ## keyword CREATE, ROUTINE, LOAD diff --git a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md index 0a2801210126cd..976446b1cfca57 100644 --- a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md +++ b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md @@ -277,6 +277,26 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或 1. 
创建Routine load 任务中指定的 Broker list 必须能够被Doris服务访问 2. Kafka 中如果配置了`advertised.listeners`, `advertised.listeners` 中的地址必须能够被Doris服务访问 +6. 关于指定消费的 Partition 和 Offset + + Doris 支持指定 Partition 和 Offset 开始消费。新版中还支持了指定时间点进行消费的功能。这里说明下对应参数的配置关系。 + + 有三个相关参数: + + * `kafka_partitions`:指定待消费的 partition 列表,如:"0, 1, 2, 3"。 + * `kafka_offsets`:指定每个分区的起始offset,必须和 `kafka_partitions` 列表个数对应。如:"1000, 1000, 2000, 2000" + * `property.kafka_default_offset`:指定分区默认的起始offset。 + + 在创建导入作业时,这三个参数可以有以下组合: + + | 组合 | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offset` | 行为 | + |---|---|---|---|---| + |1| No | No | No | 系统会自动查找topic对应的所有分区并从 OFFSET_END 开始消费 | + |2| No | No | Yes | 系统会自动查找topic对应的所有分区并从 default offset 指定的位置开始消费| + |3| Yes | No | No | 系统会从指定分区的 OFFSET_END 开始消费 | + |4| Yes | Yes | No | 系统会从指定分区的指定offset 处开始消费 | + |5| Yes | No | Yes | 系统会从指定分区,default offset 指定的位置开始消费 | + ## 相关参数 一些系统配置参数会影响例行导入的使用。 diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 65fd54cb9674ec..4200ad4b04bdc8 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -220,6 +220,7 @@ under the License. offset 可以指定从大于等于 0 的具体 offset,或者: 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。 2) OFFSET_END: 从末尾开始订阅。 + 3) 时间戳,格式必须如:"2021-05-11 10:00:00",系统会自动定位到大于等于该时间戳的第一个消息的offset。注意,时间戳格式的offset不能和数字类型混用,只能选其一。 如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition。 示例: @@ -227,6 +228,9 @@ under the License. "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END" + "kafka_partitions" = "0,1", + "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00" + 4. property 指定自定义kafka参数。 @@ -264,8 +268,11 @@ under the License. 值为 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。 2) OFFSET_END: 从末尾开始订阅。 + 3) 时间戳,格式同 kafka_offsets + 示例: "property.kafka_default_offsets" = "OFFSET_BEGINNING" + "property.kafka_default_offsets" = "2021-05-11 10:00:00" 8. 导入数据格式样例 @@ -507,6 +514,22 @@ under the License. "kafka_offsets" = "101,0,0,200" ); + 9. 从指定的时间点开始消费 + + CREATE ROUTINE LOAD example_db.test_job ON example_tbl + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "30", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "property.kafka_default_offsets" = "2021-10-10 11:00:00" + ); + ## keyword CREATE,ROUTINE,LOAD diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index ea6c6d63893089..02aa5ed7663b28 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -803,7 +803,8 @@ opt_datasource_properties ::= :} | KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN {: - RESULT = new RoutineLoadDataSourceProperties(type, customProperties); + // the 3rd parameter "true" means this is for AlterRoutineLoad operation. 
+ RESULT = new RoutineLoadDataSourceProperties(type, customProperties, true); :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 47fb3806ab2302..0bd3f6f438d966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -17,11 +17,14 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.load.routineload.RoutineLoadJob; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -42,7 +45,7 @@ public class AlterRoutineLoadStmt extends DdlStmt { private static final String NAME_TYPE = "ROUTINE LOAD NAME"; - private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + private static final ImmutableSet CONFIGURABLE_JOB_PROPERTIES_SET = new ImmutableSet.Builder() .add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY) .add(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY) .add(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY) @@ -110,7 +113,7 @@ public void analyze(Analyzer analyzer) throws UserException { private void checkJobProperties() throws UserException { Optional optional = jobProperties.keySet().stream().filter( - entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).findFirst(); + entity -> !CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst(); if (optional.isPresent()) { throw new AnalysisException(optional.get() + " is invalid property"); } @@ -194,7 +197,13 @@ private void checkJobProperties() throws UserException { } } - private void checkDataSourceProperties() throws AnalysisException { + private void checkDataSourceProperties() throws UserException { + if (!FeConstants.runningUnitTest) { + RoutineLoadJob job = Catalog.getCurrentCatalog().getRoutineLoadManager().checkPrivAndGetJob(getDbName(), getLabel()); + dataSourceProperties.setTimezone(job.getTimezone()); + } else { + dataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + } dataSourceProperties.analyze(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index a828b1a2369141..ffd782a5975357 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -31,21 +31,20 @@ import org.apache.doris.common.util.Util; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.loadv2.LoadTask; -import org.apache.doris.load.routineload.KafkaProgress; -import org.apache.doris.load.routineload.LoadDataSourceType; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; import java.util.Optional; 
import java.util.function.Predicate; -import java.util.regex.Pattern; /* Create routine Load statement, continually load data from a streaming app @@ -87,6 +86,8 @@ KAFKA */ public class CreateRoutineLoadStmt extends DdlStmt { + private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmt.class); + // routine load properties public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number"; // max error number in ten thousand records @@ -111,9 +112,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets"; public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets"; - + public static final String KAFKA_ORIGIN_DEFAULT_OFFSETS = "kafka_origin_default_offsets"; + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; - private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; + public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) @@ -132,19 +134,12 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(EXEC_MEM_LIMIT_PROPERTY) .build(); - private static final ImmutableSet KAFKA_PROPERTIES_SET = new ImmutableSet.Builder() - .add(KAFKA_BROKER_LIST_PROPERTY) - .add(KAFKA_TOPIC_PROPERTY) - .add(KAFKA_PARTITIONS_PROPERTY) - .add(KAFKA_OFFSETS_PROPERTY) - .build(); - private final LabelName labelName; private final String tableName; private final List loadPropertyList; private final Map jobProperties; private final String typeName; - private final Map dataSourceProperties; + private final RoutineLoadDataSourceProperties dataSourceProperties; // the following variables will be initialized after analyze // -1 as unset, the default value will set in RoutineLoadJob @@ -165,29 +160,21 @@ public class CreateRoutineLoadStmt extends DdlStmt { * 1) dataFormat = "json" * 2) jsonPaths = "$.XXX.xxx" */ - private String format = ""; //default is csv. - private String jsonPaths = ""; - private String jsonRoot = ""; // MUST be a jsonpath string + private String format = ""; //default is csv. 
+ private String jsonPaths = ""; + private String jsonRoot = ""; // MUST be a jsonpath string private boolean stripOuterArray = false; private boolean numAsString = false; private boolean fuzzyParse = false; - // kafka related properties - private String kafkaBrokerList; - private String kafkaTopic; - // pair - private List> kafkaPartitionOffsets = Lists.newArrayList(); - - // custom kafka property map - private Map customKafkaProperties = Maps.newHashMap(); private LoadTask.MergeType mergeType; - public static final Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; }; - public static final Predicate MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; }; - public static final Predicate MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; }; - public static final Predicate MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; }; - public static final Predicate MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; }; - public static final Predicate EXEC_MEM_LIMIT_PRED = (v) -> { return v >= 0L; }; + public static final Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L; + public static final Predicate MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L; + public static final Predicate MAX_BATCH_INTERVAL_PRED = (v) -> v >= 5 && v <= 60; + public static final Predicate MAX_BATCH_ROWS_PRED = (v) -> v >= 200000; + public static final Predicate MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; + public static final Predicate EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L; public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, Map jobProperties, String typeName, @@ -197,7 +184,7 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List> getKafkaPartitionOffsets() { - return kafkaPartitionOffsets; + return this.dataSourceProperties.getKafkaPartitionOffsets(); } public Map getCustomKafkaProperties() { - return customKafkaProperties; + return this.dataSourceProperties.getCustomKafkaProperties(); } public LoadTask.MergeType getMergeType() { return mergeType; } + public boolean isOffsetsForTimes() { + return this.dataSourceProperties.isOffsetsForTimes(); + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); @@ -467,155 +458,8 @@ private void checkJobProperties() throws UserException { } } - private void checkDataSourceProperties() throws AnalysisException { - LoadDataSourceType type; - try { - type = LoadDataSourceType.valueOf(typeName); - } catch (IllegalArgumentException e) { - throw new AnalysisException("routine load job does not support this type " + typeName); - } - switch (type) { - case KAFKA: - checkKafkaProperties(); - break; - default: - break; - } - } - - private void checkKafkaProperties() throws AnalysisException { - Optional optional = dataSourceProperties.keySet().stream() - .filter(entity -> !KAFKA_PROPERTIES_SET.contains(entity)) - .filter(entity -> !entity.startsWith("property.")).findFirst(); - if (optional.isPresent()) { - throw new AnalysisException(optional.get() + " is invalid kafka custom property"); - } - - // check broker list - kafkaBrokerList = Strings.nullToEmpty(dataSourceProperties.get(KAFKA_BROKER_LIST_PROPERTY)).replaceAll(" ", ""); - if (Strings.isNullOrEmpty(kafkaBrokerList)) { - throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is a required property"); - } - String[] kafkaBrokerList = this.kafkaBrokerList.split(","); - for (String broker : kafkaBrokerList) { - if 
(!Pattern.matches(ENDPOINT_REGEX, broker)) { - throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" + broker - + " not match pattern " + ENDPOINT_REGEX); - } - } - - // check topic - kafkaTopic = Strings.nullToEmpty(dataSourceProperties.get(KAFKA_TOPIC_PROPERTY)).replaceAll(" ", ""); - if (Strings.isNullOrEmpty(kafkaTopic)) { - throw new AnalysisException(KAFKA_TOPIC_PROPERTY + " is a required property"); - } - - // check partitions - String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY); - if (kafkaPartitionsString != null) { - analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets); - } - - // check offset - String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY); - if (kafkaOffsetsString != null) { - analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets); - } - - // check custom kafka property - analyzeCustomProperties(this.dataSourceProperties, this.customKafkaProperties); - } - - public static void analyzeKafkaPartitionProperty(String kafkaPartitionsString, - List> kafkaPartitionOffsets) throws AnalysisException { - kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", ""); - if (kafkaPartitionsString.isEmpty()) { - throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string"); - } - String[] kafkaPartitionsStringList = kafkaPartitionsString.split(","); - for (String s : kafkaPartitionsStringList) { - try { - kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY), - KafkaProgress.OFFSET_END_VAL)); - } catch (AnalysisException e) { - throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY - + " must be a number string with comma-separated"); - } - } - } - - public static void analyzeKafkaOffsetProperty(String kafkaOffsetsString, - List> kafkaPartitionOffsets) throws AnalysisException { - kafkaOffsetsString = kafkaOffsetsString.replaceAll(" ", ""); - if (kafkaOffsetsString.isEmpty()) { - throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string"); - } - String[] kafkaOffsetsStringList = kafkaOffsetsString.split(","); - if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) { - throw new AnalysisException("Partitions number should be equals to offsets number"); - } - - for (int i = 0; i < kafkaOffsetsStringList.length; i++) { - // defined in librdkafka/rdkafkacpp.h - // OFFSET_BEGINNING: -2 - // OFFSET_END: -1 - try { - kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i], - KAFKA_OFFSETS_PROPERTY); - if (kafkaPartitionOffsets.get(i).second < 0) { - throw new AnalysisException("Can not specify offset smaller than 0"); - } - } catch (AnalysisException e) { - if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { - kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL; - } else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) { - kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL; - } else { - throw e; - } - } - } - } - - public static void analyzeCustomProperties(Map dataSourceProperties, - Map customKafkaProperties) throws AnalysisException { - for (Map.Entry dataSourceProperty : dataSourceProperties.entrySet()) { - if (dataSourceProperty.getKey().startsWith("property.")) { - String propertyKey = dataSourceProperty.getKey(); - String propertyValue = dataSourceProperty.getValue(); - String propertyValueArr[] = propertyKey.split("\\."); - if 
(propertyValueArr.length < 2) { - throw new AnalysisException("kafka property value could not be a empty string"); - } - customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue); - } - // can be extended in the future which other prefix - } - } - - private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException { - if (valueString.isEmpty()) { - throw new AnalysisException(propertyName + " could not be a empty string"); - } - int value; - try { - value = Integer.valueOf(valueString); - } catch (NumberFormatException e) { - throw new AnalysisException(propertyName + " must be a integer"); - } - return value; - } - - private static long getLongValueFromString(String valueString, String propertyName) throws AnalysisException { - if (valueString.isEmpty()) { - throw new AnalysisException(propertyName + " could not be a empty string"); - } - long value; - try { - value = Long.valueOf(valueString); - } catch (NumberFormatException e) { - throw new AnalysisException(propertyName + " must be a integer: " + valueString); - } - return value; + private void checkDataSourceProperties() throws UserException { + this.dataSourceProperties.setTimezone(this.timezone); + this.dataSourceProperties.analyze(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java index 1743b2be32f111..ca6f8003921386 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java @@ -19,42 +19,78 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.routineload.KafkaProgress; import org.apache.doris.load.routineload.LoadDataSourceType; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.math.NumberUtils; + import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.TimeZone; +import java.util.regex.Pattern; public class RoutineLoadDataSourceProperties { - private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + + private static final ImmutableSet DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder() + .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY) + .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY) .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) + .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS) .build(); - - @SerializedName(value = "type") - private String type = "KAFKA"; + + private static final ImmutableSet CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder() + .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) + .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) + .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS) + .build(); + // origin properties, no need to persist private Map properties = Maps.newHashMap(); + private boolean isAlter = false; + + @SerializedName(value = "type") + 
private String type = "KAFKA"; @SerializedName(value = "kafkaPartitionOffsets") private List> kafkaPartitionOffsets = Lists.newArrayList(); @SerializedName(value = "customKafkaProperties") private Map customKafkaProperties = Maps.newHashMap(); + @SerializedName(value = "isOffsetsForTimes") + private boolean isOffsetsForTimes = false; + @SerializedName(value = "kafkaBrokerList") + private String kafkaBrokerList; + @SerializedName(value = "KafkaTopic") + private String kafkaTopic; + @SerializedName(value = "timezone") + private String timezone; public RoutineLoadDataSourceProperties() { - // empty + // for unit test, and empty data source properties when altering routine load + this.isAlter = true; } - public RoutineLoadDataSourceProperties(String type, Map properties) { + public RoutineLoadDataSourceProperties(String type, Map properties, boolean isAlter) { this.type = type.toUpperCase(); this.properties = properties; + this.isAlter = isAlter; } - public void analyze() throws AnalysisException { + public void analyze() throws UserException { + if (properties.isEmpty()) { + throw new AnalysisException("No properties"); + } + Preconditions.checkState(!Strings.isNullOrEmpty(timezone), "timezone must be set before analyzing"); checkDataSourceProperties(); } @@ -70,11 +106,31 @@ public List> getKafkaPartitionOffsets() { return kafkaPartitionOffsets; } + public void setKafkaPartitionOffsets(List> kafkaPartitionOffsets) { + this.kafkaPartitionOffsets = kafkaPartitionOffsets; + } + public Map getCustomKafkaProperties() { return customKafkaProperties; } - private void checkDataSourceProperties() throws AnalysisException { + public void setTimezone(String timezone) { + this.timezone = timezone; + } + + public String getKafkaBrokerList() { + return kafkaBrokerList; + } + + public String getKafkaTopic() { + return kafkaTopic; + } + + public boolean isOffsetsForTimes() { + return isOffsetsForTimes; + } + + private void checkDataSourceProperties() throws UserException { LoadDataSourceType sourceType; try { sourceType = LoadDataSourceType.valueOf(type); @@ -90,37 +146,242 @@ private void checkDataSourceProperties() throws AnalysisException { } } - private void checkKafkaProperties() throws AnalysisException { + /* + * Kafka properties includes follows: + * 1. broker list + * 2. topic + * 3. partition offset info + * 4. other properties start with "property." + */ + private void checkKafkaProperties() throws UserException { + ImmutableSet propertySet = isAlter ? 
CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET : DATA_SOURCE_PROPERTIES_SET; Optional optional = properties.keySet().stream().filter( - entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).filter( - entity -> !entity.startsWith("property.")).findFirst(); + entity -> !propertySet.contains(entity)).filter( + entity -> !entity.startsWith("property.")).findFirst(); if (optional.isPresent()) { - throw new AnalysisException(optional.get() + " is invalid kafka custom property"); + throw new AnalysisException(optional.get() + " is invalid kafka property or can not be set"); + } + + // check broker list + kafkaBrokerList = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)).replaceAll(" ", ""); + if (!isAlter && Strings.isNullOrEmpty(kafkaBrokerList)) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + " is a required property"); + } + if (!Strings.isNullOrEmpty(kafkaBrokerList)) { + String[] kafkaBrokerList = this.kafkaBrokerList.split(","); + for (String broker : kafkaBrokerList) { + if (!Pattern.matches(CreateRoutineLoadStmt.ENDPOINT_REGEX, broker)) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + ":" + broker + + " not match pattern " + CreateRoutineLoadStmt.ENDPOINT_REGEX); + } + } + } + + // check topic + kafkaTopic = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)).replaceAll(" ", ""); + if (!isAlter && Strings.isNullOrEmpty(kafkaTopic)) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY + " is a required property"); } + // check custom kafka property + // This should be done before check partition and offsets, because we need KAFKA_DEFAULT_OFFSETS, + // which is in custom properties. + analyzeCustomProperties(this.properties, this.customKafkaProperties); + + // The partition offset properties are all optional, + // and there are 5 valid cases for specifying partition offsets: + // A. partition, offset and default offset are not set + // Doris will set default offset to OFFSET_END + // B. partition and offset are set, default offset is not set + // fill the "kafkaPartitionOffsets" with partition and offset + // C. partition and default offset are set, offset is not set + // fill the "kafkaPartitionOffsets" with partition and default offset + // D. partition is set, offset and default offset are not set + // this is only valid when doing create routine load operation, + // fill the "kafkaPartitionOffsets" with partition and OFFSET_END + // E. only default offset is set. + // this is only valid when doing alter routine load operation. + // Other cases are illegal. 
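+        // Illustrative property combinations for the cases above (examples only, taken from the docs in this patch):
+        //   Case B: "kafka_partitions" = "0,1", "kafka_offsets" = "1000,2000"
+        //   Case C: "kafka_partitions" = "0,1", "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+        //   Case E (alter only): "property.kafka_default_offsets" = "2021-05-11 10:00:00"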
+ // check partitions - final String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY); + String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY); if (kafkaPartitionsString != null) { + analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets); + } - if (!properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) { - throw new AnalysisException("Partition and offset must be specified at the same time"); + // check offset + String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY); + String kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); + if (kafkaOffsetsString != null && kafkaDefaultOffsetString != null) { + throw new AnalysisException("Only one of " + CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + + " and " + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can be set."); + } + if (isAlter && kafkaPartitionsString != null && kafkaOffsetsString == null && kafkaDefaultOffsetString == null) { + // if this is an alter operation, the partition and (default)offset must be set together. + throw new AnalysisException("Must set offset or default offset with partition property"); + } + + if (kafkaOffsetsString != null) { + this.isOffsetsForTimes = analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets, this.timezone); + } else { + // offset is not set, check default offset. + this.isOffsetsForTimes = analyzeKafkaDefaultOffsetProperty(this.customKafkaProperties, this.timezone); + if (!this.kafkaPartitionOffsets.isEmpty()) { + // Case C + kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); + setDefaultOffsetForPartition(this.kafkaPartitionOffsets, kafkaDefaultOffsetString, this.isOffsetsForTimes); } + } + } - CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, kafkaPartitionOffsets); + private static void setDefaultOffsetForPartition(List> kafkaPartitionOffsets, + String kafkaDefaultOffsetString, boolean isOffsetsForTimes) { + if (isOffsetsForTimes) { + for (Pair pair : kafkaPartitionOffsets) { + pair.second = Long.valueOf(kafkaDefaultOffsetString); + } } else { - if (properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) { - throw new AnalysisException("Missing kafka partition info"); + for (Pair pair : kafkaPartitionOffsets) { + if (kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { + pair.second = KafkaProgress.OFFSET_BEGINNING_VAL; + } else { + pair.second = KafkaProgress.OFFSET_END_VAL; + } } } + } - // check offset - String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY); - if (kafkaOffsetsString != null) { - CreateRoutineLoadStmt.analyzeKafkaOffsetProperty(kafkaOffsetsString, kafkaPartitionOffsets); + // If the default offset is not set, set the default offset to OFFSET_END. + // If the offset is in datetime format, convert it to a timestamp, and also save the origin datatime formatted offset + // in "customKafkaProperties" + // return true if the offset is in datetime format. 
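+    // For example, a default offset of "2021-05-11 10:00:00" (illustrative value) would be replaced by its
+    // epoch millisecond value, while the original string is kept under KAFKA_ORIGIN_DEFAULT_OFFSETS.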
+ private static boolean analyzeKafkaDefaultOffsetProperty(Map customKafkaProperties, String timeZoneStr) + throws AnalysisException { + customKafkaProperties.putIfAbsent(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END); + String defaultOffsetStr = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); + TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr); + long defaultOffset = TimeUtils.timeStringToLong(defaultOffsetStr, timeZone); + if (defaultOffset != -1) { + // this is a datetime format offset + customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, String.valueOf(defaultOffset)); + // we convert datetime to timestamp, and save the origin datetime formatted offset for further use. + customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS, defaultOffsetStr); + return true; + } else { + if (!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING) && !defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can only be set to OFFSET_BEGINNING, OFFSET_END or date time"); + } + return false; + } + } + + // init "kafkaPartitionOffsets" with partition property. + // The offset will be set to OFFSET_END for now, and will be changed in later analysis process. + private static void analyzeKafkaPartitionProperty(String kafkaPartitionsString, + List> kafkaPartitionOffsets) throws AnalysisException { + kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", ""); + if (kafkaPartitionsString.isEmpty()) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY + " could not be a empty string"); + } + String[] kafkaPartitionsStringList = kafkaPartitionsString.split(","); + for (String s : kafkaPartitionsStringList) { + try { + kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY), + KafkaProgress.OFFSET_END_VAL)); + } catch (AnalysisException e) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY + + " must be a number string with comma-separated"); + } + } + } + + // Fill the partition's offset with given kafkaOffsetsString, + // Return true if offset is specified by timestamp. + private static boolean analyzeKafkaOffsetProperty(String kafkaOffsetsString, List> kafkaPartitionOffsets, + String timeZoneStr) + throws UserException { + if (Strings.isNullOrEmpty(kafkaOffsetsString)) { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " could not be a empty string"); + } + List kafkaOffsetsStringList = Splitter.on(",").trimResults().splitToList(kafkaOffsetsString); + if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) { + throw new AnalysisException("Partitions number should be equals to offsets number"); } - // check custom properties - CreateRoutineLoadStmt.analyzeCustomProperties(properties, customKafkaProperties); + // We support two ways to specify the offset, + // one is to specify the offset directly, the other is to specify a timestamp. + // Doris will get the offset of the corresponding partition through the timestamp. + // The user can only choose one of these methods. 
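+        // e.g. "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END" (plain offsets) or
+        // "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00" (timestamps), but never a mix of both.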
+ boolean foundTime = false; + boolean foundOffset = false; + for (String kafkaOffsetsStr : kafkaOffsetsStringList) { + if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) { + foundTime = true; + } else { + foundOffset = true; + } + } + if (foundTime && foundOffset) { + throw new AnalysisException("The offset of the partition cannot be specified by the timestamp " + + "and the offset at the same time"); + } + + if (foundTime) { + // convert all datetime strs to timestamps + // and set them as the partition's offset. + // These timestamps will be converted to real offset when job is running. + TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr); + for (int i = 0; i < kafkaOffsetsStringList.size(); i++) { + String kafkaOffsetsStr = kafkaOffsetsStringList.get(i); + long timestamp = TimeUtils.timeStringToLong(kafkaOffsetsStr, timeZone); + Preconditions.checkState(timestamp != -1); + kafkaPartitionOffsets.get(i).second = timestamp; + } + } else { + for (int i = 0; i < kafkaOffsetsStringList.size(); i++) { + String kafkaOffsetsStr = kafkaOffsetsStringList.get(i); + if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { + kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL; + } else if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) { + kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL; + } else if (NumberUtils.isDigits(kafkaOffsetsStr)) { + kafkaPartitionOffsets.get(i).second = Long.valueOf(NumberUtils.toLong(kafkaOffsetsStr)); + } else { + throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " must be an integer or a date time"); + } + } + } + + return foundTime; + } + + private static void analyzeCustomProperties(Map dataSourceProperties, + Map customKafkaProperties) throws AnalysisException { + for (Map.Entry dataSourceProperty : dataSourceProperties.entrySet()) { + if (dataSourceProperty.getKey().startsWith("property.")) { + String propertyKey = dataSourceProperty.getKey(); + String propertyValue = dataSourceProperty.getValue(); + String propertyValueArr[] = propertyKey.split("\\."); + if (propertyValueArr.length < 2) { + throw new AnalysisException("kafka property value could not be a empty string"); + } + customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue); + } + // can be extended in the future which other prefix + } + } + + private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException { + if (valueString.isEmpty()) { + throw new AnalysisException(propertyName + " could not be a empty string"); + } + int value; + try { + value = Integer.valueOf(valueString); + } catch (NumberFormatException e) { + throw new AnalysisException(propertyName + " must be a integer"); + } + return value; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 4ecb520c490b93..6691af6e933aae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ClientPool; import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.rpc.BackendServiceProxy; @@ -28,6 +29,8 @@ import 
org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; +import com.google.common.collect.Lists; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -93,5 +96,76 @@ public static List getAllKafkaPartitions(String brokerList, String topi } } } + + // Get offsets by times. + // The input parameter "timestampOffsets" is + // Tne return value is + public static List> getOffsetsForTimes(String brokerList, String topic, + Map convertedCustomProperties, + List> timestampOffsets) throws LoadException { + BackendService.Client client = null; + TNetworkAddress address = null; + LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets); + boolean ok = false; + try { + List backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true); + if (backendIds.isEmpty()) { + throw new LoadException("Failed to get offset for times. No alive backends"); + } + Collections.shuffle(backendIds); + Backend be = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + + // create request + InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = + InternalService.PKafkaMetaProxyRequest.newBuilder() + .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() + .setBrokers(brokerList) + .setTopic(topic) + .addAllProperties( + convertedCustomProperties.entrySet().stream().map( + e -> InternalService.PStringPair.newBuilder() + .setKey(e.getKey()) + .setVal(e.getValue()) + .build() + ).collect(Collectors.toList()) + ) + ); + for (Pair pair : timestampOffsets) { + metaRequestBuilder.addOffsetTimes(InternalService.PIntegerPair.newBuilder().setKey(pair.first) + .setVal(pair.second).build()); + } + + InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( + metaRequestBuilder).build(); + + // get info + Future future = BackendServiceProxy.getInstance().getInfo(address, request); + InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList()); + } else { + List pairs = result.getPartitionOffsets().getOffsetTimesList(); + List> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.create(pair.getKey(), pair.getVal())); + } + LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets); + return partitionOffsets; + } + } catch (Exception e) { + LOG.warn("failed to get offsets for times.", e); + throw new LoadException( + "Failed to get offsets for times of kafka topic: " + topic + ". 
error: " + e.getMessage()); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index 85901be163644a..4feded8076d482 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -233,10 +233,6 @@ public static long dateTransform(long time, PrimitiveType type) { } } - public static long dateTransform(long time, Type type) { - return dateTransform(time, type.getPrimitiveType()); - } - public static long timeStringToLong(String timeStr) { Date d; try { @@ -247,6 +243,18 @@ public static long timeStringToLong(String timeStr) { return d.getTime(); } + public static long timeStringToLong(String timeStr, TimeZone timeZone) { + SimpleDateFormat dateFormatTimeZone = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateFormatTimeZone.setTimeZone(timeZone); + Date d; + try { + d = dateFormatTimeZone.parse(timeStr); + } catch (ParseException e) { + return -1; + } + return d.getTime(); + } + // Check if the time zone_value is valid public static String checkTimeZoneValidAndStandardize(String value) throws DdlException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 856ae0d23c5845..e66751572f4df2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -40,14 +40,12 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.SmallFileMgr.SmallFile; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.system.SystemInfoService; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -55,6 +53,9 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -62,6 +63,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.TimeZone; import java.util.UUID; /** @@ -80,6 +82,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { // current kafka partitions is the actually partition which will be fetched private List currentKafkaPartitions = Lists.newArrayList(); // optional, user want to set default offset when new partition add or offset not set. + // kafkaDefaultOffSet has two formats, one is the time format, eg: "2021-10-10 11:00:00", + // the other is string value, including OFFSET_END and OFFSET_BEGINNING. + // We should check it by calling isOffsetForTimes() method before use it. 
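+    // e.g. "2021-10-10 11:00:00" (time format) or "OFFSET_END" (named offset).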
private String kafkaDefaultOffSet = ""; // kafka properties ,property prefix will be mapped to kafka custom parameters, which can be extended in the future private Map customProperties = Maps.newHashMap(); @@ -110,6 +115,16 @@ public Map getConvertedCustomProperties() { return convertedCustomProperties; } + private boolean isOffsetForTimes() { + long offset = TimeUtils.timeStringToLong(this.kafkaDefaultOffSet); + return offset != -1; + } + + private long convertedDefaultOffsetToTimestamp() { + TimeZone timeZone = TimeUtils.getOrSystemTimeZone(getTimezone()); + return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet, timeZone); + } + @Override public void prepare() throws UserException { super.prepare(); @@ -142,7 +157,14 @@ private void convertCustomProperties(boolean rebuild) throws DdlException { convertedCustomProperties.put(entry.getKey(), entry.getValue()); } } - if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) { + + // This is mainly for compatibility. In the previous version, we directly obtained the value of the + // KAFKA_DEFAULT_OFFSETS attribute. In the new version, we support date time as the value of KAFKA_DEFAULT_OFFSETS, + // and this attribute will be converted into a timestamp during the analyzing phase, thus losing some information. + // So we use KAFKA_ORIGIN_DEFAULT_OFFSETS to store the original datetime formatted KAFKA_DEFAULT_OFFSETS value + if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)) { + kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS); + } else if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) { kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); } } @@ -254,7 +276,7 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad } @Override - protected void unprotectUpdateProgress() { + protected void unprotectUpdateProgress() throws UserException { updateNewPartitionProgress(); } @@ -389,14 +411,21 @@ private void checkCustomProperties() throws DdlException { } } - private void updateNewPartitionProgress() { + private void updateNewPartitionProgress() throws LoadException { // update the progress of new partitions for (Integer kafkaPartition : currentKafkaPartitions) { if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { // if offset is not assigned, start from OFFSET_END long beginOffSet = KafkaProgress.OFFSET_END_VAL; if (!kafkaDefaultOffSet.isEmpty()) { - if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { + if (isOffsetForTimes()) { + // get offset by time + List> offsets = Lists.newArrayList(); + offsets.add(Pair.create(kafkaPartition, convertedDefaultOffsetToTimestamp())); + offsets = KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, convertedCustomProperties, offsets); + Preconditions.checkState(offsets.size() == 1); + beginOffSet = offsets.get(0).second; + } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL; } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) { beginOffSet = KafkaProgress.OFFSET_END_VAL; @@ -420,7 +449,7 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { super.setOptional(stmt); if (!stmt.getKafkaPartitionOffsets().isEmpty()) { - setCustomKafkaPartitions(stmt.getKafkaPartitionOffsets()); + 
setCustomKafkaPartitions(stmt); } if (!stmt.getCustomKafkaProperties().isEmpty()) { setCustomKafkaProperties(stmt.getCustomKafkaProperties()); @@ -428,7 +457,15 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { } // this is a unprotected method which is called in the initialization function - private void setCustomKafkaPartitions(List> kafkaPartitionOffsets) throws LoadException { + private void setCustomKafkaPartitions(CreateRoutineLoadStmt stmt) throws LoadException { + List> kafkaPartitionOffsets = stmt.getKafkaPartitionOffsets(); + boolean isForTimes = stmt.isOffsetsForTimes(); + if (isForTimes) { + // the offset is set by date time, we need to get the real offset by time + kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), + convertedCustomProperties, stmt.getKafkaPartitionOffsets()); + } + for (Pair partitionOffset : kafkaPartitionOffsets) { this.customKafkaPartitions.add(partitionOffset.first); ((KafkaProgress) progress).addPartitionOffset(partitionOffset); @@ -497,9 +534,13 @@ public void readFields(DataInput in) throws IOException { } @Override - public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException { + public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException { Map jobProperties = stmt.getAnalyzedJobProperties(); RoutineLoadDataSourceProperties dataSourceProperties = stmt.getDataSourceProperties(); + if (dataSourceProperties.isOffsetsForTimes()) { + // if the partition offset is set by timestamp, convert it to real offset + convertTimestampToOffset(dataSourceProperties); + } writeLock(); try { @@ -507,6 +548,7 @@ public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException { throw new DdlException("Only supports modification of PAUSED jobs"); } + modifyPropertiesInternal(jobProperties, dataSourceProperties); AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, @@ -517,6 +559,16 @@ public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException { } } + private void convertTimestampToOffset(RoutineLoadDataSourceProperties dataSourceProperties) throws UserException { + List> partitionOffsets = dataSourceProperties.getKafkaPartitionOffsets(); + if (partitionOffsets.isEmpty()) { + return; + } + List> newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic, + convertedCustomProperties, partitionOffsets); + dataSourceProperties.setKafkaPartitionOffsets(newOffsets); + } + private void modifyPropertiesInternal(Map jobProperties, RoutineLoadDataSourceProperties dataSourceProperties) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 96ddf79b0b7d95..3ca38250912429 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -67,9 +67,6 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -79,6 +76,9 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; 
import java.io.DataOutput; import java.io.IOException; @@ -1269,7 +1269,7 @@ public void update() throws UserException { } } - protected void unprotectUpdateProgress() { + protected void unprotectUpdateProgress() throws UserException { } protected boolean unprotectNeedReschedule() throws UserException { @@ -1544,7 +1544,7 @@ public void readFields(DataInput in) throws IOException { } } - abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException; + abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException; abstract public void replayModifyProperties(AlterRoutineLoadJobOperationLog log); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 682d3a1cfd5dc4..d89bde719d1042 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -205,7 +205,7 @@ private boolean isNameUsed(Long dbId, String name) { return false; } - private RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName) + public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName) throws MetaNotFoundException, DdlException, AnalysisException { RoutineLoadJob routineLoadJob = getJob(dbName, jobName); if (routineLoadJob == null) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java index 4246b305861043..f9ddb688e985ed 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java @@ -18,6 +18,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -49,7 +50,7 @@ public class AlterRoutineLoadStmtTest { @Before public void setUp() { analyzer = AccessTestUtil.fetchAdminAnalyzer(false); - + FeConstants.runningUnitTest = true; new Expectations() { { auth.checkGlobalPriv((ConnectContext) any, (PrivPredicate) any); @@ -80,7 +81,7 @@ public void testNormal() { dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties); + typeName, dataSourceProperties, true); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), jobProperties, routineLoadDataSourceProperties); try { @@ -131,7 +132,7 @@ public void testUnsupportedProperties() { Map dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "new_topic"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties); + typeName, dataSourceProperties, true); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), jobProperties, routineLoadDataSourceProperties); @@ -139,8 +140,9 @@ public void testUnsupportedProperties() { stmt.analyze(analyzer); Assert.fail(); } 
catch (AnalysisException e) { - Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka custom property")); + Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka property")); } catch (UserException e) { + e.printStackTrace(); Assert.fail(); } } @@ -152,14 +154,14 @@ public void testUnsupportedProperties() { Map dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties); + typeName, dataSourceProperties, true); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), jobProperties, routineLoadDataSourceProperties); try { stmt.analyze(analyzer); Assert.fail(); } catch (AnalysisException e) { - Assert.assertTrue(e.getMessage().contains("Partition and offset must be specified at the same time")); + Assert.assertTrue(e.getMessage().contains("Must set offset or default offset with partition property")); } catch (UserException e) { Assert.fail(); } @@ -173,7 +175,7 @@ public void testUnsupportedProperties() { dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties); + typeName, dataSourceProperties, true); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), jobProperties, routineLoadDataSourceProperties); try { @@ -193,14 +195,14 @@ public void testUnsupportedProperties() { Map dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000, 3000"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties); + typeName, dataSourceProperties, true); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), jobProperties, routineLoadDataSourceProperties); try { stmt.analyze(analyzer); Assert.fail(); } catch (AnalysisException e) { - Assert.assertTrue(e.getMessage().contains("Missing kafka partition info")); + Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number")); } catch (UserException e) { Assert.fail(); } @@ -217,7 +219,7 @@ public void testUnsupportedProperties() { dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties); + typeName, dataSourceProperties, true); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), jobProperties, routineLoadDataSourceProperties); try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java new file mode 100644 index 00000000000000..bcadc902803c87 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java @@ -0,0 +1,347 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.routineload.KafkaProgress; + +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class RoutineLoadDataSourcePropertiesTest { + + @Test + public void testCreateNormal() throws UserException { + // normal + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 101, 102"); + RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); + Assert.assertEquals("test", dsProperties.getKafkaTopic()); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(3, partitinOffsets.size()); + Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); + Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); + Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); + Assert.assertEquals(Long.valueOf(100), partitinOffsets.get(0).second); + Assert.assertEquals(Long.valueOf(101), partitinOffsets.get(1).second); + Assert.assertEquals(Long.valueOf(102), partitinOffsets.get(2).second); + Assert.assertFalse(dsProperties.isOffsetsForTimes()); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + + // normal, with datetime + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); + Assert.assertEquals("test", dsProperties.getKafkaTopic()); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(3, partitinOffsets.size()); + Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first); + Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); + Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); + Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(0).second); + Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(1).second); + Assert.assertEquals(Long.valueOf(1633838400000L), partitinOffsets.get(2).second); + Assert.assertTrue(dsProperties.isOffsetsForTimes()); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + + // normal, with default offset as datetime + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); + Assert.assertEquals("test", dsProperties.getKafkaTopic()); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(3, partitinOffsets.size()); + Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); + Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); + Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); + Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(0).second); + Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(1).second); + Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(2).second); + Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); + Assert.assertEquals("1578585600000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); + Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); + Assert.assertTrue(dsProperties.isOffsetsForTimes()); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + + // normal, only set default offset as datetime + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put("property." 
+ CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); + Assert.assertEquals("test", dsProperties.getKafkaTopic()); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(0, partitinOffsets.size()); + Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); + Assert.assertEquals("1578585600000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); + Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + + // normal, only set default offset as integer + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); + Assert.assertEquals("test", dsProperties.getKafkaTopic()); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(0, partitinOffsets.size()); + Assert.assertEquals(1, dsProperties.getCustomKafkaProperties().size()); + Assert.assertEquals(KafkaProgress.OFFSET_END, dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testCreateAbnormal() { + // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS together + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 1"); + properties.put("property." 
+ CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1"); + RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("Only one of kafka_offsets and kafka_default_offsets can be set.")); + } + + // can not set datetime formatted offset and integer offset together + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 2020-10-10 12:11:11, 1"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("The offset of the partition cannot be specified by the timestamp " + + "and the offset at the same time")); + } + + // no partitions but has offset + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 1"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number")); + } + } + + @Test + public void testAlterNormal() throws UserException { + // normal + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 101, 102"); + RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.assertEquals("", dsProperties.getKafkaBrokerList()); + Assert.assertEquals("", dsProperties.getKafkaTopic()); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(3, partitinOffsets.size()); + Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); + Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); + Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); + Assert.assertEquals(Long.valueOf(100), partitinOffsets.get(0).second); + Assert.assertEquals(Long.valueOf(101), partitinOffsets.get(1).second); + Assert.assertEquals(Long.valueOf(102), partitinOffsets.get(2).second); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + + // normal, with datetime + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + // can not set KAFKA_OFFSETS_PROPERTY and 
KAFKA_DEFAULT_OFFSETS together; here only kafka_offsets is set, using datetime values + dsProperties.analyze(); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(3, partitinOffsets.size()); + Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); + Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); + Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); + Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(0).second); + Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(1).second); + Assert.assertEquals(Long.valueOf(1633838400000L), partitinOffsets.get(2).second); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + 
+ // normal, with default offset as datetime + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(3, partitinOffsets.size()); + Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); + Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); + Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); + Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(0).second); + Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(1).second); + Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(2).second); + Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); + Assert.assertEquals("1578585600000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); + Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + 
+ // normal, only set default offset, with utc timezone + properties = Maps.newHashMap(); + properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.UTC_TIME_ZONE); + try { + dsProperties.analyze(); + List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); + Assert.assertEquals(0, partitinOffsets.size()); + Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); + Assert.assertEquals("1578614400000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); + Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); + } catch (AnalysisException e) { + Assert.fail(e.getMessage()); + } + } + 
+ @Test + public void testAlterAbnormal() { + // can not set KAFKA_BROKER_LIST_PROPERTY + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); + properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1"); + RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("kafka_broker_list is invalid kafka property")); + } + 
+ // can not set datetime formatted offset and integer offset together + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 2020-10-10 12:11:11, 1"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("The offset of the partition cannot be specified by the timestamp " + + "and the offset at the same time")); + } + 
+ // no partitions but has offset + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 1"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number")); + } + 
+ // only set partition + properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1, 1, 1"); + dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); + dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + try { + dsProperties.analyze(); + Assert.fail(); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("Must set offset or default offset with partition property")); + } + } +} 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 701eb809ed81f8..7ce75a7c939338 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -17,12 +17,13 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.ImportSequenceStmt; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.ParseNode; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.RoutineLoadDataSourceProperties; +import org.apache.doris.analysis.Separator; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; @@ -43,6 +44,10 @@ import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.GlobalTransactionMgr; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.kafka.common.PartitionInfo; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; @@ -50,10 +55,6 @@ import org.junit.Before; import org.junit.Test; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -279,9 +280,12 @@ public void testFromCreateStmt(@Mocked Catalog catalog, PartitionInfo partitionInfo = new PartitionInfo(topicName, Integer.valueOf(s), null, null, null); kafkaPartitionInfoList.add(partitionInfo); } - Deencapsulation.setField(createRoutineLoadStmt, "kafkaPartitionOffsets", partitionIdToOffset); - Deencapsulation.setField(createRoutineLoadStmt, "kafkaBrokerList", serverAddress); - Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic", topicName); + RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties(); + dsProperties.setKafkaPartitionOffsets(partitionIdToOffset); + Deencapsulation.setField(dsProperties, "kafkaBrokerList", serverAddress); + Deencapsulation.setField(dsProperties, "kafkaTopic", topicName); + Deencapsulation.setField(createRoutineLoadStmt, "dataSourceProperties", dsProperties); + long dbId = 1l; long tableId = 2L; diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index 3cba3fb9276372..f09a509934bab7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -19,7 +19,8 @@ import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.RoutineLoadDataSourceProperties; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; import com.google.common.collect.Maps; @@ -38,7 +39,7 @@ public class AlterRoutineLoadOperationLogTest { private static String fileName = "./AlterRoutineLoadOperationLogTest"; @Test - public void testSerializeAlterViewInfo() throws IOException, AnalysisException { + public void testSerializeAlterRoutineLoadOperationLog() throws IOException, UserException { // 1. 
Write objects to file File file = new File(fileName); file.createNewFile(); @@ -47,14 +48,15 @@ public void testSerializeAlterViewInfo() throws IOException, AnalysisException { long jobId = 1000; Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); - + String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1"); dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000"); dataSourceProperties.put("property.group.id", "mygroup"); RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(typeName, - dataSourceProperties); + dataSourceProperties, true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); routineLoadDataSourceProperties.analyze(); AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, @@ -69,6 +71,8 @@ public void testSerializeAlterViewInfo() throws IOException, AnalysisException { AlterRoutineLoadJobOperationLog log2 = AlterRoutineLoadJobOperationLog.read(in); Assert.assertEquals(1, log2.getJobProperties().size()); Assert.assertEquals("5", log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + Assert.assertEquals("", log2.getDataSourceProperties().getKafkaBrokerList()); + Assert.assertEquals("", log2.getDataSourceProperties().getKafkaTopic()); Assert.assertEquals(1, log2.getDataSourceProperties().getCustomKafkaProperties().size()); Assert.assertEquals("mygroup", log2.getDataSourceProperties().getCustomKafkaProperties().get("group.id")); Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(0), diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1a50e94dbc70df..437fc2f3dd3ef7 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -216,6 +216,11 @@ message PStringPair { required string val = 2; }; +message PIntegerPair { + required int32 key = 1; + required int64 val = 2; +}; + message PKafkaLoadInfo { required string brokers = 1; required string topic = 2; @@ -224,6 +229,7 @@ message PKafkaLoadInfo { message PKafkaMetaProxyRequest { optional PKafkaLoadInfo kafka_info = 1; + repeated PIntegerPair offset_times = 3; }; message PProxyRequest { @@ -234,9 +240,14 @@ message PKafkaMetaProxyResult { repeated int32 partition_ids = 1; }; +message PKafkaPartitionOffsets { + repeated PIntegerPair offset_times = 1; +}; + message PProxyResult { required PStatus status = 1; optional PKafkaMetaProxyResult kafka_meta_result = 2; + optional PKafkaPartitionOffsets partition_offsets = 3; }; // NOTE(zc): If you want to add new method here,