Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ AgentServer::AgentServer(ExecEnv* exec_env,
TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
_exec_env,
master_info);
_update_tablet_meta_info_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
_exec_env,
master_info);
#ifndef BE_TEST
_create_tablet_workers->start();
_drop_tablet_workers->start();
Expand All @@ -173,6 +177,7 @@ AgentServer::AgentServer(ExecEnv* exec_env,
_release_snapshot_workers->start();
_move_dir_workers->start();
_recover_tablet_workers->start();
_update_tablet_meta_info_workers->start();
// Add subscriber here and register listeners
TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
LOG(INFO) << "Register user resource listener";
Expand Down Expand Up @@ -238,6 +243,10 @@ AgentServer::~AgentServer() {
if (_recover_tablet_workers != NULL) {
delete _recover_tablet_workers;
}

if (_update_tablet_meta_info_workers != NULL) {
delete _update_tablet_meta_info_workers;
}
if (_release_snapshot_workers != NULL) {
delete _release_snapshot_workers;
}
Expand Down Expand Up @@ -389,6 +398,13 @@ void AgentServer::submit_tasks(
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
case TTaskType::UPDATE_TABLET_META_INFO:
if (task.__isset.update_tablet_meta_info_req) {
_update_tablet_meta_info_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
default:
status_code = TStatusCode::ANALYSIS_ERROR;
break;
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class AgentServer {
TaskWorkerPool* _release_snapshot_workers;
TaskWorkerPool* _move_dir_workers;
TaskWorkerPool* _recover_tablet_workers;
TaskWorkerPool* _update_tablet_meta_info_workers;

DISALLOW_COPY_AND_ASSIGN(AgentServer);

Expand Down
57 changes: 57 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ void TaskWorkerPool::start() {
_worker_count = 1;
_callback_function = _recover_tablet_thread_callback;
break;
case TaskWorkerType::UPDATE_TABLET_META_INFO:
_worker_count = 1;
_callback_function = _update_tablet_meta_worker_thread_callback;
break;
default:
// pass
break;
Expand Down Expand Up @@ -949,6 +953,59 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
return (void*)0;
}

void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) {

TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this;
while (true) {
TAgentTaskRequest agent_task_req;
TUpdateTabletMetaInfoReq update_tablet_meta_req;
{
lock_guard<Mutex> worker_thread_lock(worker_pool_this->_worker_thread_lock);
while (worker_pool_this->_tasks.empty()) {
worker_pool_this->_worker_thread_condition_lock.wait();
}

agent_task_req = worker_pool_this->_tasks.front();
update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req;
worker_pool_this->_tasks.pop_front();
}
LOG(INFO) << "get update tablet meta task, signature:" << agent_task_req.signature;

TStatusCode::type status_code = TStatusCode::OK;
vector<string> error_msgs;
TStatus task_status;

for (auto tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
tablet_meta_info.tablet_id, tablet_meta_info.schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "could not find tablet when update partition id"
<< " tablet_id=" << tablet_meta_info.tablet_id
<< " schema_hash=" << tablet_meta_info.schema_hash;
continue;
}
WriteLock wrlock(tablet->get_header_lock_ptr());
tablet->set_partition_id(tablet_meta_info.partition_id);
tablet->save_meta();
}

LOG(INFO) << "finish update tablet meta task. signature:" << agent_task_req.signature;

task_status.__set_status_code(status_code);
task_status.__set_error_msgs(error_msgs);

TFinishTaskRequest finish_task_request;
finish_task_request.__set_task_status(task_status);
finish_task_request.__set_backend(worker_pool_this->_backend);
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);

worker_pool_this->_finish_task(finish_task_request);
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
}
return (void*)0;
}

void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this;

Expand Down
4 changes: 3 additions & 1 deletion be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class TaskWorkerPool {
MAKE_SNAPSHOT,
RELEASE_SNAPSHOT,
MOVE,
RECOVER_TABLET
RECOVER_TABLET,
UPDATE_TABLET_META_INFO
};

typedef void* (*CALLBACK_FUNCTION)(void*);
Expand Down Expand Up @@ -111,6 +112,7 @@ class TaskWorkerPool {
static void* _release_snapshot_thread_callback(void* arg_this);
static void* _move_dir_thread_callback(void* arg_this);
static void* _recover_tablet_thread_callback(void* arg_this);
static void* _update_tablet_meta_worker_thread_callback(void* arg_this);

void _alter_tablet(
TaskWorkerPool* worker_pool_this,
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ void TabletManager::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tabl
tablet->max_continuous_version_from_begining(&version, &v_hash);
tablet_info->version = version.second;
tablet_info->version_hash = v_hash;
tablet_info->__set_partition_id(tablet->partition_id());
}

void TabletManager::_build_tablet_stat() {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ class TabletManager {

std::vector<TabletSharedPtr> _shutdown_tablets;

// a map from partition id to the tablets that belong to that partition
std::map<int64_t, vector<TabletSharedPtr>> partition_tablet_map;

DISALLOW_COPY_AND_ASSIGN(TabletManager);
};

Expand Down
15 changes: 12 additions & 3 deletions fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Table;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -108,14 +109,22 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
Set<Long> foundTabletsWithValidSchema,
Map<Long, TTabletInfo> foundTabletsWithInvalidSchema,
ListMultimap<TStorageMedium, Long> tabletMigrationMap,
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap) {
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
SetMultimap<Long, Integer> tabletWithoutPartitionId) {
long start = 0L;
readLock();
try {
LOG.info("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size());
start = System.currentTimeMillis();
for (TTablet backendTablet : backendTablets.values()) {
for (TTabletInfo tabletInfo : backendTablet.tablet_infos) {
if (!tabletInfo.isSetPartition_id() || tabletInfo.getPartition_id() < 1) {
tabletWithoutPartitionId.put(tabletInfo.getTablet_id(), tabletInfo.getSchema_hash());
}
}
}
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
// traverse replicas in meta with this backend
Expand Down
25 changes: 24 additions & 1 deletion fe/src/main/java/org/apache/doris/master/ReportHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.task.PushTask;
import org.apache.doris.task.RecoverTabletTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TMasterResult;
Expand All @@ -66,11 +67,13 @@
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.SetMultimap;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -260,6 +263,8 @@ private static void tabletReport(long backendId, Map<Long, TTablet> backendTable

// db id -> tablet id
ListMultimap<Long, Long> tabletRecoveryMap = LinkedListMultimap.create();

SetMultimap<Long, Integer> tabletWithoutPartitionId = HashMultimap.create();

// 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
Catalog.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap,
Expand All @@ -270,7 +275,9 @@ private static void tabletReport(long backendId, Map<Long, TTablet> backendTable
tabletMigrationMap,
transactionsToPublish,
transactionsToClear,
tabletRecoveryMap);
tabletRecoveryMap,
tabletWithoutPartitionId);


// 2. sync
sync(backendTablets, tabletSyncMap, backendId, backendReportVersion);
Expand All @@ -297,6 +304,9 @@ private static void tabletReport(long backendId, Map<Long, TTablet> backendTable
// 9. send force create replica task to be
// handleForceCreateReplica(createReplicaTasks, backendId, forceRecovery);

// 10. send set tablet partition info to backend
handleSetTabletMetaInfo(backendId, tabletWithoutPartitionId);

long end = System.currentTimeMillis();
LOG.info("tablet report from backend[{}] cost: {} ms", backendId, (end - start));
}
Expand Down Expand Up @@ -860,6 +870,19 @@ private static void handleForceCreateReplica(List<CreateReplicaTask> createRepli
AgentTaskExecutor.submit(batchTask);
}


/**
 * Submits a single UpdateTabletMetaInfoTask to the given backend for the
 * tablets that were collected as having no partition id. The number of
 * collected entries is always logged; nothing is submitted when it is zero.
 *
 * @param backendId id of the backend the task is created for
 * @param tabletWithoutPartitionId tablets collected during the tablet report
 */
private static void handleSetTabletMetaInfo(long backendId, SetMultimap<Long, Integer> tabletWithoutPartitionId) {
    LOG.info("find [{}] tablets without partition id, try to set them", tabletWithoutPartitionId.size());
    if (tabletWithoutPartitionId.isEmpty()) {
        // Nothing to fix on this backend.
        return;
    }

    AgentBatchTask updateBatch = new AgentBatchTask();
    updateBatch.addTask(new UpdateTabletMetaInfoTask(backendId, tabletWithoutPartitionId));
    AgentTaskExecutor.submit(updateBatch);
}

private static void handleClearTransactions(ListMultimap<Long, Long> transactionsToClear, long backendId) {
AgentBatchTask batchTask = new AgentBatchTask();
for (Long transactionId : transactionsToClear.keySet()) {
Expand Down
10 changes: 10 additions & 0 deletions fe/src/main/java/org/apache/doris/task/AgentBatchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.thrift.TSnapshotRequest;
import org.apache.doris.thrift.TStorageMediumMigrateReq;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUpdateTabletMetaInfoReq;
import org.apache.doris.thrift.TUploadReq;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -307,6 +308,15 @@ private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
tAgentTaskRequest.setRecover_tablet_req(request);
return tAgentTaskRequest;
}
case UPDATE_TABLET_META_INFO: {
UpdateTabletMetaInfoTask updateTabletMetaInfoTask = (UpdateTabletMetaInfoTask) task;
TUpdateTabletMetaInfoReq request = updateTabletMetaInfoTask.toThrift();
if (LOG.isDebugEnabled()) {
LOG.debug(request.toString());
}
tAgentTaskRequest.setUpdate_tablet_meta_info_req(request);
return tAgentTaskRequest;
}
default:
LOG.debug("could not find task type for task [{}]", task);
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.task;

import java.util.List;
import java.util.Map;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.thrift.TTabletMetaInfo;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUpdateTabletMetaInfoReq;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;

public class UpdateTabletMetaInfoTask extends AgentTask {

private static final Logger LOG = LogManager.getLogger(ClearTransactionTask.class);

private SetMultimap<Long, Integer> tabletWithoutPartitionId;

public UpdateTabletMetaInfoTask(long backendId, SetMultimap<Long, Integer> tabletWithoutPartitionId) {
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, -1L, -1L, -1L, -1L, -1L, backendId);
this.tabletWithoutPartitionId = tabletWithoutPartitionId;
}

public TUpdateTabletMetaInfoReq toThrift() {
TUpdateTabletMetaInfoReq updateTabletMetaInfoReq = new TUpdateTabletMetaInfoReq();
List<TTabletMetaInfo> metaInfos = Lists.newArrayList();
int tabletEntryNum = 0;
for (Map.Entry<Long, Integer> entry : tabletWithoutPartitionId.entries()) {
// add at most 10000 tablet meta during one sync to avoid too large task
if (tabletEntryNum > 10000) {
break;
}
TTabletMetaInfo metaInfo = new TTabletMetaInfo();
metaInfo.setTablet_id(entry.getKey());
metaInfo.setSchema_hash(entry.getValue());
TabletMeta tabletMeta = Catalog.getInstance().getTabletInvertedIndex().getTabletMeta(entry.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TabletMeta tabletMeta = Catalog.getInstance().getTabletInvertedIndex().getTabletMeta(entry.getKey());
TabletMeta tabletMeta = Catalog.getCurrentInvertedIndex().getTabletMeta(entry.getKey());

if (tabletMeta == null) {
LOG.warn("could not find tablet [{}] in meta ignore it", entry.getKey());
continue;
}
metaInfo.setPartition_id(tabletMeta.getPartitionId());
metaInfos.add(metaInfo);
++tabletEntryNum;
}
updateTabletMetaInfoReq.setTabletMetaInfos(metaInfos);
return updateTabletMetaInfoReq;
}
}
13 changes: 12 additions & 1 deletion gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ struct TRecoverTabletReq {
4: optional Types.TVersionHash version_hash
}

// One tablet meta update entry: tablet_id and schema_hash identify the tablet
// on the backend, partition_id is the value to record in that tablet's meta.
struct TTabletMetaInfo {
1: optional Types.TTabletId tablet_id
2: optional Types.TSchemaHash schema_hash
3: optional Types.TPartitionId partition_id
}

// Payload of an UPDATE_TABLET_META_INFO agent task: the batch of tablet meta
// entries the backend should apply.
struct TUpdateTabletMetaInfoReq {
1: optional list<TTabletMetaInfo> tabletMetaInfos
}

struct TAgentTaskRequest {
1: required TAgentServiceVersion protocol_version
2: required Types.TTaskType task_type
Expand All @@ -237,7 +247,8 @@ struct TAgentTaskRequest {
22: optional TMoveDirReq move_dir_req
23: optional TRecoverTabletReq recover_tablet_req
24: optional TAlterTabletReqV2 alter_tablet_req_v2
25: optional i64 recv_time; // time the task is inserted to queue
25: optional i64 recv_time // time the task is inserted to queue
26: optional TUpdateTabletMetaInfoReq update_tablet_meta_info_req
}

struct TAgentResult {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/MasterService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct TTabletInfo {
10: optional i64 path_hash
11: optional bool version_miss
12: optional bool used
13: optional Types.TPartitionId partition_id
}

struct TFinishTaskRequest {
Expand Down
Loading