diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp
index 8d237a40265df2..daba1af2bce832 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -158,6 +158,14 @@ Status BrokerFileWriter::finalize() {
     return Status::OK();
 }
 
+Status BrokerFileWriter::open() {
+    if (!_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+    return Status::OK();
+}
+
 Status BrokerFileWriter::_open() {
     TBrokerOpenWriterRequest request;
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index 95773f0a04ea42..e3e53525679671 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -45,6 +45,7 @@ class BrokerFileWriter : public FileWriter {
                      int64_t start_offset, FileSystemSPtr fs);
     virtual ~BrokerFileWriter();
 
+    Status open() override;
     Status close() override;
     Status abort() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index dfd8d2f094d93d..03f092c042402a 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -36,6 +36,9 @@ class FileWriter {
     DISALLOW_COPY_AND_ASSIGN(FileWriter);
 
+    // Open the file for writing. Default is a no-op for writers that open lazily.
+    virtual Status open() { return Status::OK(); }
+
     // Normal close. Wait for all data to persist before returning.
     virtual Status close() = 0;
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 936defb2f6ce0f..a5f5dab9fd4e49 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -108,6 +108,14 @@ Status HdfsFileWriter::finalize() {
     return Status::OK();
 }
 
+Status HdfsFileWriter::open() {
+    if (!_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+    return Status::OK();
+}
+
 Status HdfsFileWriter::_open() {
     _path = convert_path(_path, _hdfs_fs->_fs_name);
     std::string hdfs_dir = _path.parent_path().string();
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 371cae6e0cd941..812598e7a51cd4 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -36,6 +36,7 @@ class HdfsFileWriter : public FileWriter {
     HdfsFileWriter(Path file, FileSystemSPtr fs);
     ~HdfsFileWriter();
 
+    Status open() override;
     Status close() override;
     Status abort() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 18de6ed038983d..78c8f9355c9875 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -189,16 +189,36 @@ Status S3FileWriter::close() {
         return _st;
     }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
-    // it might be one file less than 5MB, we do upload here
-    if (_pending_buf != nullptr) {
-        if (_upload_id.empty()) {
+
+    if (_upload_id.empty()) {
+        if (_pending_buf != nullptr) {
+            // the file may be smaller than 5MB, so we upload it in one shot here
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
+        } else {
+            // if there is no pending buffer, we still need to create an empty file
+            _pending_buf = S3FileBufferPool::GetInstance()->allocate();
+            // no upload id yet: upload the empty buffer with a single put_object
+            _pending_buf->set_upload_remote_callback(
+                    [this, buf = _pending_buf]() { _put_object(*buf); });
+            _pending_buf->set_finish_upload([this]() { _countdown_event.signal(); });
+            _pending_buf->set_is_cancel([this]() { return _failed.load(); });
+            _pending_buf->set_on_failed([this](Status st) {
+                VLOG_NOTICE << "failed at key: " << _key << ", status: " << st.to_string();
+                std::unique_lock _lck {_completed_lock};
+                this->_st = std::move(st);
+                _failed = true;
+            });
         }
-        _countdown_event.add_count();
-        _pending_buf->submit();
-        _pending_buf = nullptr;
     }
+    // guard the submit: on the multipart path the last appendv may already
+    // have submitted the final buffer, leaving _pending_buf null here
+    if (_pending_buf != nullptr) {
+        _countdown_event.add_count();
+        _pending_buf->submit();
+        _pending_buf = nullptr;
+    }
+
     RETURN_IF_ERROR(_complete());
 
     return Status::OK();
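Reviewer note on the close() restructuring above: the logic reads best as two independent questions — has a multipart upload started (`_upload_id` non-empty)? and is there still a buffer to flush (`_pending_buf` non-null)? The empty-file bug was the combination "no upload id, no pending buffer": the old code submitted nothing, so no object was ever created. Below is a minimal, synchronous sketch of that control flow; Buffer, put_object, and close_writer are hypothetical stand-ins for the real S3FileBuffer machinery, not Doris code.

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    struct Buffer {
        std::function<void()> upload_cb;  // cf. set_upload_remote_callback
        void submit() { if (upload_cb) upload_cb(); }
    };

    void put_object(const Buffer&) { std::cout << "PutObject (single request)\n"; }

    void close_writer(const std::string& upload_id, std::shared_ptr<Buffer> pending) {
        if (upload_id.empty()) {
            if (!pending) {
                // the fix: nothing was ever buffered, so allocate an empty
                // buffer; otherwise close() finishes without creating the object
                pending = std::make_shared<Buffer>();
            }
            // no multipart upload in flight: one put_object covers the whole file
            pending->upload_cb = [buf = pending] { put_object(*buf); };
        }
        if (pending) {  // on the multipart path everything may be flushed already
            pending->submit();
            pending = nullptr;
        }
    }

    int main() {
        close_writer("", nullptr);           // empty file: still emits one PutObject
        close_writer("upload-42", nullptr);  // multipart, fully flushed: no-op
    }

The trailing null check mirrors the `if (_pending_buf != nullptr)` guard in the hunk: submitting unconditionally would dereference a null buffer whenever a multipart upload had already flushed everything.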
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 2ab14485ddfecb..080a351fa8137e 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -154,6 +154,7 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
             FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
             _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0,
             _file_writer_impl));
+    RETURN_IF_ERROR(_file_writer_impl->open());
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         // just use file writer is enough
diff --git a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
new file mode 100644
index 00000000000000..260c177d310c7d
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_base1 --
+
+-- !select_tvf1 --
+
+-- !select_tvf2 --
+
+-- !select_tvf3 --
+
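For reviewers new to the regression framework: each `-- !tag --` section in the .out file above holds the expected rows for the `qt_tag` statement of the same name in the suite that follows — for example, `qt_select_tvf1` is checked against the `-- !select_tvf1 --` section. Since no data lines follow any of the four headers, the file asserts that every query over the exported data returns zero rows, which is exactly the empty-file behavior this patch fixes.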
diff --git a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
new file mode 100644
index 00000000000000..1804fff2a11450
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_empty_data", "external,hive,tvf,external_docker") {
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Hive test is disabled, skipping.")
+        return
+    }
+
+    // enable the nereids planner
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    // used for outfile to hdfs
+    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    // It's okay to use a random `hdfsUserName`, but it cannot be empty.
+    def hdfsUserName = "doris"
+    def format = "csv"
+    def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
+
+    // used for outfile to s3
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName")
+
+    // broker
+    String broker_name = "hdfs"
+
+    def export_table_name = "outfile_empty_data_test"
+
+    def create_table = { table_name, column_define ->
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                ${column_define}
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+    }
+
+    def outfile_to_HDFS_directly = {
+        // select ... into outfile ...
+        def uuid = UUID.randomUUID().toString()
+
+        def hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+        def uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS ${format}
+            PROPERTIES (
+                "fs.defaultFS"="${defaultFS}",
+                "hadoop.username" = "${hdfsUserName}"
+            );
+        """
+        logger.info("outfile to hdfs directly succeeded, path: " + res[0][3])
+        return res[0][3]
+    }
+
+    def outfile_to_HDFS_with_broker = {
+        // select ... into outfile ...
+        def uuid = UUID.randomUUID().toString()
+
+        def hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+        def uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS ${format}
+            PROPERTIES (
+                "broker.fs.defaultFS"="${defaultFS}",
+                "broker.name"="${broker_name}",
+                "broker.username" = "${hdfsUserName}"
+            );
+        """
+        logger.info("outfile to hdfs with broker succeeded, path: " + res[0][3])
+        return res[0][3]
+    }
+
+    def outfile_to_S3_directly = {
+        // select ... into outfile ...
+        def s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty"
+        def uri = "s3://${s3_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS csv
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        logger.info("outfile to s3 succeeded, path: " + res[0][3])
+        return res[0][3]
+    }
+
+    try {
+        def doris_column_define = """
+            `user_id` INT NOT NULL COMMENT "user id",
+            `name` STRING NULL,
+            `age` INT NULL"""
+        // create table
+        create_table(export_table_name, doris_column_define)
+        // test outfile empty data to hdfs directly
+        def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly()
+        // test outfile empty data to hdfs with broker
+        def outfile_to_hdfs_with_broker_url = outfile_to_HDFS_with_broker()
+        // test outfile empty data to s3 directly
+        def outfile_to_s3_directly_url = outfile_to_S3_directly()
+        qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY user_id; """
+
+        qt_select_tvf1 """ select * from HDFS(
+                    "uri" = "${outfile_to_hdfs_directly_url}0.csv",
+                    "hadoop.username" = "${hdfsUserName}",
+                    "format" = "${format}");
+                """
+
+        qt_select_tvf2 """ select * from HDFS(
+                    "uri" = "${outfile_to_hdfs_with_broker_url}0.csv",
+                    "hadoop.username" = "${hdfsUserName}",
+                    "format" = "${format}");
+                """
+
+        qt_select_tvf3 """ SELECT * FROM S3 (
+                    "uri" = "http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4, outfile_to_s3_directly_url.length())}0.csv",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
+                    "format" = "${format}",
+                    "region" = "${region}",
+                    "use_path_style" = "true"
+                );
+                """
+    } finally {
+    }
+}
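Taken together, the writer contract after this patch is: create the writer, call open(), append zero or more times, then close(). Below is a standalone sketch of the `_opened`-guard pattern that BrokerFileWriter::open and HdfsFileWriter::open now share — Status is reduced to bool for brevity, so this illustrates the idiom rather than the Doris classes themselves.

    #include <iostream>

    class LazyWriter {
    public:
        // open() may be called any number of times; the underlying
        // do_open() runs at most once, cf. the _opened flag in the patch
        bool open() {
            if (!_opened) {
                if (!do_open()) return false;  // cf. RETURN_IF_ERROR(_open())
                _opened = true;
            }
            return true;
        }

    private:
        bool do_open() {
            std::cout << "opening remote file once\n";
            return true;
        }
        bool _opened = false;
    };

    int main() {
        LazyWriter w;
        w.open();
        w.open();  // no-op: the flag short-circuits the second call
    }

vfile_result_writer.cpp is the only call site updated here; other FileWriter users are unaffected because the base-class open() defaults to a no-op.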