Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co
uint32_t len = request->result_file_sink().size();
st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
if (!st.ok()) {
LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
st.to_protobuf(result->mutable_status());
return;
}
Expand All @@ -677,15 +677,15 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co
bool exists = true;
st = io::global_local_filesystem()->exists(file_name, &exists);
if (!st.ok()) {
LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
LOG(WARNING) << "outfile write success filefailed, errmsg = " << st;
st.to_protobuf(result->mutable_status());
return;
}
if (exists) {
st = Status::InternalError("File already exists: {}", file_name);
}
if (!st.ok()) {
LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
st.to_protobuf(result->mutable_status());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFil
}
}

public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
RemoteFileSystem fileSystem = FileSystemFactory.get(
brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
Status st = fileSystem.deleteDirectory(path);
if (!st.ok()) {
throw new UserException(brokerDesc.getName() + " delete directory exception. path="
+ path + ", err: " + st.getErrMsg());
}
}

public static String printBroker(String brokerName, TNetworkAddress address) {
return brokerName + "[" + address.toString() + "]";
}
Expand Down Expand Up @@ -358,7 +368,7 @@ public static void writeFile(String srcFilePath, String destFilePath,
* @param brokerDesc
* @throws UserException if broker op failed
*/
public static void deletePath(String path, BrokerDesc brokerDesc) throws UserException {
public static void deletePathWithBroker(String path, BrokerDesc brokerDesc) throws UserException {
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
boolean failed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,9 +632,7 @@ private Map<String, String> convertOutfileProperties() {
if (!maxFileSize.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
}
if (!deleteExistingFiles.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles);
}

outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);

// broker properties
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -107,6 +108,15 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
}
}
unprotectAddJob(job);
// delete existing files
if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
if (job.getBrokerDesc() == null) {
throw new AnalysisException("Local file system does not support delete existing files");
}
String fullPath = job.getExportPath();
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
job.getTaskExecutors().forEach(executor -> {
Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
job.getTaskIdToExecutor().put(taskId, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public static synchronized void initLocalDir() {

public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
try {
BrokerUtil.deletePath(outputPath, brokerDesc);
BrokerUtil.deletePathWithBroker(outputPath, brokerDesc);
LOG.info("delete path success. path: {}", outputPath);
} catch (UserException e) {
LOG.warn("delete path failed. path: {}", outputPath, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private void uploadArchive(boolean isReplace) throws LoadException {
try {
String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
if (isReplace) {
BrokerUtil.deletePath(remoteArchivePath, brokerDesc);
BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc);
currentArchive.libraries.clear();
}
String srcFilePath = null;
Expand Down
23 changes: 5 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,12 @@
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
Expand All @@ -182,7 +178,6 @@
import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator.FragmentExecParams;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
Expand Down Expand Up @@ -2061,26 +2056,18 @@ private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception {
TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();

// 2. set brokerNetAddress
List<PlanFragment> fragments = coord.getFragments();
Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = coord.getFragmentExecParamsMap();
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
DataSink topDataSink = topParams.fragment.getSink();
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
if (topDataSink instanceof ResultFileSink
&& ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
StorageType storageType = outFileClause.getBrokerDesc() == null
? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
if (storageType == StorageType.BROKER) {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
String brokerName = outFileClause.getBrokerDesc().getName();
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port)));
}

// 3. set TResultFileSink properties
TResultFileSink sink = new TResultFileSink();
sink.setFileOptions(sinkOptions);
StorageType storageType = outFileClause.getBrokerDesc() == null
? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
sink.setStorageBackendType(storageType.toThrift());

// 4. get BE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client

try {
BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
BrokerUtil.deletePathWithBroker("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client
public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws UserException {
new Expectations() {
{
BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
BrokerUtil.deletePathWithBroker(etlOutputPath, (BrokerDesc) any);
times = 1;
}
};
Expand Down
3 changes: 0 additions & 3 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -707,9 +707,6 @@ message PFetchTableSchemaResult {
}

message POutfileWriteSuccessRequest {
// optional string file_path = 1;
// optional string success_file_name = 2;
// map<string, string> broker_properties = 4; // only for remote file
optional bytes result_file_sink = 1;
}

Expand Down