From 1990432c3c0ba71b8d79ab2589e6a0f5b193cd5f Mon Sep 17 00:00:00 2001 From: yiguolei Date: Tue, 16 Jul 2019 13:39:53 +0800 Subject: [PATCH 1/2] Add partition id to tablet meta in be --- be/src/agent/agent_server.cpp | 16 +++++ be/src/agent/agent_server.h | 1 + be/src/agent/task_worker_pool.cpp | 57 +++++++++++++++++ be/src/agent/task_worker_pool.h | 4 +- be/src/olap/tablet_manager.cpp | 1 + be/src/olap/tablet_manager.h | 3 + .../doris/catalog/TabletInvertedIndex.java | 15 ++++- .../apache/doris/master/ReportHandler.java | 26 +++++++- .../org/apache/doris/task/AgentBatchTask.java | 10 +++ .../doris/task/UpdateTabletMetaInfoTask.java | 63 +++++++++++++++++++ gensrc/thrift/AgentService.thrift | 13 +++- gensrc/thrift/MasterService.thrift | 1 + gensrc/thrift/Types.thrift | 3 +- 13 files changed, 206 insertions(+), 7 deletions(-) create mode 100644 fe/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 53b9dc4b60109e..f390990bfd8bc8 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -152,6 +152,10 @@ AgentServer::AgentServer(ExecEnv* exec_env, TaskWorkerPool::TaskWorkerType::RECOVER_TABLET, _exec_env, master_info); + _update_tablet_meta_info_workers = new TaskWorkerPool( + TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO, + _exec_env, + master_info); #ifndef BE_TEST _create_tablet_workers->start(); _drop_tablet_workers->start(); @@ -173,6 +177,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, _release_snapshot_workers->start(); _move_dir_workers->start(); _recover_tablet_workers->start(); + _update_tablet_meta_info_workers->start(); // Add subscriber here and register listeners TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info); LOG(INFO) << "Register user resource listener"; @@ -238,6 +243,10 @@ AgentServer::~AgentServer() { if (_recover_tablet_workers != NULL) { delete _recover_tablet_workers; } + + if (_update_tablet_meta_info_workers != NULL) { + delete _update_tablet_meta_info_workers; + } if (_release_snapshot_workers != NULL) { delete _release_snapshot_workers; } @@ -389,6 +398,13 @@ void AgentServer::submit_tasks( status_code = TStatusCode::ANALYSIS_ERROR; } break; + case TTaskType::UPDATE_TABLET_META_INFO: + if (task.__isset.update_tablet_meta_info_req) { + _update_tablet_meta_info_workers->submit_task(task); + } else { + status_code = TStatusCode::ANALYSIS_ERROR; + } + break; default: status_code = TStatusCode::ANALYSIS_ERROR; break; diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index aa74c075123425..b346699e52d5b1 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -113,6 +113,7 @@ class AgentServer { TaskWorkerPool* _release_snapshot_workers; TaskWorkerPool* _move_dir_workers; TaskWorkerPool* _recover_tablet_workers; + TaskWorkerPool* _update_tablet_meta_info_workers; DISALLOW_COPY_AND_ASSIGN(AgentServer); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1f635ff201fb51..60357739c91327 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -197,6 +197,10 @@ void TaskWorkerPool::start() { _worker_count = 1; _callback_function = _recover_tablet_thread_callback; break; + case TaskWorkerType::UPDATE_TABLET_META_INFO: + _worker_count = 1; + _callback_function = _update_tablet_meta_worker_thread_callback; + break; default: // pass break; @@ -949,6 +953,59 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t return (void*)0; } +void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) { + + TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; + while (true) { + TAgentTaskRequest agent_task_req; + TUpdateTabletMetaInfoReq update_tablet_meta_req; + { + lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); + while (worker_pool_this->_tasks.empty()) { + worker_pool_this->_worker_thread_condition_lock.wait(); + } + + agent_task_req = worker_pool_this->_tasks.front(); + update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; + worker_pool_this->_tasks.pop_front(); + } + LOG(INFO) << "get update tablet meta task, signature:" << agent_task_req.signature; + + TStatusCode::type status_code = TStatusCode::OK; + vector error_msgs; + TStatus task_status; + + for (auto tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( + tablet_meta_info.tablet_id, tablet_meta_info.schema_hash); + if (tablet == nullptr) { + LOG(WARNING) << "could not find tablet when update partition id" + << " tablet_id=" << tablet_meta_info.tablet_id + << " schema_hash=" << tablet_meta_info.schema_hash; + continue; + } + WriteLock wrlock(tablet->get_header_lock_ptr()); + tablet->set_partition_id(tablet_meta_info.partition_id); + tablet->save_meta(); + } + + LOG(INFO) << "finish update tablet meta task. signature:" << agent_task_req.signature; + + task_status.__set_status_code(status_code); + task_status.__set_error_msgs(error_msgs); + + TFinishTaskRequest finish_task_request; + finish_task_request.__set_task_status(task_status); + finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_task_type(agent_task_req.task_type); + finish_task_request.__set_signature(agent_task_req.signature); + + worker_pool_this->_finish_task(finish_task_request); + worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, ""); + } + return (void*)0; +} + void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index ac55859a76654d..29bfa7cc2466ed 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -62,7 +62,8 @@ class TaskWorkerPool { MAKE_SNAPSHOT, RELEASE_SNAPSHOT, MOVE, - RECOVER_TABLET + RECOVER_TABLET, + UPDATE_TABLET_META_INFO }; typedef void* (*CALLBACK_FUNCTION)(void*); @@ -111,6 +112,7 @@ class TaskWorkerPool { static void* _release_snapshot_thread_callback(void* arg_this); static void* _move_dir_thread_callback(void* arg_this); static void* _recover_tablet_thread_callback(void* arg_this); + static void* _update_tablet_meta_worker_thread_callback(void* arg_this); void _alter_tablet( TaskWorkerPool* worker_pool_this, diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index c828990c473334..25f23dc2496f3b 100755 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1124,6 +1124,7 @@ void TabletManager::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tabl tablet->max_continuous_version_from_begining(&version, &v_hash); tablet_info->version = version.second; tablet_info->version_hash = v_hash; + tablet_info->__set_partition_id(tablet->partition_id()); } void TabletManager::_build_tablet_stat() { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index d2589882490d2d..6394ace69a5530 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -196,6 +196,9 @@ class TabletManager { std::vector _shutdown_tablets; + // a map from partition id to + std::map> partition_tablet_map; + DISALLOW_COPY_AND_ASSIGN(TabletManager); }; diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index b0bc6b4fef48cc..d140f8027701b6 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -34,6 +34,7 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; @@ -108,14 +109,22 @@ public void tabletReport(long backendId, Map backendTablets, Set foundTabletsWithValidSchema, Map foundTabletsWithInvalidSchema, ListMultimap tabletMigrationMap, - Map> transactionsToPublish, - ListMultimap transactionsToClear, - ListMultimap tabletRecoveryMap) { + Map> transactionsToPublish, + ListMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + SetMultimap tabletWithoutPartitionId) { long start = 0L; readLock(); try { LOG.info("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size()); start = System.currentTimeMillis(); + for (TTablet backendTablet : backendTablets.values()) { + for (TTabletInfo tabletInfo : backendTablet.tablet_infos) { + if (!tabletInfo.isSetPartition_id() || tabletInfo.getPartition_id() < 1) { + tabletWithoutPartitionId.put(tabletInfo.getTablet_id(), tabletInfo.getSchema_hash()); + } + } + } Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { // traverse replicas in meta with this backend diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index d8295b43e31d53..1fe7023fbbeb9b 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -52,6 +52,7 @@ import org.apache.doris.task.PushTask; import org.apache.doris.task.RecoverTabletTask; import org.apache.doris.task.StorageMediaMigrationTask; +import org.apache.doris.task.UpdateTabletMetaInfoTask; import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TMasterResult; @@ -66,11 +67,13 @@ import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTaskType; +import com.google.common.collect.HashMultimap; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.google.common.collect.SetMultimap; import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; @@ -260,6 +263,8 @@ private static void tabletReport(long backendId, Map backendTable // db id -> tablet id ListMultimap tabletRecoveryMap = LinkedListMultimap.create(); + + SetMultimap tabletWithoutPartitionId = HashMultimap.create(); // 1. do the diff. find out (intersection) / (be - meta) / (meta - be) Catalog.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap, @@ -270,7 +275,9 @@ private static void tabletReport(long backendId, Map backendTable tabletMigrationMap, transactionsToPublish, transactionsToClear, - tabletRecoveryMap); + tabletRecoveryMap, + tabletWithoutPartitionId); + // 2. sync sync(backendTablets, tabletSyncMap, backendId, backendReportVersion); @@ -297,6 +304,9 @@ private static void tabletReport(long backendId, Map backendTable // 9. send force create replica task to be // handleForceCreateReplica(createReplicaTasks, backendId, forceRecovery); + // 10. send set tablet partition info to backend + handleSetTabletMetaInfo(backendId, tabletWithoutPartitionId); + long end = System.currentTimeMillis(); LOG.info("tablet report from backend[{}] cost: {} ms", backendId, (end - start)); } @@ -860,6 +870,20 @@ private static void handleForceCreateReplica(List createRepli AgentTaskExecutor.submit(batchTask); } + + private static void handleSetTabletMetaInfo(long backendId, SetMultimap tabletWithoutPartitionId) { + + LOG.info("find [{}] tablets without partition id, try to set them", tabletWithoutPartitionId.size()); + if (tabletWithoutPartitionId.size() < 1) { + return; + } + AgentBatchTask batchTask = new AgentBatchTask(); + UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId, tabletWithoutPartitionId); + AgentTaskQueue.addTask(task); + batchTask.addTask(task); + AgentTaskExecutor.submit(batchTask); + } + private static void handleClearTransactions(ListMultimap transactionsToClear, long backendId) { AgentBatchTask batchTask = new AgentBatchTask(); for (Long transactionId : transactionsToClear.keySet()) { diff --git a/fe/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/src/main/java/org/apache/doris/task/AgentBatchTask.java index 1a2597b972ae2e..6f09c3016f4099 100644 --- a/fe/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -41,6 +41,7 @@ import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStorageMediumMigrateReq; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; import org.apache.doris.thrift.TUploadReq; import org.apache.logging.log4j.LogManager; @@ -307,6 +308,15 @@ private TAgentTaskRequest toAgentTaskRequest(AgentTask task) { tAgentTaskRequest.setRecover_tablet_req(request); return tAgentTaskRequest; } + case UPDATE_TABLET_META_INFO: { + UpdateTabletMetaInfoTask updateTabletMetaInfoTask = (UpdateTabletMetaInfoTask) task; + TUpdateTabletMetaInfoReq request = updateTabletMetaInfoTask.toThrift(); + if (LOG.isDebugEnabled()) { + LOG.debug(request.toString()); + } + tAgentTaskRequest.setUpdate_tablet_meta_info_req(request); + return tAgentTaskRequest; + } default: LOG.debug("could not find task type for task [{}]", task); return null; diff --git a/fe/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java new file mode 100644 index 00000000000000..f08b4a380b3224 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import java.util.List; +import java.util.Map; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.thrift.TTabletMetaInfo; +import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; + +public class UpdateTabletMetaInfoTask extends AgentTask { + + private static final Logger LOG = LogManager.getLogger(ClearTransactionTask.class); + + private SetMultimap tabletWithoutPartitionId; + + public UpdateTabletMetaInfoTask(long backendId, SetMultimap tabletWithoutPartitionId) { + super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, -1L, -1L, -1L, -1L, -1L, backendId); + this.tabletWithoutPartitionId = tabletWithoutPartitionId; + } + + public TUpdateTabletMetaInfoReq toThrift() { + TUpdateTabletMetaInfoReq updateTabletMetaInfoReq = new TUpdateTabletMetaInfoReq(); + List metaInfos = Lists.newArrayList(); + for (Map.Entry entry : tabletWithoutPartitionId.entries()) { + TTabletMetaInfo metaInfo = new TTabletMetaInfo(); + metaInfo.setTablet_id(entry.getKey()); + metaInfo.setSchema_hash(entry.getValue()); + TabletMeta tabletMeta = Catalog.getInstance().getTabletInvertedIndex().getTabletMeta(entry.getKey()); + if (tabletMeta == null) { + LOG.warn("could not find tablet [{}] in meta ignore it", entry.getKey()); + continue; + } + metaInfo.setPartition_id(tabletMeta.getPartitionId()); + metaInfos.add(metaInfo); + } + updateTabletMetaInfoReq.setTabletMetaInfos(metaInfos); + return updateTabletMetaInfoReq; + } +} \ No newline at end of file diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 2d71ca4965881d..e1029b036d9aab 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -212,6 +212,16 @@ struct TRecoverTabletReq { 4: optional Types.TVersionHash version_hash } +struct TTabletMetaInfo { + 1: optional Types.TTabletId tablet_id + 2: optional Types.TSchemaHash schema_hash + 3: optional Types.TPartitionId partition_id +} + +struct TUpdateTabletMetaInfoReq { + 1: optional list tabletMetaInfos +} + struct TAgentTaskRequest { 1: required TAgentServiceVersion protocol_version 2: required Types.TTaskType task_type @@ -237,7 +247,8 @@ struct TAgentTaskRequest { 22: optional TMoveDirReq move_dir_req 23: optional TRecoverTabletReq recover_tablet_req 24: optional TAlterTabletReqV2 alter_tablet_req_v2 - 25: optional i64 recv_time; // time the task is inserted to queue + 25: optional i64 recv_time // time the task is inserted to queue + 26: optional TUpdateTabletMetaInfoReq update_tablet_meta_info_req } struct TAgentResult { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 9699719db91b55..4489740d245eca 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -36,6 +36,7 @@ struct TTabletInfo { 10: optional i64 path_hash 11: optional bool version_miss 12: optional bool used + 13: optional Types.TPartitionId partition_id } struct TFinishTaskRequest { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index bf868c13455e1f..567bc93d59556d 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -158,7 +158,8 @@ enum TTaskType { CLEAR_ALTER_TASK, CLEAR_TRANSACTION_TASK, RECOVER_TABLET, - STREAM_LOAD + STREAM_LOAD, + UPDATE_TABLET_META_INFO } enum TStmtType { From 61c247c8ba57d692a3d05eb82a6695c23407f917 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 17 Jul 2019 10:02:38 +0800 Subject: [PATCH 2/2] Sync at most 10k tablets during set tablet meta --- fe/src/main/java/org/apache/doris/master/ReportHandler.java | 1 - .../org/apache/doris/task/UpdateTabletMetaInfoTask.java | 6 ++++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 1fe7023fbbeb9b..4a70d9fc45762a 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -879,7 +879,6 @@ private static void handleSetTabletMetaInfo(long backendId, SetMultimap table public TUpdateTabletMetaInfoReq toThrift() { TUpdateTabletMetaInfoReq updateTabletMetaInfoReq = new TUpdateTabletMetaInfoReq(); List metaInfos = Lists.newArrayList(); + int tabletEntryNum = 0; for (Map.Entry entry : tabletWithoutPartitionId.entries()) { + // add at most 10000 tablet meta during one sync to avoid too large task + if (tabletEntryNum > 10000) { + break; + } TTabletMetaInfo metaInfo = new TTabletMetaInfo(); metaInfo.setTablet_id(entry.getKey()); metaInfo.setSchema_hash(entry.getValue()); @@ -56,6 +61,7 @@ public TUpdateTabletMetaInfoReq toThrift() { } metaInfo.setPartition_id(tabletMeta.getPartitionId()); metaInfos.add(metaInfo); + ++tabletEntryNum; } updateTabletMetaInfoReq.setTabletMetaInfos(metaInfos); return updateTabletMetaInfoReq;