Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,9 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) {
} else {
++_s_report_version;
// get path hash of the created tablet
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash);
BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->
get_base_tablet(create_tablet_req.tablet_id,
create_tablet_req.tablet_schema.schema_hash);
DCHECK(tablet != nullptr);
TTabletInfo tablet_info;
tablet_info.tablet_id = tablet->table_id();
Expand Down Expand Up @@ -399,8 +400,8 @@ void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) {
TStatusCode::type status_code = TStatusCode::OK;
vector<string> error_msgs;
TStatus task_status;
TabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
BaseTabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->
get_base_tablet(drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
if (dropped_tablet != nullptr) {
OLAPStatus drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
Expand Down Expand Up @@ -827,8 +828,8 @@ void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this)
TStatus task_status;

for (auto tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
tablet_meta_info.tablet_id, tablet_meta_info.schema_hash);
BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->
get_base_tablet(tablet_meta_info.tablet_id, tablet_meta_info.schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "could not find tablet when update partition id"
<< " tablet_id=" << tablet_meta_info.tablet_id
Expand Down
5 changes: 2 additions & 3 deletions be/src/http/action/meta_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ Status MetaAction::_handle_header(HttpRequest* req, std::string* json_meta) {
<< ", schema_hash:" << req_schema_hash;
return Status::InternalError(strings::Substitute("convert failed, $0", e.what()));
}

TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_base_tablet(
tablet_id, schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema hash:" << schema_hash;
return Status::InternalError("no tablet exist");
Expand Down
30 changes: 29 additions & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,31 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: _state(tablet_meta->tablet_state()),
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
_data_dir(data_dir),
_is_bad(false) {
_gen_tablet_path();
}

BaseTablet::~BaseTablet() {}


OLAPStatus BaseTablet::init() {
return _init_once.call([this] { return _init_once_action(); });
}

// should save tablet meta to remote meta store
// if it's a primary replica
void BaseTablet::save_meta() {
auto res = _tablet_meta->save_meta(_data_dir);
CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res
<< ", root=" << _data_dir->path();
// User could directly update tablet schema by _tablet_meta,
// So we need to refetch schema again
_schema = _tablet_meta->tablet_schema();
// TODO: update _mem_schema too?
}


OLAPStatus BaseTablet::set_tablet_state(TabletState state) {
if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) {
LOG(WARNING) << "could not change tablet state from shutdown to " << state;
Expand All @@ -52,4 +71,13 @@ void BaseTablet::_gen_tablet_path() {
}
}

OLAPStatus BaseTablet::set_partition_id(int64_t partition_id) {
return _tablet_meta->set_partition_id(partition_id);
}

TabletInfo BaseTablet::get_tablet_info() const {
return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
}


} /* namespace doris */
138 changes: 138 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@

#include <memory>

#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/MasterService_types.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "olap/utils.h"
#include "util/once.h"

namespace doris {

class DataDir;
class BaseTablet;
using BaseTabletSharedPtr = std::shared_ptr<BaseTablet>;

// Base class for all tablet classes, currently only olap/Tablet and
// olap/memory/MemTablet.
Expand Down Expand Up @@ -60,11 +66,54 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
inline void set_creation_time(int64_t creation_time);
inline bool equal(int64_t tablet_id, int32_t schema_hash);

OLAPStatus init();
inline bool init_succeeded();

bool is_used();

void save_meta();

void register_tablet_into_dir();
void deregister_tablet_from_dir();


// properties encapsulated in TabletSchema
inline const TabletSchema& tablet_schema() const;
inline size_t tablet_footprint(); // disk space occupied by tablet
inline size_t num_rows();
inline int version_count() const;
inline Version max_version() const;

// propreties encapsulated in TabletSchema
inline KeysType keys_type() const;
inline size_t num_columns() const;
inline size_t num_null_columns() const;
inline size_t num_key_columns() const;
inline size_t num_short_key_columns() const;
inline size_t num_rows_per_row_block() const;
inline CompressKind compress_kind() const;
inline double bloom_filter_fpp() const;
inline size_t next_unique_id() const;
inline size_t row_size() const;
inline size_t field_index(const string& field_name) const;

OLAPStatus set_partition_id(int64_t partition_id);

TabletInfo get_tablet_info() const;

// meta lock
inline void obtain_header_rdlock() { _meta_lock.rdlock(); }
inline void obtain_header_wrlock() { _meta_lock.wrlock(); }
inline void release_header_lock() { _meta_lock.unlock(); }
inline RWMutex* get_header_lock_ptr() { return &_meta_lock; }

virtual void build_tablet_report_info(TTabletInfo* tablet_info) = 0;

virtual void delete_all_files() = 0;

protected:
void _gen_tablet_path();
virtual OLAPStatus _init_once_action() = 0;

protected:
TabletState _state;
Expand All @@ -74,6 +123,13 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
DataDir* _data_dir;
std::string _tablet_path;

DorisCallOnce<OLAPStatus> _init_once;
// TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to
// explain how these two locks work together.
mutable RWMutex _meta_lock;
// if this tablet is broken, set to true. default is false
std::atomic<bool> _is_bad;

private:
DISALLOW_COPY_AND_ASSIGN(BaseTablet);
};
Expand Down Expand Up @@ -141,6 +197,88 @@ inline const TabletSchema& BaseTablet::tablet_schema() const {
return _schema;
}

inline bool BaseTablet::init_succeeded() {
return _init_once.has_called() && _init_once.stored_result() == OLAP_SUCCESS;
}

inline bool BaseTablet::is_used() {
return !_is_bad && _data_dir->is_used();
}

inline void BaseTablet::register_tablet_into_dir() {
_data_dir->register_tablet(this);
}

inline void BaseTablet::deregister_tablet_from_dir() {
_data_dir->deregister_tablet(this);
}

// TODO(lingbin): Why other methods that need to get information from _tablet_meta
// are not locked, here needs a comment to explain.
inline size_t BaseTablet::tablet_footprint() {
ReadLock rdlock(&_meta_lock);
return _tablet_meta->tablet_footprint();
}

// TODO(lingbin): Why other methods which need to get information from _tablet_meta
// are not locked, here needs a comment to explain.
inline size_t BaseTablet::num_rows() {
ReadLock rdlock(&_meta_lock);
return _tablet_meta->num_rows();
}

inline int BaseTablet::version_count() const {
return _tablet_meta->version_count();
}

inline Version BaseTablet::max_version() const {
return _tablet_meta->max_version();
}

inline KeysType BaseTablet::keys_type() const {
return _schema.keys_type();
}

inline size_t BaseTablet::num_columns() const {
return _schema.num_columns();
}

inline size_t BaseTablet::num_null_columns() const {
return _schema.num_null_columns();
}

inline size_t BaseTablet::num_key_columns() const {
return _schema.num_key_columns();
}

inline size_t BaseTablet::num_short_key_columns() const {
return _schema.num_short_key_columns();
}

inline size_t BaseTablet::num_rows_per_row_block() const {
return _schema.num_rows_per_row_block();
}

inline CompressKind BaseTablet::compress_kind() const {
return _schema.compress_kind();
}

inline double BaseTablet::bloom_filter_fpp() const {
return _schema.bloom_filter_fpp();
}

inline size_t BaseTablet::next_unique_id() const {
return _schema.next_column_unique_id();
}

inline size_t BaseTablet::field_index(const string& field_name) const {
return _schema.field_index(field_name);
}

inline size_t BaseTablet::row_size() const {
return _schema.row_size();
}

} /* namespace doris */

#endif /* DORIS_BE_SRC_OLAP_BASE_TABLET_H */
4 changes: 2 additions & 2 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,14 @@ OLAPStatus DataDir::get_shard(uint64_t* shard) {
return OLAP_SUCCESS;
}

void DataDir::register_tablet(Tablet* tablet) {
void DataDir::register_tablet(BaseTablet* tablet) {
TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid());

std::lock_guard<std::mutex> l(_mutex);
_tablet_set.emplace(std::move(tablet_info));
}

void DataDir::deregister_tablet(Tablet* tablet) {
void DataDir::deregister_tablet(BaseTablet* tablet) {
TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid());

std::lock_guard<std::mutex> l(_mutex);
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

namespace doris {

class BaseTablet;
class Tablet;
class TabletManager;
class TabletMeta;
Expand Down Expand Up @@ -80,8 +81,8 @@ class DataDir {

TStorageMedium::type storage_medium() const { return _storage_medium; }

void register_tablet(Tablet* tablet);
void deregister_tablet(Tablet* tablet);
void register_tablet(BaseTablet* tablet);
void deregister_tablet(BaseTablet* tablet);
void clear_tablets(std::vector<TabletInfo>* tablet_infos);

std::string get_absolute_shard_path(int64_t shard_id);
Expand Down
34 changes: 32 additions & 2 deletions be/src/olap/memory/mem_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@ std::shared_ptr<MemTablet> MemTablet::create_tablet_from_meta(TabletMetaSharedPt
return std::make_shared<MemTablet>(tablet_meta, data_dir);
}

Status MemTablet::init() {
OLAPStatus MemTablet::_init_once_action() {
_max_version = 0;
return MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet);
Status ret = MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet);
if (ret.ok()) {
return OLAP_SUCCESS;
} else {
// TODO: Status/OLAPStatus compatibility
return OLAP_ERR_INIT_FAILED;
}
}


Status MemTablet::scan(std::unique_ptr<ScanSpec>* spec, std::unique_ptr<MemTabletScan>* scan) {
uint64_t version = (*spec)->version();
if (version == UINT64_MAX) {
Expand Down Expand Up @@ -86,5 +93,28 @@ Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) {
return Status::OK();
}

void MemTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
ReadLock rdlock(&_meta_lock);
tablet_info->tablet_id = _tablet_meta->tablet_id();
tablet_info->schema_hash = _tablet_meta->schema_hash();
tablet_info->row_count = _tablet_meta->num_rows();
tablet_info->data_size = _tablet_meta->tablet_footprint();
tablet_info->version = _max_version;
tablet_info->version_hash = 0;
tablet_info->__set_partition_id(_tablet_meta->partition_id());
tablet_info->__set_storage_medium(_data_dir->storage_medium());
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
tablet_info->__set_tablet_type(_tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK ?
TTabletType::TABLET_TYPE_DISK : TTabletType::TABLET_TYPE_MEMORY);
}

void MemTablet::delete_all_files() {
// TODO:
}



} // namespace memory
} // namespace doris
19 changes: 16 additions & 3 deletions be/src/olap/memory/mem_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ class MemSubTablet;
class ScanSpec;
class MemTabletScan;
class WriteTxn;
class MemTablet;
using MemTabletSharedPtr = std::shared_ptr<MemTablet>;

inline MemTabletSharedPtr to_mem_tablet(const BaseTabletSharedPtr& base) {
if (base->is_memory()) {
return std::static_pointer_cast<MemTablet>(base);
}
return MemTabletSharedPtr();
}

// Tablet class for memory-optimized storage engine.
//
Expand All @@ -49,9 +58,6 @@ class MemTablet : public BaseTablet {

virtual ~MemTablet();

// Initialize
Status init();

// Scan the tablet, return a MemTabletScan object scan, user can specify projections
// using ScanSpec, currently only support full scan with projection, will support
// filter/aggregation in the future.
Expand All @@ -70,6 +76,13 @@ class MemTablet : public BaseTablet {
// Note: commit is done sequentially, protected by internal write lock
Status commit_write_txn(WriteTxn* wtxn, uint64_t version);

virtual void build_tablet_report_info(TTabletInfo* tablet_info);

virtual void delete_all_files();

protected:
virtual OLAPStatus _init_once_action();

private:
friend class MemTabletScan;
// memory::Schema is used internally rather than TabletSchema, so we need an extra
Expand Down
Loading