Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 23 additions & 58 deletions be/src/olap/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
#include "util/types.h"
#include "vec/common/arena.h"
#include "vec/core/wide_integer.h"
#include "vec/runtime/ipv4_value.h"
#include "vec/runtime/ipv6_value.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
Expand Down Expand Up @@ -728,7 +730,7 @@ struct CppTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV4> {
};
template <>
struct CppTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV6> {
using CppType = uint128_t;
using CppType = int128_t;
using UnsignedCppType = uint128_t;
};
template <>
Expand Down Expand Up @@ -980,11 +982,8 @@ struct FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV4>
: public BaseFieldTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV4> {
static Status from_string(void* buf, const std::string& scan_key, const int precision,
const int scale) {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
uint32_t value = StringParser::string_to_unsigned_int<uint32_t>(scan_key.c_str(),
scan_key.size(), &result);

if (result == StringParser::PARSE_FAILURE) {
uint32_t value;
if (!IPv4Value::from_string(value, scan_key)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"FieldTypeTraits<OLAP_FIELD_TYPE_IPV4>::from_string meet PARSE_FAILURE");
}
Expand All @@ -994,10 +993,16 @@ struct FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV4>

static std::string to_string(const void* src) {
uint32_t value = *reinterpret_cast<const uint32_t*>(src);
std::stringstream ss;
ss << ((value >> 24) & 0xFF) << '.' << ((value >> 16) & 0xFF) << '.'
<< ((value >> 8) & 0xFF) << '.' << (value & 0xFF);
return ss.str();
IPv4Value ipv4_value(value);
return ipv4_value.to_string();
}

static void set_to_max(void* buf) {
*reinterpret_cast<uint32_t*>(buf) = 0xFFFFFFFF; // 255.255.255.255
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 0xFFFFFFFF is a magic number; consider replacing it with a named constant [readability-magic-numbers]

        *reinterpret_cast<uint32_t*>(buf) = 0xFFFFFFFF; // 255.255.255.255
                                            ^

}

static void set_to_min(void* buf) {
*reinterpret_cast<uint32_t*>(buf) = 0; // 0.0.0.0
}
};

Expand All @@ -1006,67 +1011,27 @@ struct FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV6>
: public BaseFieldTypeTraits<FieldType::OLAP_FIELD_TYPE_IPV6> {
static Status from_string(void* buf, const std::string& scan_key, const int precision,
const int scale) {
std::istringstream iss(scan_key);
std::string token;
uint128_t result = 0;
int count = 0;

while (std::getline(iss, token, ':')) {
if (token.empty()) {
count += 8 - count;
break;
}

if (count > 8) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"FieldTypeTraits<OLAP_FIELD_TYPE_IPV6>::from_string meet PARSE_FAILURE");
}

uint16_t value = 0;
std::istringstream ss(token);
if (!(ss >> std::hex >> value)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"FieldTypeTraits<OLAP_FIELD_TYPE_IPV6>::from_string meet PARSE_FAILURE");
}

result = (result << 16) | value;
count++;
}

if (count < 8) {
int128_t value;
if (!IPv6Value::from_string(value, scan_key)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"FieldTypeTraits<OLAP_FIELD_TYPE_IPV6>::from_string meet PARSE_FAILURE");
}

*reinterpret_cast<uint128_t*>(buf) = result;
*reinterpret_cast<int128_t*>(buf) = value;
return Status::OK();
}

static std::string to_string(const void* src) {
std::stringstream result;
uint128_t ipv6 = *reinterpret_cast<const uint128_t*>(src);

for (int i = 0; i < 8; i++) {
uint16_t part = static_cast<uint16_t>((ipv6 >> (112 - i * 16)) & 0xFFFF);
result << std::to_string(part);
if (i != 7) {
result << ":";
}
}

return result.str();
int128_t value = *reinterpret_cast<const int128_t*>(src);
IPv6Value ipv6_value(value);
return ipv6_value.to_string();
}

static void set_to_max(void* buf) {
*reinterpret_cast<PackedInt128*>(buf) =
static_cast<int128_t>(999999999999999999ll) * 100000000000000000ll * 1000ll +
static_cast<int128_t>(99999999999999999ll) * 1000ll + 999ll;
*reinterpret_cast<int128_t*>(buf) = -1; // ::1
}

static void set_to_min(void* buf) {
*reinterpret_cast<PackedInt128*>(buf) =
-(static_cast<int128_t>(999999999999999999ll) * 100000000000000000ll * 1000ll +
static_cast<int128_t>(99999999999999999ll) * 1000ll + 999ll);
*reinterpret_cast<int128_t*>(buf) = 0; // ::
}
};

Expand Down
6 changes: 6 additions & 0 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
case TYPE_DECIMAL128I:
*result = std::make_shared<arrow::Decimal128Type>(type.precision, type.scale);
break;
case TYPE_IPV4:
*result = arrow::uint32();
break;
case TYPE_IPV6:
*result = arrow::utf8();
break;
case TYPE_BOOLEAN:
*result = arrow::boolean();
break;
Expand Down
27 changes: 27 additions & 0 deletions be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <arrow/builder.h>

#include "vec/columns/column_const.h"
#include "vec/io/io_helper.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -61,5 +62,31 @@ Status DataTypeIPv4SerDe::write_column_to_mysql(const IColumn& column,
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
}

Status DataTypeIPv4SerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'serialize_one_cell_to_json' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status DataTypeIPv4SerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,
static Status DataTypeIPv4SerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,

be/src/vec/data_types/serde/data_type_ipv4_serde.cpp:66:

-                                                      FormatOptions& options) const {
+                                                      FormatOptions& options) {

BufferWritable& bw,
FormatOptions& options) const {
auto result = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = result.first;
row_num = result.second;
IPv4 data = assert_cast<const ColumnIPv4&>(*ptr).get_element(row_num);
IPv4Value ipv4_value(data);
std::string ipv4_str = ipv4_value.to_string();
bw.write(ipv4_str.c_str(), ipv4_str.length());
return Status::OK();
}

Status DataTypeIPv4SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
Comment on lines +78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'deserialize_one_cell_from_json' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status DataTypeIPv4SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
static Status DataTypeIPv4SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) {

auto& column_data = reinterpret_cast<ColumnIPv4&>(column);
ReadBuffer rb(slice.data, slice.size);
IPv4 val = 0;
if (!read_ipv4_text_impl(val, rb)) {
return Status::InvalidArgument("parse ipv4 fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str());
}
column_data.insert_value(val);
return Status::OK();
}

} // namespace vectorized
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/vec/data_types/serde/data_type_ipv4_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class DataTypeIPv4SerDe : public DataTypeNumberSerDe<IPv4> {
int row_idx, bool col_const) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const override;
Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why here has impl serialize_column_to_json/deserialize_column_from_json

FormatOptions& options) const override;
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const override;

private:
template <bool is_binary_format>
Expand Down
27 changes: 27 additions & 0 deletions be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <arrow/builder.h>

#include "vec/columns/column_const.h"
#include "vec/io/io_helper.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -61,5 +62,31 @@ Status DataTypeIPv6SerDe::write_column_to_mysql(const IColumn& column,
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
}

Status DataTypeIPv6SerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'serialize_one_cell_to_json' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status DataTypeIPv6SerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,
static Status DataTypeIPv6SerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,

be/src/vec/data_types/serde/data_type_ipv6_serde.cpp:66:

-                                                      FormatOptions& options) const {
+                                                      FormatOptions& options) {

BufferWritable& bw,
FormatOptions& options) const {
auto result = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = result.first;
row_num = result.second;
IPv6 data = assert_cast<const ColumnIPv6&>(*ptr).get_element(row_num);
IPv6Value ipv6_value(data);
std::string ipv6_str = ipv6_value.to_string();
bw.write(ipv6_str.c_str(), ipv6_str.length());
return Status::OK();
}

Status DataTypeIPv6SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
Comment on lines +78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'deserialize_one_cell_from_json' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status DataTypeIPv6SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
static Status DataTypeIPv6SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) {

auto& column_data = reinterpret_cast<ColumnIPv6&>(column);
ReadBuffer rb(slice.data, slice.size);
IPv6 val = 0;
if (!read_ipv6_text_impl(val, rb)) {
return Status::InvalidArgument("parse ipv6 fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str());
}
column_data.insert_value(val);
return Status::OK();
}

} // namespace vectorized
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/vec/data_types/serde/data_type_ipv6_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class DataTypeIPv6SerDe : public DataTypeNumberSerDe<IPv6> {
int row_idx, bool col_const) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const override;
Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw,
FormatOptions& options) const override;
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const override;

private:
template <bool is_binary_format>
Expand Down
32 changes: 32 additions & 0 deletions be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
#include "vec/data_types/data_type_date_time.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_hll.h"
#include "vec/data_types/data_type_ipv4.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
Expand Down Expand Up @@ -98,6 +100,8 @@ void serialize_and_deserialize_arrow_test() {
{"k5", FieldType::OLAP_FIELD_TYPE_DECIMAL32, 5, TYPE_DECIMAL32, false},
{"k6", FieldType::OLAP_FIELD_TYPE_DECIMAL64, 6, TYPE_DECIMAL64, false},
{"k12", FieldType::OLAP_FIELD_TYPE_DATETIMEV2, 12, TYPE_DATETIMEV2, false},
{"k8", FieldType::OLAP_FIELD_TYPE_IPV4, 8, TYPE_IPV4, false},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 8 is a magic number; consider replacing it with a named constant [readability-magic-numbers]

                {"k8", FieldType::OLAP_FIELD_TYPE_IPV4, 8, TYPE_IPV4, false},
                                                        ^

{"k9", FieldType::OLAP_FIELD_TYPE_IPV6, 9, TYPE_IPV6, false},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 9 is a magic number; consider replacing it with a named constant [readability-magic-numbers]

                {"k9", FieldType::OLAP_FIELD_TYPE_IPV6, 9, TYPE_IPV6, false},
                                                        ^

};
} else {
cols = {{"a", FieldType::OLAP_FIELD_TYPE_ARRAY, 6, TYPE_ARRAY, true},
Expand Down Expand Up @@ -445,6 +449,34 @@ void serialize_and_deserialize_arrow_test() {
block.insert(type_and_name);
}
break;
case TYPE_IPV4:
tslot.__set_slotType(type_desc.to_thrift());
{
auto vec = vectorized::ColumnIPv4::create();
auto& data = vec->get_data();
for (int i = 0; i < row_num; ++i) {
data.push_back(i);
}
vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv4>());
vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
col_name);
block.insert(std::move(type_and_name));
}
break;
case TYPE_IPV6:
tslot.__set_slotType(type_desc.to_thrift());
{
auto vec = vectorized::ColumnIPv6::create();
auto& data = vec->get_data();
for (int i = 0; i < row_num; ++i) {
data.push_back(i);
}
vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeIPv6>());
vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
col_name);
block.insert(std::move(type_and_name));
}
break;
default:
break;
}
Expand Down
74 changes: 71 additions & 3 deletions be/test/vec/data_types/serde/data_type_serde_csv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,83 @@ TEST(CsvSerde, ScalaDataTypeSerdeCsvTest) {
StringRef max_s_d = ser_col->get_data_at(1);
StringRef rand_s_d = ser_col->get_data_at(2);

std::cout << "min(" << min_s << ") with datat_ype_str:" << min_s_d << std::endl;
std::cout << "max(" << max_s << ") with datat_ype_str:" << max_s_d << std::endl;
std::cout << "rand(" << rand_date << ") with datat_type_str:" << rand_s_d << std::endl;
std::cout << "min(" << min_s << ") with data_type_str:" << min_s_d << std::endl;
std::cout << "max(" << max_s << ") with data_type_str:" << max_s_d << std::endl;
std::cout << "rand(" << rand_date << ") with data_type_str:" << rand_s_d << std::endl;
EXPECT_EQ(min_s, min_s_d.to_string());
EXPECT_EQ(max_s, max_s_d.to_string());
EXPECT_EQ(rand_date, rand_s_d.to_string());
}
}

// ipv4 and ipv6 type
{
typedef std::pair<FieldType, string> FieldType_RandStr;
std::vector<FieldType_RandStr> date_scala_field_types = {
FieldType_RandStr(FieldType::OLAP_FIELD_TYPE_IPV4, "127.0.0.1"),
FieldType_RandStr(FieldType::OLAP_FIELD_TYPE_IPV6, "2405:9800:9800:66::2")};
for (auto pair : date_scala_field_types) {
auto type = pair.first;
DataTypePtr data_type_ptr = DataTypeFactory::instance().create_data_type(type, 0, 0);
std::cout << "========= This type is " << data_type_ptr->get_name() << ": "
<< fmt::format("{}", type) << std::endl;

std::unique_ptr<WrapperField> min_wf(WrapperField::create_by_type(type));
std::unique_ptr<WrapperField> max_wf(WrapperField::create_by_type(type));
std::unique_ptr<WrapperField> rand_wf(WrapperField::create_by_type(type));

min_wf->set_to_min();
max_wf->set_to_max();
EXPECT_EQ(rand_wf->from_string(pair.second, 0, 0).ok(), true);

string min_s = min_wf->to_string();
string max_s = max_wf->to_string();
string rand_ip = rand_wf->to_string();

Slice min_rb(min_s.data(), min_s.size());
Slice max_rb(max_s.data(), max_s.size());
Slice rand_rb(rand_ip.data(), rand_ip.size());

auto col = data_type_ptr->create_column();
DataTypeSerDeSPtr serde = data_type_ptr->get_serde();
// make use c++ lib equals to wrapper field from_string behavior
DataTypeSerDe::FormatOptions formatOptions;

Status st = serde->deserialize_one_cell_from_json(*col, min_rb, formatOptions);
EXPECT_EQ(st.ok(), true);
st = serde->deserialize_one_cell_from_json(*col, max_rb, formatOptions);
EXPECT_EQ(st.ok(), true);
st = serde->deserialize_one_cell_from_json(*col, rand_rb, formatOptions);
EXPECT_EQ(st.ok(), true);

auto ser_col = ColumnString::create();
ser_col->reserve(3);
VectorBufferWriter buffer_writer(*ser_col.get());
st = serde->serialize_one_cell_to_json(*col, 0, buffer_writer, formatOptions);
EXPECT_EQ(st.ok(), true);
buffer_writer.commit();
st = serde->serialize_one_cell_to_json(*col, 1, buffer_writer, formatOptions);
EXPECT_EQ(st.ok(), true);
buffer_writer.commit();
st = serde->serialize_one_cell_to_json(*col, 2, buffer_writer, formatOptions);
EXPECT_EQ(st.ok(), true);
buffer_writer.commit();
rtrim(min_s);
rtrim(max_s);
rtrim(rand_ip);
StringRef min_s_d = ser_col->get_data_at(0);
StringRef max_s_d = ser_col->get_data_at(1);
StringRef rand_s_d = ser_col->get_data_at(2);

std::cout << "min(" << min_s << ") with data_type_str:" << min_s_d << std::endl;
std::cout << "max(" << max_s << ") with data_type_str:" << max_s_d << std::endl;
std::cout << "rand(" << rand_ip << ") with data_type_str:" << rand_s_d << std::endl;
EXPECT_EQ(min_s, min_s_d.to_string());
EXPECT_EQ(max_s, max_s_d.to_string());
EXPECT_EQ(rand_ip, rand_s_d.to_string());
}
}

// nullable data type with const column
{
DataTypePtr data_type_ptr = DataTypeFactory::instance().create_data_type(
Expand Down
Loading