Skip to content

[Bug] The value of timestamp type data obtained through arrow is incorrect. #38174

@liugddx

Description

@liugddx

Search before asking

  • I had searched in the issues and found no similar issues.

Version

master

What's Wrong?

The value of timestamp type data obtained through arrow is incorrect.
image
image

What You Expected?

The actual value is 2024, but the extracted value is 1970.

How to Reproduce?

  • ddl
-- e2e_sink.doris_e2e_unique_table definition

CREATE TABLE `doris_e2e_unique_table` (
  `F_ID` bigint(20) NULL,
  `F_INT` int(11) NULL,
  `F_BIGINT` bigint(20) NULL,
  `F_TINYINT` tinyint(4) NULL,
  `F_SMALLINT` smallint(6) NULL,
  `F_DECIMAL` DECIMAL(18, 6) NULL,
  `F_LARGEINT` largeint(40) NULL,
  `F_BOOLEAN` boolean NULL,
  `F_DOUBLE` double NULL,
  `F_FLOAT` float NULL,
  `F_CHAR` char(1) NULL,
  `F_VARCHAR_11` varchar(11) NULL,
  `F_STRING` text NULL,
  `F_DATETIME_P` datetime(6) NULL,
  `F_DATETIME` datetime NULL,
  `F_DATE` date NULL,
  `MAP_VARCHAR_BOOLEAN` MAP<text,boolean> NULL,
  `MAP_CHAR_TINYINT` MAP<text,tinyint(4)> NULL,
  `MAP_STRING_SMALLINT` MAP<text,smallint(6)> NULL,
  `MAP_INT_INT` MAP<int(11),int(11)> NULL,
  `MAP_TINYINT_BIGINT` MAP<tinyint(4),bigint(20)> NULL,
  `MAP_SMALLINT_LARGEINT` MAP<smallint(6),decimalv3(20, 0)> NULL,
  `MAP_BIGINT_FLOAT` MAP<bigint(20),float> NULL,
  `MAP_LARGEINT_DOUBLE` MAP<decimalv3(20, 0),double> NULL,
  `MAP_STRING_DECIMAL` MAP<text,decimalv3(10, 2)> NULL,
  `MAP_DECIMAL_DATE` MAP<decimalv3(10, 2),datev2> NULL,
  `MAP_DATE_DATETIME` MAP<datev2,datetimev2(6)> NULL,
  `MAP_DATETIME_CHAR` MAP<datetimev2(6),text> NULL,
  `MAP_CHAR_VARCHAR` MAP<text,text> NULL,
  `MAP_VARCHAR_STRING` MAP<text,text> NULL
) ENGINE=OLAP
UNIQUE KEY(`F_ID`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`F_ID`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);

-data

INSERT INTO `doris_e2e_unique_table` 
(F_ID, F_INT, F_BIGINT, F_TINYINT, F_SMALLINT, F_DECIMAL, F_LARGEINT, F_BOOLEAN, F_DOUBLE, F_FLOAT, F_CHAR, F_VARCHAR_11, F_STRING, F_DATETIME_P, F_DATETIME, F_DATE, MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING) 
VALUES 
(1, 10, 10000000000, 1, 100, 123.456789, 9223372036854775807, TRUE, 3.14159, 2.71828, 'A', 'Hello World', 'Sample text for the string field.', '2024-07-19 12:00:00.123456', '2024-07-19 12:00:00', '2024-07-19', 
    {'key1': TRUE, 'key2': FALSE}, 
    {'A': 1, 'B': 2}, 
    {'text1': 1, 'text2': 2}, 
    {1: 1, 2: 4}, 
    {1: 10000000000}, 
    {100: 9223372036854775807}, 
    {10000000000: 3.14}, 
    {9223372036854775807: 3.14}, 
    {'value1': 123.45, 'value2': 678.90}, 
    {123.45: '2024-07-19'}, 
    {'2024-07-19': '2024-07-19 12:00:00'}, 
    {'2024-07-19 12:00:00': 'Datetime example'}, 
    {'key1': 'value1', 'key2': 'value2'}, 
    {'keyA': 'Sample text', 'keyB': 'Another text'}
)

-code

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.liugddx;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.TDorisExternalService;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Main {
    static String dorisUrl = "172.30.34.130:8030";
    static String user = "root";
    static String password = "";
    static String database = "e2e_source";
    static String table = "doris_e2e_unique_table";
    static String sql = "select * from e2e_source.doris_e2e_unique_table";
    static int readRowCount = 0;
    static List<List<Object>> result = new ArrayList<>();

    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
    private final DateTimeFormatter dateTimeV2Formatter =
            DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    public static void main(String[] args) throws Exception {
        JSONObject queryPlan = getQueryPlan();
        System.out.println(queryPlan);
        readData(queryPlan);
        System.out.println(result);
        System.out.println(result.size());
    }

    private static JSONObject getQueryPlan() throws Exception {
        try (CloseableHttpClient client = HttpClients.custom().build()) {
            HttpPost post = new HttpPost(String.format("http://%s/api/%s/%s/_query_plan", dorisUrl, database, table));
            post.setHeader(HttpHeaders.EXPECT, "100-continue");
            post.setHeader(HttpHeaders.AUTHORIZATION,  "Basic " + new String(Base64.encodeBase64((user + ":" + password).getBytes(StandardCharsets.UTF_8))));

            //The param is specific SQL, and the query plan is returned
            Map<String,String> params = new HashMap<>();
            params.put("sql",sql);
            StringEntity entity = new StringEntity(JSON.toJSONString(params));
            post.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(post)) {
                if (response.getEntity() != null ) {
                    String loadResult = EntityUtils.toString(response.getEntity());
                    JSONObject queryPlan = JSONObject.parseObject(loadResult);
                    return queryPlan;
                }
            }
        }
        return null;
    }

    private static void readData(JSONObject queryRes) throws Exception {
        JSONObject data = queryRes.getJSONObject("data");
        String queryPlan = data.getString("opaqued_query_plan");
        JSONObject partitions = data.getJSONObject("partitions");
        for(Map.Entry<String, Object> tablet : partitions.entrySet()){
            Long tabletId = Long.parseLong(tablet.getKey());
            JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));
            //get first backend
            String routingsBackend = value.getJSONArray("routings").getString(0);
            String backendHost = routingsBackend.split(":")[0];
            String backendPort = routingsBackend.split(":")[1];

            //connect backend
            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
            TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));
            TProtocol protocol = factory.getProtocol(transport);
            TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);
            if (!transport.isOpen()) {
                transport.open();
            }

            //build params
            TScanOpenParams params = new TScanOpenParams();
            params.cluster = "default_cluster";
            params.database = database;
            params.table = table;
            params.tablet_ids = Arrays.asList(tabletId);
            params.opaqued_query_plan = queryPlan;
            // max row number of one read batch
            params.setBatchSize(1024);
            params.setQueryTimeout(3600);
            params.setMemLimit(2147483648L);
            params.setUser(user);
            params.setPasswd(password);

            //            int offset =0;
            //open scanner
            TScanOpenResult tScanOpenResult = client.openScanner(params);
            if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {
                throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",
                        routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));
            }
            List<TScanColumnDesc> selectedColumns = tScanOpenResult.getSelectedColumns();

            TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
            nextBatchParams.setContextId(tScanOpenResult.getContextId());
            boolean eos = false;
            //read data
            int offset = 0;
            while(!eos){
                nextBatchParams.setOffset(offset);
                TScanBatchResult next = client.getNext(nextBatchParams);
                if (!TStatusCode.OK.equals(next.getStatus().getStatusCode())) {
                    throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",
                            routingsBackend, next.getStatus().getStatusCode(), next.getStatus().getErrorMsgs()));
                }
                eos = next.isEos();
                if(!eos){
                    int i = convertArrow(next, selectedColumns);
                    offset += i;
                    readRowCount = offset;
                }
            }
            //close
            TScanCloseParams closeParams = new TScanCloseParams();
            closeParams.setContextId(tScanOpenResult.getContextId());
            client.closeScanner(closeParams);
            if ((transport != null) && transport.isOpen()) {
                transport.close();
            }
            //System.out.println(String.format("read tablet %s from backend %s finished", tabletId, routingsBackend));
        }
    }

    private static int convertArrow(TScanBatchResult nextResult, List<TScanColumnDesc> selectedColumns) throws Exception {
        int offset = 0;
        RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
        ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);

        VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
        while (arrowStreamReader.loadNextBatch()) {
            List<FieldVector>  fieldVectors = root.getFieldVectors();
            //total data rows
            int rowCountInOneBatch = root.getRowCount();
            // init the result
            for (int i = 0; i < rowCountInOneBatch; ++i) {
                result.add(new ArrayList<>(fieldVectors.size()));
            }
            //Arrow returns in column format and needs to be converted to row format
            for (int col = 0; col < fieldVectors.size(); col++) {
                FieldVector fieldVector = fieldVectors.get(col);
                Types.MinorType minorType = fieldVector.getMinorType();
                for(int row = 0 ; row < rowCountInOneBatch ;row++){
                    convertValue(row , minorType, fieldVector);
                }
            }
            offset += root.getRowCount();
        }
        return offset;
    }


    private static void convertValue(int rowIndex,
            Types.MinorType minorType,
            FieldVector fieldVector) throws Exception {

        switch (minorType) {
            case BIT:
                BitVector bitVector = (BitVector) fieldVector;
                Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case TINYINT:
                TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
                fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case SMALLINT:
                SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
                fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case INT:
                IntVector intVector = (IntVector) fieldVector;
                fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case BIGINT:
                BigIntVector bigIntVector = (BigIntVector) fieldVector;
                fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case FLOAT4:
                Float4Vector float4Vector = (Float4Vector) fieldVector;
                fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case FLOAT8:
                Float8Vector float8Vector = (Float8Vector) fieldVector;
                fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case VARBINARY:
                VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
                fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            case DECIMAL:
                DecimalVector decimalVector = (DecimalVector) fieldVector;
                BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
                result.get(readRowCount + rowIndex).add(value);
                break;
            case VARCHAR:
                VarCharVector date = (VarCharVector) fieldVector;
                String stringValue = new String(date.get(rowIndex));
                result.get(readRowCount + rowIndex).add(stringValue);
                break;
            case LIST:
                ListVector listVector = (ListVector) fieldVector;
                List listValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
                result.get(readRowCount + rowIndex).add(listValue);
                break;
            case TIMESTAMPMICRO:
                TimeStampMicroVector timestampVector = (TimeStampMicroVector) fieldVector;
                String timestampVectorStringValue = timestampVector.getObject(rowIndex).toString();
                timestampVectorStringValue = completeMilliseconds(timestampVectorStringValue);
                result.get(readRowCount + rowIndex).add(timestampVectorStringValue);
                break;
            case DATEDAY:
                DateDayVector dateDayVector = (DateDayVector) fieldVector;
                fieldValue = dateDayVector.isNull(rowIndex) ? null : LocalDate.ofEpochDay(dateDayVector.get(rowIndex));
                result.get(readRowCount + rowIndex).add(fieldValue);
                break;
            default:
                String errMsg = "Unsupported type " + minorType;
                throw new RuntimeException(errMsg);
        }
    }

    private static String completeMilliseconds(String stringValue) {
        if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
            return stringValue;
        }
        StringBuilder sb = new StringBuilder(stringValue);
        if (stringValue.length() == DATETIME_PATTERN.length()) {
            sb.append(".");
        }
        while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
            sb.append(0);
        }
        return sb.toString();
    }
}

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions