From 50dcce546dc430d69d65945a98bc25428316d45c Mon Sep 17 00:00:00 2001 From: BePPPower Date: Thu, 25 Jul 2024 17:55:36 +0800 Subject: [PATCH 1/9] fix 1 --- .../java/org/apache/doris/load/ExportJob.java | 4 +- .../java/org/apache/doris/load/ExportMgr.java | 70 +++++++++++++++++++ .../doris/rpc/BackendServiceClient.java | 5 ++ .../apache/doris/rpc/BackendServiceProxy.java | 12 ++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 2d84023938e620..33418531f2cda8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -632,9 +632,7 @@ private Map convertOutfileProperties() { if (!maxFileSize.isEmpty()) { outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize); } - if (!deleteExistingFiles.isEmpty()) { - outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles); - } + outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom); // broker properties diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index bce0c5c4d9f195..74d9cdec6cdc30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -19,6 +19,10 @@ import org.apache.doris.analysis.CancelExportStmt; import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.OutFileClause; +import org.apache.doris.analysis.Queriable; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -33,21 +37,32 @@ import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.util.ListComparator; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PExportDeleteExistFilesRequest; +import org.apache.doris.proto.InternalService.PExportDeleteExistFilesResult; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TResultFileSinkOptions; +import org.apache.doris.thrift.TStatusCode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.protobuf.ByteString; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TSerializer; import java.util.ArrayList; import java.util.Collections; @@ -56,6 +71,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -107,6 +123,10 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { } } unprotectAddJob(job); + // delete existing files + if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) { + exportDeleteExistFiles(job); + } job.getTaskExecutors().forEach(executor -> { Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); job.getTaskIdToExecutor().put(taskId, executor); @@ -118,6 +138,56 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { LOG.info("add export job. {}", job); } + private void exportDeleteExistFiles(ExportJob job) throws Exception { + OutFileClause outFileClause = ((Queriable) (job.getSelectStmtListPerParallel().get(0) + .get(0))).getOutFileClause(); + + // 1. get TResultFileSinkOptions + TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions(); + + // 2. get StorageType + StorageType storageType = outFileClause.getBrokerDesc() == null + ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType(); + if (storageType == StorageType.BROKER) { + throw new AnalysisException("Outfile with broker does not support delete existing files."); + } + + // 3. prepare PExportDeleteExistFilesRequest + PExportDeleteExistFilesRequest request = PExportDeleteExistFilesRequest.newBuilder() + .setResultFileSinkOptions(ByteString.copyFrom(new TSerializer().serialize(sinkOptions))) + .setStorageType(storageType.toThrift().getValue()) + .build(); + + // 4. get BE + TNetworkAddress address = null; + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + break; + } + } + if (address == null) { + throw new AnalysisException("No Alive backends"); + } + + // 5. send rpc to BE + Future future = BackendServiceProxy.getInstance() + .exportDeleteExistFilesAsync(address, request); + InternalService.PExportDeleteExistFilesResult result = future.get(); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + String errMsg; + if (code != TStatusCode.OK) { + if (!result.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = result.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = "Outfile write success file failed. backend address: " + + NetUtils + .getHostPortInAccessibleFormat(address.getHostname(), address.getPort()); + } + throw new AnalysisException(errMsg); + } + } + public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { // List of export jobs waiting to be cancelled List matchExportJobs = getWaitingCancelJobs(stmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 54c5e68144c57c..87fc1d95901851 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -107,6 +107,11 @@ public Future fetchArrowFlightSch return stub.fetchArrowFlightSchema(request); } + public Future exportDeleteExistFilesAsync( + InternalService.PExportDeleteExistFilesRequest request) { + return stub.exportDeleteExistFiles(request); + } + public Future outfileWriteSuccessAsync( InternalService.POutfileWriteSuccessRequest request) { return stub.outfileWriteSuccess(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 97a06176fef04d..92087d7e0590ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -341,6 +341,18 @@ public Future outfileWriteSuccessAsy } } + public Future exportDeleteExistFilesAsync( + TNetworkAddress address, InternalService.PExportDeleteExistFilesRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.exportDeleteExistFilesAsync(request); + } catch (Throwable e) { + LOG.warn("export delete exist files catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future fetchTableStructureAsync( TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException { try { From de4b064e5c0bf24b85165a20b4154cec8e3f5761 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Thu, 25 Jul 2024 18:23:48 +0800 Subject: [PATCH 2/9] fix 2 --- be/src/service/internal_service.cpp | 52 +++++++++++++++++-- be/src/service/internal_service.h | 5 ++ .../vec/sink/writer/vfile_result_writer.cpp | 4 +- be/src/vec/sink/writer/vfile_result_writer.h | 6 +-- gensrc/proto/internal_service.proto | 13 +++-- 5 files changed, 69 insertions(+), 11 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1fd8c681881be3..1fc8e7c707aadf 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -89,6 +89,7 @@ #include "olap/txn_manager.h" #include "olap/utils.h" #include "olap/wal/wal_manager.h" +#include "pipeline/exec/result_sink_operator.h" #include "runtime/buffer_control_block.h" #include "runtime/cache/result_cache.h" #include "runtime/define_primitive_type.h" @@ -135,6 +136,7 @@ #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/jsonb/serialize.h" #include "vec/runtime/vdata_stream_mgr.h" +#include "vec/sink/writer/vfile_result_writer.h" namespace google { namespace protobuf { @@ -644,6 +646,50 @@ void PInternalService::fetch_data(google::protobuf::RpcController* controller, } } +void PInternalService::export_delete_exist_files(google::protobuf::RpcController* controller, + const PExportDeleteExistFilesRequest* request, + PExportDeleteExistFilesResult* result, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, result, done]() { + VLOG_RPC << "export delete exist files"; + brpc::ClosureGuard closure_guard(done); + TResultFileSinkOptions file_options; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->result_file_sink_options().data()); + uint32_t len = request->result_file_sink_options().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_options); + if (!st.ok()) { + LOG(WARNING) << "export delete exist files failed, errmsg = " << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + + pipeline::ResultFileOptions pipleline_file_opts = pipeline::ResultFileOptions(file_options); + // TODO(ftw): get from FE + auto storage_type = static_cast(2); + + // delete files + TUniqueId unique_id; + RowDescriptor row_desc; + std::unique_ptr _writer; + _writer.reset(new vectorized::VFileResultWriter(&pipleline_file_opts, storage_type, + unique_id, {}, nullptr, nullptr, false, + row_desc)); + st = _writer->delete_dir(); + if (!st.ok()) { + LOG(WARNING) << "export delete exist files failed, errmsg = " << st; + st.to_protobuf(result->mutable_status()); + return; + } + }); + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + void PInternalService::outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -658,7 +704,7 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co uint32_t len = request->result_file_sink().size(); st = deserialize_thrift_msg(buf, &len, false, &result_file_sink); if (!st.ok()) { - LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + LOG(WARNING) << "outfile write success file failed, errmsg = " << st; st.to_protobuf(result->mutable_status()); return; } @@ -677,7 +723,7 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co bool exists = true; st = io::global_local_filesystem()->exists(file_name, &exists); if (!st.ok()) { - LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + LOG(WARNING) << "outfile write success filefailed, errmsg = " << st; st.to_protobuf(result->mutable_status()); return; } @@ -685,7 +731,7 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co st = Status::InternalError("File already exists: {}", file_name); } if (!st.ok()) { - LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + LOG(WARNING) << "outfile write success file failed, errmsg = " << st; st.to_protobuf(result->mutable_status()); return; } diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 85a31136f22e8f..2769f649611ddb 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -95,6 +95,11 @@ class PInternalService : public PBackendService { void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) override; + void export_delete_exist_files(google::protobuf::RpcController* controller, + const PExportDeleteExistFilesRequest* request, + PExportDeleteExistFilesResult* result, + google::protobuf::Closure* done) override; + void outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index ce8f2d18e075fa..eff4e7bbf3b59b 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -85,7 +85,7 @@ Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) { _init_profile(profile); // Delete existing files if (_file_opts->delete_existing_files) { - RETURN_IF_ERROR(_delete_dir()); + RETURN_IF_ERROR(delete_dir()); } return _create_next_file_writer(); } @@ -348,7 +348,7 @@ Status VFileResultWriter::_fill_result_block() { return Status::OK(); } -Status VFileResultWriter::_delete_dir() { +Status VFileResultWriter::delete_dir() { // get dir of file_path std::string dir = _file_opts->file_path.substr(0, _file_opts->file_path.find_last_of('/') + 1); switch (_storage_type) { diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 42753a5e261cb5..782d0e97e39f95 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -74,6 +74,9 @@ class VFileResultWriter final : public AsyncResultWriter { _header = header; } + // delete the dir of file_path + Status delete_dir(); + private: Status _write_file(const Block& block); @@ -93,9 +96,6 @@ class VFileResultWriter final : public AsyncResultWriter { Status _send_result(); // save result into batch rather than send it Status _fill_result_block(); - // delete the dir of file_path - Status _delete_dir(); - RuntimeState* _state; // not owned, set when init const pipeline::ResultFileOptions* _file_opts = nullptr; TStorageBackendType::type _storage_type; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 9997ecd478e56d..e335b28a9949cf 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -707,9 +707,6 @@ message PFetchTableSchemaResult { } message POutfileWriteSuccessRequest { - // optional string file_path = 1; - // optional string success_file_name = 2; - // map broker_properties = 4; // only for remote file optional bytes result_file_sink = 1; } @@ -717,6 +714,15 @@ message POutfileWriteSuccessResult { optional PStatus status = 1; } +message PExportDeleteExistFilesRequest { + optional bytes result_file_sink_options = 1; + optional int32 storage_type = 2; +} + +message PExportDeleteExistFilesResult { + optional PStatus status = 1; +} + message PJdbcTestConnectionRequest { optional bytes jdbc_table = 1; optional int32 jdbc_table_type = 2; @@ -1017,5 +1023,6 @@ service PBackendService { rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); + rpc export_delete_exist_files(PExportDeleteExistFilesRequest) returns (PExportDeleteExistFilesResult); }; From c2c22b32238ecbf8c1d49555e9c1811b2fb8d00e Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Fri, 26 Jul 2024 11:29:36 +0800 Subject: [PATCH 3/9] fix 3 --- be/src/service/internal_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1fc8e7c707aadf..366bdecc0e18ac 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -668,7 +668,7 @@ void PInternalService::export_delete_exist_files(google::protobuf::RpcController pipeline::ResultFileOptions pipleline_file_opts = pipeline::ResultFileOptions(file_options); // TODO(ftw): get from FE - auto storage_type = static_cast(2); + auto storage_type = static_cast(request->storage_type()); // delete files TUniqueId unique_id; From 23c1ffb65230988a402f327eb3b80c37ead8d9b2 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Fri, 26 Jul 2024 15:00:24 +0800 Subject: [PATCH 4/9] fix 4 --- .../src/main/java/org/apache/doris/load/ExportMgr.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 74d9cdec6cdc30..6e64c4a23a61a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; @@ -145,11 +146,14 @@ private void exportDeleteExistFiles(ExportJob job) throws Exception { // 1. get TResultFileSinkOptions TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions(); - // 2. get StorageType + // 2. set Broker Address StorageType storageType = outFileClause.getBrokerDesc() == null ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType(); if (storageType == StorageType.BROKER) { - throw new AnalysisException("Outfile with broker does not support delete existing files."); + // set the broker address for OUTFILE sink + String brokerName = outFileClause.getBrokerDesc().getName(); + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName); + sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port))); } // 3. prepare PExportDeleteExistFilesRequest From e5c2aeb384a4c9d4063cf12ef9da026452463881 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Thu, 1 Aug 2024 15:47:27 +0800 Subject: [PATCH 5/9] fix 5 --- .../apache/doris/common/util/BrokerUtil.java | 12 +++++++++- .../java/org/apache/doris/load/ExportMgr.java | 6 +++-- .../doris/load/loadv2/SparkEtlJobHandler.java | 2 +- .../doris/load/loadv2/SparkRepository.java | 2 +- .../org/apache/doris/qe/StmtExecutor.java | 23 ++++--------------- .../doris/common/util/BrokerUtilTest.java | 2 +- .../load/loadv2/SparkEtlJobHandlerTest.java | 2 +- 7 files changed, 24 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index d32a04331b58ce..eb5416f4aa48f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -107,6 +107,16 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List { Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); @@ -164,7 +166,7 @@ private void exportDeleteExistFiles(ExportJob job) throws Exception { // 4. get BE TNetworkAddress address = null; - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) { if (be.isAlive()) { address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 75e3a6e1718352..69a41bd12836d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -365,7 +365,7 @@ public static synchronized void initLocalDir() { public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) { try { - BrokerUtil.deletePath(outputPath, brokerDesc); + BrokerUtil.deletePathWithBroker(outputPath, brokerDesc); LOG.info("delete path success. path: {}", outputPath); } catch (UserException e) { LOG.warn("delete path failed. path: {}", outputPath, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java index 4efd2d17279020..19b21ff11fe25b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -166,7 +166,7 @@ private void uploadArchive(boolean isReplace) throws LoadException { try { String remoteArchivePath = getRemoteArchivePath(currentDppVersion); if (isReplace) { - BrokerUtil.deletePath(remoteArchivePath, brokerDesc); + BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc); currentArchive.libraries.clear(); } String srcFilePath = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index adb8ecf61bf3b9..6b12a244297a04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -164,16 +164,12 @@ import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; -import org.apache.doris.planner.DataSink; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.GroupCommitScanNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; -import org.apache.doris.planner.PlanFragment; -import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.Planner; -import org.apache.doris.planner.ResultFileSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; @@ -182,7 +178,6 @@ import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; -import org.apache.doris.qe.Coordinator.FragmentExecParams; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; @@ -2061,26 +2056,18 @@ private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception { TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions(); // 2. set brokerNetAddress - List fragments = coord.getFragments(); - Map fragmentExecParamsMap = coord.getFragmentExecParamsMap(); - PlanFragmentId topId = fragments.get(0).getFragmentId(); - FragmentExecParams topParams = fragmentExecParamsMap.get(topId); - DataSink topDataSink = topParams.fragment.getSink(); - TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; - if (topDataSink instanceof ResultFileSink - && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) { + StorageType storageType = outFileClause.getBrokerDesc() == null + ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType(); + if (storageType == StorageType.BROKER) { // set the broker address for OUTFILE sink - ResultFileSink topResultFileSink = (ResultFileSink) topDataSink; - FsBroker broker = Env.getCurrentEnv().getBrokerMgr() - .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname()); + String brokerName = outFileClause.getBrokerDesc().getName(); + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName); sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port))); } // 3. set TResultFileSink properties TResultFileSink sink = new TResultFileSink(); sink.setFileOptions(sinkOptions); - StorageType storageType = outFileClause.getBrokerDesc() == null - ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType(); sink.setStorageBackendType(storageType.toThrift()); // 4. get BE diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java index ceae283a2fde33..13bbd66fab1d6f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java @@ -318,7 +318,7 @@ public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client try { BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); - BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc); + BrokerUtil.deletePathWithBroker("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc); } catch (Exception e) { Assert.fail(e.getMessage()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java index 5ecfa2e2d6492b..d1b6c786441d91 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java @@ -418,7 +418,7 @@ public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws UserException { new Expectations() { { - BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any); + BrokerUtil.deletePathWithBroker(etlOutputPath, (BrokerDesc) any); times = 1; } }; From 3114850b95c79a8985c8c00b8df24e384228b0f9 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Thu, 1 Aug 2024 20:42:07 +0800 Subject: [PATCH 6/9] fix 6 --- .../main/java/org/apache/doris/common/util/BrokerUtil.java | 6 +++--- .../src/main/java/org/apache/doris/load/ExportMgr.java | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index eb5416f4aa48f4..c5a2803b848c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -107,12 +107,12 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List { From 425ef70e1fe65cf040b12ba6b30ed227ccf1c565 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Fri, 2 Aug 2024 11:32:54 +0800 Subject: [PATCH 7/9] fix 7 --- .../java/org/apache/doris/load/ExportMgr.java | 74 +------------------ .../doris/rpc/BackendServiceClient.java | 5 -- .../apache/doris/rpc/BackendServiceProxy.java | 12 --- 3 files changed, 3 insertions(+), 88 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 04c68c6aa5221f..7dbe953cf9bdbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -19,14 +19,9 @@ import org.apache.doris.analysis.CancelExportStmt; import org.apache.doris.analysis.CompoundPredicate; -import org.apache.doris.analysis.OutFileClause; -import org.apache.doris.analysis.Queriable; -import org.apache.doris.analysis.StorageBackend; -import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; @@ -39,32 +34,21 @@ import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.ListComparator; -import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.proto.InternalService; -import org.apache.doris.proto.InternalService.PExportDeleteExistFilesRequest; -import org.apache.doris.proto.InternalService.PExportDeleteExistFilesResult; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.scheduler.exception.JobException; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TResultFileSinkOptions; -import org.apache.doris.thrift.TStatusCode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; -import com.google.protobuf.ByteString; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.thrift.TSerializer; import java.util.ArrayList; import java.util.Collections; @@ -73,7 +57,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -127,10 +110,12 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { unprotectAddJob(job); // delete existing files if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) { + if (job.getBrokerDesc() == null) { + throw new AnalysisException("Local file system does not support delete existing files"); + } String fullPath = job.getExportPath(); BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), job.getBrokerDesc()); - // exportDeleteExistFiles(job); } job.getTaskExecutors().forEach(executor -> { Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); @@ -143,59 +128,6 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { LOG.info("add export job. {}", job); } - private void exportDeleteExistFiles(ExportJob job) throws Exception { - OutFileClause outFileClause = ((Queriable) (job.getSelectStmtListPerParallel().get(0) - .get(0))).getOutFileClause(); - - // 1. get TResultFileSinkOptions - TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions(); - - // 2. set Broker Address - StorageType storageType = outFileClause.getBrokerDesc() == null - ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType(); - if (storageType == StorageType.BROKER) { - // set the broker address for OUTFILE sink - String brokerName = outFileClause.getBrokerDesc().getName(); - FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName); - sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port))); - } - - // 3. prepare PExportDeleteExistFilesRequest - PExportDeleteExistFilesRequest request = PExportDeleteExistFilesRequest.newBuilder() - .setResultFileSinkOptions(ByteString.copyFrom(new TSerializer().serialize(sinkOptions))) - .setStorageType(storageType.toThrift().getValue()) - .build(); - - // 4. get BE - TNetworkAddress address = null; - for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) { - if (be.isAlive()) { - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - break; - } - } - if (address == null) { - throw new AnalysisException("No Alive backends"); - } - - // 5. send rpc to BE - Future future = BackendServiceProxy.getInstance() - .exportDeleteExistFilesAsync(address, request); - InternalService.PExportDeleteExistFilesResult result = future.get(); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - String errMsg; - if (code != TStatusCode.OK) { - if (!result.getStatus().getErrorMsgsList().isEmpty()) { - errMsg = result.getStatus().getErrorMsgsList().get(0); - } else { - errMsg = "Outfile write success file failed. backend address: " - + NetUtils - .getHostPortInAccessibleFormat(address.getHostname(), address.getPort()); - } - throw new AnalysisException(errMsg); - } - } - public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { // List of export jobs waiting to be cancelled List matchExportJobs = getWaitingCancelJobs(stmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 87fc1d95901851..54c5e68144c57c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -107,11 +107,6 @@ public Future fetchArrowFlightSch return stub.fetchArrowFlightSchema(request); } - public Future exportDeleteExistFilesAsync( - InternalService.PExportDeleteExistFilesRequest request) { - return stub.exportDeleteExistFiles(request); - } - public Future outfileWriteSuccessAsync( InternalService.POutfileWriteSuccessRequest request) { return stub.outfileWriteSuccess(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 92087d7e0590ca..97a06176fef04d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -341,18 +341,6 @@ public Future outfileWriteSuccessAsy } } - public Future exportDeleteExistFilesAsync( - TNetworkAddress address, InternalService.PExportDeleteExistFilesRequest request) throws RpcException { - try { - final BackendServiceClient client = getProxy(address); - return client.exportDeleteExistFilesAsync(request); - } catch (Throwable e) { - LOG.warn("export delete exist files catch a exception, address={}:{}", - address.getHostname(), address.getPort(), e); - throw new RpcException(address.hostname, e.getMessage()); - } - } - public Future fetchTableStructureAsync( TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException { try { From f1a091e46ecc80c11cda4bd98d62abd941a0fc58 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Fri, 2 Aug 2024 11:53:57 +0800 Subject: [PATCH 8/9] fix 8 --- be/src/service/internal_service.cpp | 44 ------------------- be/src/service/internal_service.h | 5 --- .../vec/sink/writer/vfile_result_writer.cpp | 4 +- be/src/vec/sink/writer/vfile_result_writer.h | 6 +-- gensrc/proto/internal_service.proto | 10 ----- 5 files changed, 5 insertions(+), 64 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 366bdecc0e18ac..154772b50be4f7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -646,50 +646,6 @@ void PInternalService::fetch_data(google::protobuf::RpcController* controller, } } -void PInternalService::export_delete_exist_files(google::protobuf::RpcController* controller, - const PExportDeleteExistFilesRequest* request, - PExportDeleteExistFilesResult* result, - google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([request, result, done]() { - VLOG_RPC << "export delete exist files"; - brpc::ClosureGuard closure_guard(done); - TResultFileSinkOptions file_options; - Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)(request->result_file_sink_options().data()); - uint32_t len = request->result_file_sink_options().size(); - st = deserialize_thrift_msg(buf, &len, false, &file_options); - if (!st.ok()) { - LOG(WARNING) << "export delete exist files failed, errmsg = " << st; - st.to_protobuf(result->mutable_status()); - return; - } - } - - pipeline::ResultFileOptions pipleline_file_opts = pipeline::ResultFileOptions(file_options); - // TODO(ftw): get from FE - auto storage_type = static_cast(request->storage_type()); - - // delete files - TUniqueId unique_id; - RowDescriptor row_desc; - std::unique_ptr _writer; - _writer.reset(new vectorized::VFileResultWriter(&pipleline_file_opts, storage_type, - unique_id, {}, nullptr, nullptr, false, - row_desc)); - st = _writer->delete_dir(); - if (!st.ok()) { - LOG(WARNING) << "export delete exist files failed, errmsg = " << st; - st.to_protobuf(result->mutable_status()); - return; - } - }); - if (!ret) { - offer_failed(result, done, _heavy_work_pool); - return; - } -} - void PInternalService::outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 2769f649611ddb..85a31136f22e8f 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -95,11 +95,6 @@ class PInternalService : public PBackendService { void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) override; - void export_delete_exist_files(google::protobuf::RpcController* controller, - const PExportDeleteExistFilesRequest* request, - PExportDeleteExistFilesResult* result, - google::protobuf::Closure* done) override; - void outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index eff4e7bbf3b59b..ce8f2d18e075fa 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -85,7 +85,7 @@ Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) { _init_profile(profile); // Delete existing files if (_file_opts->delete_existing_files) { - RETURN_IF_ERROR(delete_dir()); + RETURN_IF_ERROR(_delete_dir()); } return _create_next_file_writer(); } @@ -348,7 +348,7 @@ Status VFileResultWriter::_fill_result_block() { return Status::OK(); } -Status VFileResultWriter::delete_dir() { +Status VFileResultWriter::_delete_dir() { // get dir of file_path std::string dir = _file_opts->file_path.substr(0, _file_opts->file_path.find_last_of('/') + 1); switch (_storage_type) { diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 782d0e97e39f95..42753a5e261cb5 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -74,9 +74,6 @@ class VFileResultWriter final : public AsyncResultWriter { _header = header; } - // delete the dir of file_path - Status delete_dir(); - private: Status _write_file(const Block& block); @@ -96,6 +93,9 @@ class VFileResultWriter final : public AsyncResultWriter { Status _send_result(); // save result into batch rather than send it Status _fill_result_block(); + // delete the dir of file_path + Status _delete_dir(); + RuntimeState* _state; // not owned, set when init const pipeline::ResultFileOptions* _file_opts = nullptr; TStorageBackendType::type _storage_type; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index e335b28a9949cf..b08308f21449e5 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -714,15 +714,6 @@ message POutfileWriteSuccessResult { optional PStatus status = 1; } -message PExportDeleteExistFilesRequest { - optional bytes result_file_sink_options = 1; - optional int32 storage_type = 2; -} - -message PExportDeleteExistFilesResult { - optional PStatus status = 1; -} - message PJdbcTestConnectionRequest { optional bytes jdbc_table = 1; optional int32 jdbc_table_type = 2; @@ -1023,6 +1014,5 @@ service PBackendService { rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); - rpc export_delete_exist_files(PExportDeleteExistFilesRequest) returns (PExportDeleteExistFilesResult); }; From b9ad5eea731fd32053370a79bc23861279a2d3bb Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Fri, 2 Aug 2024 11:55:43 +0800 Subject: [PATCH 9/9] fix 9 --- be/src/service/internal_service.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 154772b50be4f7..a60b279391cd56 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -89,7 +89,6 @@ #include "olap/txn_manager.h" #include "olap/utils.h" #include "olap/wal/wal_manager.h" -#include "pipeline/exec/result_sink_operator.h" #include "runtime/buffer_control_block.h" #include "runtime/cache/result_cache.h" #include "runtime/define_primitive_type.h" @@ -136,7 +135,6 @@ #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/jsonb/serialize.h" #include "vec/runtime/vdata_stream_mgr.h" -#include "vec/sink/writer/vfile_result_writer.h" namespace google { namespace protobuf {