43 changes: 43 additions & 0 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -23,6 +23,7 @@
#include <vector>

#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/strings/split.h"
#include "runtime/small_file_mgr.h"
#include "service/backend_options.h"
@@ -293,6 +294,48 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
return Status::OK();
}

// Get the offset of each partition for the given timestamps.
// The input parameter "times" holds <partition, timestamp> pairs.
// The output parameter "offsets" returns <partition, offset> pairs.
//
// The returned offset for each partition is the earliest offset whose
// timestamp is greater than or equal to the given timestamp in the
// corresponding partition.
// See librdkafka/rdkafkacpp.h##offsetsForTimes()
Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
std::vector<PIntegerPair>* offsets) {
// create topic partition
std::vector<RdKafka::TopicPartition*> topic_partitions;
for (const auto& entry : times) {
RdKafka::TopicPartition* tp1 =
RdKafka::TopicPartition::create(_topic, entry.key(), entry.val());
topic_partitions.push_back(tp1);
}
// delete TopicPartition finally
Defer delete_tp{[&topic_partitions]() {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
}};

// get offsets for times
RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "failed to get offsets for times: " << RdKafka::err2str(err);
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

for (const auto& topic_partition : topic_partitions) {
PIntegerPair pair;
pair.set_key(topic_partition->partition());
pair.set_val(topic_partition->offset());
offsets->push_back(pair);
}

return Status::OK();
}

Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
4 changes: 4 additions & 0 deletions be/src/runtime/routine_load/data_consumer.h
@@ -77,6 +77,7 @@ class DataConsumer {
time_t _last_visit_time;
};

class PIntegerPair;
class KafkaEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) {
@@ -141,6 +142,9 @@ class KafkaDataConsumer : public DataConsumer {

// get the partitions ids of the topic
Status get_partition_meta(std::vector<int32_t>* partition_ids);
// get offsets for times
Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
std::vector<PIntegerPair>* offsets);

private:
std::string _brokers;
50 changes: 39 additions & 11 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -63,15 +63,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}

Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
std::vector<int32_t>* partition_ids) {
DCHECK(request.has_kafka_info());

// This context is meaningless, just for unifying the interface
StreamLoadContext ctx(_exec_env);
ctx.load_type = TLoadType::ROUTINE_LOAD;
ctx.load_src_type = TLoadSourceType::KAFKA;
ctx.label = "NaN";
// Create a temp StreamLoadContext and set some kafka connection info in it.
// So that we can use this ctx to get kafka data consumer instance.
Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& request,
StreamLoadContext* ctx) {
ctx->load_type = TLoadType::ROUTINE_LOAD;
ctx->load_src_type = TLoadSourceType::KAFKA;
ctx->label = "NaN";

// convert PKafkaInfo to TKafkaLoadInfo
TKafkaLoadInfo t_info;
@@ -84,8 +82,18 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
}
t_info.__set_properties(std::move(properties));

ctx.kafka_info.reset(new KafkaLoadInfo(t_info));
ctx.need_rollback = false;
ctx->kafka_info.reset(new KafkaLoadInfo(t_info));
ctx->need_rollback = false;
return Status::OK();
}

Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
std::vector<int32_t>* partition_ids) {
CHECK(request.has_kafka_info());

// This context is meaningless, just for unifying the interface
StreamLoadContext ctx(_exec_env);
RETURN_IF_ERROR(_prepare_ctx(request, &ctx));

std::shared_ptr<DataConsumer> consumer;
RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
@@ -98,6 +106,26 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
return st;
}

Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
std::vector<PIntegerPair>* partition_offsets) {
CHECK(request.has_kafka_info());

// This context is meaningless, just for unifying the interface
StreamLoadContext ctx(_exec_env);
RETURN_IF_ERROR(_prepare_ctx(request, &ctx));

std::shared_ptr<DataConsumer> consumer;
RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));

Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
std::vector<PIntegerPair>(request.offset_times().begin(), request.offset_times().end()),
partition_offsets);
if (st.ok()) {
_data_consumer_pool.return_consumer(consumer);
}
return st;
}

Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
std::unique_lock<std::mutex> l(_lock);
if (_task_map.find(task.id) != _task_map.end()) {
5 changes: 5 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.h
@@ -52,6 +52,9 @@ class RoutineLoadTaskExecutor {
Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
std::vector<int32_t>* partition_ids);

Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
std::vector<PIntegerPair>* partition_offsets);

private:
// execute the task
void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb);
@@ -60,6 +63,8 @@

// for test only
Status _execute_plan_for_test(StreamLoadContext* ctx);
// create a dummy StreamLoadContext for PKafkaMetaProxyRequest
Status _prepare_ctx(const PKafkaMetaProxyRequest& request, StreamLoadContext* ctx);

private:
ExecEnv* _exec_env;
43 changes: 33 additions & 10 deletions be/src/service/internal_service.cpp
@@ -187,19 +187,42 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
const PProxyRequest* request, PProxyResult* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
// PProxyRequest is defined in gensrc/proto/internal_service.proto
// Currently it supports 2 kinds of requests:
// 1. get all kafka partition ids for given topic
// 2. get all kafka partition offsets for given topic and timestamp.
if (request->has_kafka_meta_request()) {
std::vector<int32_t> partition_ids;
Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
request->kafka_meta_request(), &partition_ids);
if (st.ok()) {
PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
for (int32_t id : partition_ids) {
kafka_result->add_partition_ids(id);
const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
if (!kafka_request.offset_times().empty()) {
// If offset_times() is non-empty, this request is to get offsets by timestamp.
std::vector<PIntegerPair> partition_offsets;
Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
request->kafka_meta_request(), &partition_offsets);
if (st.ok()) {
PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
for (const auto& entry : partition_offsets) {
PIntegerPair* res = part_offsets->add_offset_times();
res->set_key(entry.key());
res->set_val(entry.val());
}
}
st.to_protobuf(response->mutable_status());
return;
} else {
// get partition ids of topic
std::vector<int32_t> partition_ids;
Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
request->kafka_meta_request(), &partition_ids);
if (st.ok()) {
PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
for (int32_t id : partition_ids) {
kafka_result->add_partition_ids(id);
}
}
st.to_protobuf(response->mutable_status());
return;
}
st.to_protobuf(response->mutable_status());
return;
}
}
Status::OK().to_protobuf(response->mutable_status());
}

21 changes: 21 additions & 0 deletions docs/en/administrator-guide/load-data/routine-load-manual.md
@@ -273,11 +273,32 @@ The user can control the stop, pause and restart of the job by the three command
* If the brokers of the user's Kafka cluster have `auto.create.topics.enable = false` set, the topic will not be created automatically, and the routine load job will be paused before any data is read, with status `PAUSED`.

So, if the user wants the topic to be created automatically by the routine load job when it does not exist, set `auto.create.topics.enable = true` on the brokers of the user's Kafka cluster.

5. Problems that may occur in some environments
In some environments there are isolation measures for network segments and domain name resolution, so pay attention to the following:
1. The broker list specified in the routine load task must be accessible from the Doris environment.
2. If `advertised.listeners` is configured in Kafka, the addresses in `advertised.listeners` must be accessible from the Doris environment.

6. About specifying the Partition and Offset to consume

Doris supports specifying the Partition and Offset from which to start consumption. The new version also supports starting consumption from a specified point in time. The relationships between the corresponding parameters are explained here.

There are three relevant parameters:

* `kafka_partitions`: Specify the list of partitions to be consumed, such as: "0, 1, 2, 3".
* `kafka_offsets`: Specify the starting offset of each partition, which must match the number of entries in `kafka_partitions`. Such as: "1000, 1000, 2000, 2000"
* `property.kafka_default_offset`: Specify the default starting offset of the partitions.

When creating a routine load job, these three parameters can be combined as follows (see the example after the table):

| Combinations | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offset` | Behavior |
|---|---|---|---|---|
|1| No | No | No | The system will automatically find all the partitions corresponding to the topic and start consumption from OFFSET_END |
|2| No | No | Yes | The system will automatically find all the partitions corresponding to the topic and start consumption from the position specified by the default offset |
|3| Yes | No | No | The system will start consumption from the OFFSET_END of the specified partition |
|4| Yes | Yes | No | The system will start consumption from the specified offset of the specified partition |
|5| Yes | No | Yes | The system will start consumption from the specified partition and the location specified by the default offset |
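
For example, combination 4 (explicit partitions with explicit offsets) could be written as the following sketch; the job, table, topic, and broker names are placeholders reused from the examples elsewhere in these docs:

```
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
) FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "1000,1000,2000,2000"
);
```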

## Related parameters

Some system configuration parameters can affect the use of routine loads.
@@ -243,13 +243,19 @@

2) OFFSET_END: Subscribe from the end.

3) Timestamp, in the format "2021-05-11 10:00:00". The system will automatically locate the offset of the first message whose timestamp is greater than or equal to the given timestamp.
Note that timestamp-format offsets cannot be mixed with numeric offsets; only one of the two forms can be used.

If not specified, all partitions under the topic are subscribed by default from OFFSET_END.

Example:

```
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

"kafka_partitions" = "0,1",
"kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
```

4. property
Expand Down Expand Up @@ -305,9 +311,12 @@ FROM data_source

2) OFFSET_END: Subscribe from the end.

3) Timestamp, in the same format as `kafka_offsets`.

Example:

`"property.kafka_default_offsets" = "OFFSET_BEGINNING"`
`"property.kafka_default_offsets" = "2021-05-11 10:00:00"`

8. load data format sample

@@ -552,6 +561,22 @@
"kafka_offsets" = "101,0,0,200"
);

9. Start consumption from the specified point in time

CREATE ROUTINE LOAD example_db.test_job ON example_tbl
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
) FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.kafka_default_offsets" = "2021-10-10 11:00:00"
);

## keyword

CREATE, ROUTINE, LOAD
20 changes: 20 additions & 0 deletions docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
@@ -277,6 +277,26 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
1. The broker list specified when creating the routine load task must be accessible from the Doris service.
2. If `advertised.listeners` is configured in Kafka, the addresses in `advertised.listeners` must be accessible from the Doris service.

6. About specifying the Partition and Offset to consume

Doris supports specifying the Partition and Offset from which to start consumption. The new version also supports starting consumption from a specified point in time. The relationships between the corresponding parameters are explained here.

There are three relevant parameters:

* `kafka_partitions`: Specify the list of partitions to be consumed, such as: "0, 1, 2, 3".
* `kafka_offsets`: Specify the starting offset of each partition, which must match the number of entries in `kafka_partitions`. Such as: "1000, 1000, 2000, 2000"
* `property.kafka_default_offset`: Specify the default starting offset of the partitions.

When creating a routine load job, these three parameters can be combined as follows (see the example after the table):

| Combinations | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offset` | Behavior |
|---|---|---|---|---|
|1| No | No | No | The system will automatically find all the partitions of the topic and start consumption from OFFSET_END |
|2| No | No | Yes | The system will automatically find all the partitions of the topic and start consumption from the position specified by the default offset |
|3| Yes | No | No | The system will start consumption from OFFSET_END of the specified partitions |
|4| Yes | Yes | No | The system will start consumption from the specified offsets of the specified partitions |
|5| Yes | No | Yes | The system will start consumption from the specified partitions at the position specified by the default offset |
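
For example, combination 2 (no partitions or offsets specified, only a default offset) could be written as the following sketch; the job, table, topic, and broker names are placeholders reused from the examples elsewhere in these docs:

```
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
) FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```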

## Related parameters

Some system configuration parameters can affect the use of routine load.
@@ -220,13 +220,17 @@
The offset can be a specific value greater than or equal to 0, or:
1) OFFSET_BEGINNING: Subscribe from the position where data exists.
2) OFFSET_END: Subscribe from the end.
3) Timestamp, in the format "2021-05-11 10:00:00". The system will automatically locate the offset of the first message whose timestamp is greater than or equal to the given timestamp. Note that timestamp-format offsets cannot be mixed with numeric offsets; only one of the two forms can be used.

If not specified, all partitions under the topic are subscribed by default from OFFSET_END.
Example:

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

"kafka_partitions" = "0,1",
"kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"

4. property

Specify custom Kafka parameters.
@@ -264,8 +268,11 @@
The value can be:
1) OFFSET_BEGINNING: Subscribe from the position where data exists.
2) OFFSET_END: Subscribe from the end.
3) Timestamp, in the same format as kafka_offsets

Example:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
"property.kafka_default_offsets" = "2021-05-11 10:00:00"

8. Sample load data formats

@@ -507,6 +514,22 @@
"kafka_offsets" = "101,0,0,200"
);

9. Start consumption from a specified point in time

CREATE ROUTINE LOAD example_db.test_job ON example_tbl
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
) FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.kafka_default_offsets" = "2021-10-10 11:00:00"
);

## keyword

CREATE,ROUTINE,LOAD
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/cup/sql_parser.cup
@@ -803,7 +803,8 @@ opt_datasource_properties ::=
:}
| KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
{:
RESULT = new RoutineLoadDataSourceProperties(type, customProperties);
// the 3rd parameter "true" means this is for AlterRoutineLoad operation.
RESULT = new RoutineLoadDataSourceProperties(type, customProperties, true);
:}
;

Expand Down