From 40e2be1da8ddf276412ec8ef53fba8b15184e5b1 Mon Sep 17 00:00:00 2001 From: HuangWei Date: Thu, 7 Jan 2021 10:56:28 +0800 Subject: [PATCH] Update RowBatch.java --- .../doris/spark/serialization/RowBatch.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 0781f1e9252fc5..ad3cfe523a8d45 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; @@ -52,10 +53,10 @@ * row batch data container. */ public class RowBatch { - private static Logger logger = LoggerFactory.getLogger(RowBatch.class); + private static final Logger logger = LoggerFactory.getLogger(RowBatch.class); public static class Row { - private List cols; + private final List cols; Row(int colCount) { this.cols = new ArrayList<>(colCount); @@ -74,11 +75,10 @@ public void put(Object o) { private int offsetInRowBatch = 0; private int rowCountInOneBatch = 0; private int readRowCount = 0; - private List rowBatch = new ArrayList<>(); + private final List rowBatch = new ArrayList<>(); private final ArrowStreamReader arrowStreamReader; - private final VectorSchemaRoot root; private List fieldVectors; - private RootAllocator rootAllocator; + private final RootAllocator rootAllocator; private final Schema schema; public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException { @@ -88,9 +88,8 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio new ByteArrayInputStream(nextResult.getRows()), rootAllocator ); - this.offsetInRowBatch = 0; try { - this.root = arrowStreamReader.getVectorSchemaRoot(); + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); while (arrowStreamReader.loadNextBatch()) { fieldVectors = root.getFieldVectors(); if (fieldVectors.size() != schema.size()) { @@ -119,10 +118,7 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio } public boolean hasNext() { - if (offsetInRowBatch < readRowCount) { - return true; - } - return false; + return offsetInRowBatch < readRowCount; } private void addValueToRow(int rowIndex, Object obj) { @@ -268,7 +264,7 @@ public void convertArrowToRowBatch() throws DorisException { addValueToRow(rowIndex, null); continue; } - String value = new String(varCharVector.get(rowIndex)); + String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); addValueToRow(rowIndex, value); } break; @@ -284,7 +280,7 @@ public void convertArrowToRowBatch() throws DorisException { } } - public List next() throws DorisException { + public List next() { if (!hasNext()) { String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount; logger.error(errMsg);