diff --git a/CMakeLists.txt b/CMakeLists.txt index b5273a77..a98fdb78 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,7 @@ option(PAIMON_ENABLE_ORC "Whether to enable orc file format" ON) option(PAIMON_ENABLE_LANCE "Whether to enable lance file format" OFF) option(PAIMON_ENABLE_JINDO "Whether to enable jindo file system" OFF) option(PAIMON_ENABLE_LUMINA "Whether to enable lumina vector index" ON) +option(PAIMON_ENABLE_LUCENE "Whether to enable lucene index" ON) if(PAIMON_ENABLE_ORC) add_definitions(-DPAIMON_ENABLE_ORC) @@ -82,6 +83,10 @@ if(PAIMON_ENABLE_LUMINA) add_definitions(-DPAIMON_ENABLE_LUMINA) endif() +if(PAIMON_ENABLE_LUCENE) + add_definitions(-DPAIMON_ENABLE_LUCENE) +endif() + add_definitions(-DSNAPPY_CODEC_AVAILABLE) add_definitions(-DZSTD_CODEC_AVAILABLE) add_definitions(-DRAPIDJSON_HAS_STDSTRING) @@ -379,6 +384,11 @@ if(PAIMON_BUILD_TESTS) list(APPEND TEST_STATIC_LINK_LIBS paimon_lumina_index_shared) list(APPEND TEST_STATIC_LINK_LIBS "-Wl,--as-needed") endif() + if(PAIMON_ENABLE_LUCENE) + list(APPEND TEST_STATIC_LINK_LIBS "-Wl,--no-as-needed") + list(APPEND TEST_STATIC_LINK_LIBS paimon_lucene_index_shared) + list(APPEND TEST_STATIC_LINK_LIBS "-Wl,--as-needed") + endif() endif() @@ -407,6 +417,7 @@ add_subdirectory(src/paimon/format/parquet) add_subdirectory(src/paimon/format/avro) add_subdirectory(src/paimon/format/lance) add_subdirectory(src/paimon/global_index/lumina) +add_subdirectory(src/paimon/global_index/lucene) add_subdirectory(src/paimon/testing/mock) add_subdirectory(src/paimon/testing/utils) add_subdirectory(test/inte) diff --git a/cmake_modules/SetupCxxFlags.cmake b/cmake_modules/SetupCxxFlags.cmake index e4b27108..03b1918c 100644 --- a/cmake_modules/SetupCxxFlags.cmake +++ b/cmake_modules/SetupCxxFlags.cmake @@ -80,10 +80,13 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-parameter") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unknown-warning-option") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-constant-logical-operand") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-builtins") elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-conversion") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-builtins") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-variable") else() @@ -182,6 +185,8 @@ elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR CMAKE_CXX_COMPILER_ID STRE # Don't complain about optimization passes that were not possible set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-pass-failed") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-builtins") if(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 97b82d8f..511a6919 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -112,6 +112,18 @@ else() endif() endif() +if(DEFINED ENV{PAIMON_LUCENE_URL}) + set(LUCENE_SOURCE_URL "$ENV{PAIMON_LUCENE_URL}") +else() + if(EXISTS "${THIRDPARTY_DIR}/${PAIMON_LUCENE_PKG_NAME}") + set_urls(LUCENE_SOURCE_URL "${THIRDPARTY_DIR}/${PAIMON_LUCENE_PKG_NAME}") + else() + set_urls(LUCENE_SOURCE_URL + "${THIRDPARTY_MIRROR_URL}https://github.com/luceneplusplus/LucenePlusPlus/archive/refs/tags/${PAIMON_LUCENE_PKG_NAME}" + ) + endif() +endif() + if(DEFINED ENV{PAIMON_GLOG_URL}) set(GLOG_SOURCE_URL "$ENV{PAIMON_GLOG_URL}") else() @@ -275,6 +287,62 @@ set(EP_COMMON_CMAKE_ARGS -DCMAKE_C_FLAGS=${EP_C_FLAGS} -DCMAKE_INSTALL_LIBDIR=lib) +macro(build_lucene) + message(STATUS "Building lucene from source") + set(LUCENE_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/lucene_ep-install") + set(LUCENE_CMAKE_ARGS + ${EP_COMMON_CMAKE_ARGS} + "-DENABLE_TEST=OFF" + "-DCMAKE_C_FLAGS=-pthread" + "-DCMAKE_CXX_FLAGS=-pthread" + "-DCMAKE_EXE_LINKER_FLAGS=-pthread" + "-DBoost_INCLUDE_DIR=${BOOST_INCLUDE_DIR}" + "-DBoost_LIBRARY_DIR=${BOOST_LIBRARY_DIR}" + "-DBOOST_ROOT=${BOOST_INSTALL}" + "-DBoost_CHRONO_FOUND=TRUE" + "-DBoost_THREAD_FOUND=TRUE" + "-DCMAKE_INSTALL_PREFIX=${LUCENE_PREFIX}") + + set(LUCENE_LIB "${LUCENE_PREFIX}/lib/liblucene++.so.0") + externalproject_add(lucene_ep + ${EP_COMMON_OPTIONS} + URL ${LUCENE_SOURCE_URL} + URL_HASH "SHA256=${PAIMON_LUCENE_BUILD_SHA256_CHECKSUM}" + CMAKE_ARGS ${LUCENE_CMAKE_ARGS} + BUILD_BYPRODUCTS ${LUCENE_LIB} + DEPENDS boost_date_time + boost_filesystem + boost_regex + boost_thread + boost_iostreams + boost_system + boost_chrono + boost_atomic) + + set(LUCENE_INCLUDE_DIR "${LUCENE_PREFIX}/include") + # The include directory must exist before it is referenced by a target. + file(MAKE_DIRECTORY "${LUCENE_INCLUDE_DIR}") + include_directories(SYSTEM ${LUCENE_INCLUDE_DIR} ${BOOST_INCLUDE_DIR} + ${BOOST_EXTRA_INCLUDE_DIR}) + add_library(lucene INTERFACE IMPORTED) + target_include_directories(lucene SYSTEM INTERFACE "${LUCENE_INCLUDE_DIR}") + target_compile_options(lucene INTERFACE -pthread) + + target_link_libraries(lucene + INTERFACE "${LUCENE_LIB}" + boost_date_time + boost_filesystem + boost_regex + boost_thread + boost_iostreams + boost_system + boost_chrono + boost_atomic + pthread + dl) + add_dependencies(lucene lucene_ep) +endmacro() + macro(build_rapidjson) message(STATUS "Building RapidJSON from source") set(RAPIDJSON_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/rapidjson_ep-install") @@ -342,6 +410,99 @@ macro(build_fmt) add_dependencies(fmt fmt_ep) endmacro(build_fmt) +macro(build_boost) + message(STATUS "Building boost from source") + set(BOOST_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/boost_ep-prefix") + set(BOOST_INSTALL "${CMAKE_CURRENT_BINARY_DIR}/boost_ep-install") + set(BOOST_INCLUDE_DIR "${BOOST_INSTALL}/include") + set(BOOST_LIBRARY_DIR ${BOOST_INSTALL}/lib) + file(MAKE_DIRECTORY ${BOOST_INCLUDE_DIR}) + file(MAKE_DIRECTORY ${BOOST_LIBRARY_DIR}) + + set(BOOST_BYPRODUCTS + ${BOOST_LIBRARY_DIR}/libboost_date_time.a + ${BOOST_LIBRARY_DIR}/libboost_filesystem.a + ${BOOST_LIBRARY_DIR}/libboost_system.a + ${BOOST_LIBRARY_DIR}/libboost_regex.a + ${BOOST_LIBRARY_DIR}/libboost_thread.a + ${BOOST_LIBRARY_DIR}/libboost_atomic.a + ${BOOST_LIBRARY_DIR}/libboost_chrono.a + ${BOOST_LIBRARY_DIR}/libboost_iostreams.a) + + externalproject_add(boost_ep + GIT_REPOSITORY https://github.com/boostorg/boost.git + GIT_TAG boost-${PAIMON_BOOST_BUILD_VERSION} + GIT_SHALLOW FALSE + GIT_PROGRESS TRUE + GIT_SUBMODULES_RECURSE TRUE + CONFIGURE_COMMAND ${BOOST_PREFIX}/src/boost_ep/bootstrap.sh + --with-libraries=date_time,filesystem,iostreams,regex,system,thread,chrono,atomic + BUILD_IN_SOURCE TRUE + BUILD_COMMAND ${BOOST_PREFIX}/src/boost_ep/b2 + --prefix=${BOOST_INSTALL} + --libdir=${BOOST_LIBRARY_DIR} link=static + runtime-link=shared threading=multi variant=release + cxxflags=-fPIC install + INSTALL_COMMAND bash -c + "mkdir -p ${BOOST_INSTALL}/include/boost && cp -r ${BOOST_PREFIX}/src/boost_ep/libs/*/include/boost/* ${BOOST_INSTALL}/include/boost && cp -r ${BOOST_PREFIX}/src/boost_ep/libs/*/*/include/boost/* ${BOOST_INSTALL}/include/boost" + BUILD_BYPRODUCTS ${BOOST_BYPRODUCTS} + LOG_DOWNLOAD ON + LOG_CONFIGURE ON + LOG_BUILD ON) + + include_directories(SYSTEM ${BOOST_INCLUDE_DIR}) + + add_library(boost_atomic STATIC IMPORTED) + set_target_properties(boost_atomic + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_atomic.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_chrono STATIC IMPORTED) + set_target_properties(boost_chrono + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_chrono.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_date_time STATIC IMPORTED) + set_target_properties(boost_date_time + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_date_time.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_filesystem STATIC IMPORTED) + set_target_properties(boost_filesystem + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_filesystem.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_regex STATIC IMPORTED) + set_target_properties(boost_regex + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_regex.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_thread STATIC IMPORTED) + set_target_properties(boost_thread + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_thread.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_iostreams STATIC IMPORTED) + set_target_properties(boost_iostreams + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_iostreams.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + add_library(boost_system STATIC IMPORTED) + set_target_properties(boost_system + PROPERTIES IMPORTED_LOCATION + ${BOOST_LIBRARY_DIR}/libboost_system.a + INTERFACE_INCLUDE_DIRECTORIES ${BOOST_INCLUDE_DIR}) + + add_dependencies(boost_atomic boost_ep) + add_dependencies(boost_chrono boost_ep) + add_dependencies(boost_date_time boost_ep) + add_dependencies(boost_filesystem boost_ep) + add_dependencies(boost_regex boost_ep) + add_dependencies(boost_thread boost_ep) + add_dependencies(boost_iostreams boost_ep) + add_dependencies(boost_system boost_ep) +endmacro(build_boost) + macro(build_snappy) message(STATUS "Building snappy from source") set(SNAPPY_HOME "${CMAKE_CURRENT_BINARY_DIR}/snappy_ep-install") @@ -1108,3 +1269,7 @@ if(PAIMON_ENABLE_JINDO) build_jindosdk_c() build_jindosdk_nextarch() endif() +if(PAIMON_ENABLE_LUCENE) + build_boost() + build_lucene() +endif() diff --git a/include/paimon/predicate/full_text_search.h b/include/paimon/predicate/full_text_search.h new file mode 100644 index 00000000..e5fba975 --- /dev/null +++ b/include/paimon/predicate/full_text_search.h @@ -0,0 +1,74 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include +#include + +#include "paimon/predicate/predicate.h" +#include "paimon/visibility.h" + +namespace paimon { +/// A configuration structure for full-text search operations. +struct PAIMON_EXPORT FullTextSearch { + /// Enumeration of supported full-text search types. + enum class SearchType { + /// All terms in the query must be present (AND semantics). + MATCH_ALL = 1, + /// Any term in the query can match (OR semantics). + MATCH_ANY = 2, + /// Matches the exact sequence of words (with proximity). + PHRASE = 3, + /// Matches terms starting with the given string (e.g., "run*" → running, runner). + PREFIX = 4, + /// Supports wildcards * and ? (e.g., "ap*e", "app?e" -> "apple"). + WILDCARD = 5, + /// Default/fallback type for unrecognized or invalid queries. + UNKNOWN = 128 + }; + + FullTextSearch(const std::string& _field_name, int32_t _limit, const std::string& _query, + const SearchType& _search_type) + : field_name(_field_name), limit(_limit), query(_query), search_type(_search_type) {} + + /// Name of the field to search within (must be a full-text indexed field). + std::string field_name; + /// Maximum number of documents to return. Ordered by scores. + int32_t limit; + /// The query string to search for. The interpretation depends on search_type: + /// + /// - For MATCH_ALL/MATCH_ANY: keywords are split into terms using the **same analyzer as + /// indexing**. + /// Example: "Hello World" → terms ["hello", "world"] (after lowercasing and tokenization). + /// + /// - For PHRASE: matches the exact word sequence (with optional slop). Also be analyzed. + /// + /// - For PREFIX: matches terms starting with the given string (e.g., "run" → running, runner). + /// Only the prefix part is considered; analysis will not be applied. + /// + /// - For WILDCARD: supports wildcards * and ? (e.g., "ap*e", "app?e"). + /// Not passed through analyzer — matched directly against indexed terms. + /// + /// @note Analyzer consistency between indexing and querying is critical for correctness. + std::string query; + /// Type of search to perform. + SearchType search_type; +}; +} // namespace paimon diff --git a/src/paimon/common/io/data_output_stream.h b/src/paimon/common/io/data_output_stream.h index 67d81e32..6d383a11 100644 --- a/src/paimon/common/io/data_output_stream.h +++ b/src/paimon/common/io/data_output_stream.h @@ -28,7 +28,7 @@ class OutputStream; // data output stream, support WriteValue() and WriteString() from OutputStream, also do big-endian // conversion to ensure cross-language compatibility -class DataOutputStream { +class PAIMON_EXPORT DataOutputStream { public: explicit DataOutputStream(const std::shared_ptr& output_stream); diff --git a/src/paimon/common/utils/options_utils.h b/src/paimon/common/utils/options_utils.h index 9070e856..35977d6c 100644 --- a/src/paimon/common/utils/options_utils.h +++ b/src/paimon/common/utils/options_utils.h @@ -73,6 +73,19 @@ class OptionsUtils { return value.value(); } + /// Fetch options with specific prefix and remove prefix for key. + static std::map FetchOptionsWithPrefix( + const std::string& prefix, const std::map& options) { + std::map options_with_prefix; + int64_t prefix_len = prefix.size(); + for (const auto& [key, value] : options) { + if (StringUtils::StartsWith(key, prefix)) { + options_with_prefix[key.substr(prefix_len)] = value; + } + } + return options_with_prefix; + } + private: template static std::string GetTypeName() { diff --git a/src/paimon/common/utils/options_utils_test.cpp b/src/paimon/common/utils/options_utils_test.cpp index 9c1830ed..2f762754 100644 --- a/src/paimon/common/utils/options_utils_test.cpp +++ b/src/paimon/common/utils/options_utils_test.cpp @@ -59,4 +59,11 @@ TEST(OptionsUtilsTest, TestGetValueFromMap) { OptionsUtils::GetValueFromMap(key_value_map, "", 999)); ASSERT_EQ(999, empty); } + +TEST(OptionsUtilsTest, TestFetchOptionsWithPrefix) { + std::map options = {{"key1", "value1"}, {"test.key2", "value2"}}; + auto new_options = OptionsUtils::FetchOptionsWithPrefix("test.", options); + std::map expected = {{"key2", "value2"}}; + ASSERT_EQ(expected, new_options); +} } // namespace paimon::test diff --git a/src/paimon/global_index/lucene/CMakeLists.txt b/src/paimon/global_index/lucene/CMakeLists.txt new file mode 100644 index 00000000..b049ca88 --- /dev/null +++ b/src/paimon/global_index/lucene/CMakeLists.txt @@ -0,0 +1,55 @@ +# Copyright 2026-present Alibaba Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(PAIMON_ENABLE_LUCENE) + set(PAIMON_LUCENE lucene_global_index.cpp lucene_directory.cpp + lucene_global_index_factory.cpp) + + add_paimon_lib(paimon_lucene_index + SOURCES + ${PAIMON_LUCENE} + EXTRA_INCLUDES + ${LUCENE_INCLUDE_DIR} + DEPENDENCIES + paimon_shared + lucene + STATIC_LINK_LIBS + lucene + arrow + fmt + dl + Threads::Threads + SHARED_LINK_LIBS + paimon_shared + SHARED_LINK_FLAGS + ${PAIMON_VERSION_SCRIPT_FLAGS}) + + if(PAIMON_BUILD_TESTS) + add_paimon_test(lucene_index_test + SOURCES + lucene_api_test.cpp + lucene_directory_test.cpp + lucene_global_index_test.cpp + EXTRA_INCLUDES + ${LUCENE_INCLUDE_DIR} + STATIC_LINK_LIBS + paimon_shared + test_utils_static + "-Wl,--whole-archive" + paimon_local_file_system_static + paimon_lucene_index_static + "-Wl,--no-whole-archive" + ${GTEST_LINK_TOOLCHAIN}) + endif() +endif() diff --git a/src/paimon/global_index/lucene/lucene_api_test.cpp b/src/paimon/global_index/lucene/lucene_api_test.cpp new file mode 100644 index 00000000..815ca510 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_api_test.cpp @@ -0,0 +1,106 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" +#include "lucene++/FileUtils.h" +#include "lucene++/LuceneHeaders.h" +#include "lucene++/MiscUtils.h" +#include "paimon/global_index/lucene/lucene_directory.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::lucene::test { +TEST(LuceneInterfaceTest, TestSimple) { + auto dir = paimon::test::UniqueTestDirectory::Create("local"); + std::string index_path = dir->Str() + "/lucene_test"; + auto lucene_dir = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(index_path), + Lucene::NoLockFactory::getNoLockFactory()); + + Lucene::IndexWriterPtr writer = Lucene::newLucene( + lucene_dir, + Lucene::newLucene(Lucene::LuceneVersion::LUCENE_CURRENT), + /*create=*/true, Lucene::IndexWriter::MaxFieldLengthLIMITED); + + Lucene::DocumentPtr doc = Lucene::newLucene(); + auto field = Lucene::newLucene(L"content", L"", Lucene::Field::STORE_NO, + Lucene::Field::INDEX_ANALYZED_NO_NORMS); + auto doc_id_field = Lucene::newLucene( + L"id", L"", Lucene::Field::STORE_YES, Lucene::Field::INDEX_NOT_ANALYZED_NO_NORMS); + + field->setOmitTermFreqAndPositions(false); + doc_id_field->setOmitTermFreqAndPositions(true); + doc->add(field); + doc->add(doc_id_field); + + auto build = [&](const std::wstring& doc_str, int32_t doc_id) { + field->setValue(doc_str); + doc_id_field->setValue(LuceneUtils::StringToWstring(std::to_string(doc_id))); + writer->addDocument(doc); + }; + + build(L"This is an test document.", 0); + build(L"This is an new document document document.", 1); + build(L"Document document document document test.", 2); + build(L"unordered user-defined doc id", 5); + build(L"", 6); // add a null doc + + writer->optimize(); + writer->close(); + + // read + Lucene::IndexReaderPtr reader = Lucene::IndexReader::open(lucene_dir, /*read_only=*/true); + Lucene::IndexSearcherPtr searcher = Lucene::newLucene(reader); + Lucene::QueryParserPtr parser = Lucene::newLucene( + Lucene::LuceneVersion::LUCENE_CURRENT, L"content", + Lucene::newLucene(Lucene::LuceneVersion::LUCENE_CURRENT)); + parser->setAllowLeadingWildcard(true); + + auto search = [&](const std::wstring& query_str, int32_t limit, + const std::vector& expected_doc_id_vec, + const std::vector& expected_doc_id_content_vec) { + Lucene::QueryPtr query = parser->parse(query_str); + Lucene::TopDocsPtr results = searcher->search(query, limit); + ASSERT_EQ(expected_doc_id_vec.size(), results->scoreDocs.size()); + + std::vector resule_doc_id_vec; + std::vector result_doc_id_content_vec; + for (auto score_doc : results->scoreDocs) { + Lucene::DocumentPtr result_doc = searcher->doc(score_doc->doc); + resule_doc_id_vec.push_back(score_doc->doc); + result_doc_id_content_vec.push_back(result_doc->get(L"id")); + } + ASSERT_EQ(resule_doc_id_vec, expected_doc_id_vec); + ASSERT_EQ(result_doc_id_content_vec, expected_doc_id_content_vec); + }; + + // result is sorted by tf-idf score + search(L"document", /*limit=*/10, std::vector({2, 1, 0}), + std::vector({L"2", L"1", L"0"})); + search(L"document", /*limit=*/1, std::vector({2}), std::vector({L"2"})); + search(L"test AND document", /*limit=*/10, std::vector({2, 0}), + std::vector({L"2", L"0"})); + search(L"test OR new", /*limit=*/10, std::vector({1, 0, 2}), + std::vector({L"1", L"0", L"2"})); + search(L"\"test document\"", /*limit=*/10, std::vector({0}), + std::vector({L"0"})); + search(L"unordered", /*limit=*/10, std::vector({3}), + std::vector({L"5"})); + search(L"*orDer*", /*limit=*/10, std::vector({3}), std::vector({L"5"})); + + reader->close(); + lucene_dir->close(); +} + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_defs.h b/src/paimon/global_index/lucene/lucene_defs.h new file mode 100644 index 00000000..db47037f --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_defs.h @@ -0,0 +1,36 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace paimon::lucene { +static inline const int32_t kVersion = 1; +static inline const char kIdentifier[] = "lucene-fts"; +static inline const wchar_t kEmptyWstring[] = L""; +static inline const wchar_t kRowIdFieldWstring[] = L"_ROW_ID"; + +static inline const char kOptionKeyPrefix[] = "lucene-fts."; + +static inline const int32_t kDefaultReadBufferSize = 1024 * 1024; +// default is 1MB +static inline const char kLuceneReadBufferSize[] = "read.buffer-size"; +// default is false +static inline const char kLuceneWriteOmitTermFreqAndPositions[] = + "write.omit-term-freq-and-position"; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_directory.cpp b/src/paimon/global_index/lucene/lucene_directory.cpp new file mode 100644 index 00000000..8f642183 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_directory.cpp @@ -0,0 +1,107 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_directory.h" + +#include "paimon/common/io/offset_input_stream.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/global_index/lucene/lucene_input.h" +#include "paimon/global_index/lucene/lucene_utils.h" + +namespace paimon::lucene { +LuceneDirectory::LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input) + : LuceneDirectory::LuceneDirectory(path, file_name_to_offset_and_length, paimon_input, + kDefaultReadBufferSize) {} + +LuceneDirectory::LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input, int32_t input_buffer_size) + : Lucene::Directory(), + input_buffer_size_(input_buffer_size), + path_(path), + file_name_to_offset_and_length_(file_name_to_offset_and_length), + paimon_input_(paimon_input) { + Lucene::Directory::setLockFactory(Lucene::NoLockFactory::getNoLockFactory()); +} + +Lucene::HashSet LuceneDirectory::listAll() { + ensureOpen(); + Lucene::HashSet result_file_list( + Lucene::HashSet::newInstance()); + for (const auto& [file_name, _] : file_name_to_offset_and_length_) { + result_file_list.add(LuceneUtils::StringToWstring(file_name)); + } + return result_file_list; +} + +bool LuceneDirectory::fileExists(const Lucene::String& name) { + ensureOpen(); + auto iter = file_name_to_offset_and_length_.find(LuceneUtils::WstringToString(name)); + return iter != file_name_to_offset_and_length_.end(); +} + +uint64_t LuceneDirectory::fileModified(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support fileModified()"); +} + +void LuceneDirectory::touchFile(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support touchFile()"); +} + +void LuceneDirectory::deleteFile(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support deleteFile()"); +} + +int64_t LuceneDirectory::fileLength(const Lucene::String& name) { + ensureOpen(); + auto iter = file_name_to_offset_and_length_.find(LuceneUtils::WstringToString(name)); + if (iter == file_name_to_offset_and_length_.end()) { + throw Lucene::IOException(L"file not exist in fileLength"); + } + return iter->second.second; +} + +Lucene::IndexOutputPtr LuceneDirectory::createOutput(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support createOutput()"); +} + +Lucene::IndexInputPtr LuceneDirectory::openInput(const Lucene::String& name) { + ensureOpen(); + auto file_iter = file_name_to_offset_and_length_.find(LuceneUtils::WstringToString(name)); + if (file_iter == file_name_to_offset_and_length_.end()) { + throw Lucene::IOException(L"file not exist in openInput"); + } + const auto& [offset, length] = file_iter->second; + auto offset_input_result = OffsetInputStream::Create(paimon_input_, length, offset); + if (!offset_input_result.ok()) { + throw Lucene::IOException( + LuceneUtils::StringToWstring(offset_input_result.status().ToString())); + } + std::shared_ptr offset_input = std::move(offset_input_result).value(); + return Lucene::newLucene(Lucene::newLucene(offset_input), + input_buffer_size_); +} + +void LuceneDirectory::close() { + Lucene::SyncLock sync_lock(this); + isOpen = false; +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_directory.h b/src/paimon/global_index/lucene/lucene_directory.h new file mode 100644 index 00000000..e40a395f --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_directory.h @@ -0,0 +1,65 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "lucene++/LuceneHeaders.h" +#include "lucene++/NoLockFactory.h" +#include "paimon/fs/file_system.h" + +namespace paimon::lucene { +/// This class wraps a Paimon FileSystem instance to enable Lucene index reading capabilities, +/// but only supports read operations (e.g., openInput, fileLength). Write operations +/// (e.g., createOutput, delete) will throw an exception or be unsupported. +class LuceneDirectory : public Lucene::Directory { + public: + LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input); + + LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input, int32_t input_buffer_size); + + Lucene::HashSet listAll() override; + + bool fileExists(const Lucene::String& name) override; + + uint64_t fileModified(const Lucene::String& name) override; + + void touchFile(const Lucene::String& name) override; + + void deleteFile(const Lucene::String& name) override; + + int64_t fileLength(const Lucene::String& name) override; + + Lucene::IndexOutputPtr createOutput(const Lucene::String& name) override; + + Lucene::IndexInputPtr openInput(const Lucene::String& name) override; + + void close() override; + + private: + int32_t input_buffer_size_; + std::string path_; + /// @note All files are concatenated into a single physical file for the Paimon global index. + /// Use `file_name_to_offset_and_length_` and `paimon_input_` to obtain the actual + /// offset and length of each logical file within the merged file, which are used + /// to create Lucene index inputs for `Lucene::Directory`. + std::map> file_name_to_offset_and_length_; + std::shared_ptr paimon_input_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_directory_test.cpp b/src/paimon/global_index/lucene/lucene_directory_test.cpp new file mode 100644 index 00000000..6c031ae4 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_directory_test.cpp @@ -0,0 +1,103 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_directory.h" + +#include "gtest/gtest.h" +#include "lucene++/FileUtils.h" +#include "lucene++/LuceneHeaders.h" +#include "lucene++/MiscUtils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::lucene::test { +class LuceneDirectoryTest : public ::testing::Test, public ::testing::WithParamInterface { + public: + void SetUp() override {} + void TearDown() override {} +}; + +TEST_P(LuceneDirectoryTest, TestSimple) { + int32_t read_buffer_size = GetParam(); + // write 3 files in a single concact file + std::vector data = {"helloworld", "abcdefg", "paimoncpp"}; + auto dir = paimon::test::UniqueTestDirectory::Create("local"); + auto lucene_directory = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(dir->Str()), + Lucene::NoLockFactory::getNoLockFactory()); + std::string single_file_name = "lucene-file"; + Lucene::IndexOutputPtr output = + lucene_directory->createOutput(LuceneUtils::StringToWstring(single_file_name)); + std::map> file_name_to_offset_and_length; + int64_t offset = 0; + for (size_t i = 0; i < data.size(); i++) { + const auto& data_str = data[i]; + output->writeBytes(reinterpret_cast(data_str.data()), /*offset=*/0, + data_str.size()); + file_name_to_offset_and_length["file" + std::to_string(i)] = {offset, data_str.size()}; + offset += data_str.size(); + } + output->close(); + + // create paimon directory + auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input, + fs->Open(PathUtil::JoinPath(dir->Str(), single_file_name))); + auto paimon_directory = std::make_shared( + dir->Str(), file_name_to_offset_and_length, input, read_buffer_size); + // test list all + auto list_file_names = paimon_directory->listAll(); + Lucene::HashSet expect_file_list( + Lucene::HashSet::newInstance()); + expect_file_list.add(L"file0"); + expect_file_list.add(L"file1"); + expect_file_list.add(L"file2"); + ASSERT_EQ(list_file_names, expect_file_list); + + // test non-exist file + ASSERT_FALSE(paimon_directory->fileExists(L"non-exist-file")); + ASSERT_THROW(paimon_directory->fileLength(L"non-exist-file"), Lucene::IOException); + + for (size_t i = 0; i < data.size(); i++) { + std::wstring file_name = LuceneUtils::StringToWstring("file" + std::to_string(i)); + // check file exist + ASSERT_TRUE(paimon_directory->fileExists(file_name)); + // check file length + ASSERT_EQ(paimon_directory->fileLength(file_name), data[i].size()); + // check read data + Lucene::IndexInputPtr data_input = paimon_directory->openInput(file_name); + ASSERT_TRUE(data_input); + std::string read_data(data[i].size(), '\0'); + data_input->readBytes(reinterpret_cast(read_data.data()), + /*offset=*/0, /*length=*/data[i].size(), + /*useBuffer=*/true); + ASSERT_EQ(read_data, data[i]); + // check seek + data_input->seek(1); + ASSERT_EQ(1l, data_input->getFilePointer()); + // check read after seek + read_data.resize(data[i].size() - 1, '\0'); + data_input->readBytes(reinterpret_cast(read_data.data()), + /*offset=*/0, /*length=*/data[i].size() - 1, + /*useBuffer=*/true); + ASSERT_EQ(read_data, data[i].substr(1)); + ASSERT_EQ(data[i].size(), data_input->length()); + data_input->close(); + } +} +INSTANTIATE_TEST_SUITE_P(ReadBufferSize, LuceneDirectoryTest, + ::testing::ValuesIn(std::vector({10, 100, 1024}))); + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_global_index.cpp b/src/paimon/global_index/lucene/lucene_global_index.cpp new file mode 100644 index 00000000..1502cf2a --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index.cpp @@ -0,0 +1,417 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_global_index.h" + +#include + +#include "arrow/c/bridge.h" +#include "lucene++/FileUtils.h" +#include "paimon/common/io/data_output_stream.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/rapidjson_util.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/global_index/lucene/lucene_directory.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/io/data_input_stream.h" + +namespace paimon::lucene { +#define CHECK_NOT_NULL(pointer, error_msg) \ + do { \ + if (!(pointer)) { \ + return Status::Invalid(error_msg); \ + } \ + } while (0) + +LuceneGlobalIndex::LuceneGlobalIndex(const std::map& options) + : options_(OptionsUtils::FetchOptionsWithPrefix(kOptionKeyPrefix, options)) {} + +Result> LuceneGlobalIndex::CreateWriter( + const std::string& field_name, ::ArrowSchema* arrow_schema, + const std::shared_ptr& file_writer, + const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_type, + arrow::ImportType(arrow_schema)); + // check data type + auto struct_type = std::dynamic_pointer_cast(arrow_type); + CHECK_NOT_NULL(struct_type, "arrow schema must be struct type when create LuceneIndexWriter"); + auto index_field = struct_type->GetFieldByName(field_name); + CHECK_NOT_NULL(index_field, + fmt::format("field {} not exist in arrow schema when create LuceneIndexWriter", + field_name)); + if (index_field->type()->id() != arrow::Type::type::STRING) { + return Status::Invalid("field type must be string"); + } + return LuceneGlobalIndexWriter::Create(field_name, arrow_type, file_writer, options_, pool); +} + +Result> LuceneGlobalIndex::CreateReader( + ::ArrowSchema* c_arrow_schema, const std::shared_ptr& file_reader, + const std::vector& files, const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_schema, + arrow::ImportSchema(c_arrow_schema)); + if (files.size() != 1) { + return Status::Invalid("lucene index only has one index file per shard"); + } + const auto& io_meta = files[0]; + // check data type + if (arrow_schema->num_fields() != 1) { + return Status::Invalid("LuceneGlobalIndex now only support one field"); + } + auto index_field = arrow_schema->field(0); + if (index_field->type()->id() != arrow::Type::type::STRING) { + return Status::Invalid("field type must be string"); + } + return LuceneGlobalIndexReader::Create(index_field->name(), io_meta, file_reader, options_, + pool); +} + +LuceneGlobalIndexWriter::LuceneWriteContext::LuceneWriteContext( + const std::string& _tmp_index_path, const Lucene::FSDirectoryPtr& _lucene_dir, + const Lucene::IndexWriterPtr& _index_writer, const Lucene::DocumentPtr& _doc, + const Lucene::FieldPtr& _field, const Lucene::FieldPtr& _row_id_field) + : tmp_index_path(_tmp_index_path), + lucene_dir(_lucene_dir), + index_writer(_index_writer), + doc(_doc), + field(_field), + row_id_field(_row_id_field) {} + +Result> LuceneGlobalIndexWriter::Create( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& file_writer, + const std::map& options, const std::shared_ptr& pool) { + try { + std::string uuid; + if (!UUID::Generate(&uuid)) { + return Status::Invalid("generate uuid for lucene tmp path failed."); + } + // create a local tmp path + std::string tmp_path = PathUtil::JoinPath(std::filesystem::temp_directory_path().string(), + "paimon-lucene-" + uuid); + auto lucene_dir = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(tmp_path), + Lucene::NoLockFactory::getNoLockFactory()); + // TODO(xinyu.lxy): support other tokenizer + // open lucene index writer + Lucene::IndexWriterPtr writer = Lucene::newLucene( + lucene_dir, + Lucene::newLucene(Lucene::LuceneVersion::LUCENE_CURRENT), + /*create=*/true, Lucene::IndexWriter::MaxFieldLengthLIMITED); + + // prepare field and document + Lucene::DocumentPtr doc = Lucene::newLucene(); + auto field = Lucene::newLucene(LuceneUtils::StringToWstring(field_name), + kEmptyWstring, Lucene::Field::STORE_NO, + Lucene::Field::INDEX_ANALYZED_NO_NORMS); + auto row_id_field = Lucene::newLucene( + kRowIdFieldWstring, kEmptyWstring, Lucene::Field::STORE_YES, + Lucene::Field::INDEX_NOT_ANALYZED_NO_NORMS); + PAIMON_ASSIGN_OR_RAISE( + bool omit_term_freq_and_positions, + OptionsUtils::GetValueFromMap(options, kLuceneWriteOmitTermFreqAndPositions, false)); + field->setOmitTermFreqAndPositions(omit_term_freq_and_positions); + row_id_field->setOmitTermFreqAndPositions(true); + doc->add(field); + doc->add(row_id_field); + return std::shared_ptr(new LuceneGlobalIndexWriter( + field_name, arrow_type, + LuceneWriteContext(tmp_path, lucene_dir, writer, doc, field, row_id_field), file_writer, + options, pool)); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("create lucene global index writer failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "create lucene global index writer failed, with unknown error."); + } +} + +LuceneGlobalIndexWriter::LuceneGlobalIndexWriter( + const std::string& field_name, const std::shared_ptr& arrow_type, + LuceneWriteContext&& write_context, const std::shared_ptr& file_writer, + const std::map& options, const std::shared_ptr& pool) + : pool_(pool), + field_name_(field_name), + arrow_type_(arrow_type), + write_context_(std::move(write_context)), + file_writer_(file_writer), + options_(options) {} + +LuceneGlobalIndexWriter::~LuceneGlobalIndexWriter() { + try { + [[maybe_unused]] bool ec = Lucene::FileUtils::removeDirectory( + LuceneUtils::StringToWstring(write_context_.tmp_index_path)); + } catch (...) { + // do nothing + } +} + +Status LuceneGlobalIndexWriter::AddBatch(::ArrowArray* arrow_array) { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, + arrow::ImportArray(arrow_array, arrow_type_)); + auto struct_array = std::dynamic_pointer_cast(array); + CHECK_NOT_NULL(struct_array, "invalid input array in LuceneIndexWriter, must be struct array"); + auto field_array = struct_array->GetFieldByName(field_name_); + CHECK_NOT_NULL( + field_array, + fmt::format("invalid input array in LuceneIndexWriter, field {} not in input array", + field_name_)); + auto string_array = std::dynamic_pointer_cast(field_array); + CHECK_NOT_NULL( + string_array, + fmt::format( + "invalid input array in LuceneIndexWriter, field array {} is not a string array", + field_name_)); + try { + for (int64_t i = 0; i < string_array->length(); i++) { + if (string_array->IsNull(i)) { + write_context_.field->setValue(kEmptyWstring); + } else { + auto view = string_array->Value(i); + write_context_.field->setValue(LuceneUtils::StringToWstring(view)); + } + write_context_.row_id_field->setValue( + LuceneUtils::StringToWstring(std::to_string(row_id_++))); + write_context_.index_writer->addDocument(write_context_.doc); + } + } catch (const std::exception& e) { + return Status::Invalid(fmt::format( + "add batch for lucene global index writer failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "add batch for lucene global index writer failed, with unknown error."); + } + return Status::OK(); +} + +Result LuceneGlobalIndexWriter::FlushIndexToFinal() const { + try { + // flush index to tmp dir + write_context_.index_writer->optimize(); + write_context_.index_writer->close(); + + // list tmp dir + auto tmp_file_names = write_context_.lucene_dir->listAll(); + PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, file_writer_->NewFileName(kIdentifier)); + // prepare output from file_writer + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, + file_writer_->NewOutputStream(index_file_name)); + DataOutputStream data_output_stream(out); + PAIMON_RETURN_NOT_OK(data_output_stream.WriteValue(kVersion)); + PAIMON_RETURN_NOT_OK( + data_output_stream.WriteValue(static_cast(tmp_file_names.size()))); + // read all data from index files and write to target output + auto buffer = std::make_shared(kDefaultReadBufferSize, pool_.get()); + for (const auto& wfile_name : tmp_file_names) { + auto file_name = LuceneUtils::WstringToString(wfile_name); + PAIMON_RETURN_NOT_OK( + data_output_stream.WriteValue(static_cast(file_name.size()))); + PAIMON_RETURN_NOT_OK( + data_output_stream.WriteBytes(std::make_shared(file_name, pool_.get()))); + int64_t file_length = write_context_.lucene_dir->fileLength(wfile_name); + PAIMON_RETURN_NOT_OK(data_output_stream.WriteValue(file_length)); + + Lucene::IndexInputPtr input = write_context_.lucene_dir->openInput(wfile_name); + int64_t total_write_size = 0; + while (total_write_size < file_length) { + int64_t current_write_size = std::min(file_length - total_write_size, + static_cast(kDefaultReadBufferSize)); + input->readBytes(reinterpret_cast(buffer->data()), /*offset=*/0, + static_cast(current_write_size)); + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_write_size, + out->Write(buffer->data(), static_cast(current_write_size))); + if (static_cast(actual_write_size) != current_write_size) { + return Status::Invalid( + fmt::format("invalid write, try to write {} while actual write {}", + current_write_size, actual_write_size)); + } + total_write_size += current_write_size; + } + input->close(); + } + PAIMON_RETURN_NOT_OK(out->Flush()); + PAIMON_RETURN_NOT_OK(out->Close()); + write_context_.lucene_dir->close(); + return index_file_name; + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("finish for lucene global index writer failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "finish for lucene global index writer failed, with unknown error."); + } +} + +Result> LuceneGlobalIndexWriter::Finish() { + PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, FlushIndexToFinal()); + // prepare global index meta + PAIMON_ASSIGN_OR_RAISE(int64_t file_size, file_writer_->GetFileSize(index_file_name)); + std::string options_json; + PAIMON_RETURN_NOT_OK(RapidJsonUtil::ToJsonString(options_, &options_json)); + auto meta_bytes = std::make_shared(options_json, pool_.get()); + GlobalIndexIOMeta meta(file_writer_->ToPath(index_file_name), file_size, + /*range_end=*/static_cast(row_id_) - 1, + /*metadata=*/meta_bytes); + return std::vector({meta}); +} + +Result> LuceneGlobalIndexReader::Create( + const std::string& field_name, const GlobalIndexIOMeta& io_meta, + const std::shared_ptr& file_reader, + const std::map& options, const std::shared_ptr& pool) { + try { + std::map> file_name_to_offset_and_length; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr paimon_input, + file_reader->GetInputStream(io_meta.file_path)); + DataInputStream data_input_stream(paimon_input); + PAIMON_ASSIGN_OR_RAISE(int32_t version, data_input_stream.ReadValue()); + if (version != kVersion) { + return Status::Invalid(fmt::format("LuceneGlobalIndex not support version {}"), + kVersion); + } + PAIMON_ASSIGN_OR_RAISE(int32_t num_files, data_input_stream.ReadValue()); + for (int32_t i = 0; i < num_files; i++) { + PAIMON_ASSIGN_OR_RAISE(int32_t file_name_len, data_input_stream.ReadValue()); + auto file_name_bytes = std::make_shared(file_name_len, pool.get()); + PAIMON_RETURN_NOT_OK(data_input_stream.ReadBytes(file_name_bytes.get())); + std::string file_name(file_name_bytes->data(), file_name_bytes->size()); + PAIMON_ASSIGN_OR_RAISE(int64_t file_len, data_input_stream.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t pos, data_input_stream.GetPos()); + file_name_to_offset_and_length[file_name] = {pos, file_len}; + pos += file_len; + if (i != num_files - 1) { + PAIMON_RETURN_NOT_OK(data_input_stream.Seek(pos)); + } + } + PAIMON_ASSIGN_OR_RAISE( + int32_t read_buffer_size, + OptionsUtils::GetValueFromMap(options, kLuceneReadBufferSize, kDefaultReadBufferSize)); + Lucene::DirectoryPtr lucene_dir = Lucene::newLucene( + PathUtil::GetParentDirPath(io_meta.file_path), file_name_to_offset_and_length, + paimon_input, read_buffer_size); + + Lucene::IndexReaderPtr reader = Lucene::IndexReader::open(lucene_dir, /*read_only=*/true); + Lucene::IndexSearcherPtr searcher = Lucene::newLucene(reader); + return std::shared_ptr(new LuceneGlobalIndexReader( + LuceneUtils::StringToWstring(field_name), io_meta.range_end, searcher)); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("create lucene global index reader failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "create lucene global index reader failed, with unknown error."); + } +} + +std::vector LuceneGlobalIndexReader::TokenizeQuery(const std::string& query) { + // TODO(xinyu.lxy): support jieba analyzer + std::vector terms = + StringUtils::Split(query, /*sep_str=*/" ", /*ignore_empty=*/true); + std::vector wterms; + wterms.reserve(terms.size()); + for (const auto& term : terms) { + wterms.push_back(LuceneUtils::StringToWstring(term)); + } + return wterms; +} + +Result> LuceneGlobalIndexReader::VisitFullTextSearch( + const std::shared_ptr& full_text_search) { + try { + Lucene::QueryPtr query; + switch (full_text_search->search_type) { + case FullTextSearch::SearchType::MATCH_ALL: + case FullTextSearch::SearchType::MATCH_ANY: { + Lucene::BooleanClause::Occur occur = + full_text_search->search_type == FullTextSearch::SearchType::MATCH_ALL + ? Lucene::BooleanClause::Occur::MUST + : Lucene::BooleanClause::Occur::SHOULD; + std::vector query_terms = TokenizeQuery(full_text_search->query); + if (query_terms.size() == 1) { + query = Lucene::newLucene( + Lucene::newLucene(wfield_name_, query_terms[0])); + } else { + auto typed_query = Lucene::newLucene(); + for (const auto& term : query_terms) { + typed_query->add(Lucene::newLucene( + Lucene::newLucene(wfield_name_, term)), + occur); + } + query = typed_query; + } + break; + } + case FullTextSearch::SearchType::PHRASE: { + std::vector query_terms = TokenizeQuery(full_text_search->query); + auto typed_query = Lucene::newLucene(); + for (const auto& term : query_terms) { + typed_query->add(Lucene::newLucene(wfield_name_, term)); + } + query = typed_query; + break; + } + case FullTextSearch::SearchType::PREFIX: { + query = Lucene::newLucene(Lucene::newLucene( + wfield_name_, LuceneUtils::StringToWstring(full_text_search->query))); + break; + } + case FullTextSearch::SearchType::WILDCARD: { + query = Lucene::newLucene(Lucene::newLucene( + wfield_name_, LuceneUtils::StringToWstring(full_text_search->query))); + break; + } + default: + return Status::Invalid( + fmt::format("Not support for FullTextSearch SearchType {}", + static_cast(full_text_search->search_type))); + } + + Lucene::TopDocsPtr results = searcher_->search(query, full_text_search->limit); + + // prepare BitmapVectorSearchGlobalIndexResult + std::map id_to_score; + for (auto score_doc : results->scoreDocs) { + Lucene::DocumentPtr result_doc = searcher_->doc(score_doc->doc); + std::string row_id_str = + LuceneUtils::WstringToString(result_doc->get(kRowIdFieldWstring)); + std::optional row_id = StringUtils::StringToValue(row_id_str); + if (!row_id) { + return Status::Invalid(fmt::format("parse row id str {} to int failed"), + row_id_str); + } + id_to_score[static_cast(row_id.value())] = + static_cast(score_doc->score); + } + RoaringBitmap64 bitmap; + std::vector scores; + scores.reserve(id_to_score.size()); + for (const auto& [id, score] : id_to_score) { + bitmap.Add(id); + scores.push_back(score); + } + return std::make_shared(std::move(bitmap), + std::move(scores)); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("visit term query failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError("visit term query failed, with unknown error."); + } +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index.h b/src/paimon/global_index/lucene/lucene_global_index.h new file mode 100644 index 00000000..67dd27ec --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index.h @@ -0,0 +1,186 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "arrow/type.h" +#include "lucene++/LuceneHeaders.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/predicate/full_text_search.h" +namespace paimon::lucene { +class LuceneGlobalIndex : public GlobalIndexer { + public: + explicit LuceneGlobalIndex(const std::map& options); + + Result> CreateWriter( + const std::string& field_name, ::ArrowSchema* arrow_schema, + const std::shared_ptr& file_writer, + const std::shared_ptr& pool) const override; + + Result> CreateReader( + ::ArrowSchema* arrow_schema, const std::shared_ptr& file_reader, + const std::vector& files, + const std::shared_ptr& pool) const override; + + private: + std::map options_; +}; + +class LuceneGlobalIndexWriter : public GlobalIndexWriter { + public: + struct LuceneWriteContext { + LuceneWriteContext(const std::string& _tmp_index_path, + const Lucene::FSDirectoryPtr& _lucene_dir, + const Lucene::IndexWriterPtr& _index_writer, + const Lucene::DocumentPtr& _doc, const Lucene::FieldPtr& _field, + const Lucene::FieldPtr& _row_id_field); + + LuceneWriteContext(LuceneWriteContext&&) = default; + LuceneWriteContext& operator=(LuceneWriteContext&&) = default; + + std::string tmp_index_path; + Lucene::FSDirectoryPtr lucene_dir; + Lucene::IndexWriterPtr index_writer; + Lucene::DocumentPtr doc; + Lucene::FieldPtr field; + Lucene::FieldPtr row_id_field; + }; + + static Result> Create( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& file_writer, + const std::map& options, const std::shared_ptr& pool); + + ~LuceneGlobalIndexWriter() override; + + Status AddBatch(::ArrowArray* c_arrow_array) override; + + Result> Finish() override; + + private: + LuceneGlobalIndexWriter(const std::string& field_name, + const std::shared_ptr& arrow_type, + LuceneWriteContext&& write_context, + const std::shared_ptr& file_writer, + const std::map& options, + const std::shared_ptr& pool); + + Result FlushIndexToFinal() const; + + private: + std::shared_ptr pool_; + int32_t row_id_ = 0; + std::string field_name_; + std::shared_ptr arrow_type_; + LuceneWriteContext write_context_; + std::shared_ptr file_writer_; + std::map options_; +}; + +class LuceneGlobalIndexReader : public GlobalIndexReader { + public: + static Result> Create( + const std::string& field_name, const GlobalIndexIOMeta& io_meta, + const std::shared_ptr& file_reader, + const std::map& options, const std::shared_ptr& pool); + + Result> VisitIsNotNull() override { + return CreateAllResult(); + } + + Result> VisitIsNull() override { + return CreateAllResult(); + } + + Result> VisitEqual(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitNotEqual(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitLessThan(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitLessOrEqual(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitGreaterThan(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitGreaterOrEqual( + const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitIn( + const std::vector& literals) override { + return CreateAllResult(); + } + + Result> VisitNotIn( + const std::vector& literals) override { + return CreateAllResult(); + } + + Result> VisitStartsWith(const Literal& prefix) override { + return CreateAllResult(); + } + + Result> VisitEndsWith(const Literal& suffix) override { + return CreateAllResult(); + } + + Result> VisitContains(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override { + return Status::Invalid( + "LuceneGlobalIndexReader is not supposed to handle vector search query"); + } + + Result> VisitFullTextSearch( + const std::shared_ptr& full_text_search); + + private: + LuceneGlobalIndexReader(const std::wstring& wfield_name, int64_t range_end, + const Lucene::IndexSearcherPtr& searcher) + : range_end_(range_end), wfield_name_(wfield_name), searcher_(searcher) {} + + static std::vector TokenizeQuery(const std::string& query); + + std::shared_ptr CreateAllResult() const { + return BitmapGlobalIndexResult::FromRanges({Range(0, range_end_)}); + } + + private: + int64_t range_end_; + std::wstring wfield_name_; + Lucene::IndexSearcherPtr searcher_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_factory.cpp b/src/paimon/global_index/lucene/lucene_global_index_factory.cpp new file mode 100644 index 00000000..76aa5ad6 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_factory.cpp @@ -0,0 +1,36 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/global_index/lucene/lucene_global_index_factory.h" + +#include +#include +#include +#include + +#include "paimon/global_index/lucene/lucene_global_index.h" +namespace paimon::lucene { + +const char LuceneGlobalIndexFactory::IDENTIFIER[] = "lucene-fts-global"; + +Result> LuceneGlobalIndexFactory::Create( + const std::map& options) const { + return std::make_unique(options); +} + +REGISTER_PAIMON_FACTORY(LuceneGlobalIndexFactory); + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_factory.h b/src/paimon/global_index/lucene/lucene_global_index_factory.h new file mode 100644 index 00000000..1b00dbfa --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_factory.h @@ -0,0 +1,39 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/global_indexer_factory.h" +namespace paimon::lucene { +/// Factory for creating lucene global indexers. +class LuceneGlobalIndexFactory : public GlobalIndexerFactory { + public: + static const char IDENTIFIER[]; + + const char* Identifier() const override { + return IDENTIFIER; + } + + Result> Create( + const std::map& options) const override; +}; + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_test.cpp b/src/paimon/global_index/lucene/lucene_global_index_test.cpp new file mode 100644 index 00000000..bd13a9c1 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_test.cpp @@ -0,0 +1,225 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_global_index.h" + +#include "arrow/c/bridge.h" +#include "arrow/ipc/api.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/global_index/global_index_file_manager.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/global_index/bitmap_vector_search_global_index_result.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::lucene::test { +class LuceneGlobalIndexTest : public ::testing::Test, + public ::testing::WithParamInterface { + public: + void SetUp() override {} + void TearDown() override {} + + class FakeIndexPathFactory : public IndexPathFactory { + public: + explicit FakeIndexPathFactory(const std::string& index_path) : index_path_(index_path) {} + std::string NewPath() const override { + assert(false); + return ""; + } + std::string ToPath(const std::shared_ptr& file) const override { + assert(false); + return ""; + } + std::string ToPath(const std::string& file_name) const override { + return PathUtil::JoinPath(index_path_, file_name); + } + bool IsExternalPath() const override { + return false; + } + + private: + std::string index_path_; + }; + + std::unique_ptr<::ArrowSchema> CreateArrowSchema( + const std::shared_ptr& data_type) const { + auto c_schema = std::make_unique<::ArrowSchema>(); + EXPECT_TRUE(arrow::ExportType(*data_type, c_schema.get()).ok()); + return c_schema; + } + + Result WriteGlobalIndex(const std::string& index_root, + const std::shared_ptr& data_type, + const std::map& options, + const std::shared_ptr& array, + const Range& expected_range) const { + auto global_index = std::make_shared(options); + auto path_factory = std::make_shared(index_root); + auto file_writer = std::make_shared(fs_, path_factory); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr global_writer, + global_index->CreateWriter("f0", CreateArrowSchema(data_type).get(), + file_writer, pool_)); + + ArrowArray c_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &c_array)); + PAIMON_RETURN_NOT_OK(global_writer->AddBatch(&c_array)); + PAIMON_ASSIGN_OR_RAISE(auto result_metas, global_writer->Finish()); + // check meta + EXPECT_EQ(result_metas.size(), 1); + auto file_name = PathUtil::GetName(result_metas[0].file_path); + EXPECT_TRUE(StringUtils::StartsWith(file_name, "lucene-fts-global-index-")); + EXPECT_TRUE(StringUtils::EndsWith(file_name, ".index")); + EXPECT_EQ(result_metas[0].range_end, expected_range.to); + EXPECT_TRUE(result_metas[0].metadata); + return result_metas[0]; + } + + Result> CreateGlobalIndexReader( + const std::string& index_root, const std::shared_ptr& data_type, + const std::map& options, const GlobalIndexIOMeta& meta) const { + auto global_index = std::make_shared(options); + auto path_factory = std::make_shared(index_root); + auto file_reader = std::make_shared(fs_, path_factory); + return global_index->CreateReader(CreateArrowSchema(data_type).get(), file_reader, {meta}, + pool_); + } + + void CheckResult(const std::shared_ptr& result, + const std::vector& expected_ids) const { + auto typed_result = std::dynamic_pointer_cast(result); + ASSERT_TRUE(typed_result); + ASSERT_OK_AND_ASSIGN(const RoaringBitmap64* bitmap, typed_result->GetBitmap()); + ASSERT_TRUE(bitmap); + ASSERT_EQ(*(typed_result->GetBitmap().value()), RoaringBitmap64::From(expected_ids)) + << "result=" << (typed_result->GetBitmap().value())->ToString() + << ", expected=" << RoaringBitmap64::From(expected_ids).ToString(); + ASSERT_EQ(typed_result->scores_.size(), expected_ids.size()); + } + + private: + std::shared_ptr pool_ = GetDefaultPool(); + std::shared_ptr fs_ = std::make_shared(); + std::shared_ptr data_type_ = + arrow::struct_({arrow::field("f0", arrow::utf8())}); +}; + +TEST_P(LuceneGlobalIndexTest, TestSimple) { + int32_t read_buffer_size = GetParam(); + + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + std::map options = { + {"lucene-fts.write.omit-term-freq-and-position", "false"}, + {"lucene-fts.read.buffer-size", std::to_string(read_buffer_size)}}; + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + ["This is an test document."], + ["This is an new document document document."], + ["Document document document document test."], + ["unordered user-defined doc id"] + ])") + .ValueOrDie(); + + // write index + ASSERT_OK_AND_ASSIGN(auto meta, + WriteGlobalIndex(test_root, data_type_, options, array, Range(0, 3))); + if (read_buffer_size == 10) { + ASSERT_EQ(std::string(meta.metadata->data(), meta.metadata->size()), + R"({"read.buffer-size":"10","write.omit-term-freq-and-position":"false"})"); + } + + // create reader + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options, meta)); + auto lucene_reader = std::dynamic_pointer_cast(reader); + ASSERT_TRUE(lucene_reader); + + // test visit + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "document", FullTextSearch::SearchType::MATCH_ALL))); + CheckResult(result, {2l, 1l, 0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/1, "document", FullTextSearch::SearchType::MATCH_ANY))); + CheckResult(result, {2l}); + } + { + ASSERT_OK_AND_ASSIGN( + auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "test document", FullTextSearch::SearchType::MATCH_ALL))); + CheckResult(result, {2l, 0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "test new", FullTextSearch::SearchType::MATCH_ANY))); + CheckResult(result, {1l, 0l, 2l}); + } + { + ASSERT_OK_AND_ASSIGN( + auto result, lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "test document", FullTextSearch::SearchType::PHRASE))); + CheckResult(result, {0l}); + } + { + ASSERT_OK_AND_ASSIGN( + auto result, lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "unordered", FullTextSearch::SearchType::MATCH_ALL))); + CheckResult(result, {3l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "unorder", FullTextSearch::SearchType::PREFIX))); + CheckResult(result, {3l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "*order*", FullTextSearch::SearchType::WILDCARD))); + CheckResult(result, {3l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "*or*er*", FullTextSearch::SearchType::WILDCARD))); + CheckResult(result, {3l}); + } +} + +INSTANTIATE_TEST_SUITE_P(ReadBufferSize, LuceneGlobalIndexTest, + ::testing::ValuesIn(std::vector({10, 100, 1024}))); + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_input.h b/src/paimon/global_index/lucene/lucene_input.h new file mode 100644 index 00000000..a16880a2 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_input.h @@ -0,0 +1,89 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "lucene++/BufferedIndexInput.h" +#include "paimon/fs/file_system.h" +#include "paimon/global_index/lucene/lucene_utils.h" + +namespace paimon::lucene { +class LuceneSyncInput : public Lucene::LuceneObject { + public: + explicit LuceneSyncInput(const std::shared_ptr& in_stream) : in_(in_stream) {} + const std::shared_ptr& GetInput() const { + return in_; + } + + private: + std::shared_ptr in_; +}; + +class LuceneIndexInput : public Lucene::BufferedIndexInput { + public: + LuceneIndexInput(const boost::shared_ptr& in_stream, int32_t buffer_size) + : Lucene::BufferedIndexInput(buffer_size), + input_buffer_size_(buffer_size), + in_stream_(in_stream) {} + + public: + int64_t length() override { + auto result = in_stream_->GetInput()->Length(); + if (!result.ok()) { + throw Lucene::IOException(LuceneUtils::StringToWstring(result.status().ToString())); + } + return static_cast(result.value()); + } + void close() override { + if (is_clone_) { + return; + } + if (in_stream_) { + in_stream_.reset(); + } + } + + private: + void readInternal(uint8_t* b, int32_t offset, int32_t length) override { + Lucene::SyncLock lock(in_stream_); + int64_t position = getFilePointer(); + auto read_result = + in_stream_->GetInput()->Read(reinterpret_cast(b + offset), length, position); + if (!read_result.ok()) { + throw Lucene::IOException( + LuceneUtils::StringToWstring(read_result.status().ToString())); + } + if (read_result.value() != length) { + throw Lucene::IOException(L"actual read len and expect read len mismatch"); + } + } + void seekInternal(int64_t pos) override {} + + Lucene::LuceneObjectPtr clone(const Lucene::LuceneObjectPtr& other) override { + Lucene::LuceneObjectPtr clone = Lucene::BufferedIndexInput::clone( + other ? other : Lucene::newLucene(in_stream_, input_buffer_size_)); + boost::shared_ptr clone_index_input( + boost::dynamic_pointer_cast(clone)); + clone_index_input->in_stream_ = in_stream_; + clone_index_input->is_clone_ = true; + return clone_index_input; + } + + private: + bool is_clone_ = false; + int32_t input_buffer_size_; + boost::shared_ptr in_stream_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lumina/lumina_utils_test.cpp b/src/paimon/global_index/lucene/lucene_utils.h similarity index 51% rename from src/paimon/global_index/lumina/lumina_utils_test.cpp rename to src/paimon/global_index/lucene/lucene_utils.h index b8479a53..70a35dcc 100644 --- a/src/paimon/global_index/lumina/lumina_utils_test.cpp +++ b/src/paimon/global_index/lucene/lucene_utils.h @@ -13,16 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "paimon/global_index/lumina/lumina_utils.h" +#pragma once -#include +#include "lucene++/StringUtils.h" +namespace paimon::lucene { +class LuceneUtils { + public: + LuceneUtils() = delete; + ~LuceneUtils() = delete; -#include "paimon/testing/utils/testharness.h" -namespace paimon::lumina::test { -TEST(LuminaUtilsTest, TestSimple) { - std::map options = {{"key1", "value1"}, {"lumina.key2", "value2"}}; - auto lumina_options = LuminaUtils::FetchLuminaOptions(options); - std::map expected = {{"key2", "value2"}}; - ASSERT_EQ(expected, lumina_options); -} -} // namespace paimon::lumina::test + template + static Lucene::String StringToWstring(const StrType& str) { + return Lucene::StringUtils::toUnicode(reinterpret_cast(str.data()), + str.length()); + } + + static std::string WstringToString(const Lucene::String& wstr) { + return Lucene::StringUtils::toUTF8(wstr); + } +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lumina/CMakeLists.txt b/src/paimon/global_index/lumina/CMakeLists.txt index e142aa7c..ad683272 100644 --- a/src/paimon/global_index/lumina/CMakeLists.txt +++ b/src/paimon/global_index/lumina/CMakeLists.txt @@ -35,7 +35,6 @@ if(PAIMON_ENABLE_LUMINA) if(PAIMON_BUILD_TESTS) add_paimon_test(lumina_index_test SOURCES - lumina_utils_test.cpp lumina_api_test.cpp lumina_file_io_test.cpp lumina_global_index_test.cpp diff --git a/src/paimon/global_index/lumina/lumina_global_index.cpp b/src/paimon/global_index/lumina/lumina_global_index.cpp index 64966e8e..5731d979 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index.cpp @@ -61,7 +61,8 @@ Result> LuminaGlobalIndex::CreateWriter( } // check options - auto lumina_options = LuminaUtils::FetchLuminaOptions(options_); + auto lumina_options = + OptionsUtils::FetchOptionsWithPrefix(LuminaDefines::kOptionKeyPrefix, options_); PAIMON_ASSIGN_OR_RAISE(uint32_t dimension, OptionsUtils::GetValueFromMap( lumina_options, std::string(::lumina::core::kDimension))); @@ -138,7 +139,8 @@ Result> LuminaGlobalIndex::CreateReader( auto lumina_pool = std::make_shared(pool); ::lumina::core::MemoryResourceConfig memory_resource(lumina_pool.get()); - auto lumina_options = LuminaUtils::FetchLuminaOptions(options_); + auto lumina_options = + OptionsUtils::FetchOptionsWithPrefix(LuminaDefines::kOptionKeyPrefix, options_); lumina_options[std::string(::lumina::core::kDimension)] = std::to_string(index_info.dimension); lumina_options[std::string(::lumina::core::kIndexType)] = index_info.index_type; @@ -290,7 +292,8 @@ Result> LuminaIndexWriter::Finish() { PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.InsertFrom(dataset2)); // dump index - PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, file_manager_->NewFileName(kIdentifier)); + PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, + file_manager_->NewFileName(LuminaDefines::kIdentifier)); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, file_manager_->NewOutputStream(index_file_name)) auto file_writer = std::make_unique(out); @@ -330,7 +333,8 @@ Result> LuminaIndexReader::VisitV return Status::Invalid("dimension for index and search not match"); } - auto lumina_options = LuminaUtils::FetchLuminaOptions(vector_search->options); + auto lumina_options = OptionsUtils::FetchOptionsWithPrefix(LuminaDefines::kOptionKeyPrefix, + vector_search->options); auto index_type_iter = lumina_options.find(std::string(::lumina::core::kIndexType)); if (index_type_iter != lumina_options.end() && index_type_iter->second != index_info_.index_type) { diff --git a/src/paimon/global_index/lumina/lumina_global_index.h b/src/paimon/global_index/lumina/lumina_global_index.h index 1a123fcd..8187bcd0 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.h +++ b/src/paimon/global_index/lumina/lumina_global_index.h @@ -92,8 +92,6 @@ class LuminaIndexWriter : public GlobalIndexWriter { Result> Finish() override; private: - static constexpr char kIdentifier[] = "lumina"; - int64_t count_ = 0; std::shared_ptr pool_; std::string field_name_; diff --git a/src/paimon/global_index/lumina/lumina_utils.h b/src/paimon/global_index/lumina/lumina_utils.h index 5abe1ea8..449f8ac2 100644 --- a/src/paimon/global_index/lumina/lumina_utils.h +++ b/src/paimon/global_index/lumina/lumina_utils.h @@ -19,7 +19,6 @@ #include #include "lumina/core/Status.h" -#include "paimon/common/utils/string_utils.h" #include "paimon/status.h" namespace paimon::lumina { @@ -82,24 +81,12 @@ inline Status LuminaToPaimonStatus(const ::lumina::core::Status& status) { } } -class LuminaUtils { +class LuminaDefines { public: - LuminaUtils() = delete; - ~LuminaUtils() = delete; - - static std::map FetchLuminaOptions( - const std::map& options) { - std::map lumina_options; - int64_t prefix_len = strlen(kOptionKeyPrefix); - for (const auto& [key, value] : options) { - if (StringUtils::StartsWith(key, kOptionKeyPrefix)) { - lumina_options[key.substr(prefix_len)] = value; - } - } - return lumina_options; - } - - private: + LuminaDefines() = delete; + ~LuminaDefines() = delete; static constexpr char kOptionKeyPrefix[] = "lumina."; + static constexpr char kIdentifier[] = "lumina"; }; + } // namespace paimon::lumina diff --git a/third_party/versions.txt b/third_party/versions.txt index d1cd50a6..e834f9b8 100644 --- a/third_party/versions.txt +++ b/third_party/versions.txt @@ -71,6 +71,11 @@ PAIMON_JINDOSDK_C_BUILD_VERSION=6.10.2 PAIMON_JINDOSDK_C_BUILD_SHA256_CHECKSUM=23e61c9815fab1cd88c369445bdbe1eab02cc09bafed3bb5118ecaf5b2fbc518 PAIMON_JINDOSDK_C_PKG_NAME=jindosdk-${PAIMON_JINDOSDK_C_BUILD_VERSION}.tar.gz +PAIMON_BOOST_BUILD_VERSION=1.66.0 + +PAIMON_LUCENE_BUILD_VERSION=3.0.9 +PAIMON_LUCENE_BUILD_SHA256_CHECKSUM=4e69e29d5d79a976498ef71eab70c9c88c7014708be4450a9fda7780fe93584e +PAIMON_LUCENE_PKG_NAME=rel_${PAIMON_LUCENE_BUILD_VERSION}.tar.gz # The first field is the name of the environment variable expected by cmake. # This _must_ match what is defined. The second field is the name of the @@ -91,4 +96,5 @@ DEPENDENCIES=( "PAIMON_GLOG_URL ${PAIMON_GLOG_PKG_NAME} ${THIRDPARTY_MIRROR_URL}https://github.com/google/glog/archive/${PAIMON_GLOG_BUILD_VERSION}.tar.gz" "PAIMON_RAPIDJSON_URL ${PAIMON_RAPIDJSON_PKG_NAME} ${THIRDPARTY_MIRROR_URL}https://github.com/miloyip/rapidjson/archive/${PAIMON_RAPIDJSON_BUILD_VERSION}.tar.gz" "PAIMON_JINDOSDK_C_URL ${PAIMON_JINDOSDK_C_PKG_NAME} https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/release/${PAIMON_JINDOSDK_C_BUILD_VERSION}/jindosdk-${PAIMON_JINDOSDK_C_BUILD_VERSION}-linux.tar.gz" + "PAIMON_LUCENE_URL ${PAIMON_LUCENE_PKG_NAME} ${THIRDPARTY_MIRROR_URL}https://github.com/luceneplusplus/LucenePlusPlus/archive/refs/tags/${PAIMON_LUCENE_PKG_NAME}" )