1 change: 1 addition & 0 deletions be/src/http/action/stream_load.cpp
@@ -423,6 +423,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (ctx->is_chunked_transfer) {
pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
pipe->set_is_chunked_transfer(true);
} else {
pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
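For context, `ctx->is_chunked_transfer` is populated earlier in the HTTP handling and is not part of this diff. A minimal, hypothetical sketch of that kind of check, assuming a plain header map rather than Doris's actual HttpRequest API:

#include <string>
#include <unordered_map>

// Hypothetical helper, not Doris code: a request body is chunked when the client
// sends "Transfer-Encoding: chunked" (RFC 7230) instead of a Content-Length header.
bool request_is_chunked(const std::unordered_map<std::string, std::string>& headers) {
    auto it = headers.find("Transfer-Encoding");
    return it != headers.end() && it->second.find("chunked") != std::string::npos;
}

The regression test at the end of this PR drives this branch by passing `-H Transfer-Encoding:chunked` to curl, which is why the pipe created here is additionally flagged via set_is_chunked_transfer(true).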
10 changes: 10 additions & 0 deletions be/src/io/fs/stream_load_pipe.h
@@ -87,6 +87,12 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

size_t current_capacity();

bool is_chunked_transfer() const { return _is_chunked_transfer; }

void set_is_chunked_transfer(bool is_chunked_transfer) {
_is_chunked_transfer = is_chunked_transfer;
}

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -121,6 +127,10 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

// no use, only for compatibility with the `Path` interface
Path _path = "";

// When importing JSON data and using chunked transfer encoding,
// the data needs to be completely read before it can be parsed.
bool _is_chunked_transfer = false;
};
} // namespace io
} // namespace doris
51 changes: 49 additions & 2 deletions be/src/vec/exec/format/json/new_json_reader.cpp
@@ -994,8 +994,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
break;
}
case TFileType::FILE_STREAM: {
RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
->read_one_message(file_buf, read_size));
RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size));
break;
}
default: {
@@ -1004,6 +1003,54 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}

Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf,
size_t* read_size) {
auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());

// first read: read from the pipe once.
RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));

// When the file is not chunked, the entire file has already been read.
if (!stream_load_pipe->is_chunked_transfer()) {
return Status::OK();
}

std::vector<uint8_t> buf;
uint64_t cur_size = 0;

// second read: continuously read data from the pipe until all data is read.
std::unique_ptr<uint8_t[]> read_buf;
size_t read_buf_size = 0;
while (true) {
RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
if (read_buf_size == 0) {
break;
} else {
buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size);
cur_size += read_buf_size;
read_buf_size = 0;
read_buf.reset();
}
}

// No data is available during the second read.
if (cur_size == 0) {
return Status::OK();
}

std::unique_ptr<uint8_t[]> total_buf = std::make_unique<uint8_t[]>(cur_size + *read_size);

// copy the data during the first read
memcpy(total_buf.get(), file_buf->get(), *read_size);

// copy the data during the second read
memcpy(total_buf.get() + *read_size, buf.data(), cur_size);
*file_buf = std::move(total_buf);
*read_size += cur_size;
return Status::OK();
}

// ---------SIMDJSON----------
// simdjson, replace none simdjson function if it is ready
Status NewJsonReader::_simdjson_init_reader() {
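To make the shape of the new logic easier to see in isolation, here is a self-contained sketch of the read-once-then-drain-and-concatenate pattern that `_read_one_message_from_pipe` implements above. The MockPipe type is purely illustrative and is not the Doris `io::StreamLoadPipe` API:

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Mock stand-in for a stream load pipe: each call hands back at most one
// buffered chunk; a zero-size result means the sender has finished.
struct MockPipe {
    std::vector<std::string> chunks{"{\"k\": ", "1}", ""};
    size_t next = 0;
    void read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* size) {
        const std::string& c = chunks[std::min(next, chunks.size() - 1)];
        *size = c.size();
        *buf = std::make_unique<uint8_t[]>(c.size());
        std::memcpy(buf->get(), c.data(), c.size());
        ++next;
    }
};

int main() {
    MockPipe pipe;

    // First read: one message, which may be only part of the JSON payload.
    std::unique_ptr<uint8_t[]> file_buf;
    size_t read_size = 0;
    pipe.read_one_message(&file_buf, &read_size);

    // Second read: drain the pipe until it reports an empty message.
    std::vector<uint8_t> rest;
    while (true) {
        std::unique_ptr<uint8_t[]> chunk;
        size_t chunk_size = 0;
        pipe.read_one_message(&chunk, &chunk_size);
        if (chunk_size == 0) break;
        rest.insert(rest.end(), chunk.get(), chunk.get() + chunk_size);
    }

    // Concatenate both phases into a single buffer before parsing.
    auto total = std::make_unique<uint8_t[]>(read_size + rest.size());
    std::memcpy(total.get(), file_buf.get(), read_size);
    std::memcpy(total.get() + read_size, rest.data(), rest.size());
    std::cout << std::string(reinterpret_cast<const char*>(total.get()),
                             read_size + rest.size())
              << std::endl;  // prints: {"k": 1}
}

The real function additionally skips the drain loop when is_chunked_transfer() is false and returns early when the drain produced no extra bytes; the sketch keeps only the concatenation path.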
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/json/new_json_reader.h
@@ -143,6 +143,10 @@ class NewJsonReader : public GenericReader {

Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);

// StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
// Need to read all the data before performing JSON parsing.
Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);

// simdjson, replace none simdjson function if it is ready
Status _simdjson_init_reader();
Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
15272

-- !sql --
15282

@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.http.HttpStatus
import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.client.methods.RequestBuilder
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

suite("test_load_with_transfer_encoding", "p0") {
def table_name = "test_load_with_transfer_encoding"

sql "DROP TABLE IF EXISTS ${table_name}"
sql """
CREATE TABLE ${table_name} (
`place_id` varchar(14) NOT NULL,
`card_id` varchar(12) NOT NULL,
`shift_id` varchar(4) NULL,
`id` bigint NOT NULL AUTO_INCREMENT (1),
`created` datetime(6) NOT NULL,
`creater` bigint NULL,
`deleted` int NOT NULL,
`updated` datetime(6) NULL,
`card_type_id` varchar(8) NOT NULL,
`card_type_name` varchar(20) NULL,
`cash_balance` int NOT NULL,
`cashier_id` varchar(8) NULL,
`client_id` varchar(4) NULL,
`cost` int NOT NULL,
`creater_name` varchar(50) NULL,
`details` varchar(200) NULL,
`id_name` varchar(50) NOT NULL,
`id_number` varchar(18) NOT NULL,
`last_client_id` varchar(4) NULL,
`login_id` varchar(16) NULL,
`operation_type` varchar(50) NOT NULL,
`present` int NOT NULL,
`present_balance` int NOT NULL,
`remark` varchar(200) NULL,
`source_type` varchar(50) NOT NULL,
`online_account` int NOT NULL
) ENGINE = OLAP DUPLICATE KEY (`place_id`, `card_id`, `shift_id`) DISTRIBUTED BY HASH (`operation_type`) BUCKETS 10 PROPERTIES (
"file_cache_ttl_seconds" = "0",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"replication_num" = "1",
"group_commit_data_bytes" = "134217728"
);
"""


String db = context.config.getDbNameByFile(context.file)

def load_data = { inputFile, int count ->
String url = """${getS3Url()}/regression/load/data/${inputFile}.json"""
String fileName

HttpClients.createDefault().withCloseable { client ->
def file = new File("${context.config.cacheDataPath}/${inputFile}.json")
if (file.exists()) {
log.info("Found ${url} in ${file.getAbsolutePath()}");
fileName = file.getAbsolutePath()
return;
}

log.info("Start to down data from ${url} to $context.config.cacheDataPath}/");
CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
int code = resp.getStatusLine().getStatusCode()

if (code != HttpStatus.SC_OK) {
String streamBody = EntityUtils.toString(resp.getEntity())
log.info("Fail to download data ${url}, code: ${code}, body:\n${streamBody}")
throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
}

InputStream httpFileStream = resp.getEntity().getContent()
java.nio.file.Files.copy(httpFileStream, file.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING)
httpFileStream.close()
fileName = file.getAbsolutePath()
log.info("File downloaded to: ${fileName}")
}

def command = """curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H read_json_by_line:false -H Expect:100-continue -H max_filter_ratio:1 -H strict_mode:false -H strip_outer_array:true -H columns:id,created,creater,deleted,updated,card_id,card_type_id,card_type_name,cash_balance,cashier_id,client_id,cost,creater_name,details,id_name,id_number,last_client_id,login_id,operation_type,place_id,present,present_balance,remark,shift_id,source_type,online_account -H format:json -H Transfer-Encoding:chunked -T ${fileName} -XPUT http://${context.config.feHttpAddress}/api/${db}/${table_name}/_stream_load"""
log.info("stream load: ${command}")
def process = command.execute()
def code = process.waitFor()
def out = process.text
def json = parseJson(out)
log.info("stream load result is:: ${out}".toString())
assertEquals("success", json.Status.toLowerCase())
assertEquals(count, json.NumberLoadedRows)
qt_sql """ select count() from ${table_name} """
}

load_data.call("test_load_with_transfer_encoding", 15272)
load_data.call("test_transfer_encoding_small", 10)

}
