From abb35918a919ac49d7c544bcf2af7a0d5ff96a7d Mon Sep 17 00:00:00 2001
From: gaoxin
Date: Fri, 8 Sep 2023 08:00:48 +0800
Subject: [PATCH 1/2] fix hudi

---
 .../vec/exec/format/table/hudi_jni_reader.cpp |  7 ++++++-
 .../vec/exec/format/table/hudi_jni_reader.h   |  1 +
 docs/en/docs/lakehouse/multi-catalog/hudi.md  |  3 +++
 .../docs/lakehouse/multi-catalog/hudi.md      |  4 ++++
 .../doris/hudi/HoodieRecordIterator.scala     |  5 +++--
 .../planner/external/hudi/HudiScanNode.java   | 19 +++++++++++--------
 6 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index 029135ac670bd2..bd6b40f3f1a232 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -36,6 +36,7 @@ class Block;
 
 namespace doris::vectorized {
 
+const std::string HudiJniReader::HOODIE_CONF_PREFIX = "hoodie.";
 const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf.";
 
 HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
@@ -67,7 +68,11 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
 
     // Use compatible hadoop client to read data
     for (auto& kv : _scan_params.properties) {
-        params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
+        if (kv.first.starts_with(HOODIE_CONF_PREFIX)) {
+            params[kv.first] = kv.second;
+        } else {
+            params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
+        }
     }
 
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h
index bf2dab943d8d80..c0438e93289063 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -46,6 +46,7 @@ class HudiJniReader : public GenericReader {
     ENABLE_FACTORY_CREATOR(HudiJniReader);
 
 public:
+    static const std::string HOODIE_CONF_PREFIX;
     static const std::string HADOOP_CONF_PREFIX;
 
     HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params,
diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md
index 4c46ccb0e143f4..52892db2df2174 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md
@@ -59,6 +59,9 @@ CREATE CATALOG hudi PROPERTIES (
 
 Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md).
 
+## Skip Merge
+When Spark creates a Hudi MOR table, it also generates a read optimized table with the `_ro` suffix, and Doris skips the log files when reading a read optimized table. Doris determines whether a table is read optimized not by the `_ro` suffix but by the Hive inputformat; users can check through the `SHOW CREATE TABLE` command whether the cow/mor/read optimized tables share the same inputformat. In addition, Doris supports adding hoodie-related configurations to catalog properties, which are compatible with [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options), so users can add `hoodie.datasource.merge.type=skip_merge` to the catalog properties to skip merging log files, as shown in the sketch below.
+
 ## Query Optimization
 
 Doris uses the parquet native reader to read the data files of the COW table, and uses the Java SDK (by calling hudi-bundle through JNI) to read the data files of the MOR table. In the `upsert` scenario, the MOR table may still contain base files that have not been updated, and these can be read through the parquet native reader. 
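For example, the skip merge option described above can be set directly when creating the catalog; a minimal sketch (the catalog name and metastore URI below are placeholders):

```sql
-- Hypothetical catalog: the same HMS-based setup as above, plus the Hudi read
-- option that tells Doris to skip merging log files for MOR tables.
CREATE CATALOG hudi_skip_merge PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:9083',
    'hoodie.datasource.merge.type' = 'skip_merge'
);
```

With such a catalog, the log files of a MOR table are skipped and its base files can be read by the parquet native reader.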
Users can view the execution plan of hudi scan through the [explain](../../advanced/best-practice/query-analysis.md) command, where `hudiNativeReadSplits` indicates how many split files are read through the parquet native reader. ``` diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index 228be874203ad3..b619283cacf5bc 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -59,6 +59,10 @@ CREATE CATALOG hudi PROPERTIES ( 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 +## Skip Merge +Spark 在创建 hudi mor 表的时候,会创建 `_ro` 后缀的 read optimize 表,doris 读取 read optimize 表会跳过 log 文件的合并。doris 判定一个表是否为 read optimize 表并不是通过 `_ro` 后缀,而是通过 hive inputformat,用户可以通过 `SHOW CREATE TABLE` 命令观察 cow/mor/read optimize 表的 inputformat 是否相同。 +此外 doris 支持在 catalog properties 添加 hoodie 相关的配置,配置项兼容 [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options)。所以用户可以在 catalog properties 中添加 `hoodie.datasource.merge.type=skip_merge` 跳过合并 log 文件。 + ## 查询优化 Doris 使用 parquet native reader 读取 COW 表的数据文件,使用 Java SDK(通过JNI调用hudi-bundle) 读取 MOR 表的数据文件。在 upsert 场景下,MOR 依然会有数据文件没有被更新,这部分文件可以通过 parquet native reader读取,用户可以通过 [explain](../../advanced/best-practice/query-analysis.md) 命令查看 hudi scan 的执行计划,`hudiNativeReadSplits` 表示有多少 split 文件通过 parquet native reader 读取。 diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala index c5645655355708..6e2b7b31e547bc 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala @@ -98,8 +98,9 @@ class HoodieMORRecordIterator(config: Configuration, case split => mergeType match { case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL => - val reader = fileReaders.requiredSchemaReaderSkipMerging - new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) + // val reader = fileReaders.requiredSchemaReaderSkipMerging + // new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) + throw new UnsupportedOperationException("Skip merge is optimized by native read") case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL => val reader = pickBaseFileReader() diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index c92c46659eee3e..328c7b0f19c7ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -79,7 +79,7 @@ public class HudiScanNode extends HiveScanNode { private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); - private final boolean isCowTable; + private final boolean isCowOrRoTable; private final AtomicLong noLogsSplitNum = new AtomicLong(0); @@ -91,9 +91,10 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); - isCowTable = hmsTable.isHoodieCowTable(); - if (isCowTable) { - LOG.debug("Hudi table {} can read as cow 
table", hmsTable.getName()); + isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals( + hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type")); + if (isCowOrRoTable) { + LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName()); } else { LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName()); } @@ -101,7 +102,7 @@ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumn @Override public TFileFormatType getFileFormatType() throws UserException { - if (isCowTable) { + if (isCowOrRoTable) { return super.getFileFormatType(); } else { // Use jni to read hudi table in BE @@ -124,7 +125,7 @@ protected void doInitialize() throws UserException { @Override protected Map getLocationProperties() throws UserException { - if (isCowTable) { + if (isCowOrRoTable) { return super.getLocationProperties(); } else { // HudiJniScanner uses hadoop client to read data. @@ -291,7 +292,7 @@ public List getSplits() throws UserException { HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, timeline, statuses.toArray(new FileStatus[0])); - if (isCowTable) { + if (isCowOrRoTable) { fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); @@ -312,7 +313,9 @@ public List getSplits() throws UserException { noLogsSplitNum.incrementAndGet(); } - HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, + // no base file, use log file to parse file type + String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath; + HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, new String[0], partition.getPartitionValues()); split.setTableFormatType(TableFormatType.HUDI); split.setDataFilePath(filePath); From 05d7ee13965f0f3e6205c8c9303889362806382d Mon Sep 17 00:00:00 2001 From: gaoxin Date: Fri, 8 Sep 2023 09:19:59 +0800 Subject: [PATCH 2/2] fix column --- .../org/apache/doris/hudi/HudiJniScanner.java | 16 +++-- .../apache/doris/hudi/BaseSplitReader.scala | 48 +++++++++------ .../external_table_p2/hive/test_hive_hudi.out | 25 ++++++++ .../hive/test_hive_hudi.groovy | 58 +++++++++++++++++++ 4 files changed, 123 insertions(+), 24 deletions(-) create mode 100644 regression-test/data/external_table_p2/hive/test_hive_hudi.out create mode 100644 regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 417b338115c2da..64c4fd70e7b542 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -86,6 +86,9 @@ public class HudiJniScanner extends JniScanner { static { int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4); + if (numThreads > 32) { + numThreads = Runtime.getRuntime().availableProcessors(); + } avroReadPool = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build()); LOG.info("Create " + numThreads + " daemon threads to load avro logs"); @@ -176,10 +179,15 @@ public void open() throws IOException { if (ugi != null) { recordIterator = ugi.doAs( (PrivilegedExceptionAction>) () -> new 
MORSnapshotSplitReader( - split).buildScanIterator(split.requiredFields(), new Filter[0])); + split).buildScanIterator(new Filter[0])); } else { recordIterator = new MORSnapshotSplitReader(split) - .buildScanIterator(split.requiredFields(), new Filter[0]); + .buildScanIterator(new Filter[0]); + } + if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) { + cachedResolvers.computeIfAbsent(Thread.currentThread().getId(), + threadId -> AVRO_RESOLVER_CACHE.get()); + AVRO_RESOLVER_CACHE.get().clear(); } } catch (Exception e) { LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e); @@ -189,10 +197,6 @@ public void open() throws IOException { } isKilled.set(true); executorService.shutdownNow(); - if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) { - cachedResolvers.computeIfAbsent(Thread.currentThread().getId(), - threadId -> AVRO_RESOLVER_CACHE.get()); - } getRecordReaderTimeNs += System.nanoTime() - startTime; }); try { diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index 5ba16a5e164067..3c10f8a4cd7208 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -153,6 +153,8 @@ case class HoodieTableInformation(sparkSession: SparkSession, metaClient: HoodieTableMetaClient, timeline: HoodieTimeline, tableConfig: HoodieTableConfig, + resolvedTargetFields: Array[String], + tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) /** @@ -214,22 +216,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) { * required to fetch table's Avro and Internal schemas */ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { - val schemaResolver = new TableSchemaResolver(tableInformation.metaClient) - val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName) - val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is => - AvroInternalSchemaConverter.convert(is, namespace + "." + name) - } orElse { - specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema) - } orElse { - split.schemaSpec.map(s => convertToAvroSchema(s, tableName)) - } getOrElse { - Try(schemaResolver.getTableAvroSchema) match { - case Success(schema) => schema - case Failure(e) => - throw new HoodieSchemaException("Failed to fetch schema from the table", e) - } - } - (avroSchema, tableInformation.internalSchemaOpt) + (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt) } protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema) @@ -280,13 +267,13 @@ abstract class BaseSplitReader(val split: HoodieSplit) { sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") } - def buildScanIterator(requiredColumns: Array[String], filters: Array[Filter]): Iterator[InternalRow] = { + def buildScanIterator(filters: Array[Filter]): Iterator[InternalRow] = { // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES // *Appending* additional columns to the ones requested by the caller is not a problem, as those // will be eliminated by the caller's projection; // (!) 
Please note, however, that it's critical to avoid _reordering_ of the requested columns as this // will break the upstream projection - val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns) + val targetColumns: Array[String] = appendMandatoryColumns(tableInformation.resolvedTargetFields) // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro // schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and // could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions @@ -663,11 +650,36 @@ object BaseSplitReader { None } } + val tableName = metaClient.getTableConfig.getTableName + val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName) + val avroSchema: Schema = internalSchemaOpt.map { is => + AvroInternalSchemaConverter.convert(is, namespace + "." + name) + } orElse { + specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema) + } orElse { + split.schemaSpec.map(s => convertToAvroSchema(s, tableName)) + } getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + throw new HoodieSchemaException("Failed to fetch schema from the table", e) + } + } + + // match column name in lower case + val colNames = internalSchemaOpt.map { internalSchema => + internalSchema.getAllColsFullName.asScala.map(f => f.toLowerCase -> f).toMap + } getOrElse { + avroSchema.getFields.asScala.map(f => f.name().toLowerCase -> f.name()).toMap + } + val resolvedTargetFields = split.requiredFields.map(field => colNames.getOrElse(field.toLowerCase, field)) HoodieTableInformation(sparkSession, metaClient, timeline, metaClient.getTableConfig, + resolvedTargetFields, + avroSchema, internalSchemaOpt) } } diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi.out b/regression-test/data/external_table_p2/hive/test_hive_hudi.out new file mode 100644 index 00000000000000..a695d3cdb7d1f0 --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_hudi.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !optimize_table -- +20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001 +20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004 + +-- !merge_on_read -- +20230801201335031 20230801201335031_0_1 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0 row_1 2021-01-01 0 bob v_0 toBeDel0 1 1000000 +20230801201335031 20230801201335031_1_1 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0 row_1 2011-11-11 1 bob v_1 toBeDel1 1 1000001 +20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004 + +-- !lowercase_column -- +row_1 2021-01-01 0 v_0 +row_1 2011-11-11 1 v_1 +row_2 2021-01-01 0 v_0 +row_4 2021-02-01 4 v_4 + +-- !skip_merge -- +20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001 +20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004 + diff --git a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy new file mode 100644 index 00000000000000..abdd5b34dcbca5 --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_hudi", "p2,external,hive,hudi") {
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "test_hive_hudi"
+
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'hadoop.username'='hadoop',
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+
+        sql """use ${catalog_name}.hudi_catalog"""
+        // read optimized table with partitions
+        qt_optimize_table """select * from partitioned_mor_ro order by rowid, versionid"""
+        // merge on read table with updates
+        qt_merge_on_read """select * from partitioned_mor_rt order by rowid, versionid"""
+        // match column name in lower case
+        qt_lowercase_column """select RoWiD, PaRtiTionID, PrEComB, VerSIonID from partitioned_mor_rt order by rowid, versionid"""
+
+
+        // skip merging log files
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'hadoop.username'='hadoop',
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
+                'hoodie.datasource.merge.type'='skip_merge'
+            );
+        """
+        // merge on read table with updates: log files are skipped during the merge,
+        // so the result is the same as partitioned_mor_ro
+        qt_skip_merge """select * from partitioned_mor_rt order by rowid, versionid"""
+
+        sql """drop catalog if exists ${catalog_name};"""
+    }
+}
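One way to confirm that skip merging takes effect is to inspect the query plan with the `explain` command described in the documentation above; a minimal sketch against the tables used in the regression test (catalog, database, and table names are taken from the suite above):

```sql
-- With 'hoodie.datasource.merge.type' = 'skip_merge' set on the catalog, the plan
-- for a query on the MOR real-time table should account for its splits under
-- `hudiNativeReadSplits`, i.e. they are read by the parquet native reader.
EXPLAIN SELECT * FROM test_hive_hudi.hudi_catalog.partitioned_mor_rt;
```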