8 changes: 8 additions & 0 deletions be/src/io/fs/broker_file_writer.cpp
@@ -158,6 +158,14 @@ Status BrokerFileWriter::finalize() {
return Status::OK();
}

Status BrokerFileWriter::open() {
if (!_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
return Status::OK();
}

Status BrokerFileWriter::_open() {
TBrokerOpenWriterRequest request;

1 change: 1 addition & 0 deletions be/src/io/fs/broker_file_writer.h
@@ -45,6 +45,7 @@ class BrokerFileWriter : public FileWriter {
int64_t start_offset, FileSystemSPtr fs);
virtual ~BrokerFileWriter();

Status open() override;
Status close() override;
Status abort() override;
Status appendv(const Slice* data, size_t data_cnt) override;
3 changes: 3 additions & 0 deletions be/src/io/fs/file_writer.h
@@ -36,6 +36,9 @@ class FileWriter {

DISALLOW_COPY_AND_ASSIGN(FileWriter);

// Open the file for writing.
virtual Status open() { return Status::OK(); }

// Normal close. Wait for all data to persist before returning.
virtual Status close() = 0;

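
Note for reviewers: the base class only gains a no-op virtual open(), and BrokerFileWriter/HdfsFileWriter implement it as an idempotent wrapper around their existing private _open(). Below is a minimal standalone sketch of that contract, using simplified stand-ins for Status and Slice rather than the real Doris types.

#include <cassert>
#include <cstddef>

// Simplified stand-ins for this sketch only; not the real Doris types.
struct Status {
    bool ok = true;
    static Status OK() { return {}; }
};
struct Slice {
    const char* data;
    size_t size;
};

// Mirrors the new FileWriter contract: open() defaults to a no-op, so
// writers that need no explicit open stay unchanged.
class FileWriter {
public:
    virtual ~FileWriter() = default;
    virtual Status open() { return Status::OK(); }
    virtual Status close() = 0;
    virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
};

// Mirrors how BrokerFileWriter/HdfsFileWriter implement it: open() is
// idempotent and delegates to a private _open() exactly once.
class DemoFileWriter : public FileWriter {
public:
    Status open() override {
        if (!_opened) {
            Status st = _open();
            if (!st.ok) {
                return st;
            }
            _opened = true;
        }
        return Status::OK();
    }
    Status appendv(const Slice* data, size_t data_cnt) override {
        (void)data;
        (void)data_cnt;
        assert(_opened); // in this sketch, callers must open() before appending
        return Status::OK();
    }
    Status close() override { return Status::OK(); }

private:
    Status _open() { return Status::OK(); } // create the remote file here
    bool _opened = false;
};

The per-writer _opened guard keeps open() safe to call both from the new explicit call site and from any existing lazy-open path.
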
8 changes: 8 additions & 0 deletions be/src/io/fs/hdfs_file_writer.cpp
@@ -108,6 +108,14 @@ Status HdfsFileWriter::finalize() {
return Status::OK();
}

Status HdfsFileWriter::open() {
if (!_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
return Status::OK();
}

Status HdfsFileWriter::_open() {
_path = convert_path(_path, _hdfs_fs->_fs_name);
std::string hdfs_dir = _path.parent_path().string();
1 change: 1 addition & 0 deletions be/src/io/fs/hdfs_file_writer.h
@@ -36,6 +36,7 @@ class HdfsFileWriter : public FileWriter {
HdfsFileWriter(Path file, FileSystemSPtr fs);
~HdfsFileWriter();

Status open() override;
Status close() override;
Status abort() override;
Status appendv(const Slice* data, size_t data_cnt) override;
28 changes: 22 additions & 6 deletions be/src/io/fs/s3_file_writer.cpp
@@ -189,16 +189,32 @@ Status S3FileWriter::close() {
return _st;
}
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
// it might be one file less than 5MB, we do upload here
if (_pending_buf != nullptr) {
if (_upload_id.empty()) {

if (_upload_id.empty()) {
if (_pending_buf != nullptr) {
// it might be one file less than 5MB, we do upload here
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
} else {
// if there is no pending buffer, we need to create an empty file
_pending_buf = S3FileBufferPool::GetInstance()->allocate();
// if there is no upload id, we need to create a new one
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
_pending_buf->set_finish_upload([this]() { _countdown_event.signal(); });
_pending_buf->set_is_cancel([this]() { return _failed.load(); });
_pending_buf->set_on_failed([this](Status st) {
VLOG_NOTICE << "failed at key: " << _key << ", status: " << st.to_string();
std::unique_lock<std::mutex> _lck {_completed_lock};
this->_st = std::move(st);
_failed = true;
});
}
_countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
_countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;

RETURN_IF_ERROR(_complete());

return Status::OK();
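
The close() change is easier to read as two cases: before, a query that produced no rows left _pending_buf null, so close() uploaded nothing and no S3 object appeared. Now, when no multipart upload was started (_upload_id is empty), close() falls back to a single _put_object, allocating an empty buffer first if nothing was ever buffered. A condensed sketch of that decision follows, with hypothetical names and none of the callback wiring.

#include <memory>

struct Buffer {}; // stand-in for the real S3 file buffer type

// Returns true when close() should finish with a single put_object.
// Allocating an empty Buffer when nothing was buffered is what makes a
// zero-row OUTFILE still produce an (empty) object on S3.
bool needs_put_object(bool multipart_started, std::unique_ptr<Buffer>& pending) {
    if (multipart_started) {
        return false; // the multipart-upload path completes the upload instead
    }
    if (pending == nullptr) {
        pending = std::make_unique<Buffer>(); // empty object for an empty result
    }
    return true;
}
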
1 change: 1 addition & 0 deletions be/src/vec/runtime/vfile_result_writer.cpp
@@ -154,6 +154,7 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
_file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0,
_file_writer_impl));
RETURN_IF_ERROR(_file_writer_impl->open());
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
// just use file writer is enough
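
On the caller side, _create_file_writer() now calls open() immediately after constructing the writer, so the output file exists even when the query returns no rows. A hedged, self-contained sketch of that lifecycle (Writer and write_result are illustrative names, not Doris APIs):

#include <memory>

struct Status {
    bool ok = true;
    static Status OK() { return {}; }
};

// Illustrative writer with the same open/close surface as FileWriter.
struct Writer {
    Status open() { return Status::OK(); }  // create the output file
    Status close() { return Status::OK(); } // persist and finalize it
};

Status write_result(bool has_rows) {
    auto writer = std::make_unique<Writer>();
    Status st = writer->open(); // new: always open, even for an empty result
    if (!st.ok) {
        return st;
    }
    if (has_rows) {
        // append row batches here via appendv(...)
    }
    return writer->close(); // an empty but valid file remains for empty results
}
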
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_base1 --

-- !select_tvf1 --

-- !select_tvf2 --

-- !select_tvf3 --

@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths

suite("test_outfile_empty_data", "external,hive,tvf,external_docker") {

String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("diable Hive test.")
return;
}

// enable the nereids planner
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """

// settings used to outfile to HDFS
String hdfs_port = context.config.otherConfigs.get("hdfs_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
// It's okay to use a random `hdfsUserName`, but it can not be empty.
def hdfsUserName = "doris"
def format = "csv"
def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"

// settings used to outfile to S3
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");

// broker
String broker_name = "hdfs"

def export_table_name = "outfile_empty_data_test"

def create_table = {table_name, column_define ->
sql """ DROP TABLE IF EXISTS ${table_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
${column_define}
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
}

def outfile_to_HDFS_directly = {
// select ... into outfile ...
def uuid = UUID.randomUUID().toString()

hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"

def res = sql """
SELECT * FROM ${export_table_name} t ORDER BY user_id
INTO OUTFILE "${uri}"
FORMAT AS ${format}
PROPERTIES (
"fs.defaultFS"="${defaultFS}",
"hadoop.username" = "${hdfsUserName}"
);
"""
logger.info("outfile to hdfs direct success path: " + res[0][3]);
return res[0][3]
}

def outfile_to_HDFS_with_broker = {
// select ... into outfile ...
def uuid = UUID.randomUUID().toString()

hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"

def res = sql """
SELECT * FROM ${export_table_name} t ORDER BY user_id
INTO OUTFILE "${uri}"
FORMAT AS ${format}
PROPERTIES (
"broker.fs.defaultFS"="${defaultFS}",
"broker.name"="hdfs",
"broker.username" = "${hdfsUserName}"
);
"""
logger.info("outfile to hdfs with broker success path: " + res[0][3]);
return res[0][3]
}

def outfile_to_S3_directly = {
// select ... into outfile ...
s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/"
uri = "s3://${s3_outfile_path}/exp_"

def res = sql """
SELECT * FROM ${export_table_name} t ORDER BY user_id
INTO OUTFILE "${uri}"
FORMAT AS csv
PROPERTIES (
"s3.endpoint" = "${s3_endpoint}",
"s3.region" = "${region}",
"s3.secret_key"="${sk}",
"s3.access_key" = "${ak}"
);
"""
logger.info("outfile to s3 success path: " + res[0][3]);
return res[0][3]
}

try {
def doris_column_define = """
`user_id` INT NOT NULL COMMENT "user id",
`name` STRING NULL,
`age` INT NULL"""
// create table
create_table(export_table_name, doris_column_define);
// test outfile empty data to hdfs directly
def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly()
// test outfile empty data to hdfs with broker
def outfile_to_hdfs_with_broker_url = outfile_to_HDFS_with_broker()
// test outfile empty data to s3 directly
def outfile_to_s3_directly_url = outfile_to_S3_directly()
qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY user_id; """

qt_select_tvf1 """ select * from HDFS(
"uri" = "${outfile_to_hdfs_directly_url}0.csv",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}");
"""

qt_select_tvf2 """ select * from HDFS(
"uri" = "${outfile_to_hdfs_with_broker_url}0.csv",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}");
"""

qt_select_tvf3 """ SELECT * FROM S3 (
"uri" = "http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4, outfile_to_s3_directly_url.length())}0.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "${format}",
"region" = "${region}",
"use_path_style" = "true"
);
"""

} finally {
}
}