Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,21 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
_tunnel_url(tdesc.mcTable.tunnel_url),
_access_key(tdesc.mcTable.access_key),
_secret_key(tdesc.mcTable.secret_key),
_public_access(tdesc.mcTable.public_access) {}
_public_access(tdesc.mcTable.public_access) {
if (tdesc.mcTable.__isset.endpoint) {
_endpoint = tdesc.mcTable.endpoint;
} else {
_init_status = Status::InvalidArgument(
"fail to init MaxComputeTableDescriptor, missing endpoint.");
}

if (tdesc.mcTable.__isset.quota) {
_quota = tdesc.mcTable.quota;
} else {
_init_status =
Status::InvalidArgument("fail to init MaxComputeTableDescriptor, missing quota.");
}
}

MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;

Expand Down
14 changes: 10 additions & 4 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,22 @@ class MaxComputeTableDescriptor : public TableDescriptor {
std::string access_key() const { return _access_key; }
std::string secret_key() const { return _secret_key; }
std::string public_access() const { return _public_access; }
std::string endpoint() const { return _endpoint; }
std::string quota() const { return _quota; }
Status init_status() const { return _init_status; }

private:
std::string _region;
std::string _region; //deprecated
std::string _project;
std::string _table;
std::string _odps_url;
std::string _tunnel_url;
std::string _odps_url; //deprecated
std::string _tunnel_url; //deprecated
std::string _access_key;
std::string _secret_key;
std::string _public_access;
std::string _public_access; //deprecated
std::string _endpoint;
std::string _quota;
Status _init_status = Status::OK();
};

class TrinoConnectorTableDescriptor : public TableDescriptor {
Expand Down
28 changes: 15 additions & 13 deletions be/src/vec/exec/format/table/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,21 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
}
index++;
}
std::map<String, String> params = {{"region", _table_desc->region()},
{"odps_url", _table_desc->odps_url()},
{"tunnel_url", _table_desc->tunnel_url()},
{"access_key", _table_desc->access_key()},
{"secret_key", _table_desc->secret_key()},
{"project", _table_desc->project()},
{"partition_spec", _max_compute_params.partition_spec},
{"table", _table_desc->table()},
{"public_access", _table_desc->public_access()},
{"start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()}};
std::map<String, String> params = {
{"access_key", _table_desc->access_key()},
{"secret_key", _table_desc->secret_key()},
{"endpoint", _table_desc->endpoint()},
{"quota", _table_desc->quota()},
{"project", _table_desc->project()},
{"table", _table_desc->table()},

{"session_id", _max_compute_params.session_id},
{"scan_serializer", _max_compute_params.table_batch_read_session},

{"start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()}};
_jni_connector = std::make_unique<JniConnector>(
"org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,9 @@ Status VFileScanner::_get_next_reader() {
range.table_format_params.table_format_type == "max_compute") {
const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
_real_tuple_desc->table_desc());
if (!mc_desc->init_status()) {
return mc_desc->init_status();
}
std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
range, _state, _profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public interface ColumnValue {

LocalDate getDate();

/**
 * Returns the value of a CHAR column as a String.
 * Defaults to {@link #getString()}; implementations override this when CHAR
 * needs different handling than VARCHAR (e.g. trimming trailing pad spaces).
 */
default String getChar() {
return getString();
}

/**
 * Returns the value of a CHAR column as raw bytes.
 * Defaults to {@link #getStringAsBytes()}; only meaningful when
 * {@link #canGetCharAsBytes()} returns true.
 */
default byte[] getCharAsBytes() {
return getStringAsBytes();
}

/**
 * Whether {@link #getCharAsBytes()} can be used for CHAR columns (the faster
 * byte path); otherwise callers fall back to {@link #getChar()}.
 * Defaults to the VARCHAR/STRING capability flag.
 */
default boolean canGetCharAsBytes() {
return canGetStringAsBytes();
}

LocalDateTime getDateTime();

byte[] getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,12 @@ public void appendValue(ColumnValue o) {
appendDateTime(o.getDateTime());
break;
case CHAR:
if (o.canGetCharAsBytes()) {
appendBytesAndOffset(o.getCharAsBytes());
} else {
appendStringAndOffset(o.getChar());
}
break;
case VARCHAR:
case STRING:
if (o.canGetStringAsBytes()) {
Expand Down
6 changes: 6 additions & 0 deletions fe/be-java-extensions/max-compute-scanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ under the License.
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>${maxcompute.version}</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
Expand All @@ -54,6 +55,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-table-api</artifactId>
<version>${maxcompute.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,36 @@
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.holders.NullableTimeStampNanoHolder;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.log4j.Logger;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;

/**
Expand Down Expand Up @@ -193,6 +201,33 @@ public String getString() {
return v == null ? new String(new byte[0]) : v;
}



/**
 * Reads the current CHAR cell as a String, dropping the trailing pad
 * spaces that MaxCompute CHAR values carry. Advances the row cursor.
 */
public String getChar() {
    skippedIfNull();
    final VarCharVector charVector = (VarCharVector) column;
    final String raw = charVector.getObject(idx++).toString();
    return raw.stripTrailing();
}

// TODO(perf): use `appendBytesAndOffset(byte[] src, int offset, int length)` to append the
// trimmed range directly and avoid allocating an intermediate byte[].
/**
 * Reads the current CHAR cell as UTF-8 bytes with trailing pad spaces
 * removed, mirroring {@link #getChar()}. Advances the row cursor.
 *
 * Fixes two issues in the previous version: the null check ran only after
 * calling getBytes() on the (possibly null) Text, and Text.getBytes()
 * exposes the backing buffer, which may be longer than the valid data —
 * only the first getLength() bytes are meaningful.
 */
public byte[] getCharAsBytes() {
    skippedIfNull();
    VarCharVector varcharCol = (VarCharVector) column;
    var text = varcharCol.getObject(idx++);
    if (text == null) {
        return new byte[0];
    }
    byte[] buf = text.getBytes();
    // Bound by the valid length, then trim trailing ASCII spaces (CHAR padding).
    int end = text.getLength();
    while (end > 0 && buf[end - 1] == ' ') {
        end--;
    }
    return (end == 0) ? new byte[0] : Arrays.copyOfRange(buf, 0, end);
}


@Override
public byte[] getStringAsBytes() {
skippedIfNull();
Expand All @@ -213,14 +248,52 @@ public LocalDate getDate() {
public LocalDateTime getDateTime() {
skippedIfNull();
LocalDateTime result;
if (column instanceof DateMilliVector) {
DateMilliVector datetimeCol = (DateMilliVector) column;
result = datetimeCol.getObject(idx++);

ArrowType.Timestamp timestampType = ( ArrowType.Timestamp) column.getField().getFieldType().getType();
if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) {
result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx++);
} else {
TimeStampNanoVector datetimeCol = (TimeStampNanoVector) column;
result = datetimeCol.getObject(idx++);
NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder();
((TimeStampNanoVector) column).get(idx++, valueHoder);
long timestampNanos = valueHoder.value;

result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000,
(int) (timestampNanos % 1_000_000_000), java.time.ZoneOffset.UTC);
}
return result == null ? LocalDateTime.MIN : result;

/*
timestampType.getUnit()
result = switch (timestampType.getUnit()) {
case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++);
case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++);
case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx++);
case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++);
};

Because :
MaxCompute type => Doris Type
DATETIME => ScalarType.createDatetimeV2Type(3)
TIMESTAMP_NTZ => ScalarType.createDatetimeV2Type(6);

and
TableBatchReadSession
.withArrowOptions (
ArrowOptions.newBuilder()
.withDatetimeUnit(TimestampUnit.MILLI)
.withTimestampUnit(TimestampUnit.NANO)
.build()
)
,
TIMESTAMP_NTZ is NTZ => column is TimeStampNanoVector

So:
case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++);
case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++);
case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++);
may never be used.
*/

return result;
}

@Override
Expand Down Expand Up @@ -248,9 +321,10 @@ public void unpackArray(List<ColumnValue> values) {
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
skippedIfNull();
MapVector mapCol = (MapVector) column;
int elemSize = mapCol.getObject(idx).size();
FieldVector keyList = mapCol.getDataVector().getChildrenFromFields().get(0);
FieldVector valList = mapCol.getDataVector().getChildrenFromFields().get(1);
int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx);
List<FieldVector> innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields();
FieldVector keyList = innerCols.get(0);
FieldVector valList = innerCols.get(1);
for (int i = 0; i < elemSize; i++) {
MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset);
keys.add(key);
Expand All @@ -265,10 +339,35 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
/**
 * Unpacks the current STRUCT cell: for each requested field ordinal, wraps
 * the corresponding child vector (at the current row index) in a
 * MaxComputeColumnValue and appends it to {@code values}. Advances the cursor.
 *
 * Fix: the captured diff kept both the old line (getChildByOrdinal) and its
 * replacement, declaring {@code val} twice — a compile error; only the
 * new getChildrenFromFields() form is kept.
 */
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
    skippedIfNull();
    StructVector structCol = (StructVector) column;
    List<FieldVector> innerCols = structCol.getChildrenFromFields();
    for (Integer fieldIndex : structFieldIndex) {
        MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx);
        values.add(val);
    }
    idx++;
}

/**
 * Converts the epoch-millisecond value at {@code index} of a
 * millisecond-precision TZ vector to a LocalDateTime in the system zone.
 */
public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
    final Instant instant = Instant.ofEpochMilli(milliTZVector.get(index));
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}

/**
 * Converts the epoch-nanosecond value at {@code index} of a
 * nanosecond-precision TZ vector to a LocalDateTime in the system zone.
 * Instant.ofEpochSecond normalizes a negative nano adjustment, so pre-1970
 * values are handled correctly.
 */
public static LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
    final long epochNanos = nanoTZVector.get(index);
    final long seconds = epochNanos / 1_000_000_000;
    final long nanoAdjustment = epochNanos % 1_000_000_000;
    return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoAdjustment), ZoneId.systemDefault());
}

/**
 * Converts the epoch-second value at {@code index} of a second-precision
 * TZ vector to a LocalDateTime in the system zone.
 */
public static LocalDateTime convertToLocalDateTime(TimeStampSecTZVector secTZVector, int index) {
    final Instant instant = Instant.ofEpochSecond(secTZVector.get(index));
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}

/**
 * Converts the epoch-microsecond value at {@code index} of a
 * microsecond-precision TZ vector to a LocalDateTime in the system zone.
 * Instant.ofEpochSecond normalizes a negative nano adjustment, so pre-1970
 * values are handled correctly.
 */
public static LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector microTZVector, int index) {
    final long epochMicros = microTZVector.get(index);
    final Instant instant = Instant.ofEpochSecond(epochMicros / 1_000_000,
            (epochMicros % 1_000_000) * 1_000);
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}
}
Loading