From ec20b801f75c8a48cb9694bf0fdef94c14ca2e47 Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 24 Dec 2024 10:15:25 +0800 Subject: [PATCH 01/15] [CH] Support MicroBatchScanExec with KafkaScan in batch mode --- backends-clickhouse/pom.xml | 62 +++ .../backendsapi/clickhouse/CHBackend.scala | 1 + .../clickhouse/CHIteratorApi.scala | 2 + .../ClickhouseGlutenKafkaScanSuite.scala | 29 ++ cpp-ch/local-engine/Common/CHUtil.cpp | 3 + .../Parser/RelParsers/ReadRelParser.cpp | 25 +- .../Parser/RelParsers/ReadRelParser.h | 2 + .../RelParsers/StreamKafkaRelParser.cpp | 80 ++++ .../Parser/RelParsers/StreamKafkaRelParser.h | 53 +++ .../Storages/Kafka/GlutenKafkaSource.cpp | 356 ++++++++++++++++++ .../Storages/Kafka/GlutenKafkaSource.h | 109 ++++++ .../Storages/Kafka/GlutenKafkaUtils.cpp | 301 +++++++++++++++ .../Storages/Kafka/GlutenKafkaUtils.h | 74 ++++ .../Kafka/ReadFromGlutenStorageKafka.cpp | 100 +++++ .../Kafka/ReadFromGlutenStorageKafka.h | 65 ++++ gluten-kafka/pom.xml | 149 ++++++++ .../rel/StreamKafkaSourceBuilder.java | 41 ++ ...xecution.DataSourceScanTransformerRegister | 1 + .../MicroBatchScanExecTransformer.scala | 108 ++++++ .../StreamKafkaTransformerProvider.scala | 30 ++ .../GlutenStreamKafkaSourceUtil.scala | 48 +++ .../kafka/GlutenKafkaScanSuite.scala | 89 +++++ .../gluten/substrait/rel/LocalFilesNode.java | 1 + .../gluten/substrait/rel/ReadRelNode.java | 6 + .../substrait/rel/StreamKafkaSourceNode.java | 80 ++++ .../substrait/proto/substrait/algebra.proto | 17 + .../offload/OffloadSingleNodeRules.scala | 4 +- .../columnar/validator/Validators.scala | 2 +- pom.xml | 9 + 29 files changed, 1843 insertions(+), 4 deletions(-) create mode 100644 backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala create mode 100644 cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp create mode 100644 cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h create mode 100644 cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp create mode 100644 cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h create mode 100644 cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp create mode 100644 cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h create mode 100644 cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp create mode 100644 cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h create mode 100644 gluten-kafka/pom.xml create mode 100644 gluten-kafka/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java create mode 100644 gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister create mode 100644 gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala create mode 100644 gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala create mode 100644 gluten-kafka/src/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala create mode 100644 gluten-kafka/src/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala create mode 100644 gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index fa6863b8774e..428af5b0f368 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -68,6 +68,68 @@ + + kafka + + false + + + + org.apache.gluten + gluten-kafka + ${project.version} + + + org.apache.gluten + 
gluten-kafka + ${project.version} + test-jar + test + + + org.apache.spark + spark-sql-kafka-0-10_${scala.binary.version} + ${spark.version} + provided + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-kafka-sources + generate-sources + + add-source + + + + ${project.basedir}/src/main-kafka/scala + ${project.basedir}/src/main-kafka/java + + + + + add-kafka-test-sources + generate-test-sources + + add-test-source + + + + ${project.basedir}/src/test-kafka/scala + ${project.basedir}/src/test-kafka/java + + + + + + + + diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 962698759320..d2e1a35a9aad 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -200,6 +200,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { ValidationResult.failed("Has complex type.") } case JsonReadFormat => ValidationResult.succeeded + case KafkaReadFormat => ValidationResult.succeeded case _ => ValidationResult.failed(s"Unsupported file format $format") } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 0b20e5aeeabb..b041b1b81756 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -243,6 +243,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq) case extensionTableNode: ExtensionTableNode => (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList) + case kafkaSourceNode: StreamKafkaSourceNode => + (kafkaSourceNode.toProtobuf.toByteArray, Seq.empty) } }.unzip diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala new file mode 100644 index 000000000000..3c13d335274c --- /dev/null +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gluten.execution.kafka
+
+import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
+
+class ClickhouseGlutenKafkaScanSuite
+  extends GlutenClickHouseWholeStageTransformerSuite
+  with GlutenKafkaScanSuite {
+
+  override protected val fileFormat: String = "parquet"
+
+  protected val kafkaBootstrapServers: String = "localhost:9092"
+
+}
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 37d1a1bbd717..5d7a5c803bb5 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -43,6 +43,7 @@
 #include
 #include
 #include
+#include <Formats/registerFormats.h>
 #include
 #include
 #include
@@ -880,6 +881,8 @@ void registerGlutenDisks()
 
 void BackendInitializerUtil::registerAllFactories()
 {
+    registerFormats();
+
     registerGlutenDisks();
 
     registerReadBufferBuilders();
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 2e4dd4af1b82..5be79ecb1c23 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include <Parser/RelParsers/StreamKafkaRelParser.h>
 #include
 #include
 #include
@@ -51,7 +52,7 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra
     if (query_plan)
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null");
     const auto & read = rel.read();
-    if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
+    if (isReadFromDefault(read))
     {
         assert(read.has_base_schema());
         DB::QueryPlanStepPtr read_step;
@@ -70,6 +71,13 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra
             query_plan->addStep(std::move(buffer_step));
         }
     }
+    else if (isReadFromStreamKafka(read))
+    {
+        StreamKafkaRelParser kafka_parser(parser_context, getContext());
+        kafka_parser.setSplitInfo(split_info);
+        query_plan = kafka_parser.parseReadRel(std::make_unique<DB::QueryPlan>(), read);
+        steps = kafka_parser.getSteps();
+    }
     else
     {
         substrait::ReadRel::ExtensionTable extension_table;
@@ -87,6 +95,11 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra
     return query_plan;
 }
 
+bool ReadRelParser::isReadFromDefault(const substrait::ReadRel & read)
+{
+    return read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read) && !isReadFromStreamKafka(read));
+}
+
 bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel)
 {
     return rel.has_local_files() && rel.local_files().items().size() == 1
@@ -95,7 +108,9 @@ bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel)
 
 bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel)
 {
-    assert(rel.has_advanced_extension());
+    if (!rel.has_advanced_extension())
+        return false;
+
     bool is_read_from_merge_tree;
     google::protobuf::StringValue optimization;
     optimization.ParseFromString(rel.advanced_extension().optimization().value());
@@ -107,6 +122,12 @@ bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel)
     return is_read_from_merge_tree;
 }
 
+
+bool ReadRelParser::isReadFromStreamKafka(const substrait::ReadRel & rel)
+{
+    return rel.has_stream_kafka() && rel.stream_kafka();
+}
+
 DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const substrait::ReadRel & rel)
 {
     GET_JNIENV(env)
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
index 7f84a89ed49c..838ce6392003 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
@@ -38,8 +38,10 @@ class ReadRelParser : public RelParser
     // This is source node, there is no input
     std::optional getSingleInput(const substrait::Rel & rel) override { return {}; }
 
+    bool isReadFromDefault(const substrait::ReadRel & rel);
     bool isReadRelFromJava(const substrait::ReadRel & rel);
     bool isReadFromMergeTree(const substrait::ReadRel & rel);
+    bool isReadFromStreamKafka(const substrait::ReadRel & rel);
 
     void setInputIter(jobject input_iter_, bool is_materialze)
     {
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
new file mode 100644
index 000000000000..94b18bc8bd45
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamKafkaRelParser.h"
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int NO_SUCH_DATA_PART;
+extern const int LOGICAL_ERROR;
+extern const int UNKNOWN_FUNCTION;
+extern const int UNKNOWN_TYPE;
+}
+}
+
+
+namespace local_engine
+{
+using namespace DB;
+
+DB::QueryPlanPtr StreamKafkaRelParser::parseReadRel(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel)
+{
+    if (!read_rel.has_stream_kafka())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse Kafka rel: the read rel does not contain stream_kafka");
+
+    auto kafka_task = BinaryToMessage(split_info);
+    auto topic = kafka_task.topic_partition().topic();
+    auto partition = kafka_task.topic_partition().partition();
+    auto start_offset = kafka_task.start_offset();
+    auto end_offset = kafka_task.end_offset();
+    auto poll_timeout_ms = kafka_task.poll_timeout_ms();
+    String group_id;
+    String brokers;
+
+    for (const auto & param : kafka_task.params())
+        if (param.first == "poll_timeout_ms")
+            poll_timeout_ms = std::stoi(param.second);
+        else if (param.first == "group.id")
+            group_id = param.second;
+        else if (param.first == "bootstrap.servers")
+            brokers = param.second;
+        else
+            LOG_DEBUG(getLogger("StreamKafkaRelParser"), "Ignoring unused Kafka param {} : {}", param.first, param.second);
+
+    Names topics;
+    topics.emplace_back(topic);
+
+    auto header = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
+    Names names = header.getNames();
+    auto source = std::make_unique<ReadFromGlutenStorageKafka>(
+        names, header, getContext(), topics, partition, start_offset, end_offset, poll_timeout_ms, group_id, brokers);
+
+    query_plan->addStep(std::move(source));
+
+    return query_plan;
+}
+
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
new
file mode 100644 index 000000000000..d99f8fda7412 --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace local_engine +{ +class StreamKafkaRelParser : public RelParser +{ +public: + explicit StreamKafkaRelParser(ParserContextPtr parser_context_, const ContextPtr & context_) + : RelParser(parser_context_), context(context_) + { + } + + ~StreamKafkaRelParser() override = default; + + DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call parse(), call parseReadRel instead."); + } + + std::optional getSingleInput(const substrait::Rel &) override + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call getSingleInput()."); + } + + DB::QueryPlanPtr parseReadRel( + DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel); + + void setSplitInfo(String split_info_) { split_info = split_info_; } + +private: + ContextPtr context; + + String split_info; +}; +} diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp new file mode 100644 index 000000000000..eb1dd92756a5 --- /dev/null +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp @@ -0,0 +1,356 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "GlutenKafkaSource.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace ProfileEvents +{ +extern const Event KafkaMessagesRead; +extern const Event KafkaMessagesFailed; +extern const Event KafkaRowsRead; +extern const Event KafkaRowsRejected; +} + +namespace DB +{ +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + +namespace Setting +{ +extern const SettingsUInt64 max_block_size; +extern const SettingsUInt64 max_insert_block_size; +extern const SettingsUInt64 output_format_avro_rows_in_file; +extern const SettingsMilliseconds stream_flush_interval_ms; +extern const SettingsMilliseconds stream_poll_timeout_ms; +extern const SettingsBool use_concurrency_control; +} + +namespace KafkaSetting +{ +extern const KafkaSettingsUInt64 input_format_allow_errors_num; +extern const KafkaSettingsFloat input_format_allow_errors_ratio; +extern const KafkaSettingsString kafka_broker_list; +extern const KafkaSettingsString kafka_client_id; +extern const KafkaSettingsBool kafka_commit_every_batch; +extern const KafkaSettingsBool kafka_commit_on_select; +extern const KafkaSettingsUInt64 kafka_consumers_pool_ttl_ms; +extern const KafkaSettingsMilliseconds kafka_flush_interval_ms; +extern const KafkaSettingsString kafka_format; +extern const KafkaSettingsString kafka_group_name; +extern const KafkaSettingsStreamingHandleErrorMode kafka_handle_error_mode; +extern const KafkaSettingsUInt64 kafka_max_block_size; +extern const KafkaSettingsUInt64 kafka_max_rows_per_message; +extern const KafkaSettingsUInt64 kafka_num_consumers; +extern const KafkaSettingsUInt64 kafka_poll_max_batch_size; +extern const KafkaSettingsMilliseconds kafka_poll_timeout_ms; +extern const KafkaSettingsString kafka_schema; +extern const KafkaSettingsBool kafka_thread_per_consumer; +extern const KafkaSettingsString kafka_topic_list; +} +} + + +namespace local_engine +{ + +size_t GlutenKafkaSource::getPollMaxBatchSize() const +{ + size_t batch_size = (*kafka_settings)[KafkaSetting::kafka_poll_max_batch_size].changed + ? (*kafka_settings)[KafkaSetting::kafka_poll_max_batch_size].value + : context->getSettingsRef()[Setting::max_block_size].value; + + return std::min(batch_size, getMaxBlockSize()); +} + +size_t GlutenKafkaSource::getMaxBlockSize() const +{ + return (*kafka_settings)[KafkaSetting::kafka_max_block_size].changed + ? (*kafka_settings)[KafkaSetting::kafka_max_block_size].value + : (context->getSettingsRef()[Setting::max_insert_block_size].value / /*num_consumers*/ 1); +} + +size_t GlutenKafkaSource::getPollTimeoutMillisecond() const +{ + return (*kafka_settings)[KafkaSetting::kafka_poll_timeout_ms].changed + ? 
(*kafka_settings)[KafkaSetting::kafka_poll_timeout_ms].totalMilliseconds()
+        : context->getSettingsRef()[Setting::stream_poll_timeout_ms].totalMilliseconds();
+}
+
+GlutenKafkaSource::GlutenKafkaSource(
+    const Block & result_header_,
+    const ContextPtr & context_,
+    const Names & topics_,
+    const size_t & partition_,
+    const String & brokers_,
+    const String & group_,
+    const size_t & poll_timeout_ms_,
+    const size_t & start_offset_,
+    const size_t & end_offset_,
+    const std::shared_ptr<KafkaSettings> & kafka_settings_)
+    : ISource(result_header_)
+    , context(context_)
+    , log(getLogger("GlutenKafkaSource"))
+    , kafka_settings(kafka_settings_)
+    , result_header(result_header_)
+    , topics(topics_)
+    , brokers(brokers_)
+    , group(group_)
+    , poll_timeout_ms(poll_timeout_ms_)
+    , start_offset(start_offset_)
+    , end_offset(end_offset_)
+    , partition(partition_)
+{
+    max_block_size = end_offset - start_offset;
+
+    for (const auto & columns_with_type_and_name : result_header.getColumnsWithTypeAndName())
+    {
+        if (columns_with_type_and_name.name == "value")
+        {
+            const auto no_null_datatype = removeNullable(columns_with_type_and_name.type);
+            non_virtual_header.insert(ColumnWithTypeAndName(no_null_datatype->createColumn(), no_null_datatype, "value"));
+            continue;
+        }
+
+        virtual_header.insert(columns_with_type_and_name);
+    }
+}
+
+GlutenKafkaSource::~GlutenKafkaSource()
+{
+    std::lock_guard lock(consumer_mutex);
+    // Return the consumer to the in-memory pool only if one was actually created.
+    if (consumer)
+    {
+        auto topic_partition = TopicPartition{topics[0], partition};
+        consumers_in_memory[topic_partition].emplace_back(consumer);
+    }
+}
+
+void GlutenKafkaSource::initConsumer()
+{
+    std::lock_guard lock(consumer_mutex);
+    auto topic_partition = TopicPartition{topics[0], partition};
+    consumers_in_memory.try_emplace(topic_partition, std::vector<KafkaConsumerPtr>());
+
+    auto & consumers = consumers_in_memory[topic_partition];
+
+    if (!consumers.empty())
+    {
+        consumer = consumers.back();
+        consumers.pop_back();
+    }
+
+    if (!consumer)
+    {
+        LOG_DEBUG(log, "Creating new Kafka consumer for topic: {}, partition: {}", topics[0], partition);
+        String collection_name = "";
+        KafkaConsumerPtr kafka_consumer_ptr = std::make_shared<KafkaConsumer>(
+            log,
+            getPollMaxBatchSize(),
+            getPollTimeoutMillisecond(),
+            /*intermediate_commit*/ false,
+            /*stream_cancelled*/ is_cancelled,
+            topics);
+
+        KafkaConfigLoader::ConsumerConfigParams params{
+            {context->getConfigRef(), /*collection_name*/ collection_name, topics, log},
+            brokers,
+            group,
+            false,
+            1,
+            client_id,
+            getMaxBlockSize()};
+
+        kafka_consumer_ptr->createConsumer(GlutenKafkaUtils::getConsumerConfiguration(params));
+        consumer = kafka_consumer_ptr;
+        LOG_DEBUG(log, "Created new Kafka consumer for topic: {}, partition: {}", topics[0], partition);
+    }
+
+    consumer->subscribe(topics[0], partition, start_offset);
+}
+
+
+Chunk GlutenKafkaSource::generateImpl()
+{
+    if (!consumer)
+        initConsumer();
+
+    // TODO: delete
+    bool put_error_to_stream = false;
+    std::optional<String> exception_message;
+
+    auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
+    {
+        ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
+
+        if (put_error_to_stream)
+        {
+            exception_message = e.message();
+            for (size_t i = 0; i < result_columns.size(); ++i)
+            {
+                // We could already push some rows to result_columns before exception, we need to fix it.
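+                // (rollback() truncates each column back to the checkpoint captured before this
+                // message was parsed, discarding any partially written row.)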
+                result_columns[i]->rollback(*checkpoints[i]);
+
+                // all data columns will get default value in case of error
+                result_columns[i]->insertDefault();
+            }
+
+            return 1;
+        }
+
+        e.addMessage(
+            "while parsing Kafka message (topic: {}, partition: {}, offset: {})",
+            consumer->currentTopic(),
+            consumer->currentPartition(),
+            consumer->currentOffset());
+        consumer->setExceptionInfo(e.message());
+        throw std::move(e);
+    };
+
+    EmptyReadBuffer empty_buf;
+    auto input_format
+        = FormatFactory::instance().getInput("Raw", empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
+
+    StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
+    size_t total_rows = 0;
+    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
+
+    while (true)
+    {
+        size_t new_rows = 0;
+        if (auto buf = consumer->consume())
+        {
+            ProfileEvents::increment(ProfileEvents::KafkaMessagesRead);
+            new_rows = executor.execute(*buf);
+        }
+
+        if (new_rows)
+        {
+            // In read_kafka_message(), KafkaConsumer::nextImpl()
+            // will be called, that may make something unusable, i.e. clean
+            // KafkaConsumer::messages, which is accessed from
+            // KafkaConsumer::currentTopic() (and other helpers).
+            if (consumer->isStalled())
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Polled messages became unusable");
+
+            ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
+            consumer->storeLastReadMessageOffset();
+
+            auto topic = consumer->currentTopic();
+            auto key = consumer->currentKey();
+            auto offset = consumer->currentOffset();
+            auto partition = consumer->currentPartition();
+            auto timestamp_raw = consumer->currentTimestamp();
+            auto header_list = consumer->currentHeaderList();
+
+            for (size_t i = 0; i < new_rows; ++i)
+            {
+                virtual_columns[0]->insert(key);
+                virtual_columns[1]->insert(topic);
+                virtual_columns[2]->insert(partition);
+                virtual_columns[3]->insert(offset);
+
+                if (timestamp_raw)
+                {
+                    auto ts = timestamp_raw->get_timestamp();
+                    virtual_columns[4]->insert(
+                        DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(), 3));
+                }
+                else
+                {
+                    virtual_columns[4]->insertDefault();
+                }
+
+                virtual_columns[5]->insertDefault();
+            }
+
+            total_rows += new_rows;
+        }
+        else if (consumer->polledDataUnusable())
+        {
+            break;
+        }
+        else
+        {
+            // We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal
+            // TODO: it seems like in case of put_error_to_stream=true we may need to process those differently
+            // currently we just skip them with note in logs.
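+            // The offset is still stored for skipped messages; otherwise the consumer would
+            // re-poll the same tombstone on the next iteration.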
+            consumer->storeLastReadMessageOffset();
+            LOG_DEBUG(
+                log,
+                "Parsing of message (topic: {}, partition: {}, offset: {}) returned no rows.",
+                consumer->currentTopic(),
+                consumer->currentPartition(),
+                consumer->currentOffset());
+        }
+
+        if (!consumer->hasMorePolledMessages() || total_rows >= max_block_size)
+            break;
+    }
+
+    if (total_rows == 0)
+        return {};
+
+    if (consumer->polledDataUnusable())
+    {
+        // the rows were counted already before by KafkaRowsRead,
+        // so let's count the rows we ignore separately
+        // (they will be retried after the rebalance)
+        ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
+        return {};
+    }
+
+    auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns()); //.cloneWithCutColumns(0, max_block_size);
+
+    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns)); //.cloneWithCutColumns(0, max_block_size);
+    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
+        result_block.insert(column);
+
+    auto converting_dag = ActionsDAG::makeConvertingActions(
+        result_block.cloneEmpty().getColumnsWithTypeAndName(),
+        getPort().getHeader().getColumnsWithTypeAndName(),
+        ActionsDAG::MatchColumnsMode::Name);
+
+    auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
+    converting_actions->execute(result_block);
+
+    return Chunk(result_block.getColumns(), result_block.rows());
+}
+
+Chunk GlutenKafkaSource::generate()
+{
+    if (isCancelled() || finished)
+        return {};
+
+    auto chunk = generateImpl();
+    finished = true;
+
+    return chunk;
+}
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
new file mode 100644
index 000000000000..e007bdc2f5b4
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include
+#include
+
+namespace local_engine
+{
+using namespace DB;
+
+class GlutenKafkaSource : public ISource
+{
+public:
+    GlutenKafkaSource(
+        const Block & result_header_,
+        const ContextPtr & context_,
+        const Names & topics_,
+        const size_t & partition_,
+        const String & brokers_,
+        const String & group_,
+        const size_t & poll_timeout_ms_,
+        const size_t & start_offset_,
+        const size_t & end_offset_,
+        const std::shared_ptr<KafkaSettings> & kafka_settings_);
+
+    ~GlutenKafkaSource() override;
+
+    struct TopicPartition
+    {
+        String topic;
+        size_t partition;
+
+        bool operator==(const TopicPartition & other) const { return topic == other.topic && partition == other.partition; }
+
+        TopicPartition(const String & topic, size_t partition) : topic(topic), partition(partition) { }
+    };
+
+    String getName() const override { return "GlutenKafkaSource"; }
+
+protected:
+    Chunk generate() override;
+
+private:
+    Chunk generateImpl();
+    void initConsumer();
+
+    size_t getPollMaxBatchSize() const;
+    size_t getMaxBlockSize() const;
+    size_t getPollTimeoutMillisecond() const;
+
+    // Members are declared in the same order as the constructor's initializer list
+    // to avoid -Wreorder warnings.
+    ContextPtr context;
+    LoggerPtr log;
+    UInt64 max_block_size;
+    KafkaConsumerPtr consumer;
+
+    std::shared_ptr<KafkaSettings> kafka_settings;
+    Block result_header;
+    Block virtual_header;
+    Block non_virtual_header;
+
+    const Names topics;
+    const String brokers;
+    const String group;
+    const String client_id = "123";
+    const size_t poll_timeout_ms;
+    const size_t start_offset;
+    const size_t end_offset;
+    const size_t partition;
+    bool finished = false;
+};
+
+}
+
+namespace std
+{
+template <>
+struct hash<local_engine::GlutenKafkaSource::TopicPartition>
+{
+    std::size_t operator()(const local_engine::GlutenKafkaSource::TopicPartition & tp) const noexcept
+    {
+        std::size_t h1 = std::hash<String>{}(tp.topic);
+        std::size_t h2 = std::hash<size_t>{}(tp.partition);
+        return h1 ^ (h2 << 1); // Combine the two hash values
+    }
+};
+}
+
+namespace local_engine
+{
+// 'inline' (not 'static') so that every translation unit including this header shares a
+// single consumer pool; plain 'static' would give each translation unit its own copy.
+inline std::mutex consumer_mutex;
+inline std::unordered_map<GlutenKafkaSource::TopicPartition, std::vector<DB::KafkaConsumerPtr>> consumers_in_memory;
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
new file mode 100644
index 000000000000..9dc25a0a0860
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenKafkaUtils.h"
+
+
+#include
+#include
+#include
+#include
+
+namespace local_engine
+{
+using namespace DB;
+void GlutenKafkaUtils::setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value)
+{
+    /// "log_level" is the only setting name with a legitimate underscore; the remaining librdkafka settings use dot.separated.format, which isn't acceptable for XML.
+    /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+    const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
+    kafka_config.set(setting_name_in_kafka_config, value);
+}
+
+void GlutenKafkaUtils::loadNamedCollectionConfig(
+    cppkafka::Configuration & kafka_config, const String & collection_name, const String & config_prefix)
+{
+    const auto & collection = DB::NamedCollectionFactory::instance().get(collection_name);
+    for (const auto & key : collection->getKeys(-1, config_prefix))
+    {
+        // Cut prefix with '.' before actual config tag.
+        const auto param_name = key.substr(config_prefix.size() + 1);
+        setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
+    }
+}
+
+void GlutenKafkaUtils::loadConfigProperty(
+    cppkafka::Configuration & kafka_config,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & config_prefix,
+    const String & tag)
+{
+    const String property_path = config_prefix + "." + tag;
+    const String property_value = config.getString(property_path);
+
+    setKafkaConfigValue(kafka_config, tag, property_value);
+}
+
+
+void GlutenKafkaUtils::loadTopicConfig(
+    cppkafka::Configuration & kafka_config,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & collection_name,
+    const String & config_prefix,
+    const String & topic)
+{
+    if (!collection_name.empty())
+    {
+        const auto topic_prefix = fmt::format("{}.{}", config_prefix, KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG);
+        const auto & collection = NamedCollectionFactory::instance().get(collection_name);
+        for (const auto & key : collection->getKeys(1, config_prefix))
+        {
+            /// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+            if (!key.starts_with(topic_prefix))
+                continue;
+
+            const String kafka_topic_path = config_prefix + "." + key;
+            const String kafka_topic_name_path = kafka_topic_path + "." + KafkaConfigLoader::CONFIG_NAME_TAG;
+            if (topic == collection->get<String>(kafka_topic_name_path))
+                /// Found it! Now read the per-topic configuration into cppkafka.
+                loadNamedCollectionConfig(kafka_config, collection_name, kafka_topic_path);
+        }
+    }
+    else
+    {
+        /// Read all tags one level below <kafka_topic>
+        Poco::Util::AbstractConfiguration::Keys tags;
+        config.keys(config_prefix, tags);
+
+        for (const auto & tag : tags)
+        {
+            if (tag == KafkaConfigLoader::CONFIG_NAME_TAG)
+                continue; // ignore <name>, it is used to match topic configurations
+            loadConfigProperty(kafka_config, config, config_prefix, tag);
+        }
+    }
+}
+
+
+void GlutenKafkaUtils::loadFromConfig(
+    cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix)
+{
+    if (!params.collection_name.empty())
+    {
+        loadNamedCollectionConfig(kafka_config, params.collection_name, config_prefix);
+        return;
+    }
+
+    /// Read all tags one level below <kafka>
+    Poco::Util::AbstractConfiguration::Keys tags;
+    params.config.keys(config_prefix, tags);
+
+    for (const auto & tag : tags)
+    {
+        if (tag == KafkaConfigLoader::CONFIG_KAFKA_PRODUCER_TAG || tag == KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG)
+            /// Do not load consumer/producer properties, since they should be separated by different configuration objects.
+            continue;
+
+        if (tag.starts_with(
+                KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+        {
+            // Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
+            //     <kafka>
+            //         <kafka_topic>
+            //             <name>football</name>
+            //             <retry_backoff_ms>250</retry_backoff_ms>
+            //             <fetch_min_bytes>5000</fetch_min_bytes>
+            //         </kafka_topic>
+            //         <kafka_topic>
+            //             <name>baseball</name>
+            //             <retry_backoff_ms>300</retry_backoff_ms>
+            //             <fetch_min_bytes>2000</fetch_min_bytes>
+            //         </kafka_topic>
+            //     </kafka>
+            // Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
+            // Kafka-related is below <kafka>.
+            for (const auto & topic : params.topics)
+            {
+                /// Read topic name between <name> ... </name>
+                const String kafka_topic_path = config_prefix + "." + tag;
+                const String kafka_topic_name_path = kafka_topic_path + "." + KafkaConfigLoader::CONFIG_NAME_TAG;
+                const String topic_name = params.config.getString(kafka_topic_name_path);
+
+                if (topic_name != topic)
+                    continue;
+                loadTopicConfig(kafka_config, params.config, params.collection_name, kafka_topic_path, topic);
+            }
+            continue;
+        }
+        if (tag.starts_with(KafkaConfigLoader::CONFIG_KAFKA_TAG))
+            /// skip legacy configuration per topic e.g. <kafka_TOPIC_NAME>;
+            /// it will be processed in a separate function
+            continue;
+        // Update configuration from the <kafka> configuration. Example:
+        //     <kafka>
+        //         <retry_backoff_ms>250</retry_backoff_ms>
+        //         <fetch_min_bytes>100000</fetch_min_bytes>
+        //     </kafka>
+        loadConfigProperty(kafka_config, params.config, config_prefix, tag);
+    }
+}
+
+
+void GlutenKafkaUtils::updateGlobalConfiguration(cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
+{
+    loadFromConfig(kafka_config, params, KafkaConfigLoader::CONFIG_KAFKA_TAG);
+
+#if USE_KRB5
+    if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
+        LOG_WARNING(params.log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
+
+    kafka_config.set("sasl.kerberos.kinit.cmd", "");
+    kafka_config.set("sasl.kerberos.min.time.before.relogin", "0");
+
+    if (kafka_config.has_property("sasl.kerberos.keytab") && kafka_config.has_property("sasl.kerberos.principal"))
+    {
+        String keytab = kafka_config.get("sasl.kerberos.keytab");
+        String principal = kafka_config.get("sasl.kerberos.principal");
+        LOG_DEBUG(params.log, "Running KerberosInit");
+        try
+        {
+            kerberosInit(keytab, principal);
+        }
+        catch (const Exception & e)
+        {
+            LOG_ERROR(params.log, "KerberosInit failure: {}", getExceptionMessage(e, false));
+        }
+        LOG_DEBUG(params.log, "Finished KerberosInit");
+    }
+#else // USE_KRB5
+    if (kafka_config.has_property("sasl.kerberos.keytab") || kafka_config.has_property("sasl.kerberos.principal"))
+        LOG_WARNING(params.log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support.");
+#endif // USE_KRB5
+    // No need to add any prefix, messages can be distinguished
+    kafka_config.set_log_callback(
+        [log = params.log](cppkafka::KafkaHandleBase & handle, int level, const std::string & facility, const std::string & message)
+        {
+            auto [poco_level, client_logs_level] = parseSyslogLevel(level);
+            const auto & kafka_object_config = handle.get_configuration();
+            const std::string client_id_key{"client.id"};
+            chassert(kafka_object_config.has_property(client_id_key) && "Kafka configuration doesn't have expected client.id set");
+            LOG_IMPL(
+                log,
+                client_logs_level,
+                poco_level,
+                "[client.id:{}] [rdk:{}] {}",
+                kafka_object_config.get(client_id_key),
+                facility,
+                message);
+        });
+
+    /// NOTE: statistics should be consumed, otherwise it creates too many
+    /// entries in the queue, which leads to memory leak and slow shutdown.
+    if (!kafka_config.has_property("statistics.interval.ms"))
+    {
+        // every 3 seconds by default. set to 0 to disable.
+ kafka_config.set("statistics.interval.ms", "3000"); + } +} + + +void GlutenKafkaUtils::loadLegacyTopicConfig( + cppkafka::Configuration & kafka_config, + const Poco::Util::AbstractConfiguration & config, + const String & collection_name, + const String & config_prefix) +{ + if (!collection_name.empty()) + { + loadNamedCollectionConfig(kafka_config, collection_name, config_prefix); + return; + } + + Poco::Util::AbstractConfiguration::Keys tags; + config.keys(config_prefix, tags); + + for (const auto & tag : tags) + loadConfigProperty(kafka_config, config, config_prefix, tag); +} + +void GlutenKafkaUtils::loadLegacyConfigSyntax( + cppkafka::Configuration & kafka_config, + const Poco::Util::AbstractConfiguration & config, + const String & collection_name, + const Names & topics) +{ + for (const auto & topic : topics) + { + const String kafka_topic_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." + KafkaConfigLoader::CONFIG_KAFKA_TAG + "_" + topic; + loadLegacyTopicConfig(kafka_config, config, collection_name, kafka_topic_path); + } +} + + +void GlutenKafkaUtils::loadConsumerConfig(cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params) +{ + const String consumer_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." + KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG; + loadLegacyConfigSyntax(kafka_config, params.config, params.collection_name, params.topics); + // A new syntax has higher priority + loadFromConfig(kafka_config, params, consumer_path); +} + + +cppkafka::Configuration GlutenKafkaUtils::getConsumerConfiguration(const KafkaConfigLoader::ConsumerConfigParams & params) +{ + cppkafka::Configuration conf; + + conf.set("metadata.broker.list", params.brokers); + conf.set("group.id", params.group); + if (params.multiple_consumers) + conf.set("client.id", fmt::format("{}-{}", params.client_id, params.consumer_number)); + else + conf.set("client.id", params.client_id); + conf.set("client.software.name", VERSION_NAME); + conf.set("client.software.version", VERSION_DESCRIBE); + conf.set("auto.offset.reset", "earliest"); // If no offset stored for this group, read all messages from the start + + // that allows to prevent fast draining of the librdkafka queue + // during building of single insert block. Improves performance + // significantly, but may lead to bigger memory consumption. + size_t default_queued_min_messages = 100000; // must be greater than or equal to default + size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value + conf.set( + "queued.min.messages", std::min(std::max(params.max_block_size, default_queued_min_messages), max_allowed_queued_min_messages)); + + updateGlobalConfiguration(conf, params); + loadConsumerConfig(conf, params); + + // those settings should not be changed by users. + conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished + conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once. 
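+    // With enable.partition.eof=false (set below) librdkafka does not surface
+    // RD_KAFKA_RESP_ERR__PARTITION_EOF at the end of a partition, so this bounded
+    // source stops on the offset range / poll timeout instead.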
+ conf.set("enable.partition.eof", "false"); // Ignore EOF messages + + for (auto & property : conf.get_all()) + LOG_TRACE(params.log, "Consumer set property {}:{}", property.first, property.second); + + return conf; +} +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h new file mode 100644 index 000000000000..411e78d1d001 --- /dev/null +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h @@ -0,0 +1,74 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace local_engine +{ +using namespace DB; + +class GlutenKafkaUtils +{ +public: + static void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value); + + static void + loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const String & collection_name, const String & config_prefix); + + static void loadConfigProperty( + cppkafka::Configuration & kafka_config, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + const String & tag); + + static void loadTopicConfig( + cppkafka::Configuration & kafka_config, + const Poco::Util::AbstractConfiguration & config, + const String & collection_name, + const String & config_prefix, + const String & topic); + + static void loadFromConfig( + cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix); + + static void updateGlobalConfiguration(cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params); + + static void loadLegacyTopicConfig( + cppkafka::Configuration & kafka_config, + const Poco::Util::AbstractConfiguration & config, + const String & collection_name, + const String & config_prefix); + + static void loadLegacyConfigSyntax( + cppkafka::Configuration & kafka_config, + const Poco::Util::AbstractConfiguration & config, + const String & collection_name, + const Names & topics); + + static void loadConsumerConfig(cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params); + + + static cppkafka::Configuration getConsumerConfiguration(const KafkaConfigLoader::ConsumerConfigParams & params); +}; + + +} diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp new file mode 100644 index 000000000000..9b6fdc31bd92 --- /dev/null +++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp @@ -0,0 +1,100 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ReadFromGlutenStorageKafka.h"
+
+#include "GlutenKafkaSource.h"
+
+#include
+#include
+#include
+
+
+namespace DB
+{
+namespace Setting
+{
+extern const SettingsBool stream_like_engine_allow_direct_select;
+}
+}
+
+
+namespace local_engine
+{
+using namespace DB;
+
+ReadFromGlutenStorageKafka::ReadFromGlutenStorageKafka(
+    const Names & column_names_,
+    Header output_header_,
+    // std::shared_ptr storage_limits_,
+    ContextPtr context_,
+    Names & topics,
+    size_t partition,
+    size_t start_offset,
+    size_t end_offset,
+    size_t poll_timeout_ms,
+    String group_id,
+    String brokers)
+    : ISourceStep{output_header_}
+    , WithContext{context_}
+    // , storage_limits{std::move(storage_limits_)}
+    , column_names(column_names_)
+    , output_header(output_header_)
+    , topics(topics)
+    , partition(partition)
+    , start_offset(start_offset)
+    , end_offset(end_offset)
+    , poll_timeout_ms(poll_timeout_ms)
+    , group_id(group_id)
+    , brokers(brokers)
+{
+}
+
+void ReadFromGlutenStorageKafka::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
+{
+    auto pipe = makePipe();
+
+    /// Add storage limits.
+    // for (const auto & processor : pipe.getProcessors())
+    //     processor->setStorageLimits(storage_limits);
+
+    /// Add to processors to get processor info through explain pipeline statement.
+    for (const auto & processor : pipe.getProcessors())
+        processors.emplace_back(processor);
+
+    pipeline.init(std::move(pipe));
+}
+
+Pipe ReadFromGlutenStorageKafka::makePipe()
+{
+    auto kafka_settings = createKafkaSettings();
+    Pipes pipes;
+    pipes.reserve(1);
+    auto modified_context = Context::createCopy(getContext());
+    // modified_context->applySettingsChanges(kafka_storage.settings_adjustments);
+    pipes.emplace_back(std::make_shared<GlutenKafkaSource>(
+        output_header, modified_context, topics, partition, brokers, group_id, poll_timeout_ms, start_offset, end_offset, kafka_settings));
+
+    // LOG_DEBUG(kafka_storage.log, "Starting reading kafka batch stream");
+    return Pipe::unitePipes(std::move(pipes));
+}
+
+std::shared_ptr<KafkaSettings> ReadFromGlutenStorageKafka::createKafkaSettings()
+{
+    // TODO: add more configuration
+    return std::make_shared<KafkaSettings>();
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
new file mode 100644
index 000000000000..477fe2f10957
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace local_engine +{ +using namespace DB; +class ReadFromGlutenStorageKafka : public ISourceStep, protected WithContext +{ +public: + ReadFromGlutenStorageKafka( + const Names & column_names_, + Header output_header_, + ContextPtr context_, + Names & topics, + size_t partition, + size_t start_offset, + size_t end_offset, + size_t poll_timeout_ms, + String group_id, + String brokers); + + String getName() const override { return "ReadFromGlutenStorageKafka"; } + + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/) final; + +private: + Pipe makePipe(); + std::shared_ptr createKafkaSettings(); + +protected: + // std::shared_ptr storage_limits; + const Names & column_names; + Header output_header; + + Names topics; + size_t partition; + size_t start_offset; + size_t end_offset; + size_t poll_timeout_ms; + String group_id; + String brokers; +}; + + +} diff --git a/gluten-kafka/pom.xml b/gluten-kafka/pom.xml new file mode 100644 index 000000000000..35ad69f8dc8a --- /dev/null +++ b/gluten-kafka/pom.xml @@ -0,0 +1,149 @@ + + + + gluten-parent + org.apache.gluten + 1.3.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + gluten-kafka + jar + Gluten Kafka + + + ${project.basedir}/src/main/resources + + + + + org.apache.gluten + gluten-substrait + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + provided + + + + org.apache.spark + spark-sql-kafka-0-10_${scala.binary.version} + ${spark.version} + provided + + + + + org.apache.gluten + gluten-substrait + ${project.version} + test-jar + test + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + + + org.apache.spark + spark-sql_${scala.binary.version} + test-jar + + + org.apache.spark + spark-catalyst_${scala.binary.version} + test-jar + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + test + + + com.google.protobuf + protobuf-java + ${protobuf.version} + test + + + org.scalatest + scalatest_${scala.binary.version} + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + ${resource.dir} + + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + com.diffplug.spotless + spotless-maven-plugin + + + org.scalatest + scalatest-maven-plugin + + . 
+ + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + + diff --git a/gluten-kafka/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java b/gluten-kafka/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java new file mode 100644 index 000000000000..c0cfe8866ffb --- /dev/null +++ b/gluten-kafka/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.rel; + +import java.util.Map; + +public class StreamKafkaSourceBuilder { + public static StreamKafkaSourceNode makeStreamKafkaBatch( + String topic, + Integer partition, + Long startOffset, + Long endOffset, + Long pollTimeoutMs, + Boolean failOnDataLoss, + Boolean includeHeaders, + Map kafkaParams) { + return new StreamKafkaSourceNode( + topic, + partition, + startOffset, + endOffset, + pollTimeoutMs, + failOnDataLoss, + includeHeaders, + kafkaParams); + } +} diff --git a/gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister b/gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister new file mode 100644 index 000000000000..13d23f79e1f9 --- /dev/null +++ b/gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister @@ -0,0 +1 @@ +org.apache.gluten.execution.StreamKafkaTransformerProvider \ No newline at end of file diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala new file mode 100644 index 000000000000..402ef2fec31c --- /dev/null +++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec +import org.apache.spark.sql.kafka010.GlutenStreamKafkaSourceUtil +import org.apache.spark.sql.types.StructType + +/** Physical plan node for scanning a micro-batch of data from a data source. */ +case class MicroBatchScanExecTransformer( + override val output: Seq[AttributeReference], + @transient override val scan: Scan, + @transient stream: MicroBatchStream, + @transient start: Offset, + @transient end: Offset, + @transient override val table: Table, + override val keyGroupedPartitioning: Option[Seq[Expression]] = None, + override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None +) extends BatchScanExecTransformerBase( + output = output, + scan = scan, + runtimeFilters = Seq.empty, + table = table, + keyGroupedPartitioning = keyGroupedPartitioning, + commonPartitionValues = commonPartitionValues + ) { + + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. + override def equals(other: Any): Boolean = other match { + case other: MicroBatchScanExecTransformer => this.stream == other.stream + case _ => false + } + + override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() + + @transient override lazy val inputPartitions: Seq[InputPartition] = + stream.planInputPartitions(start, end) + + override def filterExprs(): Seq[Expression] = Seq.empty + + override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty + + override def outputAttributes(): Seq[Attribute] = output + + override def getPartitions: Seq[InputPartition] = inputPartitions + + /** Returns the actual schema of this data source scan. 
*/
+  override def getDataSchema: StructType = scan.readSchema()
+
+  override def nodeName: String = "Kafka" + super.nodeName
+
+  override lazy val fileFormat: ReadFileFormat = GlutenStreamKafkaSourceUtil.getFileFormat(scan)
+
+  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
+    val groupedPartitions = filteredPartitions.flatten
+    groupedPartitions.zipWithIndex.map {
+      case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p)
+    }
+  }
+
+  override protected def doTransform(context: SubstraitContext): TransformContext = {
+    val ctx = super.doTransform(context)
+    ctx.root.asInstanceOf[ReadRelNode].setStreamKafka(true)
+    ctx
+  }
+}
+
+object MicroBatchScanExecTransformer {
+  def apply(batch: MicroBatchScanExec): MicroBatchScanExecTransformer = {
+    val output =
+      batch.output.filter(_.isInstanceOf[AttributeReference]).map(_.asInstanceOf[AttributeReference]).toSeq
+    if (output.size == batch.output.size) {
+      new MicroBatchScanExecTransformer(
+        output,
+        batch.scan,
+        batch.stream,
+        batch.start,
+        batch.end,
+        null,
+        Option.empty,
+        Option.empty)
+    } else {
+      throw new UnsupportedOperationException(
+        s"Unsupported DataSourceV2ScanExecBase: ${batch.getClass.getName}")
+    }
+  }
+}
diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala
new file mode 100644
index 000000000000..1ea54cac0074
--- /dev/null
+++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, MicroBatchScanExec}
+
+class StreamKafkaTransformerProvider extends DataSourceScanTransformerRegister {
+
+  override val scanClassName: String =
+    "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan"
+
+  override def createDataSourceV2Transformer(
+      batchScan: DataSourceV2ScanExecBase): BatchScanExecTransformerBase = {
+    MicroBatchScanExecTransformer(batchScan.asInstanceOf[MicroBatchScanExec])
+  }
+}
diff --git a/gluten-kafka/src/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala b/gluten-kafka/src/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
new file mode 100644
index 000000000000..d5468f2cdebd
--- /dev/null
+++ b/gluten-kafka/src/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.kafka010
+
+import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.substrait.rel.{SplitInfo, StreamKafkaSourceBuilder}
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+
+object GlutenStreamKafkaSourceUtil {
+  def genSplitInfo(inputPartition: InputPartition): SplitInfo = inputPartition match {
+    case batch: KafkaBatchInputPartition =>
+      StreamKafkaSourceBuilder.makeStreamKafkaBatch(
+        batch.offsetRange.topicPartition.topic(),
+        batch.offsetRange.topicPartition.partition(),
+        batch.offsetRange.fromOffset,
+        batch.offsetRange.untilOffset,
+        batch.pollTimeoutMs,
+        batch.failOnDataLoss,
+        batch.includeHeaders,
+        batch.executorKafkaParams
+      )
+    case _ =>
+      throw new UnsupportedOperationException("Only KafkaBatchInputPartition is supported.")
+  }
+
+  def getFileFormat(scan: Scan): ReadFileFormat = scan.getClass.getSimpleName match {
+    case "KafkaScan" => ReadFileFormat.KafkaReadFormat
+    case _ =>
+      throw new GlutenNotSupportException("Only KafkaScan is supported.")
+  }
+
+}
diff --git a/gluten-kafka/src/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala b/gluten-kafka/src/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
new file mode 100644
index 000000000000..0855e03192b3
--- /dev/null
+++ b/gluten-kafka/src/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.gluten.execution.kafka + +import org.apache.gluten.execution.{MicroBatchScanExecTransformer, WholeStageTransformerSuite} +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper +import org.apache.spark.sql.functions.{col, split} +import org.apache.spark.sql.streaming.Trigger + +import scala.concurrent.duration.DurationInt + +trait GlutenKafkaScanSuite extends WholeStageTransformerSuite { + protected val kafkaBootstrapServers: String + + test("test MicroBatchScanExecTransformer not fallback") { + withTempDir( + dir => { + val table_name = "kafka_table" + withTable(s"$table_name") { + val df = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", kafkaBootstrapServers) + .option("subscribe", table_name) + .load() + .withColumn("sp", split(col("value").cast("string"), ",")) + .withColumn("id", col("sp").getItem(0).cast("int")) + .withColumn("name", col("sp").getItem(1).cast("string")) + .drop(col("sp")) + .drop(col("value")) + .drop(col("key")) + .drop(col("topic")) + .drop(col("partition")) + .drop(col("offset")) + .drop(col("timestamp")) + .drop(col("timestampType")) + + val streamQuery = df.writeStream + .format("parquet") + .option("checkpointLocation", dir.getCanonicalPath + "/_checkpoint") + .trigger(Trigger.ProcessingTime("5 seconds")) + .start(dir.getCanonicalPath) + + spark.sql(s""" + |CREATE EXTERNAL TABLE $table_name ( + | id long, + | name string + |)USING Parquet + |LOCATION '${dir.getCanonicalPath}' + |""".stripMargin) + + spark + .range(0, 20) + .selectExpr( + "concat(id,',', id) as value" + ) + .write + .format("kafka") + .option("kafka.bootstrap.servers", kafkaBootstrapServers) + .option("topic", table_name) + .save() + + eventually(timeout(60.seconds), interval(5.seconds)) { + val size = streamQuery + .asInstanceOf[StreamingQueryWrapper] + .streamingQuery + .lastExecution + .executedPlan + .collect { case p: MicroBatchScanExecTransformer => p } + assert(size.size == 1) + streamQuery.awaitTermination(1000) + } + } + }) + } +} diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index 9638e43cd76e..bebc6b1e95ef 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -49,6 +49,7 @@ public enum ReadFileFormat { MergeTreeReadFormat(), TextReadFormat(), JsonReadFormat(), + KafkaReadFormat(), UnknownFormat() } diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java index fba59af57b99..b82e05fd367e 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java @@ -37,6 +37,7 @@ public class ReadRelNode implements RelNode, Serializable { private final List columnTypeNodes = new ArrayList<>(); private final ExpressionNode filterNode; private final AdvancedExtensionNode extensionNode; + private boolean streamKafka = false; ReadRelNode( List types, @@ -51,6 +52,10 @@ public class ReadRelNode implements RelNode, Serializable { this.extensionNode = extensionNode; } + public void setStreamKafka(boolean streamKafka) { + this.streamKafka = streamKafka; + } + @Override public Rel toProtobuf() { RelCommon.Builder relCommonBuilder = 
RelCommon.newBuilder();
@@ -62,6 +67,7 @@ public Rel toProtobuf() {
     ReadRel.Builder readBuilder = ReadRel.newBuilder();
     readBuilder.setCommon(relCommonBuilder.build());
     readBuilder.setBaseSchema(nStructBuilder.build());
+    readBuilder.setStreamKafka(streamKafka);
 
     if (filterNode != null) {
       readBuilder.setFilter(filterNode.toProtobuf());
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
new file mode 100644
index 000000000000..5974e3bb0f42
--- /dev/null
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import io.substrait.proto.ReadRel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class StreamKafkaSourceNode implements SplitInfo {
+
+  private final String topic;
+  private final Integer partition;
+  private final Long startOffset;
+  private final Long endOffset;
+  private final Long pollTimeoutMs;
+  private final Boolean failOnDataLoss;
+  private final Boolean includeHeaders;
+
+  private final Map<String, Object> kafkaParams;
+
+  public StreamKafkaSourceNode(
+      String topic,
+      Integer partition,
+      Long startOffset,
+      Long endOffset,
+      Long pollTimeoutMs,
+      Boolean failOnDataLoss,
+      Boolean includeHeaders,
+      Map<String, Object> kafkaParams) {
+    this.topic = topic;
+    this.partition = partition;
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+    this.pollTimeoutMs = pollTimeoutMs;
+    this.failOnDataLoss = failOnDataLoss;
+    this.includeHeaders = includeHeaders;
+    this.kafkaParams = kafkaParams;
+  }
+
+  @Override
+  public List<String> preferredLocations() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReadRel.StreamKafka toProtobuf() {
+    ReadRel.StreamKafka.Builder builder = ReadRel.StreamKafka.newBuilder();
+
+    ReadRel.StreamKafka.TopicPartition.Builder topicPartition =
+        ReadRel.StreamKafka.TopicPartition.newBuilder();
+    topicPartition.setTopic(topic);
+    topicPartition.setPartition(partition);
+
+    builder.setTopicPartition(topicPartition.build());
+    builder.setStartOffset(startOffset);
+    builder.setEndOffset(endOffset);
+    builder.setPollTimeoutMs(pollTimeoutMs);
+    builder.setFailOnDataLoss(failOnDataLoss);
+    builder.setIncludeHeaders(includeHeaders);
+    kafkaParams.forEach((k, v) -> builder.putParams(k, v.toString()));
+
+    return builder.build();
+  }
+}
diff --git a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index ca669c763906..d53a9aef9d0e 100644
--- a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -69,6 +69,7 @@ message ReadRel {
     LocalFiles local_files = 6;
     NamedTable named_table = 7;
     ExtensionTable extension_table = 8;
+    bool stream_kafka = 9;
   }
 
   // A base table. The list of string is used to represent namespacing (e.g., mydb.mytable).
@@ -89,6 +90,22 @@
     google.protobuf.Any detail = 1;
   }
 
+  // Used by the KafkaBatch or KafkaContinuous source.
+  message StreamKafka {
+    message TopicPartition {
+      string topic = 1;
+      int32 partition = 2;
+    }
+
+    TopicPartition topic_partition = 1;
+    int64 start_offset = 2;
+    int64 end_offset = 3;
+    map<string, string> params = 4;
+    int64 poll_timeout_ms = 5;
+    bool fail_on_data_loss = 6;
+    bool include_headers = 7;
+  }
+
   // Represents a list of files in input of a scan operation
   message LocalFiles {
     repeated FileOrFiles items = 1;
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 8d0abcaa686a..513360c54702 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, MicroBatchScanExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec}
@@ -207,6 +207,8 @@ object OffloadOthers {
         // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
         HiveTableScanExecTransformer(plan)
+      case plan: MicroBatchScanExec =>
+        applyScanTransformer(plan)
       case plan: CoalesceExec =>
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
         ColumnarCoalesceExec(plan.numPartitions, plan.child)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 509d7c02ae19..bf60c1df1e5a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, MicroBatchScanExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import
org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.window.WindowExec diff --git a/pom.xml b/pom.xml index 8fa2f355030b..9af8e64527f5 100644 --- a/pom.xml +++ b/pom.xml @@ -804,6 +804,15 @@ velox + + kafka + + false + + + gluten-kafka + + spark-ut From 9d21248197ffcde59465dec0b17ba6b4615b4856 Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 24 Dec 2024 11:04:59 +0800 Subject: [PATCH 02/15] fix build --- .../ClickhouseGlutenKafkaScanSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala index 3c13d335274c..14d1983a330d 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala @@ -16,14 +16,14 @@ */ package org.apache.gluten.execution.kafka -import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite - -class ClickhouseGlutenKafkaScanSuite - extends GlutenClickHouseWholeStageTransformerSuite - with GlutenKafkaScanSuite { - - override protected val fileFormat: String = "parquet" - - protected val kafkaBootstrapServers: String = "localhost:9092" - -} +// TODO: need add profile kafka +// import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite +// +// class ClickhouseGlutenKafkaScanSuite +// extends GlutenClickHouseWholeStageTransformerSuite +// with GlutenKafkaScanSuite { +// +// override protected val fileFormat: String = "parquet" +// +// protected val kafkaBootstrapServers: String = "localhost:9092" +// } From 89d7a0294f4d689f7bd57757b79aba9ebbbb0ef1 Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 24 Dec 2024 15:14:21 +0800 Subject: [PATCH 03/15] fix rebase --- backends-clickhouse/pom.xml | 36 ----------- ...g.apache.gluten.component.CHKafkaComponent | 0 .../CHKafkaComponent.scala | 20 +++--- .../ClickhouseGlutenKafkaScanSuite.scala | 20 +++--- .../backendsapi/clickhouse/CHBackend.scala | 1 + .../rel/StreamKafkaSourceBuilder.java | 0 .../MicroBatchScanExecTransformer.scala | 8 +++ .../gluten/execution/OffloadKafkaScan.scala | 56 ++++++++++++++++ .../GlutenStreamKafkaSourceUtil.scala | 0 .../kafka/GlutenKafkaScanSuite.scala | 0 ...xecution.DataSourceScanTransformerRegister | 1 - .../offload/OffloadSingleNodeRules.scala | 4 +- .../columnar/validator/Validators.scala | 2 +- pom.xml | 64 +++++++++++++++++++ 14 files changed, 152 insertions(+), 60 deletions(-) create mode 100644 backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent rename gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala => backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala (57%) rename backends-clickhouse/{src => src-kafka}/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala (71%) rename gluten-kafka/{src => src-kafka}/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java (100%) rename gluten-kafka/{src => src-kafka}/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala (94%) create mode 100644 gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala rename gluten-kafka/{src => 
src-kafka}/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala (100%) rename gluten-kafka/{src => src-kafka}/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala (100%) delete mode 100644 gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 428af5b0f368..6ca945d8cc58 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -93,42 +93,6 @@ provided - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-kafka-sources - generate-sources - - add-source - - - - ${project.basedir}/src/main-kafka/scala - ${project.basedir}/src/main-kafka/java - - - - - add-kafka-test-sources - generate-test-sources - - add-test-source - - - - ${project.basedir}/src/test-kafka/scala - ${project.basedir}/src/test-kafka/java - - - - - - - diff --git a/backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent b/backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala b/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala similarity index 57% rename from gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala rename to backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala index 1ea54cac0074..1e9f0f9d77a6 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/StreamKafkaTransformerProvider.scala +++ b/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala @@ -14,17 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.gluten.execution -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, MicroBatchScanExec} +package org.apache.gluten.component -class StreamKafkaTransformerProvider extends DataSourceScanTransformerRegister { +import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.execution.OffloadKafkaScan +import org.apache.gluten.extension.injector.Injector - override val scanClassName: String = - "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan" - - override def createDataSourceV2Transformer( - batchScan: DataSourceV2ScanExecBase): BatchScanExecTransformerBase = { - MicroBatchScanExecTransformer(batchScan.asInstanceOf[MicroBatchScanExec]) +class CHKafkaComponent extends Component { + override def name(): String = "clickhouse-kafka" + override def buildInfo(): Component.BuildInfo = + Component.BuildInfo("ClickHouseKafka", "N/A", "N/A", "N/A") + override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil + override def injectRules(injector: Injector): Unit = { + OffloadKafkaScan.inject(injector) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala similarity index 71% rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala rename to backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala index 14d1983a330d..e32914110e26 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala +++ b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala @@ -17,13 +17,13 @@ package org.apache.gluten.execution.kafka // TODO: need add profile kafka -// import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite -// -// class ClickhouseGlutenKafkaScanSuite -// extends GlutenClickHouseWholeStageTransformerSuite -// with GlutenKafkaScanSuite { -// -// override protected val fileFormat: String = "parquet" -// -// protected val kafkaBootstrapServers: String = "localhost:9092" -// } +import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite + +class ClickhouseGlutenKafkaScanSuite + extends GlutenClickHouseWholeStageTransformerSuite + with GlutenKafkaScanSuite { + + override protected val fileFormat: String = "parquet" + + protected val kafkaBootstrapServers: String = "localhost:9092" +} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index d2e1a35a9aad..a47aab55fe45 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -222,6 +222,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { case "OrcScan" => ReadFileFormat.OrcReadFormat case "ParquetScan" => ReadFileFormat.ParquetReadFormat case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat + case "KafkaScan" => ReadFileFormat.KafkaReadFormat case _ => ReadFileFormat.UnknownFormat } } diff --git a/gluten-kafka/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java 
b/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java similarity index 100% rename from gluten-kafka/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java rename to gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala similarity index 94% rename from gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala rename to gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala index 402ef2fec31c..b6d869f6eeca 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala +++ b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala @@ -73,6 +73,10 @@ case class MicroBatchScanExecTransformer( override lazy val fileFormat: ReadFileFormat = GlutenStreamKafkaSourceUtil.getFileFormat(scan) + protected[this] def supportsBatchScan(scan: Scan): Boolean = { + MicroBatchScanExecTransformer.supportsBatchScan(scan) + } + override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { val groupedPartitions = filteredPartitions.flatten groupedPartitions.zipWithIndex.map { @@ -105,4 +109,8 @@ object MicroBatchScanExecTransformer { s"Unsupported DataSourceV2ScanExecBase: ${batch.output.getClass.getName}") } } + + def supportsBatchScan(scan: Scan): Boolean = { + scan.getClass.getName == "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan" + } } diff --git a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala new file mode 100644 index 000000000000..5d670e0723f2 --- /dev/null +++ b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.extension.columnar.enumerated.RasOffload +import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform +import org.apache.gluten.extension.columnar.offload.OffloadSingleNode +import org.apache.gluten.extension.columnar.validator.Validators +import org.apache.gluten.extension.injector.Injector +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, MicroBatchScanExec} + +case class OffloadKafkaScan() extends OffloadSingleNode { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case scan: MicroBatchScanExec if MicroBatchScanExecTransformer.supportsBatchScan(scan.scan) => + MicroBatchScanExecTransformer(scan) + case other => other + } +} + +object OffloadKafkaScan { + def inject(injector: Injector): Unit = { + // Inject legacy rule. + injector.gluten.legacy.injectTransform { + c => + val offload = Seq(OffloadKafkaScan()) + HeuristicTransform.Simple( + Validators.newValidator(c.glutenConf, offload), + offload + ) + } + + // Inject RAS rule. + injector.gluten.ras.injectRasRule { + c => + RasOffload.Rule( + RasOffload.from[BatchScanExec](OffloadKafkaScan()), + Validators.newValidator(c.glutenConf), + Nil) + } + } +} diff --git a/gluten-kafka/src/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala b/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala similarity index 100% rename from gluten-kafka/src/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala rename to gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala diff --git a/gluten-kafka/src/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala similarity index 100% rename from gluten-kafka/src/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala rename to gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala diff --git a/gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister b/gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister deleted file mode 100644 index 13d23f79e1f9..000000000000 --- a/gluten-kafka/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister +++ /dev/null @@ -1 +0,0 @@ -org.apache.gluten.execution.StreamKafkaTransformerProvider \ No newline at end of file diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index 513360c54702..8d0abcaa686a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
MicroBatchScanExec} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec} @@ -207,8 +207,6 @@ object OffloadOthers { // TODO: Add DynamicPartitionPruningHiveScanSuite.scala logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") HiveTableScanExecTransformer(plan) - case plan: MicroBatchScanExec => - applyScanTransformer(plan) case plan: CoalesceExec => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") ColumnarCoalesceExec(plan.numPartitions, plan.child) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index bf60c1df1e5a..509d7c02ae19 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, MicroBatchScanExec} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.window.WindowExec diff --git a/pom.xml b/pom.xml index 9af8e64527f5..924650bfbe00 100644 --- a/pom.xml +++ b/pom.xml @@ -812,6 +812,70 @@ gluten-kafka + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-iceberg-sources + generate-sources + + add-source + + + + ${project.basedir}/src-kafka/main/scala + ${project.basedir}/src-kafka/main/java + + + + + add-kafka-resources + generate-resources + + add-resource + + + + + ${project.basedir}/src-kafka/main/resources + + + + + + add-kafka-test-sources + generate-test-sources + + add-test-source + + + + ${project.basedir}/src-kafka/test/scala + ${project.basedir}/src-kafka/test/java + + + + + add-kafka-test-resources + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/src-kafka/test/resources + + + + + + + + spark-ut From 9d1cb9ece7b7ad48cc94c8e3ec895aed1c737e8b Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 24 Dec 2024 20:34:44 +0800 Subject: [PATCH 04/15] fix license --- cpp-ch/local-engine/CMakeLists.txt | 1 + .../RelParsers/StreamKafkaRelParser.cpp | 2 +- .../Parser/RelParsers/StreamKafkaRelParser.h | 21 ++++++++++++------- .../Storages/Kafka/GlutenKafkaSource.cpp | 2 +- .../Storages/Kafka/GlutenKafkaSource.h | 2 +- .../Storages/Kafka/GlutenKafkaUtils.cpp | 2 +- .../Storages/Kafka/GlutenKafkaUtils.h | 2 +- .../Kafka/ReadFromGlutenStorageKafka.cpp | 2 +- .../Kafka/ReadFromGlutenStorageKafka.h | 2 +- 9 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index f1819ff21726..ccffdd9aa011 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -57,6 +57,7 @@ add_headers_and_sources(storages Storages/Output) 
add_headers_and_sources(storages Storages/Serializations) add_headers_and_sources(storages Storages/IO) add_headers_and_sources(storages Storages/MergeTree) +add_headers_and_sources(storages Storages/Kafka) add_headers_and_sources(storages Storages/Cache) add_headers_and_sources(common Common) add_headers_and_sources(external External) diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp index 94b18bc8bd45..dac6545bba37 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h index d99f8fda7412..f8ac4b065ded 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 @@ -18,12 +18,20 @@ #include +namespace DB +{ +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} +} + namespace local_engine { class StreamKafkaRelParser : public RelParser { public: - explicit StreamKafkaRelParser(ParserContextPtr parser_context_, const ContextPtr & context_) + explicit StreamKafkaRelParser(ParserContextPtr parser_context_, const DB::ContextPtr & context_) : RelParser(parser_context_), context(context_) { } @@ -32,21 +40,20 @@ class StreamKafkaRelParser : public RelParser DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override { - throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call parse(), call parseReadRel instead."); + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call parse(), call parseReadRel instead."); } std::optional getSingleInput(const substrait::Rel &) override { - throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call getSingleInput()."); + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call getSingleInput()."); } - DB::QueryPlanPtr parseReadRel( - DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel); + DB::QueryPlanPtr parseReadRel(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel); void setSplitInfo(String split_info_) { split_info = split_info_; } private: - ContextPtr context; + DB::ContextPtr context; String split_info; }; diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp index eb1dd92756a5..5fdeae45db9e 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software 
Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h index e007bdc2f5b4..17a70eeed62a 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp index 9dc25a0a0860..c5a68385239a 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h index 411e78d1d001..517341c259bf 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp index 9b6fdc31bd92..7e304a18943d 100644 --- a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp +++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h index 477fe2f10957..820189a101a8 100644 --- a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h +++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h @@ -1,5 +1,5 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 From 4da81ea8f04740d41c0009c2bfeebeb4cfd2109f Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 24 Dec 2024 20:35:13 +0800 Subject: [PATCH 05/15] rm todo --- .../gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala index e32914110e26..cfb53a30ac99 100644 --- a/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala +++ b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.execution.kafka -// TODO: need add profile kafka import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite class ClickhouseGlutenKafkaScanSuite From afc6e077d4eba3ba7db65caf9b0552639144d160 Mon Sep 17 00:00:00 2001 From: loneylee Date: Wed, 25 Dec 2024 20:49:22 +0800 Subject: [PATCH 06/15] fix ci --- .../Storages/Kafka/GlutenKafkaSource.cpp | 6 ++-- .../Storages/Kafka/GlutenKafkaSource.h | 36 +++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp index 5fdeae45db9e..589788dabef1 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp @@ -17,7 +17,6 @@ #include "GlutenKafkaSource.h" -#include #include #include #include @@ -131,6 +130,7 @@ GlutenKafkaSource::GlutenKafkaSource( , partition(partition_) { max_block_size = end_offset - start_offset; + client_id = topics[0] + "_" + std::to_string(partition); for (const auto & columns_with_type_and_name : result_header.getColumnsWithTypeAndName()) { @@ -156,7 +156,7 @@ void GlutenKafkaSource::initConsumer() { std::lock_guard lock(consumer_mutex); auto topic_partition = TopicPartition{topics[0], partition}; - consumers_in_memory.try_emplace(topic_partition, std::vector()); + consumers_in_memory.try_emplace(topic_partition, std::vector>()); auto & consumers = consumers_in_memory[topic_partition]; @@ -170,7 +170,7 @@ void GlutenKafkaSource::initConsumer() { LOG_DEBUG(log, "Creating new Kafka consumer for topic: {}, partition: {}", topics[0], partition); String collection_name = ""; - KafkaConsumerPtr kafka_consumer_ptr = std::make_shared( + std::shared_ptr kafka_consumer_ptr = std::make_shared( log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h index 17a70eeed62a..6b861279cf65 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h @@ -19,26 +19,26 @@ #include -#include +#include +#include namespace local_engine { -using namespace DB; -class GlutenKafkaSource : public ISource +class GlutenKafkaSource : public DB::ISource { public: GlutenKafkaSource( - const Block & result_header_, - const ContextPtr & context_, - const Names & topics_, + const DB::Block & result_header_, + const DB::ContextPtr & context_, + const DB::Names & 
topics_, const size_t & partition_, const String & brokers_, const String & group_, const size_t & poll_timeout_ms_, const size_t & start_offset_, const size_t & end_offset_, - const std::shared_ptr & kafka_settings_); + const std::shared_ptr & kafka_settings_); ~GlutenKafkaSource() override; @@ -55,10 +55,10 @@ class GlutenKafkaSource : public ISource String getName() const override { return "GlutenKafkaSource"; } protected: - Chunk generate() override; + DB::Chunk generate() override; private: - Chunk generateImpl(); + DB::Chunk generateImpl(); void initConsumer(); size_t getPollMaxBatchSize() const; @@ -66,23 +66,23 @@ class GlutenKafkaSource : public ISource size_t getPollTimeoutMillisecond() const; LoggerPtr log; - ContextPtr context; + DB::ContextPtr context; UInt64 max_block_size; - KafkaConsumerPtr consumer; + std::shared_ptr consumer; - Block result_header; - Block virtual_header; - Block non_virtual_header; - std::shared_ptr kafka_settings; + DB::Block result_header; + DB::Block virtual_header; + DB::Block non_virtual_header; + std::shared_ptr kafka_settings; - const Names topics; + const DB::Names topics; const size_t partition; const String brokers; const String group; - const String client_id = "123"; const size_t poll_timeout_ms; const size_t start_offset; const size_t end_offset; + String client_id; bool finished = false; }; @@ -105,5 +105,5 @@ struct hash namespace local_engine { static std::mutex consumer_mutex; -static std::unordered_map> consumers_in_memory; +static std::unordered_map>> consumers_in_memory; } \ No newline at end of file From dfff37fb2c7ba47a587674df7129afc84d222f9c Mon Sep 17 00:00:00 2001 From: loneylee Date: Thu, 26 Dec 2024 11:02:06 +0800 Subject: [PATCH 07/15] add kafka on --- cpp-ch/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt index 15be68a2a6bd..1b23fa38d772 100644 --- a/cpp-ch/CMakeLists.txt +++ b/cpp-ch/CMakeLists.txt @@ -106,7 +106,7 @@ else() -DENABLE_TESTS=OFF -DENABLE_JEMALLOC=ON -DENABLE_MULTITARGET_CODE=ON -DENABLE_EXTERN_LOCAL_ENGINE=ON -DENABLE_ODBC=OFF -DENABLE_CAPNP=OFF -DENABLE_GRPC=OFF -DENABLE_RUST=OFF -DENABLE_H3=OFF -DENABLE_AMQPCPP=OFF - -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=OFF -DENABLE_NATS=OFF + -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=ON -DENABLE_NATS=OFF -DENABLE_LIBPQXX=OFF -DENABLE_NURAFT=OFF -DENABLE_DATASKETCHES=OFF -DENABLE_SQLITE=OFF -DENABLE_S2_GEOMETRY=OFF -DENABLE_ANNOY=OFF -DENABLE_ULID=OFF -DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF -DENABLE_LDAP=OFF From ed8441e43d88b111af02e9b2b1faa163b8d404b6 Mon Sep 17 00:00:00 2001 From: loneylee Date: Fri, 3 Jan 2025 15:32:19 +0800 Subject: [PATCH 08/15] fix ut --- .../apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala index 0855e03192b3..632f6b0c160a 100644 --- a/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala +++ b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.streaming.Trigger import scala.concurrent.duration.DurationInt -trait GlutenKafkaScanSuite extends WholeStageTransformerSuite { +abstract class GlutenKafkaScanSuite extends WholeStageTransformerSuite { protected val kafkaBootstrapServers: String 
test("test MicroBatchScanExecTransformer not fallback") { From 830a658d966aae5c557b6631d8544d813cb96cdc Mon Sep 17 00:00:00 2001 From: loneylee Date: Mon, 6 Jan 2025 21:42:00 +0800 Subject: [PATCH 09/15] add more ut --- .../RelParsers/StreamKafkaRelParser.cpp | 11 +++- .../Storages/Kafka/GlutenKafkaSource.cpp | 63 ++++++++++--------- .../Storages/Kafka/GlutenKafkaSource.h | 1 + .../MicroBatchScanExecTransformer.scala | 2 +- .../kafka/GlutenKafkaScanSuite.scala | 2 +- 5 files changed, 46 insertions(+), 33 deletions(-) diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp index dac6545bba37..d606e60f2526 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp @@ -60,7 +60,15 @@ DB::QueryPlanPtr StreamKafkaRelParser::parseReadRel(DB::QueryPlanPtr query_plan, else if (param.first == "bootstrap.servers") brokers = param.second; else - std::cout << param.first << " : " << param.second << std::endl; + LOG_DEBUG(getLogger("StreamKafkaRelParser"), "Unused kafka parameter: {}: {}", param.first, param.second); + + LOG_INFO( + getLogger("StreamKafkaRelParser"), + "Kafka source: topic: {}, partition: {}, start_offset: {}, end_offset: {}", + topic, + partition, + start_offset, + end_offset); Names topics; topics.emplace_back(topic); @@ -72,7 +80,6 @@ DB::QueryPlanPtr StreamKafkaRelParser::parseReadRel(DB::QueryPlanPtr query_plan, query_plan->addStep(std::move(source)); - return query_plan; } diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp index 589788dabef1..89d26d975424 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp @@ -150,6 +150,12 @@ GlutenKafkaSource::~GlutenKafkaSource() std::lock_guard lock(consumer_mutex); auto topic_partition = TopicPartition{topics[0], partition}; consumers_in_memory[topic_partition].emplace_back(consumer); + LOG_DEBUG( + log, + "Kafka consumer for topic: {}, partition: {} is returned to pool, current pool size: {}", + topics[0], + partition, + consumers_in_memory[topic_partition].size()); } void GlutenKafkaSource::initConsumer() @@ -162,6 +168,7 @@ void GlutenKafkaSource::initConsumer() if (!consumers.empty()) { + LOG_DEBUG(log, "Reuse Kafka consumer for topic: {}, partition: {}", topics[0], partition); consumer = consumers.back(); consumers.pop_back(); } @@ -175,7 +182,7 @@ void GlutenKafkaSource::initConsumer() getPollMaxBatchSize(), getPollTimeoutMillisecond(), /*intermediate_commit*/ false, - /*stream_cancelled*/ is_cancelled, + /*stream_cancelled*/ is_stopped, topics); KafkaConfigLoader::ConsumerConfigParams params{ @@ -234,24 +241,23 @@ Chunk GlutenKafkaSource::generateImpl() }; EmptyReadBuffer empty_buf; - auto input_format - = FormatFactory::instance().getInput("Raw", empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); + // auto input_format + // = FormatFactory::instance().getInput("Raw", empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); - StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error)); + // StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error)); size_t total_rows = 0; MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); + MutableColumns no_virtual_columns = non_virtual_header.cloneEmptyColumns(); 
while (true) { size_t new_rows = 0; if (auto buf = consumer->consume()) { - ProfileEvents::increment(ProfileEvents::KafkaMessagesRead); - new_rows = executor.execute(*buf); - } + String message; + readStringUntilEOF(message, *buf); + no_virtual_columns[0]->insert(message); - if (new_rows) - { // In read_kafka_message(), KafkaConsumer::nextImpl() // will be called, that may make something unusable, i.e. clean // KafkaConsumer::messages, which is accessed from @@ -269,28 +275,25 @@ Chunk GlutenKafkaSource::generateImpl() auto timestamp_raw = consumer->currentTimestamp(); auto header_list = consumer->currentHeaderList(); - for (size_t i = 0; i < new_rows; ++i) + virtual_columns[0]->insert(key); + virtual_columns[1]->insert(topic); + virtual_columns[2]->insert(partition); + virtual_columns[3]->insert(offset); + + if (timestamp_raw) { - virtual_columns[0]->insert(key); - virtual_columns[1]->insert(topic); - virtual_columns[2]->insert(partition); - virtual_columns[3]->insert(offset); - - if (timestamp_raw) - { - auto ts = timestamp_raw->get_timestamp(); - virtual_columns[4]->insert( - DecimalField(std::chrono::duration_cast(ts).count(), 3)); - } - else - { - virtual_columns[4]->insertDefault(); - } - - virtual_columns[5]->insertDefault(); + auto ts = timestamp_raw->get_timestamp(); + virtual_columns[4]->insert( + DecimalField(std::chrono::duration_cast(ts).count(), 3)); } + else + { + virtual_columns[4]->insertDefault(); + } + + virtual_columns[5]->insertDefault(); - total_rows = total_rows + new_rows; + total_rows = total_rows + 1; } else if (consumer->polledDataUnusable()) { @@ -314,6 +317,8 @@ Chunk GlutenKafkaSource::generateImpl() break; } + LOG_DEBUG(log, "Read {} rows from Kafka topic: {}, partition: {}", total_rows, topics[0], partition); + if (total_rows == 0) return {}; @@ -326,7 +331,7 @@ Chunk GlutenKafkaSource::generateImpl() return {}; } - auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns()); //.cloneWithCutColumns(0, max_block_size); + auto result_block = non_virtual_header.cloneWithColumns(std::move(no_virtual_columns)); //.cloneWithCutColumns(0, max_block_size); auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns)); //.cloneWithCutColumns(0, max_block_size); for (const auto & column : virtual_block.getColumnsWithTypeAndName()) diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h index 6b861279cf65..c1aa54157908 100644 --- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h +++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h @@ -106,4 +106,5 @@ namespace local_engine { static std::mutex consumer_mutex; static std::unordered_map>> consumers_in_memory; +static const std::atomic is_stopped{false}; // for kafka progress, it always false } \ No newline at end of file diff --git a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala index b6d869f6eeca..9170a0898025 100644 --- a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala +++ b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala @@ -69,7 +69,7 @@ case class MicroBatchScanExecTransformer( /** Returns the actual schema of this data source scan. 
   */
   override def getDataSchema: StructType = scan.readSchema()
 
-  override def nodeName: String = "Kafka" + super.nodeName
+  override def nodeName: String = s"MicroBatchScanExecTransformer(${scan.description()})"
 
   override lazy val fileFormat: ReadFileFormat = GlutenStreamKafkaSourceUtil.getFileFormat(scan)
 
diff --git a/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
index 632f6b0c160a..0855e03192b3 100644
--- a/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
+++ b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.streaming.Trigger
 
 import scala.concurrent.duration.DurationInt
 
-abstract class GlutenKafkaScanSuite extends WholeStageTransformerSuite {
+trait GlutenKafkaScanSuite extends WholeStageTransformerSuite {
   protected val kafkaBootstrapServers: String
 
   test("test MicroBatchScanExecTransformer not fallback") {

From 2dd2d472268c136c2438becbf93cb18e0af00983 Mon Sep 17 00:00:00 2001
From: loneylee
Date: Tue, 7 Jan 2025 15:08:06 +0800
Subject: [PATCH 10/15] update metrics

---
 .../metrics/BatchScanMetricsUpdater.scala     |  5 ++-
 .../Parser/RelParsers/ReadRelParser.cpp       |  4 +-
 .../Parser/RelParsers/ReadRelParser.h         |  2 +-
 .../RelParsers/StreamKafkaRelParser.cpp       | 13 +++++-
 .../Parser/RelParsers/StreamKafkaRelParser.h  | 10 ++---
 .../Storages/Kafka/GlutenKafkaSource.cpp      | 42 ++-----------------
 6 files changed, 24 insertions(+), 52 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 1b9389351bf2..9002b33e38c5 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -64,6 +64,7 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
 
 object BatchScanMetricsUpdater {
   // in mergetree format, the processor name is `MergeTreeSelect(pool: XXX, algorithm: XXX)`
-  val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool", "SubstraitFileSource")
-  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
+  val INCLUDING_PROCESSORS =
+    Array("MergeTreeSelect(pool", "SubstraitFileSource", "GlutenKafkaSource")
+  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource", "GlutenKafkaSource")
 }
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 5be79ecb1c23..f65584eaea03 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -47,7 +47,7 @@ extern const int LOGICAL_ERROR;
 namespace local_engine
 {
 using namespace DB;
-DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> &)
+DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
 {
     if (query_plan)
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null");
@@ -75,7 +75,7 @@
     {
         StreamKafkaRelParser kafka_parser(parser_context, getContext());
         kafka_parser.setSplitInfo(split_info);
-        query_plan = kafka_parser.parseReadRel(std::make_unique<DB::QueryPlan>(), read);
+        query_plan = kafka_parser.parse(std::make_unique<DB::QueryPlan>(), rel, rel_stack);
         steps = kafka_parser.getSteps();
     }
     else
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
index 838ce6392003..a6ad920947b2 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
@@ -34,7 +34,7 @@ class ReadRelParser : public RelParser
         return parse(std::move(query_plan), rel, rel_stack_);
     }
 
-    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> &) override;
+    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack) override;
 
     // This is source node, there is no input
     std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override { return {}; }
 
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
index d606e60f2526..0771b481d7e6 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
@@ -36,9 +36,17 @@ extern const int UNKNOWN_TYPE;
 
 namespace local_engine
 {
-using namespace DB;
 
-DB::QueryPlanPtr StreamKafkaRelParser::parseReadRel(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel)
+DB::QueryPlanPtr
+StreamKafkaRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
+{
+    if (rel.has_read())
+        return parseRelImpl(std::move(query_plan), rel.read());
+
+    throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't parse rel:{}", rel.ShortDebugString());
+}
+
+DB::QueryPlanPtr StreamKafkaRelParser::parseRelImpl(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel)
 {
     if (!read_rel.has_stream_kafka())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can't parse kafka rel because the read rel doesn't contain stream_kafka");
@@ -78,6 +86,7 @@
     auto source = std::make_unique<ReadFromGlutenStorageKafka>(
         names, header, getContext(), topics, partition, start_offset, end_offset, poll_timeout_ms, group_id, brokers);
 
+    steps.emplace_back(source.get());
     query_plan->addStep(std::move(source));
 
     return query_plan;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
index f8ac4b065ded..1fd12dea0026 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
@@ -38,21 +38,19 @@ class StreamKafkaRelParser : public RelParser
 
     ~StreamKafkaRelParser() override = default;
 
-    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override
-    {
-        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call parse(), call parseReadRel instead.");
-    }
+    DB::QueryPlanPtr
+    parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override;
 
     std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel &) override
     {
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't call getSingleInput().");
     }
 
-    DB::QueryPlanPtr parseReadRel(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel);
-
     void setSplitInfo(String split_info_) {
split_info = split_info_; }
 
 private:
+    DB::QueryPlanPtr parseRelImpl(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel);
+
     DB::ContextPtr context;
 
     String split_info;
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
index 89d26d975424..471899d933b3 100644
--- a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
@@ -208,50 +208,12 @@ Chunk GlutenKafkaSource::generateImpl()
     if (!consumer)
         initConsumer();
 
-    // TODO: delete
-    bool put_error_to_stream = false;
-    std::optional<String> exception_message;
-
-    auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
-    {
-        ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
-
-        if (put_error_to_stream)
-        {
-            exception_message = e.message();
-            for (size_t i = 0; i < result_columns.size(); ++i)
-            {
-                // We could already push some rows to result_columns before exception, we need to fix it.
-                result_columns[i]->rollback(*checkpoints[i]);
-
-                // all data columns will get default value in case of error
-                result_columns[i]->insertDefault();
-            }
-
-            return 1;
-        }
-
-        e.addMessage(
-            "while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
-            consumer->currentTopic(),
-            consumer->currentPartition(),
-            consumer->currentOffset());
-        consumer->setExceptionInfo(e.message());
-        throw std::move(e);
-    };
-
-    EmptyReadBuffer empty_buf;
-    // auto input_format
-    //     = FormatFactory::instance().getInput("Raw", empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
-
-    // StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
     size_t total_rows = 0;
     MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
     MutableColumns no_virtual_columns = non_virtual_header.cloneEmptyColumns();
 
     while (true)
     {
-        size_t new_rows = 0;
         if (auto buf = consumer->consume())
         {
             String message;
@@ -265,7 +227,7 @@
             if (consumer->isStalled())
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Polled messages became unusable");
 
-            ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
+            ProfileEvents::increment(ProfileEvents::KafkaRowsRead, 1);
 
             consumer->storeLastReadMessageOffset();
 
             auto topic = consumer->currentTopic();
@@ -337,6 +299,8 @@
     for (const auto & column : virtual_block.getColumnsWithTypeAndName())
         result_block.insert(column);
 
+    progress(total_rows, result_block.bytes());
+
     auto converting_dag = ActionsDAG::makeConvertingActions(
         result_block.cloneEmpty().getColumnsWithTypeAndName(),
         getPort().getHeader().getColumnsWithTypeAndName(),

From 88dbaf5bc17fba9dd0922af6296504a202adfb7b Mon Sep 17 00:00:00 2001
From: loneylee
Date: Tue, 7 Jan 2025 15:09:39 +0800
Subject: [PATCH 11/15] add cmake

---
 cpp-ch/local-engine/CMakeLists.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt
index ccffdd9aa011..4be2bdbc38d7 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -53,6 +53,7 @@ add_headers_and_sources(parser Parser)
 add_headers_and_sources(parser Parser/RelParsers)
 add_headers_and_sources(rewriter Rewriter)
 add_headers_and_sources(storages Storages)
+add_headers_and_sources(storages Storages/Kafka)
 add_headers_and_sources(storages Storages/Output)
 add_headers_and_sources(storages Storages/Serializations)
add_headers_and_sources(storages Storages/IO) From 2e9f3921deda15683677fcabeff530f82c829b2a Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 7 Jan 2025 15:51:11 +0800 Subject: [PATCH 12/15] fix pom --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 924650bfbe00..cd321f327d50 100644 --- a/pom.xml +++ b/pom.xml @@ -819,7 +819,7 @@ build-helper-maven-plugin - add-iceberg-sources + add-kafka-sources generate-sources add-source From bce42a99a39eecafc376c467ca7515cacbe1ce3e Mon Sep 17 00:00:00 2001 From: loneylee Date: Wed, 8 Jan 2025 12:06:11 +0800 Subject: [PATCH 13/15] add input partition shim --- .../gluten/execution/MicroBatchScanExecTransformer.scala | 4 ++-- .../sql/execution/datasources/v2/AbstractBatchScanExec.scala | 5 ++++- .../sql/execution/datasources/v2/AbstractBatchScanExec.scala | 5 ++++- .../sql/execution/datasources/v2/AbstractBatchScanExec.scala | 5 ++++- .../sql/execution/datasources/v2/AbstractBatchScanExec.scala | 5 ++++- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala index 9170a0898025..d3a581ae3071 100644 --- a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala +++ b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala @@ -55,7 +55,7 @@ case class MicroBatchScanExecTransformer( override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() - @transient override lazy val inputPartitions: Seq[InputPartition] = + @transient override lazy val inputPartitionsShim: Seq[InputPartition] = stream.planInputPartitions(start, end) override def filterExprs(): Seq[Expression] = Seq.empty @@ -64,7 +64,7 @@ case class MicroBatchScanExecTransformer( override def outputAttributes(): Seq[Attribute] = output - override def getPartitions: Seq[InputPartition] = inputPartitions + override def getPartitions: Seq[InputPartition] = inputPartitionsShim /** Returns the actual schema of this data source scan. 
*/ override def getDataSchema: StructType = scan.readSchema() diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 6b495105d9c6..001cd9a5825f 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -46,7 +46,10 @@ abstract class AbstractBatchScanExec( override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters) - @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + @transient override lazy val partitions: Seq[InputPartition] = inputPartitionsShim + + @transient protected lazy val inputPartitionsShim: Seq[InputPartition] = + batch.planInputPartitions() @transient private lazy val filteredPartitions: Seq[InputPartition] = { val dataSourceFilters = runtimeFilters.flatMap { diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index f0ee5c1ab743..b12a257a0c1a 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -47,7 +47,10 @@ abstract class AbstractBatchScanExec( override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters) - @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions() + @transient override lazy val inputPartitions: Seq[InputPartition] = inputPartitionsShim + + @transient protected lazy val inputPartitionsShim: Seq[InputPartition] = + batch.planInputPartitions() @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = { val dataSourceFilters = runtimeFilters.flatMap { diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 3313c3c76842..aeaf5ccf7a16 100644 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -57,7 +57,10 @@ abstract class AbstractBatchScanExec( override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters) - @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions() + @transient override lazy val inputPartitions: Seq[InputPartition] = inputPartitionsShim + + @transient protected lazy val inputPartitionsShim: Seq[InputPartition] = + batch.planInputPartitions() @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = { val dataSourceFilters = runtimeFilters.flatMap { diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 8f51ea2c72b1..3508711124e2 100644 --- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -52,7 +52,10 @@ abstract class AbstractBatchScanExec( override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters) - @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions() + @transient override lazy val inputPartitions: Seq[InputPartition] = inputPartitionsShim + + @transient protected lazy val inputPartitionsShim: Seq[InputPartition] = + batch.planInputPartitions() @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = { val dataSourceFilters = runtimeFilters.flatMap { From e5de15c450d1b1a78394dd61712a9319332f3cf2 Mon Sep 17 00:00:00 2001 From: loneylee Date: Mon, 13 Jan 2025 20:45:03 +0800 Subject: [PATCH 14/15] fix version --- gluten-kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-kafka/pom.xml b/gluten-kafka/pom.xml index 35ad69f8dc8a..ff633728a830 100644 --- a/gluten-kafka/pom.xml +++ b/gluten-kafka/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 From 14f33c074e3739837469f444ed390789060d27c3 Mon Sep 17 00:00:00 2001 From: loneylee Date: Fri, 17 Jan 2025 15:43:01 +0800 Subject: [PATCH 15/15] Fix review --- docs/developers/SubstraitModifications.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/developers/SubstraitModifications.md b/docs/developers/SubstraitModifications.md index 3db2b5869c5e..8ca2ab4f107b 100644 --- a/docs/developers/SubstraitModifications.md +++ b/docs/developers/SubstraitModifications.md @@ -29,6 +29,7 @@ changed `Unbounded` in `WindowFunction` into `Unbounded_Preceding` and `Unbounde * Added `TopNRel` ([#5409](https://github.com/apache/incubator-gluten/pull/5409)). * Added `ref` field in window bound `Preceding` and `Following` ([#5626](https://github.com/apache/incubator-gluten/pull/5626)). * Added `BucketSpec` field in `WriteRel`([#8386](https://github.com/apache/incubator-gluten/pull/8386)) +* Added `StreamKafka` in `ReadRel`([#8321](https://github.com/apache/incubator-gluten/pull/8321)) ## Modifications to type.proto
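
A note on how the series is exercised end to end: the query shape that plans a MicroBatchScanExec over Kafka, the node these patches teach Gluten to offload, is a structured-streaming read driven in batch mode. The following is a minimal, illustrative sketch rather than code taken from the patches: the topic name, bootstrap servers, and checkpoint path are hypothetical placeholders, and Trigger.AvailableNow requires Spark 3.3 or newer (on Spark 3.2, the one-shot Trigger.Once plays a similar role).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object KafkaBatchScanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-batch-scan-sketch")
      .getOrCreate()

    // A streaming read from Kafka. Each micro-batch is planned as a
    // MicroBatchScanExec, which Gluten can now replace with
    // MicroBatchScanExecTransformer and execute in the ClickHouse backend.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical address
      .option("subscribe", "test_topic") // hypothetical topic
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Trigger.AvailableNow drains all offsets available at start time and
    // then stops, i.e. the Kafka source is consumed "in batch mode".
    val query = df.writeStream
      .format("noop") // built-in sink that discards rows; handy for testing
      .option("checkpointLocation", "/tmp/kafka-scan-ckpt") // hypothetical path
      .trigger(Trigger.AvailableNow())
      .start()

    query.awaitTermination()
  }
}

Whether the scan was actually offloaded can be verified along the lines the suite's "not fallback" test name suggests: inspect the executed plan of a micro-batch and assert it contains a MicroBatchScanExecTransformer node rather than a vanilla MicroBatchScanExec, which would indicate the scan fell back to vanilla Spark (for example, because validation rejected it).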