From aa891c7c68114dc7577ca7876d6b229d09e578ca Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 11 Sep 2023 12:05:56 +0800 Subject: [PATCH] [Improve](Routineload)Set the maximum timeout for obtaining partition to 60s --- .../src/main/java/org/apache/doris/common/util/KafkaUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 581a1ca48e63d0..40041502ca8087 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -41,6 +41,7 @@ public class KafkaUtil { private static final Logger LOG = LogManager.getLogger(KafkaUtil.class); + private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60; public static List getAllKafkaPartitions(String brokerList, String topic, Map convertedCustomProperties) throws UserException { @@ -70,7 +71,7 @@ public static List getAllKafkaPartitions(String brokerList, String topi // get info Future future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS); + InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());