Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -868,9 +868,9 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
}
*uncompressed_bytes = content_uncompressed_size;
const size_t serialize_bytes = buf - column_values.data();
const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
*compressed_bytes = serialize_bytes;
column_values.resize(serialize_bytes + STREAMVBYTE_PADDING);
column_values.resize(serialize_bytes);

// compress
if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) {
Expand Down
22 changes: 12 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.QueryCancelWorker;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
Expand Down Expand Up @@ -1433,6 +1434,7 @@ private void getHelperNodeFromDeployManager() throws Exception {
}
}

@SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
// stop replayer
if (replayer != null) {
Expand Down Expand Up @@ -1500,24 +1502,24 @@ private void transferToMaster() {
// because the default parallelism of pipeline engine is higher than previous version.
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.setGlobalPipelineTask(newVal);
LOG.info("upgrade FE from 1.x to 2.0, set parallel_pipeline_task_num "
+ "to parallel_fragment_exec_instance_num: {}", newVal);
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));

// similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.setGlobalBroadcastScaleFactor(newBcFactorVal);
LOG.info("upgrade FE from 1.x to 2.x, set broadcast_right_table_scale_factor "
+ "to new default value: {}", newBcFactorVal);
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));

// similar reason as above, need to upgrade enable_nereids_planner to true
VariableMgr.enableNereidsPlanner();
LOG.info("upgrade FE from 1.x to 2.x, set enable_nereids_planner to new default value: true");
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.enableNereidsDml();
LOG.info("upgrade FE from 2.0 to 2.1, set enable_nereids_dml to new default value: true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
}
}

Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,7 @@ public void initFuzzyModeVariables() {
this.enableFunctionPushdown = true;
this.enableDeleteSubPredicateV2 = true;
}

/*
switch (randomInt) {
case 0:
Expand Down Expand Up @@ -1807,6 +1808,18 @@ public void initFuzzyModeVariables() {
} else {
this.enableFoldConstantByBe = true;
}

switch (Config.pull_request_id % 3) {
case 0:
this.fragmentTransmissionCompressionCodec = "snappy";
break;
case 1:
this.fragmentTransmissionCompressionCodec = "lz4";
break;
default:
this.fragmentTransmissionCompressionCodec = "none";
}

this.runtimeFilterType = 1 << randomInt;
this.enableParallelScan = Config.pull_request_id % 2 == 0 ? randomInt % 2 == 0 : randomInt % 1 == 0;
switch (randomInt) {
Expand Down
71 changes: 6 additions & 65 deletions fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,84 +389,25 @@ private static void setGlobalVarAndWriteEditLog(VarContext ctx, String name, Str
}
}

public static void setGlobalPipelineTask(int instance) {
public static void refreshDefaultSessionVariables(String versionMsg, String sessionVar, String value) {
wlock.lock();
try {
VarContext ctx = ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM);
VarContext ctx = ctxByVarName.get(sessionVar);
try {
setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(instance));
setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), value);
} catch (DdlException e) {
LOG.warn("failed to set global variable: {}", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e);
LOG.warn("failed to set global variable: {}", sessionVar, e);
return;
}

// write edit log
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable,
Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
}

public static void setGlobalBroadcastScaleFactor(double factor) {
wlock.lock();
try {
VarContext ctx = ctxByVarName.get(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR);
try {
setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(factor));
} catch (DdlException e) {
LOG.warn("failed to set global variable: {}", SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, e);
return;
}

// write edit log
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable,
Lists.newArrayList(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
}

public static void enableNereidsPlanner() {
wlock.lock();
try {
VarContext ctx = ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_PLANNER);
try {
setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(true));
} catch (DdlException e) {
LOG.warn("failed to set global variable: {}", SessionVariable.ENABLE_NEREIDS_PLANNER, e);
return;
}

// write edit log
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable,
Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_PLANNER));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
}

public static void enableNereidsDml() {
wlock.lock();
try {
VarContext ctx = ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_DML);
try {
setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(true));
} catch (DdlException e) {
LOG.warn("failed to set global variable: {}", SessionVariable.ENABLE_NEREIDS_DML, e);
return;
}

// write edit log
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable,
Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_DML));
Lists.newArrayList(sessionVar));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
LOG.info("upgrade FE from {}, set {} to new default value: {}", versionMsg, sessionVar, value);
}

public static void setLowerCaseTableNames(int mode) throws DdlException {
Expand Down