1 change: 1 addition & 0 deletions src/Client/BuzzHouse/Generator/ServerSettings.cpp
@@ -559,6 +559,7 @@ std::unordered_map<String, CHSetting> serverSettings = {
{"input_format_parquet_case_insensitive_column_matching", trueOrFalseSettingNoOracle},
{"input_format_parquet_enable_json_parsing", trueOrFalseSettingNoOracle},
{"input_format_parquet_enable_row_group_prefetch", trueOrFalseSettingNoOracle},
{"input_format_parquet_verify_checksums", trueOrFalseSettingNoOracle},
{"input_format_parquet_filter_push_down", trueOrFalseSetting},
{"input_format_parquet_preserve_order", trueOrFalseSettingNoOracle},
{"input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference", trueOrFalseSettingNoOracle},
25 changes: 19 additions & 6 deletions src/Common/threadPoolCallbackRunner.cpp
@@ -69,6 +69,7 @@ void ThreadPoolCallbackRunnerFast::operator()(std::function<void()> f)
{
std::unique_lock lock(mutex);
queue.push_back(std::move(f));

startMoreThreadsIfNeeded(active_tasks_, lock);
}

@@ -92,23 +93,35 @@ void ThreadPoolCallbackRunnerFast::bulkSchedule(std::vector<std::function<void()
if (mode == Mode::Disabled)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");

size_t active_tasks_ = fs.size() + active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);
size_t n = fs.size();
size_t active_tasks_ = n + active_tasks.fetch_add(n, std::memory_order_relaxed);

{
std::unique_lock lock(mutex);
queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
startMoreThreadsIfNeeded(active_tasks_, lock);

try
{
startMoreThreadsIfNeeded(active_tasks_, lock);
}
catch (...)
{
/// Keep `queue` consistent with `queue_size`.
queue.erase(queue.end() - n, queue.end());
active_tasks.fetch_sub(n, std::memory_order_relaxed);
throw;
}
}

if (mode == Mode::ThreadPool)
{
#ifdef OS_LINUX
UInt32 prev_size = queue_size.fetch_add(fs.size(), std::memory_order_release);
UInt32 prev_size = queue_size.fetch_add(n, std::memory_order_release);
if (prev_size < max_threads)
futexWake(&queue_size, fs.size());
futexWake(&queue_size, n);
#else
if (fs.size() < 4)
for (size_t i = 0; i < fs.size(); ++i)
if (n < 4)
for (size_t i = 0; i < n; ++i)
queue_cv.notify_one();
else
queue_cv.notify_all();
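The `bulkSchedule` change does two things: it caches `fs.size()` in `n` once, before the elements are moved into the queue, which keeps the later futex wake-up arithmetic readable and gives the catch block the count it needs; and it wraps `startMoreThreadsIfNeeded` in a try/catch so that a failure to start a worker (e.g. thread creation throwing) rolls back both the enqueued batch and the `active_tasks` counter, keeping `queue` consistent with `queue_size`. A minimal standalone sketch of the rollback pattern, with simplified names rather than the actual ClickHouse types:

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

struct MiniRunner
{
    std::mutex mutex;
    std::deque<std::function<void()>> queue;
    std::atomic<size_t> active_tasks{0};

    void startMoreThreadsIfNeeded(size_t /*active*/) { /* may throw, e.g. if thread creation fails */ }

    void bulkSchedule(std::vector<std::function<void()>> fs)
    {
        size_t n = fs.size();
        size_t active = n + active_tasks.fetch_add(n, std::memory_order_relaxed);

        std::unique_lock lock(mutex);
        queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
        try
        {
            startMoreThreadsIfNeeded(active);
        }
        catch (...)
        {
            queue.erase(queue.end() - n, queue.end());            /// undo the enqueue
            active_tasks.fetch_sub(n, std::memory_order_relaxed); /// undo the counter bump
            throw;
        }
    }
};
```

Because the rollback happens under the same lock as the insert, no consumer can observe the half-scheduled batch.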
12 changes: 9 additions & 3 deletions src/Core/FormatFactorySettings.h
@@ -182,9 +182,9 @@ When reading Parquet files, parse JSON columns as ClickHouse JSON Column.
DECLARE(Bool, input_format_parquet_use_native_reader, false, R"(
Use native parquet reader v1. It's relatively fast but unfinished. Deprecated.
)", 0) \
DECLARE(Bool, input_format_parquet_use_native_reader_v3, false, R"(
Use Parquet reader v3. Experimental.
)", EXPERIMENTAL) \
DECLARE(Bool, input_format_parquet_use_native_reader_v3, true, R"(
Use Parquet reader v3.
)", 0) \
DECLARE(UInt64, input_format_parquet_memory_low_watermark, 2ul << 20, R"(
Schedule prefetches more aggressively if memory usage is below this threshold. Potentially useful e.g. if there are many small bloom filters to read over the network.
)", 0) \
@@ -196,6 +196,9 @@ Skip pages using min/max values from column index.
)", 0) \
DECLARE(Bool, input_format_parquet_use_offset_index, true, R"(
Minor tweak to how pages are read from a Parquet file when no page filtering is used.
)", 0) \
DECLARE(Bool, input_format_parquet_verify_checksums, true, R"(
Verify page checksums when reading parquet files.
)", 0) \
DECLARE(Bool, input_format_allow_seeks, true, R"(
Allow seeks while reading in ORC/Parquet/Arrow input formats.
@@ -1127,6 +1130,9 @@ If dictionary size grows bigger than this many bytes, switch to encoding without
)", 0) \
DECLARE(Bool, output_format_parquet_enum_as_byte_array, true, R"(
Write enum using parquet physical type: BYTE_ARRAY and logical type: ENUM
)", 0) \
DECLARE(Bool, output_format_parquet_write_checksums, true, R"(
Put crc32 checksums in parquet page headers.
)", 0) \
DECLARE(String, output_format_avro_codec, "", R"(
Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.
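The two checksum settings map to the optional `crc` field in the Parquet page header, which the format spec defines as a standard CRC-32 over the page payload as written (i.e. the compressed bytes, excluding the header itself). An illustrative sketch of the check being toggled, assuming zlib's `crc32`; this is not the actual reader/writer code:

```cpp
#include <zlib.h>
#include <cstdint>
#include <span>
#include <stdexcept>

/// CRC-32 of a page payload, as the Parquet spec prescribes for PageHeader::crc.
uint32_t pageCrc32(std::span<const char> payload)
{
    return static_cast<uint32_t>(
        crc32(0L, reinterpret_cast<const Bytef *>(payload.data()), static_cast<uInt>(payload.size())));
}

/// Reader side: compare the stored checksum against the recomputed one,
/// but only when verification is enabled.
void verifyPageChecksum(std::span<const char> payload, uint32_t crc_from_header, bool verify_checksums)
{
    if (verify_checksums && pageCrc32(payload) != crc_from_header)
        throw std::runtime_error("Parquet page checksum mismatch");
}
```

Per the DECLAREs above, both verifying checksums on read and writing them into new files default to on.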
5 changes: 4 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
@@ -56,6 +56,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
{"input_format_parquet_verify_checksums", true, true, "New setting."},
{"output_format_parquet_write_checksums", false, true, "New setting."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
});
@@ -82,7 +85,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"distributed_cache_connect_max_tries", 20, 5, "Changed setting value"},
{"opentelemetry_trace_cpu_scheduling", false, false, "New setting to trace `cpu_slot_preemption` feature."},
{"output_format_parquet_max_dictionary_size", 1024 * 1024, 1024 * 1024, "New setting"},
{"input_format_parquet_use_native_reader_v3", false, true, "New setting"},
{"input_format_parquet_use_native_reader_v3", false, false, "New setting"},
{"input_format_parquet_memory_low_watermark", 2ul << 20, 2ul << 20, "New setting"},
{"input_format_parquet_memory_high_watermark", 4ul << 30, 4ul << 30, "New setting"},
{"input_format_parquet_page_filter_push_down", true, true, "New setting (no effect when input_format_parquet_use_native_reader_v3 is disabled)"},
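Note the two coordinated edits: the current-release block gains `input_format_parquet_use_native_reader_v3` flipping `false` to `true` ("Seems stable"), while the older release's entry just above is corrected from `false, true` to `false, false`, so the default flip is attributed to exactly one release. The attribution matters because this history backs the `compatibility` setting: pinning `compatibility` to an older version rolls every newer entry back to its previous value. A hedged sketch of an entry's shape, with illustrative names rather than the exact ClickHouse type:

```cpp
#include <cstdint>
#include <string>
#include <variant>

struct SettingChangeSketch
{
    std::string name;                                         /// setting name
    std::variant<bool, int64_t, std::string> previous_value;  /// default before this release
    std::variant<bool, int64_t, std::string> new_value;       /// default from this release on
    std::string comment;                                      /// why it changed
};

/// E.g. the entry above: under `compatibility` pinned to an older version,
/// the reader v3 default reverts to previous_value (false).
SettingChangeSketch example{"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"};
```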
2 changes: 2 additions & 0 deletions src/Formats/FormatFactory.cpp
@@ -210,6 +210,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.output_date_as_uint16 = settings[Setting::output_format_parquet_date_as_uint16];
format_settings.parquet.max_dictionary_size = settings[Setting::output_format_parquet_max_dictionary_size];
format_settings.parquet.output_enum_as_byte_array = settings[Setting::output_format_parquet_enum_as_byte_array];
format_settings.parquet.write_checksums = settings[Setting::output_format_parquet_write_checksums];
format_settings.parquet.max_block_size = settings[Setting::input_format_parquet_max_block_size];
format_settings.parquet.prefer_block_bytes = settings[Setting::input_format_parquet_prefer_block_bytes];
format_settings.parquet.output_compression_method = settings[Setting::output_format_parquet_compression_method];
@@ -225,6 +226,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.bloom_filter_flush_threshold_bytes = settings[Setting::output_format_parquet_bloom_filter_flush_threshold_bytes];
format_settings.parquet.local_read_min_bytes_for_seek = settings[Setting::input_format_parquet_local_file_min_bytes_for_seek];
format_settings.parquet.enable_row_group_prefetch = settings[Setting::input_format_parquet_enable_row_group_prefetch];
format_settings.parquet.verify_checksums = settings[Setting::input_format_parquet_verify_checksums];
format_settings.parquet.allow_geoparquet_parser = settings[Setting::input_format_parquet_allow_geoparquet_parser];
format_settings.parquet.write_geometadata = settings[Setting::output_format_parquet_geometadata];
format_settings.pretty.charset = settings[Setting::output_format_pretty_grid_charset].toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
2 changes: 2 additions & 0 deletions src/Formats/FormatSettings.h
@@ -292,6 +292,7 @@ struct FormatSettings
bool enable_json_parsing = true;
bool preserve_order = false;
bool enable_row_group_prefetch = true;
bool verify_checksums = true;
std::unordered_set<int> skip_row_groups = {};
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
@@ -312,6 +313,7 @@ struct FormatSettings
bool output_compliant_nested_types = true;
bool write_page_index = false;
bool write_bloom_filter = false;
bool write_checksums = true;
ParquetVersion output_version = ParquetVersion::V2_LATEST;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
uint64_t output_compression_level;
8 changes: 4 additions & 4 deletions src/Processors/Formats/Impl/Parquet/Decoding.cpp
@@ -1003,7 +1003,7 @@ void Dictionary::index(const ColumnUInt32 & indexes_col, IColumn & out)
c.reserve(c.size() + indexes.size());
for (UInt32 idx : indexes)
{
size_t start = offsets[size_t(idx) - 1] + 4; // offsets[-1] is ok because of padding
size_t start = offsets[ssize_t(idx) - 1] + 4; // offsets[-1] is ok because of padding
size_t len = offsets[idx] - start;
/// TODO [parquet]: Try optimizing short memcpy by taking advantage of padding (maybe memcpySmall.h helps). Also in PlainStringDecoder.
c.insertData(data.data() + start, len);
@@ -1219,7 +1219,7 @@ void TrivialStringConverter::convertColumn(std::span<const char> chars, const UI
{
col_str.getChars().reserve(col_str.getChars().size() + (offsets[num_values - 1] - offsets[-1]) - separator_bytes * num_values);
for (size_t i = 0; i < num_values; ++i)
col_str.insertData(chars.data() + offsets[i - 1], offsets[i] - offsets[i - 1] - separator_bytes);
col_str.insertData(chars.data() + offsets[ssize_t(i) - 1], offsets[i] - offsets[ssize_t(i) - 1] - separator_bytes);
}
}

@@ -1345,8 +1345,8 @@ void BigEndianDecimalStringConverter<T>::convertColumn(std::span<const char> cha

for (size_t i = 0; i < num_values; ++i)
{
const char * data = chars.data() + offsets[i - 1];
size_t size = offsets[i] - offsets[i - 1] - separator_bytes;
const char * data = chars.data() + offsets[ssize_t(i) - 1];
size_t size = offsets[i] - offsets[ssize_t(i) - 1] - separator_bytes;
if (size > sizeof(T))
throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpectedly wide Decimal value: {} > {} bytes", size, sizeof(T));

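These are signedness fixes. When `idx` or `i` is 0, `size_t(x) - 1` wraps to `SIZE_MAX`, so the subscript no longer denotes the padding slot directly before the array that the `offsets[-1]` comment relies on; it is an out-of-bounds index and undefined behavior (and a sanitizer trap), even if pointer wraparound often made it land on the right bytes in practice. Casting through `ssize_t` yields a genuine `-1`. A self-contained illustration of the difference:

```cpp
#include <cassert>
#include <cstdint>
#include <sys/types.h> // ssize_t

int main()
{
    uint32_t idx = 0;
    size_t  wrapped = size_t(idx) - 1;  /// wraps to SIZE_MAX
    ssize_t proper  = ssize_t(idx) - 1; /// a genuine -1

    assert(wrapped == SIZE_MAX);
    assert(proper == -1);

    /// With padded arrays (ClickHouse's PaddedPODArray keeps readable bytes
    /// before the data), ptr[proper] deliberately reads the padding slot,
    /// while ptr[wrapped] is an out-of-bounds access.
    return 0;
}
```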
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/Parquet/Prefetcher.cpp
@@ -74,7 +74,7 @@ void Prefetcher::determineReadModeAndFileSize(ReadBuffer * reader_, const ReadOp
if (!reader_->eof() && reader_->available() >= expected_prefix.size() &&
memcmp(reader_->position(), expected_prefix.data(), expected_prefix.size()) != 0)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Not a parquet file (wrong magic bytes at the start)");
throw Exception(ErrorCodes::INCORRECT_DATA, "Not a Parquet file (wrong magic bytes at the start)");
}

WriteBufferFromVector<PaddedPODArray<char>> out(entire_file);
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/Parquet/ReadManager.cpp
@@ -846,7 +846,7 @@ ReadManager::ReadResult ReadManager::read()
bool thread_pool_was_idle = parser_shared_resources->parsing_runner.isIdle();

if (exception)
std::rethrow_exception(exception);
std::rethrow_exception(copyMutableException(exception));

/// If `preserve_order`, only deliver chunks from `first_incomplete_row_group`.
/// This ensures that row groups are delivered in order. Within a row group, row
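`read()` can be invoked repeatedly, so the stored `exception` may be rethrown more than once, potentially from several threads. ClickHouse exceptions are mutable while propagating (context can be appended to the message), so rethrowing one shared object risks racing or compounding mutations; rethrowing a copy gives every catch site its own instance. A simplified analogue of what a copy-before-rethrow helper might do, assuming only standard types; the real `copyMutableException` presumably handles `DB::Exception` specifically:

```cpp
#include <exception>
#include <stdexcept>

std::exception_ptr copyException(const std::exception_ptr & e)
{
    try
    {
        std::rethrow_exception(e);
    }
    catch (const std::runtime_error & ex)
    {
        /// Known type: hand the caller a fresh object to mutate and rethrow.
        return std::make_exception_ptr(std::runtime_error(ex.what()));
    }
    catch (...)
    {
        /// Unknown type: fall back to sharing the original.
        return e;
    }
}
```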