Skip to content

Commit 4b7d8a7

Browse files
author
zhangshixin.1024
committed
[Feature] doris cross-cluster query
1 parent 741a012 commit 4b7d8a7

File tree

42 files changed

+1275
-74
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1275
-74
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import com.google.common.base.Joiner;
9696
import com.google.common.base.Preconditions;
9797
import com.google.common.base.Strings;
98+
import com.google.common.collect.ImmutableMap;
9899
import com.google.common.collect.Lists;
99100
import com.google.common.collect.Maps;
100101
import com.google.common.collect.Range;
@@ -182,10 +183,10 @@ public enum OlapTableState {
182183
private PartitionInfo partitionInfo;
183184
@SerializedName(value = "itp", alternate = {"idToPartition"})
184185
@Getter
185-
private ConcurrentHashMap<Long, Partition> idToPartition = new ConcurrentHashMap<>();
186+
protected ConcurrentHashMap<Long, Partition> idToPartition = new ConcurrentHashMap<>();
186187
// handled in postgsonprocess
187188
@Getter
188-
private Map<String, Partition> nameToPartition = Maps.newTreeMap();
189+
protected Map<String, Partition> nameToPartition = Maps.newTreeMap();
189190

190191
@SerializedName(value = "di", alternate = {"distributionInfo"})
191192
private DistributionInfo defaultDistributionInfo;
@@ -3698,4 +3699,48 @@ public Index getInvertedIndex(Column column, List<String> subPath) {
36983699
.filter(Index::isAnalyzedInvertedIndex).findFirst().orElse(null);
36993700
}
37003701
}
3702+
3703+
/**
3704+
* caller should acquire the read lock and should not modify any field of the return obj
3705+
*/
3706+
public OlapTable copyTableMeta() {
3707+
OlapTable table = new OlapTable();
3708+
// metaobj
3709+
table.signature = signature;
3710+
table.lastCheckTime = lastCheckTime;
3711+
// abstract table
3712+
table.id = id;
3713+
table.name = name;
3714+
table.qualifiedDbName = qualifiedDbName;
3715+
table.type = type;
3716+
table.createTime = createTime;
3717+
// olap table
3718+
// NOTE: currently do not need temp partitions, colocateGroup, autoIncrementGenerator
3719+
table.idToPartition = new ConcurrentHashMap<>();
3720+
table.tempPartitions = new TempPartitions();
3721+
3722+
table.state = state;
3723+
table.indexIdToMeta = indexIdToMeta;
3724+
table.indexNameToId = indexNameToId;
3725+
table.keysType = keysType;
3726+
table.partitionInfo = partitionInfo;
3727+
table.defaultDistributionInfo = defaultDistributionInfo;
3728+
table.bfColumns = bfColumns;
3729+
table.bfFpp = bfFpp;
3730+
table.indexes = indexes;
3731+
table.baseIndexId = baseIndexId;
3732+
return table;
3733+
}
3734+
3735+
public boolean isInternal() {
3736+
return false;
3737+
}
3738+
3739+
public long getCatalogId() {
3740+
return Env.getCurrentInternalCatalog().getId();
3741+
}
3742+
3743+
public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws AnalysisException {
3744+
return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
3745+
}
37013746
}

fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,8 @@ public String toEngineName() {
437437
return "iceberg";
438438
case DICTIONARY:
439439
return "dictionary";
440+
case DORIS_EXTERNAL_TABLE:
441+
return "External_Doris";
440442
default:
441443
return null;
442444
}

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.common.Config;
2323
import org.apache.doris.common.Pair;
2424
import org.apache.doris.common.ThreadPoolManager;
25+
import org.apache.doris.datasource.doris.DorisExternalMetaCacheMgr;
2526
import org.apache.doris.datasource.hive.HMSExternalCatalog;
2627
import org.apache.doris.datasource.hive.HMSExternalTable;
2728
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -100,6 +101,7 @@ public class ExternalMetaCacheMgr {
100101
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
101102
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
102103
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
104+
private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
103105

104106
public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
105107
rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
@@ -131,6 +133,7 @@ public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
131133
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
132134
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
133135
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
136+
dorisExternalMetaCacheMgr = new DorisExternalMetaCacheMgr(commonRefreshExecutor);
134137
}
135138

136139
private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize,
@@ -219,6 +222,10 @@ public ExternalRowCountCache getRowCountCache() {
219222
return rowCountCache;
220223
}
221224

225+
public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
226+
return dorisExternalMetaCacheMgr;
227+
}
228+
222229
public void removeCache(long catalogId) {
223230
if (cacheMap.remove(catalogId) != null) {
224231
LOG.info("remove hive metastore cache for catalog {}", catalogId);
@@ -232,6 +239,7 @@ public void removeCache(long catalogId) {
232239
icebergMetadataCacheMgr.removeCache(catalogId);
233240
maxComputeMetadataCacheMgr.removeCache(catalogId);
234241
paimonMetadataCacheMgr.removeCache(catalogId);
242+
dorisExternalMetaCacheMgr.removeCache(catalogId);
235243
}
236244

237245
public void invalidateTableCache(ExternalTable dorisTable) {
@@ -288,6 +296,7 @@ public void invalidateCatalogCache(long catalogId) {
288296
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
289297
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
290298
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
299+
dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
291300
if (LOG.isDebugEnabled()) {
292301
LOG.debug("invalid catalog cache for {}", catalogId);
293302
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.doris;
19+
20+
import org.apache.doris.catalog.Env;
21+
import org.apache.doris.common.CacheFactory;
22+
import org.apache.doris.common.Config;
23+
import org.apache.doris.system.Backend;
24+
25+
import com.github.benmanes.caffeine.cache.LoadingCache;
26+
import com.google.common.collect.ImmutableMap;
27+
import com.google.common.collect.Maps;
28+
import org.apache.logging.log4j.LogManager;
29+
import org.apache.logging.log4j.Logger;
30+
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.OptionalLong;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.stream.Collectors;
36+
37+
public class DorisExternalMetaCacheMgr {
38+
private static final Logger LOG = LogManager.getLogger(DorisExternalMetaCacheMgr.class);
39+
private final LoadingCache<Long, ImmutableMap<Long, Backend>> backendsCache;
40+
41+
public DorisExternalMetaCacheMgr(ExecutorService executor) {
42+
CacheFactory cacheFactory = new CacheFactory(
43+
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
44+
OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
45+
Config.max_external_table_cache_num,
46+
true,
47+
null);
48+
backendsCache = cacheFactory.buildCache(key -> loadBackends(key), executor);
49+
}
50+
51+
private ImmutableMap<Long, Backend> loadBackends(Long catalogId) {
52+
RemoteDorisExternalCatalog catalog = (RemoteDorisExternalCatalog) Env.getCurrentEnv().getCatalogMgr()
53+
.getCatalog(catalogId);
54+
List<Backend> backends = catalog.getFeServiceClient().listBackends();
55+
if (LOG.isDebugEnabled()) {
56+
List<String> names = backends.stream().map(b -> b.getAddress()).collect(Collectors.toList());
57+
LOG.debug("load backends:{} from:{}", String.join(",", names), catalog.getName());
58+
}
59+
Map<Long, Backend> backendMap = Maps.newHashMap();
60+
backends.forEach(backend -> backendMap.put(backend.getId(), backend));
61+
return ImmutableMap.copyOf(backendMap);
62+
}
63+
64+
public void removeCache(long catalogId) {
65+
backendsCache.invalidate(catalogId);
66+
}
67+
68+
public void invalidateBackendCache(long catalogId) {
69+
backendsCache.invalidate(catalogId);
70+
}
71+
72+
public void invalidateCatalogCache(long catalogId) {
73+
invalidateBackendCache(catalogId);
74+
}
75+
76+
public ImmutableMap<Long, Backend> getBackends(long catalogId) {
77+
ImmutableMap<Long, Backend> backends = backendsCache.get(catalogId);
78+
if (backends == null) {
79+
return ImmutableMap.of();
80+
}
81+
return backends;
82+
}
83+
}

0 commit comments

Comments
 (0)