From 54be83c137df02c0f2a993205986a0fba5611719 Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Tue, 17 May 2022 18:28:35 +0800 Subject: [PATCH 1/8] add tablet errors when close_wait return error --- be/src/olap/delta_writer.cpp | 11 +---------- be/src/olap/delta_writer.h | 3 +-- be/src/runtime/tablets_channel.cpp | 11 +++++++++-- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index b40677e2b044ae..1890efd0e90fd7 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -311,7 +311,6 @@ Status DeltaWriter::close() { } Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec, - google::protobuf::RepeatedPtrField* tablet_errors, bool is_broken) { std::lock_guard l(_lock); DCHECK(_is_init) @@ -322,15 +321,7 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* } // return error if previous flush failed - Status s = _flush_token->wait(); - if (!s.ok()) { -#ifndef BE_TEST - PTabletError* tablet_error = tablet_errors->Add(); - tablet_error->set_tablet_id(_tablet->tablet_id()); - tablet_error->set_msg(s.get_error_msg()); -#endif - return s; - } + RETURN_NOT_OK(_flush_token->wait()); // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 210c4b3eec626f..1aa294f5426c2e 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -68,7 +68,6 @@ class DeltaWriter { // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. Status close_wait(google::protobuf::RepeatedPtrField* tablet_vec, - google::protobuf::RepeatedPtrField* tablet_errors, bool is_broken); // abandon current memtable and wait for all pending-flushing memtables to be destructed. 
@@ -133,4 +132,4 @@ class DeltaWriter { int64_t _mem_consumption_snapshot = 0; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index bc3535e88ba4dd..8417ec01a30caf 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -129,9 +129,16 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, for (auto writer : need_wait_writers) { // close may return failed, but no need to handle it here. // tablet_vec will only contains success tablet, and then let FE judge it. - writer->close_wait( - tablet_vec, tablet_errors, + Status st = writer->close_wait( + tablet_vec, (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end())); + if (!st.ok()) { +#ifndef BE_TEST + PTabletError* tablet_error = tablet_errors->Add(); + tablet_error->set_tablet_id(writer->tablet_id()); + tablet_error->set_msg(st.get_error_msg()); +#endif + } } } return Status::OK(); From fde516cad8ce0090268b90a3ea3cb2798e25cc22 Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Tue, 17 May 2022 18:48:29 +0800 Subject: [PATCH 2/8] add tablet errors when close_wait return error --- be/test/olap/delta_writer_test.cpp | 12 ++++++------ be/test/olap/engine_storage_migration_task_test.cpp | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 2096900dbf5737..019d252581e8cb 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); 
res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); EXPECT_EQ(Status::OK(), res); // publish version success @@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); ASSERT_TRUE(res.ok()); // publish version success @@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); EXPECT_EQ(Status::OK(), res); // publish version success @@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); ASSERT_TRUE(res.ok()); // publish version success diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index df5f685eff728e..7d70f1cb01e9c0 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(nullptr, false); EXPECT_EQ(Status::OK(), res); // publish version success From 86ff4374423197632e6e46cba58114a9d692606a Mon Sep 17 00:00:00 2001 From: 
pengxianyu Date: Wed, 18 May 2022 11:28:07 +0800 Subject: [PATCH 3/8] add tablet errors when close_wait return error --- be/src/olap/delta_writer.cpp | 11 +------- be/src/olap/delta_writer.h | 5 ++-- be/src/runtime/tablets_channel.cpp | 28 ++++++++++++------- be/src/runtime/tablets_channel.h | 5 ++++ be/test/olap/delta_writer_test.cpp | 12 ++++---- .../engine_storage_migration_task_test.cpp | 2 +- 6 files changed, 34 insertions(+), 29 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1890efd0e90fd7..ae4dd2d0edad10 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -310,8 +310,7 @@ Status DeltaWriter::close() { return Status::OK(); } -Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec, - bool is_broken) { +Status DeltaWriter::close_wait() { std::lock_guard l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; @@ -337,14 +336,6 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* return res; } -#ifndef BE_TEST - if (!is_broken) { - PTabletInfo* tablet_info = tablet_vec->Add(); - tablet_info->set_tablet_id(_tablet->tablet_id()); - tablet_info->set_schema_hash(_tablet->schema_hash()); - } -#endif - _delta_written_success = true; const FlushStatistic& stat = _flush_token->get_stats(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 1aa294f5426c2e..3d49c302d34247 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -67,8 +67,7 @@ class DeltaWriter { Status close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. - Status close_wait(google::protobuf::RepeatedPtrField* tablet_vec, - bool is_broken); + Status close_wait(); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. 
@@ -90,6 +89,8 @@ class DeltaWriter { int64_t tablet_id() { return _tablet->tablet_id(); } + int32_t schema_hash() { return _tablet->schema_hash(); } + int64_t save_mem_consumption_snapshot(); int64_t get_mem_consumption_snapshot() const; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 8417ec01a30caf..54740a4d5e74ee 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -129,21 +129,29 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, for (auto writer : need_wait_writers) { // close may return failed, but no need to handle it here. // tablet_vec will only contains success tablet, and then let FE judge it. - Status st = writer->close_wait( - tablet_vec, - (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end())); - if (!st.ok()) { -#ifndef BE_TEST - PTabletError* tablet_error = tablet_errors->Add(); - tablet_error->set_tablet_id(writer->tablet_id()); - tablet_error->set_msg(st.get_error_msg()); -#endif - } + _close_wait(writer, tablet_vec, tablet_errors); } } return Status::OK(); } +void TabletsChannel::_close_wait(DeltaWriter* writer, + google::protobuf::RepeatedPtrField* tablet_vec, + google::protobuf::RepeatedPtrField* tablet_errors) { + Status st = writer->close_wait(); + if (st.ok()) { + if (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()) { + PTabletInfo* tablet_info = tablet_vec->Add(); + tablet_info->set_tablet_id(writer->tablet_id()); + tablet_info->set_schema_hash(writer->schema_hash()); + } + } else { + PTabletError* tablet_error = tablet_errors->Add(); + tablet_error->set_tablet_id(writer->tablet_id()); + tablet_error->set_msg(st.get_error_msg()); + } +} + Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { std::lock_guard l(_lock); if (_state == kFinished) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 5ad955c61c56f5..b3adfffb761891 100644 --- 
a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -97,6 +97,11 @@ class TabletsChannel { // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& request); + // deal with DeltaWriter close_wait(), add tablet to list for return. + void _close_wait(DeltaWriter* writer, + google::protobuf::RepeatedPtrField* tablet_vec, + google::protobuf::RepeatedPtrField* tablet_error); + // id of this load channel TabletsChannelKey _key; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 019d252581e8cb..2326bd156a3060 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); // publish version success @@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); ASSERT_TRUE(res.ok()); // publish version success @@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); // 
publish version success @@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); ASSERT_TRUE(res.ok()); // publish version success diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 7d70f1cb01e9c0..608d4b60212f7e 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); // publish version success From a000fe04b4b42c2ddf9578c85273a40df46fec36 Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Wed, 18 May 2022 14:25:49 +0800 Subject: [PATCH 4/8] add tablet errors when close_wait return error --- be/src/runtime/tablets_channel.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index b3adfffb761891..7c3bb3300cc387 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -99,8 +99,8 @@ class TabletsChannel { // deal with DeltaWriter close_wait(), add tablet to list for return. 
void _close_wait(DeltaWriter* writer, - google::protobuf::RepeatedPtrField* tablet_vec, - google::protobuf::RepeatedPtrField* tablet_error); + google::protobuf::RepeatedPtrField* tablet_vec, + google::protobuf::RepeatedPtrField* tablet_error); // id of this load channel TabletsChannelKey _key; From 5af2e1f40bf7d4e2baae289b20cdf2e0c00c6a2d Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Wed, 18 May 2022 19:04:36 +0800 Subject: [PATCH 5/8] add tablet errors when close_wait return error --- be/src/runtime/tablets_channel.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 54740a4d5e74ee..e27993b97b39df 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -139,6 +139,7 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, google::protobuf::RepeatedPtrField* tablet_vec, google::protobuf::RepeatedPtrField* tablet_errors) { Status st = writer->close_wait(); +#ifndef BE_TEST if (st.ok()) { if (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()) { PTabletInfo* tablet_info = tablet_vec->Add(); @@ -150,6 +151,7 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, tablet_error->set_tablet_id(writer->tablet_id()); tablet_error->set_msg(st.get_error_msg()); } +#endif } Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { From 240b9ca5ca1e2a0e6243a3474437f922159201b3 Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Fri, 20 May 2022 11:13:54 +0800 Subject: [PATCH 6/8] return error tablet list when close_wait return error --- be/src/runtime/tablets_channel.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index e27993b97b39df..7c6ff67a1cf426 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -141,7 +141,7 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, Status st = writer->close_wait(); #ifndef 
BE_TEST if (st.ok()) { - if (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()) { + if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) { PTabletInfo* tablet_info = tablet_vec->Add(); tablet_info->set_tablet_id(writer->tablet_id()); tablet_info->set_schema_hash(writer->schema_hash()); From 5f6fa652b8321e387b4a58b10274e7d18e896ae6 Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Fri, 20 May 2022 11:28:42 +0800 Subject: [PATCH 7/8] return error tablet list when close_wait return error --- be/src/exec/parquet_reader.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index c62054800d733d..45d5bc5836d78b 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -31,7 +31,8 @@ #include #include - + +#include "common/config.h" #include "common/status.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/PlanNodes_types.h" From f230095ce3bb3a98710457043c5d959870fe546e Mon Sep 17 00:00:00 2001 From: pengxianyu Date: Fri, 20 May 2022 16:53:06 +0800 Subject: [PATCH 8/8] return error tablet list when close_wait return error --- be/src/runtime/tablets_channel.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 7c6ff67a1cf426..8010956cc4e5fa 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -139,7 +139,6 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, google::protobuf::RepeatedPtrField* tablet_vec, google::protobuf::RepeatedPtrField* tablet_errors) { Status st = writer->close_wait(); -#ifndef BE_TEST if (st.ok()) { if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) { PTabletInfo* tablet_info = tablet_vec->Add(); @@ -151,7 +150,6 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, tablet_error->set_tablet_id(writer->tablet_id()); tablet_error->set_msg(st.get_error_msg()); } 
-#endif } Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {