-
Notifications
You must be signed in to change notification settings - Fork 6.7k
Open
Description
Apply this patch to reproduce bugs in CommitWithParallelValidate and CommitWithSerialValidate:
- If immutable_db_options_.unordered_write is enabled, the WriteCallback is invoked twice: first in WriteImplWALOnly and then again in UnorderedWriteMemtable. This is easy to fix; for now I simply remove the second invocation (the w.CheckCallback(this) call in UnorderedWriteMemtable).
- If immutable_db_options_.unordered_write is enabled, CommitWithSerialValidate may fail to detect a conflict with another OCC txn. The UnorderedWriteConflictDetectionBug test reliably reproduces this race: after one txn has completed its WAL write but before it has completed its memtable write, another txn carrying a conflicting write passes validation (the cache-only conflict check relies on memtable data, where the first txn's write has not landed yet).
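- CommitWithParallelValidate may fail to detect a conflict with a concurrent non-OCC write, because the lock in CommitWithParallelValidate only synchronizes OCC txns with each other. For example, a plain Put on the DB that lands between the conflict check and the write goes unnoticed (see the ParallelValidateMissesWriteBetweenCheckAndWrite test).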
These are serious bugs because they violate the core semantics of OCC and may seriously impact users.
My suggestions:
- For the 2nd bug: since CommitWithSerialValidate is the default validation mode, we should not sacrifice too much simplicity for this rare case; let's disable unordered_write for CommitWithSerialValidate.
- For the 3rd bug: inside CommitWithParallelValidate, we also need to block concurrent db_impl->Write calls that do not come from OCC txns. This is acceptable performance-wise, because it should be rare for users to mix OCC writes with non-OCC writes.
More details: in CommitWithParallelValidate, we put all OCC txn threads in group A and all other threads calling db_impl->Write in group B, then enforce this policy: members of the same group may run concurrently with each other, but no member may run concurrently with a member of the other group.
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 91a2f0f4c..454ead00b 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -310,6 +310,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
if (immutable_db_options_.unordered_write) {
+#ifndef NDEBUG
+ static std::atomic<int> sync_counter{0};
+ int sync_index = sync_counter.fetch_add(1);
+ TEST_IDX_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteBeforeWriteWAL:", sync_index);
+#endif // NDEBUG
+
const size_t sub_batch_cnt = batch_cnt != 0
? batch_cnt
// every key is a sub-batch consuming a seq
@@ -901,7 +907,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
/*user_write_cb=*/nullptr, log_ref,
false /*disable_memtable*/);
- if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
+ if (w.ShouldWriteToMemtable()) {
w.sequence = seq;
size_t total_count = WriteBatchInternal::Count(my_batch);
InternalStats* stats = default_cf_internal_stats_;
diff --git a/utilities/transactions/optimistic_transaction.cc b/utilities/transactions/optimistic_transaction.cc
index bbd99575f..377d6d741 100644
--- a/utilities/transactions/optimistic_transaction.cc
+++ b/utilities/transactions/optimistic_transaction.cc
@@ -15,6 +15,7 @@
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/defer.h"
#include "util/string_util.h"
@@ -140,6 +141,12 @@ Status OptimisticTransaction::CommitWithParallelValidate() {
return s;
}
+ TEST_SYNC_POINT(
+ "OptimisticTransaction::CommitWithParallelValidate:AfterCheck");
+
+ TEST_SYNC_POINT(
+ "OptimisticTransaction::CommitWithParallelValidate:BeforeWrite");
+
s = db_impl->Write(write_options_, GetWriteBatch()->GetWriteBatch());
if (s.ok()) {
Clear();
@@ -193,12 +200,24 @@ Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
Status OptimisticTransaction::CheckTransactionForConflicts(DB* db) {
auto db_impl = static_cast_with_check<DBImpl>(db);
+#ifndef NDEBUG  // test-only instrumentation; omitted when NDEBUG is defined
+ static std::atomic<int> sync_counter{0};  // function-local static: shared by every call in the process
+ int idx = sync_counter.fetch_add(1);  // 0-based call ordinal; tests match it as the name suffix (e.g. BeforeCheck:1)
+ TEST_IDX_SYNC_POINT(
+ "OptimisticTransaction::CheckTransactionForConflicts:BeforeCheck:", idx);
+#endif // NDEBUG
+
// Since we are on the write thread and do not want to block other writers,
// we will do a cache-only conflict check. This can result in TryAgain
// getting returned if there is not sufficient memtable history to check
// for conflicts.
- return TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_,
+ Status s = TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_,
true /* cache_only */);
+#ifndef NDEBUG  // reuses idx so BeforeCheck/AfterCheck of one call can be paired
+ TEST_IDX_SYNC_POINT(
+ "OptimisticTransaction::CheckTransactionForConflicts:AfterCheck:", idx);
+#endif // NDEBUG
+ return s;
}
Status OptimisticTransaction::SetName(const TransactionName& /* unused */) {
diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc
index 81def3902..67ff0f3ac 100644
--- a/utilities/transactions/optimistic_transaction_test.cc
+++ b/utilities/transactions/optimistic_transaction_test.cc
@@ -4,8 +4,10 @@
// (found in the LICENSE.Apache file in the root directory).
#include <cstdint>
+#include <condition_variable>
#include <functional>
#include <memory>
+#include <mutex>
#include <string>
#include <thread>
@@ -170,6 +172,157 @@ TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
delete txn;
}
+// Reproduces a missed write-write conflict between two OCC txns when
+// unordered_write is on. Plain TEST: opens its own DB with kValidateSerial.
+TEST(OptimisticTransactionTest, UnorderedWriteConflictDetectionBug) {
+#ifdef NDEBUG  // the sync points this test relies on are compiled out in release builds
+ return;
+#endif // NDEBUG
+
+ // Open a fresh DB with unordered_write enabled.
+ Options options;
+ options.create_if_missing = true;
+ options.unordered_write = true; // Enable unordered_write
+ options.allow_concurrent_memtable_write = true; // Required for unordered_write
+ options.max_write_buffer_size_to_maintain = 10000000; // keep memtable history for the cache-only conflict check
+
+ std::string dbname = test::PerThreadDBPath("occ_unordered_write_bug_test");
+ DestroyDB(dbname, options);
+
+ OptimisticTransactionDB* txn_db;
+ ColumnFamilyOptions cf_options(options);
+ OptimisticTransactionDBOptions occ_opts;
+ occ_opts.validate_policy = OccValidationPolicy::kValidateSerial; // the race under test is in the serial path
+
+ std::vector<ColumnFamilyDescriptor> column_families;
+ std::vector<ColumnFamilyHandle*> handles;
+ column_families.push_back(
+ ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+
+ Status s = OptimisticTransactionDB::Open(
+ DBOptions(options), occ_opts, dbname, column_families, &handles, &txn_db);
+
+ ASSERT_OK(s);
+ ASSERT_NE(txn_db, nullptr);
+ ASSERT_EQ(handles.size(), 1);
+ delete handles[0];
+
+ WriteOptions write_options;
+ ReadOptions read_options;  // NOTE(review): unused in this test
+
+
+ Status txn1_status;
+ Status txn2_status;
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({  // {A, B}: B blocks until A has run
+ // Txn1's check (index 1) starts only after txn2's write (index 0) reaches the pre-WAL point.
+ {
+ "DBImpl::WriteImpl:UnorderedWriteBeforeWriteWAL:0",
+ "OptimisticTransaction::CheckTransactionForConflicts:BeforeCheck:1"
+ },
+ // Txn2's memtable write starts only after txn1's check (index 1) has finished.
+ {
+ "OptimisticTransaction::CheckTransactionForConflicts:AfterCheck:1",
+ "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable",
+ },
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Begin both txns before either commits; both write "key", so they conflict.
+ Transaction* txn1 = txn_db->BeginTransaction(write_options);
+ Transaction* txn2 = txn_db->BeginTransaction(write_options);
+
+ ASSERT_OK(txn1->Put("key", "txn1_value"));
+ ASSERT_OK(txn2->Put("key", "txn2_value"));
+
+ // Thread for txn2 - commits first
+ std::thread t2([&]() {
+ txn2_status = txn2->Commit();
+ delete txn2;
+ });
+
+ // Head start so txn2 takes index 0 of the process-wide sync counters.
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // NOTE(review): timing-based; assumes no earlier call bumped the counters
+
+ // Thread for txn1 - commits second
+ std::thread t1([&]() {
+ txn1_status = txn1->Commit();
+ delete txn1;
+ });
+
+ t1.join();
+ t2.join();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+
+ ASSERT_TRUE(txn1_status.IsBusy() || txn2_status.IsBusy())  // conflicting commits: at least one must be Busy
+ << "Bug reproduced! Both transactions succeeded when at least one should fail.\n"
+ << "txn1 status: " << txn1_status.ToString() << "\n"
+ << "txn2 status: " << txn2_status.ToString();
+
+ delete txn_db;
+ DestroyDB(dbname, options);
+}
+
+
+TEST_P(OptimisticTransactionTest,
+ ParallelValidateMissesWriteBetweenCheckAndWrite) {  // non-OCC Put lands between check and write
+ if (GetParam() != OccValidationPolicy::kValidateParallel) {  // only the parallel validation path is exercised
+ return;
+ }
+
+ WriteOptions write_options;
+ ReadOptions read_options;
+ OptimisticTransactionOptions txn_options;
+ txn_options.set_snapshot = true;
+
+ ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
+
+ Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
+ ASSERT_NE(txn, nullptr);
+ ASSERT_OK(txn->Put("foo", "txn-value"));
+
+ // Coordinate using LoadDependency ({A, B}: B blocks until A has run):
+ //   the external Put waits until the commit has passed its conflict check,
+ //   and the commit's write waits until the external Put has finished.
+ SyncPoint::GetInstance()->LoadDependency({
+ {"OptimisticTransaction::CommitWithParallelValidate:AfterCheck",
+ "ParallelValidateTest::BeforeExternalPut"},
+ {"ParallelValidateTest::AfterExternalPut",
+ "OptimisticTransaction::CommitWithParallelValidate:BeforeWrite"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Status commit_status;
+ std::thread commit_thread([&] { commit_status = txn->Commit(); });
+
+ // Plain (non-OCC) Put on the DB from a separate thread.
+ Status external_put;
+ std::thread external_thread([&]() {
+ TEST_SYNC_POINT("ParallelValidateTest::BeforeExternalPut");
+ external_put = txn_db->Put(write_options, "foo", "external");
+ TEST_SYNC_POINT("ParallelValidateTest::AfterExternalPut");
+ });
+
+ commit_thread.join();
+ external_thread.join();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ ASSERT_OK(external_put);
+ ASSERT_TRUE(commit_status.IsBusy());  // OK here means the conflict was missed
+
+ std::string value;
+ ASSERT_OK(txn_db->Get(read_options, "foo", &value));
+ ASSERT_EQ(value, "external");
+
+ delete txn;
+}
+
TEST_P(OptimisticTransactionTest, WriteConflictTest3) {
ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar"));
Metadata
Metadata
Assignees
Labels
No labels