From f301d8446abb087206bc5fe1a710286f71566e56 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 27 Nov 2018 12:42:50 +0800 Subject: [PATCH 1/2] Avoid 'No more data to read' error when handling stream load rpc 1. Catch throwable of all stream load rpc. 2. Avoid setting null string as error msg of rpc result status. --- .../apache/doris/common/UserException.java | 10 ++++---- .../doris/service/FrontendServiceImpl.java | 23 ++++++++++++++++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/UserException.java b/fe/src/main/java/org/apache/doris/common/UserException.java index 7f04a08f92fd58..7add338f931329 100644 --- a/fe/src/main/java/org/apache/doris/common/UserException.java +++ b/fe/src/main/java/org/apache/doris/common/UserException.java @@ -17,24 +17,26 @@ package org.apache.doris.common; +import com.google.common.base.Strings; + /** * Thrown for internal server errors. */ public class UserException extends Exception { public UserException(String msg, Throwable cause) { - super(msg, cause); + super(Strings.nullToEmpty(msg), cause); } public UserException(Throwable cause) { super(cause); } - public UserException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); + public UserException(String msg, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(Strings.nullToEmpty(msg), cause, enableSuppression, writableStackTrace); } public UserException(String msg) { - super(msg); + super(Strings.nullToEmpty(msg)); } } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9c6d2668c0be16..0f3478d87d8eec 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -559,7 +559,7 @@ public TFeResult loadCheck(TLoadCheckRequest request) throws TException { } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); return result; } @@ -580,7 +580,13 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.setError_msgs(Lists.newArrayList(e.getMessage())); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + return result; } + return result; } @@ -630,6 +636,11 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + return result; } return result; } @@ -674,6 +685,11 @@ public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) t } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + return result; } return result; @@ -704,6 +720,11 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) throws } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + return result; } return result; } From 8af408b35a8317268edd54c3622d24613511768d Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 27 Nov 2018 14:13:01 +0800 Subject: [PATCH 2/2] Change setError_msgs to addToError_msgs --- .../doris/service/FrontendServiceImpl.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 0f3478d87d8eec..72ca0f9c4eb258 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -396,11 +396,11 @@ public TFeResult miniLoad(TMiniLoadRequest request) throws TException { } catch (UserException e) { LOG.warn("add mini load error", e); status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); } catch (Throwable e) { LOG.warn("unexpected exception when adding mini load", e); status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); } finally { ConnectContext.remove(); } @@ -465,7 +465,7 @@ public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request String failMsg = "job does not exist. id: " + jobId; LOG.warn(failMsg); status.setStatus_code(TStatusCode.CANCELLED); - status.setError_msgs(Lists.newArrayList(failMsg)); + status.addToError_msgs(failMsg); return result; } @@ -474,7 +474,7 @@ public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request String failMsg = "task info does not exist. task id: " + taskId + ", job id: " + jobId; LOG.warn(failMsg); status.setStatus_code(TStatusCode.CANCELLED); - status.setError_msgs(Lists.newArrayList(failMsg)); + status.addToError_msgs(failMsg); return result; } @@ -554,12 +554,12 @@ public TFeResult loadCheck(TLoadCheckRequest request) throws TException { request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD); } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); return result; } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); return result; } @@ -576,14 +576,14 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx result.setTxnId(loadTxnBeginImpl(request)); } catch (LabelAlreadyExistsException e) { status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); return result; } @@ -630,8 +630,7 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws if (!loadTxnCommitImpl(request)) { // committed success but not visible status.setStatus_code(TStatusCode.PUBLISH_TIMEOUT); - status.setError_msgs( - Lists.newArrayList("transaction commit successfully, BUT data will be visible later")); + status.addToError_msgs("transaction commit successfully, BUT data will be visible later"); } } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); @@ -639,7 +638,7 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); return result; } return result; @@ -688,7 +687,7 @@ public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) t } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); return result; } @@ -723,7 +722,7 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) throws } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(Strings.nullToEmpty(e.getMessage()))); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); return result; } return result;