Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ OLAPStatus CumulativeCompaction::_calculate_need_merged_versions() {
Versions delta_versions;
OLAPStatus res = _get_delta_versions(&delta_versions);
if (res != OLAP_SUCCESS) {
LOG(INFO) << "failed to get delta versions.";
LOG(INFO) << "failed to get delta versions. res=" << res;
return res;
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ OLAPStatus DeltaWriter::init() {
}

{
ReadLock base_migration_rlock(_tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!base_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
MutexLock push_lock(_tablet->get_push_lock());
RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(
_req.partition_id, _req.txn_id,
Expand All @@ -99,6 +103,10 @@ OLAPStatus DeltaWriter::init() {
<< ", schema_hash: " << new_schema_hash;
return OLAP_ERR_TABLE_NOT_FOUND;
}
ReadLock new_migration_rlock(_new_tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!new_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
StorageEngine::instance()->txn_manager()->prepare_txn(
_req.partition_id, _req.txn_id,
new_tablet_id, new_schema_hash, _new_tablet->tablet_uid(), _req.load_id);
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
if (tablet == nullptr) {
return OLAP_ERR_TABLE_NOT_FOUND;
}
ReadLock base_migration_rlock(tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!base_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
tablet->obtain_push_lock();
PUniqueId load_id;
load_id.set_hi(0);
Expand Down Expand Up @@ -130,6 +134,10 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
<< "tablet=" << tablet->full_name()
<< " related_tablet=" << related_tablet->full_name();
} else {
ReadLock new_migration_rlock(related_tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!new_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,15 @@ OLAPStatus SchemaChangeHandler::process_alter_tablet(AlterTabletType type,
return OLAP_ERR_TABLE_CREATE_META_ERROR;
}

ReadLock base_migration_rlock(base_tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!base_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
ReadLock new_migration_rlock(new_tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!new_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}

base_tablet->obtain_push_lock();
base_tablet->obtain_header_wrlock();
new_tablet->obtain_header_wrlock();
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,11 +572,10 @@ void StorageEngine::perform_cumulative_compaction(DataDir* data_dir) {
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_CUMULATIVE_REPEAT_INIT && res != OLAP_ERR_CE_TRY_CE_LOCK_ERROR) {
best_tablet->set_last_compaction_failure_time(UnixMillis());
LOG(WARNING) << "failed to init cumulative compaction"
<< ", table=" << best_tablet->full_name()
<< ", res=" << res;

if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
LOG(WARNING) << "failed to init cumulative compaction"
<< ", table=" << best_tablet->full_name()
<< ", res=" << res;
DorisMetrics::cumulative_compaction_request_failed.increment(1);
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
inline void obtain_cumulative_lock() { _cumulative_lock.lock(); }
inline void release_cumulative_lock() { _cumulative_lock.unlock(); }

inline RWMutex* get_migration_lock_ptr() { return &_migration_lock; }

// operation for compaction
bool can_do_compaction();
const uint32_t calc_cumulative_compaction_score() const;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ OLAPStatus EngineCloneTask::execute() {
bool is_new_tablet = tablet == nullptr;
// try to repair a tablet with missing version
if (tablet != nullptr) {
ReadLock migration_rlock(tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
LOG(INFO) << "clone tablet exist yet, begin to incremental clone. "
<< "signature:" << _signature
<< ", tablet_id:" << _clone_req.tablet_id
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
return OLAP_SUCCESS;
}

WriteLock migration_wlock(tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!migration_wlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}

int64_t partition_id;
std::set<int64_t> transaction_ids;
StorageEngine::instance()->txn_manager()->get_tablet_related_txns(tablet->tablet_id(),
tablet->schema_hash(), tablet->tablet_uid(), &partition_id, &transaction_ids);
if (transaction_ids.size() > 0) {
LOG(WARNING) << "could not migration because has unfinished txns, "
<< " tablet=" << tablet->full_name();
return OLAP_ERR_HEADER_HAS_PENDING_DATA;
}

tablet->obtain_push_lock();

// TODO(ygl): the tablet should not under schema change or rollup or load
Expand Down
23 changes: 0 additions & 23 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,29 +444,6 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
return found;
}


bool TxnManager::has_committed_txn(TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {

pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
ReadLock rdlock(_get_txn_lock(transaction_id));
ReadLock txn_rdlock(&_txn_map_lock);
auto it = _txn_tablet_map.find(key);
if (it != _txn_tablet_map.end()) {
auto load_itr = it->second.find(tablet_info);
if (load_itr != it->second.end()) {
// found load for txn,tablet
// case 1: user commit rowset, then the load id must be equal
TabletTxnInfo& load_info = load_itr->second;
if (load_info.rowset != nullptr) {
return true;
}
}
}
return false;
}

bool TxnManager::get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid,
std::vector<int64_t>* transaction_ids) {
if (transaction_ids == nullptr) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ class TxnManager {
// just check if the txn exists
bool has_txn(TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid);

// check if the txn exists and has related rowset
bool has_committed_txn(TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid);

bool get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, std::vector<int64_t>* transaction_ids);

Expand Down
26 changes: 14 additions & 12 deletions be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"

#define TRY_LOCK true

namespace doris {
void write_log_info(char* buf, size_t buf_len, const char* fmt, ...);

Expand Down Expand Up @@ -246,25 +248,25 @@ class RWMutex {
class ReadLock {
public:
explicit ReadLock(RWMutex* mutex, bool try_lock = false)
: _mutex(mutex), own_lock(false) {
: _mutex(mutex), locked(false) {
if (try_lock) {
own_lock = this->_mutex->tryrdlock() == OLAP_SUCCESS;
locked = this->_mutex->tryrdlock() == OLAP_SUCCESS;
} else {
this->_mutex->rdlock();
own_lock = true;
locked = true;
}
}
~ReadLock() {
if (own_lock) {
if (locked) {
this->_mutex->unlock();
}
}

bool has_own_lock() { return own_lock; }
bool own_lock() { return locked; }

private:
RWMutex* _mutex;
bool own_lock;
bool locked;
DISALLOW_COPY_AND_ASSIGN(ReadLock);
};

Expand All @@ -276,25 +278,25 @@ class ReadLock {
class WriteLock {
public:
explicit WriteLock(RWMutex* mutex, bool try_lock = false)
: _mutex(mutex), own_lock(false) {
: _mutex(mutex), locked(false) {
if (try_lock) {
own_lock = this->_mutex->trywrlock() == OLAP_SUCCESS;
locked = this->_mutex->trywrlock() == OLAP_SUCCESS;
} else {
this->_mutex->wrlock();
own_lock = true;
locked = true;
}
}
~WriteLock() {
if (own_lock) {
if (locked) {
this->_mutex->unlock();
}
}

bool has_own_lock() { return own_lock; }
bool own_lock() { return locked; }

private:
RWMutex* _mutex;
bool own_lock;
bool locked;
DISALLOW_COPY_AND_ASSIGN(WriteLock);
};

Expand Down