2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
@@ -933,7 +933,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
// Currently it supports 2 kinds of requests:
// 1. get all kafka partition ids for given topic
// 2. get all kafka partition offsets for given topic and timestamp.
-    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
+    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
if (request->has_kafka_meta_request()) {
const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
if (!kafka_request.partition_id_for_latest_offsets().empty()) {
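Note: the only BE-side change is the fallback used when the FE does not set timeout_secs on the request, raised from 5 s to 60 s to match the new FE config introduced below. A minimal Java transliteration of that fallback, with illustrative names only (the real code is the C++ above):

```java
// Sketch of the BE fallback above; names are illustrative, not Doris API.
public class TimeoutFallback {
    static int resolveTimeoutMs(boolean hasTimeoutSecs, int timeoutSecs) {
        // An FE-supplied value wins; otherwise fall back to the new 60s default.
        return hasTimeoutSecs ? timeoutSecs * 1000 : 60 * 1000;
    }

    public static void main(String[] args) {
        System.out.println(resolveTimeoutMs(false, 0));  // 60000: BE default
        System.out.println(resolveTimeoutMs(true, 30));  // 30000: FE-provided
    }
}
```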
fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1141,6 +1141,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_routine_load_task_num_per_be = 5;

+    /**
+     * The max timeout for getting Kafka metadata, in seconds.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
/**
* The max number of files store in SmallFileMgr
*/
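Since the new field is mutable and masterOnly, it can be changed at runtime on the master FE without a restart. One consequence worth noting: the retry helper getInfoRequest added below makes up to 3 attempts, each waiting up to this timeout, so the worst case for a caller is roughly 3 × 60 s with the defaults. A small self-contained illustration of that arithmetic (the constants are stand-ins for the config value and the hard-coded retry count):

```java
// Worst-case wait = attempts * per-attempt timeout. Values mirror this PR's
// defaults (Config.max_get_kafka_meta_timeout_second = 60; getInfoRequest
// retries at most 3 times).
public class WorstCaseWait {
    public static void main(String[] args) {
        int maxGetKafkaMetaTimeoutSecond = 60;
        int maxAttempts = 3;
        System.out.println("worst-case wait: "
                + maxAttempts * maxGetKafkaMetaTimeoutSecond + "s");  // 180s
    }
}
```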
144 changes: 65 additions & 79 deletions fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -18,6 +18,7 @@
package org.apache.doris.common.util;

import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -41,25 +42,10 @@

public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
-    private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;

public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Map<String, String> convertedCustomProperties) throws UserException {
-        TNetworkAddress address = null;
-        Backend be = null;
-        long beId = -1L;
try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get all partitions. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            beId = backendIds.get(0);
-            be = Env.getCurrentSystemInfo().getBackend(beId);
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
// create request
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -71,21 +57,11 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
)
)
).build();

-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
-            } else {
-                return result.getKafkaMetaResult().getPartitionIdsList();
-            }
+            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
+                    .getKafkaMetaResult().getPartitionIdsList();
} catch (Exception e) {
LOG.warn("failed to get partitions from backend[{}].", beId, e);
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
+ "]. error: " + e.getMessage());
"Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
}
}


@@ -95,18 +71,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
throws LoadException {
-        TNetworkAddress address = null;
-        LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        }
try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get offset for times. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -127,23 +95,18 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St
}

InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);

-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
                LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
-                return partitionOffsets;
            }
+            return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get offsets for times.", e);
throw new LoadException(
@@ -154,19 +117,11 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Integer> partitionIds) throws LoadException {
-        TNetworkAddress address = null;
-        LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
-                partitionIds, topic, taskId, jobId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
+                    partitionIds, topic, taskId, jobId);
+        }
try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get latest offsets. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -185,28 +140,59 @@ public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId
metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);

-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
                LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                        partitionOffsets, topic, taskId, jobId);
-                return partitionOffsets;
            }
+            return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get latest offsets.", e);
throw new LoadException(
"Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
}
}

+    private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
+            throws LoadException {
+        int retryTimes = 0;
+        TNetworkAddress address = null;
+        Future<InternalService.PProxyResult> future = null;
+        InternalService.PProxyResult result = null;
+        while (retryTimes < 3) {
+            // Pick a random alive backend for each attempt.
+            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get info. No alive backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            try {
+                future = BackendServiceProxy.getInstance().getInfo(address, request);
+                result = future.get(timeout, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                // RPC failure or timeout: retry on another backend.
+                LOG.warn("failed to get info request to {}, err: {}", address, e.getMessage());
+                retryTimes++;
+                continue;
+            }
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                // Non-OK status from the backend also counts as a failed attempt.
+                LOG.warn("failed to get info request to {}, err: {}", address, result.getStatus().getErrorMsgsList());
+                retryTimes++;
+            } else {
+                return result;
+            }
+        }
+
+        throw new LoadException("Failed to get info after " + retryTimes + " retries");
+    }
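For readers skimming the new helper: it is a shuffle-and-retry loop over alive backends. A self-contained sketch of the same pattern outside Doris, with all names hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Generic shuffle-and-retry over candidate nodes, mirroring getInfoRequest:
// a random node per attempt, a bounded wait per attempt, at most maxRetries attempts.
public class RetrySketch {
    static <T> T callWithRetries(List<String> nodes, int maxRetries, long timeoutSecs,
            Function<String, Future<T>> rpc) throws Exception {
        List<String> candidates = new ArrayList<>(nodes);
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Collections.shuffle(candidates);  // pick a different random node each time
            try {
                return rpc.apply(candidates.get(0)).get(timeoutSecs, TimeUnit.SECONDS);
            } catch (Exception e) {
                // timeout or RPC failure: fall through and try another node
            }
        }
        throw new Exception("all " + maxRetries + " attempts failed");
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String reply = callWithRetries(Arrays.asList("be1:8060", "be2:8060"), 3, 5,
                node -> pool.submit(() -> "pong from " + node));
        System.out.println(reply);  // e.g. "pong from be2:8060"
        pool.shutdown();
    }
}
```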
}