Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
12 changes: 12 additions & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ set(DORIS_LINK_LIBS
DorisGen
Webserver
Geo
Vec
Plugin
${WL_END_GROUP}
)
Expand Down Expand Up @@ -652,6 +653,7 @@ endif()

add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/plugin)
add_subdirectory(${SRC_DIR}/vec)

# Utility CMake function to make specifying tests and benchmarks less verbose
FUNCTION(ADD_BE_TEST TEST_NAME)
Expand Down Expand Up @@ -704,6 +706,11 @@ if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(${TEST_DIR}/runtime)
add_subdirectory(${TEST_DIR}/udf)
add_subdirectory(${TEST_DIR}/util)
add_subdirectory(${TEST_DIR}/vec/core)
add_subdirectory(${TEST_DIR}/vec/exprs)
add_subdirectory(${TEST_DIR}/vec/function)
add_subdirectory(${TEST_DIR}/vec/runtime)
add_subdirectory(${TEST_DIR}/vec/aggregate_functions)
add_subdirectory(${TEST_DIR}/plugin)
add_subdirectory(${TEST_DIR}/plugin/example)
add_subdirectory(${TEST_DIR}/tools)
Expand All @@ -727,3 +734,8 @@ install(FILES
${BASE_DIR}/../conf/odbcinst.ini
DESTINATION ${OUTPUT_DIR}/conf)


get_property(dirs DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES)
foreach(dir ${dirs})
message(STATUS "dir='${dir}'")
endforeach()
1 change: 1 addition & 0 deletions be/src/agent/topic_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "agent/topic_subscriber.h"

#include "common/logging.h"
#include <mutex>

namespace doris {

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/blocking_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace doris {
BlockingJoinNode::BlockingJoinNode(const std::string& node_name, const TJoinOp::type join_op,
ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs), _node_name(node_name), _join_op(join_op) {}
: ExecNode(pool, tnode, descs), _node_name(node_name), _join_op(join_op),
_left_side_eos(false) {}

Status BlockingJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
return ExecNode::init(tnode, state);
Expand Down
15 changes: 13 additions & 2 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
#include "runtime/result_file_sink.h"
#include "runtime/result_sink.h"
#include "runtime/runtime_state.h"
#include "util/logging.h"

#include "vec/sink/result_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vtablet_sink.h"

namespace doris {

Expand All @@ -57,6 +60,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
: false;
// TODO: figure out good buffer size based on size of output row
if (is_vec) {
tmp_sink = new doris::vectorized::VDataStreamSender(
pool, params.sender_id, row_desc, thrift_sink.stream_sink, params.destinations,
16 * 1024, send_query_statistics_with_every_batch);
} else {
tmp_sink = new DataStreamSender(pool, params.sender_id, row_desc,
thrift_sink.stream_sink, params.destinations, 16 * 1024,
Expand All @@ -73,6 +79,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink

// TODO: figure out good buffer size based on size of output row
if (is_vec) {
tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs, thrift_sink.result_sink, 4096);
} else {
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
}
Expand Down Expand Up @@ -149,7 +156,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new stream_load::OlapTableSink(pool, row_desc, output_exprs, &status));
if (is_vec) {
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::OlapTableSink(pool, row_desc, output_exprs, &status));
}
RETURN_IF_ERROR(status);
break;
}
Expand Down
11 changes: 8 additions & 3 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ class ObjectPool;
class RowBatch;
class RuntimeProfile;
class RuntimeState;
class TPlanExecRequest;
class TPlanExecParams;
class TPlanFragmentExecParams;
class RowDescriptor;

namespace vectorized {
class Block;
}

// Superclass of all data sinks.
class DataSink {
public:
Expand All @@ -56,8 +58,11 @@ class DataSink {
// Send a row batch into this sink.
// eos should be true when the last batch is passed to send()
virtual Status send(RuntimeState* state, RowBatch* batch) = 0;
// virtual Status send(RuntimeState* state, RowBatch* batch, bool eos) = 0;

// Send a Block into this sink.
virtual Status send(RuntimeState* state, vectorized::Block* block) {
return Status::NotSupported("Not support send block");
};
// Releases all resources that were allocated in prepare()/send().
// Further send() calls are illegal after calling close().
// It must be okay to call this multiple times. Subsequent calls should
Expand Down
Loading