From 5040387f5bc4e9ddc32d81186a24632f6ab1dced Mon Sep 17 00:00:00 2001 From: chaoyli Date: Thu, 14 Mar 2019 10:26:41 +0800 Subject: [PATCH] Fix transaction non-idempotency error add_pending_version() is not idempotent upon rpc retry. Transaction will be garbaged collection falsely. --- be/src/olap/delta_writer.cpp | 11 ++++++++--- be/src/olap/olap_header.cpp | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 85372458e95ffb..a30a4fe3ddee03 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -182,9 +182,11 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t } RETURN_NOT_OK(_mem_table->close(_writer)); - OLAPStatus res = OLAP_SUCCESS; + OLAPStatus res = _table->add_pending_version(_req.partition_id, _req.transaction_id, nullptr); + if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { + return res; + } //add pending data to tablet - RETURN_NOT_OK(_table->add_pending_version(_req.partition_id, _req.transaction_id, nullptr)); for (SegmentGroup* segment_group : _segment_group_vec) { RETURN_NOT_OK(_table->add_pending_segment_group(segment_group)); RETURN_NOT_OK(segment_group->load()); @@ -208,7 +210,10 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t return res; } - RETURN_NOT_OK(_new_table->add_pending_version(_req.partition_id, _req.transaction_id, nullptr)); + res = _new_table->add_pending_version(_req.partition_id, _req.transaction_id, nullptr); + if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { + return res; + } for (SegmentGroup* segment_group : _new_segment_group_vec) { RETURN_NOT_OK(_new_table->add_pending_segment_group(segment_group)); RETURN_NOT_OK(segment_group->load()); diff --git a/be/src/olap/olap_header.cpp b/be/src/olap/olap_header.cpp index f8d6d439395120..5a4675f1cc73f1 100644 --- a/be/src/olap/olap_header.cpp +++ b/be/src/olap/olap_header.cpp @@ -294,7 +294,7 @@ OLAPStatus OLAPHeader::add_pending_version( if (pending_delta(i).transaction_id() == transaction_id) { LOG(WARNING) << "pending delta already exists in header." << "transaction_id: " << transaction_id; - return OLAP_ERR_HEADER_ADD_PENDING_DELTA; + return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST; } }