From 3c89e1ce3adb6388c1528d2158b7257e39454135 Mon Sep 17 00:00:00 2001 From: hui lai <1353307710@qq.com> Date: Wed, 5 Jun 2024 14:20:17 +0800 Subject: [PATCH 1/3] [improve](routine-load) add retry when get Kafka meta info (#35376) If a BE is down when the FE sends the `getInfo` RPC, or a network error occurs when the BE sends an RPC to Kafka, the routine load job will pause. To keep routine load stable, add retries when getting Kafka meta info. --- .../apache/doris/common/util/KafkaUtil.java | 196 +++++++++++------- 1 file changed, 120 insertions(+), 76 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 60f423773e7f00..24fbfb7a428007 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -42,24 +42,11 @@ public class KafkaUtil { private static final Logger LOG = LogManager.getLogger(KafkaUtil.class); private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60; - private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5; + private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10; public static List getAllKafkaPartitions(String brokerList, String topic, Map convertedCustomProperties) throws UserException { - TNetworkAddress address = null; - Backend be = null; - long beId = -1L; try { - List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (backendIds.isEmpty()) { - throw new LoadException("Failed to get all partitions. 
No alive backends"); - } - Collections.shuffle(backendIds); - beId = backendIds.get(0); - be = Env.getCurrentSystemInfo().getBackend(beId); - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - - // create request InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( InternalService.PKafkaMetaProxyRequest.newBuilder() .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() @@ -71,21 +58,10 @@ public static List getAllKafkaPartitions(String brokerList, String topi ) ) ).build(); - - // get info - Future future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList()); - } else { - return result.getKafkaMetaResult().getPartitionIdsList(); - } + return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList(); } catch (Exception e) { - LOG.warn("failed to get partitions from backend[{}].", beId, e); throw new LoadException( - "Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId - + "]. 
error: " + e.getMessage()); + "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage()); } } @@ -95,18 +71,10 @@ public static List getAllKafkaPartitions(String brokerList, String topi public static List> getOffsetsForTimes(String brokerList, String topic, Map convertedCustomProperties, List> timestampOffsets) throws LoadException { - TNetworkAddress address = null; - LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets); + if (LOG.isDebugEnabled()) { + LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets); + } try { - List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (backendIds.isEmpty()) { - throw new LoadException("Failed to get offset for times. No alive backends"); - } - Collections.shuffle(backendIds); - Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - - // create request InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = InternalService.PKafkaMetaProxyRequest.newBuilder() .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() @@ -128,22 +96,17 @@ public static List> getOffsetsForTimes(String brokerList, St InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); - // get info - Future future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList()); - } else { - List pairs = 
result.getPartitionOffsets().getOffsetTimesList(); - List> partitionOffsets = Lists.newArrayList(); - for (InternalService.PIntegerPair pair : pairs) { - partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); - } + List pairs = result.getPartitionOffsets().getOffsetTimesList(); + List> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); + } + if (LOG.isDebugEnabled()) { LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets); - return partitionOffsets; } + return partitionOffsets; } catch (Exception e) { LOG.warn("failed to get offsets for times.", e); throw new LoadException( @@ -154,19 +117,11 @@ public static List> getOffsetsForTimes(String brokerList, St public static List> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic, Map convertedCustomProperties, List partitionIds) throws LoadException { - TNetworkAddress address = null; - LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", - partitionIds, topic, taskId, jobId); + if (LOG.isDebugEnabled()) { + LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", + partitionIds, topic, taskId, jobId); + } try { - List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (backendIds.isEmpty()) { - throw new LoadException("Failed to get latest offsets. 
No alive backends"); - } - Collections.shuffle(backendIds); - Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - - // create request InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = InternalService.PKafkaMetaProxyRequest.newBuilder() .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() @@ -186,27 +141,116 @@ public static List> getLatestOffsets(long jobId, UUID taskId } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); - // get info - Future future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList()); - } else { - List pairs = result.getPartitionOffsets().getOffsetTimesList(); - List> partitionOffsets = Lists.newArrayList(); - for (InternalService.PIntegerPair pair : pairs) { - partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); - } + List pairs = result.getPartitionOffsets().getOffsetTimesList(); + List> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); + } + if (LOG.isDebugEnabled()) { LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}", partitionOffsets, topic, taskId, jobId); - return partitionOffsets; } + return partitionOffsets; } catch (Exception e) { LOG.warn("failed to get latest offsets.", e); throw new LoadException( "Failed to get latest offsets of kafka 
topic: " + topic + ". error: " + e.getMessage()); } } + + public static List> getRealOffsets(String brokerList, String topic, + Map convertedCustomProperties, + List> offsets) + throws LoadException { + // filter values greater than 0 as these offsets is real offset + // only update offset like OFFSET_BEGINNING or OFFSET_END + List> offsetFlags = new ArrayList<>(); + List> realOffsets = new ArrayList<>(); + for (Pair pair : offsets) { + if (pair.second < 0) { + offsetFlags.add(pair); + } else { + realOffsets.add(pair); + } + } + if (offsetFlags.size() == 0) { + LOG.info("do not need update and directly return offsets for partitions {} in topic: {}", offsets, topic); + return offsets; + } + + try { + InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = + InternalService.PKafkaMetaProxyRequest.newBuilder() + .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() + .setBrokers(brokerList) + .setTopic(topic) + .addAllProperties( + convertedCustomProperties.entrySet().stream().map( + e -> InternalService.PStringPair.newBuilder() + .setKey(e.getKey()) + .setVal(e.getValue()) + .build() + ).collect(Collectors.toList()) + ) + ); + for (Pair pair : offsetFlags) { + metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first) + .setVal(pair.second).build()); + } + InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( + metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + + List pairs = result.getPartitionOffsets().getOffsetTimesList(); + List> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); + } + realOffsets.addAll(partitionOffsets); + LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic); + return realOffsets; + } 
catch (Exception e) { + LOG.warn("failed to get real offsets.", e); + throw new LoadException( + "Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage()); + } + } + + private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout) + throws LoadException { + int retryTimes = 0; + TNetworkAddress address = null; + Future future = null; + InternalService.PProxyResult result = null; + while (retryTimes < 3) { + List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (backendIds.isEmpty()) { + throw new LoadException("Failed to get info. No alive backends"); + } + Collections.shuffle(backendIds); + Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + + try { + future = BackendServiceProxy.getInstance().getInfo(address, request); + result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("failed to get info request to " + address + " err " + e.getMessage()); + retryTimes++; + continue; + } + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + LOG.warn("failed to get info request to " + + address + " err " + result.getStatus().getErrorMsgsList()); + retryTimes++; + } else { + return result; + } + } + + throw new LoadException("Failed to get info"); + } } From 929c6890ef1ee9477cfde9cb0a6a05f1918d504e Mon Sep 17 00:00:00 2001 From: hui lai <1353307710@qq.com> Date: Sat, 22 Jun 2024 08:02:26 +0800 Subject: [PATCH 2/3] [chore](routine-load) make get Kafka meta timeout configurable (#36619) Sometimes the latency from the BE to Kafka is relatively high, which can cause the `getInfo` RPC to time out. Making the Kafka meta timeout configurable allows users to customize the timeout, which solves this issue. 
--- be/src/service/internal_service.cpp | 2 +- .../java/org/apache/doris/common/Config.java | 6 ++++++ .../apache/doris/common/util/KafkaUtil.java | 20 +++++++++---------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 3677c1210a5634..9591c1928ee961 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -933,7 +933,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, // Currently it supports 2 kinds of requests: // 1. get all kafka partition ids for given topic // 2. get all kafka partition offsets for given topic and timestamp. - int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000; + int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000; if (request->has_kafka_meta_request()) { const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); if (!kafka_request.partition_id_for_latest_offsets().empty()) { diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index dcdcc7dd035d23..0b4aa1cfd3a81c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1141,6 +1141,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_routine_load_task_num_per_be = 5; + /** + * The max timeout, in seconds, for getting Kafka meta info. 
+ */ + @ConfField(mutable = true, masterOnly = true) + public static int max_get_kafka_meta_timeout_second = 60; + /** * The max number of files store in SmallFileMgr */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 24fbfb7a428007..ae0f1920380dab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -18,6 +18,7 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; @@ -41,8 +42,6 @@ public class KafkaUtil { private static final Logger LOG = LogManager.getLogger(KafkaUtil.class); - private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60; - private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10; public static List getAllKafkaPartitions(String brokerList, String topic, Map convertedCustomProperties) throws UserException { @@ -58,7 +57,8 @@ public static List getAllKafkaPartitions(String brokerList, String topi ) ) ).build(); - return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList(); + return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second) + .getKafkaMetaResult().getPartitionIdsList(); } catch (Exception e) { throw new LoadException( "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage()); @@ -95,8 +95,8 @@ public static List> getOffsetsForTimes(String brokerList, St } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); - InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + 
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); List pairs = result.getPartitionOffsets().getOffsetTimesList(); List> partitionOffsets = Lists.newArrayList(); @@ -140,8 +140,8 @@ public static List> getLatestOffsets(long jobId, UUID taskId metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId); } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); - InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); List pairs = result.getPartitionOffsets().getOffsetTimesList(); List> partitionOffsets = Lists.newArrayList(); @@ -200,8 +200,8 @@ public static List> getRealOffsets(String brokerList, String .setVal(pair.second).build()); } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); - InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); List pairs = result.getPartitionOffsets().getOffsetTimesList(); List> partitionOffsets = Lists.newArrayList(); @@ -235,7 +235,7 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx try { future = BackendServiceProxy.getInstance().getInfo(address, request); - result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); + result = 
future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS); } catch (Exception e) { LOG.warn("failed to get info request to " + address + " err " + e.getMessage()); retryTimes++; From 5ea81615346a7628319382b3cabcdd3201371110 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Mon, 8 Jul 2024 15:42:33 +0800 Subject: [PATCH 3/3] fix merge error --- .../apache/doris/common/util/KafkaUtil.java | 58 ------------------- 1 file changed, 58 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index ae0f1920380dab..f6342e1a6fbc50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -160,64 +160,6 @@ public static List> getLatestOffsets(long jobId, UUID taskId } } - public static List> getRealOffsets(String brokerList, String topic, - Map convertedCustomProperties, - List> offsets) - throws LoadException { - // filter values greater than 0 as these offsets is real offset - // only update offset like OFFSET_BEGINNING or OFFSET_END - List> offsetFlags = new ArrayList<>(); - List> realOffsets = new ArrayList<>(); - for (Pair pair : offsets) { - if (pair.second < 0) { - offsetFlags.add(pair); - } else { - realOffsets.add(pair); - } - } - if (offsetFlags.size() == 0) { - LOG.info("do not need update and directly return offsets for partitions {} in topic: {}", offsets, topic); - return offsets; - } - - try { - InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = - InternalService.PKafkaMetaProxyRequest.newBuilder() - .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() - .setBrokers(brokerList) - .setTopic(topic) - .addAllProperties( - convertedCustomProperties.entrySet().stream().map( - e -> InternalService.PStringPair.newBuilder() - .setKey(e.getKey()) - .setVal(e.getValue()) - .build() - ).collect(Collectors.toList()) 
- ) - ); - for (Pair pair : offsetFlags) { - metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first) - .setVal(pair.second).build()); - } - InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); - InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); - - List pairs = result.getPartitionOffsets().getOffsetTimesList(); - List> partitionOffsets = Lists.newArrayList(); - for (InternalService.PIntegerPair pair : pairs) { - partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); - } - realOffsets.addAll(partitionOffsets); - LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic); - return realOffsets; - } catch (Exception e) { - LOG.warn("failed to get real offsets.", e); - throw new LoadException( - "Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage()); - } - } - private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout) throws LoadException { int retryTimes = 0;