From 7b08d13de2e571d4eb726a29d0d0910b99f5c855 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Fri, 14 Nov 2025 17:30:52 +0800 Subject: [PATCH 1/3] [fix](filecache) fix warm up cancel failure when BE is down Fixed an issue where the cancel flow would exit if a BE was offline, preventing subsequent BEs from receiving the clear_job RPC. Now skips failed BEs and continues sending RPCs to the others. Signed-off-by: zhengyu --- .../apache/doris/cloud/CloudWarmUpJob.java | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java index d8d7c95085dc50..5d12c2b967e65a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -539,20 +540,42 @@ public void releaseClients() { private final void clearJobOnBEs() { try { initClients(); - for (Map.Entry entry : beToClient.entrySet()) { + // Iterate with explicit iterator so we can remove invalidated clients during iteration. + Iterator> iter = beToClient.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + long beId = entry.getKey(); + Client client = entry.getValue(); TWarmUpTabletsRequest request = new TWarmUpTabletsRequest(); request.setType(TWarmUpTabletsRequestType.CLEAR_JOB); request.setJobId(jobId); if (this.isEventDriven()) { TWarmUpEventType event = getTWarmUpEventType(); if (event == null) { - throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent); + // If event type is unknown, skip this BE but continue others. 
+ LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {}", syncEvent, beId); + continue; } request.setEvent(event); } - LOG.info("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB", - entry.getKey(), jobId); - entry.getValue().warmUpTablets(request); + LOG.info("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB", beId, jobId); + try { + client.warmUpTablets(request); + } catch (Exception e) { + // If RPC to this BE fails, invalidate this client and remove it from map, + // then continue to next BE so that one bad BE won't block others. + LOG.warn("send warm up request to BE {} failed: {}", beId, e.getMessage()); + try { + TNetworkAddress addr = beToAddr == null ? null : beToAddr.get(beId); + if (addr != null) { + ClientPool.backendPool.invalidateObject(addr, client); + } + } catch (Exception ie) { + LOG.warn("invalidate client for BE {} failed: {}", beId, ie.getMessage()); + } + // remove from local map so releaseClients won't try to return an invalidated client + iter.remove(); + } } } catch (Exception e) { LOG.warn("send warm up request failed. 
job_id={}, request_type=CLEAR_JOB, exception={}", From ae714552c285e7306e63e9bcfb8be01349ff90c9 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Mon, 1 Dec 2025 17:08:56 +0800 Subject: [PATCH 2/3] response to the reviewer Signed-off-by: zhengyu --- .../apache/doris/cloud/CloudWarmUpJob.java | 49 ++++++++++++++----- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java index 5d12c2b967e65a..762e8de23022d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java @@ -24,7 +24,6 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Triple; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -537,6 +536,26 @@ public void releaseClients() { beToAddr = null; } + private String getBackendEndpoint(long beId) { + if (beToAddr != null) { + TNetworkAddress addr = beToAddr.get(beId); + if (addr != null) { + String host = addr.getHostname(); + if (host == null) { + host = "unknown"; + } + return host + ":" + addr.getPort(); + } + } + if (beToThriftAddress != null) { + String addr = beToThriftAddress.get(beId); + if (addr != null) { + return addr; + } + } + return "unknown"; + } + private final void clearJobOnBEs() { try { initClients(); @@ -553,18 +572,21 @@ private final void clearJobOnBEs() { TWarmUpEventType event = getTWarmUpEventType(); if (event == null) { // If event type is unknown, skip this BE but continue others. 
- LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {}", syncEvent, beId); + LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {} ({})", + syncEvent, beId, getBackendEndpoint(beId)); continue; } request.setEvent(event); } - LOG.info("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB", beId, jobId); + LOG.info("send warm up request to BE {} ({}). job_id={}, request_type=CLEAR_JOB", + beId, getBackendEndpoint(beId), jobId); try { client.warmUpTablets(request); } catch (Exception e) { // If RPC to this BE fails, invalidate this client and remove it from map, // then continue to next BE so that one bad BE won't block others. - LOG.warn("send warm up request to BE {} failed: {}", beId, e.getMessage()); + LOG.warn("send warm up request to BE {} ({}) failed: {}", + beId, getBackendEndpoint(beId), e.getMessage()); try { TNetworkAddress addr = beToAddr == null ? null : beToAddr.get(beId); if (addr != null) { @@ -676,8 +698,8 @@ private void runEventDrivenJob() throws Exception { throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent); } request.setEvent(event); - LOG.debug("send warm up request to BE {}. job_id={}, event={}, request_type=SET_JOB(EVENT)", - entry.getKey(), jobId, syncEvent); + LOG.debug("send warm up request to BE {} ({}). job_id={}, event={}, request_type=SET_JOB(EVENT)", + entry.getKey(), getBackendEndpoint(entry.getKey()), jobId, syncEvent); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { @@ -721,9 +743,10 @@ private void runRunningJob() throws Exception { request.setJobId(jobId); request.setBatchId(lastBatchId + 1); request.setJobMetas(buildJobMetas(entry.getKey(), request.batch_id)); - LOG.info("send warm up request to BE {}. job_id={}, batch_id={}" + LOG.info("send warm up request to BE {} ({}). 
job_id={}, batch_id={}" + ", job_size={}, request_type=SET_JOB", - entry.getKey(), jobId, request.batch_id, request.job_metas.size()); + entry.getKey(), getBackendEndpoint(entry.getKey()), + jobId, request.batch_id, request.job_metas.size()); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { @@ -738,8 +761,9 @@ private void runRunningJob() throws Exception { for (Map.Entry entry : beToClient.entrySet()) { TWarmUpTabletsRequest request = new TWarmUpTabletsRequest(); request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE); - LOG.info("send warm up request to BE {}. job_id={}, request_type=GET_CURRENT_JOB_STATE_AND_LEASE", - entry.getKey(), jobId); + LOG.info("send warm up request to BE {} ({}). job_id={}" + + ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE", + entry.getKey(), getBackendEndpoint(entry.getKey()), jobId); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { @@ -777,9 +801,10 @@ private void runRunningJob() throws Exception { if (!request.job_metas.isEmpty()) { // check all batches is done or not allBatchesDone = false; - LOG.info("send warm up request to BE {}. job_id={}, batch_id={}" + LOG.info("send warm up request to BE {} ({}). 
job_id={}, batch_id={}" + ", job_size={}, request_type=SET_BATCH", - entry.getKey(), jobId, request.batch_id, request.job_metas.size()); + entry.getKey(), getBackendEndpoint(entry.getKey()), + jobId, request.batch_id, request.job_metas.size()); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { From cab851da3ce63a3b1ae5cfb9c8c965906d7c5ef8 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 2 Dec 2025 16:53:47 +0800 Subject: [PATCH 3/3] Update CloudWarmUpJob.java --- .../src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java index 762e8de23022d8..90f61fbfb75cef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java @@ -24,6 +24,7 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.Triple; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable;