diff --git a/.github/workflows/release_branches.yml b/.github/workflows/release_branches.yml index 78d989e82047..12e370e00207 100644 --- a/.github/workflows/release_branches.yml +++ b/.github/workflows/release_branches.yml @@ -770,6 +770,141 @@ jobs: python3 -m praktika run 'Install packages (arm_release)' --workflow "ReleaseBranchCI" --ci |& tee ./ci/tmp/job.log fi + stateless_tests_amd_asan_distributed_plan_parallel_1_2: + runs-on: [self-hosted, altinity-on-demand, altinity-func-tester] + needs: [config_workflow, dockers_build_amd, dockers_build_arm, build_amd_asan] + if: ${{ !failure() && !cancelled() && !contains(fromJson(needs.config_workflow.outputs.data).cache_success_base64, 'U3RhdGVsZXNzIHRlc3RzIChhbWRfYXNhbiwgZGlzdHJpYnV0ZWQgcGxhbiwgcGFyYWxsZWwsIDEvMik=') }} + name: "Stateless tests (amd_asan, distributed plan, parallel, 1/2)" + outputs: + data: ${{ steps.run.outputs.DATA }} + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ env.CHECKOUT_REF }} + + - name: Setup + uses: ./.github/actions/runner_setup + - name: Docker setup + uses: ./.github/actions/docker_setup + with: + test_name: "Stateless tests (amd_asan, distributed plan, parallel, 1/2)" + + - name: Prepare env script + run: | + rm -rf ./ci/tmp ./ci/tmp ./ci/tmp + mkdir -p ./ci/tmp ./ci/tmp ./ci/tmp + cat > ./ci/tmp/praktika_setup_env.sh << 'ENV_SETUP_SCRIPT_EOF' + export PYTHONPATH=./ci:.: + cat > ./ci/tmp/workflow_config_releasebranchci.json << 'EOF' + ${{ needs.config_workflow.outputs.data }} + EOF + cat > ./ci/tmp/workflow_status.json << 'EOF' + ${{ toJson(needs) }} + EOF + ENV_SETUP_SCRIPT_EOF + + - name: Run + id: run + run: | + . 
./ci/tmp/praktika_setup_env.sh + set -o pipefail + if command -v ts &> /dev/null; then + python3 -m praktika run 'Stateless tests (amd_asan, distributed plan, parallel, 1/2)' --workflow "ReleaseBranchCI" --ci |& ts '[%Y-%m-%d %H:%M:%S]' | tee ./ci/tmp/job.log + else + python3 -m praktika run 'Stateless tests (amd_asan, distributed plan, parallel, 1/2)' --workflow "ReleaseBranchCI" --ci |& tee ./ci/tmp/job.log + fi + + stateless_tests_amd_asan_distributed_plan_parallel_2_2: + runs-on: [self-hosted, altinity-on-demand, altinity-func-tester] + needs: [config_workflow, dockers_build_amd, dockers_build_arm, build_amd_asan] + if: ${{ !failure() && !cancelled() && !contains(fromJson(needs.config_workflow.outputs.data).cache_success_base64, 'U3RhdGVsZXNzIHRlc3RzIChhbWRfYXNhbiwgZGlzdHJpYnV0ZWQgcGxhbiwgcGFyYWxsZWwsIDIvMik=') }} + name: "Stateless tests (amd_asan, distributed plan, parallel, 2/2)" + outputs: + data: ${{ steps.run.outputs.DATA }} + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ env.CHECKOUT_REF }} + + - name: Setup + uses: ./.github/actions/runner_setup + - name: Docker setup + uses: ./.github/actions/docker_setup + with: + test_name: "Stateless tests (amd_asan, distributed plan, parallel, 2/2)" + + - name: Prepare env script + run: | + rm -rf ./ci/tmp ./ci/tmp ./ci/tmp + mkdir -p ./ci/tmp ./ci/tmp ./ci/tmp + cat > ./ci/tmp/praktika_setup_env.sh << 'ENV_SETUP_SCRIPT_EOF' + export PYTHONPATH=./ci:.: + cat > ./ci/tmp/workflow_config_releasebranchci.json << 'EOF' + ${{ needs.config_workflow.outputs.data }} + EOF + cat > ./ci/tmp/workflow_status.json << 'EOF' + ${{ toJson(needs) }} + EOF + ENV_SETUP_SCRIPT_EOF + + - name: Run + id: run + run: | + . 
./ci/tmp/praktika_setup_env.sh + set -o pipefail + if command -v ts &> /dev/null; then + python3 -m praktika run 'Stateless tests (amd_asan, distributed plan, parallel, 2/2)' --workflow "ReleaseBranchCI" --ci |& ts '[%Y-%m-%d %H:%M:%S]' | tee ./ci/tmp/job.log + else + python3 -m praktika run 'Stateless tests (amd_asan, distributed plan, parallel, 2/2)' --workflow "ReleaseBranchCI" --ci |& tee ./ci/tmp/job.log + fi + + stateless_tests_amd_asan_distributed_plan_sequential: + runs-on: [self-hosted, altinity-on-demand, altinity-func-tester] + needs: [config_workflow, dockers_build_amd, dockers_build_arm, build_amd_asan] + if: ${{ !failure() && !cancelled() && !contains(fromJson(needs.config_workflow.outputs.data).cache_success_base64, 'U3RhdGVsZXNzIHRlc3RzIChhbWRfYXNhbiwgZGlzdHJpYnV0ZWQgcGxhbiwgc2VxdWVudGlhbCk=') }} + name: "Stateless tests (amd_asan, distributed plan, sequential)" + outputs: + data: ${{ steps.run.outputs.DATA }} + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ env.CHECKOUT_REF }} + + - name: Setup + uses: ./.github/actions/runner_setup + - name: Docker setup + uses: ./.github/actions/docker_setup + with: + test_name: "Stateless tests (amd_asan, distributed plan, sequential)" + + - name: Prepare env script + run: | + rm -rf ./ci/tmp ./ci/tmp ./ci/tmp + mkdir -p ./ci/tmp ./ci/tmp ./ci/tmp + cat > ./ci/tmp/praktika_setup_env.sh << 'ENV_SETUP_SCRIPT_EOF' + export PYTHONPATH=./ci:.: + cat > ./ci/tmp/workflow_config_releasebranchci.json << 'EOF' + ${{ needs.config_workflow.outputs.data }} + EOF + cat > ./ci/tmp/workflow_status.json << 'EOF' + ${{ toJson(needs) }} + EOF + ENV_SETUP_SCRIPT_EOF + + - name: Run + id: run + run: | + . 
./ci/tmp/praktika_setup_env.sh + set -o pipefail + if command -v ts &> /dev/null; then + python3 -m praktika run 'Stateless tests (amd_asan, distributed plan, sequential)' --workflow "ReleaseBranchCI" --ci |& ts '[%Y-%m-%d %H:%M:%S]' | tee ./ci/tmp/job.log + else + python3 -m praktika run 'Stateless tests (amd_asan, distributed plan, sequential)' --workflow "ReleaseBranchCI" --ci |& tee ./ci/tmp/job.log + fi + integration_tests_amd_asan_1_4: runs-on: [self-hosted, altinity-on-demand, altinity-func-tester] needs: [config_workflow, dockers_build_amd, dockers_build_arm, build_amd_asan] @@ -1672,7 +1807,7 @@ jobs: finish_workflow: runs-on: [self-hosted, altinity-on-demand, altinity-style-checker-aarch64] - needs: [config_workflow, dockers_build_amd, dockers_build_arm, build_amd_debug, build_amd_release, build_amd_asan, build_amd_tsan, build_amd_msan, build_amd_ubsan, build_arm_release, build_amd_darwin, build_arm_darwin, docker_server_image, docker_keeper_image, install_packages_amd_release, install_packages_arm_release, integration_tests_amd_asan_1_4, integration_tests_amd_asan_2_4, integration_tests_amd_asan_3_4, integration_tests_amd_asan_4_4, integration_tests_amd_asan_old_analyzer_1_6, integration_tests_amd_asan_old_analyzer_2_6, integration_tests_amd_asan_old_analyzer_3_6, integration_tests_amd_asan_old_analyzer_4_6, integration_tests_amd_asan_old_analyzer_5_6, integration_tests_amd_asan_old_analyzer_6_6, integration_tests_amd_tsan_1_6, integration_tests_amd_tsan_2_6, integration_tests_amd_tsan_3_6, integration_tests_amd_tsan_4_6, integration_tests_amd_tsan_5_6, integration_tests_amd_tsan_6_6, stress_test_amd_debug, stress_test_amd_tsan, stress_test_amd_ubsan, stress_test_amd_msan] + needs: [config_workflow, dockers_build_amd, dockers_build_arm, build_amd_debug, build_amd_release, build_amd_asan, build_amd_tsan, build_amd_msan, build_amd_ubsan, build_arm_release, build_amd_darwin, build_arm_darwin, docker_server_image, docker_keeper_image, 
install_packages_amd_release, install_packages_arm_release, stateless_tests_amd_asan_distributed_plan_parallel_1_2, stateless_tests_amd_asan_distributed_plan_parallel_2_2, stateless_tests_amd_asan_distributed_plan_sequential, integration_tests_amd_asan_1_4, integration_tests_amd_asan_2_4, integration_tests_amd_asan_3_4, integration_tests_amd_asan_4_4, integration_tests_amd_asan_old_analyzer_1_6, integration_tests_amd_asan_old_analyzer_2_6, integration_tests_amd_asan_old_analyzer_3_6, integration_tests_amd_asan_old_analyzer_4_6, integration_tests_amd_asan_old_analyzer_5_6, integration_tests_amd_asan_old_analyzer_6_6, integration_tests_amd_tsan_1_6, integration_tests_amd_tsan_2_6, integration_tests_amd_tsan_3_6, integration_tests_amd_tsan_4_6, integration_tests_amd_tsan_5_6, integration_tests_amd_tsan_6_6, stress_test_amd_debug, stress_test_amd_tsan, stress_test_amd_ubsan, stress_test_amd_msan] if: ${{ !cancelled() }} name: "Finish Workflow" outputs: diff --git a/base/poco/Foundation/include/Poco/Exception.h b/base/poco/Foundation/include/Poco/Exception.h index 59a31e77c3cb..9efd824ebec0 100644 --- a/base/poco/Foundation/include/Poco/Exception.h +++ b/base/poco/Foundation/include/Poco/Exception.h @@ -84,7 +84,7 @@ class Foundation_API Exception : public std::exception /// The copy can later be thrown again by /// invoking rethrow() on it. - virtual void rethrow() const; + [[noreturn]] virtual void rethrow() const; /// (Re)Throws the exception. 
/// /// This is useful for temporarily storing a diff --git a/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h b/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h index d5263319ed38..604e66da5786 100644 --- a/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h +++ b/base/poco/Net/include/Poco/Net/HTTPChunkedStream.h @@ -45,7 +45,7 @@ namespace Net ~HTTPChunkedStreamBuf(); void close(); - bool isComplete(bool read_from_device_to_check_eof = false); + bool isComplete(bool read_from_device_to_check_eof = false) noexcept; protected: int readFromDevice(char * buffer, std::streamsize length); diff --git a/base/poco/Net/src/HTTPChunkedStream.cpp b/base/poco/Net/src/HTTPChunkedStream.cpp index 043c38ce9e71..ca0b73128ff4 100644 --- a/base/poco/Net/src/HTTPChunkedStream.cpp +++ b/base/poco/Net/src/HTTPChunkedStream.cpp @@ -140,7 +140,7 @@ int HTTPChunkedStreamBuf::readFromDevice(char* buffer, std::streamsize length) } -bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) +bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) noexcept { if (read_from_device_to_check_eof) { @@ -150,7 +150,7 @@ bool HTTPChunkedStreamBuf::isComplete(bool read_from_device_to_check_eof) /// "Unexpected EOF" exception would be thrown readFromDevice(nullptr, 0); } - catch (Poco::Net::MessageException &) + catch (...) 
{ return false; } diff --git a/ci/workflows/release_branches.py b/ci/workflows/release_branches.py index 748553296403..fe53cbd718c3 100644 --- a/ci/workflows/release_branches.py +++ b/ci/workflows/release_branches.py @@ -24,6 +24,7 @@ JobConfigs.docker_sever, JobConfigs.docker_keeper, *JobConfigs.install_check_master_jobs, + *[job for job in JobConfigs.functional_tests_jobs if "asan" in job.name], *[ job for job in JobConfigs.integration_test_asan_master_jobs diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt index 53f198322b11..ea5027262049 100644 --- a/cmake/autogenerated_versions.txt +++ b/cmake/autogenerated_versions.txt @@ -2,13 +2,13 @@ # NOTE: VERSION_REVISION has nothing common with DBMS_TCP_PROTOCOL_VERSION, # only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes. -SET(VERSION_REVISION 54509) +SET(VERSION_REVISION 54512) SET(VERSION_MAJOR 25) SET(VERSION_MINOR 8) -SET(VERSION_PATCH 9) -SET(VERSION_GITHASH 8a2475033080b4a8d57b7771f52140af663dd4e0) -SET(VERSION_DESCRIBE v25.8.9.20000.altinityantalya) -SET(VERSION_STRING 25.8.9.20000.altinityantalya) +SET(VERSION_PATCH 12) +SET(VERSION_GITHASH fa393206741c830da77b8f1bcf18c753161932c8) +SET(VERSION_DESCRIBE v25.8.12.20000.altinityantalya) +SET(VERSION_STRING 25.8.12.20000.altinityantalya) # end of autochange # This is the 'base' tweak of the version, build scripts will diff --git a/contrib/azure b/contrib/azure index 9e62bd3c7645..0f7a2013f7d7 160000 --- a/contrib/azure +++ b/contrib/azure @@ -1 +1 @@ -Subproject commit 9e62bd3c7645fbf276d37bcf99d9b90230d8efc9 +Subproject commit 0f7a2013f7d79058047fc4bd35e94d20578c0d2b diff --git a/contrib/curl b/contrib/curl index cfbfb65047e8..400fffa90f30 160000 --- a/contrib/curl +++ b/contrib/curl @@ -1 +1 @@ -Subproject commit cfbfb65047e85e6b08af65fe9cdbcf68e9ad496a +Subproject commit 400fffa90f30c7a2dc762fa33009d24851bd2016 diff --git a/contrib/curl-cmake/CMakeLists.txt b/contrib/curl-cmake/CMakeLists.txt index 
c0332cdb8e57..c616b7d0471e 100644 --- a/contrib/curl-cmake/CMakeLists.txt +++ b/contrib/curl-cmake/CMakeLists.txt @@ -19,6 +19,7 @@ set (SRCS "${LIBRARY_DIR}/lib/cf-h2-proxy.c" "${LIBRARY_DIR}/lib/cf-haproxy.c" "${LIBRARY_DIR}/lib/cf-https-connect.c" + "${LIBRARY_DIR}/lib/cf-ip-happy.c" "${LIBRARY_DIR}/lib/cf-socket.c" "${LIBRARY_DIR}/lib/cfilters.c" "${LIBRARY_DIR}/lib/conncache.c" @@ -27,9 +28,9 @@ set (SRCS "${LIBRARY_DIR}/lib/cookie.c" "${LIBRARY_DIR}/lib/cshutdn.c" "${LIBRARY_DIR}/lib/curl_addrinfo.c" - "${LIBRARY_DIR}/lib/curl_des.c" "${LIBRARY_DIR}/lib/curl_endian.c" "${LIBRARY_DIR}/lib/curl_fnmatch.c" + "${LIBRARY_DIR}/lib/curl_fopen.c" "${LIBRARY_DIR}/lib/curl_get_line.c" "${LIBRARY_DIR}/lib/curl_gethostname.c" "${LIBRARY_DIR}/lib/curl_gssapi.c" @@ -54,7 +55,6 @@ set (SRCS "${LIBRARY_DIR}/lib/fake_addrinfo.c" "${LIBRARY_DIR}/lib/file.c" "${LIBRARY_DIR}/lib/fileinfo.c" - "${LIBRARY_DIR}/lib/fopen.c" "${LIBRARY_DIR}/lib/formdata.c" "${LIBRARY_DIR}/lib/ftp.c" "${LIBRARY_DIR}/lib/ftplistparser.c" @@ -81,7 +81,6 @@ set (SRCS "${LIBRARY_DIR}/lib/idn.c" "${LIBRARY_DIR}/lib/if2ip.c" "${LIBRARY_DIR}/lib/imap.c" - "${LIBRARY_DIR}/lib/krb5.c" "${LIBRARY_DIR}/lib/ldap.c" "${LIBRARY_DIR}/lib/llist.c" "${LIBRARY_DIR}/lib/macos.c" @@ -93,6 +92,7 @@ set (SRCS "${LIBRARY_DIR}/lib/mqtt.c" "${LIBRARY_DIR}/lib/multi.c" "${LIBRARY_DIR}/lib/multi_ev.c" + "${LIBRARY_DIR}/lib/multi_ntfy.c" "${LIBRARY_DIR}/lib/netrc.c" "${LIBRARY_DIR}/lib/noproxy.c" "${LIBRARY_DIR}/lib/openldap.c" @@ -170,16 +170,18 @@ set (SRCS "${LIBRARY_DIR}/lib/vtls/x509asn1.c" "${LIBRARY_DIR}/lib/curlx/base64.c" "${LIBRARY_DIR}/lib/curlx/dynbuf.c" + "${LIBRARY_DIR}/lib/curlx/fopen.c" "${LIBRARY_DIR}/lib/curlx/inet_ntop.c" "${LIBRARY_DIR}/lib/curlx/inet_pton.c" "${LIBRARY_DIR}/lib/curlx/multibyte.c" "${LIBRARY_DIR}/lib/curlx/nonblock.c" + "${LIBRARY_DIR}/lib/curlx/strerr.c" "${LIBRARY_DIR}/lib/curlx/strparse.c" "${LIBRARY_DIR}/lib/curlx/timediff.c" "${LIBRARY_DIR}/lib/curlx/timeval.c" 
"${LIBRARY_DIR}/lib/curlx/version_win32.c" - "${LIBRARY_DIR}/lib/curlx/warnless.c" "${LIBRARY_DIR}/lib/curlx/wait.c" + "${LIBRARY_DIR}/lib/curlx/warnless.c" "${LIBRARY_DIR}/lib/curlx/winapi.c" ) diff --git a/contrib/icu b/contrib/icu index 4216173eeeb3..b29faa6d4e46 160000 --- a/contrib/icu +++ b/contrib/icu @@ -1 +1 @@ -Subproject commit 4216173eeeb39c1d4caaa54a68860e800412d273 +Subproject commit b29faa6d4e46f10d230b93a3c33885e7ec71bd41 diff --git a/contrib/icu-cmake/CMakeLists.txt b/contrib/icu-cmake/CMakeLists.txt index b2c5923bfe81..46deb3d1bd67 100644 --- a/contrib/icu-cmake/CMakeLists.txt +++ b/contrib/icu-cmake/CMakeLists.txt @@ -40,6 +40,7 @@ set(ICUUC_SOURCES "${ICU_SOURCE_DIR}/common/errorcode.cpp" "${ICU_SOURCE_DIR}/common/filteredbrk.cpp" "${ICU_SOURCE_DIR}/common/filterednormalizer2.cpp" +"${ICU_SOURCE_DIR}/common/fixedstring.cpp" "${ICU_SOURCE_DIR}/common/icudataver.cpp" "${ICU_SOURCE_DIR}/common/icuplug.cpp" "${ICU_SOURCE_DIR}/common/loadednormalizer2impl.cpp" @@ -478,11 +479,11 @@ enable_language(ASM) if (OS_DARWIN) # Fine for both x86 and ARM - set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/darwin_x86_64/icudt75l_dat.S") + set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/darwin/icudt78l_dat.S") elseif (ARCH_S390X) - set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/icudt75b_dat.S") + set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/icudt78b_dat.S") else () - set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/icudt75l_dat.S") + set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/icudt78l_dat.S") endif () # ^^ you might be confused how for different little endian platforms (x86, ARM) the same assembly files can be used. # These files are indeed assembly but they only contain data ('.long' directive), which makes them portable accross CPUs. 
diff --git a/contrib/icudata b/contrib/icudata index cfc05b4c3140..e3ae5bcb2b24 160000 --- a/contrib/icudata +++ b/contrib/icudata @@ -1 +1 @@ -Subproject commit cfc05b4c3140ff2be84291b80de8c62b1e42d0da +Subproject commit e3ae5bcb2b24f17cd9336c4f1b25f36ed636d839 diff --git a/contrib/libarchive b/contrib/libarchive index 9525f90ca4bd..7f53fce04e4e 160000 --- a/contrib/libarchive +++ b/contrib/libarchive @@ -1 +1 @@ -Subproject commit 9525f90ca4bd14c7b335e2f8c84a4607b0af6bdf +Subproject commit 7f53fce04e4e672230f4eb80b219af17975e4f83 diff --git a/contrib/libarchive-cmake/config.h b/contrib/libarchive-cmake/config.h index b82e1afa61cc..309251100cd9 100644 --- a/contrib/libarchive-cmake/config.h +++ b/contrib/libarchive-cmake/config.h @@ -334,16 +334,16 @@ typedef uint64_t uintmax_t; /* #undef ARCHIVE_XATTR_LINUX */ /* Version number of bsdcpio */ -#define BSDCPIO_VERSION_STRING "3.7.4" +#define BSDCPIO_VERSION_STRING "3.9.0" /* Version number of bsdtar */ -#define BSDTAR_VERSION_STRING "3.7.4" +#define BSDTAR_VERSION_STRING "3.9.0" /* Version number of bsdcat */ -#define BSDCAT_VERSION_STRING "3.7.4" +#define BSDCAT_VERSION_STRING "3.9.0" /* Version number of bsdunzip */ -#define BSDUNZIP_VERSION_STRING "3.7.4" +#define BSDUNZIP_VERSION_STRING "3.9.0" /* Define to 1 if you have the `acl_create_entry' function. */ /* #undef HAVE_ACL_CREATE_ENTRY */ @@ -405,6 +405,12 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `chroot' function. */ #define HAVE_CHROOT 1 +/* Define to 1 if you have the `closefrom' function. */ +/* #undef HAVE_CLOSEFROM */ + +/* Define to 1 if you have the `close_range' function. */ +/* #undef HAVE_CLOSE_RANGE */ + /* Define to 1 if you have the header file. 
*/ /* #undef HAVE_COPYFILE_H */ diff --git a/docs/en/sql-reference/statements/grant.md b/docs/en/sql-reference/statements/grant.md index 9b0ea66d21b6..11a8a655cfaa 100644 --- a/docs/en/sql-reference/statements/grant.md +++ b/docs/en/sql-reference/statements/grant.md @@ -680,6 +680,23 @@ GRANT READ ON S3('s3://foo/.*') TO john GRANT READ ON S3('s3://bar/.*') TO john ``` +:::warning +Source filter takes **regexp** as a parameter, so a grant +`GRANT READ ON URL('https://www.google.com') TO john;` + +will allow queries +```sql +SELECT * FROM url('https://www.google.com'); +SELECT * FROM url('https://www-google.com'); +``` + +because `.` is treated as an `Any Single Character` in the regexps. +This may lead to a potential vulnerability. The correct grant should be +```sql +GRANT READ ON URL('https://www\.google\.com') TO john; +``` +::: + **Re-granting with GRANT OPTION:** If the original grant has `WITH GRANT OPTION`, it can be re-granted using `GRANT CURRENT GRANTS`: diff --git a/packages/clickhouse-keeper.postinstall b/packages/clickhouse-keeper.postinstall index 2b4f303b6849..aab4596be52f 100644 --- a/packages/clickhouse-keeper.postinstall +++ b/packages/clickhouse-keeper.postinstall @@ -28,6 +28,25 @@ if [ "$1" = configure ] || [ -n "$not_deb_os" ]; then "${KEEPER_USER}" fi + if [ -x "/bin/systemctl" ] && [ -f /lib/systemd/system/clickhouse-keeper.service ] && [ -d /run/systemd/system ]; then + # if old rc.d service present - remove it + if [ -x "/etc/init.d/clickhouse-keeper" ] && [ -x "/usr/sbin/update-rc.d" ]; then + /usr/sbin/update-rc.d clickhouse-keeper remove + fi + + /bin/systemctl daemon-reload + /bin/systemctl enable clickhouse-keeper + else + # If you are downgrading to a version older than 1.1.54336, run: systemctl disable clickhouse-keeper + if [ -x "/etc/init.d/clickhouse-keeper" ]; then + if [ -x "/usr/sbin/update-rc.d" ]; then + /usr/sbin/update-rc.d clickhouse-keeper defaults 19 19 >/dev/null || exit $? 
+ else + echo # Other OS + fi + fi + fi + chown -R "${KEEPER_USER}:${KEEPER_GROUP}" "${KEEPER_CONFDIR}" chmod 0755 "${KEEPER_CONFDIR}" diff --git a/src/Access/Authentication.h b/src/Access/Authentication.h index b4ba11c71ea3..2c73aa96698c 100644 --- a/src/Access/Authentication.h +++ b/src/Access/Authentication.h @@ -38,6 +38,9 @@ struct Authentication explicit Require(const String & realm_); const String & getRealm() const; + Require * clone() const override { return new Require(*this); } + void rethrow() const override { throw *this; } /// NOLINT(cert-err60-cpp) + private: const String realm; }; diff --git a/src/AggregateFunctions/AggregateFunctionGroupArray.cpp b/src/AggregateFunctions/AggregateFunctionGroupArray.cpp index 578099706632..2371477c084a 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArray.cpp +++ b/src/AggregateFunctions/AggregateFunctionGroupArray.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -464,7 +465,10 @@ struct GroupArrayNodeGeneral : public GroupArrayNodeBase return node; } - void insertInto(IColumn & column) { std::ignore = column.deserializeAndInsertAggregationStateValueFromArena(data()); } + void insertInto(IColumn & column) + { + deserializeAndInsert({data(), size}, column); + } }; template diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp b/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp index a0008f09d24d..6305add32200 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayIntersect.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -341,10 +342,7 @@ class AggregateFunctionGroupArrayIntersectGeneric final for (auto & elem : set) { - if constexpr (is_plain_column) - data_to.insertData(elem.getValue().data, elem.getValue().size); - else - std::ignore = data_to.deserializeAndInsertAggregationStateValueFromArena(elem.getValue().data); + 
deserializeAndInsert(elem.getValue(), data_to); } } }; diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionDistinct.h b/src/AggregateFunctions/Combinators/AggregateFunctionDistinct.h index ea8c928d6a90..99507db6b12b 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionDistinct.h @@ -138,9 +138,10 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi Set::LookupResult it; bool inserted; history.emplace(ArenaKeyHolder{value, *arena}, it, inserted); - const char * pos = it->getValue().data; + ReadBufferFromString in({it->getValue().data, it->getValue().size}); + /// Multiple columns are serialized one by one for (auto & column : argument_columns) - pos = column->deserializeAndInsertAggregationStateValueFromArena(pos); + column->deserializeAndInsertAggregationStateValueFromArena(in); } } } diff --git a/src/AggregateFunctions/DDSketch.h b/src/AggregateFunctions/DDSketch.h index 16b6318181cc..7c2b042699a3 100644 --- a/src/AggregateFunctions/DDSketch.h +++ b/src/AggregateFunctions/DDSketch.h @@ -1,49 +1,49 @@ #pragma once -#include // for std::unique_ptr -#include -#include #include -#include +#include // for std::unique_ptr #include -#include -#include - +#include #include #include -#include +#include +#include +#include +#include +#include namespace DB { namespace ErrorCodes { - extern const int BAD_ARGUMENTS; - extern const int INCORRECT_DATA; +extern const int BAD_ARGUMENTS; +extern const int INCORRECT_DATA; } class DDSketchDenseLogarithmic { public: explicit DDSketchDenseLogarithmic(Float64 relative_accuracy = 0.01) - : mapping(std::make_unique(relative_accuracy)), - store(std::make_unique()), - negative_store(std::make_unique()), - zero_count(0.0), - count(0.0) + : mapping(std::make_unique(relative_accuracy)) + , store(std::make_unique()) + , negative_store(std::make_unique()) + , zero_count(0.0) + , count(0.0) { } - 
DDSketchDenseLogarithmic(std::unique_ptr mapping_, - std::unique_ptr store_, - std::unique_ptr negative_store_, - Float64 zero_count_) - : mapping(std::move(mapping_)), - store(std::move(store_)), - negative_store(std::move(negative_store_)), - zero_count(zero_count_), - count(store->count + negative_store->count + zero_count_) + DDSketchDenseLogarithmic( + std::unique_ptr mapping_, + std::unique_ptr store_, + std::unique_ptr negative_store_, + Float64 zero_count_) + : mapping(std::move(mapping_)) + , store(std::move(store_)) + , negative_store(std::move(negative_store_)) + , zero_count(zero_count_) + , count(store->count + negative_store->count + zero_count_) { } @@ -97,7 +97,11 @@ class DDSketchDenseLogarithmic return quantile_value; } - void copy(const DDSketchDenseLogarithmic& other) + Float64 getGamma() const { return mapping->getGamma(); } + + Float64 getCount() const { return count; } + + void copy(const DDSketchDenseLogarithmic & other) { Float64 rel_acc = (other.mapping->getGamma() - 1) / (other.mapping->getGamma() + 1); mapping = std::make_unique(rel_acc); @@ -109,9 +113,9 @@ class DDSketchDenseLogarithmic count = other.count; } - void merge(const DDSketchDenseLogarithmic& other) + void merge(const DDSketchDenseLogarithmic & other) { - if (mapping->getGamma() != other.mapping->getGamma()) + if (*mapping != *other.mapping) { // modify the one with higher precision to match the one with lower precision if (mapping->getGamma() > other.mapping->getGamma()) @@ -147,7 +151,7 @@ class DDSketchDenseLogarithmic /// NOLINTBEGIN(readability-static-accessed-through-instance) - void serialize(WriteBuffer& buf) const + void serialize(WriteBuffer & buf) const { // Write the mapping writeBinary(enc.FlagIndexMappingBaseLogarithmic.byte, buf); @@ -165,7 +169,7 @@ class DDSketchDenseLogarithmic writeBinary(zero_count, buf); } - void deserialize(ReadBuffer& buf) + void deserialize(ReadBuffer & buf) { // Read the mapping UInt8 flag = 0; @@ -219,7 +223,7 @@ class 
DDSketchDenseLogarithmic auto new_positive_store = std::make_unique(); auto new_negative_store = std::make_unique(); - auto remap_store = [this, &new_mapping](DDSketchDenseStore& old_store, std::unique_ptr& target_store) + auto remap_store = [this, &new_mapping](DDSketchDenseStore & old_store, std::unique_ptr & target_store) { for (int i = 0; i < old_store.length(); ++i) { diff --git a/src/AggregateFunctions/DDSketch/DDSketchEncoding.h b/src/AggregateFunctions/DDSketch/DDSketchEncoding.h index 477bc3f54495..64dc8c0e55ed 100644 --- a/src/AggregateFunctions/DDSketch/DDSketchEncoding.h +++ b/src/AggregateFunctions/DDSketch/DDSketchEncoding.h @@ -1,7 +1,6 @@ #pragma once -#include -#include +#include /** * An encoded DDSketch comprises multiple contiguous blocks (sequences of bytes). @@ -36,7 +35,10 @@ class DDSketchEncoding { public: UInt8 byte; - Flag(UInt8 t, UInt8 s) : byte(t | s) { } + Flag(UInt8 t, UInt8 s) + : byte(t | s) + { + } [[maybe_unused]] UInt8 Type() const { return byte & flagTypeMask; } [[maybe_unused]] UInt8 SubFlag() const { return byte & subFlagMask; } }; diff --git a/src/AggregateFunctions/DDSketch/Mapping.h b/src/AggregateFunctions/DDSketch/Mapping.h index 0d1ff785d59d..0f4d939f8f56 100644 --- a/src/AggregateFunctions/DDSketch/Mapping.h +++ b/src/AggregateFunctions/DDSketch/Mapping.h @@ -1,26 +1,29 @@ #pragma once -#include #include -#include #include +#include #include +#include #include +#include +#include namespace DB { namespace ErrorCodes { - extern const int BAD_ARGUMENTS; +extern const int BAD_ARGUMENTS; } class DDSketchLogarithmicMapping { public: explicit DDSketchLogarithmicMapping(Float64 relative_accuracy_, Float64 offset_ = 0.0) - : relative_accuracy(relative_accuracy_), offset(offset_) + : relative_accuracy(relative_accuracy_) + , offset(offset_) { if (relative_accuracy <= 0 || relative_accuracy >= 1) { @@ -44,48 +47,40 @@ class DDSketchLogarithmicMapping return static_cast(logGamma(value) + offset); } - Float64 value(int key) const - 
{ - return lowerBound(key) * (1 + relative_accuracy); - } + Float64 value(int key) const { return lowerBound(key) * (1 + relative_accuracy); } - Float64 logGamma(Float64 value) const - { - return std::log(value) * multiplier; - } + Float64 logGamma(Float64 value) const { return std::log(value) * multiplier; } - Float64 powGamma(Float64 value) const - { - return std::exp(value / multiplier); - } + Float64 powGamma(Float64 value) const { return std::exp(value / multiplier); } - Float64 lowerBound(int index) const - { - return powGamma(static_cast(index) - offset); - } + Float64 lowerBound(int index) const { return powGamma(static_cast(index) - offset); } - Float64 getGamma() const - { - return gamma; - } + Float64 getGamma() const { return gamma; } - Float64 getMinPossible() const - { - return min_possible; - } + Float64 getMinPossible() const { return min_possible; } - [[maybe_unused]] Float64 getMaxPossible() const + [[maybe_unused]] Float64 getMaxPossible() const { return max_possible; } + + bool operator==(const DDSketchLogarithmicMapping & other) const { - return max_possible; + if (gamma == other.gamma) + { + return true; + } + + auto [min, max] = std::minmax(gamma, other.gamma); + const Float64 difference = max - min; + const Float64 acceptable = (std::nextafter(min, max) - min) * min; + return difference <= acceptable; } - void serialize(WriteBuffer& buf) const + void serialize(WriteBuffer & buf) const { writeBinary(gamma, buf); writeBinary(offset, buf); } - void deserialize(ReadBuffer& buf) + void deserialize(ReadBuffer & buf) { readBinary(gamma, buf); readBinary(offset, buf); diff --git a/src/AggregateFunctions/DDSketch/Store.h b/src/AggregateFunctions/DDSketch/Store.h index 0e499e445d2a..594746f73e10 100644 --- a/src/AggregateFunctions/DDSketch/Store.h +++ b/src/AggregateFunctions/DDSketch/Store.h @@ -1,13 +1,15 @@ #pragma once -#include -#include #include #include +#include +#include +#include #include +#include #include -#include +#include // We start 
with 128 bins and grow the number of bins by 128 @@ -18,6 +20,11 @@ constexpr UInt32 CHUNK_SIZE = 128; namespace DB { +namespace ErrorCodes +{ +extern const int INCORRECT_DATA; +} + class DDSketchDenseStore { public: @@ -27,9 +34,12 @@ class DDSketchDenseStore int offset = 0; std::vector bins; - explicit DDSketchDenseStore(UInt32 chunk_size_ = CHUNK_SIZE) : chunk_size(chunk_size_) {} + explicit DDSketchDenseStore(UInt32 chunk_size_ = CHUNK_SIZE) + : chunk_size(chunk_size_) + { + } - void copy(DDSketchDenseStore* other) + void copy(DDSketchDenseStore * other) { bins = other->bins; count = other->count; @@ -38,10 +48,7 @@ class DDSketchDenseStore offset = other->offset; } - int length() const - { - return static_cast(bins.size()); - } + int length() const { return static_cast(bins.size()); } void add(int key, Float64 weight) { @@ -64,9 +71,10 @@ class DDSketchDenseStore return max_key; } - void merge(DDSketchDenseStore* other) + void merge(DDSketchDenseStore * other) { - if (other->count == 0) return; + if (other->count == 0) + return; if (count == 0) { @@ -89,9 +97,8 @@ class DDSketchDenseStore /// NOLINTBEGIN(readability-static-accessed-through-instance) - void serialize(WriteBuffer& buf) const + void serialize(WriteBuffer & buf) const { - // Calculate the size of the dense and sparse encodings to choose the smallest one UInt64 num_bins = 0; UInt64 num_non_empty_bins = 0; @@ -144,8 +151,10 @@ class DDSketchDenseStore } } - void deserialize(ReadBuffer& buf) + void deserialize(ReadBuffer & buf) { + count = 0; + UInt8 encoding_mode; readBinary(encoding_mode, buf); if (encoding_mode == enc.BinEncodingContiguousCounts) @@ -165,7 +174,7 @@ class DDSketchDenseStore start_key += index_delta; } } - else + else if (encoding_mode == enc.BinEncodingIndexDeltasAndCounts) { UInt64 num_non_empty_bins; readVarUInt(num_non_empty_bins, buf); @@ -180,6 +189,10 @@ class DDSketchDenseStore add(previous_index, bin_count); } } + else + { + throw Exception(ErrorCodes::INCORRECT_DATA, 
"Invalid flag for encoding mode"); + } } /// NOLINTEND(readability-static-accessed-through-instance) diff --git a/src/AggregateFunctions/KeyHolderHelpers.h b/src/AggregateFunctions/KeyHolderHelpers.h index f19c9b7a7cb1..d1a78e855c0b 100644 --- a/src/AggregateFunctions/KeyHolderHelpers.h +++ b/src/AggregateFunctions/KeyHolderHelpers.h @@ -1,12 +1,18 @@ #pragma once -#include #include +#include +#include namespace DB { struct Settings; +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +} + template static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena) { @@ -29,7 +35,14 @@ static void deserializeAndInsert(StringRef str, IColumn & data_to) if constexpr (is_plain_column) data_to.insertData(str.data, str.size); else - std::ignore = data_to.deserializeAndInsertAggregationStateValueFromArena(str.data); + { + ReadBufferFromString in({str.data, str.size}); + data_to.deserializeAndInsertAggregationStateValueFromArena(in); + if (!in.eof()) + { + throw Exception(ErrorCodes::INCORRECT_DATA, "Extra bytes ({}) found after deserializing aggregation state", in.available()); + } + } } } diff --git a/src/AggregateFunctions/tests/gtest_ddsketch.cpp b/src/AggregateFunctions/tests/gtest_ddsketch.cpp new file mode 100644 index 000000000000..69c01d81fcd1 --- /dev/null +++ b/src/AggregateFunctions/tests/gtest_ddsketch.cpp @@ -0,0 +1,89 @@ +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +TEST(DDSketch, MergeDifferentGammasWithoutSegfault) +{ + using namespace DB; + + DDSketchDenseLogarithmic lhs{}; + DDSketchDenseLogarithmic rhs{}; + + /* + { + "mapping": { + "gamma": 2.0, + "index_offset": 0.0, + "interpolation": 0 + }, + "positive_values": { + "bin_counts": {}, + "contiguous_bin_counts": [ + 1.0 + ], + "contiguous_bin_index_offset": -8 + }, + "negative_values": { + "bin_counts": {}, + "contiguous_bin_counts": [], + "contiguous_bin_index_offset": 0 + }, + "zero_count": 0.0 + } + */ + std::string lhs_data = 
base64Decode("AgAAAAAAAABAAAAAAAAAAAABDAEPAgAAAAAAAPA/AwwA/v///w8CBAAAAAAAAAAA"); + ReadBufferFromString lhs_buffer{lhs_data}; + lhs.deserialize(lhs_buffer); + + ASSERT_DOUBLE_EQ(lhs.getCount(), 1); + ASSERT_DOUBLE_EQ(lhs.getGamma(), 2.0); + + /* + { + "mapping": { + "gamma": 1.4142135623730951, + "index_offset": 0.0, + "interpolation": 0 + }, + "positive_values": { + "bin_counts": {}, + "contiguous_bin_counts": [ + 1.0 + ], + "contiguous_bin_index_offset": -18 + }, + "negative_values": { + "bin_counts": {}, + "contiguous_bin_counts": [], + "contiguous_bin_index_offset": 0 + }, + "zero_count": 0.0 + } + */ + std::string rhs_data = base64Decode("As07f2aeoPY/AAAAAAAAAAABDAEjAgAAAAAAAPA/AwwA/v///w8CBAAAAAAAAAAA"); + ReadBufferFromString rhs_buffer{rhs_data}; + rhs.deserialize(rhs_buffer); + + ASSERT_DOUBLE_EQ(rhs.getCount(), 1); + ASSERT_DOUBLE_EQ(rhs.getGamma(), 1.4142135623730951); + + lhs.merge(rhs); + std::vector merge_buffer; + + WriteBufferFromVector> writer{merge_buffer}; + lhs.serialize(writer); + + ReadBufferFromMemory reader{merge_buffer.data(), merge_buffer.size()}; + ASSERT_NO_THROW(rhs.deserialize(reader)); + + ASSERT_FLOAT_EQ(rhs.getCount(), 2); + ASSERT_DOUBLE_EQ(rhs.getGamma(), 2.0); +} diff --git a/src/Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.cpp b/src/Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.cpp index 9e1c21375fae..5293ba0bf860 100644 --- a/src/Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.cpp +++ b/src/Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.cpp @@ -13,6 +13,7 @@ namespace Setting { extern const SettingsBool group_by_use_nulls; extern const SettingsBool optimize_injective_functions_in_group_by; + extern const SettingsBool allow_suspicious_types_in_group_by; } namespace @@ -88,7 +89,8 @@ class OptimizeGroupByInjectiveFunctionsVisitor : public InDepthQueryTreeVisitorW // Aggregate functions are not allowed in GROUP BY clause auto function = function_node->getFunctionOrThrow(); - bool can_be_eliminated = 
function->isInjective(function_node->getArgumentColumns()); + auto arguments = function_node->getArgumentColumns(); + bool can_be_eliminated = function->isInjective(arguments) && isValidGroupByKeyTypes(arguments); if (can_be_eliminated) { @@ -106,6 +108,29 @@ class OptimizeGroupByInjectiveFunctionsVisitor : public InDepthQueryTreeVisitorW grouping_set = std::move(new_group_by_keys); } + + bool isValidGroupByKeyTypes(const ColumnsWithTypeAndName & columns) const + { + if (getContext()->getSettingsRef()[Setting::allow_suspicious_types_in_group_by]) + return true; + + bool is_valid = true; + auto check = [&](const IDataType & type) + { + /// Dynamic and Variant types are not allowed in GROUP BY by default. + is_valid &= !isDynamic(type) && !isVariant(type); + }; + + for (const auto & column : columns) + { + check(*column.type); + column.type->forEachChild(check); + if (!is_valid) + break; + } + + return is_valid; + } }; } diff --git a/src/Analyzer/SetUtils.cpp b/src/Analyzer/SetUtils.cpp index 82554ccf256e..1f1e6e649297 100644 --- a/src/Analyzer/SetUtils.cpp +++ b/src/Analyzer/SetUtils.cpp @@ -42,16 +42,13 @@ size_t getCompoundTypeDepth(const IDataType & type) else if (which_type.isTuple()) { const auto & tuple_elements = assert_cast(*current_type).getElements(); + ++result; if (!tuple_elements.empty()) current_type = tuple_elements.at(0).get(); else { - /// Special case: tuple with no element - tuple(). In this case, what's the compound type depth? - /// I'm not certain about the theoretical answer, but from experiment, 1 is the most reasonable choice. 
- return 1; + break; } - - ++result; } else { @@ -162,7 +159,8 @@ ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expres DataTypes set_element_types = {expression_type}; const auto * lhs_tuple_type = typeid_cast(expression_type.get()); - if (lhs_tuple_type && lhs_tuple_type->getElements().size() != 1) + /// Do not unpack if empty tuple or single element tuple + if (lhs_tuple_type && lhs_tuple_type->getElements().size() > 1) set_element_types = lhs_tuple_type->getElements(); for (auto & set_element_type : set_element_types) diff --git a/src/Backups/BackupCoordinationStageSync.cpp b/src/Backups/BackupCoordinationStageSync.cpp index 60b06be31226..b86c4309e104 100644 --- a/src/Backups/BackupCoordinationStageSync.cpp +++ b/src/Backups/BackupCoordinationStageSync.cpp @@ -115,7 +115,8 @@ BackupCoordinationStageSync::BackupCoordinationStageSync( , failure_after_host_disconnected_for_seconds(with_retries.getKeeperSettings().failure_after_host_disconnected_for_seconds) , finish_timeout_after_error(with_retries.getKeeperSettings().finish_timeout_after_error) , sync_period_ms(with_retries.getKeeperSettings().sync_period_ms) - , max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version) + // all_hosts.size() is added to max_attempts_after_bad_version since each host change the num_hosts node once, and it's a valid case + , max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version + all_hosts.size()) , zookeeper_path(zookeeper_path_) , root_zookeeper_path(zookeeper_path.parent_path().parent_path()) , operation_zookeeper_path(zookeeper_path.parent_path()) diff --git a/src/Backups/BackupIO_AzureBlobStorage.cpp b/src/Backups/BackupIO_AzureBlobStorage.cpp index cdc6b194f84a..7d95ba062740 100644 --- a/src/Backups/BackupIO_AzureBlobStorage.cpp +++ b/src/Backups/BackupIO_AzureBlobStorage.cpp @@ -22,70 +22,12 @@ namespace fs = std::filesystem; namespace DB { + namespace ErrorCodes { 
extern const int LOGICAL_ERROR; } -/// This function compares the authorization methods used to access AzureBlobStorage -/// It takes 2 variables of variant type as input and checks if they are the same type and value -static bool compareAuthMethod (AzureBlobStorage::AuthMethod auth_method_a, AzureBlobStorage::AuthMethod auth_method_b) -{ - const auto * conn_string_a = std::get_if(&auth_method_a); - const auto * conn_string_b = std::get_if(&auth_method_b); - - if (conn_string_a && conn_string_b) - { - return *conn_string_a == *conn_string_b; - } - - const auto * shared_key_a = std::get_if>(&auth_method_a); - const auto * shared_key_b = std::get_if>(&auth_method_b); - - if (shared_key_a && shared_key_b) - { - return (shared_key_a->get()->AccountName == shared_key_b->get()->AccountName); - } - - try - { - const auto * workload_identity_a = std::get_if>(&auth_method_a); - const auto * workload_identity_b = std::get_if>(&auth_method_b); - - if (workload_identity_a && workload_identity_b) - { - Azure::Core::Credentials::TokenRequestContext tokenRequestContext; - return workload_identity_a->get()->GetToken(tokenRequestContext, {}).Token == workload_identity_b->get()->GetToken(tokenRequestContext, {}).Token; - } - - const auto * managed_identity_a = std::get_if>(&auth_method_a); - const auto * managed_identity_b = std::get_if>(&auth_method_b); - - if (managed_identity_a && managed_identity_b) - { - Azure::Core::Credentials::TokenRequestContext tokenRequestContext; - return managed_identity_a->get()->GetToken(tokenRequestContext, {}).Token == managed_identity_b->get()->GetToken(tokenRequestContext, {}).Token; - } - - const auto * static_credential_a = std::get_if>(&auth_method_a); - const auto * static_credential_b = std::get_if>(&auth_method_b); - - if (static_credential_a && static_credential_b) - { - Azure::Core::Credentials::TokenRequestContext tokenRequestContext; - auto az_context = Azure::Core::Context(); - return 
static_credential_a->get()->GetToken(tokenRequestContext, az_context).Token == static_credential_b->get()->GetToken(tokenRequestContext, az_context).Token; - } - } - catch (const Azure::Core::Credentials::AuthenticationException & e) - { - /// This is added to catch exception from GetToken. We want to log & fail silently i.e return false so that we can fallback to read & copy (i.e not native copy) - LOG_DEBUG(getLogger("compareAuthMethod"), "Exception caught while comparing credentials, error = {}", e.what()); - return false; - } - return false; -} - BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage( const AzureBlobStorage::ConnectionParams & connection_params_, const String & blob_path_, @@ -166,7 +108,7 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup, /* dest_path */ dst_blob_path[0], settings, read_settings, - compareAuthMethod(connection_params.auth_method, destination_disk->getObjectStorage()->getAzureBlobStorageAuthMethod()), + std::optional(), threadPoolCallbackRunnerUnsafe(getBackupsIOThreadPool().get(), "BackupRDAzure")); return file_size; @@ -233,7 +175,7 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk( /// In this case we can't use the native copy. if (auto src_blob_path = src_disk->getBlobPath(src_path); src_blob_path.size() == 2) { - LOG_TRACE(log, "Copying file {} from disk {} to AzureBlobStorag", src_path, src_disk->getName()); + LOG_TRACE(log, "Copying file {} from disk {} to AzureBlobStorage", src_path, src_disk->getName()); copyAzureBlobStorageFile( src_disk->getObjectStorage()->getAzureBlobStorageClient(), client, @@ -245,7 +187,7 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk( fs::path(blob_path) / path_in_backup, settings, read_settings, - compareAuthMethod(src_disk->getObjectStorage()->getAzureBlobStorageAuthMethod(), connection_params.auth_method), + std::optional(), threadPoolCallbackRunnerUnsafe(getBackupsIOThreadPool().get(), "BackupWRAzure")); return; /// copied! 
} @@ -269,7 +211,7 @@ void BackupWriterAzureBlobStorage::copyFile(const String & destination, const St /* dest_path */ destination, settings, read_settings, - true, + std::optional(), threadPoolCallbackRunnerUnsafe(getBackupsIOThreadPool().get(), "BackupWRAzure")); } diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index 1738ca4607f4..51cf22a12eee 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -37,7 +37,7 @@ namespace Setting extern const SettingsUInt64 s3_max_connections; extern const SettingsUInt64 s3_max_redirects; extern const SettingsBool s3_slow_all_threads_after_network_error; - extern const SettingsBool s3_slow_all_threads_after_retryable_error; + extern const SettingsBool backup_slow_all_threads_after_retryable_s3_error; } namespace S3AuthSetting @@ -76,6 +76,36 @@ namespace ErrorCodes namespace { +class S3BackupClientCreator +{ +public: + explicit S3BackupClientCreator(const ContextPtr & context) + { + const Settings & local_settings = context->getSettingsRef(); + retry_strategy = S3::PocoHTTPClientConfiguration::RetryStrategy{ + .max_retries = static_cast(local_settings[Setting::backup_restore_s3_retry_attempts]), + .initial_delay_ms = static_cast(local_settings[Setting::backup_restore_s3_retry_initial_backoff_ms]), + .max_delay_ms = static_cast(local_settings[Setting::backup_restore_s3_retry_max_backoff_ms]), + .jitter_factor = local_settings[Setting::backup_restore_s3_retry_jitter_factor]}; + slow_all_threads_after_retryable_error = local_settings[Setting::backup_slow_all_threads_after_retryable_s3_error]; + } + + S3BackupDiskClientFactory::Entry operator()(DiskPtr disk) const + { + auto disk_client = disk->getS3StorageClient(); + + auto config = disk_client->getClientConfiguration(); + config.retry_strategy = retry_strategy; + config.s3_slow_all_threads_after_retryable_error = slow_all_threads_after_retryable_error; + + return {disk_client->cloneWithConfigurationOverride(config), disk_client}; + } + 
+private: + S3::PocoHTTPClientConfiguration::RetryStrategy retry_strategy; + bool slow_all_threads_after_retryable_error = false; +}; + std::shared_ptr makeS3Client( const S3::URI & s3_uri, const String & access_key_id, @@ -114,7 +144,7 @@ namespace .jitter_factor = local_settings[Setting::backup_restore_s3_retry_jitter_factor]}, local_settings[Setting::s3_slow_all_threads_after_network_error], - local_settings[Setting::s3_slow_all_threads_after_retryable_error], + local_settings[Setting::backup_slow_all_threads_after_retryable_s3_error], local_settings[Setting::enable_s3_requests_logging], /* for_disk_s3 = */ false, /* opt_disk_name = */ {}, @@ -180,6 +210,31 @@ namespace } +S3BackupDiskClientFactory::S3BackupDiskClientFactory(const S3BackupDiskClientFactory::CreateFn & create_fn_) + : create_fn(create_fn_) +{ +} + +std::shared_ptr S3BackupDiskClientFactory::getOrCreate(DiskPtr disk) +{ + std::lock_guard lock(clients_mutex); + + auto [it, inserted] = clients.try_emplace(disk->getName(), Entry{}); + auto log = getLogger("S3BackupDiskClientFactory"); + auto & entry = it->second; + if (inserted) + LOG_TRACE(log, "Creating S3 client for copy from disk '{}' to backup bucket", disk->getName()); + else if (const_pointer_cast(entry.disk_reported_client.lock()) != disk->getS3StorageClient()) + LOG_INFO( + log, "Updating S3 client for copy from disk '{}' to the backup bucket because the disk client was updated", disk->getName()); + + while (const_pointer_cast(entry.disk_reported_client.lock()) != disk->getS3StorageClient()) + entry = create_fn(disk); + + chassert(entry.backup_client); + return entry.backup_client; +} + BackupReaderS3::BackupReaderS3( const S3::URI & s3_uri_, const String & access_key_id_, @@ -278,7 +333,6 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s BackupReaderDefault::copyFileToDisk(path_in_backup, file_size, encrypted_in_backup, destination_disk, destination_path, write_mode); } - BackupWriterS3::BackupWriterS3( 
const S3::URI & s3_uri_, const String & access_key_id_, @@ -295,6 +349,7 @@ BackupWriterS3::BackupWriterS3( , s3_uri(s3_uri_) , data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false, ""} , s3_capabilities(getCapabilitiesFromConfig(context_->getConfigRef(), "s3")) + , disk_client_factory(S3BackupClientCreator(context_)) { s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef()); @@ -331,8 +386,9 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src if (auto blob_path = src_disk->getBlobPath(src_path); blob_path.size() == 2) { LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName()); + /// Use storage client with overridden retry strategy settings. copyS3File( - src_disk->getS3StorageClient(), + /* src_s3_client */ disk_client_factory.getOrCreate(src_disk), /* src_bucket */ blob_path[1], /* src_key= */ blob_path[0], start_pos, diff --git a/src/Backups/BackupIO_S3.h b/src/Backups/BackupIO_S3.h index 6cedf04a67af..c8a3575243e7 100644 --- a/src/Backups/BackupIO_S3.h +++ b/src/Backups/BackupIO_S3.h @@ -6,15 +6,39 @@ #include #include #include +#include #include #include #include #include #include +#include + + namespace DB { +class S3BackupDiskClientFactory +{ +public: + struct Entry + { + std::shared_ptr backup_client; + std::weak_ptr disk_reported_client; + }; + using CreateFn = std::function; + explicit S3BackupDiskClientFactory(const CreateFn & create_fn_); + std::shared_ptr getOrCreate(DiskPtr disk); + +private: + const CreateFn create_fn; + + mutable std::mutex clients_mutex; + /// Disk name to client entry; + std::unordered_map clients TSA_GUARDED_BY(clients_mutex); +}; + /// Represents a backup stored to AWS S3. 
class BackupReaderS3 : public BackupReaderDefault { @@ -87,6 +111,7 @@ class BackupWriterS3 : public BackupWriterDefault S3Settings s3_settings; std::shared_ptr client; S3Capabilities s3_capabilities; + S3BackupDiskClientFactory disk_client_factory; BlobStorageLogWriterPtr blob_storage_log; }; diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 7ef943acdde8..8fe062107fe7 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -339,6 +339,9 @@ class LocalFormatError : public Exception { public: using Exception::Exception; + + LocalFormatError * clone() const override { return new LocalFormatError(*this); } + void rethrow() const override { throw *this; } /// NOLINT(cert-err60-cpp) }; diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 232c19bdc5b3..74d39468c0d4 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -1000,7 +1000,7 @@ void Connection::sendCancel() { /// If we already disconnected. if (!out) - return; + throw Exception(ErrorCodes::NETWORK_ERROR, "Connection to {} terminated", getDescription()); writeVarUInt(Protocol::Client::Cancel, *out); out->finishChunk(); diff --git a/src/Columns/ColumnAggregateFunction.cpp b/src/Columns/ColumnAggregateFunction.cpp index 988e45c62d4a..461a391501ca 100644 --- a/src/Columns/ColumnAggregateFunction.cpp +++ b/src/Columns/ColumnAggregateFunction.cpp @@ -609,7 +609,7 @@ StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & are return out.complete(); } -const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * src_arena) +void ColumnAggregateFunction::deserializeAndInsertFromArena(ReadBuffer & in) { ensureOwnership(); @@ -618,21 +618,10 @@ const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * */ Arena & dst_arena = createOrGetArena(); pushBackAndCreateState(data, dst_arena, func.get()); - - /** We will read from src_arena. 
- * There is no limit for reading - it is assumed, that we can read all that we need after src_arena pointer. - * Buf ReadBufferFromMemory requires some bound. We will use arbitrary big enough number, that will not overflow pointer. - * NOTE Technically, this is not compatible with C++ standard, - * as we cannot legally compare pointers after last element + 1 of some valid memory region. - * Probably this will not work under UBSan. - */ - ReadBufferFromMemory read_buffer(src_arena, std::numeric_limits::max() - src_arena - 1); - func->deserialize(data.back(), read_buffer, version, &dst_arena); - - return read_buffer.position(); + func->deserialize(data.back(), in, version, &dst_arena); } -const char * ColumnAggregateFunction::skipSerializedInArena(const char *) const +void ColumnAggregateFunction::skipSerializedInArena(ReadBuffer &) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method skipSerializedInArena is not supported for {}", getName()); } diff --git a/src/Columns/ColumnAggregateFunction.h b/src/Columns/ColumnAggregateFunction.h index 75f3c2f28f1f..ab17001d677b 100644 --- a/src/Columns/ColumnAggregateFunction.h +++ b/src/Columns/ColumnAggregateFunction.h @@ -173,9 +173,9 @@ class ColumnAggregateFunction final : public COWHelper #include #include // memcpy +#include namespace DB @@ -296,39 +297,35 @@ std::optional ColumnArray::getSerializedValueSize(size_t n) const } -const char * ColumnArray::deserializeAndInsertFromArena(const char * pos) +void ColumnArray::deserializeAndInsertFromArena(ReadBuffer & in) { - size_t array_size = unalignedLoad(pos); - pos += sizeof(array_size); + size_t array_size; + readBinaryLittleEndian(array_size, in); for (size_t i = 0; i < array_size; ++i) - pos = getData().deserializeAndInsertFromArena(pos); + getData().deserializeAndInsertFromArena(in); getOffsets().push_back(getOffsets().back() + array_size); - return pos; } -const char * ColumnArray::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void 
ColumnArray::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { - size_t array_size = unalignedLoad(pos); - pos += sizeof(array_size); + size_t array_size; + readBinaryLittleEndian(array_size, in); for (size_t i = 0; i < array_size; ++i) - pos = getData().deserializeAndInsertAggregationStateValueFromArena(pos); + getData().deserializeAndInsertAggregationStateValueFromArena(in); getOffsets().push_back(getOffsets().back() + array_size); - return pos; } -const char * ColumnArray::skipSerializedInArena(const char * pos) const +void ColumnArray::skipSerializedInArena(ReadBuffer & in) const { - size_t array_size = unalignedLoad(pos); - pos += sizeof(array_size); + size_t array_size; + readBinaryLittleEndian(array_size, in); for (size_t i = 0; i < array_size; ++i) - pos = getData().skipSerializedInArena(pos); - - return pos; + getData().skipSerializedInArena(in); } void ColumnArray::updateHashWithValue(size_t n, SipHash & hash) const diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index 6f59eb85d84a..d84827eb384a 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -82,9 +82,9 @@ class ColumnArray final : public COWHelper, ColumnArr StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; char * serializeValueIntoMemory(size_t, char * memory) const override; std::optional getSerializedValueSize(size_t n) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; WeakHash32 getWeakHash32() const 
override; void updateHashFast(SipHash & hash) const override; diff --git a/src/Columns/ColumnBLOB.h b/src/Columns/ColumnBLOB.h index 64356d2376a8..c3f13e8f019f 100644 --- a/src/Columns/ColumnBLOB.h +++ b/src/Columns/ColumnBLOB.h @@ -163,8 +163,8 @@ class ColumnBLOB : public COWHelper, ColumnBLOB> void popBack(size_t) override { throwInapplicable(); } StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwInapplicable(); } char * serializeValueIntoMemory(size_t, char *) const override { throwInapplicable(); } - const char * deserializeAndInsertFromArena(const char *) override { throwInapplicable(); } - const char * skipSerializedInArena(const char *) const override { throwInapplicable(); } + void deserializeAndInsertFromArena(ReadBuffer &) override { throwInapplicable(); } + void skipSerializedInArena(ReadBuffer &) const override { throwInapplicable(); } void updateHashWithValue(size_t, SipHash &) const override { throwInapplicable(); } WeakHash32 getWeakHash32() const override { throwInapplicable(); } void updateHashFast(SipHash &) const override { throwInapplicable(); } diff --git a/src/Columns/ColumnCompressed.h b/src/Columns/ColumnCompressed.h index 0f07148f143e..63cc32299301 100644 --- a/src/Columns/ColumnCompressed.h +++ b/src/Columns/ColumnCompressed.h @@ -99,8 +99,8 @@ class ColumnCompressed : public COWHelper, Colum void popBack(size_t) override { throwMustBeDecompressed(); } StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); } char * serializeValueIntoMemory(size_t, char *) const override { throwMustBeDecompressed(); } - const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); } - const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); } + void deserializeAndInsertFromArena(ReadBuffer &) override { throwMustBeDecompressed(); } + void skipSerializedInArena(ReadBuffer &) const override { 
throwMustBeDecompressed(); } void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); } WeakHash32 getWeakHash32() const override { throwMustBeDecompressed(); } void updateHashFast(SipHash &) const override { throwMustBeDecompressed(); } diff --git a/src/Columns/ColumnConst.h b/src/Columns/ColumnConst.h index bfde9ca7ddff..95c696d586db 100644 --- a/src/Columns/ColumnConst.h +++ b/src/Columns/ColumnConst.h @@ -184,17 +184,23 @@ class ColumnConst final : public COWHelper, ColumnCon return data->serializeValueIntoMemory(0, memory); } - const char * deserializeAndInsertFromArena(const char * pos) override + void deserializeAndInsertFromArena(ReadBuffer & in) override { - const auto * res = data->deserializeAndInsertFromArena(pos); + data->deserializeAndInsertFromArena(in); data->popBack(1); ++s; - return res; } - const char * skipSerializedInArena(const char * pos) const override + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override { - return data->skipSerializedInArena(pos); + data->deserializeAndInsertAggregationStateValueFromArena(in); + data->popBack(1); + ++s; + } + + void skipSerializedInArena(ReadBuffer & in) const override + { + data->skipSerializedInArena(in); } void updateHashWithValue(size_t, SipHash & hash) const override diff --git a/src/Columns/ColumnDecimal.cpp b/src/Columns/ColumnDecimal.cpp index 51b37a381f14..cb37ca914a29 100644 --- a/src/Columns/ColumnDecimal.cpp +++ b/src/Columns/ColumnDecimal.cpp @@ -78,16 +78,17 @@ Float64 ColumnDecimal::getFloat64(size_t n) const } template -const char * ColumnDecimal::deserializeAndInsertFromArena(const char * pos) +void ColumnDecimal::deserializeAndInsertFromArena(ReadBuffer & in) { - data.push_back(unalignedLoad(pos)); - return pos + sizeof(T); + T dec; + readBinaryLittleEndian(dec, in); + data.push_back(std::move(dec)); } template -const char * ColumnDecimal::skipSerializedInArena(const char * pos) const +void 
ColumnDecimal::skipSerializedInArena(ReadBuffer & in) const { - return pos + sizeof(T); + in.ignore(sizeof(T)); } template diff --git a/src/Columns/ColumnDecimal.h b/src/Columns/ColumnDecimal.h index 482d063fe5dc..72943a696d51 100644 --- a/src/Columns/ColumnDecimal.h +++ b/src/Columns/ColumnDecimal.h @@ -103,8 +103,8 @@ class ColumnDecimal final : public COWHelper, Col Float64 getFloat64(size_t n) const final; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; WeakHash32 getWeakHash32() const override; void updateHashFast(SipHash & hash) const override; diff --git a/src/Columns/ColumnDynamic.cpp b/src/Columns/ColumnDynamic.cpp index eff44e53735a..a00b1a44ba18 100644 --- a/src/Columns/ColumnDynamic.cpp +++ b/src/Columns/ColumnDynamic.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +24,7 @@ namespace DB namespace ErrorCodes { + extern const int ATTEMPT_TO_READ_AFTER_EOF; extern const int LOGICAL_ERROR; extern const int PARAMETER_OUT_OF_BOUND; } @@ -769,22 +771,25 @@ StringRef ColumnDynamic::serializeValueIntoArena(size_t n, Arena & arena, const return res; } -const char * ColumnDynamic::deserializeAndInsertFromArena(const char * pos) +void ColumnDynamic::deserializeAndInsertFromArena(ReadBuffer & in) { auto & variant_col = getVariantColumn(); - UInt8 null_bit = unalignedLoad(pos); - pos += sizeof(UInt8); + UInt8 null_bit; + readBinaryLittleEndian(null_bit, in); if (null_bit) { insertDefault(); - return pos; + return; } /// Read variant type and value in binary format. 
- const size_t type_and_value_size = unalignedLoad(pos); - pos += sizeof(type_and_value_size); - std::string_view type_and_value(pos, type_and_value_size); - pos += type_and_value_size; + size_t type_and_value_size; + readBinaryLittleEndian(type_and_value_size, in); + if (in.available() < type_and_value_size) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof when deserializing ColumnDynamic"); + + std::string_view type_and_value(in.position(), type_and_value_size); + in.ignore(type_and_value_size); ReadBufferFromMemory buf(type_and_value.data(), type_and_value.size()); auto variant_type = decodeDataType(buf); @@ -809,21 +814,18 @@ const char * ColumnDynamic::deserializeAndInsertFromArena(const char * pos) variant_col.getLocalDiscriminators().push_back(variant_col.localDiscriminatorByGlobal(getSharedVariantDiscriminator())); variant_col.getOffsets().push_back(shared_variant.size() - 1); } - - return pos; } -const char * ColumnDynamic::skipSerializedInArena(const char * pos) const +void ColumnDynamic::skipSerializedInArena(ReadBuffer & in) const { - UInt8 null_bit = unalignedLoad(pos); - pos += sizeof(UInt8); + UInt8 null_bit; + readBinaryLittleEndian(null_bit, in); if (null_bit) - return pos; + return; - const size_t type_and_value_size = unalignedLoad(pos); - pos += sizeof(type_and_value_size); - pos += type_and_value_size; - return pos; + size_t type_and_value_size; + readBinaryLittleEndian(type_and_value_size, in); + in.ignore(type_and_value_size); } void ColumnDynamic::updateHashWithValue(size_t n, SipHash & hash) const diff --git a/src/Columns/ColumnDynamic.h b/src/Columns/ColumnDynamic.h index d174e887fb20..f46b6a465287 100644 --- a/src/Columns/ColumnDynamic.h +++ b/src/Columns/ColumnDynamic.h @@ -194,8 +194,8 @@ class ColumnDynamic final : public COWHelper, Colum } StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; - const char * deserializeAndInsertFromArena(const char * pos) 
override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; std::optional getSerializedValueSize(size_t) const override { return std::nullopt; } void updateHashWithValue(size_t n, SipHash & hash) const override; diff --git a/src/Columns/ColumnFixedString.cpp b/src/Columns/ColumnFixedString.cpp index 817d94a0d2f7..0954922542a6 100644 --- a/src/Columns/ColumnFixedString.cpp +++ b/src/Columns/ColumnFixedString.cpp @@ -119,17 +119,16 @@ void ColumnFixedString::insertData(const char * pos, size_t length) memset(chars.data() + old_size + length, 0, n - length); } -const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos) +void ColumnFixedString::deserializeAndInsertFromArena(ReadBuffer & in) { size_t old_size = chars.size(); chars.resize(old_size + n); - memcpy(chars.data() + old_size, pos, n); - return pos + n; + in.readStrict(reinterpret_cast(chars.data() + old_size), n); } -const char * ColumnFixedString::skipSerializedInArena(const char * pos) const +void ColumnFixedString::skipSerializedInArena(ReadBuffer & in) const { - return pos + n; + in.ignore(n); } void ColumnFixedString::updateHashWithValue(size_t index, SipHash & hash) const diff --git a/src/Columns/ColumnFixedString.h b/src/Columns/ColumnFixedString.h index 7812a2bcd647..795bed3dd2d6 100644 --- a/src/Columns/ColumnFixedString.h +++ b/src/Columns/ColumnFixedString.h @@ -137,9 +137,9 @@ class ColumnFixedString final : public COWHelper, Col throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName()); } - const char * deserializeAndInsertFromArena(const char *) override + void deserializeAndInsertFromArena(ReadBuffer &) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot deserialize to {}", getName()); } - const char * skipSerializedInArena(const char*) const override + void skipSerializedInArena(ReadBuffer &) 
const override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot skip serialized {}", getName()); } diff --git a/src/Columns/ColumnLazy.cpp b/src/Columns/ColumnLazy.cpp index a9f8c7eafc39..d01e63ac05c9 100644 --- a/src/Columns/ColumnLazy.cpp +++ b/src/Columns/ColumnLazy.cpp @@ -152,12 +152,12 @@ void ColumnLazy::popBack(size_t) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method popBack is not supported for {}", getName()); } -const char * ColumnLazy::deserializeAndInsertFromArena(const char *) +void ColumnLazy::deserializeAndInsertFromArena(ReadBuffer &) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method deserializeAndInsertFromArena is not supported for {}", getName()); } -const char * ColumnLazy::skipSerializedInArena(const char *) const +void ColumnLazy::skipSerializedInArena(ReadBuffer &) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method skipSerializedInArena is not supported for {}", getName()); } diff --git a/src/Columns/ColumnLazy.h b/src/Columns/ColumnLazy.h index e540af1b0095..2a4826c264b7 100644 --- a/src/Columns/ColumnLazy.h +++ b/src/Columns/ColumnLazy.h @@ -91,8 +91,8 @@ class ColumnLazy final : public COWHelper void insertDefault() override; void popBack(size_t n) override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; WeakHash32 getWeakHash32() const override; void updateHashFast(SipHash & hash) const override; diff --git a/src/Columns/ColumnLowCardinality.cpp b/src/Columns/ColumnLowCardinality.cpp index 1db462151d32..964b43652b8a 100644 --- a/src/Columns/ColumnLowCardinality.cpp +++ b/src/Columns/ColumnLowCardinality.cpp @@ -240,6 +240,8 @@ static void checkPositionsAreLimited(const IColumn & positions, UInt64 limit) const auto & data = 
column_ptr->getData(); size_t num_rows = data.size(); + if (num_rows == 0) + return true; UInt64 max_position = 0; for (size_t i = 0; i < num_rows; ++i) max_position = std::max(max_position, data[i]); @@ -307,29 +309,21 @@ void ColumnLowCardinality::collectSerializedValueSizes(PaddedPODArray & idx.collectSerializedValueSizes(sizes, dict_sizes); } -const char * ColumnLowCardinality::deserializeAndInsertFromArena(const char * pos) +void ColumnLowCardinality::deserializeAndInsertFromArena(ReadBuffer & in) { compactIfSharedDictionary(); - - const char * new_pos; - idx.insertPosition(getDictionary().uniqueDeserializeAndInsertFromArena(pos, new_pos)); - - return new_pos; + idx.insertPosition(getDictionary().uniqueDeserializeAndInsertFromArena(in)); } -const char * ColumnLowCardinality::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnLowCardinality::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { compactIfSharedDictionary(); - - const char * new_pos; - idx.insertPosition(getDictionary().uniqueDeserializeAndInsertAggregationStateValueFromArena(pos, new_pos)); - - return new_pos; + idx.insertPosition(getDictionary().uniqueDeserializeAndInsertAggregationStateValueFromArena(in)); } -const char * ColumnLowCardinality::skipSerializedInArena(const char * pos) const +void ColumnLowCardinality::skipSerializedInArena(ReadBuffer & in) const { - return getDictionary().skipSerializedInArena(pos); + getDictionary().skipSerializedInArena(in); } WeakHash32 ColumnLowCardinality::getWeakHash32() const @@ -350,7 +344,7 @@ MutableColumnPtr ColumnLowCardinality::cloneResized(size_t size) const if (size == 0) unique_ptr = unique_ptr->cloneEmpty(); - return ColumnLowCardinality::create(IColumn::mutate(std::move(unique_ptr)), getIndexes().cloneResized(size)); + return ColumnLowCardinality::create(IColumn::mutate(std::move(unique_ptr)), getIndexes().cloneResized(size), /*is_shared=*/false); } MutableColumnPtr 
ColumnLowCardinality::cloneNullable() const @@ -584,7 +578,7 @@ std::vector ColumnLowCardinality::scatter(ColumnIndex num_colu for (auto & column : columns) { auto unique_ptr = dictionary.getColumnUniquePtr(); - column = ColumnLowCardinality::create(IColumn::mutate(std::move(unique_ptr)), std::move(column)); + column = ColumnLowCardinality::create(IColumn::mutate(std::move(unique_ptr)), std::move(column), /*is_shared=*/false); } return columns; @@ -603,7 +597,7 @@ ColumnLowCardinality::MutablePtr ColumnLowCardinality::cutAndCompact(size_t star { auto sub_positions = IColumn::mutate(idx.getPositions()->cut(start, length)); auto new_column_unique = Dictionary::compact(getDictionary(), sub_positions); - return ColumnLowCardinality::create(std::move(new_column_unique), std::move(sub_positions)); + return ColumnLowCardinality::create(std::move(new_column_unique), std::move(sub_positions), /*is_shared=*/false); } void ColumnLowCardinality::compactInplace() diff --git a/src/Columns/ColumnLowCardinality.h b/src/Columns/ColumnLowCardinality.h index 54a1ee52a212..429c58d8603c 100644 --- a/src/Columns/ColumnLowCardinality.h +++ b/src/Columns/ColumnLowCardinality.h @@ -28,7 +28,7 @@ class ColumnLowCardinality final : public COWHelper, ColumnLowCardinality>; - ColumnLowCardinality(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared = false); + ColumnLowCardinality(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared); ColumnLowCardinality(const ColumnLowCardinality & other) = default; public: @@ -36,12 +36,12 @@ class ColumnLowCardinality final : public COWHelper, ColumnLowCardinality>; - static Ptr create(const ColumnPtr & column_unique_, const ColumnPtr & indexes_, bool is_shared = false) + static Ptr create(const ColumnPtr & column_unique_, const ColumnPtr & indexes_, bool is_shared) { return ColumnLowCardinality::create(column_unique_->assumeMutable(), indexes_->assumeMutable(), is_shared); } - static MutablePtr 
create(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared = false) + static MutablePtr create(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared) { return Base::create(std::move(column_unique), std::move(indexes), is_shared); } @@ -75,7 +75,7 @@ class ColumnLowCardinality final : public COWHelper & sizes, const UInt8 * is_null) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; - const char * skipSerializedInArena(const char * pos) const override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override { @@ -123,7 +123,8 @@ class ColumnLowCardinality final : public COWHelper scatter(ColumnIndex num_columns, const Selector & selector) const override; diff --git a/src/Columns/ColumnMap.cpp b/src/Columns/ColumnMap.cpp index 6ef1278b254f..7471b7b6a276 100644 --- a/src/Columns/ColumnMap.cpp +++ b/src/Columns/ColumnMap.cpp @@ -170,19 +170,19 @@ std::optional ColumnMap::getSerializedValueSize(size_t n) const return nested->getSerializedValueSize(n); } -const char * ColumnMap::deserializeAndInsertFromArena(const char * pos) +void ColumnMap::deserializeAndInsertFromArena(ReadBuffer & in) { - return nested->deserializeAndInsertFromArena(pos); + nested->deserializeAndInsertFromArena(in); } -const char * ColumnMap::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnMap::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { - return nested->deserializeAndInsertAggregationStateValueFromArena(pos); + nested->deserializeAndInsertAggregationStateValueFromArena(in); } -const char * ColumnMap::skipSerializedInArena(const char * pos) 
const +void ColumnMap::skipSerializedInArena(ReadBuffer & in) const { - return nested->skipSerializedInArena(pos); + nested->skipSerializedInArena(in); } void ColumnMap::updateHashWithValue(size_t n, SipHash & hash) const diff --git a/src/Columns/ColumnMap.h b/src/Columns/ColumnMap.h index 9a71714aa615..79caab2b07a0 100644 --- a/src/Columns/ColumnMap.h +++ b/src/Columns/ColumnMap.h @@ -60,9 +60,9 @@ class ColumnMap final : public COWHelper, ColumnMap> StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; char * serializeValueIntoMemory(size_t n, char * memory) const override; std::optional getSerializedValueSize(size_t n) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; WeakHash32 getWeakHash32() const override; void updateHashFast(SipHash & hash) const override; diff --git a/src/Columns/ColumnNullable.cpp b/src/Columns/ColumnNullable.cpp index 072926203cca..010722971a5d 100644 --- a/src/Columns/ColumnNullable.cpp +++ b/src/Columns/ColumnNullable.cpp @@ -225,45 +225,39 @@ std::optional ColumnNullable::getSerializedValueSize(size_t n) const return 1 + *nested_size; /// +1 for null mask byte. 
} -const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos) +void ColumnNullable::deserializeAndInsertFromArena(ReadBuffer & in) { - UInt8 val = unalignedLoad(pos); - pos += sizeof(val); + UInt8 val; + readBinaryLittleEndian(val, in); getNullMapData().push_back(val); if (val == 0) - pos = getNestedColumn().deserializeAndInsertFromArena(pos); + getNestedColumn().deserializeAndInsertFromArena(in); else getNestedColumn().insertDefault(); - - return pos; } -const char * ColumnNullable::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnNullable::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { - UInt8 val = unalignedLoad(pos); - pos += sizeof(val); + UInt8 val; + readBinaryLittleEndian(val, in); getNullMapData().push_back(val); if (val == 0) - pos = getNestedColumn().deserializeAndInsertAggregationStateValueFromArena(pos); + getNestedColumn().deserializeAndInsertAggregationStateValueFromArena(in); else getNestedColumn().insertDefault(); - - return pos; } -const char * ColumnNullable::skipSerializedInArena(const char * pos) const +void ColumnNullable::skipSerializedInArena(ReadBuffer & in) const { - UInt8 val = unalignedLoad(pos); - pos += sizeof(val); + UInt8 val; + readBinaryLittleEndian(val, in); if (val == 0) - return getNestedColumn().skipSerializedInArena(pos); - - return pos; + getNestedColumn().skipSerializedInArena(in); } #if !defined(DEBUG_OR_SANITIZER_BUILD) diff --git a/src/Columns/ColumnNullable.h b/src/Columns/ColumnNullable.h index 12398b782ad7..a0b80c03305c 100644 --- a/src/Columns/ColumnNullable.h +++ b/src/Columns/ColumnNullable.h @@ -70,9 +70,9 @@ class ColumnNullable final : public COWHelper, Col StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; char * serializeValueIntoMemory(size_t n, char * memory) const override; std::optional getSerializedValueSize(size_t n) const override; - const char * 
deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; #if !defined(DEBUG_OR_SANITIZER_BUILD) void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; #else diff --git a/src/Columns/ColumnObject.cpp b/src/Columns/ColumnObject.cpp index 12c7bd54550a..2d158c4896bf 100644 --- a/src/Columns/ColumnObject.cpp +++ b/src/Columns/ColumnObject.cpp @@ -14,6 +14,7 @@ namespace DB namespace ErrorCodes { + extern const int ATTEMPT_TO_READ_AFTER_EOF; extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; } @@ -729,7 +730,31 @@ void ColumnObject::insertFromSharedDataAndFillRemainingDynamicPaths(const DB::Co { /// Deserialize binary value into dynamic column from shared data. if (it->second->size() != current_size) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of dynamic path {}: {} != {}. It may indicate duplicated data for this path", path, it->second->size(), current_size); + { + if (src_object_column.getDynamicPaths().contains(path)) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Path {} is present both in shared data and in dynamic paths at row {}. Dynamic path value type: {}. Shared data path value type: {}", + path, + row, + src_object_column.getDynamicPathsPtrs().at(toString(path))->getTypeNameAt(row), + decodeDataType(src_shared_data_values->getDataAt(i).toString())->getName()); + + for (size_t j = offset; j != end; ++j) + { + if (j != i && src_shared_data_paths->getDataAt(j).toView() == path) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Path {} is duplicated inside shared data at offsets {} and {}. First value type: {}. 
Second value type: {}", + path, + i, + j, + decodeDataType(src_shared_data_values->getDataAt(i).toString())->getName(), + decodeDataType(src_shared_data_values->getDataAt(j).toString())->getName()); + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of dynamic path {}: {} != {}", path, it->second->size(), current_size); + } deserializeValueFromSharedData(src_shared_data_values, i, *it->second); } else @@ -984,59 +1009,73 @@ void ColumnObject::serializePathAndValueIntoArena(DB::Arena & arena, const char res.size += sizeof(size_t) + path_size + sizeof(size_t) + value_size; } -const char * ColumnObject::deserializeAndInsertFromArena(const char * pos) +void ColumnObject::deserializeAndInsertFromArena(ReadBuffer & in) { /// First deserialize typed paths. They come first. for (auto path : sorted_typed_paths) - pos = typed_paths.find(path)->second->deserializeAndInsertFromArena(pos); + typed_paths.find(path)->second->deserializeAndInsertFromArena(in); /// Second deserialize all other paths and values and insert them into dynamic paths or shared data. - return deserializeDynamicPathsAndSharedDataFromArena(pos); + deserializeDynamicPathsAndSharedDataFromArena(in); } -const char * ColumnObject::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnObject::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { /// First deserialize typed paths. They come first. for (auto path : sorted_typed_paths) - pos = typed_paths.find(path)->second->deserializeAndInsertAggregationStateValueFromArena(pos); + typed_paths.find(path)->second->deserializeAndInsertAggregationStateValueFromArena(in); /// Second deserialize all other paths and values and insert them into dynamic paths or shared data. 
- return deserializeDynamicPathsAndSharedDataFromArena(pos); + deserializeDynamicPathsAndSharedDataFromArena(in); } -const char * ColumnObject::deserializeDynamicPathsAndSharedDataFromArena(const char * pos) +void ColumnObject::deserializeDynamicPathsAndSharedDataFromArena(ReadBuffer & in) { size_t current_size = size(); - auto num_paths = unalignedLoad(pos); - pos += sizeof(size_t); + size_t num_paths; + readBinaryLittleEndian(num_paths, in); + const auto [shared_data_paths, shared_data_values] = getSharedDataPathsAndValues(); for (size_t i = 0; i != num_paths; ++i) { - auto path_size = unalignedLoad(pos); - pos += sizeof(size_t); - std::string_view path(pos, path_size); - pos += path_size; + size_t path_size; + readBinaryLittleEndian(path_size, in); + + if (in.available() < path_size) + throw Exception( + ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, + "Attempt to read {} bytes for the path size, but only {} bytes are available", + path_size, + in.available()); + std::string_view path(in.position(), path_size); + in.ignore(path_size); + /// Deserialize binary value and try to insert it to dynamic paths or shared data. - auto value_size = unalignedLoad(pos); - pos += sizeof(size_t); - std::string_view value(pos, value_size); - pos += value_size; + size_t value_size; + readBinaryLittleEndian(value_size, in); + /// Check if we have this path in dynamic paths. if (auto dynamic_it = dynamic_paths.find(path); dynamic_it != dynamic_paths.end()) { - ReadBufferFromMemory buf(value.data(), value.size()); - getDynamicSerialization()->deserializeBinary(*dynamic_it->second, buf, getFormatSettings()); + getDynamicSerialization()->deserializeBinary(*dynamic_it->second, in, getFormatSettings()); } /// Try to add a new dynamic path. 
else if (auto * dynamic_path_column = tryToAddNewDynamicPath(path)) { - ReadBufferFromMemory buf(value.data(), value.size()); - getDynamicSerialization()->deserializeBinary(*dynamic_path_column, buf, getFormatSettings()); + getDynamicSerialization()->deserializeBinary(*dynamic_path_column, in, getFormatSettings()); } /// Limit on dynamic paths is reached, add this path to shared data. /// Serialized paths are sorted, so we can insert right away. else { + if (in.available() < value_size) + throw Exception( + ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, + "Attempt to read {} bytes for the value size, but only {} bytes are available", + value_size, + in.available()); + std::string_view value(in.position(), value_size); + in.ignore(value_size); shared_data_paths->insertData(path.data(), path.size()); shared_data_values->insertData(value.data(), value.size()); } @@ -1050,30 +1089,28 @@ const char * ColumnObject::deserializeDynamicPathsAndSharedDataFromArena(const c if (column->size() == current_size) column->insertDefault(); } - - return pos; } -const char * ColumnObject::skipSerializedInArena(const char * pos) const +void ColumnObject::skipSerializedInArena(ReadBuffer & in) const { /// First, skip all values of typed paths; for (auto path : sorted_typed_paths) - pos = typed_paths.find(path)->second->skipSerializedInArena(pos); + typed_paths.find(path)->second->skipSerializedInArena(in); /// Second, skip all other paths and values. 
- auto num_paths = unalignedLoad(pos); - pos += sizeof(size_t); + size_t num_paths; + readBinaryLittleEndian(num_paths, in); + for (size_t i = 0; i != num_paths; ++i) { - auto path_size = unalignedLoad(pos); - pos += sizeof(size_t); - std::string_view path(pos, path_size); - pos += path_size; - auto value_size = unalignedLoad(pos); - pos += sizeof(size_t) + value_size; - } + size_t path_size; + readBinaryLittleEndian(path_size, in); + in.ignore(path_size); - return pos; + size_t value_size; + readBinaryLittleEndian(value_size, in); + in.ignore(value_size); + } } void ColumnObject::updateHashWithValue(size_t n, SipHash & hash) const diff --git a/src/Columns/ColumnObject.h b/src/Columns/ColumnObject.h index 2f25e0a1230f..c21b77f07b33 100644 --- a/src/Columns/ColumnObject.h +++ b/src/Columns/ColumnObject.h @@ -145,9 +145,9 @@ class ColumnObject final : public COWHelper, ColumnO StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; std::optional getSerializedValueSize(size_t) const override { return std::nullopt; } void updateHashWithValue(size_t n, SipHash & hash) const override; @@ -290,7 +290,7 @@ class ColumnObject final : public COWHelper, ColumnO void insertFromSharedDataAndFillRemainingDynamicPaths(const ColumnObject & src_object_column, std::vector && src_dynamic_paths_for_shared_data, size_t start, size_t length); void serializePathAndValueIntoArena(Arena & arena, 
const char *& begin, StringRef path, StringRef value, StringRef & res) const; void serializeDynamicPathsAndSharedDataIntoArena(size_t n, Arena & arena, const char *& begin, StringRef & res) const; - const char * deserializeDynamicPathsAndSharedDataFromArena(const char * pos); + void deserializeDynamicPathsAndSharedDataFromArena(ReadBuffer & in); /// Map path -> column for paths with explicitly specified types. /// This set of paths is constant and cannot be changed. diff --git a/src/Columns/ColumnObjectDeprecated.h b/src/Columns/ColumnObjectDeprecated.h index 3401d03f6fde..c7dd2b66158c 100644 --- a/src/Columns/ColumnObjectDeprecated.h +++ b/src/Columns/ColumnObjectDeprecated.h @@ -252,8 +252,8 @@ class ColumnObjectDeprecated final : public COWHelper ColumnSparse::getSerializedValueSize(size_t n) const return values->getSerializedValueSize(getValueIndex(n)); } -const char * ColumnSparse::deserializeAndInsertFromArena(const char * pos) +void ColumnSparse::deserializeAndInsertFromArena(ReadBuffer & in) { - const char * res = nullptr; - insertSingleValue([&](IColumn & column) { res = column.deserializeAndInsertFromArena(pos); }); - return res; + insertSingleValue([&](IColumn & column) { column.deserializeAndInsertFromArena(in); }); } -const char * ColumnSparse::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnSparse::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { - const char * res = nullptr; - insertSingleValue([&](IColumn & column) { res = column.deserializeAndInsertAggregationStateValueFromArena(pos); }); - return res; + insertSingleValue([&](IColumn & column) { column.deserializeAndInsertAggregationStateValueFromArena(in); }); } -const char * ColumnSparse::skipSerializedInArena(const char * pos) const +void ColumnSparse::skipSerializedInArena(ReadBuffer & in) const { - return values->skipSerializedInArena(pos); + values->skipSerializedInArena(in); } #if !defined(DEBUG_OR_SANITIZER_BUILD) diff --git 
a/src/Columns/ColumnSparse.h b/src/Columns/ColumnSparse.h index ab1cecfc51d7..f51982bd2f2f 100644 --- a/src/Columns/ColumnSparse.h +++ b/src/Columns/ColumnSparse.h @@ -82,9 +82,9 @@ class ColumnSparse final : public COWHelper, ColumnS StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; char * serializeValueIntoMemory(size_t n, char * memory) const override; std::optional getSerializedValueSize(size_t n) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char *) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; #if !defined(DEBUG_OR_SANITIZER_BUILD) void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; #else diff --git a/src/Columns/ColumnString.cpp b/src/Columns/ColumnString.cpp index c54bf5d1da3d..11f6c21385fc 100644 --- a/src/Columns/ColumnString.cpp +++ b/src/Columns/ColumnString.cpp @@ -317,40 +317,39 @@ void ColumnString::batchSerializeValueIntoMemory(std::vector & memories) } } -const char * ColumnString::deserializeAndInsertFromArena(const char * pos) +void ColumnString::deserializeAndInsertFromArena(ReadBuffer & in) { - const size_t string_size = unalignedLoad(pos); - pos += sizeof(string_size); + size_t string_size; + readBinaryLittleEndian(string_size, in); const size_t old_size = chars.size(); const size_t new_size = old_size + string_size; chars.resize(new_size); - memcpy(chars.data() + old_size, pos, string_size); + in.readStrict(reinterpret_cast(chars.data() + old_size), string_size); offsets.push_back(new_size); - return pos + string_size; } -const char * 
ColumnString::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnString::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { /// Serialized value contains string values with 0 byte at the end for compatibility. - const size_t string_size_with_zero_byte = unalignedLoad(pos); - pos += sizeof(string_size_with_zero_byte); + size_t string_size_with_zero_byte; + readBinaryLittleEndian(string_size_with_zero_byte, in); const size_t old_size = chars.size(); const size_t new_size = old_size + string_size_with_zero_byte - 1; chars.resize(new_size); - memcpy(chars.data() + old_size, pos, string_size_with_zero_byte - 1); + in.readStrict(reinterpret_cast(chars.data() + old_size), string_size_with_zero_byte - 1); + in.ignore(1); /// ignore the 0 byte at the end. offsets.push_back(new_size); - return pos + string_size_with_zero_byte; } -const char * ColumnString::skipSerializedInArena(const char * pos) const +void ColumnString::skipSerializedInArena(ReadBuffer & in) const { - const size_t string_size = unalignedLoad(pos); - pos += sizeof(string_size); - return pos + string_size; + size_t string_size; + readBinaryLittleEndian(string_size, in); + in.ignore(string_size); } ColumnPtr ColumnString::index(const IColumn & indexes, size_t limit) const diff --git a/src/Columns/ColumnString.h b/src/Columns/ColumnString.h index 977576fe7b27..807f75de0b1f 100644 --- a/src/Columns/ColumnString.h +++ b/src/Columns/ColumnString.h @@ -212,10 +212,10 @@ class ColumnString final : public COWHelper, ColumnS void batchSerializeValueIntoMemory(std::vector & memories) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; - const char * skipSerializedInArena(const char * pos) const 
override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; diff --git a/src/Columns/ColumnTuple.cpp b/src/Columns/ColumnTuple.cpp index d7f89b03d064..8da05b044c91 100644 --- a/src/Columns/ColumnTuple.cpp +++ b/src/Columns/ColumnTuple.cpp @@ -352,6 +352,12 @@ StringRef ColumnTuple::serializeAggregationStateValueIntoArena(size_t n, Arena & char * ColumnTuple::serializeValueIntoMemory(size_t n, char * memory) const { + if (columns.empty()) + { + *memory = 0; + return memory + 1; + } + for (const auto & column : columns) memory = column->serializeValueIntoMemory(n, memory); @@ -360,6 +366,9 @@ char * ColumnTuple::serializeValueIntoMemory(size_t n, char * memory) const std::optional ColumnTuple::getSerializedValueSize(size_t n) const { + if (columns.empty()) + return 1; + size_t res = 0; for (const auto & column : columns) { @@ -373,38 +382,44 @@ std::optional ColumnTuple::getSerializedValueSize(size_t n) const } -const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos) +void ColumnTuple::deserializeAndInsertFromArena(ReadBuffer & in) { ++column_length; if (columns.empty()) - return pos + 1; + { + in.ignore(1); + return; + } for (auto & column : columns) - pos = column->deserializeAndInsertFromArena(pos); - - return pos; + column->deserializeAndInsertFromArena(in); } -const char * ColumnTuple::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnTuple::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { ++column_length; if (columns.empty()) - return pos + 1; + { + in.ignore(1); + return; + } for (auto & column : columns) - pos = column->deserializeAndInsertAggregationStateValueFromArena(pos); - - return pos; + column->deserializeAndInsertAggregationStateValueFromArena(in); } -const char * ColumnTuple::skipSerializedInArena(const char * pos) const +void ColumnTuple::skipSerializedInArena(ReadBuffer & in) const { - for (const auto 
& column : columns) - pos = column->skipSerializedInArena(pos); + if (columns.empty()) + { + in.ignore(1); + return; + } - return pos; + for (const auto & column : columns) + column->skipSerializedInArena(in); } void ColumnTuple::updateHashWithValue(size_t n, SipHash & hash) const diff --git a/src/Columns/ColumnTuple.h b/src/Columns/ColumnTuple.h index f6a9879e1610..14758ae6bee5 100644 --- a/src/Columns/ColumnTuple.h +++ b/src/Columns/ColumnTuple.h @@ -81,9 +81,9 @@ class ColumnTuple final : public COWHelper, ColumnTup StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; char * serializeValueIntoMemory(size_t n, char * memory) const override; std::optional getSerializedValueSize(size_t n) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; WeakHash32 getWeakHash32() const override; void updateHashFast(SipHash & hash) const override; diff --git a/src/Columns/ColumnUnique.h b/src/Columns/ColumnUnique.h index 22c259dc8e1b..c3a533ccaef7 100644 --- a/src/Columns/ColumnUnique.h +++ b/src/Columns/ColumnUnique.h @@ -27,6 +27,7 @@ namespace DB namespace ErrorCodes { + extern const int ATTEMPT_TO_READ_AFTER_EOF; extern const int LOGICAL_ERROR; extern const int ILLEGAL_COLUMN; extern const int NOT_IMPLEMENTED; @@ -65,8 +66,8 @@ class ColumnUnique final : public COWHelper & sizes, const UInt8 * is_null) const override; StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; char * 
serializeValueIntoMemory(size_t n, char * memory) const override; - const char * skipSerializedInArena(const char * pos) const override; + void skipSerializedInArena(ReadBuffer & in) const override; + StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; void updateHashWithValue(size_t n, SipHash & hash_func) const override; #if !defined(DEBUG_OR_SANITIZER_BUILD) @@ -496,68 +498,102 @@ char * ColumnUnique::serializeValueIntoMemory(size_t n, char * memor } template -size_t ColumnUnique::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) +StringRef ColumnUnique::serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const { if (is_nullable) { - UInt8 val = unalignedLoad(pos); - pos += sizeof(val); + static constexpr auto s = sizeof(UInt8); + + auto * pos = arena.allocContinue(s, begin); + UInt8 flag = (n == getNullValueIndex() ? 1 : 0); + unalignedStore(pos, flag); + + if (n == getNullValueIndex()) + return StringRef(pos, s); + + auto nested_ref = column_holder->serializeAggregationStateValueIntoArena(n, arena, begin); + + /// serializeAggregationStateValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back. 
+ return StringRef(nested_ref.data - s, nested_ref.size + s); + } + + return column_holder->serializeAggregationStateValueIntoArena(n, arena, begin); +} + +template +size_t ColumnUnique::uniqueDeserializeAndInsertFromArena(ReadBuffer & in) +{ + if (is_nullable) + { + UInt8 val; + readBinaryLittleEndian(val, in); if (val) - { - new_pos = pos; return getNullValueIndex(); - } } /// Numbers, FixedString if (size_of_value_if_fixed) { - new_pos = pos + size_of_value_if_fixed; - return uniqueInsertData(pos, size_of_value_if_fixed); + if (in.available() < size_of_value_if_fixed) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Not enough data to deserialize fixed size value in ColumnUnique."); + + size_t ret = uniqueInsertData(in.position(), size_of_value_if_fixed); + in.ignore(size_of_value_if_fixed); + return ret; } /// String - const size_t string_size = unalignedLoad(pos); - pos += sizeof(string_size); - new_pos = pos + string_size; - - return uniqueInsertData(pos, string_size); + size_t string_size; + readBinaryLittleEndian(string_size, in); + if (in.available() < string_size) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Not enough data to deserialize string value in ColumnUnique."); + + size_t ret = uniqueInsertData(in.position(), string_size); + in.ignore(string_size); + return ret; } template -size_t ColumnUnique::uniqueDeserializeAndInsertAggregationStateValueFromArena(const char * pos, const char *& new_pos) +size_t ColumnUnique::uniqueDeserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { if (is_nullable) { - UInt8 val = unalignedLoad(pos); - pos += sizeof(val); + UInt8 val; + readBinaryLittleEndian(val, in); if (val) - { - new_pos = pos; return getNullValueIndex(); - } + } /// Numbers, FixedString if (size_of_value_if_fixed) { - new_pos = pos + size_of_value_if_fixed; - return uniqueInsertData(pos, size_of_value_if_fixed); + if (in.available() < size_of_value_if_fixed) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, 
"Not enough data to deserialize fixed size value in ColumnUnique."); + + size_t ret = uniqueInsertData(in.position(), size_of_value_if_fixed); + in.ignore(size_of_value_if_fixed); + return ret; + } /// String /// For compatibility, serialized string value contains zero byte at the end, we just ignore this byte. - const size_t string_size_with_zero_byte = unalignedLoad(pos); - pos += sizeof(string_size_with_zero_byte); - new_pos = pos + string_size_with_zero_byte; + size_t string_size_with_zero_byte; + readBinaryLittleEndian(string_size_with_zero_byte, in); + if (in.available() < string_size_with_zero_byte) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Not enough data to deserialize string value in ColumnUnique."); + + size_t ret = uniqueInsertData(in.position(), string_size_with_zero_byte - 1); + in.ignore(string_size_with_zero_byte); - return uniqueInsertData(pos, string_size_with_zero_byte - 1); + return ret; } template -const char * ColumnUnique::skipSerializedInArena(const char *) const +void ColumnUnique::skipSerializedInArena(ReadBuffer &) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method skipSerializedInArena is not supported for {}", this->getName()); } diff --git a/src/Columns/ColumnVariant.cpp b/src/Columns/ColumnVariant.cpp index 0d6e7365c5d9..111a501a739f 100644 --- a/src/Columns/ColumnVariant.cpp +++ b/src/Columns/ColumnVariant.cpp @@ -822,48 +822,51 @@ StringRef ColumnVariant::serializeAggregationStateValueIntoArena(size_t n, Arena return res; } -const char * ColumnVariant::deserializeAndInsertFromArena(const char * pos) +void ColumnVariant::deserializeAndInsertFromArena(ReadBuffer & in) { /// During any serialization/deserialization we should always use global discriminators. 
- Discriminator global_discr = unalignedLoad(pos); - pos += sizeof(global_discr); + Discriminator global_discr; + readBinaryLittleEndian(global_discr, in); + Discriminator local_discr = localDiscriminatorByGlobal(global_discr); getLocalDiscriminators().push_back(local_discr); if (local_discr == NULL_DISCRIMINATOR) { getOffsets().emplace_back(); - return pos; + return; } getOffsets().push_back(variants[local_discr]->size()); - return variants[local_discr]->deserializeAndInsertFromArena(pos); + variants[local_discr]->deserializeAndInsertFromArena(in); } -const char * ColumnVariant::deserializeAndInsertAggregationStateValueFromArena(const char * pos) +void ColumnVariant::deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { /// During any serialization/deserialization we should always use global discriminators. - Discriminator global_discr = unalignedLoad(pos); - pos += sizeof(global_discr); + Discriminator global_discr; + readBinaryLittleEndian(global_discr, in); + Discriminator local_discr = localDiscriminatorByGlobal(global_discr); getLocalDiscriminators().push_back(local_discr); if (local_discr == NULL_DISCRIMINATOR) { getOffsets().emplace_back(); - return pos; + return; } getOffsets().push_back(variants[local_discr]->size()); - return variants[local_discr]->deserializeAndInsertAggregationStateValueFromArena(pos); + variants[local_discr]->deserializeAndInsertAggregationStateValueFromArena(in); } -const char * ColumnVariant::skipSerializedInArena(const char * pos) const +void ColumnVariant::skipSerializedInArena(ReadBuffer & in) const { - Discriminator global_discr = unalignedLoad(pos); - pos += sizeof(global_discr); + Discriminator global_discr; + readBinaryLittleEndian(global_discr, in); + if (global_discr == NULL_DISCRIMINATOR) - return pos; + return; - return variants[localDiscriminatorByGlobal(global_discr)]->skipSerializedInArena(pos); + variants[localDiscriminatorByGlobal(global_discr)]->skipSerializedInArena(in); } char * 
ColumnVariant::serializeValueIntoMemory(size_t n, char * memory) const diff --git a/src/Columns/ColumnVariant.h b/src/Columns/ColumnVariant.h index 6bf29b641127..8af324755333 100644 --- a/src/Columns/ColumnVariant.h +++ b/src/Columns/ColumnVariant.h @@ -215,9 +215,9 @@ class ColumnVariant final : public COWHelper, Colum void popBack(size_t n) override; StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; StringRef serializeAggregationStateValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; - const char * deserializeAndInsertFromArena(const char * pos) override; - const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) override; - const char * skipSerializedInArena(const char * pos) const override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; + void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) override; + void skipSerializedInArena(ReadBuffer & in) const override; char * serializeValueIntoMemory(size_t n, char * memory) const override; std::optional getSerializedValueSize(size_t n) const override; diff --git a/src/Columns/ColumnVector.cpp b/src/Columns/ColumnVector.cpp index 3fdf75fed79e..d03ccca0f5e9 100644 --- a/src/Columns/ColumnVector.cpp +++ b/src/Columns/ColumnVector.cpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include #include @@ -56,16 +58,17 @@ namespace ErrorCodes } template -const char * ColumnVector::deserializeAndInsertFromArena(const char * pos) +void ColumnVector::deserializeAndInsertFromArena(ReadBuffer & in) { - data.emplace_back(unalignedLoad(pos)); - return pos + sizeof(T); + T element; + readBinaryLittleEndian(element, in); + data.emplace_back(std::move(element)); } template -const char * ColumnVector::skipSerializedInArena(const char * pos) const +void ColumnVector::skipSerializedInArena(ReadBuffer & in) const { - return pos + sizeof(T); + in.ignore(sizeof(T)); } template diff --git 
a/src/Columns/ColumnVector.h b/src/Columns/ColumnVector.h index 06fe60c7b174..b5154ee38520 100644 --- a/src/Columns/ColumnVector.h +++ b/src/Columns/ColumnVector.h @@ -106,9 +106,9 @@ class ColumnVector final : public COWHelper, Colum data.resize_assume_reserved(data.size() - n); } - const char * deserializeAndInsertFromArena(const char * pos) override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; - const char * skipSerializedInArena(const char * pos) const override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t n, SipHash & hash) const override; diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index cab3d8b6eb1f..31ae50a72d22 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -32,6 +32,8 @@ class WeakHash32; class ColumnConst; class IDataType; class Block; +class ReadBuffer; +struct ColumnsInfo; using DataTypePtr = std::shared_ptr; using IColumnPermutation = PaddedPODArray; using IColumnFilter = PaddedPODArray; @@ -308,19 +310,18 @@ class IColumn : public COW virtual void collectSerializedValueSizes(PaddedPODArray & /* sizes */, const UInt8 * /* is_null */) const; /// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method. - /// Returns pointer to the position after the read data. - [[nodiscard]] virtual const char * deserializeAndInsertFromArena(const char * pos) = 0; + /// Note that it needs to deal with user input + virtual void deserializeAndInsertFromArena(ReadBuffer & in) = 0; /// Deserializes a value that was serialized using IColumn::serializeAggregationStateValueIntoArena method. - /// Returns pointer to the position after the read data. 
- [[nodiscard]] virtual const char * deserializeAndInsertAggregationStateValueFromArena(const char * pos) + /// Note that it needs to deal with user input + virtual void deserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) { - return deserializeAndInsertFromArena(pos); + deserializeAndInsertFromArena(in); } /// Skip previously serialized value that was serialized using IColumn::serializeValueIntoArena method. - /// Returns a pointer to the position after the deserialized data. - [[nodiscard]] virtual const char * skipSerializedInArena(const char *) const = 0; + virtual void skipSerializedInArena(ReadBuffer & in) const = 0; /// Update state of hash function with value of n-th element. /// On subsequent calls of this method for sequence of column values of arbitrary types, diff --git a/src/Columns/IColumnDummy.cpp b/src/Columns/IColumnDummy.cpp index 1d1d9f53cec4..4532a6a79141 100644 --- a/src/Columns/IColumnDummy.cpp +++ b/src/Columns/IColumnDummy.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -48,15 +49,15 @@ StringRef IColumnDummy::serializeValueIntoArena(size_t /*n*/, Arena & arena, cha return { res, 1 }; } -const char * IColumnDummy::deserializeAndInsertFromArena(const char * pos) +void IColumnDummy::deserializeAndInsertFromArena(ReadBuffer & in) { ++s; - return pos + 1; + in.ignore(1); } -const char * IColumnDummy::skipSerializedInArena(const char * pos) const +void IColumnDummy::skipSerializedInArena(ReadBuffer & in) const { - return pos; + in.ignore(1); } ColumnPtr IColumnDummy::filter(const Filter & filt, ssize_t /*result_size_hint*/) const diff --git a/src/Columns/IColumnDummy.h b/src/Columns/IColumnDummy.h index f5e0cf779d37..a66c2223edd5 100644 --- a/src/Columns/IColumnDummy.h +++ b/src/Columns/IColumnDummy.h @@ -57,9 +57,9 @@ class IColumnDummy : public IColumnHelper StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const override; - const char * 
deserializeAndInsertFromArena(const char * pos) override; + void deserializeAndInsertFromArena(ReadBuffer & in) override; - const char * skipSerializedInArena(const char * pos) const override; + void skipSerializedInArena(ReadBuffer & in) const override; void updateHashWithValue(size_t /*n*/, SipHash & /*hash*/) const override { diff --git a/src/Columns/IColumnUnique.h b/src/Columns/IColumnUnique.h index cd5a46405744..5c0d60de8a30 100644 --- a/src/Columns/IColumnUnique.h +++ b/src/Columns/IColumnUnique.h @@ -68,8 +68,8 @@ class IColumnUnique : public IColumn virtual size_t getNestedTypeDefaultValueIndex() const = 0; /// removeNullable()->getDefault() value index virtual bool canContainNulls() const = 0; - virtual size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) = 0; - virtual size_t uniqueDeserializeAndInsertAggregationStateValueFromArena(const char * pos, const char *& new_pos) = 0; + virtual size_t uniqueDeserializeAndInsertFromArena(ReadBuffer & in) = 0; + virtual size_t uniqueDeserializeAndInsertAggregationStateValueFromArena(ReadBuffer & in) = 0; /// Returns dictionary hash which is SipHash is applied to each row of nested column. 
virtual UInt128 getHash() const = 0; @@ -116,7 +116,7 @@ class IColumnUnique : public IColumn throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method gather is not supported for ColumnUnique."); } - const char * deserializeAndInsertFromArena(const char *) override + void deserializeAndInsertFromArena(ReadBuffer &) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method deserializeAndInsertFromArena is not supported for ColumnUnique."); } diff --git a/src/Columns/tests/gtest_column_dynamic.cpp b/src/Columns/tests/gtest_column_dynamic.cpp index 539926d03635..0679bbd5243c 100644 --- a/src/Columns/tests/gtest_column_dynamic.cpp +++ b/src/Columns/tests/gtest_column_dynamic.cpp @@ -1,7 +1,8 @@ #include #include -#include +#include #include +#include using namespace DB; @@ -755,10 +756,12 @@ TEST(ColumnDynamic, SerializeDeserializeFromArena1) column->serializeValueIntoArena(1, arena, pos); column->serializeValueIntoArena(2, arena, pos); column->serializeValueIntoArena(3, arena, pos); - pos = column->deserializeAndInsertFromArena(ref1.data); - pos = column->deserializeAndInsertFromArena(pos); - pos = column->deserializeAndInsertFromArena(pos); - column->deserializeAndInsertFromArena(pos); + + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + column->deserializeAndInsertFromArena(in); + column->deserializeAndInsertFromArena(in); + column->deserializeAndInsertFromArena(in); + column->deserializeAndInsertFromArena(in); ASSERT_EQ((*column)[column->size() - 4], 42); ASSERT_EQ((*column)[column->size() - 3], 42.42); @@ -782,10 +785,11 @@ TEST(ColumnDynamic, SerializeDeserializeFromArena2) column_from->serializeValueIntoArena(3, arena, pos); auto column_to = ColumnDynamic::create(254); - pos = column_to->deserializeAndInsertFromArena(ref1.data); - pos = column_to->deserializeAndInsertFromArena(pos); - pos = column_to->deserializeAndInsertFromArena(pos); - column_to->deserializeAndInsertFromArena(pos); + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + 
column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); ASSERT_EQ((*column_to)[column_to->size() - 4], 42); ASSERT_EQ((*column_to)[column_to->size() - 3], 42.42); @@ -814,10 +818,11 @@ TEST(ColumnDynamic, SerializeDeserializeFromArenaOverflow1) column_from->serializeValueIntoArena(3, arena, pos); auto column_to = getDynamicWithManyVariants(253); - pos = column_to->deserializeAndInsertFromArena(ref1.data); - pos = column_to->deserializeAndInsertFromArena(pos); - pos = column_to->deserializeAndInsertFromArena(pos); - column_to->deserializeAndInsertFromArena(pos); + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); ASSERT_EQ((*column_to)[column_to->size() - 4], 42); ASSERT_EQ((*column_to)[column_to->size() - 3], 42.42); @@ -848,11 +853,12 @@ TEST(ColumnDynamic, SerializeDeserializeFromArenaOverflow2) auto column_to = ColumnDynamic::create(2); column_to->insert(Field(42.42)); - pos = column_to->deserializeAndInsertFromArena(ref1.data); - pos = column_to->deserializeAndInsertFromArena(pos); - pos = column_to->deserializeAndInsertFromArena(pos); - pos = column_to->deserializeAndInsertFromArena(pos); - column_to->deserializeAndInsertFromArena(pos); + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); + column_to->deserializeAndInsertFromArena(in); ASSERT_EQ((*column_to)[column_to->size() - 5], 42); ASSERT_EQ((*column_to)[column_to->size() - 4], 42.42); @@ -879,16 +885,16 @@ TEST(ColumnDynamic, skipSerializedInArena) auto ref1 = 
column_from->serializeValueIntoArena(0, arena, pos); column_from->serializeValueIntoArena(1, arena, pos); column_from->serializeValueIntoArena(2, arena, pos); - auto ref4 = column_from->serializeValueIntoArena(3, arena, pos); + column_from->serializeValueIntoArena(3, arena, pos); - const char * end = ref4.data + ref4.size; auto column_to = ColumnDynamic::create(254); - pos = column_to->skipSerializedInArena(ref1.data); - pos = column_to->skipSerializedInArena(pos); - pos = column_to->skipSerializedInArena(pos); - pos = column_to->skipSerializedInArena(pos); + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + column_to->skipSerializedInArena(in); + column_to->skipSerializedInArena(in); + column_to->skipSerializedInArena(in); + column_to->skipSerializedInArena(in); - ASSERT_EQ(pos, end); + ASSERT_TRUE(in.eof()); ASSERT_EQ(column_to->getVariantInfo().variant_name_to_discriminator.at("SharedVariant"), 0); ASSERT_EQ(column_to->getVariantInfo().variant_names, Names{"SharedVariant"}); } diff --git a/src/Columns/tests/gtest_column_object.cpp b/src/Columns/tests/gtest_column_object.cpp index 408c76cd58d8..fb2ba8c0be6c 100644 --- a/src/Columns/tests/gtest_column_object.cpp +++ b/src/Columns/tests/gtest_column_object.cpp @@ -1,7 +1,8 @@ -#include #include +#include #include #include +#include #include #include @@ -318,9 +319,11 @@ TEST(ColumnObject, SerializeDeserializerFromArena) auto col2 = type->createColumn(); auto & col_object2 = assert_cast(*col); - pos = col_object2.deserializeAndInsertFromArena(ref1.data); - pos = col_object2.deserializeAndInsertFromArena(pos); - col_object2.deserializeAndInsertFromArena(pos); + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + col_object2.deserializeAndInsertFromArena(in); + col_object2.deserializeAndInsertFromArena(in); + col_object2.deserializeAndInsertFromArena(in); + ASSERT_TRUE(in.eof()); ASSERT_EQ(col_object2[0], (Object{{"b.d", Field(42u)}, {"a.b", Array{"Str1", "Str2"}}, {"a.a", Tuple{"Str3", 441u}}, {"a.c", 
Field("Str4")}, {"a.d", Array{Field(45), Field(46)}}, {"a.e", Field(47)}})); ASSERT_EQ(col_object2[1], (Object{{"b.d", Field{0u}}, {"a.b", Array{}}, {"b.a", Field(48)}, {"b.b", Array{Field(49), Field(50)}}})); @@ -340,14 +343,14 @@ TEST(ColumnObject, SkipSerializedInArena) const char * pos = nullptr; auto ref1 = col_object.serializeValueIntoArena(0, arena, pos); col_object.serializeValueIntoArena(1, arena, pos); - auto ref3 = col_object.serializeValueIntoArena(2, arena, pos); + col_object.serializeValueIntoArena(2, arena, pos); - const char * end = ref3.data + ref3.size; auto col2 = type->createColumn(); - pos = col2->skipSerializedInArena(ref1.data); - pos = col2->skipSerializedInArena(pos); - pos = col2->skipSerializedInArena(pos); - ASSERT_EQ(pos, end); + ReadBufferFromString in({ref1.data, arena.usedBytes()}); + col2->skipSerializedInArena(in); + col2->skipSerializedInArena(in); + col2->skipSerializedInArena(in); + ASSERT_TRUE(in.eof()); } TEST(ColumnObject, rollback) diff --git a/src/Columns/tests/gtest_column_unique.cpp b/src/Columns/tests/gtest_column_unique.cpp index 15208da70fb2..ddebfe076082 100644 --- a/src/Columns/tests/gtest_column_unique.cpp +++ b/src/Columns/tests/gtest_column_unique.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -118,9 +119,9 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const for (size_t i = 0; i < num_values; ++i) { auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos); - const char * new_pos; - column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos); - ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i; + ReadBufferFromString in({ref.data, ref.size}); + column_unique->uniqueDeserializeAndInsertFromArena(in); + ASSERT_TRUE(in.eof()) << "Deserialized data has different sizes at position " << i; ASSERT_EQ(column_unique_pattern->getNestedNotNullableColumn()->getDataAt(idx->getUInt(i)), 
column_unique->getNestedNotNullableColumn()->getDataAt(idx->getUInt(i))) diff --git a/src/Columns/tests/gtest_low_cardinality.cpp b/src/Columns/tests/gtest_low_cardinality.cpp index 301fa2a60904..c8ae39013fe0 100644 --- a/src/Columns/tests/gtest_low_cardinality.cpp +++ b/src/Columns/tests/gtest_low_cardinality.cpp @@ -62,3 +62,25 @@ TEST(ColumnLowCardinality, Clone) ASSERT_TRUE(assert_cast(*nullable_column).nestedIsNullable()); ASSERT_FALSE(assert_cast(*column).nestedIsNullable()); } + +TEST(ColumnLowCardinality, EmptyDictionaryEmptyIndexes) +{ + /// Test edge case: empty dictionary (size=0) with empty indexes (num_rows=0) + /// This should not throw an error, as empty indexes are always valid + /// Regression test for bug where check was: if (max_position >= limit) + /// When num_rows=0, max_position stays 0, and with limit=0, this incorrectly threw + + auto data_type = std::make_shared(); + auto low_cardinality_type = std::make_shared(data_type); + auto column = low_cardinality_type->createColumn(); + auto & lc_column = assert_cast(*column); + + // Create empty keys and indexes columns + auto empty_keys = ColumnUInt32::create(); + auto empty_indexes = ColumnUInt8::create(); + + // This should NOT throw an exception + ASSERT_NO_THROW(lc_column.insertRangeFromDictionaryEncodedColumn(*empty_keys, *empty_indexes)); + + ASSERT_EQ(column->size(), 0); +} diff --git a/src/Common/DateLUT.cpp b/src/Common/DateLUT.cpp index ba7dd6602e48..39e8044f36d6 100644 --- a/src/Common/DateLUT.cpp +++ b/src/Common/DateLUT.cpp @@ -209,6 +209,11 @@ ExtendedDayNum makeDayNum(const DateLUTImpl & date_lut, Int16 year, UInt8 month, return date_lut.makeDayNum(year, month, day_of_month, default_error_day_num); } +std::optional tryToMakeDayNum(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month) +{ + return date_lut.tryToMakeDayNum(year, month, day_of_month); +} + Int64 makeDate(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month) { 
static_assert(std::same_as); @@ -221,6 +226,12 @@ Int64 makeDateTime(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 return date_lut.makeDateTime(year, month, day_of_month, hour, minute, second); } +std::optional tryToMakeDateTime(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month, UInt8 hour, UInt8 minute, UInt8 second) +{ + static_assert(std::same_as); + return date_lut.tryToMakeDateTime(year, month, day_of_month, hour, minute, second); +} + const std::string & getDateLUTTimeZone(const DateLUTImpl & date_lut) { return date_lut.getTimeZone(); diff --git a/src/Common/DateLUT.h b/src/Common/DateLUT.h index 72992e3c70d2..ff025fd8036b 100644 --- a/src/Common/DateLUT.h +++ b/src/Common/DateLUT.h @@ -87,9 +87,11 @@ inline UInt64 timeInNanoseconds(std::chrono::time_point tryToMakeDayNum(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month); Int64 makeDate(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month); Int64 makeDateTime(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month, UInt8 hour, UInt8 minute, UInt8 second); +std::optional tryToMakeDateTime(const DateLUTImpl & date_lut, Int16 year, UInt8 month, UInt8 day_of_month, UInt8 hour, UInt8 minute, UInt8 second); const std::string & getDateLUTTimeZone(const DateLUTImpl & date_lut); UInt32 getDayNumOffsetEpoch(); diff --git a/src/Common/DateLUTImpl.h b/src/Common/DateLUTImpl.h index 5c08916f8095..b9c058b60a85 100644 --- a/src/Common/DateLUTImpl.h +++ b/src/Common/DateLUTImpl.h @@ -1177,6 +1177,20 @@ class DateLUTImpl return LUTIndex{std::min(index, static_cast(DATE_LUT_SIZE - 1))}; } + std::optional tryToMakeLUTIndex(Int16 year, UInt8 month, UInt8 day_of_month) const + { + if (unlikely(year < DATE_LUT_MIN_YEAR || year > DATE_LUT_MAX_YEAR || month < 1 || month > 12 || day_of_month < 1 || day_of_month > 31)) + return std::nullopt; + + auto year_lut_index = (year - DATE_LUT_MIN_YEAR) * 12 + month - 1; + UInt32 index = 
years_months_lut[year_lut_index].toUnderType() + day_of_month - 1; + + if (index >= DATE_LUT_SIZE) + return std::nullopt; + + return LUTIndex(index); + } + /// Create DayNum from year, month, day of month. ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month, Int32 default_error_day_num = 0) const { @@ -1186,6 +1200,18 @@ class DateLUTImpl return toDayNum(makeLUTIndex(year, month, day_of_month)); } + std::optional tryToMakeDayNum(Int16 year, UInt8 month, UInt8 day_of_month) const + { + if (unlikely(year < DATE_LUT_MIN_YEAR || month < 1 || month > 12 || day_of_month < 1 || day_of_month > 31)) + return std::nullopt; + + auto index = tryToMakeLUTIndex(year, month, day_of_month); + if (!index) + return std::nullopt; + + return toDayNum(*index); + } + Time makeDate(Int16 year, UInt8 month, UInt8 day_of_month) const { return lut[makeLUTIndex(year, month, day_of_month)].date; @@ -1204,6 +1230,20 @@ class DateLUTImpl return lut[index].date + time_offset; } + std::optional