From 82eeec76053276195a9a306ea883378acd5d9611 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 27 Sep 2024 11:44:38 +0800 Subject: [PATCH 1/5] [Enhancement](maxCompute)Achieve compatibility with catalogs from previous versions. --- .../maxcompute/MaxComputeColumnValue.java | 7 +- .../maxcompute/MaxComputeExternalCatalog.java | 90 ++++++++++++++----- .../property/constants/MCProperties.java | 10 ++- .../test_external_catalog_maxcompute.out | 30 +++++++ .../test_max_compute_complex_type.out | 19 ++-- .../test_external_catalog_maxcompute.groovy | 47 +++++++++- .../test_max_compute_all_type.groovy | 2 +- .../test_max_compute_complex_type.groovy | 33 ++++--- 8 files changed, 188 insertions(+), 50 deletions(-) diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java index 98900edce5a546..815bfac9d1e09d 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java @@ -61,7 +61,6 @@ public class MaxComputeColumnValue implements ColumnValue { private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class); private int idx; - private int offset = 0; // for complex type private ValueVector column; public MaxComputeColumnValue() { @@ -80,7 +79,6 @@ public MaxComputeColumnValue(ValueVector valueVector, int i) { public void reset(ValueVector column) { this.column = column; this.idx = 0; - this.offset = 0; } @Override @@ -283,7 +281,8 @@ public byte[] getBytes() { @Override public void unpackArray(List values) { ListVector listCol = (ListVector) column; - int elemSize = listCol.getObject(idx).size(); + int elemSize = listCol.getElementEndIndex(idx) - listCol.getElementStartIndex(idx); + int offset = listCol.getElementStartIndex(idx); for (int i = 0; i < elemSize; i++) { MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset); values.add(val); @@ -295,6 +294,7 @@ public void unpackArray(List values) { public void unpackMap(List keys, List values) { MapVector mapCol = (MapVector) column; int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx); + int offset = mapCol.getElementStartIndex(idx); List innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields(); FieldVector keyList = innerCols.get(0); FieldVector valList = innerCols.get(1); @@ -319,6 +319,7 @@ public void unpackStruct(List structFieldIndex, List value public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) { long timestampMillis = milliTZVector.get(index); + // time_zone DateTime set return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index cfcf4331b96a97..b20546bf67e6a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -29,12 +29,17 @@ import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; import com.aliyun.odps.Partition; +import com.aliyun.odps.Project; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.security.SecurityManager; import com.aliyun.odps.table.configuration.SplitOptions; import com.aliyun.odps.table.enviroment.Credentials; import com.aliyun.odps.table.enviroment.EnvironmentSettings; +import com.aliyun.odps.utils.StringUtils; import com.google.common.collect.ImmutableList; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import java.util.ArrayList; import java.util.Iterator; @@ -43,6 +48,8 @@ import java.util.stream.Collectors; public class MaxComputeExternalCatalog extends ExternalCatalog { + private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api"; + private Odps odps; private String accessKey; private String secretKey; @@ -50,6 +57,7 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private String defaultProject; private String quota; private EnvironmentSettings settings; + private String catalogOwner; private String splitStrategy; private SplitOptions splitOptions; @@ -67,10 +75,47 @@ public MaxComputeExternalCatalog(long catalogId, String name, String resource, M catalogProperty = new CatalogProperty(resource, props); } + //Compatible with existing catalogs in previous versions. + protected void generatorEndpoint() { + Map props = catalogProperty.getProperties(); + + if (props.containsKey(MCProperties.ENDPOINT)) { + endpoint = props.get(MCProperties.ENDPOINT); + return; + } else if (props.containsKey(MCProperties.TUNNEL_SDK_ENDPOINT)) { + String tunnelEndpoint = props.get(MCProperties.TUNNEL_SDK_ENDPOINT); + endpoint = tunnelEndpoint.replace("//dt", "//service") + "/api"; + } else if (props.containsKey(MCProperties.ODPS_ENDPOINT)) { + endpoint = props.get(MCProperties.ODPS_ENDPOINT); + } else if (props.containsKey(MCProperties.REGION)) { + //Copied from original logic + String region = props.get(MCProperties.REGION); + if (region.startsWith("oss-")) { + // may use oss-cn-beijing, ensure compatible + region = region.replace("oss-", ""); + } + boolean enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, + MCProperties.DEFAULT_PUBLIC_ACCESS)); + endpoint = endpointTemplate.replace("{}", region); + if (enablePublicAccess) { + endpoint = endpoint.replace("-inc", ""); + } + } + /* + Since MCProperties.REGION is a REQUIRED_PROPERTIES in previous versions + and MCProperties.ENDPOINT is a REQUIRED_PROPERTIES in current versions, + `else {}` is not needed here. + */ + catalogProperty.addProperty(MCProperties.ENDPOINT, endpoint); + } + + @Override protected void initLocalObjectsImpl() { Map props = catalogProperty.getProperties(); + generatorEndpoint(); + endpoint = props.get(MCProperties.ENDPOINT); defaultProject = props.get(MCProperties.PROJECT); quota = props.getOrDefault(MCProperties.QUOTA, MCProperties.DEFAULT_QUOTA); @@ -124,26 +169,25 @@ protected List listDatabaseNames() { List result = new ArrayList<>(); result.add(defaultProject); - // TODO: Improve `show tables` and `select * from table` when `use other project`. - // try { - // result.add(defaultProject); - // if (StringUtils.isNullOrEmpty(catalogOwner)) { - // SecurityManager sm = odps.projects().get().getSecurityManager(); - // String whoami = sm.runQuery("whoami", false); - // - // JsonObject js = JsonParser.parseString(whoami).getAsJsonObject(); - // catalogOwner = js.get("DisplayName").getAsString(); - // } - // Iterator iterator = odps.projects().iterator(catalogOwner); - // while (iterator.hasNext()) { - // Project project = iterator.next(); - // if (!project.getName().equals(defaultProject)) { - // result.add(project.getName()); - // } - // } - // } catch (OdpsException e) { - // throw new RuntimeException(e); - // } + try { + result.add(defaultProject); + if (StringUtils.isNullOrEmpty(catalogOwner)) { + SecurityManager sm = odps.projects().get().getSecurityManager(); + String whoami = sm.runQuery("whoami", false); + + JsonObject js = JsonParser.parseString(whoami).getAsJsonObject(); + catalogOwner = js.get("DisplayName").getAsString(); + } + Iterator iterator = odps.projects().iterator(catalogOwner); + while (iterator.hasNext()) { + Project project = iterator.next(); + if (!project.getName().equals(defaultProject)) { + result.add(project.getName()); + } + } + } catch (OdpsException e) { + throw new RuntimeException(e); + } return result; } @@ -166,11 +210,11 @@ public List listPartitionNames(String dbName, String tbl, long skip, lon if (getClient().projects().exists(dbName)) { List parts; if (limit < 0) { - parts = getClient().tables().get(tbl).getPartitions(); + parts = getClient().tables().get(dbName, tbl).getPartitions(); } else { skip = skip < 0 ? 0 : skip; parts = new ArrayList<>(); - Iterator it = getClient().tables().get(tbl).getPartitionIterator(); + Iterator it = getClient().tables().get(dbName, tbl).getPartitionIterator(); int count = 0; while (it.hasNext()) { if (count < skip) { @@ -197,7 +241,7 @@ public List listPartitionNames(String dbName, String tbl, long skip, lon public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); List result = new ArrayList<>(); - getClient().tables().forEach(e -> result.add(e.getName())); + getClient().tables().iterable(dbName).forEach(e -> result.add(e.getName())); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java index df4fa068ca706d..20a77574fc7820 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java @@ -25,11 +25,17 @@ * properties for aliyun max compute */ public class MCProperties extends BaseProperties { + + //To be compatible with previous versions of the catalog. public static final String REGION = "mc.region"; - public static final String PROJECT = "mc.default.project"; - public static final String SESSION_TOKEN = "mc.session_token"; public static final String PUBLIC_ACCESS = "mc.public_access"; public static final String DEFAULT_PUBLIC_ACCESS = "false"; + public static final String ODPS_ENDPOINT = "mc.odps_endpoint"; + public static final String TUNNEL_SDK_ENDPOINT = "mc.tunnel_endpoint"; + + + public static final String PROJECT = "mc.default.project"; + public static final String SESSION_TOKEN = "mc.session_token"; public static final String ACCESS_KEY = "mc.access_key"; public static final String SECRET_KEY = "mc.secret_key"; diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out index 8e2dbfd52b2a51..63677f14720494 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out +++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out @@ -139,6 +139,29 @@ yy=2023/mm=08/dd=05/pt=5 5 2023 08 05 5 2023 08 05 +-- !other_db_show -- +other_db_mc_parts +other_db_mc_tb + +-- !other_db_select -- +1 10 +2 20 +3 30 + +-- !other_db_select_partiton -- +1001 Sample data 1 a +1002 Sample data 2 b +1003 Sample data 3 c +1004 Sample data 4 d +1005 Sample data 5 e + +-- !other_db_show_partiton -- +dt=a +dt=b +dt=c +dt=d +dt=e + -- !null_1 -- 1 1 2 \N @@ -157,3 +180,10 @@ yy=2023/mm=08/dd=05/pt=5 3 \N 5 \N +-- !show_partition -- +dt=2023-08-01 +dt=2023-08-02 +dt=2023-08-03 +dt=2023-08-04 +dt=2023-08-05 + diff --git a/regression-test/data/external_table_p2/maxcompute/test_max_compute_complex_type.out b/regression-test/data/external_table_p2/maxcompute/test_max_compute_complex_type.out index 226c4d8f3043f4..32a6e57688cb69 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_max_compute_complex_type.out +++ b/regression-test/data/external_table_p2/maxcompute/test_max_compute_complex_type.out @@ -1,19 +1,26 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !mc_q1 -- -3 [1.3] [1, 2, 3] ["2023-05-23 13:55:12.000"] ["a", "b", "c"] -2 [1.2, 1.3] [1, 2, 3] ["2023-05-23 13:55:12.000"] ["a", "b", "c"] -1 [1.2, 1.3] [1, 2, 3] ["2023-05-23 13:55:12.000"] ["a", "b", "c"] -1 [1.2, 1.3] [1, 2, 3] ["2023-05-23 13:55:12.000"] ["a", "b", "c"] +1 [1.2, 1.3] [1] ["2023-05-23 13:55:12.000"] ["a", "b", "c", "ssadasda"] ["2023-05-23", "2021-05-23"] +2 [1, 1.3] [1, 3] ["2023-05-23 13:55:12.000"] ["a", "ccccb", "c"] ["2023-05-23", "2023-05-26"] +3 [1.3] [1, 2, 3] ["2023-05-23 13:55:12.000"] ["a", "b3", "c2"] ["2023-05-23"] -- !mc_q2 -- {1:"example1", 2:"example2"} {1:2.5, 2:3.75} +{34900:"assssd", 32994:"uisd"} {10000000:999.5, 98889:31.75, 1:2} {349:"asd", 324:"uid"} {3:2.5, 99:3.75} -- !mc_q3 -- {"phone_number":123450, "email":"user1@example.com", "addr":"Addr1"} {"id":"user1", "age":25} {"phone_number":2345671, "email":"user2@example.com", "addr":"Addr2"} {"id":"user2", "age":30} +{"phone_number":3456789, "email":"user3@example.com", "addr":"Addr3"} {"id":"user3", "age":35} -- !mc_q4 -- -user1 [{"activity_date":"2024-08-01", "activities":{"cooking":{"details":"Made vegan meal", "metrics":{"time_spent":1.5, "calories":500}}, "movie":{"details":"Watched action movie", "metrics":{"time_spent":1.5, "calories":500}}}}, {"activity_date":"2024-08-02", "activities":{"cooking":{"details":"Made vegan meal", "metrics":{"time_spent":1.5, "calories":500}}, "movie":{"details":"Watched action movie", "metrics":{"time_spent":1.5, "calories":500}}}}] -user1 [{"activity_date":"2024-08-01", "activities":{"cooking":{"details":"Made vegan meal", "metrics":{"time_spent":1.5, "calories":500}}, "movie":{"details":"Watched action movie", "metrics":{"time_spent":1.5, "calories":500}}}}, {"activity_date":"2024-08-02", "activities":{"cooking":{"details":"Made vegan meal", "metrics":{"time_spent":1.5, "calories":500}}, "movie":{"details":"Watched action movie", "metrics":{"time_spent":1.5, "calories":500}}}}] +user1 [{"activity_date":"2024-08-01", "activities":{"workout":{"details":"Morning run", "metrics":{"duration":30.5, "calories":200}}, "reading":{"details":"Read book on Hive", "metrics":{"pages":50, "time":2}}}}, {"activity_date":"2024-08-02", "activities":{"travel":{"details":"Flight to NY", "metrics":{"distance":500, "time":3}}, "meeting":{"details":"Project meeting", "metrics":{"duration":1.5, "participants":5}}}}] +user2 [{"activity_date":"2024-08-01", "activities":{"hiking":{"details":"Mountain trail", "metrics":{"distance":10, "elevation":500}}, "photography":{"details":"Wildlife photoshoot", "metrics":{"photos_taken":100, "time":4}}}}, {"activity_date":"2024-08-02", "activities":{"workshop":{"details":"Photography workshop", "metrics":{"duration":3, "participants":15}}, "shopping":{"details":"Bought camera gear", "metrics":{"items":5, "cost":1500}}}}] +user3 [{"activity_date":"2024-08-01", "activities":{"cooking":{"details":"Made vegan meal", "metrics":{"time_spent":1.5, "calories":500}}, "movie":{"details":"Watched action movie", "metrics":{"duration":2, "rating":8.5}}}}, {"activity_date":"2024-08-02", "activities":{"gym":{"details":"Strength training", "metrics":{"duration":1, "calories":300}}, "shopping":{"details":"Bought groceries", "metrics":{"items":10, "cost":100}}}}] + +-- !mc_q5 -- +user1 {"name":"Alice", "age":28, "preferences":{"sports":{"preference_id":101, "preference_values":["soccer", "tennis"]}, "music":{"preference_id":102, "preference_values":["rock", "classical"]}}} [{"activity_date":"2024-08-01", "activities":{"workout":{"details":"Morning run", "metrics":{"duration":30.5, "calories":200}}, "reading":{"details":"Read book on Hive", "metrics":{"pages":50, "time":2}}}}, {"activity_date":"2024-08-02", "activities":{"travel":{"details":"Flight to NY", "metrics":{"distance":500, "time":3}}, "meeting":{"details":"Project meeting", "metrics":{"duration":1.5, "participants":5}}}}] +user2 {"name":"Bob", "age":32, "preferences":{"books":{"preference_id":201, "preference_values":["fiction", "non-fiction"]}, "travel":{"preference_id":202, "preference_values":["beaches", "mountains"]}}} [{"activity_date":"2024-08-01", "activities":{"hiking":{"details":"Mountain trail", "metrics":{"distance":10, "elevation":500}}, "photography":{"details":"Wildlife photoshoot", "metrics":{"photos_taken":100, "time":4}}}}, {"activity_date":"2024-08-02", "activities":{"workshop":{"details":"Photography workshop", "metrics":{"duration":3, "participants":15}}, "shopping":{"details":"Bought camera gear", "metrics":{"items":5, "cost":1500}}}}] +user3 {"name":"Carol", "age":24, "preferences":{"food":{"preference_id":301, "preference_values":["vegan", "desserts"]}, "movies":{"preference_id":302, "preference_values":["action", "comedy"]}}} [{"activity_date":"2024-08-01", "activities":{"cooking":{"details":"Made vegan meal", "metrics":{"time_spent":1.5, "calories":500}}, "movie":{"details":"Watched action movie", "metrics":{"duration":2, "rating":8.5}}}}, {"activity_date":"2024-08-02", "activities":{"gym":{"details":"Strength training", "metrics":{"duration":1, "calories":300}}, "shopping":{"details":"Bought groceries", "metrics":{"items":10, "cost":100}}}}] diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 6663a2aa842df3..ffb633c7b728d6 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -65,6 +65,7 @@ INSERT INTO `multi_partitions` PARTITION (yy='2023', mm='08', dd='05', pt=5) VALUES ('San Jose', FALSE, CAST(14 AS SMALLINT), CAST(1.20 AS FLOAT), CAST(9999.111111111 AS DECIMAL(24,9)), CAST('2023-08-05' AS DATE), CAST('2023-08-05 15:30:00' AS DATETIME), CAST('2023-08-05 16:00:00' AS timestamp_ntz)); + drop table mc_parts; CREATE TABLE `mc_parts` ( `mc_bigint` bigint, @@ -82,6 +83,7 @@ (1004, 'Sample data 4'); INSERT INTO `mc_parts` PARTITION (dt='2023-08-05') VALUES (1005, 'Sample data 5'); + CREATE TABLE int_types ( mc_boolean BOOLEAN, mc_tinyint TINYINT, @@ -105,6 +107,7 @@ (FALSE, CAST(8 AS TINYINT), CAST(800 AS SMALLINT), CAST(8000 AS BIGINT)), (TRUE, CAST(9 AS TINYINT), CAST(900 AS SMALLINT), CAST(9000 AS BIGINT)), (FALSE, CAST(10 AS TINYINT), CAST(1000 AS SMALLINT), CAST(10000 AS BIGINT)); + CREATE TABLE web_site ( web_site_sk BIGINT, web_site_id STRING, @@ -278,6 +281,7 @@ CAST(-7.0 AS DOUBLE), CAST(8.00 AS DECIMAL(5,2)) ); + drop table mc_test_null; CREATE TABLE `mc_test_null` ( `id` int, @@ -285,13 +289,40 @@ ); insert into mc_test_null values (1,1),(2,NULL),(3,NULL),(4,4),(5,NULL),(6,6); + -- other project : other_mc_datalake_test + CREATE TABLE other_db_mc_tb ( + `id` int, + `col` int + ); + insert into other_db_mc_tb values (1,10),(2,20),(3,30); + + + CREATE TABLE `other_db_mc_parts` ( + `mc_bigint` bigint, + `mc_string` string + )PARTITIONED BY ( + `dt` string + ); + INSERT INTO `other_db_mc_parts` PARTITION (dt='a') VALUES + (1001, 'Sample data 1'); + INSERT INTO `other_db_mc_parts` PARTITION (dt='b') VALUES + (1002, 'Sample data 2'); + INSERT INTO `other_db_mc_parts` PARTITION (dt='c') VALUES + (1003, 'Sample data 3'); + INSERT INTO `other_db_mc_parts` PARTITION (dt='d') VALUES + (1004, 'Sample data 4'); + INSERT INTO `other_db_mc_parts` PARTITION (dt='e') VALUES + (1005, 'Sample data 5'); + + + */ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remote,external_remote_maxcompute") { String enabled = context.config.otherConfigs.get("enableMaxComputeTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { String ak = context.config.otherConfigs.get("aliYunAk") String sk = context.config.otherConfigs.get("aliYunSk"); - String mc_db = "jz_datalake" + String mc_db = "mc_datalake" String mc_catalog_name = "test_external_mc_catalog" sql """drop catalog if exists ${mc_catalog_name};""" @@ -360,11 +391,23 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """ order_qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by pt, yy, mm, dd; """ + + //other db + sql """ use other_mc_datalake_test """ + order_qt_other_db_show """ show tables ; """ + order_qt_other_db_select """ select * from other_db_mc_tb """ + order_qt_other_db_select_partiton """ select * from other_db_mc_parts """ + order_qt_other_db_show_partiton """show partitions from other_db_mc_parts;""" + + + + sql """ use `${mc_db}`; """ //test null value order_qt_null_1 """ select * from mc_test_null; """ order_qt_null_2 """ select * from mc_test_null where col is not null ; """ order_qt_null_3 """ select * from mc_test_null where col is null ; """ - + order_qt_show_partition """ show partitions from mc_parts """ + } } diff --git a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy index 42aef6a2928a73..bd089693d1969d 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy @@ -320,7 +320,7 @@ suite("test_max_compute_all_type", "p2,external,maxcompute,external_remote,exter String mc_catalog_name = "test_max_compute_all_type" sql """drop catalog if exists ${mc_catalog_name} """ - String defaultProject = "jz_datalake" + String defaultProject = "mc_datalake" sql """ CREATE CATALOG IF NOT EXISTS ${mc_catalog_name} PROPERTIES ( "type" = "max_compute", diff --git a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_complex_type.groovy b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_complex_type.groovy index 4f4748099bb2a2..764f0398f50c42 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_complex_type.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_complex_type.groovy @@ -25,29 +25,35 @@ arr4 ARRAY, arr5 ARRAY ); - INSERT INTO array_table VALUES(1, array(1, 2, 3), array('a', 'b', 'c'), array(1.2, 1.3), array(date('2023-05-23')), array(datetime('2023-05-23 13:55:12'))); - INSERT INTO array_table VALUES(1, array(1, 2, 3), array('a', 'b', 'c'), array(1.2, 1.3), array(date('2023-05-23')), array(datetime('2023-05-23 13:55:12'))); - INSERT INTO array_table VALUES(2, array(1, 2, 3), array('a', 'b', 'c'), array(1.2, 1.3), array(date('2023-05-23')), array(datetime('2023-05-23 13:55:12'))); - INSERT INTO array_table VALUES(3, array(1, 2, 3), array('a', 'b', 'c'), array(1.3), array(date('2023-05-23')), array(datetime('2023-05-23 13:55:12'))); + INSERT INTO array_table VALUES(1, array(1), array('a', 'b', 'c','ssadasda'), array(1.2, 1.3), array(date('2023-05-23'),date('2021-05-23')), array(datetime('2023-05-23 13:55:12'))) + ,(2, array(1, 3), array('a', 'ccccb', 'c'), array(1, 1.3), array(date('2023-05-23'),date('2023-05-26')), array(datetime('2023-05-23 13:55:12'))) + ,(3, array(1, 2, 3), array('a', 'b3', 'c2'), array(1.3), array(date('2023-05-23')), array(datetime('2023-05-23 13:55:12'))); + drop table map_table; create table map_table ( - id int. + id int, arr1 MAP, arr2 MAP ); - INSERT INTO map_table (arr1, arr2) + INSERT INTO map_table VALUES ( 1, MAP(1, 2.5, 2, 3.75), MAP(1, 'example1', 2, 'example2') ); - INSERT INTO map_table (arr1, arr2) + INSERT INTO map_table VALUES ( 2, MAP(3, 2.5, 99, 3.75), MAP(349, 'asd', 324, 'uid') + ),( + 3, + MAP(10000000, 999.5, 98889, 31.75, 1 , 2), + MAP(34900, 'assssd', 32994, 'uisd') ); + + drop table struct_table; create table struct_table ( id int, @@ -161,7 +167,7 @@ suite("test_max_compute_complex_type", "p2,external,maxcompute,external_remote,e sql """ CREATE CATALOG IF NOT EXISTS ${mc_catalog_name} PROPERTIES ( "type" = "max_compute", - "mc.default.project" = "jz_datalake", + "mc.default.project" = "mc_datalake", "mc.access_key" = "${ak}", "mc.secret_key" = "${sk}", "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" @@ -171,12 +177,13 @@ suite("test_max_compute_complex_type", "p2,external,maxcompute,external_remote,e logger.info("catalog " + mc_catalog_name + " created") sql """switch ${mc_catalog_name};""" logger.info("switched to catalog " + mc_catalog_name) - sql """ use jz_datalake """ + sql """ use mc_datalake """ - qt_mc_q1 """ select id,arr3,arr1,arr5,arr2 from array_table order by id desc """ - qt_mc_q2 """ select arr2,arr1 from map_table order by id limit 2 """ - qt_mc_q3 """ select contact_info,user_info from struct_table order by id limit 2 """ - qt_mc_q4 """ select user_id,activity_log from nested_complex_table order by user_id limit 2 """ + order_qt_mc_q1 """ select id,arr3,arr1,arr5,arr2,arr4 from array_table order by id desc """ + order_qt_mc_q2 """ select arr2,arr1 from map_table order by id """ + order_qt_mc_q3 """ select contact_info,user_info from struct_table order by id """ + order_qt_mc_q4 """ select user_id,activity_log from nested_complex_table order by user_id""" + order_qt_mc_q5 """ select * from nested_complex_table """ sql """drop catalog ${mc_catalog_name};""" } From 15a304820900af848bde0088b24ad0302f2ec9ea Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 27 Sep 2024 18:40:46 +0800 Subject: [PATCH 2/5] fix time zone --- be/src/vec/exec/jni_connector.cpp | 1 + .../maxcompute/MaxComputeColumnValue.java | 31 +++++-------------- .../maxcompute/MaxComputeJniScanner.java | 16 ++++++++-- .../maxcompute/MaxComputeExternalCatalog.java | 1 - .../maxcompute/source/MaxComputeScanNode.java | 11 ++++--- .../maxcompute/test_max_compute_all_type.out | 26 ++++++++++++++++ .../test_max_compute_all_type.groovy | 11 +++++++ 7 files changed, 64 insertions(+), 33 deletions(-) diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 0c2485ada3bbed..aa61777213e650 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -84,6 +84,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { return Status::InternalError("Failed to get/create JVM"); } SCOPED_TIMER(_open_scanner_time); + _scanner_params.emplace("time_zone", _state->timezone_obj().name()); RETURN_IF_ERROR(_init_jni_scanner(env, batch_size)); // Call org.apache.doris.common.jni.JniScanner#open env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java index 815bfac9d1e09d..d25a76cc2a9faa 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java @@ -29,11 +29,8 @@ import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TimeStampMilliTZVector; -import org.apache.arrow.vector.TimeStampNanoTZVector; import org.apache.arrow.vector.TimeStampNanoVector; -import org.apache.arrow.vector.TimeStampSecTZVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarBinaryVector; @@ -62,6 +59,7 @@ public class MaxComputeColumnValue implements ColumnValue { private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class); private int idx; private ValueVector column; + private ZoneId timeZone; public MaxComputeColumnValue() { idx = 0; @@ -81,6 +79,10 @@ public void reset(ValueVector column) { this.idx = 0; } + public void setTimeZone(ZoneId timeZone) { + this.timeZone = timeZone; + } + @Override public boolean canGetStringAsBytes() { return true; @@ -317,28 +319,9 @@ public void unpackStruct(List structFieldIndex, List value } } - public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) { + public LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) { long timestampMillis = milliTZVector.get(index); - // time_zone DateTime set - return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault()); - } - - public static LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) { - long timestampNanos = nanoTZVector.get(index); - return LocalDateTime.ofInstant(Instant.ofEpochSecond(timestampNanos / 1_000_000_000, - timestampNanos % 1_000_000_000), ZoneId.systemDefault()); + return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone); } - public static LocalDateTime convertToLocalDateTime(TimeStampSecTZVector secTZVector, int index) { - long timestampSeconds = secTZVector.get(index); - return LocalDateTime.ofInstant(Instant.ofEpochSecond(timestampSeconds), ZoneId.systemDefault()); - } - - public static LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector microTZVector, int index) { - long timestampMicros = microTZVector.get(index); - long seconds = timestampMicros / 1_000_000; - long nanos = (timestampMicros % 1_000_000) * 1_000; - - return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos), ZoneId.systemDefault()); - } } diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index c72153c3449aac..0e5f252c3eee31 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -40,6 +40,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; +import java.time.ZoneId; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -64,7 +65,7 @@ public class MaxComputeJniScanner extends JniScanner { private static final String SPLIT_SIZE = "split_size"; private static final String SESSION_ID = "session_id"; private static final String SCAN_SERIALIZER = "scan_serializer"; - + private static final String TIME_ZONE = "time_zone"; private enum SplitType { BYTE_SIZE, @@ -86,7 +87,7 @@ private enum SplitType { private long startOffset = -1L; private long splitSize = -1L; public EnvironmentSettings settings; - + public ZoneId timeZone; public MaxComputeJniScanner(int batchSize, Map params) { String[] requiredFields = params.get("required_fields").split(","); @@ -117,6 +118,14 @@ public MaxComputeJniScanner(int batchSize, Map params) { project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'."); table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'."); sessionId = Objects.requireNonNull(params.get(SESSION_ID), "required property '" + SESSION_ID + "'."); + String timeZoneName = Objects.requireNonNull(params.get(TIME_ZONE), "required property '" + TIME_ZONE + "'."); + try { + timeZone = ZoneId.of(timeZoneName); + } catch (Exception e) { + LOG.info(e.getMessage()); + LOG.info("set timeZoneName = " + timeZoneName + "fail, use systemDefault."); + timeZone = ZoneId.systemDefault(); + } Account account = new AliyunAccount(accessKey, secretKey); @@ -172,7 +181,7 @@ public void open() throws IOException { LOG.info("createArrowReader failed.", e); } catch (Exception e) { close(); - throw new IOException(e); + throw new IOException(e.getMessage(), e); } } @@ -192,6 +201,7 @@ protected int getNext() throws IOException { return 0; } columnValue = new MaxComputeColumnValue(); + columnValue.setTimeZone(timeZone); int expectedRows = batchSize; return readVectors(expectedRows); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index b20546bf67e6a3..0dbb3e12a85c8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -106,7 +106,6 @@ protected void generatorEndpoint() { and MCProperties.ENDPOINT is a REQUIRED_PROPERTIES in current versions, `else {}` is not needed here. */ - catalogProperty.addProperty(MCProperties.ENDPOINT, endpoint); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index 24e8e5ec4e2db9..f8f822cb89e257 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -363,11 +363,12 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A ScalarType dstType = ScalarType.createDateV2Type(); return " \"" + dateLiteral.getStringValue(dstType) + "\" "; } - case DATETIME: { - DateLiteral dateLiteral = (DateLiteral) literalExpr; - ScalarType dstType = ScalarType.createDatetimeV2Type(3); - return " \"" + dateLiteral.getStringValue(dstType) + "\" "; - } + // case DATETIME: { + // Need to consider which time zone the datetime is passed in. + // DateLiteral dateLiteral = (DateLiteral) literalExpr; + // ScalarType dstType = ScalarType.createDatetimeV2Type(3); + // return " \"" + dateLiteral.getStringValue(dstType) + "\" "; + // } case TIMESTAMP_NTZ: { DateLiteral dateLiteral = (DateLiteral) literalExpr; ScalarType dstType = ScalarType.createDatetimeV2Type(6); diff --git a/regression-test/data/external_table_p2/maxcompute/test_max_compute_all_type.out b/regression-test/data/external_table_p2/maxcompute/test_max_compute_all_type.out index bff928fdedb437..4431c7b8a5871e 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_max_compute_all_type.out +++ b/regression-test/data/external_table_p2/maxcompute/test_max_compute_all_type.out @@ -316,6 +316,32 @@ t_array_string_ending_with_nulls array Yes true \N 3 2024-03-25T12:00 4 2024-03-21T12:00 +-- !test_2_52 -- +1 2024-03-25T04:00 +2 2024-03-20T04:00 +3 2024-03-25T04:00 +4 2024-03-21T04:00 + +-- !test_2_53 -- +2 2024-03-20T04:00 +4 2024-03-21T04:00 + +-- !test_2_54 -- +1 2024-03-25T04:00 +3 2024-03-25T04:00 + +-- !test_2_55 -- + +-- !test_2_56 -- +2 2024-03-20T04:00 +4 2024-03-21T04:00 + +-- !test_2_57 -- +1 2024-03-25T04:00 +2 2024-03-20T04:00 +3 2024-03-25T04:00 +4 2024-03-21T04:00 + -- !test_58 -- 1 2024-03-25T12:00:00.123456 2 2024-03-20T12:00:00.123456 diff --git a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy index bd089693d1969d..875f3b97c3a982 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_all_type.groovy @@ -412,6 +412,8 @@ suite("test_max_compute_all_type", "p2,external,maxcompute,external_remote,exter + sql """ set time_zone = "Asia/Shanghai" """ + qt_test_52 """ select id,datetime_col from ${table_name} order by id """ qt_test_53 """ select id,datetime_col from ${table_name} where datetime_col != "2024-03-25 12:00:00" order by id """ qt_test_54 """ select id,datetime_col from ${table_name} where datetime_col = "2024-03-25 12:00:00" order by id """ @@ -419,7 +421,16 @@ suite("test_max_compute_all_type", "p2,external,maxcompute,external_remote,exter qt_test_56 """ select id,datetime_col from ${table_name} where datetime_col < "2024-03-25 12:00:00" order by id """ qt_test_57 """ select id,datetime_col from ${table_name} where datetime_col <= "2024-03-25 12:00:00" order by id """ + sql """ set time_zone = "UTC" """ + + qt_test_2_52 """ select id,datetime_col from ${table_name} order by id """ + qt_test_2_53 """ select id,datetime_col from ${table_name} where datetime_col != "2024-03-25 04:00:00" order by id """ + qt_test_2_54 """ select id,datetime_col from ${table_name} where datetime_col = "2024-03-25 04:00:00" order by id """ + qt_test_2_55 """ select id,datetime_col from ${table_name} where datetime_col > "2024-03-25 04:00:00" order by id """ + qt_test_2_56 """ select id,datetime_col from ${table_name} where datetime_col < "2024-03-25 04:00:00" order by id """ + qt_test_2_57 """ select id,datetime_col from ${table_name} where datetime_col <= "2024-03-25 04:00:00" order by id """ + sql """ set time_zone = "Asia/Shanghai" """ qt_test_58 """ select id,timestamp_ntz_col2 from ${table_name} order by id """ qt_test_59 """ select id,timestamp_ntz_col2 from ${table_name} where timestamp_ntz_col2 != "2024-03-25 12:00:00.123456" order by id """ From 601635fc9b047407dcc7dca109f46f0dc3178fd8 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 27 Sep 2024 18:44:05 +0800 Subject: [PATCH 3/5] fix error --- .../doris/datasource/maxcompute/MaxComputeExternalCatalog.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index 0dbb3e12a85c8d..551a7e21a40a8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -115,7 +115,6 @@ protected void initLocalObjectsImpl() { generatorEndpoint(); - endpoint = props.get(MCProperties.ENDPOINT); defaultProject = props.get(MCProperties.PROJECT); quota = props.getOrDefault(MCProperties.QUOTA, MCProperties.DEFAULT_QUOTA); From 754f1225637927a70253070ec2211592b6dbc5bc Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sun, 29 Sep 2024 20:40:10 +0800 Subject: [PATCH 4/5] done. --- .../maxcompute/MaxComputeColumnValue.java | 14 +++-- .../maxcompute/MaxComputeJniScanner.java | 3 +- .../maxcompute/MaxComputeExternalCatalog.java | 63 ++++++++++++++++++- .../maxcompute/source/MaxComputeScanNode.java | 42 +++++++++---- fe/pom.xml | 2 +- 5 files changed, 104 insertions(+), 20 deletions(-) diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java index d25a76cc2a9faa..fb29cb89fa8b08 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java @@ -74,6 +74,12 @@ public MaxComputeColumnValue(ValueVector valueVector, int i) { this.idx = i; } + public MaxComputeColumnValue(ValueVector valueVector, int i, ZoneId timeZone) { + this.column = valueVector; + this.idx = i; + this.timeZone = timeZone; + } + public void reset(ValueVector column) { this.column = column; this.idx = 0; @@ -286,7 +292,7 @@ public void unpackArray(List values) { int elemSize = listCol.getElementEndIndex(idx) - listCol.getElementStartIndex(idx); int offset = listCol.getElementStartIndex(idx); for (int i = 0; i < elemSize; i++) { - MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset); + MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset, timeZone); values.add(val); offset++; } @@ -301,9 +307,9 @@ public void unpackMap(List keys, List values) { FieldVector keyList = innerCols.get(0); FieldVector valList = innerCols.get(1); for (int i = 0; i < elemSize; i++) { - MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset); + MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset, timeZone); keys.add(key); - MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset); + MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset, timeZone); values.add(val); offset++; } @@ -314,7 +320,7 @@ public void unpackStruct(List structFieldIndex, List value StructVector structCol = (StructVector) column; List innerCols = structCol.getChildrenFromFields(); for (Integer fieldIndex : structFieldIndex) { - MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx); + MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx, timeZone); values.add(val); } } diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 0e5f252c3eee31..6cbed70adc7d46 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -122,8 +122,7 @@ public MaxComputeJniScanner(int batchSize, Map params) { try { timeZone = ZoneId.of(timeZoneName); } catch (Exception e) { - LOG.info(e.getMessage()); - LOG.info("set timeZoneName = " + timeZoneName + "fail, use systemDefault."); + LOG.warn(e.getMessage() + " Set timeZoneName = " + timeZoneName + "fail, use systemDefault."); timeZone = ZoneId.systemDefault(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index 551a7e21a40a8b..637980287a2e96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -40,14 +40,19 @@ import com.google.common.collect.ImmutableList; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import jdk.internal.org.jline.utils.Log; +import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class MaxComputeExternalCatalog extends ExternalCatalog { + // you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api"; private Odps odps; @@ -64,11 +69,41 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private long splitRowCount; private long splitByteSize; + private static final Map REGION_ZONE_MAP; private static final List REQUIRED_PROPERTIES = ImmutableList.of( MCProperties.PROJECT, MCProperties.ENDPOINT ); + static { + Map map = new HashMap<>(); + + map.put("cn-hangzhou", ZoneId.of("Asia/Shanghai")); + map.put("cn-shanghai", ZoneId.of("Asia/Shanghai")); + map.put("cn-shanghai-finance-1", ZoneId.of("Asia/Shanghai")); + map.put("cn-beijing", ZoneId.of("Asia/Shanghai")); + map.put("cn-north-2-gov-1", ZoneId.of("Asia/Shanghai")); + map.put("cn-zhangjiakou", ZoneId.of("Asia/Shanghai")); + map.put("cn-wulanchabu", ZoneId.of("Asia/Shanghai")); + map.put("cn-shenzhen", ZoneId.of("Asia/Shanghai")); + map.put("cn-shenzhen-finance-1", ZoneId.of("Asia/Shanghai")); + map.put("cn-chengdu", ZoneId.of("Asia/Shanghai")); + map.put("cn-hongkong", ZoneId.of("Asia/Shanghai")); + map.put("ap-southeast-1", ZoneId.of("Asia/Singapore")); + map.put("ap-southeast-2", ZoneId.of("Australia/Sydney")); + map.put("ap-southeast-3", ZoneId.of("Asia/Kuala_Lumpur")); + map.put("ap-southeast-5", ZoneId.of("Asia/Jakarta")); + map.put("ap-northeast-1", ZoneId.of("Asia/Tokyo")); + map.put("eu-central-1", ZoneId.of("Europe/Berlin")); + map.put("eu-west-1", ZoneId.of("Europe/London")); + map.put("us-west-1", ZoneId.of("America/Los_Angeles")); + map.put("us-east-1", ZoneId.of("America/New_York")); + map.put("me-east-1", ZoneId.of("Asia/Dubai")); + + REGION_ZONE_MAP = Collections.unmodifiableMap(map); + } + + public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map props, String comment) { super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment); @@ -80,15 +115,19 @@ protected void generatorEndpoint() { Map props = catalogProperty.getProperties(); if (props.containsKey(MCProperties.ENDPOINT)) { + // This is a new version of the property, so no parsing conversion is required. endpoint = props.get(MCProperties.ENDPOINT); - return; } else if (props.containsKey(MCProperties.TUNNEL_SDK_ENDPOINT)) { + // If customized `mc.tunnel_endpoint` before, + // need to convert the value of this property because used the `tunnel API` before. String tunnelEndpoint = props.get(MCProperties.TUNNEL_SDK_ENDPOINT); endpoint = tunnelEndpoint.replace("//dt", "//service") + "/api"; } else if (props.containsKey(MCProperties.ODPS_ENDPOINT)) { + // If you customized `mc.odps_endpoint` before, + // this value is equivalent to the new version of `mc.endpoint`, so you can use it directly endpoint = props.get(MCProperties.ODPS_ENDPOINT); } else if (props.containsKey(MCProperties.REGION)) { - //Copied from original logic + //Copied from original logic. String region = props.get(MCProperties.REGION); if (region.startsWith("oss-")) { // may use oss-cn-beijing, ensure compatible @@ -263,6 +302,26 @@ public String getDefaultProject() { return defaultProject; } + public ZoneId getProjectDateTimeZone() { + makeSureInitialized(); + + String[] endpointSplit = endpoint.split("\\."); + if (endpointSplit.length >= 2) { + // http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api => cn-hangzhou-vpc + String regionAndSuffix = endpointSplit[1]; + + //remove `-vpc` and `-intranet` suffix. + String region = regionAndSuffix.replace("-vpc", "").replace("-intranet", ""); + if (REGION_ZONE_MAP.containsKey(region)) { + return REGION_ZONE_MAP.get(region); + } + Log.warn("Not exist region. region = " + region + ". endpoint = " + endpoint + ". use systemDefault."); + return ZoneId.systemDefault(); + } + Log.warn("Split EndPoint " + endpoint + "fill. use systemDefault."); + return ZoneId.systemDefault(); + } + public String getQuota() { return quota; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index f8f822cb89e257..856bcfff1614fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -40,6 +40,7 @@ import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType; import org.apache.doris.datasource.property.constants.MCProperties; +import org.apache.doris.nereids.util.DateUtils; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; @@ -64,6 +65,10 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Base64; import java.util.Collections; @@ -169,8 +174,8 @@ protected Predicate convertPredicate() { try { odpsPredicates.add(convertExprToOdpsPredicate(dorisPredicate)); } catch (AnalysisException e) { - Log.info("Failed to convert predicate " + dorisPredicate); - Log.info("Reason: " + e.getMessage()); + Log.warn("Failed to convert predicate " + dorisPredicate.toString() + "Reason: " + + e.getMessage()); } } @@ -222,9 +227,6 @@ private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException } else if (expr instanceof InPredicate) { InPredicate inPredicate = (InPredicate) expr; - if (inPredicate.getChildren().size() > 2) { - throw new AnalysisException("InPredicate must contain at most 1 children"); - } com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator odpsOp = inPredicate.isNotIn() ? com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.IN @@ -363,12 +365,13 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A ScalarType dstType = ScalarType.createDateV2Type(); return " \"" + dateLiteral.getStringValue(dstType) + "\" "; } - // case DATETIME: { - // Need to consider which time zone the datetime is passed in. - // DateLiteral dateLiteral = (DateLiteral) literalExpr; - // ScalarType dstType = ScalarType.createDatetimeV2Type(3); - // return " \"" + dateLiteral.getStringValue(dstType) + "\" "; - // } + case DATETIME: { + DateLiteral dateLiteral = (DateLiteral) literalExpr; + ScalarType dstType = ScalarType.createDatetimeV2Type(3); + + return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), + ((MaxComputeExternalCatalog) table.getCatalog()).getProjectDateTimeZone()) + "\" "; + } case TIMESTAMP_NTZ: { DateLiteral dateLiteral = (DateLiteral) literalExpr; ScalarType dstType = ScalarType.createDatetimeV2Type(6); @@ -381,6 +384,23 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A throw new AnalysisException("Do not support convert odps type [" + odpsType + "] to odps values."); } + + public static String convertDateTimezone(String dateTimeStr, ZoneId toZone) { + if (DateUtils.getTimeZone().equals(toZone)) { + return dateTimeStr; + } + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter); + + ZonedDateTime sourceZonedDateTime = localDateTime.atZone(DateUtils.getTimeZone()); + ZonedDateTime targetZonedDateTime = sourceZonedDateTime.withZoneSameInstant(toZone); + + return targetZonedDateTime.format(formatter); + } + + + @Override public TFileFormatType getFileFormatType() { return TFileFormatType.FORMAT_JNI; diff --git a/fe/pom.xml b/fe/pom.xml index 29b2b61530a935..1e6c9a52a4578b 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -312,7 +312,7 @@ under the License. 1.4.3 - 0.48.8-public + 0.49.0-public 1.11.3 17.0.0 From 1bb3914bd3b20ac4a3208e10d2382e4540b72cf4 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sun, 29 Sep 2024 23:14:18 +0800 Subject: [PATCH 5/5] fix log --- .../datasource/maxcompute/MaxComputeExternalCatalog.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index 637980287a2e96..e6cd77103dbc3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -40,7 +40,7 @@ import com.google.common.collect.ImmutableList; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import jdk.internal.org.jline.utils.Log; +import org.apache.log4j.Logger; import java.time.ZoneId; import java.util.ArrayList; @@ -52,6 +52,8 @@ import java.util.stream.Collectors; public class MaxComputeExternalCatalog extends ExternalCatalog { + private static final Logger LOG = Logger.getLogger(MaxComputeExternalCatalog.class); + // you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api"; @@ -315,10 +317,10 @@ public ZoneId getProjectDateTimeZone() { if (REGION_ZONE_MAP.containsKey(region)) { return REGION_ZONE_MAP.get(region); } - Log.warn("Not exist region. region = " + region + ". endpoint = " + endpoint + ". use systemDefault."); + LOG.warn("Not exist region. region = " + region + ". endpoint = " + endpoint + ". use systemDefault."); return ZoneId.systemDefault(); } - Log.warn("Split EndPoint " + endpoint + "fill. use systemDefault."); + LOG.warn("Split EndPoint " + endpoint + "fill. use systemDefault."); return ZoneId.systemDefault(); }