Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,6 @@ default OlapTable getOlapTableOrAnalysisException(String tableName) throws Analy
}
return (OlapTable) table;
}

/**
 * Drops the table with the given name from this database.
 *
 * <p>NOTE(review): the behavior for an unknown table name is not specified by this declaration;
 * confirm against each implementation.
 *
 * @param tableName name of the table to drop
 */
void dropTable(String tableName);
}
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
Expand Down Expand Up @@ -316,6 +317,7 @@ public class Env {
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
private MetastoreEventsProcessor metastoreEventsProcessor;

private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
Expand Down Expand Up @@ -549,6 +551,7 @@ private Env(boolean isCheckpointCatalog) {
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector();
this.metastoreEventsProcessor = new MetastoreEventsProcessor();

this.replayedJournalId = new AtomicLong(0L);
this.isElectable = false;
Expand Down Expand Up @@ -1402,6 +1405,10 @@ private void startMasterOnlyDaemonThreads() {
if (Config.enable_fqdn_mode) {
fqdnManager.start();
}
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
}

}

// start threads that should be running on all FE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,9 @@ public static ExternalDatabase read(DataInput in) throws IOException {

@Override
public void gsonPostProcess() throws IOException {}

/**
 * Dropping a table is not supported by this implementation: it always throws
 * {@code NotImplementedException}.
 *
 * <p>NOTE(review): presumably concrete database types that support dropping tables override
 * this method — confirm against the subclasses.
 *
 * @param tableName name of the table to drop (unused here)
 */
@Override
public void dropTable(String tableName) {
throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,14 @@ public void addTableForTest(HMSExternalTable tbl) {
idToTbl.put(tbl.getId(), tbl);
tableNameToId.put(tbl.getName(), tbl.getId());
}

/**
 * Removes the table with the given name from this database's in-memory lookup maps
 * ({@code tableNameToId} and {@code idToTbl}).
 *
 * <p>If no table is registered under {@code tableName}, a warning is logged and nothing else
 * is touched.
 *
 * @param tableName name of the table to drop
 */
@Override
public void dropTable(String tableName) {
    LOG.debug("drop table [{}]", tableName);
    Long tableId = tableNameToId.remove(tableName);
    if (tableId == null) {
        LOG.warn("drop table [{}] failed", tableName);
        // Nothing was registered under this name, so there is no id to remove. Passing a null
        // key on to idToTbl.remove() is pointless at best and throws NullPointerException on
        // map implementations that reject null keys (e.g. concurrent maps).
        return;
    }
    idToTbl.remove(tableId);
}
}
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1935,5 +1935,23 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static boolean enable_func_pushdown = true;

/**
 * If set to true, Doris will automatically synchronize HMS (Hive Metastore) metadata to the cache in FE.
 * Disabled by default. Declared master-only and not marked mutable.
 */
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;

/**
 * Maximum number of HMS notification events to poll in each RPC.
 * Default is 500. Declared mutable and master-only.
 */
@ConfField(mutable = true, masterOnly = true)
public static int hms_events_batch_size_per_rpc = 500;

/**
 * HMS polling interval in milliseconds.
 * Default is 20000 (20 seconds). Declared master-only and not marked mutable.
 */
@ConfField(masterOnly = true)
public static int hms_events_polling_interval_ms = 20000;
}

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
Expand Down Expand Up @@ -435,13 +436,17 @@ public ShowResultSet showCreateCatalog(ShowCreateCatalogStmt showStmt) throws An
* Refresh the catalog meta and write the meta log.
*/
public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
    // Look the catalog up by the name carried in the statement.
    CatalogIf target = nameToCatalog.get(stmt.getCatalogName());
    if (target != null) {
        // Build the log entry for this statement and hand it to the log-based overload.
        CatalogLog refreshLog = CatalogFactory.constructorCatalogLog(target.getId(), stmt);
        refreshCatalog(refreshLog);
        return;
    }
    throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
}

public void refreshCatalog(CatalogLog log) {
writeLock();
try {
CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
if (catalog == null) {
throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
}
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
replayRefreshCatalog(log);
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log);
} finally {
Expand Down Expand Up @@ -477,7 +482,7 @@ public void replayDropCatalog(CatalogLog log) {
/**
* Replay for refresh catalog event.
*/
public void replayRefreshCatalog(CatalogLog log) throws DdlException {
public void replayRefreshCatalog(CatalogLog log) {
writeLock();
try {
unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache());
Expand Down Expand Up @@ -550,6 +555,42 @@ public void replayRefreshExternalTable(ExternalObjectLog log) {
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
}

/**
 * Drops a table that belongs to an external catalog and writes the operation to the edit log.
 *
 * <p>The catalog, database and table are resolved by name first; the drop is then applied
 * through {@code replayDropExternalTable} before the edit log entry is written.
 *
 * @param dbName name of the database containing the table
 * @param tableName name of the table to drop
 * @param catalogName name of the catalog containing the database
 * @throws DdlException if the catalog, database or table does not exist, or if the catalog is
 *         not an {@code ExternalCatalog}
 */
public void dropExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
    CatalogIf targetCatalog = nameToCatalog.get(catalogName);
    if (targetCatalog == null) {
        throw new DdlException("No catalog found with name: " + catalogName);
    }
    boolean isExternal = targetCatalog instanceof ExternalCatalog;
    if (!isExternal) {
        throw new DdlException("Only support drop ExternalCatalog Tables");
    }

    DatabaseIf targetDb = targetCatalog.getDbNullable(dbName);
    if (targetDb == null) {
        throw new DdlException("Database " + dbName + " does not exist in catalog " + targetCatalog.getName());
    }

    TableIf targetTable = targetDb.getTableNullable(tableName);
    if (targetTable == null) {
        throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
    }

    // Describe the drop by ids so the same entry can be applied now and written to the edit log.
    ExternalObjectLog dropLog = new ExternalObjectLog();
    dropLog.setCatalogId(targetCatalog.getId());
    dropLog.setDbId(targetDb.getId());
    dropLog.setTableId(targetTable.getId());

    // Apply first, then persist.
    replayDropExternalTable(dropLog);
    Env.getCurrentEnv().getEditLog().logDropExternalTable(dropLog);
}

/**
 * Applies a "drop external table" log entry: removes the table from its database and
 * invalidates the table's entry in the external meta cache.
 *
 * <p>If the catalog, database or table referenced by the entry can no longer be resolved, a
 * warning is logged and the entry is skipped instead of failing with a
 * {@code NullPointerException}.
 *
 * @param log entry carrying the catalog id, database id and table id of the table to drop
 */
public void replayDropExternalTable(ExternalObjectLog log) {
    LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
            log.getTableId());
    ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
    if (catalog == null) {
        LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
        return;
    }
    ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
    if (db == null) {
        LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
        return;
    }
    ExternalTable table = db.getTableForReplay(log.getTableId());
    if (table == null) {
        LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
        return;
    }
    // Read the name once, before the table is removed from the database.
    String tableName = table.getName();
    db.dropTable(tableName);
    Env.getCurrentEnv().getExtMetaCacheMgr()
            .invalidateTableCache(catalog.getId(), db.getFullName(), tableName);
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ public void gsonPostProcess() throws IOException {
db.setTableExtCatalog(this);
}
objectCreated = false;
if (this instanceof HMSExternalCatalog) {
((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
}
}

public void addDatabaseForTest(ExternalDatabase db) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.List;
Expand All @@ -42,8 +48,12 @@
* External catalog for hive metastore compatible data sources.
*/
public class HMSExternalCatalog extends ExternalCatalog {
private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);

private static final int MAX_CLIENT_POOL_SIZE = 8;
protected PooledHiveMetaStoreClient client;
// Record the latest synced event id when processing hive events
private long lastSyncedEventId;

/**
* Default constructor for HMSExternalCatalog.
Expand Down Expand Up @@ -170,4 +180,48 @@ public List<Column> getSchema(String dbName, String tblName) {
}
return tmpSchema;
}

/**
 * Sets the id of the latest HMS event that has been synced for this catalog.
 *
 * <p>A negative value makes the next {@code getNextEventResponse} call treat the catalog as
 * not yet synced: it re-reads the current event id and refreshes the catalog.
 *
 * @param lastSyncedEventId the latest synced event id
 */
public void setLastSyncedEventId(long lastSyncedEventId) {
this.lastSyncedEventId = lastSyncedEventId;
}

/**
 * Fetches the next batch of HMS notification events that follow {@code lastSyncedEventId}.
 *
 * <p>On the first pull (when {@code lastSyncedEventId} is negative) no events are fetched:
 * the current event id is stored as the starting point, the catalog is refreshed, and null
 * is returned. Null is also returned when the current event id equals
 * {@code lastSyncedEventId}.
 *
 * <p>This method does not advance {@code lastSyncedEventId} after a successful fetch.
 * NOTE(review): presumably the caller does so through {@code setLastSyncedEventId} — confirm.
 *
 * @param hmsExternalCatalog catalog whose name is logged and which is refreshed on the first pull
 * @return the events after {@code lastSyncedEventId}, at most
 *         {@code Config.hms_events_batch_size_per_rpc} of them, or null when nothing is fetched
 * @throws MetastoreNotificationFetchException if the metastore client fails to fetch
 */
public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
throws MetastoreNotificationFetchException {
makeSureInitialized();
// A negative id means no starting point has been recorded yet.
if (lastSyncedEventId < 0) {
// Record the starting point before refreshing the catalog.
lastSyncedEventId = getCurrentEventId();
refreshCatalog(hmsExternalCatalog);
LOG.info(
"First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+ "lastSyncedEventId is [{}]",
hmsExternalCatalog.getName(), lastSyncedEventId);
return null;
}

long currentEventId = getCurrentEventId();
LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
// Nothing new on the metastore side since the last sync.
if (currentEventId == lastSyncedEventId) {
LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
return null;
}
// Fetch from lastSyncedEventId rather than currentEventId: currentEventId is the latest event id,
// so asking for events after it would never return any.
return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use currentEventId here instead of lastSyncedEventId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The currentEventId represents the latest event ID. If we use the currentEventId to get events, we will never get events

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}

/**
 * Refreshes the given catalog through the catalog manager, with cache invalidation enabled.
 *
 * @param hmsExternalCatalog catalog whose id is put into the refresh log entry
 */
private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
    CatalogLog refreshLog = new CatalogLog();
    // Ask for cached metadata to be invalidated as part of the refresh.
    refreshLog.setInvalidCache(true);
    refreshLog.setCatalogId(hmsExternalCatalog.getId());
    Env.getCurrentEnv()
            .getCatalogMgr()
            .refreshCatalog(refreshLog);
}

/**
 * Reads the current notification event id from the metastore client.
 *
 * @return the current event id, or -1 if the client returned null
 */
private long getCurrentEventId() {
    makeSureInitialized();
    CurrentNotificationEventId latest = client.getCurrentNotificationEventId();
    if (latest != null) {
        return latest.getEventId();
    }
    // No id available: report it and fall back to the "unknown" marker.
    LOG.warn("Get currentNotificationEventId is null");
    return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.HMSResource;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;

import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.google.common.base.Preconditions;
Expand All @@ -29,8 +30,10 @@
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -145,6 +148,30 @@ public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
}
}

/**
 * Asks the metastore for its current notification event id, using a client obtained from
 * {@code getClient()} that is closed when the call finishes.
 *
 * @return the value returned by the underlying metastore client
 * @throws MetastoreNotificationFetchException if obtaining the client or the call itself fails;
 *         the original failure is logged and its message is appended to the exception message
 */
public CurrentNotificationEventId getCurrentNotificationEventId() {
    try (CachedClient cached = getClient()) {
        CurrentNotificationEventId currentId = cached.client.getCurrentNotificationEventId();
        return currentId;
    } catch (Exception e) {
        LOG.warn("Failed to fetch current notification event id", e);
        String reason = "Failed to get current notification event id. msg: " + e.getMessage();
        throw new MetastoreNotificationFetchException(reason);
    }
}

/**
 * Fetches notification events from the metastore, using a client obtained from
 * {@code getClient()} that is closed when the call finishes. All three arguments are passed
 * through unchanged to the underlying metastore client.
 *
 * @param lastEventId event id forwarded to the metastore client as the starting point
 * @param maxEvents maximum number of events forwarded to the metastore client
 * @param filter notification filter forwarded to the metastore client
 * @return the response returned by the underlying metastore client
 * @throws MetastoreNotificationFetchException if obtaining the client or the call itself fails;
 *         the original failure is logged and its message is appended to the exception message
 */
public NotificationEventResponse getNextNotification(long lastEventId,
        int maxEvents,
        IMetaStoreClient.NotificationFilter filter)
        throws MetastoreNotificationFetchException {
    try (CachedClient cached = getClient()) {
        NotificationEventResponse response = cached.client.getNextNotification(lastEventId, maxEvents, filter);
        return response;
    } catch (Exception e) {
        LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e);
        String reason = "Failed to get next notification based on last event id: " + lastEventId
                + ". msg: " + e.getMessage();
        throw new MetastoreNotificationFetchException(reason);
    }
}

private class CachedClient implements AutoCloseable {
private final IMetaStoreClient client;

Expand Down
Loading