Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 340 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,344 @@ bool ColumnSelectVector::can_filter_all(size_t remaining_num_values) {
void ColumnSelectVector::skip(size_t num_values) {
_filter_map_index += num_values;
}

ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version,
std::optional<std::string> app_build_hash)
: _application(std::move(application)),
_version(std::move(version)),
_app_build_hash(std::move(app_build_hash)) {}

bool ParsedVersion::operator==(const ParsedVersion& other) const {
return _application == other._application && _version == other._version &&
_app_build_hash == other._app_build_hash;
}

bool ParsedVersion::operator!=(const ParsedVersion& other) const {
return !(*this == other);
}

size_t ParsedVersion::hash() const {
std::hash<std::string> hasher;
return hasher(_application) ^ (_version ? hasher(*_version) : 0) ^
(_app_build_hash ? hasher(*_app_build_hash) : 0);
}

std::string ParsedVersion::to_string() const {
return "ParsedVersion(application=" + _application +
", semver=" + (_version ? *_version : "null") +
", app_build_hash=" + (_app_build_hash ? *_app_build_hash : "null") + ")";
}

Status VersionParser::parse(const std::string& created_by,
std::unique_ptr<ParsedVersion>* parsed_version) {
static const std::string FORMAT =
"(.*?)\\s+version\\s*(?:([^(]*?)\\s*(?:\\(\\s*build\\s*([^)]*?)\\s*\\))?)?";
static const std::regex PATTERN(FORMAT);

std::smatch matcher;
if (!std::regex_match(created_by, matcher, PATTERN)) {
return Status::InternalError(fmt::format("Could not parse created_by: {}, using format: {}",
created_by, FORMAT));
}

std::string application = matcher[1].str();
if (application.empty()) {
return Status::InternalError("application cannot be null or empty");
}
std::optional<std::string> semver =
matcher[2].str().empty() ? std::nullopt : std::optional<std::string>(matcher[2].str());
std::optional<std::string> app_build_hash =
matcher[3].str().empty() ? std::nullopt : std::optional<std::string>(matcher[3].str());
*parsed_version = std::make_unique<ParsedVersion>(application, semver, app_build_hash);
return Status::OK();
}

SemanticVersion::SemanticVersion(int major, int minor, int patch)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(false),
_unknown(std::nullopt),
_pre(std::nullopt),
_build_info(std::nullopt) {}

#ifdef BE_TEST
SemanticVersion::SemanticVersion(int major, int minor, int patch, bool has_unknown)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(has_unknown),
_unknown(std::nullopt),
_pre(std::nullopt),
_build_info(std::nullopt) {}
#endif

SemanticVersion::SemanticVersion(int major, int minor, int patch,
std::optional<std::string> unknown, std::optional<std::string> pre,
std::optional<std::string> build_info)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(unknown.has_value() && !unknown.value().empty()),
_unknown(std::move(unknown)),
_pre(pre.has_value() ? std::optional<Prerelease>(Prerelease(std::move(pre.value())))
: std::nullopt),
_build_info(std::move(build_info)) {}

Status SemanticVersion::parse(const std::string& version,
std::unique_ptr<SemanticVersion>* semantic_version) {
static const std::regex pattern(R"(^(\d+)\.(\d+)\.(\d+)([^-+]*)?(?:-([^+]*))?(?:\+(.*))?$)");
std::smatch match;

if (!std::regex_match(version, match, pattern)) {
return Status::InternalError(version + " does not match format");
}

int major = std::stoi(match[1].str());
int minor = std::stoi(match[2].str());
int patch = std::stoi(match[3].str());
std::optional<std::string> unknown =
match[4].str().empty() ? std::nullopt : std::optional<std::string>(match[4].str());
std::optional<std::string> prerelease =
match[5].str().empty() ? std::nullopt : std::optional<std::string>(match[5].str());
std::optional<std::string> build_info =
match[6].str().empty() ? std::nullopt : std::optional<std::string>(match[6].str());
if (major < 0 || minor < 0 || patch < 0) {
return Status::InternalError("major({}), minor({}), and patch({}) must all be >= 0", major,
minor, patch);
}
*semantic_version =
std::make_unique<SemanticVersion>(major, minor, patch, unknown, prerelease, build_info);
return Status::OK();
}

int SemanticVersion::compare_to(const SemanticVersion& other) const {
if (int cmp = _compare_integers(_major, other._major); cmp != 0) {
return cmp;
}
if (int cmp = _compare_integers(_minor, other._minor); cmp != 0) {
return cmp;
}
if (int cmp = _compare_integers(_patch, other._patch); cmp != 0) {
return cmp;
}
if (int cmp = _compare_booleans(other._prerelease, _prerelease); cmp != 0) {
return cmp;
}
if (_pre.has_value()) {
if (other._pre.has_value()) {
return _pre.value().compare_to(other._pre.value());
} else {
return -1;
}
} else if (other._pre.has_value()) {
return 1;
}
return 0;
}

bool SemanticVersion::operator==(const SemanticVersion& other) const {
return compare_to(other) == 0;
}

bool SemanticVersion::operator!=(const SemanticVersion& other) const {
return !(*this == other);
}

std::string SemanticVersion::to_string() const {
std::string result =
std::to_string(_major) + "." + std::to_string(_minor) + "." + std::to_string(_patch);
if (_prerelease && _unknown) result += _unknown.value();
if (_pre) result += _pre.value().to_string();
if (_build_info) result += _build_info.value();
return result;
}

SemanticVersion::NumberOrString::NumberOrString(const std::string& value_string)
: _original(value_string) {
const static std::regex NUMERIC("\\d+");
_is_numeric = std::regex_match(_original, NUMERIC);
_number = -1;
if (_is_numeric) {
_number = std::stoi(_original);
}
}

SemanticVersion::NumberOrString::NumberOrString(const NumberOrString& other)
: _original(other._original), _is_numeric(other._is_numeric), _number(other._number) {}

int SemanticVersion::NumberOrString::compare_to(const SemanticVersion::NumberOrString& that) const {
if (this->_is_numeric != that._is_numeric) {
return this->_is_numeric ? -1 : 1;
}

if (_is_numeric) {
return this->_number - that._number;
}

return this->_original.compare(that._original);
}

std::string SemanticVersion::NumberOrString::to_string() const {
return _original;
}

bool SemanticVersion::NumberOrString::operator<(const SemanticVersion::NumberOrString& that) const {
return compare_to(that) < 0;
}

bool SemanticVersion::NumberOrString::operator==(
const SemanticVersion::NumberOrString& that) const {
return compare_to(that) == 0;
}

bool SemanticVersion::NumberOrString::operator!=(
const SemanticVersion::NumberOrString& that) const {
return !(*this == that);
}

bool SemanticVersion::NumberOrString::operator>(const SemanticVersion::NumberOrString& that) const {
return compare_to(that) > 0;
}

bool SemanticVersion::NumberOrString::operator<=(
const SemanticVersion::NumberOrString& that) const {
return !(*this > that);
}

bool SemanticVersion::NumberOrString::operator>=(
const SemanticVersion::NumberOrString& that) const {
return !(*this < that);
}

int SemanticVersion::_compare_integers(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}

int SemanticVersion::_compare_booleans(bool x, bool y) {
return (x == y) ? 0 : (x ? 1 : -1);
}

std::vector<std::string> SemanticVersion::Prerelease::_split(const std::string& s,
const std::regex& delimiter) {
std::sregex_token_iterator iter(s.begin(), s.end(), delimiter, -1);
std::sregex_token_iterator end;
std::vector<std::string> tokens(iter, end);
return tokens;
}

SemanticVersion::Prerelease::Prerelease(std::string original) : _original(std::move(original)) {
static const std::regex DOT("\\.");
auto parts = _split(_original, DOT);
for (const auto& part : parts) {
NumberOrString number_or_string(part);
_identifiers.emplace_back(number_or_string);
}
}

int SemanticVersion::Prerelease::compare_to(const Prerelease& that) const {
int size = std::min(this->_identifiers.size(), that._identifiers.size());
for (int i = 0; i < size; ++i) {
int cmp = this->_identifiers[i].compare_to(that._identifiers[i]);
if (cmp != 0) {
return cmp;
}
}
return static_cast<int>(this->_identifiers.size()) - static_cast<int>(that._identifiers.size());
}

std::string SemanticVersion::Prerelease::to_string() const {
return _original;
}

bool SemanticVersion::Prerelease::operator<(const Prerelease& that) const {
return compare_to(that) < 0;
}

bool SemanticVersion::Prerelease::operator==(const Prerelease& that) const {
return compare_to(that) == 0;
}

bool SemanticVersion::Prerelease::operator!=(const Prerelease& that) const {
return !(*this == that);
}

bool SemanticVersion::Prerelease::operator>(const Prerelease& that) const {
return compare_to(that) > 0;
}

bool SemanticVersion::Prerelease::operator<=(const Prerelease& that) const {
return !(*this > that);
}

bool SemanticVersion::Prerelease::operator>=(const Prerelease& that) const {
return !(*this < that);
}

const SemanticVersion CorruptStatistics::PARQUET_251_FIXED_VERSION(1, 8, 0);
const SemanticVersion CorruptStatistics::CDH_5_PARQUET_251_FIXED_START(1, 5, 0, std::nullopt,
"cdh5.5.0", std::nullopt);
const SemanticVersion CorruptStatistics::CDH_5_PARQUET_251_FIXED_END(1, 5, 0);

bool CorruptStatistics::should_ignore_statistics(const std::string& created_by,
tparquet::Type::type physical_type) {
if (physical_type != tparquet::Type::BYTE_ARRAY &&
physical_type != tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// The bug only applies to binary columns
return false;
}

if (created_by.empty()) {
// created_by is not populated
VLOG_DEBUG
<< "Ignoring statistics because created_by is null or empty! See PARQUET-251 and "
"PARQUET-297";
return true;
}

Status status;
std::unique_ptr<ParsedVersion> parsed_version;
status = VersionParser::parse(created_by, &parsed_version);
if (!status.ok()) {
VLOG_DEBUG << "Ignoring statistics because created_by could not be parsed (see "
"PARQUET-251)."
" CreatedBy: "
<< created_by << ", msg: " << status.msg();
return true;
}

if (parsed_version->application() != "parquet-mr") {
// Assume other applications don't have this bug
return false;
}

if ((!parsed_version->version().has_value()) || parsed_version->version().value().empty()) {
VLOG_DEBUG << "Ignoring statistics because created_by did not contain a semver (see "
"PARQUET-251): "
<< created_by;
return true;
}

std::unique_ptr<SemanticVersion> semantic_version;
status = SemanticVersion::parse(parsed_version->version().value(), &semantic_version);
if (!status.ok()) {
VLOG_DEBUG << "Ignoring statistics because created_by could not be parsed (see "
"PARQUET-251)."
" CreatedBy: "
<< created_by << ", msg: " << status.msg();
return true;
}
if (semantic_version->compare_to(PARQUET_251_FIXED_VERSION) < 0 &&
!(semantic_version->compare_to(CDH_5_PARQUET_251_FIXED_START) >= 0 &&
semantic_version->compare_to(CDH_5_PARQUET_251_FIXED_END) < 0)) {
VLOG_DEBUG
<< "Ignoring statistics because this file was created prior to the fixed version, "
"see PARQUET-251";
return true;
}

// This file was created after the fix
return false;
}

} // namespace doris::vectorized
Loading