From 01b474c6e705e458a706b91164316bd65c9ff7b1 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 22 Mar 2024 14:48:02 +0800 Subject: [PATCH] [Fix](compress) Fix occasional crushes when serializing blocks --- be/src/vec/core/block.cpp | 4 +- .../java/org/apache/doris/catalog/Env.java | 22 +++--- .../org/apache/doris/qe/SessionVariable.java | 13 ++++ .../java/org/apache/doris/qe/VariableMgr.java | 71 ++----------------- 4 files changed, 33 insertions(+), 77 deletions(-) diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 047c7029a2e185..c93bfb11f09d6d 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -868,9 +868,9 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); } *uncompressed_bytes = content_uncompressed_size; - const size_t serialize_bytes = buf - column_values.data(); + const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; *compressed_bytes = serialize_bytes; - column_values.resize(serialize_bytes + STREAMVBYTE_PADDING); + column_values.resize(serialize_bytes); // compress if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 50c8b23919387b..3241a318715d56 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -229,6 +229,7 @@ import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.QueryCancelWorker; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; @@ -1433,6 +1434,7 @@ private void getHelperNodeFromDeployManager() throws Exception { } } + @SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"}) private void transferToMaster() { // stop replayer if (replayer != null) { @@ -1500,24 +1502,24 @@ private void transferToMaster() { // because the default parallelism of pipeline engine is higher than previous version. // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum; - VariableMgr.setGlobalPipelineTask(newVal); - LOG.info("upgrade FE from 1.x to 2.0, set parallel_pipeline_task_num " - + "to parallel_fragment_exec_instance_num: {}", newVal); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, + String.valueOf(newVal)); // similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x // if the default value has been upgraded double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor(); - VariableMgr.setGlobalBroadcastScaleFactor(newBcFactorVal); - LOG.info("upgrade FE from 1.x to 2.x, set broadcast_right_table_scale_factor " - + "to new default value: {}", newBcFactorVal); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", + SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, + String.valueOf(newBcFactorVal)); // similar reason as above, need to upgrade enable_nereids_planner to true - VariableMgr.enableNereidsPlanner(); - LOG.info("upgrade FE from 1.x to 2.x, set enable_nereids_planner to new default value: true"); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER, + "true"); } if (journalVersion <= FeMetaVersion.VERSION_123) { - VariableMgr.enableNereidsDml(); - LOG.info("upgrade FE from 2.0 to 2.1, set enable_nereids_dml to new default value: true"); + VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true"); + VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", + SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 25076e803f65a6..0ab3658ad899be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1757,6 +1757,7 @@ public void initFuzzyModeVariables() { this.enableFunctionPushdown = true; this.enableDeleteSubPredicateV2 = true; } + /* switch (randomInt) { case 0: @@ -1807,6 +1808,18 @@ public void initFuzzyModeVariables() { } else { this.enableFoldConstantByBe = true; } + + switch (Config.pull_request_id % 3) { + case 0: + this.fragmentTransmissionCompressionCodec = "snappy"; + break; + case 1: + this.fragmentTransmissionCompressionCodec = "lz4"; + break; + default: + this.fragmentTransmissionCompressionCodec = "none"; + } + this.runtimeFilterType = 1 << randomInt; this.enableParallelScan = Config.pull_request_id % 2 == 0 ? randomInt % 2 == 0 : randomInt % 1 == 0; switch (randomInt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index e83fd474dafd73..6e75f17a04222c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -389,84 +389,25 @@ private static void setGlobalVarAndWriteEditLog(VarContext ctx, String name, Str } } - public static void setGlobalPipelineTask(int instance) { + public static void refreshDefaultSessionVariables(String versionMsg, String sessionVar, String value) { wlock.lock(); try { - VarContext ctx = ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM); + VarContext ctx = ctxByVarName.get(sessionVar); try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(instance)); + setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), value); } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e); + LOG.warn("failed to set global variable: {}", sessionVar, e); return; } // write edit log GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM)); - Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); - } finally { - wlock.unlock(); - } - } - - public static void setGlobalBroadcastScaleFactor(double factor) { - wlock.lock(); - try { - VarContext ctx = ctxByVarName.get(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR); - try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(factor)); - } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, e); - return; - } - - // write edit log - GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR)); - Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); - } finally { - wlock.unlock(); - } - } - - public static void enableNereidsPlanner() { - wlock.lock(); - try { - VarContext ctx = ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_PLANNER); - try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(true)); - } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.ENABLE_NEREIDS_PLANNER, e); - return; - } - - // write edit log - GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_PLANNER)); - Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); - } finally { - wlock.unlock(); - } - } - - public static void enableNereidsDml() { - wlock.lock(); - try { - VarContext ctx = ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_DML); - try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(true)); - } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.ENABLE_NEREIDS_DML, e); - return; - } - - // write edit log - GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_DML)); + Lists.newArrayList(sessionVar)); Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); } finally { wlock.unlock(); } + LOG.info("upgrade FE from {}, set {} to new default value: {}", versionMsg, sessionVar, value); } public static void setLowerCaseTableNames(int mode) throws DdlException {