diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 613a74219526d8..11ee2424496061 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -291,9 +291,7 @@ Status DeltaWriter::close() { return Status::OK(); } -Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec, - google::protobuf::RepeatedPtrField* tablet_errors, - bool is_broken) { +Status DeltaWriter::close_wait() { std::lock_guard l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; @@ -303,15 +301,7 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* } // return error if previous flush failed - Status s = _flush_token->wait(); - if (!s.ok()) { -#ifndef BE_TEST - PTabletError* tablet_error = tablet_errors->Add(); - tablet_error->set_tablet_id(_tablet->tablet_id()); - tablet_error->set_msg(s.get_error_msg()); -#endif - return s; - } + RETURN_NOT_OK(_flush_token->wait()); // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); @@ -327,14 +317,6 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* return res; } -#ifndef BE_TEST - if (!is_broken) { - PTabletInfo* tablet_info = tablet_vec->Add(); - tablet_info->set_tablet_id(_tablet->tablet_id()); - tablet_info->set_schema_hash(_tablet->schema_hash()); - } -#endif - _delta_written_success = true; const FlushStatistic& stat = _flush_token->get_stats(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 210c4b3eec626f..3d49c302d34247 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -67,9 +67,7 @@ class DeltaWriter { Status close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. - Status close_wait(google::protobuf::RepeatedPtrField* tablet_vec, - google::protobuf::RepeatedPtrField* tablet_errors, - bool is_broken); + Status close_wait(); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. @@ -91,6 +89,8 @@ class DeltaWriter { int64_t tablet_id() { return _tablet->tablet_id(); } + int32_t schema_hash() { return _tablet->schema_hash(); } + int64_t save_mem_consumption_snapshot(); int64_t get_mem_consumption_snapshot() const; @@ -133,4 +133,4 @@ class DeltaWriter { int64_t _mem_consumption_snapshot = 0; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index bc3535e88ba4dd..8010956cc4e5fa 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -129,14 +129,29 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, for (auto writer : need_wait_writers) { // close may return failed, but no need to handle it here. // tablet_vec will only contains success tablet, and then let FE judge it. - writer->close_wait( - tablet_vec, tablet_errors, - (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end())); + _close_wait(writer, tablet_vec, tablet_errors); } } return Status::OK(); } +void TabletsChannel::_close_wait(DeltaWriter* writer, + google::protobuf::RepeatedPtrField* tablet_vec, + google::protobuf::RepeatedPtrField* tablet_errors) { + Status st = writer->close_wait(); + if (st.ok()) { + if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) { + PTabletInfo* tablet_info = tablet_vec->Add(); + tablet_info->set_tablet_id(writer->tablet_id()); + tablet_info->set_schema_hash(writer->schema_hash()); + } + } else { + PTabletError* tablet_error = tablet_errors->Add(); + tablet_error->set_tablet_id(writer->tablet_id()); + tablet_error->set_msg(st.get_error_msg()); + } +} + Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { std::lock_guard l(_lock); if (_state == kFinished) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 5ad955c61c56f5..7c3bb3300cc387 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -97,6 +97,11 @@ class TabletsChannel { // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& request); + // deal with DeltaWriter close_wait(), add tablet to list for return. + void _close_wait(DeltaWriter* writer, + google::protobuf::RepeatedPtrField* tablet_vec, + google::protobuf::RepeatedPtrField* tablet_error); + // id of this load channel TabletsChannelKey _key; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 2096900dbf5737..2326bd156a3060 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); // publish version success @@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); ASSERT_TRUE(res.ok()); // publish version success @@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); // publish version success @@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); ASSERT_TRUE(res.ok()); // publish version success diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index df5f685eff728e..608d4b60212f7e 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(nullptr, nullptr, false); + res = delta_writer->close_wait(); EXPECT_EQ(Status::OK(), res); // publish version success