diff --git a/README.md b/README.md index 9cd9144ba11e1a..0cd386c0c53bb7 100644 --- a/README.md +++ b/README.md @@ -70,8 +70,8 @@ apachedoris/doris-dev build-env-1.3 c9665fbee395 5 days ago > |---|---|---| > | apachedoris/doris-dev:build-env | before [ff0dd0d](https://github.com/apache/incubator-doris/commit/ff0dd0d2daa588f18b6db56f947e813a56d8ec81) | 0.8.x, 0.9.x | > | apachedoris/doris-dev:build-env-1.1 | [ff0dd0d](https://github.com/apache/incubator-doris/commit/ff0dd0d2daa588f18b6db56f947e813a56d8ec81) or later | 0.10.x or 0.11.x | -> | apachedoris/doris-dev:build-env-1.2 | [1648226](https://github.com/apache/incubator-doris/commit/1648226927c5b4e33f33ce2e12bf0e06369b7f6e) or later | 0.12.x or 0.13 | -> | apachedoris/doris-dev:build-env-1.3 | [ad67dd3](https://github.com/apache/incubator-doris/commit/ad67dd34a04c1ca960cff38e5b335b30fc7d559f) or later | 0.14.x or later | +> | apache/incubator-doris:build-env-1.2 | [4ef5a8c](https://github.com/apache/incubator-doris/commit/4ef5a8c8560351d7fff7ff8fd51c4c7a75e006a8) | 0.12.x - 0.14.0 | +> | apache/incubator-doris:build-env-1.3 | [ad67dd3](https://github.com/apache/incubator-doris/commit/ad67dd34a04c1ca960cff38e5b335b30fc7d559f) | later version | diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index ce208d8abc6e4b..261a167fa95402 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -294,17 +294,20 @@ set_target_properties(aws-s2n PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib add_library(minzip STATIC IMPORTED) set_target_properties(minzip PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libminizip.a) -add_library(hdfs3 STATIC IMPORTED) -set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libhdfs3.a) +if (ARCH_AMD64) + # libhdfs3 only support x86 or amd64 + add_library(hdfs3 STATIC IMPORTED) + set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libhdfs3.a) -add_library(gsasl STATIC IMPORTED) -set_target_properties(gsasl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libgsasl.a) + add_library(gsasl STATIC IMPORTED) + set_target_properties(gsasl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libgsasl.a) -add_library(xml2 STATIC IMPORTED) -set_target_properties(xml2 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libxml2.a) + add_library(xml2 STATIC IMPORTED) + set_target_properties(xml2 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libxml2.a) -add_library(lzma STATIC IMPORTED) -set_target_properties(lzma PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/liblzma.a) + add_library(lzma STATIC IMPORTED) + set_target_properties(lzma PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/liblzma.a) +endif() find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin) @@ -434,7 +437,7 @@ set(WL_END_GROUP "-Wl,--end-group") set(AWS_LIBS aws-sdk-s3 aws-sdk-core aws-checksums aws-c-io aws-c-event-stream aws-c-common aws-c-cal aws-s2n) -# Set Palo libraries +# Set Doris libraries set(DORIS_LINK_LIBS ${WL_START_GROUP} Agent @@ -459,10 +462,10 @@ set(DORIS_LINK_LIBS ${WL_END_GROUP} ) -# Set thirdparty libraries -set(DORIS_DEPENDENCIES - ${DORIS_DEPENDENCIES} - ${WL_START_GROUP} +# COMMON_THIRDPARTY are thirdparty dependencies that can run on all platform +# When adding new dependencies, If you don’t know if it can run on all platforms, +# add it here first. 
+set(COMMON_THIRDPARTY rocksdb librdkafka_cpp librdkafka @@ -506,14 +509,35 @@ odbc cctz minzip + ${AWS_LIBS} +) + +# thirdparty dependencies that can only run on the x86 platform +set(X86_DEPENDENCIES + ${COMMON_THIRDPARTY} hdfs3 gsasl xml2 lzma - ${AWS_LIBS} - ${WL_END_GROUP} ) +if(ARCH_AARCH64) + # Set thirdparty libraries + set(DORIS_DEPENDENCIES + ${DORIS_DEPENDENCIES} + ${WL_START_GROUP} + ${COMMON_THIRDPARTY} + ${WL_END_GROUP} + ) +else() + set(DORIS_DEPENDENCIES + ${DORIS_DEPENDENCIES} + ${WL_START_GROUP} + ${X86_DEPENDENCIES} + ${WL_END_GROUP} + ) +endif() + if(WITH_LZO) set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} lzo @@ -526,12 +550,15 @@ if (WITH_MYSQL) ) endif() +message(STATUS "DORIS_DEPENDENCIES is ${DORIS_DEPENDENCIES}") + # Add all external dependencies. They should come after the palo libs. # static link gcc's lib set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} ${DORIS_DEPENDENCIES} -static-libstdc++ -static-libgcc + -lstdc++fs ) if ("${CMAKE_BUILD_TYPE}" STREQUAL "BCC") diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index c958c67c9f5cc5..27ef5ed911e10c 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -40,7 +40,6 @@ set(EXEC_FILES hash_join_node.cpp hash_join_node_ir.cpp hash_table.cpp - hdfs_file_reader.cpp local_file_reader.cpp merge_node.cpp merge_join_node.cpp @@ -107,6 +106,13 @@ set(EXEC_FILES s3_writer.cpp ) +if (ARCH_AMD64) + set(EXEC_FILES + ${EXEC_FILES} + hdfs_file_reader.cpp + ) +endif() + if (WITH_MYSQL) set(EXEC_FILES ${EXEC_FILES} diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 5d57a703c7afff..6287705bed8641 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -24,7 +24,6 @@ #include "exec/buffered_reader.h" #include "exec/decompressor.h" #include "exec/exec_node.h" -#include "exec/hdfs_file_reader.h" #include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" #include "exec/s3_reader.h" @@ -40,6 +39,10 @@ #include "runtime/tuple.h" #include "util/utf8_check.h" +#if defined(__x86_64__) + #include "exec/hdfs_file_reader.h" +#endif + namespace doris { BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, @@ -163,12 +166,16 @@ Status BrokerScanner::open_file_reader() { break; } case TFileType::FILE_HDFS: { +#if defined(__x86_64__) BufferedReader* file_reader = new BufferedReader(new HdfsFileReader(range.hdfs_params, range.path, start_offset), config::remote_storage_read_buffer_mb * 1024 * 1024); RETURN_IF_ERROR(file_reader->open()); _cur_file_reader = file_reader; break; +#else + return Status::InternalError("HdfsFileReader is not supported on non-x86 platforms"); +#endif } case TFileType::FILE_BROKER: { BrokerReader* broker_reader = diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 8d0bf57be7ca16..72e460d5141221 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -36,13 +36,16 @@ #include "exprs/expr.h" #include "exec/text_converter.h" #include "exec/text_converter.hpp" -#include "exec/hdfs_file_reader.h" #include "exec/local_file_reader.h" #include "exec/broker_reader.h" #include "exec/buffered_reader.h" #include "exec/decompressor.h" #include "exec/parquet_reader.h" +#if defined(__x86_64__) + #include "exec/hdfs_file_reader.h" +#endif + namespace doris { ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, @@ -128,9 +131,13 @@ Status ParquetScanner::open_next_reader() { break; } case TFileType::FILE_HDFS: { 
+#if defined(__x86_64__) file_reader.reset(new HdfsFileReader( range.hdfs_params, range.path, range.start_offset)); break; +#else + return Status::InternalError("HdfsFileReader is not supported on non-x86 platforms"); +#endif } case TFileType::FILE_BROKER: { int64_t file_size = 0; diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index c5aec41aeefcb4..a65cdf2130c800 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -20,6 +20,7 @@ #include "common/logging.h" #include "gutil/strings/substitute.h" // for Substitute #include "olap/rowset/segment_v2/bitshuffle_page.h" +#include "runtime/mem_pool.h" #include "util/slice.h" // for Slice namespace doris { @@ -238,8 +239,8 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { // dictionary encoding DCHECK(_parsed); DCHECK(_dict_decoder != nullptr) << "dict decoder pointer is nullptr"; + if (PREDICT_FALSE(*n == 0)) { - *n = 0; return Status::OK(); } Slice* out = reinterpret_cast<Slice*>(dst->data()); @@ -248,21 +249,37 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { ColumnBlock column_block(_batch.get(), dst->column_block()->pool()); ColumnBlockView tmp_block_view(&column_block); RETURN_IF_ERROR(_data_page_decoder->next_batch(n, &tmp_block_view)); - for (int i = 0; i < *n; ++i) { + const auto len = *n; + + size_t mem_len[len]; + for (int i = 0; i < len; ++i) { int32_t codeword = *reinterpret_cast<int32_t*>(column_block.cell_ptr(i)); // get the string from the dict decoder - Slice element = _dict_decoder->string_at_index(codeword); - if (element.size > 0) { - char* destination = (char*)dst->column_block()->pool()->allocate(element.size); - if (destination == nullptr) { - return Status::MemoryAllocFailed( - strings::Substitute("memory allocate failed, size:$0", element.size)); - } - element.relocate(destination); - } - *out = element; + *out = _dict_decoder->string_at_index(codeword); + mem_len[i] = out->size; + out++; + } + + // round up the lengths in a separate loop so the compiler can use SIMD instructions to speed up `RoundUpToPowerOf2Int32` + auto mem_size = 0; + for (int i = 0; i < len; ++i) { + mem_len[i] = BitUtil::RoundUpToPowerOf2Int32(mem_len[i], MemPool::DEFAULT_ALIGNMENT); + mem_size += mem_len[i]; + } + + // allocate memory for the whole batch at once, then copy each string into it + out = reinterpret_cast<Slice*>(dst->data()); + char* destination = (char*)dst->column_block()->pool()->allocate(mem_size); + if (destination == nullptr) { + return Status::MemoryAllocFailed( + strings::Substitute("memory allocate failed, size:$0", mem_size)); + } + for (int i = 0; i < len; ++i) { + out->relocate(destination); + destination += mem_len[i]; ++out; } + return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index bde3ae0dc9a826..97e7fa8bbcf622 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -29,6 +29,7 @@ #pragma once #include "common/logging.h" +#include "gutil/strings/substitute.h" #include "olap/olap_common.h" #include "olap/rowset/segment_v2/options.h" #include "olap/rowset/segment_v2/page_builder.h" @@ -193,18 +194,33 @@ class BinaryPlainPageDecoder : public PageDecoder { *n = 0; return Status::OK(); } - size_t max_fetch = std::min(*n, static_cast<size_t>(_num_elems - _cur_idx)); + const size_t max_fetch = std::min(*n, static_cast<size_t>(_num_elems - _cur_idx)); Slice* out = reinterpret_cast<Slice*>(dst->data()); - + size_t 
mem_len[max_fetch]; for (size_t i = 0; i < max_fetch; i++, out++, _cur_idx++) { - Slice elem(string_at_index(_cur_idx)); - out->size = elem.size; - if (elem.size != 0) { - out->data = - reinterpret_cast(dst->pool()->allocate(elem.size * sizeof(uint8_t))); - memcpy(out->data, elem.data, elem.size); - } + *out = string_at_index(_cur_idx); + mem_len[i] = out->size; + } + + // use SIMD instruction to speed up call function `RoundUpToPowerOfTwo` + auto mem_size = 0; + for (int i = 0; i < max_fetch; ++i) { + mem_len[i] = BitUtil::RoundUpToPowerOf2Int32(mem_len[i], MemPool::DEFAULT_ALIGNMENT); + mem_size += mem_len[i]; + } + + // allocate a batch of memory and do memcpy + out = reinterpret_cast(dst->data()); + char* destination = (char*)dst->column_block()->pool()->allocate(mem_size); + if (destination == nullptr) { + return Status::MemoryAllocFailed( + strings::Substitute("memory allocate failed, size:$0", mem_size)); + } + for (int i = 0; i < max_fetch; ++i) { + out->relocate(destination); + destination += mem_len[i]; + ++out; } *n = max_fetch; diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.cpp b/be/src/olap/rowset/segment_v2/bloom_filter.cpp index 74018957c953a1..f12e3e3fa0ef64 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter.cpp +++ b/be/src/olap/rowset/segment_v2/bloom_filter.cpp @@ -46,7 +46,7 @@ uint32_t BloomFilter::_optimal_bit_num(uint64_t n, double fpp) { } // Get closest power of 2 if bits is not power of 2. - if ((num_bits && (num_bits - 1)) != 0) { + if ((num_bits & (num_bits - 1)) != 0) { num_bits = 1 << ser::used_bits(num_bits); } if (num_bits < MINIMUM_BYTES << 3) { diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 0290361ba7e8f3..3a3750eda79ea7 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -161,7 +161,7 @@ class MemPool { MemTracker* mem_tracker() { return mem_tracker_; } - static const int DEFAULT_ALIGNMENT = 8; + static constexpr int DEFAULT_ALIGNMENT = 8; private: friend class MemPoolTest; diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h index d49e48325e4668..a4bf2ef6eb25bd 100644 --- a/be/src/util/bit_util.h +++ b/be/src/util/bit_util.h @@ -300,6 +300,12 @@ class BitUtil { return (value + (factor - 1)) & ~(factor - 1); } + // speed up function compute for SIMD + static inline size_t RoundUpToPowerOf2Int32(size_t value, size_t factor) { + DCHECK((factor > 0) && ((factor & (factor - 1)) == 0)); + return (value + (factor - 1)) & ~(factor - 1); + } + // Returns the ceil of value/divisor static inline int Ceil(int value, int divisor) { return value / divisor + (value % divisor != 0); diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index 75cfe7148a9bbd..cb50c2b07662d5 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -28,6 +28,7 @@ module.exports = [ directoryPath: "installing/", children: [ "compilation", + "compilation-arm", "install-deploy", "upgrade", ], @@ -483,6 +484,7 @@ module.exports = [ "SHOW ALTER", "SHOW BACKUP", "SHOW CREATE FUNCTION", + "SHOW CREATE ROUTINE LOAD", "SHOW DATA", "SHOW DATABASES", "SHOW DELETE", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index cfbe0a4e7db8ce..1a7dfc91036d1b 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -28,6 +28,7 @@ module.exports = [ directoryPath: "installing/", children: [ "compilation", + "compilation-arm", "install-deploy", "upgrade", ], @@ -486,6 +487,7 @@ module.exports = [ "SHOW ALTER", "SHOW BACKUP", "SHOW CREATE 
FUNCTION", + "SHOW CREATE ROUTINE LOAD", "SHOW DATA", "SHOW DATABASES", "SHOW DELETE", diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 2cf55f6bbbe81c..a0fbb86c19b0ee 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -185,9 +185,9 @@ HTTP Server V2 is implemented by SpringBoot. It uses an architecture that separa ### http_max_request_size -Default:100M +Default:100MB -The above two parameters are the http v2 version, the web maximum upload file limit, the default is 100M, you can modify it according to your needs. +The above two parameters are the http v2 version, the web maximum upload file limit, the default is 100MB, you can modify it according to your needs. ### frontend_address @@ -496,6 +496,11 @@ This will limit the max recursion depth of hash distribution pruner. So that distribution pruner will no work and just return all buckets. Increase the depth can support distribution pruning for more elements, but may cost more CPU. +### max_backup_restore_job_num_per_db + +Default: 10 + +This configuration is mainly used to control the number of backup/restore tasks recorded in each database. ### using_old_load_usage_pattern diff --git a/docs/en/installing/compilation-arm.md b/docs/en/installing/compilation-arm.md new file mode 100644 index 00000000000000..58a5d40c4f7b49 --- /dev/null +++ b/docs/en/installing/compilation-arm.md @@ -0,0 +1,223 @@ +--- +{ + "title": "Compile on ARM platform", + "language": "en" +} +--- + + + + +# Compile and Run Doris on ARM64 + KylinOS. + +This document describes how to compile Doris on the ARM64 platform. + +Note that this document is only a guide document. Other errors may occur when compiling in different environments. + +## Software and hardware environment + +1. KylinOS version: + + ``` + $> cat /etc/.kyinfo + name=Kylin-Server + milestone=10-SP1-Release-Build04-20200711 + arch=arm64 + beta=False + time=2020-07-11 17:16:54 + dist_id=Kylin-Server-10-SP1-Release-Build04-20200711-arm64-2020-07-11 17:16:54 + ``` + +2. CPU model + + ``` + $> cat /proc/cpuinfo + model name: Phytium,FT-2000+/64 + ``` + +3. Doris version + + commit 68bab73 + +## Compilation tool installation (no network) + +In the example, all tools are installed in the `/home/doris/tools/installed/` directory. + +Please obtain the required installation package first under network conditions. + +### 1. Install gcc10 + +Download gcc-10.1.0 + +``` +wget https://mirrors.tuna.tsinghua.edu.cn/gnu/gcc/gcc-10.1.0/gcc-10.1.0.tar.gz +``` + +After unzipping, check the dependencies in `contrib/download_prerequisites` and download: + +``` +http://gcc.gnu.org/pub/gcc/infrastructure/gmp-6.1.0.tar.bz2 +http://gcc.gnu.org/pub/gcc/infrastructure/mpfr-3.1.4.tar.bz2 +http://gcc.gnu.org/pub/gcc/infrastructure/mpc-1.0.3.tar.gz +http://gcc.gnu.org/pub/gcc/infrastructure/isl-0.18.tar.bz2 +``` + +Unzip these four dependencies, then move to the gcc-10.1.0 source directory and rename them to gmp, isl, mpc, mpfr. + +Download and install automake-1.15 (because gcc10 will find automake 1.15 version during compilation) + +``` +https://ftp.gnu.org/gnu/automake/automake-1.15.tar.gz +tar xzf automake-1.15.tar.gz +./configure --prefix=/home/doris/tools/installed +make && make install +export PATH=/home/doris/tools/installed/bin:$PATH +``` + +Compile GCC10: + +``` +cd gcc-10.1.0 +./configure --prefix=/home/doris/tools/installed +make -j && make install +``` + +Compile time is longer. + +### 2. 
Install other compilation components + +1. jdk-8u291-linux-aarch64.tar.gz + + `https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html` + + No need to compile, just use it out of the box. + +2. cmake-3.19.8-Linux-aarch64.tar.gz + + `https://cmake.org/download/` + + No need to compile, just use it out of the box. + +3. apache-maven-3.8.1-bin.tar.gz + + `https://maven.apache.org/download.cgi` + + No need to compile, just use it out of the box. + +4. nodejs 16.3.0 + + `https://nodejs.org/dist/v16.3.0/node-v16.3.0-linux-arm64.tar.xz` + + No need to compile, just use it out of the box. + +5. libtool-2.4.6.tar.gz + + Used for compiling third-party components. Although the system may come with libtool, libtool should be installed together with automake so that problems are less likely. + + ``` + https://ftp.gnu.org/gnu/libtool/libtool-2.4.6.tar.gz + cd libtool-2.4.6/ + ./configure --prefix=/home/doris/tools/installed + make -j && make install + ``` + +6. binutils-2.36.tar.xz (to obtain bfd.h) + + ``` + https://ftp.gnu.org/gnu/binutils/binutils-2.36.tar.bz2 + ./configure --prefix=/home/doris/tools/installed + make -j && make install + ``` + +7. libiberty (for compiling BE) + + The source code of this library is included in the gcc-10.1.0 source package. + ``` + cd gcc-10.1.0/libiberty/ + ./configure --prefix=/home/doris/tools/installed + make + ``` + + The build produces libiberty.a, which can later be moved to the lib64 directory of Doris's thirdparty. + +### 3. Compile third-party libraries + +Suppose the Doris source code is under `/home/doris/doris-src/`. + +1. Manually download all third-party libraries and place them in the thirdparty/src directory. +2. Create `custom_env.sh` in the Doris source directory and add the following content + + ``` + export DORIS_THIRDPARTY=/home/doris/doris-src/thirdparty/ + export JAVA_HOME=/home/doris/tools/jdk1.8.0_291/ + export DORIS_GCC_HOME=/home/doris/tools/installed/ + export PATCH_COMPILER_RT=true + ``` + + Remember to replace the directories with your own paths. + +3. Modify parts of build-thirdparty.sh + + 1. Disable `build_mysql` and `build_libhdfs3` + + mysql is no longer needed, and libhdfs3 does not yet support the ARM architecture, so Doris running on ARM cannot access HDFS directly through libhdfs3 and has to go through a broker. + + 2. Add the configure parameter `--without-libpsl` in `build_curl`. If it is not added, the linking phase of the final Doris BE build may fail with: `undefined reference to 'psl_is_cookie_domain_acceptable'` + +4. Execute build-thirdparty.sh. Only the errors you may encounter are listed here. + + * `error: narrowing conversion of '-1' from 'int' to 'char' [-Wnarrowing]` + + This error occurs when compiling brpc 0.9.7. The solution is to add `-Wno-narrowing` to `CMAKE_CXX_FLAGS` in brpc's CMakeLists.txt. This problem has been fixed in the brpc master code: + + `https://github.com/apache/incubator-brpc/issues/1091` + + * `libz.a(deflate.o): relocation R_AARCH64_ADR_PREL_PG_HI21 against symbol `z_errmsg' which may bind externally can not be used when making a shared object; recompile with -fPIC` + + This error occurs when compiling brpc 0.9.7, and libcrypto reports a similar error. The reason is unclear; it seems that under aarch64, brpc needs to link against dynamic zlib and crypto libraries, but these two third-party libraries were built as static .a files only. 
Solution: Recompile zlib and openssl to generate .so dynamic library: + + Open `build-thirdparty.sh`, find the `build_zlib` function, and change: + + ``` + ./configure --prefix=$TP_INSTALL_DIR --static + Just change to + ./configure --prefix=$TP_INSTALL_DIR + ``` + + Find `build_openssl` and comment out the following parts: + + ``` + #if [-f $TP_INSTALL_DIR/lib64/libcrypto.so ]; then + # rm -rf $TP_INSTALL_DIR/lib64/libcrypto.so* + #fi + #if [-f $TP_INSTALL_DIR/lib64/libssl.so ]; then + # rm -rf $TP_INSTALL_DIR/lib64/libssl.so* + #fi + ``` + + Then go to `build-thirdparty.sh`, comment out other `build_xxx`, open only `build_zlib` and `build_openssl`, and `build_brpc` and later `build_xxx`. Then re-execute `build-thirdparty.sh`. + + * The compilation is stuck at a certain stage. + + Not sure why. Solution: Rerun `build-thirdparty.sh`. `build-thirdparty.sh` can be executed repeatedly. + +### 4. Compile Doris source code + +Just execute sh build.sh. diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md new file mode 100644 index 00000000000000..3aa570bdd63da2 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md @@ -0,0 +1,42 @@ +--- +{ + "title": "SHOW CREATE ROUTINE LOAD", + "language": "en" +} +--- + + + +# SHOW CREATE ROUTINE LOAD +## description + The statement is used to show the routine load job creation statement of user-defined + grammar: + SHOW [ALL] CREATE ROUTINE LOAD for load_name; + + Description: + `ALL`: optional,Is for getting all jobs, including history jobs + `load_name`: routine load name + +## example + 1. Show the creation statement of the specified routine load under the default db + SHOW CREATE ROUTINE LOAD for test_load + +## keyword + SHOW,CREATE,ROUTINE,LOAD \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 289d2b36afd660..48df2ea9dc33a4 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -184,9 +184,9 @@ HTTP Server V2 由 SpringBoot 实现。它采用前后端分离的架构。只 ### http_max_request_size -默认值:100M +默认值:100MB -以上两个参数是http v2版本,web最大上传文件限制,默认100M,可以根据自己需要修改 +以上两个参数是http v2版本,web最大上传文件限制,默认100MB,可以根据自己需要修改 ### default_max_filter_ratio @@ -545,6 +545,12 @@ SmallFileMgr 中存储的最大文件数 最大 Routine Load 作业数,包括 NEED_SCHEDULED, RUNNING, PAUSE +### max_backup_restore_job_num_per_db + +默认值:10 + +此配置用于控制每个 DB 能够记录的 backup/restore 任务的数量 + ### max_running_txn_num_per_db 默认值:100 diff --git a/docs/zh-CN/installing/compilation-arm.md b/docs/zh-CN/installing/compilation-arm.md new file mode 100644 index 00000000000000..7a26f17c7321de --- /dev/null +++ b/docs/zh-CN/installing/compilation-arm.md @@ -0,0 +1,223 @@ +--- +{ + "title": "在ARM平台上编译", + "language": "zh-CN" +} +--- + + + + +# ARM64 + KylinOS 编译运行 Doris + +本文档介绍如何在 ARM64 平台上编译 Doris。 + +注意,该文档仅作为指导性文档。在不同环境中编译可能出现其他错误。 + +## 软硬件环境 + +1. KylinOS 版本: + + ``` + $> cat /etc/.kyinfo + name=Kylin-Server + milestone=10-SP1-Release-Build04-20200711 + arch=arm64 + beta=False + time=2020-07-11 17:16:54 + dist_id=Kylin-Server-10-SP1-Release-Build04-20200711-arm64-2020-07-11 17:16:54 + ``` + +2. CPU型号 + + ``` + $> cat /proc/cpuinfo + model name : Phytium,FT-2000+/64 + ``` + +3. Doris 版本 + + commit 68bab73 + +## 编译工具安装(无网络) + +示例中,所有工具安装在在 `/home/doris/tools/installed/` 目录下。 + +所需安装包请先在有网络情况下获取。 + +### 1. 
安装gcc10 + +下载 gcc-10.1.0 + +``` +wget https://mirrors.tuna.tsinghua.edu.cn/gnu/gcc/gcc-10.1.0/gcc-10.1.0.tar.gz +``` + +解压后,在 `contrib/download_prerequisites` 查看依赖并下载: + +``` +http://gcc.gnu.org/pub/gcc/infrastructure/gmp-6.1.0.tar.bz2 +http://gcc.gnu.org/pub/gcc/infrastructure/mpfr-3.1.4.tar.bz2 +http://gcc.gnu.org/pub/gcc/infrastructure/mpc-1.0.3.tar.gz +http://gcc.gnu.org/pub/gcc/infrastructure/isl-0.18.tar.bz2 +``` + +解压这四个依赖,然后移动到 gcc-10.1.0 源码目录下,并重命名为 gmp、isl、mpc、mpfr。 + +下载并安装 automake-1.15(因为gcc10编译过程中会查找automake 1.15 版本) + +``` +https://ftp.gnu.org/gnu/automake/automake-1.15.tar.gz +tar xzf automake-1.15.tar.gz +./configure --prefix=/home/doris/tools/installed +make && make install +export PATH=/home/doris/tools/installed/bin:$PATH +``` + +编译GCC10: + +``` +cd gcc-10.1.0 +./configure --prefix=/home/doris/tools/installed +make -j && make install +``` + +编译时间较长。 + +### 2. 安装其他编译组件 + +1. jdk-8u291-linux-aarch64.tar.gz + + `https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html` + + 无需编译,开箱即用。 + +2. cmake-3.19.8-Linux-aarch64.tar.gz + + `https://cmake.org/download/` + + 无需编译,开箱即用 + +3. apache-maven-3.8.1-bin.tar.gz + + `https://maven.apache.org/download.cgi` + + 无需编译,开箱即用 + +4. nodejs 16.3.0 + + `https://nodejs.org/dist/v16.3.0/node-v16.3.0-linux-arm64.tar.xz` + + 无需编译,开箱即用 + +5. libtool-2.4.6.tar.gz + + 编译第三方组件用,虽然系统可能自带了libtool,但是libtool需要和automake在一起,这样不容易出问题。 + + ``` + https://ftp.gnu.org/gnu/libtool/libtool-2.4.6.tar.gz + cd libtool-2.4.6/ + ./configure --prefix=/home/doris/tools/installed + make -j && make install + ``` + +6. binutils-2.36.tar.xz(获取bdf.h) + + ``` + https://ftp.gnu.org/gnu/binutils/binutils-2.36.tar.bz2 + ./configure --prefix=/home/doris/tools/installed + make -j && make install + ``` + +7. libiberty(编译BE用) + + 这个库的源码就在 gcc-10.1.0 的源码包下 + ``` + cd gcc-10.1.0/libiberty/ + ./configure --prefix=/home/doris/tools/installed + make + ``` + + 编译后会产生 libiberty.a,后续移动到 Doris 的thirdparty 的 lib64 目录中即可。 + +### 3. 编译第三方库 + +假设Doris源码在 `/home/doris/doris-src/` 下。 + +1. 手动下载所有第三方库并放在 thirdparty/src 目录下。 +2. 在Doris源码目录下新增 `custom_env.sh` 并添加如下内容 + + ``` + export DORIS_THIRDPARTY=/home/doris/doris-src/thirdparty/ + export JAVA_HOME=/home/doris/tools/jdk1.8.0_291/ + export DORIS_GCC_HOME=/home/doris/tools/installed/ + export PATCH_COMPILER_RT=true + ``` + + 注意替换对应的目录 + +3. 修改 build-thirdparty.sh 中的部分内容 + + 1. 关闭 `build_mysql` 和 `build_libhdfs3` + + mysql 不在需要。而 libhdfs3 暂不支持 arm 架构,所以在arm中运行Doris,暂不支持通过 libhdfs3 直接访问 hdfs,需要通过broker。 + + 2. 在 `build_curl` 中增加 configure 参数:`--without-libpsl`。如果不添加,则在最终编译Doris BE的链接阶段,可能报错:`undefined reference to ‘psl_is_cookie_domain_acceptable'` + +4. 
执行 build-thirdparty.sh。这里仅列举可能出现的错误 + + * ` error: narrowing conversion of '-1' from 'int' to 'char' [-Wnarrowing]` + + 编译brpc 0.9.7 时会出现错误,解决方案,在 brpc 的 CMakeLists.txt 的 `CMAKE_CXX_FLAGS` 中添加 `-Wno-narrowing`。brpc master 代码中已经修复这个问题: + + `https://github.com/apache/incubator-brpc/issues/1091` + + * `libz.a(deflate.o): relocation R_AARCH64_ADR_PREL_PG_HI21 against symbol `z_errmsg' which may bind externally can not be used when making a shared object; recompile with -fPIC` + + 编译brpc 0.9.7 时会出现错误,还有 libcrypto 也会报类似错误。原因未知,似乎在 aarch64 下,brpc 需要链接动态的 zlib 和 crypto 库。但是我们在编译这两个第三方库时,都只编译的了 .a 静态文件。解决方案:重新编译zlib和 openssl 生成.so 动态库: + + 打开 `build-thirdparty.sh`,找到 `build_zlib` 函数,将: + + ``` + ./configure --prefix=$TP_INSTALL_DIR --static + 就改为 + ./configure --prefix=$TP_INSTALL_DIR + ``` + + 找到 `build_openssl`,将以下部分注释掉: + + ``` + #if [ -f $TP_INSTALL_DIR/lib64/libcrypto.so ]; then + # rm -rf $TP_INSTALL_DIR/lib64/libcrypto.so* + #fi + #if [ -f $TP_INSTALL_DIR/lib64/libssl.so ]; then + # rm -rf $TP_INSTALL_DIR/lib64/libssl.so* + #fi + ``` + + 然后来到 `build-thirdparty.sh`,注释掉其他 `build_xxx`,仅打开 `build_zlib` 和 `build_openssl`,以及 `build_brpc` 和之后的 `build_xxx`。然后重新执行 `build-thirdparty.sh`。 + + * 编译到某个阶段卡住不动。 + + 不确定原因。解决方案:重跑 `build-thirdparty.sh`。`build-thirdparty.sh` 是可以重复执行的。 + +### 4. 编译Doris源码 + +执行 sh build.sh 即可。 diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md new file mode 100644 index 00000000000000..56189c6eb2baa7 --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md @@ -0,0 +1,42 @@ +--- +{ + "title": "SHOW CREATE ROUTINE LOAD", + "language": "zh-CN" +} +--- + + + +# SHOW CREATE ROUTINE LOAD +## description + 该语句用于展示例行导入作业的创建语句 + 语法: + SHOW [ALL] CREATE ROUTINE LOAD for load_name; + + 说明: + `ALL`: 可选参数,代表获取所有作业,包括历史作业 + `load_name`: 例行导入作业名称 + +## example + 1. 
展示默认db下指定例行导入作业的创建语句 + SHOW CREATE ROUTINE LOAD for test_load + +## keyword + SHOW,CREATE,ROUTINE,LOAD \ No newline at end of file diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index ffdf533a9dec89..68492bb88fbf38 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -284,7 +284,7 @@ terminal String COMMENTED_PLAN_HINTS; nonterminal List stmts; nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, - show_routine_load_stmt, show_routine_load_task_stmt, + show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, @@ -666,6 +666,8 @@ stmt ::= {: RESULT = stmt; :} | show_routine_load_task_stmt : stmt {: RESULT = stmt; :} + | show_create_routine_load_stmt : stmt + {: RESULT = stmt; :} | cancel_stmt : stmt {: RESULT = stmt; :} | delete_stmt : stmt @@ -1080,7 +1082,7 @@ alter_system_clause ::= | KW_SET KW_LOAD KW_ERRORS KW_HUB opt_properties:properties {: RESULT = new AlterLoadErrorUrlClause(properties); - :} + :} ; // Sync Stmt @@ -1564,16 +1566,16 @@ opt_broker ::= :} ; -resource_desc ::= - KW_WITH KW_RESOURCE ident_or_text:resourceName - {: - RESULT = new ResourceDesc(resourceName, null); - :} +resource_desc ::= + KW_WITH KW_RESOURCE ident_or_text:resourceName + {: + RESULT = new ResourceDesc(resourceName, null); + :} | KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN - {: - RESULT = new ResourceDesc(resourceName, properties); - :} - ; + {: + RESULT = new ResourceDesc(resourceName, properties); + :} + ; // Routine load statement create_routine_load_stmt ::= @@ -1681,6 +1683,17 @@ show_routine_load_task_stmt ::= :} ; +show_create_routine_load_stmt ::= + KW_SHOW KW_CREATE KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel + {: + RESULT = new ShowCreateRoutineLoadStmt(jobLabel, false); + :} + | KW_SHOW KW_ALL KW_CREATE KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel + {: + RESULT = new ShowCreateRoutineLoadStmt(jobLabel, true); + :} + ; + // Grant statement grant_stmt ::= KW_GRANT privilege_list:privs KW_ON tbl_pattern:tblPattern KW_TO user_identity:userId @@ -2509,9 +2522,9 @@ show_param ::= {: RESULT = new ShowUserPropertyStmt(user, parser.wild); :} - | KW_BACKUP opt_db:db + | KW_BACKUP opt_db:db opt_wild_where {: - RESULT = new ShowBackupStmt(db); + RESULT = new ShowBackupStmt(db, parser.where); :} | KW_RESTORE opt_db:db opt_wild_where {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 0d571d46b296ff..2a5acd6f5544a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -26,12 +26,16 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TResultFileSinkOptions; + +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -139,7 +143,11 @@ public BrokerDesc getBrokerDesc() { return brokerDesc; } - public void analyze(Analyzer analyzer) throws AnalysisException { + public List> getSchema() { + return schema; + } + + private void analyze(Analyzer analyzer) throws AnalysisException { analyzeFilePath(); if (Strings.isNullOrEmpty(filePath)) { @@ -167,11 +175,111 @@ public void analyze(Analyzer analyzer) throws AnalysisException { public void analyze(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { analyze(analyzer); - List items = stmt.getSelectList().getItems(); - for (SelectListItem item:items) { - if (item.getExpr().getType() == Type.LARGEINT && isParquetFormat()) { - throw new AnalysisException("currently parquet do not support largeint type"); + + if (isParquetFormat()) { + analyzeForParquetFormat(stmt.getResultExprs()); + } + } + + private void analyzeForParquetFormat(List resultExprs) throws AnalysisException { + if (this.schema.isEmpty()) { + genParquetSchema(resultExprs); + } + + // check schema number + if (resultExprs.size() != this.schema.size()) { + throw new AnalysisException("Parquet schema number does not equal to select item number"); + } + + // check type + for (int i = 0; i < this.schema.size(); ++i) { + String type = this.schema.get(i).get(1); + Type resultType = resultExprs.get(i).getType(); + switch (resultType.getPrimitiveType()) { + case BOOLEAN: + if (!type.equals("boolean")) { + throw new AnalysisException("project field type is BOOLEAN, should use boolean, but the type of column " + + i + " is " + type); + } + break; + case TINYINT: + case SMALLINT: + case INT: + if (!type.equals("int32")) { + throw new AnalysisException("project field type is TINYINT/SMALLINT/INT, should use int32, " + + "but the definition type of column " + i + " is " + type); + } + break; + case BIGINT: + case DATE: + case DATETIME: + if (!type.equals("int64")) { + throw new AnalysisException("project field type is BIGINT/DATE/DATETIME, should use int64, " + + "but the definition type of column " + i + " is " + type); + } + break; + case FLOAT: + if (!type.equals("float")) { + throw new AnalysisException("project field type is FLOAT, should use float, but the definition type of column " + + i + " is " + type); + } + break; + case DOUBLE: + if (!type.equals("double")) { + throw new AnalysisException("project field type is DOUBLE, should use double, but the definition type of column " + + i + " is " + type); + } + break; + case CHAR: + case VARCHAR: + case DECIMALV2: + if (!type.equals("byte_array")) { + throw new AnalysisException("project field type is CHAR/VARCHAR/DECIMAL, should use byte_array, " + + "but the definition type of column " + i + " is " + type); + } + break; + default: + throw new AnalysisException("Parquet format does not support column type: " + resultType.getPrimitiveType()); + } + } + } + + private void genParquetSchema(List resultExprs) throws AnalysisException { + Preconditions.checkState(this.schema.isEmpty()); + for (int i = 0; i < resultExprs.size(); ++i) { + Expr expr = resultExprs.get(i); + List column = new ArrayList<>(); + column.add("required"); + switch (expr.getType().getPrimitiveType()) { + case BOOLEAN: + column.add("boolean"); + break; + case TINYINT: + case SMALLINT: + case INT: + column.add("int32"); + break; + case BIGINT: + case DATE: + case DATETIME: + column.add("int64"); + break; + case FLOAT: + 
column.add("float"); + break; + case DOUBLE: + column.add("double"); + break; + case CHAR: + case VARCHAR: + case DECIMALV2: + column.add("byte_array"); + break; + default: + throw new AnalysisException("currently parquet do not support column type: " + expr.getType().getPrimitiveType()); } + column.add("col" + i); + this.schema.add(column); } } @@ -238,7 +346,6 @@ private void analyzeProperties() throws AnalysisException { throw new AnalysisException("Unknown properties: " + properties.keySet().stream() .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList())); } - } private void getBrokerProperties(Set processedPropKeys) { @@ -273,14 +380,28 @@ private void getBrokerProperties(Set processedPropKeys) { * currently only supports: compression, disable_dictionary, version */ private void getParquetProperties(Set processedPropKeys) throws AnalysisException { + // save all parquet prefix property + Iterator> iter = properties.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) { + processedPropKeys.add(entry.getKey()); + fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue()); + } + } + + // check schema. if schema is not set, Doris will gen schema by select items String schema = properties.get(SCHEMA); - if (schema == null || schema.length() <= 0) { - throw new AnalysisException("schema is required for parquet file"); + if (schema == null) { + return; + } + if (schema.isEmpty()) { + throw new AnalysisException("Parquet schema property should not be empty"); } - schema = schema.replace(" ",""); + schema = schema.replace(" ", ""); schema = schema.toLowerCase(); String[] schemas = schema.split(";"); - for (String item:schemas) { + for (String item : schemas) { String[] properties = item.split(","); if (properties.length != 3) { throw new AnalysisException("must only contains repetition type/column type/column name"); @@ -299,14 +420,6 @@ private void getParquetProperties(Set processedPropKeys) throws Analysis this.schema.add(column); } processedPropKeys.add(SCHEMA); - Iterator> iter = properties.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) { - processedPropKeys.add(entry.getKey()); - fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue()); - } - } } private boolean isCsvFormat() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java index 6137697e3d60ad..053d5c8d480663 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java @@ -21,8 +21,11 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -31,6 +34,8 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import java.util.function.Predicate; + public class ShowBackupStmt extends 
ShowStmt { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("JobId").add("SnapshotName").add("DbName").add("State").add("BackupObjs").add("CreateTime") @@ -39,9 +44,13 @@ public class ShowBackupStmt extends ShowStmt { .build(); private String dbName; + private final Expr where; + private boolean isAccurateMatch; + private String labelValue; - public ShowBackupStmt(String dbName) { + public ShowBackupStmt(String dbName, Expr where) { this.dbName = dbName; + this.where = where; } public String getDbName() { @@ -65,6 +74,56 @@ public void analyze(Analyzer analyzer) throws UserException { ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED, ConnectContext.get().getQualifiedUser(), dbName); } + + if (where == null) { + return; + } + boolean valid = analyzeWhereClause(); + if (!valid) { + throw new AnalysisException("Where clause should like: LABEL = \"your_label_name\", " + + " or LABEL LIKE \"matcher\""); + } + } + + private boolean analyzeWhereClause() { + if (!(where instanceof LikePredicate) && !(where instanceof BinaryPredicate)) { + return false; + } + + if (where instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) where; + if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) { + return false; + } + isAccurateMatch = true; + } + + if (where instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) where; + if (LikePredicate.Operator.LIKE != likePredicate.getOp()) { + return false; + } + } + + // left child + if (!(where.getChild(0) instanceof SlotRef)) { + return false; + } + String leftKey = ((SlotRef) where.getChild(0)).getColumnName(); + if (!"label".equalsIgnoreCase(leftKey)) { + return false; + } + + // right child + if (!(where.getChild(1) instanceof StringLiteral)) { + return false; + } + labelValue = ((StringLiteral) where.getChild(1)).getStringValue(); + if (Strings.isNullOrEmpty(labelValue)) { + return false; + } + + return true; } @Override @@ -84,6 +143,10 @@ public String toSql() { builder.append(" FROM `").append(dbName).append("` "); } + if (where != null) { + builder.append(where.toSql()); + } + return builder.toString(); } @@ -96,4 +159,28 @@ public String toString() { public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_NO_SYNC; } + + public boolean isAccurateMatch() { + return isAccurateMatch; + } + + public String getLabelValue() { + return labelValue; + } + + public Expr getWhere() { + return where; + } + + public Predicate getLabelPredicate() throws AnalysisException { + if (null == where) { + return label -> true; + } + if (isAccurateMatch) { + return CaseSensibility.LABEL.getCaseSensibility() ? label -> label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue); + } else { + PatternMatcher patternMatcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); + return patternMatcher::match; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java new file mode 100644 index 00000000000000..d0ae92b6ef8ece --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.qe.ShowResultSetMetaData; + +// SHOW CREATE ROUTINE LOAD statement. +public class ShowCreateRoutineLoadStmt extends ShowStmt { + + private static final ShowResultSetMetaData META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("Routine Load Id", ScalarType.createVarchar(20))) + .addColumn(new Column("Routine Load Name", ScalarType.createVarchar(20))) + .addColumn(new Column("Create Routine Load", ScalarType.createVarchar(30))) + .build(); + + private final LabelName labelName; + + private final boolean includeHistory; + + public ShowCreateRoutineLoadStmt(LabelName labelName, boolean includeHistory) { + this.labelName = labelName; + this.includeHistory = includeHistory; + } + + public String getDb() { + return labelName.getDbName(); + } + + public String getLabel() { + return labelName.getLabelName(); + } + + public boolean isIncludeHistory() { + return includeHistory; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + labelName.analyze(analyzer); + } + + @Override + public ShowResultSetMetaData getMetaData() { + return META_DATA; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java index 9426898c39addd..b47a0a39f7a5ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java @@ -21,8 +21,11 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -31,6 +34,8 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import java.util.function.Predicate; + public class ShowRestoreStmt extends ShowStmt { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("JobId").add("Label").add("Timestamp").add("DbName").add("State") @@ -42,7 +47,8 @@ public class ShowRestoreStmt extends ShowStmt { private String dbName; private Expr where; - private String label; + private String labelValue; + private boolean isAccurateMatch; public ShowRestoreStmt(String dbName, Expr where) { this.dbName = dbName; @@ -53,8 +59,8 @@ public String getDbName() { return dbName; } - public String getLabel() { - return label; + public String getLabelValue() { + return 
labelValue; } @Override @@ -74,6 +80,56 @@ public void analyze(Analyzer analyzer) throws UserException { ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED, ConnectContext.get().getQualifiedUser(), dbName); } + + if (where == null) { + return; + } + boolean valid = analyzeWhereClause(); + if (!valid) { + throw new AnalysisException("Where clause should like: LABEL = \"your_label_name\", " + + " or LABEL LIKE \"matcher\""); + } + } + + private boolean analyzeWhereClause() { + if (!(where instanceof LikePredicate) && !(where instanceof BinaryPredicate)) { + return false; + } + + if (where instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) where; + if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) { + return false; + } + isAccurateMatch = true; + } + + if (where instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) where; + if (LikePredicate.Operator.LIKE != likePredicate.getOp()) { + return false; + } + } + + // left child + if (!(where.getChild(0) instanceof SlotRef)) { + return false; + } + String leftKey = ((SlotRef) where.getChild(0)).getColumnName(); + if (!"label".equalsIgnoreCase(leftKey)) { + return false; + } + + // right child + if (!(where.getChild(1) instanceof StringLiteral)) { + return false; + } + labelValue = ((StringLiteral) where.getChild(1)).getStringValue(); + if (Strings.isNullOrEmpty(labelValue)) { + return false; + } + + return true; } @Override @@ -106,5 +162,25 @@ public String toString() { public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_NO_SYNC; } + + public boolean isAccurateMatch() { + return isAccurateMatch; + } + + public Expr getWhere() { + return where; + } + + public Predicate getLabelPredicate() throws AnalysisException { + if (null == where) { + return label -> true; + } + if (isAccurateMatch) { + return CaseSensibility.LABEL.getCaseSensibility() ? 
label -> label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue); + } else { + PatternMatcher patternMatcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); + return patternMatcher::match; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index bf5e43cb254c3c..5b18f2fee31389 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -55,9 +55,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,11 +67,16 @@ import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class BackupHandler extends MasterDaemon implements Writable { private static final Logger LOG = LogManager.getLogger(BackupHandler.class); @@ -82,13 +87,14 @@ public class BackupHandler extends MasterDaemon implements Writable { private RepositoryMgr repoMgr = new RepositoryMgr(); - // db id -> last running or finished backup/restore jobs - // We only save the last backup/restore job of a database. + // this lock is used for updating dbIdToBackupOrRestoreJobs + private final ReentrantLock jobLock = new ReentrantLock(); + + // db id -> last 10(max_backup_restore_job_num_per_db) backup/restore jobs // Newly submitted job will replace the current job, only if current job is finished or cancelled. // If the last job is finished, user can get the job info from repository. If the last job is cancelled, // user can get the error message before submitting the next one. - // Use ConcurrentMap to get rid of locks. - private Map dbIdToBackupOrRestoreJob = Maps.newConcurrentMap(); + private final Map> dbIdToBackupOrRestoreJobs = new HashMap<>(); // this lock is used for handling one backup or restore request at a time. 
private ReentrantLock seqlock = new ReentrantLock(); @@ -154,7 +160,19 @@ private boolean init() { } public AbstractJob getJob(long dbId) { - return dbIdToBackupOrRestoreJob.get(dbId); + return getCurrentJob(dbId); + } + + public List getJobs(long dbId, Predicate predicate) { + jobLock.lock(); + try { + return dbIdToBackupOrRestoreJobs.getOrDefault(dbId, new LinkedList<>()) + .stream() + .filter(e -> predicate.test(e.getLabel())) + .collect(Collectors.toList()); + } finally { + jobLock.unlock(); + } } @Override @@ -165,7 +183,7 @@ protected void runAfterCatalogReady() { } } - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + for (AbstractJob job : getAllCurrentJobs()) { job.setCatalog(catalog); job.run(); } @@ -197,8 +215,8 @@ public void dropRepository(DropRepositoryStmt stmt) throws DdlException { if (repo == null) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist"); } - - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + + for (AbstractJob job : getAllCurrentJobs()) { if (!job.isDone() && job.getRepoId() == repo.getId()) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Backup or restore job is running on this repository." @@ -239,7 +257,7 @@ public void process(AbstractBackupStmt stmt) throws DdlException { tryLock(); try { // Check if there is backup or restore job running on this database - AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(db.getId()); + AbstractJob currentJob = getCurrentJob(db.getId()); if (currentJob != null && !currentJob.isDone()) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Can only run one backup or restore job of a database at same time"); @@ -364,7 +382,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws catalog.getEditLog().logBackupJob(backupJob); // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed. - dbIdToBackupOrRestoreJob.put(db.getId(), backupJob); + addBackupOrRestoreJob(db.getId(), backupJob); LOG.info("finished to submit backup job: {}", backupJob); } @@ -392,11 +410,51 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw catalog.getEditLog().logRestoreJob(restoreJob); // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed. - dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob); + addBackupOrRestoreJob(db.getId(), restoreJob); LOG.info("finished to submit restore job: {}", restoreJob); } + private void addBackupOrRestoreJob(long dbId, AbstractJob job) { + jobLock.lock(); + try { + Deque jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); + while (jobs.size() >= Config.max_backup_restore_job_num_per_db) { + jobs.removeFirst(); + } + AbstractJob lastJob = jobs.peekLast(); + + // Remove duplicate jobs and keep only the latest status + // Otherwise, the tasks that have been successfully executed will be repeated when replaying edit log. 
+ if (lastJob != null && (lastJob.isPending() || lastJob.getJobId() == job.getJobId())) { + jobs.removeLast(); + } + jobs.addLast(job); + } finally { + jobLock.unlock(); + } + } + + private List getAllCurrentJobs() { + jobLock.lock(); + try { + return dbIdToBackupOrRestoreJobs.values().stream().filter(CollectionUtils::isNotEmpty) + .map(Deque::getLast).collect(Collectors.toList()); + } finally { + jobLock.unlock(); + } + } + + private AbstractJob getCurrentJob(long dbId) { + jobLock.lock(); + try { + Deque jobs = dbIdToBackupOrRestoreJobs.getOrDefault(dbId, Lists.newLinkedList()); + return jobs.isEmpty() ? null : jobs.getLast(); + } finally { + jobLock.unlock(); + } + } + private void checkAndFilterRestoreObjsExistInSnapshot(BackupJobInfo jobInfo, AbstractBackupTableRefClause backupTableRefClause) throws DdlException { @@ -490,8 +548,8 @@ public void cancel(CancelBackupStmt stmt) throws DdlException { if (db == null) { ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName); } - - AbstractJob job = dbIdToBackupOrRestoreJob.get(db.getId()); + + AbstractJob job = getCurrentJob(db.getId()); if (job == null || (job instanceof BackupJob && stmt.isRestore()) || (job instanceof RestoreJob && !stmt.isRestore())) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "No " @@ -508,7 +566,8 @@ public void cancel(CancelBackupStmt stmt) throws DdlException { } public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); + AbstractJob job = getCurrentJob(task.getDbId()); + if (job == null) { LOG.warn("failed to find backup or restore job for task: {}", task); // return true to remove this task from AgentTaskQueue @@ -533,7 +592,7 @@ public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest } public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); + AbstractJob job = getCurrentJob(task.getDbId()); if (job == null || (job instanceof RestoreJob)) { LOG.info("invalid upload task: {}, no backup job is found. 
db id: {}", task, task.getDbId()); return false; @@ -548,8 +607,8 @@ public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequ } public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); - if (job == null || !(job instanceof RestoreJob)) { + AbstractJob job = getCurrentJob(task.getDbId()); + if (!(job instanceof RestoreJob)) { LOG.warn("failed to find restore job for task: {}", task); // return true to remove this task from AgentTaskQueue return true; @@ -559,8 +618,8 @@ public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest } public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); - if (job == null || !(job instanceof RestoreJob)) { + AbstractJob job = getCurrentJob(task.getDbId()); + if (!(job instanceof RestoreJob)) { LOG.warn("failed to find restore job for task: {}", task); // return true to remove this task from AgentTaskQueue return true; @@ -571,16 +630,16 @@ public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) { public void replayAddJob(AbstractJob job) { if (job.isCancelled()) { - AbstractJob existingJob = dbIdToBackupOrRestoreJob.get(job.getDbId()); + AbstractJob existingJob = getCurrentJob(job.getDbId()); if (existingJob == null || existingJob.isDone()) { LOG.error("invalid existing job: {}. current replay job is: {}", - existingJob, job); + existingJob, job); return; } existingJob.setCatalog(catalog); existingJob.replayCancel(); } else if (!job.isPending()) { - AbstractJob existingJob = dbIdToBackupOrRestoreJob.get(job.getDbId()); + AbstractJob existingJob = getCurrentJob(job.getDbId()); if (existingJob == null || existingJob.isDone()) { LOG.error("invalid existing job: {}. current replay job is: {}", existingJob, job); @@ -591,11 +650,12 @@ public void replayAddJob(AbstractJob job) { // for example: In restore job, PENDING will transfer to SNAPSHOTING, not DOWNLOAD. 
job.replayRun(); } - dbIdToBackupOrRestoreJob.put(job.getDbId(), job); + + addBackupOrRestoreJob(job.getDbId(), job); } public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, int totalNum) { - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + for (AbstractJob job : getAllCurrentJobs()) { if (job.getType() == JobType.BACKUP) { if (!job.isDone() && job.getJobId() == jobId && type == TTaskType.UPLOAD) { job.taskProgress.put(taskId, Pair.create(finishedNum, totalNum)); @@ -621,8 +681,9 @@ public static BackupHandler read(DataInput in) throws IOException { public void write(DataOutput out) throws IOException { repoMgr.write(out); - out.writeInt(dbIdToBackupOrRestoreJob.size()); - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + List jobs = dbIdToBackupOrRestoreJobs.values().stream().flatMap(Deque::stream).collect(Collectors.toList()); + out.writeInt(jobs.size()); + for (AbstractJob job : jobs) { job.write(out); } } @@ -633,7 +694,7 @@ public void readFields(DataInput in) throws IOException { int size = in.readInt(); for (int i = 0; i < size; i++) { AbstractJob job = AbstractJob.read(in); - dbIdToBackupOrRestoreJob.put(job.getDbId(), job); + addBackupOrRestoreJob(job.getDbId(), job); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 361d6ec12625bf..b7288dbe244f96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -62,6 +62,7 @@ public enum TabletStatus { COLOCATE_MISMATCH, // replicas do not all locate in right colocate backends set. COLOCATE_REDUNDANT, // replicas match the colocate backends set, but redundant. NEED_FURTHER_REPAIR, // one of replicas need a definite repair. + UNRECOVERABLE // none of the replicas are healthy } @SerializedName(value = "id") @@ -455,7 +456,9 @@ public Pair getHealthStatusWithPriority( // 1. alive replicas are not enough int aliveBackendsNum = aliveBeIdsInCluster.size(); - if (alive < replicationNum && replicas.size() >= aliveBackendsNum + if (alive == 0) { + return Pair.create(TabletStatus.UNRECOVERABLE, Priority.VERY_HIGH); + } else if (alive < replicationNum && replicas.size() >= aliveBackendsNum && aliveBackendsNum >= replicationNum && replicationNum > 1) { // there is no enough backend for us to create a new replica, so we have to delete an existing replica, // so there can be available backend for us to create a new replica. @@ -473,7 +476,9 @@ public Pair getHealthStatusWithPriority( } // 2.
version complete replicas are not enough - if (aliveAndVersionComplete < (replicationNum / 2) + 1) { + if (aliveAndVersionComplete == 0) { + return Pair.create(TabletStatus.UNRECOVERABLE, Priority.VERY_HIGH); + } else if (aliveAndVersionComplete < (replicationNum / 2) + 1) { return Pair.create(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.HIGH); } else if (aliveAndVersionComplete < replicationNum) { return Pair.create(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.NORMAL); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index 4e375c10001fca..2c9666d1b67a89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -329,6 +329,11 @@ private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Part // Only set last status check time when status is healthy. tablet.setLastStatusCheckTime(startTime); continue; + } else if (statusWithPrio.first == TabletStatus.UNRECOVERABLE) { + // This tablet is not recoverable, do not set it into tablet scheduler + // all UNRECOVERABLE tablet can be seen from "show proc '/statistic'" + counter.unhealthyTabletNum++; + continue; } else if (isInPrios) { statusWithPrio.second = TabletSchedCtx.Priority.VERY_HIGH; prioPartIsHealthy = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 0794296781748b..9c4b2b4c5b822d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -575,17 +575,19 @@ private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tab case FORCE_REDUNDANT: handleRedundantReplica(tabletCtx, true); break; - case REPLICA_MISSING_IN_CLUSTER: - handleReplicaClusterMigration(tabletCtx, batchTask); - break; - case COLOCATE_MISMATCH: - handleColocateMismatch(tabletCtx, batchTask); - break; - case COLOCATE_REDUNDANT: - handleColocateRedundant(tabletCtx); - break; - default: - break; + case REPLICA_MISSING_IN_CLUSTER: + handleReplicaClusterMigration(tabletCtx, batchTask); + break; + case COLOCATE_MISMATCH: + handleColocateMismatch(tabletCtx, batchTask); + break; + case COLOCATE_REDUNDANT: + handleColocateRedundant(tabletCtx); + break; + case UNRECOVERABLE: + throw new SchedException(Status.UNRECOVERABLE, "tablet is unrecoverable"); + default: + break; } } else { // balance diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 5a6ab5f2295ee7..9d351def23c47d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -326,10 +326,10 @@ public class Config extends ConfigBase { @ConfField public static int http_backlog_num = 1024; /** - *Maximum file limit for single upload of web request, default value: 100M + *Maximum file limit for single upload of web request, default value: 100MB */ - @ConfField public static String http_max_file_size = "100M"; - @ConfField public static String http_max_request_size = "100M"; + @ConfField public static String http_max_file_size = "100MB"; + @ConfField public static String http_max_request_size = "100MB"; /** * The backlog_num for mysql nio server @@ -1402,4 +1402,10 @@ public class Config 
extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int max_dynamic_partition_num = 500; + + /* + * Control the max num of backup/restore job per db + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_backup_restore_job_num_per_db = 10; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java index b278c47cc1f5c5..4cdf5de7145805 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java @@ -29,20 +29,23 @@ public class IncompleteTabletsProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("UnhealthyTablets").add("InconsistentTablets").add("CloningTablets") + .add("UnhealthyTablets").add("InconsistentTablets").add("CloningTablets").add("BadTablets") .build(); private static final Joiner JOINER = Joiner.on(","); Collection unhealthyTabletIds; Collection inconsistentTabletIds; Collection cloningTabletIds; + Collection unrecoverableTabletIds; public IncompleteTabletsProcNode(Collection unhealthyTabletIds, Collection inconsistentTabletIds, - Collection cloningTabletIds) { + Collection cloningTabletIds, + Collection unrecoverableTabletIds) { this.unhealthyTabletIds = unhealthyTabletIds; this.inconsistentTabletIds = inconsistentTabletIds; this.cloningTabletIds = cloningTabletIds; + this.unrecoverableTabletIds = unrecoverableTabletIds; } @Override @@ -56,9 +59,11 @@ public ProcResult fetchResult() throws AnalysisException { String incompleteTablets = JOINER.join(Arrays.asList(unhealthyTabletIds)); String inconsistentTablets = JOINER.join(Arrays.asList(inconsistentTabletIds)); String cloningTablets = JOINER.join(Arrays.asList(cloningTabletIds)); + String unrecoverableTablets = JOINER.join(Arrays.asList(unrecoverableTabletIds)); row.add(incompleteTablets); row.add(inconsistentTablets); row.add(cloningTablets); + row.add(unrecoverableTablets); result.addRow(row); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java index 001f00c00c1a9c..596267cb4228ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java @@ -17,10 +17,6 @@ package org.apache.doris.common.proc; -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Multimap; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -38,6 +34,12 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.thrift.TTaskType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,7 +51,7 @@ public class StatisticProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() 
.add("DbId").add("DbName").add("TableNum").add("PartitionNum") .add("IndexNum").add("TabletNum").add("ReplicaNum").add("UnhealthyTabletNum") - .add("InconsistentTabletNum").add("CloningTabletNum") + .add("InconsistentTabletNum").add("CloningTabletNum").add("BadTabletNum") .build(); private static final Logger LOG = LogManager.getLogger(StatisticProcDir.class); @@ -61,12 +63,15 @@ public class StatisticProcDir implements ProcDirInterface { Multimap inconsistentTabletIds; // db id -> set(tablet id) Multimap cloningTabletIds; + // db id -> set(tablet id) + Multimap unrecoverableTabletIds; public StatisticProcDir(Catalog catalog) { this.catalog = catalog; unhealthyTabletIds = HashMultimap.create(); inconsistentTabletIds = HashMultimap.create(); cloningTabletIds = HashMultimap.create(); + unrecoverableTabletIds = HashMultimap.create(); } @Override @@ -140,8 +145,11 @@ public ProcResult fetchResult() throws AnalysisException { // here we treat REDUNDANT as HEALTHY, for user friendly. if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT - && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR) { + && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR + && res.first != TabletStatus.UNRECOVERABLE) { unhealthyTabletIds.put(dbId, tablet.getId()); + } else if (res.first == TabletStatus.UNRECOVERABLE) { + unrecoverableTabletIds.put(dbId, tablet.getId()); } if (!tablet.isConsistent()) { @@ -166,6 +174,7 @@ public ProcResult fetchResult() throws AnalysisException { oneLine.add(unhealthyTabletIds.get(dbId).size()); oneLine.add(inconsistentTabletIds.get(dbId).size()); oneLine.add(cloningTabletIds.get(dbId).size()); + oneLine.add(unrecoverableTabletIds.get(dbId).size()); lines.add(oneLine); @@ -195,6 +204,7 @@ public ProcResult fetchResult() throws AnalysisException { finalLine.add(unhealthyTabletIds.size()); finalLine.add(inconsistentTabletIds.size()); finalLine.add(cloningTabletIds.size()); + finalLine.add(unrecoverableTabletIds.size()); lines.add(finalLine); // add result @@ -224,7 +234,8 @@ public ProcNodeInterface lookup(String dbIdStr) throws AnalysisException { } return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId), - inconsistentTabletIds.get(dbId), - cloningTabletIds.get(dbId)); + inconsistentTabletIds.get(dbId), + cloningTabletIds.get(dbId), + unrecoverableTabletIds.get(dbId)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java index 0e5782551ccbae..8f062ea2ba6b71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java @@ -65,6 +65,9 @@ public void start(String dorisHome) { properties.put("spring.http.encoding.force", true); properties.put("spring.servlet.multipart.max-file-size", this.maxFileSize); properties.put("spring.servlet.multipart.max-request-size", this.maxRequestSize); + // This is to disable the spring-boot-devtools restart feature. + // To avoid some unexpected behavior. 
+ System.setProperty("spring.devtools.restart.enabled", "false"); properties.put("logging.config", dorisHome + "/conf/" + SpringLog4j2Config.SPRING_LOG_XML_FILE); new SpringApplicationBuilder() .sources(HttpServer.class) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index e66751572f4df2..5ba162d7d843ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -63,6 +63,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.TimeZone; import java.util.UUID; @@ -96,7 +97,7 @@ public KafkaRoutineLoadJob() { } public KafkaRoutineLoadJob(Long id, String name, String clusterName, - long dbId, long tableId, String brokerList, String topic) { + long dbId, long tableId, String brokerList, String topic) { super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA); this.brokerList = brokerList; this.topic = topic; @@ -224,8 +225,8 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { // Through the transaction status and attachment information, to determine whether the progress needs to be updated. @Override protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, - TransactionState txnState, - TransactionState.TxnStatusChangeReason txnStatusChangeReason) { + TransactionState txnState, + TransactionState.TxnStatusChangeReason txnStatusChangeReason) { if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) { // For committed txn, update the progress. return true; @@ -494,6 +495,21 @@ protected String customPropertiesJsonToString() { return gson.toJson(customProperties); } + @Override + protected Map getDataSourceProperties() { + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("kafka_broker_list", brokerList); + dataSourceProperties.put("kafka_topic", topic); + return dataSourceProperties; + } + + @Override + protected Map getCustomProperties() { + Map ret = new HashMap<>(); + customProperties.forEach((k, v) -> ret.put("property." 
+ k, v)); + return ret; + } + @Override public void write(DataOutput out) throws IOException { super.write(out); @@ -570,7 +586,7 @@ private void convertTimestampToOffset(RoutineLoadDataSourceProperties dataSource } private void modifyPropertiesInternal(Map jobProperties, - RoutineLoadDataSourceProperties dataSourceProperties) + RoutineLoadDataSourceProperties dataSourceProperties) throws DdlException { List> kafkaPartitionOffsets = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 3ca38250912429..31f28c14037e47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -110,29 +110,29 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public static final boolean DEFAULT_STRICT_MODE = false; // default is false protected static final String STAR_STRING = "*"; - /* - +-----------------+ - fe schedule job | NEED_SCHEDULE | user resume job - +-----------+ | <---------+ - | | | | - v +-----------------+ ^ - | | - +------------+ user(system)pause job +-------+----+ - | RUNNING | | PAUSED | - | +-----------------------> | | - +----+-------+ +-------+----+ - | | | - | | +---------------+ | - | | | STOPPED | | - | +---------> | | <-----------+ - | user stop job+---------------+ user stop job - | - | - | +---------------+ - | | CANCELLED | - +-------------> | | - system error +---------------+ - */ + /* + +-----------------+ + fe schedule job | NEED_SCHEDULE | user resume job + +-----------+ | <---------+ + | | | | + v +-----------------+ ^ + | | + +------------+ user(system)pause job +-------+----+ + | RUNNING | | PAUSED | + | +-----------------------> | | + +----+-------+ +-------+----+ + | | | + | | +---------------+ | + | | | STOPPED | | + | +---------> | | <-----------+ + | user stop job+---------------+ user stop job + | + | + | +---------------+ + | | CANCELLED | + +-------------> | | + system error +---------------+ + */ public enum JobState { NEED_SCHEDULE, RUNNING, @@ -733,11 +733,11 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse if (currentTotalRows > maxBatchRows * 10) { if (currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_rows", currentTotalRows) - .add("current_error_rows", currentErrorRows) - .add("max_error_num", maxErrorNum) - .add("msg", "current error rows is more than max error num, begin to pause job") - .build()); + .add("current_total_rows", currentTotalRows) + .add("current_error_rows", currentErrorRows) + .add("max_error_num", maxErrorNum) + .add("msg", "current error rows is more than max error num, begin to pause job") + .build()); // if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB if (!isReplay) { // remove all of task in jobs and change job state to paused @@ -749,23 +749,23 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_rows", currentTotalRows) - .add("current_error_rows", currentErrorRows) - .add("max_error_num", maxErrorNum) - .add("msg", "reset current total rows and current error rows " - + "when current total rows is more than base") - .build()); + .add("current_total_rows", 
currentTotalRows) + .add("current_error_rows", currentErrorRows) + .add("max_error_num", maxErrorNum) + .add("msg", "reset current total rows and current error rows " + + "when current total rows is more than base") + .build()); } // reset currentTotalNum and currentErrorNum currentErrorRows = 0; currentTotalRows = 0; } else if (currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_rows", currentTotalRows) - .add("current_error_rows", currentErrorRows) - .add("max_error_num", maxErrorNum) - .add("msg", "current error rows is more than max error rows, begin to pause job") - .build()); + .add("current_total_rows", currentTotalRows) + .add("current_error_rows", currentErrorRows) + .add("max_error_num", maxErrorNum) + .add("msg", "current error rows is more than max error rows, begin to pause job") + .build()); if (!isReplay) { // remove all of task in jobs and change job state to paused updateState(JobState.PAUSED, @@ -833,9 +833,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserExc public void beforeAborted(TransactionState txnState) throws TransactionException { if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel()) - .add("txn_state", txnState) - .add("msg", "task before aborted") - .build()); + .add("txn_state", txnState) + .add("msg", "task before aborted") + .build()); } executeBeforeCheck(txnState, TransactionStatus.ABORTED); } @@ -847,9 +847,9 @@ public void beforeAborted(TransactionState txnState) throws TransactionException public void beforeCommitted(TransactionState txnState) throws TransactionException { if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel()) - .add("txn_state", txnState) - .add("msg", "task before committed") - .build()); + .add("txn_state", txnState) + .add("msg", "task before committed") + .build()); } executeBeforeCheck(txnState, TransactionStatus.COMMITTED); } @@ -873,8 +873,8 @@ private void executeBeforeCheck(TransactionState txnState, TransactionStatus tra switch (transactionStatus) { case COMMITTED: throw new TransactionException("txn " + txnState.getTransactionId() - + " could not be " + transactionStatus - + " while task " + txnState.getLabel() + " has been aborted."); + + " could not be " + transactionStatus + + " while task " + txnState.getLabel() + " has been aborted."); default: break; } @@ -943,7 +943,7 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) { // so we can find this error and step in. 
return; } - + writeLock(); try { if (state != JobState.RUNNING) { @@ -1010,9 +1010,9 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String // step1: job state will be changed depending on txnStatusChangeReasonString if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel()) - .add("txn_id", txnState.getTransactionId()) - .add("msg", "txn abort with reason " + txnStatusChangeReasonString) - .build()); + .add("txn_id", txnState.getTransactionId()) + .add("msg", "txn abort with reason " + txnStatusChangeReasonString) + .build()); } ++abortedTaskNum; TransactionState.TxnStatusChangeReason txnStatusChangeReason = null; @@ -1024,7 +1024,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String case OFFSET_OUT_OF_RANGE: case PAUSE: String msg = "be " + taskBeId + " abort task " - + "with reason: " + txnStatusChangeReasonString; + + "with reason: " + txnStatusChangeReasonString; updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg), false /* not replay */); @@ -1041,11 +1041,11 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String } catch (Exception e) { String msg = "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage(); updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg), - false /* not replay */); + false /* not replay */); LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("task_id", txnState.getLabel()) - .add("error_msg", "change job state to paused when task has been aborted with error " + e.getMessage()) - .build(), e); + .add("task_id", txnState.getLabel()) + .add("error_msg", "change job state to paused when task has been aborted with error " + e.getMessage()) + .build(), e); } finally { writeUnlock(); LOG.debug("unlock write lock of routine load job after aborted: {}", id); @@ -1070,11 +1070,11 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn if (rlTaskTxnCommitAttachment == null) { if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()) - .add("job_id", routineLoadTaskInfo.getJobId()) - .add("txn_id", routineLoadTaskInfo.getTxnId()) - .add("msg", "commit task will be ignore when attachment txn of task is null," - + " maybe task was aborted by master when timeout") - .build()); + .add("job_id", routineLoadTaskInfo.getJobId()) + .add("txn_id", routineLoadTaskInfo.getTxnId()) + .add("msg", "commit task will be ignore when attachment txn of task is null," + + " maybe task was aborted by master when timeout") + .build()); } } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) { // step2: update job progress @@ -1108,16 +1108,16 @@ protected static void checkMeta(Database db, String tblName, RoutineLoadDesc rou if (table.getType() != Table.TableType.OLAP) { throw new AnalysisException("Only olap table support routine load"); } - + if (routineLoadDesc == null) { return; } - + PartitionNames partitionNames = routineLoadDesc.getPartitionNames(); if (partitionNames == null) { return; } - + // check partitions OlapTable olapTable = (OlapTable) table; olapTable.readLock(); @@ -1149,7 +1149,7 @@ protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boole .add("desire_job_state", jobState) .add("msg", reason) .build()); - + checkStateTransform(jobState); switch (jobState) { case RUNNING: @@ -1179,10 +1179,10 @@ protected 
void unprotectUpdateState(JobState jobState, ErrorReason reason, boole Catalog.getCurrentCatalog().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState)); } LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_job_state", getState()) - .add("msg", "job state has been changed") - .add("is replay", String.valueOf(isReplay)) - .build()); + .add("current_job_state", getState()) + .add("msg", "job state has been changed") + .add("is replay", String.valueOf(isReplay)) + .build()); } private void executeRunning() { @@ -1221,8 +1221,8 @@ public void update() throws UserException { Database database = Catalog.getCurrentCatalog().getDb(dbId); if (database == null) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("db_id", dbId) - .add("msg", "The database has been deleted. Change job state to cancelled").build()); + .add("db_id", dbId) + .add("msg", "The database has been deleted. Change job state to cancelled").build()); writeLock(); try { if (!state.isFinalState()) { @@ -1240,8 +1240,8 @@ public void update() throws UserException { Table table = database.getTable(tableId); if (table == null) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id).add("db_id", dbId) - .add("table_id", tableId) - .add("msg", "The table has been deleted change job state to cancelled").build()); + .add("table_id", tableId) + .add("msg", "The table has been deleted change job state to cancelled").build()); writeLock(); try { if (!state.isFinalState()) { @@ -1259,8 +1259,8 @@ public void update() throws UserException { try { if (unprotectNeedReschedule()) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("msg", "Job need to be rescheduled") - .build()); + .add("msg", "Job need to be rescheduled") + .build()); unprotectUpdateProgress(); executeNeedSchedule(); } @@ -1282,8 +1282,8 @@ public void setOrigStmt(OriginStatement origStmt) { // check the correctness of commit info protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, - TransactionState txnState, - TransactionState.TxnStatusChangeReason txnStatusChangeReason); + TransactionState txnState, + TransactionState.TxnStatusChangeReason txnStatusChangeReason); protected abstract String getStatistic(); @@ -1333,6 +1333,82 @@ public List> getTasksShowInfo() { return rows; } + public String getShowCreateInfo() { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + Table tbl = (db == null) ? null : db.getTable(tableId); + StringBuilder sb = new StringBuilder(); + // 1.job_name + sb.append("CREATE ROUTINE LOAD ").append(name); + // 2.tbl_name + sb.append(" ON ").append(tbl == null ? 
String.valueOf(tableId) : tbl.getName()).append("\n"); + // 3.merge_type + sb.append("WITH ").append(mergeType.toString()).append("\n"); + // 4.load_properties + // 4.1.column_separator + if (columnSeparator != null) { + sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getSeparator()).append("\",\n"); + } + // 4.2.columns_mapping + if (columnDescs != null) { + sb.append("COLUMNS(").append(Joiner.on(",").join(columnDescs.descs)).append("),\n"); + } + // 4.3.where_predicates + if (whereExpr != null) { + sb.append("WHERE ").append(whereExpr.toSql()).append(",\n"); + } + // 4.4.partitions + if (partitions != null) { + sb.append("PARTITION(").append(Joiner.on(",").join(partitions.getPartitionNames())).append("),\n"); + } + // 4.5.delete_on_predicates + if (deleteCondition != null) { + sb.append("DELETE ON ").append(deleteCondition.toSql()).append(",\n"); + } + // 4.6.source_sequence + if (sequenceCol != null) { + sb.append("ORDER BY ").append(sequenceCol).append(",\n"); + } + // 4.7.preceding_predicates + if (precedingFilter != null) { + sb.append("PRECEDING FILTER ").append(precedingFilter.toSql()).append(",\n"); + } + // remove the last , + sb.replace(sb.length() - 2, sb.length() - 1, ""); + // 5.job_properties + sb.append("PROPERTIES\n(\n"); + appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false); + appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false); + appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false); + appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false); + appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false); + appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false); + appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false); + appendProperties(sb, PROPS_FORMAT, getFormat(), false); + appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false); + appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false); + appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), true); + sb.append(")\n"); + // 6. data_source + sb.append("FROM ").append(dataSourceType).append("\n"); + // 7. 
data_source_properties + sb.append("(\n"); + getDataSourceProperties().forEach((k, v) -> appendProperties(sb, k, v, false)); + getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false)); + // remove the last , + sb.replace(sb.length() - 2, sb.length() - 1, ""); + sb.append(");"); + return sb.toString(); + } + + public void appendProperties(StringBuilder sb, String key, Object value, boolean end) { + sb.append("\"").append(key).append("\"").append(" = ").append("\"").append(value).append("\""); + if (!end) { + sb.append(",\n"); + } else { + sb.append("\n"); + } + } + public List getShowStatistic() { Database db = Catalog.getCurrentCatalog().getDb(dbId); @@ -1348,9 +1424,9 @@ public List getShowStatistic() { private String getTaskStatistic() { Map result = Maps.newHashMap(); result.put("running_task", - String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count())); + String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count())); result.put("waiting_task", - String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count())); + String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count())); Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(result); } @@ -1385,6 +1461,10 @@ private String jobPropertiesToJsonString() { abstract String customPropertiesJsonToString(); + abstract Map getDataSourceProperties(); + + abstract Map getCustomProperties(); + public boolean needRemove() { if (!isFinal()) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 1782b796cfd4b6..31a2dc4ab8c164 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -1014,7 +1014,8 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon db.getClusterName(), visibleVersion, visibleVersionHash, replicationNum, aliveBeIdsInCluster); - if (status.first == TabletStatus.VERSION_INCOMPLETE || status.first == TabletStatus.REPLICA_MISSING) { + if (status.first == TabletStatus.VERSION_INCOMPLETE || status.first == TabletStatus.REPLICA_MISSING + || status.first == TabletStatus.UNRECOVERABLE) { long lastFailedVersion = -1L; long lastFailedVersionHash = 0L; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 1e9a006e135878..1f793274784434 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -33,6 +33,7 @@ import org.apache.doris.analysis.ShowColumnStmt; import org.apache.doris.analysis.ShowCreateDbStmt; import org.apache.doris.analysis.ShowCreateFunctionStmt; +import org.apache.doris.analysis.ShowCreateRoutineLoadStmt; import org.apache.doris.analysis.ShowCreateTableStmt; import org.apache.doris.analysis.ShowDataStmt; import org.apache.doris.analysis.ShowDbIdStmt; @@ -226,6 +227,8 @@ public ShowResultSet execute() throws AnalysisException { handleShowRoutineLoad(); } else if (stmt instanceof ShowRoutineLoadTaskStmt) { handleShowRoutineLoadTask(); + } else if (stmt instanceof ShowCreateRoutineLoadStmt) { + handleShowCreateRoutineLoad(); } else if (stmt instanceof ShowDeleteStmt) { handleShowDelete(); } else if (stmt instanceof ShowAlterStmt) 
{ @@ -349,7 +352,7 @@ private void handleShowFunctions() throws AnalysisException { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName()); } List functions = showStmt.getIsBuiltin() ? ctx.getCatalog().getBuiltinFunctions() : - db.getFunctions(); + db.getFunctions(); List> rowSet = Lists.newArrayList(); for (Function function : functions) { @@ -383,8 +386,8 @@ private void handleShowFunctions() throws AnalysisException { // Only success ShowResultSetMetaData showMetaData = showStmt.getIsVerbose() ? showStmt.getMetaData() : - ShowResultSetMetaData.builder() - .addColumn(new Column("Function Name", ScalarType.createVarchar(256))).build(); + ShowResultSetMetaData.builder() + .addColumn(new Column("Function Name", ScalarType.createVarchar(256))).build(); resultSet = new ShowResultSet(showMetaData, resultRowSet); } @@ -414,7 +417,7 @@ private void handleShowProc() throws AnalysisException { // if this is superuser, hide ip and host info form backends info proc if (procNode instanceof BackendsProcDir) { if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.OPERATOR)) { + PrivPredicate.OPERATOR)) { // hide host info for (List row : finalRows) { row.remove(BackendsProcDir.HOSTNAME_INDEX); @@ -455,7 +458,7 @@ private void handleShowMigrations() throws AnalysisException { for (BaseParam param : infos) { final int percent = (int) (param.getFloatParam(0) * 100f); rows.add(Lists.newArrayList(param.getStringParam(0), param.getStringParam(1), param.getStringParam(2), - String.valueOf(percent + "%"))); + String.valueOf(percent + "%"))); } resultSet = new ShowResultSet(showStmt.getMetaData(), rows); @@ -543,7 +546,7 @@ private void handleShowDb() throws AnalysisException { PatternMatcher matcher = null; if (showDbStmt.getPattern() != null) { matcher = PatternMatcher.createMysqlPattern(showDbStmt.getPattern(), - CaseSensibility.DATABASE.getCaseSensibility()); + CaseSensibility.DATABASE.getCaseSensibility()); } Set dbNameSet = Sets.newTreeSet(); for (String fullName : dbNames) { @@ -554,7 +557,7 @@ private void handleShowDb() throws AnalysisException { } if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), fullName, - PrivPredicate.SHOW)) { + PrivPredicate.SHOW)) { continue; } @@ -651,7 +654,7 @@ private void handleShowVariables() throws AnalysisException { PatternMatcher matcher = null; if (showStmt.getPattern() != null) { matcher = PatternMatcher.createMysqlPattern(showStmt.getPattern(), - CaseSensibility.VARIABLES.getCaseSensibility()); + CaseSensibility.VARIABLES.getCaseSensibility()); } List> rows = VariableMgr.dump(showStmt.getType(), ctx.getSessionVariable(), matcher); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); @@ -702,7 +705,7 @@ private void handleShowCreateTable() throws AnalysisException { } else { if (showStmt.isView()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_OBJECT, showStmt.getDb(), - showStmt.getTable(), "VIEW"); + showStmt.getTable(), "VIEW"); } rows.add(Lists.newArrayList(table.getName(), createTableStmt.get(0))); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); @@ -868,7 +871,7 @@ private void handleHelp() { } else if (categories.size() > 1) { // Send category list resultSet = new ShowResultSet(helpStmt.getCategoryMetaData(), - Lists.>newArrayList(categories)); + Lists.>newArrayList(categories)); } else { // Send topic list and sub-category list List> rows = Lists.newArrayList(); @@ -899,15 +902,15 @@ private void handleShowLoad() throws 
AnalysisException { // combine the List of load(v1) and loadManager(v2) Load load = catalog.getLoadInstance(); List> loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), - showStmt.getLabelValue(), - showStmt.isAccurateMatch(), - showStmt.getStates()); + showStmt.getLabelValue(), + showStmt.isAccurateMatch(), + showStmt.getStates()); Set statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream() .map(entity -> entity.name()) .collect(Collectors.toSet()); loadInfos.addAll(catalog.getLoadManager().getLoadJobInfosByDb(dbId, showStmt.getLabelValue(), - showStmt.isAccurateMatch(), - statesValue)); + showStmt.isAccurateMatch(), + statesValue)); // order the result of List by orderByPairs in show stmt List orderByPairs = showStmt.getOrderByPairs(); @@ -1047,20 +1050,20 @@ private void handleShowLoadWarnings() throws AnalysisException { if (tableNames.isEmpty()) { // forward compatibility if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), db.getFullName(), - PrivPredicate.SHOW)) { + PrivPredicate.SHOW)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED, - ConnectContext.get().getQualifiedUser(), - db.getFullName()); + ConnectContext.get().getQualifiedUser(), + db.getFullName()); } } else { for (String tblName : tableNames) { if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), db.getFullName(), - tblName, PrivPredicate.SHOW)) { + tblName, PrivPredicate.SHOW)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, - "SHOW LOAD WARNING", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - tblName); + "SHOW LOAD WARNING", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + tblName); } } } @@ -1153,21 +1156,21 @@ private void handleShowRoutineLoad() throws AnalysisException { tableName = routineLoadJob.getTableName(); } catch (MetaNotFoundException e) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("error_msg", "The table metadata of job has been changed. " - + "The job will be cancelled automatically") - .build(), e); + .add("error_msg", "The table metadata of job has been changed. " + + "The job will be cancelled automatically") + .build(), e); } if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - dbFullName, - tableName, - PrivPredicate.LOAD)) { + dbFullName, + tableName, + PrivPredicate.LOAD)) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("operator", "show routine load job") - .add("user", ConnectContext.get().getQualifiedUser()) - .add("remote_ip", ConnectContext.get().getRemoteIP()) - .add("db_full_name", dbFullName) - .add("table_name", tableName) - .add("error_msg", "The table access denied")); + .add("operator", "show routine load job") + .add("user", ConnectContext.get().getQualifiedUser()) + .add("remote_ip", ConnectContext.get().getRemoteIP()) + .add("db_full_name", dbFullName) + .add("table_name", tableName) + .add("error_msg", "The table access denied")); continue; } @@ -1179,7 +1182,7 @@ private void handleShowRoutineLoad() throws AnalysisException { if (!Strings.isNullOrEmpty(showRoutineLoadStmt.getName()) && rows.size() == 0) { // if the jobName has been specified throw new AnalysisException("There is no job named " + showRoutineLoadStmt.getName() - + " in db " + showRoutineLoadStmt.getDbFullName() + + " in db " + showRoutineLoadStmt.getDbFullName() + ". Include history? 
" + showRoutineLoadStmt.isIncludeHistory()); } resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows); @@ -1192,14 +1195,14 @@ private void handleShowRoutineLoadTask() throws AnalysisException { RoutineLoadJob routineLoadJob; try { routineLoadJob = Catalog.getCurrentCatalog().getRoutineLoadManager().getJob(showRoutineLoadTaskStmt.getDbFullName(), - showRoutineLoadTaskStmt.getJobName()); + showRoutineLoadTaskStmt.getJobName()); } catch (MetaNotFoundException e) { LOG.warn(e.getMessage(), e); throw new AnalysisException(e.getMessage()); } if (routineLoadJob == null) { throw new AnalysisException("The job named " + showRoutineLoadTaskStmt.getJobName() + "does not exists " - + "or job state is stopped or cancelled"); + + "or job state is stopped or cancelled"); } // check auth @@ -1211,13 +1214,13 @@ private void handleShowRoutineLoadTask() throws AnalysisException { throw new AnalysisException("The table metadata of job has been changed. The job will be cancelled automatically", e); } if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - dbFullName, - tableName, - PrivPredicate.LOAD)) { + dbFullName, + tableName, + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - tableName); + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + tableName); } // get routine load task info @@ -1263,7 +1266,7 @@ private void handleShowAlter() throws AnalysisException { ProcNodeInterface procNodeI = showStmt.getNode(); Preconditions.checkNotNull(procNodeI); List> rows; - //Only SchemaChangeProc support where/order by/limit syntax + //Only SchemaChangeProc support where/order by/limit syntax if (procNodeI instanceof SchemaChangeProcDir) { rows = ((SchemaChangeProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(), showStmt.getOrderPairs(), showStmt.getLimitElement()).getRows(); @@ -1302,7 +1305,7 @@ private void handleShowPartitions() throws AnalysisException { ProcNodeInterface procNodeI = showStmt.getNode(); Preconditions.checkNotNull(procNodeI); List> rows = ((PartitionsProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(), - showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows(); + showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows(); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } @@ -1383,11 +1386,11 @@ private void handleShowTablet() throws AnalysisException { } while (false); String detailCmd = String.format("SHOW PROC '/dbs/%d/%d/partitions/%d/%d/%d';", - dbId, tableId, partitionId, indexId, tabletId); + dbId, tableId, partitionId, indexId, tabletId); rows.add(Lists.newArrayList(dbName, tableName, partitionName, indexName, - dbId.toString(), tableId.toString(), - partitionId.toString(), indexId.toString(), - isSync.toString(), detailCmd)); + dbId.toString(), tableId.toString(), + partitionId.toString(), indexId.toString(), + isSync.toString(), detailCmd)); } else { Database db = catalog.getDb(showStmt.getDbName()); if (db == null) { @@ -1503,8 +1506,8 @@ private void handleShowResources() { ShowResourcesStmt showStmt = (ShowResourcesStmt) stmt; List> resourcesInfos = Catalog.getCurrentCatalog().getResourceMgr() .getResourcesInfo(showStmt.getNameValue(), - showStmt.isAccurateMatch(), - showStmt.getTypeSet()); + showStmt.isAccurateMatch(), + showStmt.getTypeSet()); // order the result of List by orderByPairs in show stmt List orderByPairs = 
showStmt.getOrderByPairs(); @@ -1614,16 +1617,13 @@ private void handleShowBackup() throws AnalysisException { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName()); } - AbstractJob jobI = Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId()); - if (!(jobI instanceof BackupJob)) { - resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET); - return; - } + List jobs = Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(), showStmt.getLabelPredicate()); + + List backupJobs = jobs.stream().filter(job -> job instanceof BackupJob) + .map(job -> (BackupJob) job).collect(Collectors.toList()); + + List> infos = backupJobs.stream().map(BackupJob::getInfo).collect(Collectors.toList()); - BackupJob backupJob = (BackupJob) jobI; - List info = backupJob.getInfo(); - List> infos = Lists.newArrayList(); - infos.add(info); resultSet = new ShowResultSet(showStmt.getMetaData(), infos); } @@ -1634,16 +1634,13 @@ private void handleShowRestore() throws AnalysisException { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName()); } - AbstractJob jobI = Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId()); - if (!(jobI instanceof RestoreJob)) { - resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET); - return; - } + List jobs = Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(), showStmt.getLabelPredicate()); + + List restoreJobs = jobs.stream().filter(job -> job instanceof RestoreJob) + .map(job -> (RestoreJob) job).collect(Collectors.toList()); + + List> infos = restoreJobs.stream().map(RestoreJob::getInfo).collect(Collectors.toList()); - RestoreJob restoreJob = (RestoreJob) jobI; - List info = restoreJob.getInfo(); - List> infos = Lists.newArrayList(); - infos.add(info); resultSet = new ShowResultSet(showStmt.getMetaData(), infos); } @@ -1835,6 +1832,58 @@ private void handleShowQueryProfile() throws AnalysisException { resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } + private void handleShowCreateRoutineLoad() throws AnalysisException { + ShowCreateRoutineLoadStmt showCreateRoutineLoadStmt = (ShowCreateRoutineLoadStmt) stmt; + List> rows = Lists.newArrayList(); + String dbName = showCreateRoutineLoadStmt.getDb(); + String labelName = showCreateRoutineLoadStmt.getLabel(); + // if include history return all create load + if (showCreateRoutineLoadStmt.isIncludeHistory()) { + List routineLoadJobList = new ArrayList<>(); + try { + routineLoadJobList = Catalog.getCurrentCatalog().getRoutineLoadManager().getJob(dbName, labelName, true); + } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, labelName) + .add("error_msg", "Routine load cannot be found by this name") + .build(), e); + } + if (routineLoadJobList == null) { + resultSet = new ShowResultSet(showCreateRoutineLoadStmt.getMetaData(), rows); + return; + } + for (RoutineLoadJob job : routineLoadJobList) { + String tableName = ""; + try { + tableName = job.getTableName(); + } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, job.getId()) + .add("error_msg", "The table name for this routine load does not exist") + .build(), e); + } + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + dbName, + tableName, + PrivPredicate.LOAD)) { + resultSet = new ShowResultSet(showCreateRoutineLoadStmt.getMetaData(), rows); + return; + } + rows.add(Lists.newArrayList(String.valueOf(job.getId()), showCreateRoutineLoadStmt.getLabel(), 
job.getShowCreateInfo())); + } + } else { + // if job exists + RoutineLoadJob routineLoadJob; + try { + routineLoadJob = Catalog.getCurrentCatalog().getRoutineLoadManager().checkPrivAndGetJob(dbName, labelName); + // get routine load info + rows.add(Lists.newArrayList(String.valueOf(routineLoadJob.getId()), showCreateRoutineLoadStmt.getLabel(), routineLoadJob.getShowCreateInfo())); + } catch (MetaNotFoundException | DdlException e) { + LOG.warn(e.getMessage(), e); + throw new AnalysisException(e.getMessage()); + } + } + resultSet = new ShowResultSet(showCreateRoutineLoadStmt.getMetaData(), rows); + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java index 728e9944686f4a..20bbb1acc373aa 100755 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java @@ -17,8 +17,6 @@ package org.apache.doris.analysis; -import mockit.Mock; -import mockit.MockUp; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.util.Util; @@ -27,8 +25,10 @@ import org.apache.doris.qe.VariableMgr; import org.apache.doris.utframe.DorisAssert; import org.apache.doris.utframe.UtFrameUtils; + import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -40,6 +40,9 @@ import java.util.Set; import java.util.UUID; +import mockit.Mock; +import mockit.MockUp; + public class SelectStmtTest { private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; private static DorisAssert dorisAssert; @@ -584,15 +587,27 @@ public void testGetTableRefs() throws Exception { public void testOutfile() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); Config.enable_outfile_to_local = true; - String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;required,byte_array,username;\");"; + String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,col0\");"; dorisAssert.query(sql).explainQuery(); - // must contains schema + // if shema not set, gen schema sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET;"; + try { + SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); + Assert.assertEquals(1, stmt.getOutFileClause().getSchema().size()); + Assert.assertEquals(Lists.newArrayList("required", "byte_array", "col0"), + stmt.getOutFileClause().getSchema().get(0)); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // schema can not be empty + sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"\");"; try { SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("schema is required for parquet file")); + Assert.assertTrue(e.getMessage().contains("Parquet schema property should not be empty")); } + // schema must contains 3 fields sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"int32,siteid;\");"; try { @@ -600,6 +615,7 @@ public void testOutfile() throws Exception { } catch (Exception e) { 
Assert.assertTrue(e.getMessage().contains("must only contains repetition type/column type/column name")); } + // unknown repetition type sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeat, int32,siteid;\");"; try { @@ -607,6 +623,7 @@ public void testOutfile() throws Exception { } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("unknown repetition type")); } + // only support required type sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeated,int32,siteid;\");"; try { @@ -614,6 +631,7 @@ public void testOutfile() throws Exception { } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("currently only support required type")); } + // unknown data type sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int128,siteid;\");"; try { @@ -621,26 +639,57 @@ public void testOutfile() throws Exception { } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("data type is not supported")); } + // contains parquet properties - sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;\", 'parquet.compression'='snappy');"; + sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,siteid;\", 'parquet.compression'='snappy');"; dorisAssert.query(sql).explainQuery(); // support parquet for broker - sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");"; + sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET " + + "PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " + + "\"broker.hadoop.security.authentication\" = \"kerberos\", " + + "\"broker.kerberos_principal\" = \"test\", " + + "\"broker.kerberos_keytab_content\" = \"test\" , " + + "\"schema\"=\"required,byte_array,siteid;\");"; + dorisAssert.query(sql).explainQuery(); // do not support large int type try { - sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");"; + sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET " + + "PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " + + "\"broker.hadoop.security.authentication\" = \"kerberos\", " + + "\"broker.kerberos_principal\" = \"test\", " + + "\"broker.kerberos_keytab_content\" = \"test\" ," + + " \"schema\"=\"required,int32,siteid;\");"; SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type")); + e.printStackTrace(); + Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT")); } // do not support large int type, contains function try { - sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO 
OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");"; + sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" " + + "FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " + + "\"broker.hadoop.security.authentication\" = \"kerberos\", " + + "\"broker.kerberos_principal\" = \"test\", " + + "\"broker.kerberos_keytab_content\" = \"test\" , " + + "\"schema\"=\"required,int32,siteid;\");"; + SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT")); + } + + // support cast + try { + sql = "SELECT cast(sum(k5) as bigint) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" " + + "FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " + + "\"broker.hadoop.security.authentication\" = \"kerberos\", " + + "\"broker.kerberos_principal\" = \"test\", " + + "\"broker.kerberos_keytab_content\" = \"test\" , " + + "\"schema\"=\"required,int64,siteid;\");"; SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type")); + Assert.fail(e.getMessage()); } } }
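
A usage sketch for the SHOW CREATE ROUTINE LOAD support introduced above (ShowCreateRoutineLoadStmt, handleShowCreateRoutineLoad() and RoutineLoadJob.getShowCreateInfo()). Everything below is illustrative rather than captured output: the statement grammar comes from parser changes outside this hunk, and the job name example_job, table example_tbl, broker list, topic and the listed property keys are placeholder assumptions; the real text mirrors whatever properties the job was created with.

    -- show the CREATE statement that reproduces an existing routine load job
    SHOW CREATE ROUTINE LOAD FOR example_job;

    -- roughly the shape of the text assembled by getShowCreateInfo()
    -- (placeholder values; keys depend on the job's own properties)
    CREATE ROUTINE LOAD example_job ON example_tbl
    WITH APPEND
    COLUMNS TERMINATED BY ","
    PROPERTIES
    (
    "desired_concurrent_number" = "1",
    "max_batch_interval" = "10",
    "max_error_number" = "0",
    "strict_mode" = "false"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "example_topic",
    "property.group.id" = "example_group"
    );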
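
A second sketch, for how the new UNRECOVERABLE tablet status surfaces to operators: per the TabletChecker change above, such tablets are counted but never handed to the tablet scheduler, and the StatisticProcDir / IncompleteTabletsProcNode changes expose them through the new BadTabletNum and BadTablets columns. The database id 10003 below is a placeholder.

    -- per-database summary; the new BadTabletNum column counts tablets whose
    -- replicas are all dead or all version-incomplete (status UNRECOVERABLE)
    SHOW PROC '/statistic';

    -- drill into one database (placeholder dbId) to list the ids shown in the
    -- new BadTablets column
    SHOW PROC '/statistic/10003';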