Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
return Status::InternalError("Failed to get/create JVM");
}
SCOPED_TIMER(_open_scanner_time);
_scanner_params.emplace("time_zone", _state->timezone_obj().name());
RETURN_IF_ERROR(_init_jni_scanner(env, batch_size));
// Call org.apache.doris.common.jni.JniScanner#open
env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
Expand Down Expand Up @@ -61,8 +58,8 @@
public class MaxComputeColumnValue implements ColumnValue {
private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
private int idx;
private int offset = 0; // for complex type
private ValueVector column;
private ZoneId timeZone;

public MaxComputeColumnValue() {
idx = 0;
Expand All @@ -77,10 +74,19 @@ public MaxComputeColumnValue(ValueVector valueVector, int i) {
this.idx = i;
}

public MaxComputeColumnValue(ValueVector valueVector, int i, ZoneId timeZone) {
this.column = valueVector;
this.idx = i;
this.timeZone = timeZone;
}

public void reset(ValueVector column) {
this.column = column;
this.idx = 0;
this.offset = 0;
}

public void setTimeZone(ZoneId timeZone) {
this.timeZone = timeZone;
}

@Override
Expand Down Expand Up @@ -283,9 +289,10 @@ public byte[] getBytes() {
@Override
public void unpackArray(List<ColumnValue> values) {
ListVector listCol = (ListVector) column;
int elemSize = listCol.getObject(idx).size();
int elemSize = listCol.getElementEndIndex(idx) - listCol.getElementStartIndex(idx);
int offset = listCol.getElementStartIndex(idx);
for (int i = 0; i < elemSize; i++) {
MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset);
MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset, timeZone);
values.add(val);
offset++;
}
Expand All @@ -295,13 +302,14 @@ public void unpackArray(List<ColumnValue> values) {
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
MapVector mapCol = (MapVector) column;
int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx);
int offset = mapCol.getElementStartIndex(idx);
List<FieldVector> innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields();
FieldVector keyList = innerCols.get(0);
FieldVector valList = innerCols.get(1);
for (int i = 0; i < elemSize; i++) {
MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset);
MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset, timeZone);
keys.add(key);
MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset);
MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset, timeZone);
values.add(val);
offset++;
}
Expand All @@ -312,32 +320,14 @@ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> value
StructVector structCol = (StructVector) column;
List<FieldVector> innerCols = structCol.getChildrenFromFields();
for (Integer fieldIndex : structFieldIndex) {
MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx);
MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx, timeZone);
values.add(val);
}
}

public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
public LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
long timestampMillis = milliTZVector.get(index);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault());
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone);
}

public static LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
long timestampNanos = nanoTZVector.get(index);
return LocalDateTime.ofInstant(Instant.ofEpochSecond(timestampNanos / 1_000_000_000,
timestampNanos % 1_000_000_000), ZoneId.systemDefault());
}

public static LocalDateTime convertToLocalDateTime(TimeStampSecTZVector secTZVector, int index) {
long timestampSeconds = secTZVector.get(index);
return LocalDateTime.ofInstant(Instant.ofEpochSecond(timestampSeconds), ZoneId.systemDefault());
}

public static LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector microTZVector, int index) {
long timestampMicros = microTZVector.get(index);
long seconds = timestampMicros / 1_000_000;
long nanos = (timestampMicros % 1_000_000) * 1_000;

return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos), ZoneId.systemDefault());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.ZoneId;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
Expand All @@ -64,7 +65,7 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String SPLIT_SIZE = "split_size";
private static final String SESSION_ID = "session_id";
private static final String SCAN_SERIALIZER = "scan_serializer";

private static final String TIME_ZONE = "time_zone";

private enum SplitType {
BYTE_SIZE,
Expand All @@ -86,7 +87,7 @@ private enum SplitType {
private long startOffset = -1L;
private long splitSize = -1L;
public EnvironmentSettings settings;

public ZoneId timeZone;

public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
String[] requiredFields = params.get("required_fields").split(",");
Expand Down Expand Up @@ -117,6 +118,13 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
sessionId = Objects.requireNonNull(params.get(SESSION_ID), "required property '" + SESSION_ID + "'.");
String timeZoneName = Objects.requireNonNull(params.get(TIME_ZONE), "required property '" + TIME_ZONE + "'.");
try {
timeZone = ZoneId.of(timeZoneName);
} catch (Exception e) {
LOG.warn(e.getMessage() + " Set timeZoneName = " + timeZoneName + "fail, use systemDefault.");
timeZone = ZoneId.systemDefault();
}


Account account = new AliyunAccount(accessKey, secretKey);
Expand Down Expand Up @@ -172,7 +180,7 @@ public void open() throws IOException {
LOG.info("createArrowReader failed.", e);
} catch (Exception e) {
close();
throw new IOException(e);
throw new IOException(e.getMessage(), e);
}
}

Expand All @@ -192,6 +200,7 @@ protected int getNext() throws IOException {
return 0;
}
columnValue = new MaxComputeColumnValue();
columnValue.setTimeZone(timeZone);
int expectedRows = batchSize;
return readVectors(expectedRows);
}
Expand Down
Loading