Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile));
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
} else {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<false>(
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
default:
return Status::InternalError("Unknown result sink type");
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ class RuntimeState {
_query_options.enable_common_expr_pushdown_for_inverted_index;
};

bool mysql_row_binary_format() const {
return _query_options.__isset.mysql_row_binary_format &&
_query_options.mysql_row_binary_format;
}

bool enable_faster_float_convert() const {
return _query_options.__isset.faster_float_convert && _query_options.faster_float_convert;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/jsonb/serialize.h"
#include "vec/sink/vmysql_result_writer.cpp"
#include "vec/sink/vmysql_result_writer.h"

namespace doris {
Expand Down
12 changes: 9 additions & 3 deletions be/src/vec/sink/vresult_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,16 @@ Status VResultSink::prepare(RuntimeState* state) {

// create writer based on sink type
switch (_sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new (std::nothrow)
VMysqlResultWriter(_sender.get(), _output_vexpr_ctxs, _profile));
case TResultSinkType::MYSQL_PROTOCAL: {
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
} else {
_writer.reset(new (std::nothrow) VMysqlResultWriter<false>(
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ public boolean isInvalid() {
return isScalarType(PrimitiveType.INVALID_TYPE);
}

public boolean isUnsupported() {
return isScalarType(PrimitiveType.UNSUPPORTED);
}

public boolean isValid() {
return !isInvalid();
}
Expand Down
8 changes: 6 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,11 @@ stmt ::=
| insert_overwrite_stmt : stmt
{: RESULT = stmt; :}
| update_stmt : stmt
{: RESULT = stmt; :}
{:
RESULT = stmt;
stmt.setPlaceHolders(parser.placeholder_expr_list);
parser.placeholder_expr_list.clear();
:}
| backup_stmt : stmt
{: RESULT = stmt; :}
| restore_stmt : stmt
Expand Down Expand Up @@ -5786,7 +5790,7 @@ expr_or_default ::=
prepare_stmt ::=
KW_PREPARE variable_name:name KW_FROM select_stmt:s
{:
RESULT = new PrepareStmt(s, name, false);
RESULT = new PrepareStmt(s, name);
s.setPlaceHolders(parser.placeholder_expr_list);
parser.placeholder_expr_list.clear();
:}
Expand Down
16 changes: 8 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,14 @@ public int getCallDepth() {
return callDepth;
}

public void setPrepareStmt(PrepareStmt stmt) {
prepareStmt = stmt;
}

public PrepareStmt getPrepareStmt() {
return prepareStmt;
}

public void setInlineView(boolean inlineView) {
isInlineView = inlineView;
}
Expand All @@ -630,14 +638,6 @@ public void setExplicitViewAlias(String alias) {
explicitViewAlias = alias;
}

public void setPrepareStmt(PrepareStmt stmt) {
prepareStmt = stmt;
}

public PrepareStmt getPrepareStmt() {
return prepareStmt;
}

public String getExplicitViewAlias() {
return explicitViewAlias;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,13 @@ public Pair<SlotRef, Expr> extract() {
public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
super.analyzeImpl(analyzer);
this.checkIncludeBitmap();
// Ignore placeholder
if (getChild(0) instanceof PlaceHolderExpr || getChild(1) instanceof PlaceHolderExpr) {
// Ignore placeholder, when it type is invalid.
// Invalid type could happen when analyze prepared point query select statement,
// since the value is occupied but not assigned
if ((getChild(0) instanceof PlaceHolderExpr && getChild(0).type == Type.UNSUPPORTED)
|| (getChild(1) instanceof PlaceHolderExpr && getChild(1).type == Type.UNSUPPORTED)) {
return;
}

for (Expr expr : children) {
if (expr instanceof Subquery) {
Subquery subquery = (Subquery) expr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ private void checkHllCompatibility() throws AnalysisException {
* failure to convert a string literal to a date literal
*/
public final Expr castTo(Type targetType) throws AnalysisException {
if (this instanceof PlaceHolderExpr && this.type.isInvalid()) {
if (this instanceof PlaceHolderExpr && this.type.isUnsupported()) {
return this;
}
// If the targetType is NULL_TYPE then ignore the cast because NULL_TYPE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,6 @@ public String toString() {
return getStringValue();
}

// Parse from binary data, the format follows mysql binary protocal
// see https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_binary_resultset.html.
// Return next offset
public void setupParamFromBinary(ByteBuffer data) {
Preconditions.checkState(false,
"should implement this in derived class. " + this.type.toSql());
}

public static LiteralExpr getLiteralByMysqlType(int mysqlType) throws AnalysisException {
switch (mysqlType) {
// MYSQL_TYPE_TINY
Expand Down Expand Up @@ -499,4 +491,12 @@ public static LiteralExpr getLiteralExprFromThrift(TExprNode node) throws Analys
default: throw new AnalysisException("Wrong type from thrift;");
}
}

// Parse from binary data, the format follows mysql binary protocal
// see https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_binary_resultset.html.
// Return next offset
public void setupParamFromBinary(ByteBuffer data) {
Preconditions.checkState(false,
"should implement this in derived class. " + this.type.toSql());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ enum InsertType {
}
}

public NativeInsertStmt(NativeInsertStmt other) {
super(other.label, null, null);
this.tblName = other.tblName;
this.targetPartitionNames = other.targetPartitionNames;
this.label = other.label;
this.queryStmt = other.queryStmt;
this.planHints = other.planHints;
this.targetColumnNames = other.targetColumnNames;
this.isValuesOrConstantSelect = other.isValuesOrConstantSelect;
}

public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
super(new LabelName(null, label), null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class PlaceHolderExpr extends LiteralExpr {
int mysqlTypeCode = -1;

public PlaceHolderExpr() {

type = Type.UNSUPPORTED;
}

public void setTypeCode(int mysqlTypeCode) {
Expand All @@ -48,10 +48,12 @@ public void setTypeCode(int mysqlTypeCode) {

protected PlaceHolderExpr(LiteralExpr literal) {
this.lExpr = literal;
this.type = literal.getType();
}

protected PlaceHolderExpr(PlaceHolderExpr other) {
this.lExpr = other.lExpr;
this.type = other.type;
}

public void setLiteral(LiteralExpr literal) {
Expand Down Expand Up @@ -164,7 +166,17 @@ public Expr clone() {

@Override
public String toSqlImpl() {
return getStringValue();
if (this.lExpr == null) {
return "?";
}
return "_placeholder_(" + this.lExpr.toSqlImpl() + ")";
}

// @Override
public Expr reset() {
this.lExpr = null;
this.type = Type.UNSUPPORTED;
return this;
}

@Override
Expand Down
Loading