Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -52,10 +53,10 @@
* row batch data container.
*/
public class RowBatch {
private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
private static final Logger logger = LoggerFactory.getLogger(RowBatch.class);

public static class Row {
private List<Object> cols;
private final List<Object> cols;

Row(int colCount) {
this.cols = new ArrayList<>(colCount);
Expand All @@ -74,11 +75,10 @@ public void put(Object o) {
private int offsetInRowBatch = 0;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
private List<Row> rowBatch = new ArrayList<>();
private final List<Row> rowBatch = new ArrayList<>();
private final ArrowStreamReader arrowStreamReader;
private final VectorSchemaRoot root;
private List<FieldVector> fieldVectors;
private RootAllocator rootAllocator;
private final RootAllocator rootAllocator;
private final Schema schema;

public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
Expand All @@ -88,9 +88,8 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio
new ByteArrayInputStream(nextResult.getRows()),
rootAllocator
);
this.offsetInRowBatch = 0;
try {
this.root = arrowStreamReader.getVectorSchemaRoot();
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != schema.size()) {
Expand Down Expand Up @@ -119,10 +118,7 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio
}

public boolean hasNext() {
if (offsetInRowBatch < readRowCount) {
return true;
}
return false;
return offsetInRowBatch < readRowCount;
}

private void addValueToRow(int rowIndex, Object obj) {
Expand Down Expand Up @@ -268,7 +264,7 @@ public void convertArrowToRowBatch() throws DorisException {
addValueToRow(rowIndex, null);
continue;
}
String value = new String(varCharVector.get(rowIndex));
String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
addValueToRow(rowIndex, value);
}
break;
Expand All @@ -284,7 +280,7 @@ public void convertArrowToRowBatch() throws DorisException {
}
}

public List<Object> next() throws DorisException {
public List<Object> next() {
if (!hasNext()) {
String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount;
logger.error(errMsg);
Expand Down