Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ endif()
set(LLVM_BIN "${LLVM_HOME}/bin")

option(MAKE_TEST "ON for make unit test or OFF for not" OFF)
option(WITH_MYSQL "Support access MySQL" ON)

# Check gcc
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
Expand Down Expand Up @@ -154,8 +155,10 @@ set_target_properties(thriftnb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/li
add_library(lzo STATIC IMPORTED)
set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a)

add_library(mysql STATIC IMPORTED)
set_target_properties(mysql PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libmysqlclient.a)
if (WITH_MYSQL)
add_library(mysql STATIC IMPORTED)
set_target_properties(mysql PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libmysqlclient.a)
endif()

add_library(libevent STATIC IMPORTED)
set_target_properties(libevent PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent.a)
Expand Down Expand Up @@ -274,6 +277,10 @@ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse4.2")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DLLVM_ON_UNIX")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-attributes -DS2_USE_GFLAGS -DS2_USE_GLOG")

if (WITH_MYSQL)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_MYSQL")
endif()

if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new")
endif()
Expand Down Expand Up @@ -443,9 +450,15 @@ set(DORIS_LINK_LIBS
${WL_END_GROUP}
)

if (WITH_MYSQL)
set(DORIS_DEPENDENCIES
mysql
)
endif()

# Set thirdparty libraries
set(DORIS_DEPENDENCIES
mysql
${DORIS_DEPENDENCIES}
${WL_START_GROUP}
rocksdb
librdkafka_cpp
Expand Down
37 changes: 8 additions & 29 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ set(EXEC_FILES
olap_table_info.cpp
olap_table_sink.cpp
plain_text_line_reader.cpp
mysql_scan_node.cpp
mysql_scanner.cpp
csv_scan_node.cpp
csv_scanner.cpp
es_scan_node.cpp
Expand Down Expand Up @@ -95,6 +93,14 @@ set(EXEC_FILES
broker_writer.cpp
)

if (WITH_MYSQL)
set(EXEC_FILES
${EXEC_FILES}
mysql_scan_node.cpp
mysql_scanner.cpp
)
endif()

if(EXISTS "${BASE_DIR}/src/exec/kudu_util.cpp")
set(EXEC_FILES ${EXEC_FILES}
#kudu_scan_node.cpp
Expand All @@ -106,30 +112,3 @@ endif()
add_library(Exec STATIC
${EXEC_FILES}
)

# TODO: why is this test disabled?
#ADD_BE_TEST(es/es_query_builder_test)
#ADD_BE_TEST(es/es_scan_reader_test)
#ADD_BE_TEST(new_olap_scan_node_test)
#ADD_BE_TEST(pre_aggregation_node_test)
#ADD_BE_TEST(hash_table_test)
#ADD_BE_TEST(olap_scanner_test)
#ADD_BE_TEST(olap_meta_reader_test)
#ADD_BE_TEST(olap_common_test)
#ADD_BE_TEST(olap_scan_node_test)
#ADD_BE_TEST(mysql_scan_node_test)
#ADD_BE_TEST(mysql_scanner_test)
#ADD_BE_TEST(schema_scan_node_test)
#ADD_BE_TEST(schema_scanner_test)
##ADD_BE_TEST(set_executor_test)
#ADD_BE_TEST(schema_scanner/schema_authors_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_columns_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_create_table_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_open_tables_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_schemata_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_table_names_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_tables_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_variables_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_engines_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_collations_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_charsets_scanner_test)
4 changes: 4 additions & 0 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Status DataSink::create_data_sink(
break;

case TDataSinkType::MYSQL_TABLE_SINK: {
#ifdef DORIS_WITH_MYSQL
if (!thrift_sink.__isset.mysql_table_sink) {
return Status("Missing data buffer sink.");
}
Expand All @@ -80,6 +81,9 @@ Status DataSink::create_data_sink(
pool, row_desc, output_exprs);
sink->reset(mysql_tbl_sink);
break;
#else
return Status("Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
}

case TDataSinkType::DATA_SPLIT_SINK: {
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,12 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK;

case TPlanNodeType::MYSQL_SCAN_NODE:
#ifdef DORIS_WITH_MYSQL
*node = pool->add(new MysqlScanNode(pool, tnode, descs));
return Status::OK;
#else
return Status("Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif

case TPlanNodeType::ES_SCAN_NODE:
*node = pool->add(new EsScanNode(pool, tnode, descs));
Expand Down
11 changes: 11 additions & 0 deletions be/src/exec/mysql_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.

#include <mysql/mysql.h>

#define __DorisMysql MYSQL
#define __DorisMysqlRes MYSQL_RES
#include "mysql_scanner.h"


Expand Down Expand Up @@ -170,6 +174,13 @@ Status MysqlScanner::get_next_row(char** *buf, unsigned long** lengths, bool* eo
return Status::OK;
}

Status MysqlScanner::_error_status(const std::string& prefix) {
std::stringstream msg;
msg << prefix << " Err: " << mysql_error(_my_conn);
LOG(INFO) << msg.str();
return Status(msg.str());
}

}

/* vim: set ts=4 sw=4 sts=4 tw=100 noet: */
21 changes: 12 additions & 9 deletions be/src/exec/mysql_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@
#include <stdlib.h>
#include <string>
#include <vector>
#include <mysql/mysql.h>

#include "common/status.h"


#ifndef __DorisMysql
#define __DorisMysql void
#endif

#ifndef __DorisMysqlRes
#define __DorisMysqlRes void
#endif

namespace doris {

struct MysqlScannerParam {
Expand Down Expand Up @@ -55,16 +63,11 @@ class MysqlScanner {
return _field_num;
}
private:
Status _error_status(const std::string& prefix) {
std::stringstream msg;
msg << prefix << " Err: " << mysql_error(_my_conn);
LOG(WARNING) << msg.str();
return Status(msg.str());
}
Status _error_status(const std::string& prefix);

const MysqlScannerParam& _my_param;
MYSQL* _my_conn;
MYSQL_RES* _my_result;
__DorisMysql* _my_conn;
__DorisMysqlRes* _my_result;
std::string _sql_str;
bool _is_open;
int _field_num;
Expand Down
35 changes: 10 additions & 25 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/runtime")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime")

add_library(Runtime STATIC
set(RUNTIME_FILES
broker_mgr.cpp
buffered_block_mgr.cpp
buffered_tuple_stream.cpp
Expand Down Expand Up @@ -66,8 +66,6 @@ add_library(Runtime STATIC
dpp_sink.cpp
etl_job_mgr.cpp
load_path_mgr.cpp
mysql_table_writer.cpp
mysql_table_sink.cpp
types.cpp
tmp_file_mgr.cc
disk_io_mgr.cc
Expand Down Expand Up @@ -103,26 +101,13 @@ add_library(Runtime STATIC
routine_load/routine_load_task_executor.cpp
)

# This test runs forever so should not be part of 'make test'
#add_executable(disk_io_mgr_stress_test disk_io_mgr_stress_test.cpp)
#target_link_libraries(disk_io_mgr_stress_test ${IMPALA_TEST_LINK_LIBS})
if (WITH_MYSQL)
set(RUNTIME_FILES ${RUNTIME_FILES}
mysql_table_writer.cpp
mysql_table_sink.cpp
)
endif()

#ADD_BE_TEST(sorter_test)
#ADD_BE_TEST(buffered_tuple_stream_test)
#ADD_BE_TEST(result_writer_test)
#ADD_BE_TEST(buffer_control_block_test)
#ADD_BE_TEST(result_buffer_mgr_test)
#ADD_BE_TEST(result_receiver_test)
#ADD_BE_TEST(result_sink_test)
#ADD_BE_TEST(mem_pool_test)
#ADD_BE_TEST(free_list_test)
#ADD_BE_TEST(string_buffer_test)
#ADD_BE_TEST(data_stream_test)
#ADD_BE_TEST(timestamp_test)
#ADD_BE_TEST(disk_io_mgr_test)
#ADD_BE_TEST(parallel_executor_test)
#ADD_BE_TEST(datetime_value_test)
#ADD_BE_TEST(decimal_value_test)
#ADD_BE_TEST(decimalv2_value_test)
#ADD_BE_TEST(string_value_test)
#ADD_BE_TEST(thread_resource_mgr_test)
add_library(Runtime STATIC
${RUNTIME_FILES}
)
1 change: 0 additions & 1 deletion be/src/runtime/decimal_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "runtime/decimal_value.h"

#include <mysql/mysql.h>
#include <algorithm>
#include <iostream>
#include <utility>
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/mysql_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ MysqlTableSink::~MysqlTableSink() {
}

Status MysqlTableSink::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(MysqlTableSink::init(t_sink));
RETURN_IF_ERROR(DataSink::init(t_sink));
const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink;

_conn_info.host = t_mysql_sink.host;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/mysql_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
// specific language governing permissions and limitations
// under the License.

#include <mysql/mysql.h>

#define __DorisMysql MYSQL
#include "runtime/mysql_table_writer.h"

#include <sstream>

#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "exprs/expr.h"
Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/mysql_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

#include <string>
#include <vector>
#include <mysql/mysql.h>

#include "common/status.h"

#ifndef __DorisMysql
#define __DorisMysql void
#endif

namespace doris {

struct MysqlConnInfo {
Expand Down Expand Up @@ -67,7 +70,7 @@ class MysqlTableWriter {

const std::vector<ExprContext*>& _output_expr_ctxs;
std::string _mysql_tbl;
MYSQL* _mysql_conn;
__DorisMysql* _mysql_conn;
};

}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
#include "util/disk_info.h"
#include "util/file_utils.h"
#include "util/pretty_printer.h"
#include "util/mysql_load_error_hub.h"
#include "util/load_error_hub.h"
#include "runtime/mem_tracker.h"
#include "runtime/bufferpool/reservation_tracker.h"

Expand Down
25 changes: 11 additions & 14 deletions be/src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/util")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/util")

add_library(Util STATIC
set(UTIL_FILES
arena.cpp
bfd_parser.cpp
bitmap.cpp
Expand Down Expand Up @@ -62,7 +62,6 @@ add_library(Util STATIC
spinlock.cc
filesystem_util.cc
load_error_hub.cpp
mysql_load_error_hub.cpp
broker_load_error_hub.cpp
null_load_error_hub.cpp
time.cpp
Expand All @@ -77,15 +76,13 @@ add_library(Util STATIC
frontend_helper.cpp
)

#ADD_BE_TEST(integer-array-test)
#ADD_BE_TEST(runtime-profile-test)
#ADD_BE_TEST(benchmark-test)
#ADD_BE_TEST(decompress-test)
#ADD_BE_TEST(metrics-test)
#ADD_BE_TEST(debug-util-test)
#ADD_BE_TEST(url-coding-test)
#ADD_BE_TEST(thrift-util-test)
#ADD_BE_TEST(bit-util-test)
#ADD_BE_TEST(rle-test)
##ADD_BE_TEST(perf-counters-test)
##ADD_BE_TEST(es-scan-reader-test)
if (WITH_MYSQL)
set(UTIL_FILES ${UTIL_FILES}
mysql_load_error_hub.cpp
)
endif()

add_library(Util STATIC
${UTIL_FILES}
)

4 changes: 4 additions & 0 deletions be/src/util/load_error_hub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ Status LoadErrorHub::create_hub(

switch (t_hub_info->type) {
case TErrorHubType::MYSQL:
#ifdef DORIS_WITH_MYSQL
tmp_hub = new MysqlLoadErrorHub(t_hub_info->mysql_info);
tmp_hub->prepare();
hub->reset(tmp_hub);
break;
#else
return Status("Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
case TErrorHubType::BROKER: {
// the origin file name may contains __shard_0/xxx
// replace the '/' with '_'
Expand Down
3 changes: 3 additions & 0 deletions be/src/util/mysql_load_error_hub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

#include <mysql/mysql.h>

#define __DorisMysql MYSQL
#include "mysql_load_error_hub.h"

#include "util/defer_op.h"
Expand Down
Loading