diff --git a/extensions-core/druid-catalog/pom.xml b/extensions-core/druid-catalog/pom.xml
index a2eb434c1a21..e86142b55d44 100644
--- a/extensions-core/druid-catalog/pom.xml
+++ b/extensions-core/druid-catalog/pom.xml
@@ -48,44 +48,22 @@
             <version>${project.parent.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.druid</groupId>
-            <artifactId>druid-indexing-service</artifactId>
-            <version>${project.parent.version}</version>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-sql</artifactId>
             <version>${project.parent.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.druid</groupId>
-            <artifactId>druid-services</artifactId>
-            <version>${project.parent.version}</version>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>com.google.inject.extensions</groupId>
-            <artifactId>guice-multibindings</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>com.opencsv</groupId>
-            <artifactId>opencsv</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
@@ -142,11 +120,6 @@
             <artifactId>curator-client</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.dataformat</groupId>
-            <artifactId>jackson-dataformat-smile</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.jdbi</groupId>
             <artifactId>jdbi</artifactId>
@@ -162,11 +135,6 @@
             <artifactId>jsr311-api</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
@@ -177,36 +145,6 @@
             <artifactId>jersey-server</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>com.google.errorprone</groupId>
-            <artifactId>error_prone_annotations</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.lz4</groupId>
-            <artifactId>lz4-java</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.datasketches</groupId>
-            <artifactId>datasketches-java</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.datasketches</groupId>
-            <artifactId>datasketches-memory</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>it.unimi.dsi</groupId>
-            <artifactId>fastutil-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <scope>provided</scope>
-        </dependency>
@@ -214,31 +152,11 @@
             <artifactId>easymock</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-core</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>nl.jqno.equalsverifier</groupId>
-            <artifactId>equalsverifier</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-processing</artifactId>
@@ -262,4 +180,34 @@
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <ignoredUsedUndeclaredDependencies>
+                        <ignoredUsedUndeclaredDependency>javax.inject:javax.inject</ignoredUsedUndeclaredDependency>
+                    </ignoredUsedUndeclaredDependencies>
+                    <ignoredUnusedDeclaredDependencies>
+                        <ignoredUnusedDeclaredDependency>javax.inject:javax.inject</ignoredUnusedDeclaredDependency>
+                        <ignoredUnusedDeclaredDependency>jakarta.inject:jakarta.inject-api</ignoredUnusedDeclaredDependency>
+                    </ignoredUnusedDeclaredDependencies>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java
new file mode 100644
index 000000000000..91de7842e133
--- /dev/null
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.guice;
+
+import com.google.inject.Binder;
+import org.apache.druid.catalog.http.CatalogListenerResource;
+import org.apache.druid.catalog.model.SchemaRegistry;
+import org.apache.druid.catalog.model.SchemaRegistryImpl;
+import org.apache.druid.catalog.sql.LiveCatalogResolver;
+import org.apache.druid.catalog.sync.CachedMetadataCatalog;
+import org.apache.druid.catalog.sync.CatalogClient;
+import org.apache.druid.catalog.sync.CatalogUpdateListener;
+import org.apache.druid.catalog.sync.CatalogUpdateReceiver;
+import org.apache.druid.catalog.sync.MetadataCatalog;
+import org.apache.druid.catalog.sync.MetadataCatalog.CatalogSource;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.Jerseys;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+
+/**
+ * Configures the metadata catalog on the Broker to use a cache
+ * and network communications for pull and push updates.
+ */
+@LoadScope(roles = NodeRole.BROKER_JSON_NAME)
+public class CatalogBrokerModule implements DruidModule
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ // The Broker (catalog client) uses a cached metadata catalog.
+ binder
+ .bind(CachedMetadataCatalog.class)
+ .in(LazySingleton.class);
+
+ // Broker code accesses the catalog through the
+ // MetadataCatalog interface.
+ binder
+ .bind(MetadataCatalog.class)
+ .to(CachedMetadataCatalog.class)
+ .in(LazySingleton.class);
+
+ // The cached metadata catalog needs a "pull" source,
+ // which is the network client.
+ binder
+ .bind(CatalogSource.class)
+ .to(CatalogClient.class)
+ .in(LazySingleton.class);
+
+ // The cached metadata catalog is the listener for "push" events.
+ binder
+ .bind(CatalogUpdateListener.class)
+ .to(CachedMetadataCatalog.class)
+ .in(LazySingleton.class);
+
+ // At present, the set of schemas is fixed.
+ binder
+ .bind(SchemaRegistry.class)
+ .to(SchemaRegistryImpl.class)
+ .in(LazySingleton.class);
+
+ // Lifecycle-managed class to prime the metadata cache
+ binder
+ .bind(CatalogUpdateReceiver.class)
+ .in(ManageLifecycle.class);
+ LifecycleModule.register(binder, CatalogUpdateReceiver.class);
+
+ // Catalog resolver for the planner. This will override the
+ // base binding.
+ binder
+ .bind(CatalogResolver.class)
+ .to(LiveCatalogResolver.class)
+ .in(LazySingleton.class);
+
+ // The listener resource forwards events to the catalog
+ // listener (the cached catalog).
+ Jerseys.addResource(binder, CatalogListenerResource.class);
+ }
+}
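A minimal, self-contained Guice sketch (not part of the patch) of the binding pattern above: because MetadataCatalog and CatalogUpdateListener are both linked to the singleton-scoped CachedMetadataCatalog, a single cache instance serves both catalog reads and push updates. The stand-in types below are illustrative, not the Druid classes.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;

// Hypothetical stand-ins for the Druid types, to show the binding pattern only.
interface MetadataCatalog {}
interface CatalogUpdateListener {}
class CachedMetadataCatalog implements MetadataCatalog, CatalogUpdateListener {}

public class BindingSketch
{
  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AbstractModule() {
      @Override
      protected void configure()
      {
        // Mirrors CatalogBrokerModule: one singleton implementation,
        // exposed through two interfaces.
        bind(CachedMetadataCatalog.class).in(Singleton.class);
        bind(MetadataCatalog.class).to(CachedMetadataCatalog.class);
        bind(CatalogUpdateListener.class).to(CachedMetadataCatalog.class);
      }
    });
    // Both lookups resolve to the same underlying cache: prints "true".
    System.out.println(
        injector.getInstance(MetadataCatalog.class) == injector.getInstance(CatalogUpdateListener.class)
    );
  }
}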
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java
index 67b4d29fbef1..c100fbbbad82 100644
--- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java
@@ -67,4 +67,22 @@ public Response syncTable(final UpdateEvent event)
listener.updated(event);
return Response.status(Response.Status.ACCEPTED).build();
}
+
+ @POST
+ @Path("flush")
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response flush()
+ {
+ listener.flush();
+ return Response.status(Response.Status.ACCEPTED).build();
+ }
+
+ @POST
+ @Path("resync")
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response resync()
+ {
+ listener.resync();
+ return Response.status(Response.Status.ACCEPTED).build();
+ }
}
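A hedged usage sketch of the new endpoints using only the JDK HTTP client. The Broker host/port and the resource prefix in the URL are assumptions for illustration; the real prefix comes from the @Path annotation on CatalogListenerResource, and the endpoints are access-controlled by ConfigResourceFilter. Both return 202 (ACCEPTED).

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListenerEndpointSketch
{
  public static void main(String[] args) throws Exception
  {
    HttpClient client = HttpClient.newHttpClient();
    // Hypothetical URL: substitute the actual Broker address and resource prefix.
    HttpRequest resync = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8082/druid/broker/v1/catalog/resync"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<Void> resp = client.send(resync, HttpResponse.BodyHandlers.discarding());
    System.out.println(resp.statusCode()); // expect 202 (ACCEPTED)
  }
}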
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java
index 551ba490c183..2590f57c8b38 100644
--- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java
@@ -278,8 +278,7 @@ public Response editTable(
// Retrieval
/**
- * Retrieves the list of all Druid schema names, all table names, or
- * all table metadata.
+ * Retrieves the list of all Druid schema names.
*
* @param format the format of the response. See the code for the
* available formats
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java
new file mode 100644
index 000000000000..d436c93f2773
--- /dev/null
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sql;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.catalog.sync.MetadataCatalog;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveColumnMetadata;
+import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveMetadata;
+import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata;
+import org.apache.druid.sql.calcite.table.DruidTable;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A catalog resolver that uses the catalog stored on the Druid coordinator.
+ */
+public class LiveCatalogResolver implements CatalogResolver
+{
+ private final MetadataCatalog catalog;
+
+ @Inject
+ public LiveCatalogResolver(final MetadataCatalog catalog)
+ {
+ this.catalog = catalog;
+ }
+
+ @Nullable
+ private DatasourceFacade datasourceSpec(String name)
+ {
+ TableId tableId = TableId.datasource(name);
+ ResolvedTable table = catalog.resolveTable(tableId);
+ if (table == null) {
+ return null;
+ }
+ if (!DatasourceDefn.isDatasource(table)) {
+ return null;
+ }
+ return new DatasourceFacade(table);
+ }
+
+ /**
+ * Create a {@link DruidTable} based on the physical segments, catalog entry, or both.
+ */
+ @Override
+ public DruidTable resolveDatasource(String name, PhysicalDatasourceMetadata dsMetadata)
+ {
+ DatasourceFacade dsSpec = datasourceSpec(name);
+
+ // No catalog metadata. If there is no physical metadata, then the
+ // datasource does not exist. Else, if there is physical metadata, the
+ // datasource is based entirely on the physical information.
+ if (dsSpec == null) {
+ return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
+ }
+ if (dsMetadata == null) {
+ // Datasource exists only in the catalog: no physical segments.
+ return emptyDatasource(name, dsSpec);
+ } else {
+ // Datasource exists as both segments and a catalog entry.
+ return mergeDatasource(dsMetadata, dsSpec);
+ }
+ }
+
+ private DruidTable emptyDatasource(String name, DatasourceFacade dsSpec)
+ {
+ RowSignature.Builder builder = RowSignature.builder();
+ Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
+ boolean hasTime = false;
+ for (ColumnSpec col : dsSpec.columns()) {
+ EffectiveColumnMetadata colMetadata = columnFromCatalog(col, null);
+ if (colMetadata.name().equals(Columns.TIME_COLUMN)) {
+ hasTime = true;
+ }
+ builder.add(col.name(), colMetadata.druidType());
+ columns.put(col.name(), colMetadata);
+ }
+ if (!hasTime) {
+ columns.put(Columns.TIME_COLUMN, new EffectiveColumnMetadata(
+ Columns.TIME_COLUMN,
+ ColumnType.LONG
+ ));
+ builder = RowSignature.builder()
+ .add(Columns.TIME_COLUMN, ColumnType.LONG)
+ .addAll(builder.build());
+ }
+
+ final PhysicalDatasourceMetadata mergedMetadata = new PhysicalDatasourceMetadata(
+ new TableDataSource(name),
+ builder.build(),
+ false, // Cannot join to an empty table
+ false // Cannot broadcast an empty table
+ );
+ return new DatasourceTable(
+ mergedMetadata.getRowSignature(),
+ mergedMetadata,
+ new EffectiveMetadata(dsSpec, columns, true)
+ );
+ }
+
+ private EffectiveColumnMetadata columnFromCatalog(ColumnSpec col, ColumnType physicalType)
+ {
+ ColumnType type = Columns.druidType(col);
+ if (type != null) {
+ // Use the type that the user provided.
+ } else if (physicalType == null) {
+ // Corner case: the user has defined a column in the catalog, has
+ // not specified a type (meaning the user wants Druid to decide), but
+ // there is no data at this moment. Guess String as the type for the
+ // null values. If new segments appear between now and execution, we'll
+ // convert the values to string, which is always safe.
+ type = ColumnType.STRING;
+ } else {
+ type = physicalType;
+ }
+ return new EffectiveColumnMetadata(col.name(), type);
+ }
+
+ private DruidTable mergeDatasource(
+ final PhysicalDatasourceMetadata dsMetadata,
+ final DatasourceFacade dsSpec)
+ {
+ final RowSignature physicalSchema = dsMetadata.getRowSignature();
+ Set<String> physicalCols = new HashSet<>(physicalSchema.getColumnNames());
+
+ // Merge columns. All catalog-defined columns come first,
+ // in the order defined in the catalog.
+ final RowSignature.Builder builder = RowSignature.builder();
+ Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
+ for (ColumnSpec col : dsSpec.columns()) {
+ ColumnType physicalType = null;
+ if (physicalCols.remove(col.name())) {
+ physicalType = dsMetadata.getRowSignature().getColumnType(col.name()).get();
+ }
+ EffectiveColumnMetadata colMetadata = columnFromCatalog(col, physicalType);
+ builder.add(col.name(), colMetadata.druidType());
+ columns.put(col.name(), colMetadata);
+ }
+
+ // Remove any hidden columns from the remaining physical columns.
+ // Assumes that the hidden columns are disjoint from the defined columns.
+ if (dsSpec.hiddenColumns() != null) {
+ for (String colName : dsSpec.hiddenColumns()) {
+ physicalCols.remove(colName);
+ }
+ }
+
+ // Any remaining columns follow, if not marked as hidden
+ // in the catalog.
+ for (int i = 0; i < physicalSchema.size(); i++) {
+ String colName = physicalSchema.getColumnName(i);
+ if (!physicalCols.contains(colName)) {
+ continue;
+ }
+ ColumnType physicalType = dsMetadata.getRowSignature().getColumnType(colName).get();
+ EffectiveColumnMetadata colMetadata = EffectiveColumnMetadata.fromPhysical(colName, physicalType);
+ columns.put(colName, colMetadata);
+ builder.add(colName, physicalType);
+ }
+
+ EffectiveMetadata effectiveMetadata = new EffectiveMetadata(dsSpec, columns, false);
+ return new DatasourceTable(builder.build(), dsMetadata, effectiveMetadata);
+ }
+
+ @Override
+ public boolean ingestRequiresExistingTable()
+ {
+ return false;
+ }
+
+ @Override
+ public Set<String> getTableNames(Set<String> datasourceNames)
+ {
+ Set<String> catalogTableNames = catalog.tableNames(TableId.DRUID_SCHEMA);
+ if (catalogTableNames.isEmpty()) {
+ return datasourceNames;
+ }
+ return ImmutableSet.<String>builder()
+ .addAll(datasourceNames)
+ .addAll(catalogTableNames)
+ .build();
+ }
+}
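To make the merge contract concrete, here is a self-contained sketch (not part of the patch) of the column ordering that mergeDatasource() produces: catalog-defined columns first, in catalog order, followed by any physical columns that are neither catalog-defined nor hidden. The column names are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeOrderSketch
{
  public static void main(String[] args)
  {
    // Catalog-defined columns, in catalog order.
    List<String> catalogCols = Arrays.asList("__time", "dsa", "dsb", "newa");
    // Columns present in physical segments.
    List<String> physicalCols = Arrays.asList("__time", "dsa", "dsf", "dsh");
    // Columns marked hidden in the catalog.
    Set<String> hidden = new HashSet<>(Collections.singletonList("dsf"));

    // Catalog columns come first; non-hidden physical leftovers follow.
    List<String> merged = new ArrayList<>(catalogCols);
    for (String col : physicalCols) {
      if (!catalogCols.contains(col) && !hidden.contains(col)) {
        merged.add(col);
      }
    }
    // Prints: [__time, dsa, dsb, newa, dsh]
    System.out.println(merged);
  }
}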
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java
index b30a12ee9274..dfbf2b5731fe 100644
--- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java
@@ -277,6 +277,22 @@ public synchronized Set<String> tableNames()
});
return tables;
}
+
+ /**
+ * Populate the cache by asking the catalog source for all tables in
+ * this schema.
+ */
+ public synchronized void resync(CatalogSource source)
+ {
+ List<TableMetadata> tables = source.tablesForSchema(schema.name());
+ cache.clear();
+ for (TableMetadata table : tables) {
+ cache.compute(
+ table.id().name(),
+ (k, v) -> computeCreate(v, table)
+ );
+ }
+ }
}
private final ConcurrentHashMap<String, SchemaEntry> schemaCache = new ConcurrentHashMap<>();
@@ -326,6 +342,12 @@ public void updated(UpdateEvent event)
}
}
+ /**
+ * Get the list of table names in the cache. Does not attempt to
+ * lazily load the list, since doing so is costly: we don't know when it
+ * might be out of date. Relies on priming the cache, and on update
+ * notifications, to keep the list accurate.
+ */
@Override
public Set<String> tableNames(String schemaName)
{
@@ -333,8 +355,13 @@ public Set<String> tableNames(String schemaName)
return schemaEntry == null ? Collections.emptySet() : schemaEntry.tableNames();
}
+ /**
+ * Clear the cache. Primarily for testing.
+ */
+ @Override
public void flush()
{
+ LOG.info("Flush requested");
schemaCache.clear();
}
@@ -347,4 +374,18 @@ private SchemaEntry entryFor(String schemaName)
return schema == null ? null : new SchemaEntry(schema);
});
}
+
+ /**
+ * Discard any existing cached tables and reload directly from the
+ * catalog source. Covers the two schemas which the catalog manages;
+ * if the catalog were to manage others, add those here as well.
+ * Called both at Broker startup and on demand for testing.
+ */
+ @Override
+ public void resync()
+ {
+ LOG.info("Resync requested");
+ entryFor(TableId.DRUID_SCHEMA).resync(base);
+ entryFor(TableId.EXTERNAL_SCHEMA).resync(base);
+ }
}
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java
index a06addfce710..73126fc3041f 100644
--- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java
@@ -86,7 +86,7 @@ public CatalogClient(
@Override
public List<TableMetadata> tablesForSchema(String dbSchema)
{
- String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{dbSchema}", dbSchema);
+ String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{schema}", dbSchema);
List<TableMetadata> results = send(url, LIST_OF_TABLE_METADATA_TYPE);
// Not found for a list is an empty list.
@@ -96,7 +96,7 @@ public List<TableMetadata> tablesForSchema(String dbSchema)
@Override
public TableMetadata table(TableId id)
{
- String url = StringUtils.replace(TABLE_SYNC_PATH, "{dbSchema}", id.schema());
+ String url = StringUtils.replace(TABLE_SYNC_PATH, "{schema}", id.schema());
url = StringUtils.replace(url, "{name}", id.name());
return send(url, TABLE_METADATA_TYPE);
}
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java
index afddeb00221a..553ed49c00f3 100644
--- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java
@@ -28,4 +28,6 @@
public interface CatalogUpdateListener
{
void updated(UpdateEvent event);
+ void flush();
+ void resync();
}
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java
index f60507c8e2c9..1c817493a459 100644
--- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java
@@ -87,14 +87,14 @@ public CatalogUpdateNotifier(
public void start()
{
notifier.start();
- LOG.info("Catalog catalog update notifier started");
+ LOG.info("Catalog update notifier started");
}
@LifecycleStop
public void stop()
{
notifier.stop();
- LOG.info("Catalog catalog update notifier stopped");
+ LOG.info("Catalog update notifier stopped");
}
@Override
@@ -102,4 +102,16 @@ public void updated(UpdateEvent event)
{
notifier.send(JacksonUtils.toBytes(smileMapper, event));
}
+
+ @Override
+ public void flush()
+ {
+ // Not generated on this path
+ }
+
+ @Override
+ public void resync()
+ {
+ // Not generated on this path
+ }
}
diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java
new file mode 100644
index 000000000000..1dfeaa1307db
--- /dev/null
+++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sync;
+
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import javax.inject.Inject;
+
+/**
+ * Receiver which runs in the Broker to listen for catalog updates from the
+ * Coordinator. To prevent slowing initial queries, this class loads the
+ * current catalog contents into the local cache on lifecycle start, which
+ * avoids the on-demand reads that would otherwise occur. After the first load,
+ * events from the Coordinator keep the local cache evergreen.
+ */
+@ManageLifecycle
+public class CatalogUpdateReceiver
+{
+ private static final EmittingLogger LOG = new EmittingLogger(CatalogUpdateReceiver.class);
+
+ private final CachedMetadataCatalog cachedCatalog;
+
+ @Inject
+ public CatalogUpdateReceiver(
+ final CachedMetadataCatalog cachedCatalog
+ )
+ {
+ this.cachedCatalog = cachedCatalog;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ cachedCatalog.resync();
+ LOG.info("Catalog update receiver started");
+ }
+}
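A generic, self-contained sketch of the priming pattern the receiver applies: load everything once at lifecycle start so that the first query never pays an on-demand remote read. The CatalogSourceStub and PrimingSketch names are illustrative stand-ins, not Druid APIs.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the catalog source the receiver reads from.
interface CatalogSourceStub
{
  Map<String, String> loadAll();
}

public class PrimingSketch
{
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  // Analogous to CatalogUpdateReceiver.start(): prime the cache once, up front.
  void prime(CatalogSourceStub source)
  {
    cache.clear();
    cache.putAll(source.loadAll());
  }

  public static void main(String[] args)
  {
    PrimingSketch receiver = new PrimingSketch();
    receiver.prime(() -> {
      Map<String, String> tables = new HashMap<>();
      tables.put("table1", "spec1");
      return tables;
    });
    // After priming, lookups hit only the local cache. Prints: spec1
    System.out.println(receiver.cache.get("table1"));
  }
}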
diff --git a/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 3f9cd5ca725e..bcf0117c8a78 100644
--- a/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ b/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.druid.catalog.guice.CatalogCoordinatorModule
+org.apache.druid.catalog.guice.CatalogBrokerModule
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java
new file mode 100644
index 000000000000..2ee9041fc929
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sql;
+
+import org.apache.druid.catalog.CatalogException;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.table.TableBuilder;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.CatalogTests;
+import org.apache.druid.catalog.sync.CachedMetadataCatalog;
+import org.apache.druid.catalog.sync.MetadataCatalog;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.SqlSchema;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.util.SqlTestFramework;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
+public class CatalogQueryTest extends BaseCalciteQueryTest
+{
+ @Rule
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+ private CatalogTests.DbFixture dbFixture;
+ private CatalogStorage storage;
+
+ @Test
+ public void testCatalogSchema()
+ {
+ SqlSchema schema = SqlSchema.builder()
+ .column("__time", "TIMESTAMP(3) NOT NULL")
+ .column("extra1", "VARCHAR")
+ .column("dim2", "VARCHAR")
+ .column("dim1", "VARCHAR")
+ .column("cnt", "BIGINT NOT NULL")
+ .column("m1", "DOUBLE NOT NULL")
+ .column("extra2", "BIGINT NOT NULL")
+ .column("extra3", "VARCHAR")
+ .column("m2", "DOUBLE NOT NULL")
+ .build();
+ testBuilder()
+ .sql("SELECT * FROM foo ORDER BY __time LIMIT 1")
+ .expectedResources(Collections.singletonList(dataSourceRead("foo")))
+ //.expectedSqlSchema(schema)
+ .run();
+ }
+
+ @After
+ public void catalogTearDown()
+ {
+ CatalogTests.tearDown(dbFixture);
+ }
+
+ @Override
+ public CatalogResolver createCatalogResolver()
+ {
+ dbFixture = new CatalogTests.DbFixture(derbyConnectorRule);
+ storage = dbFixture.storage;
+ MetadataCatalog catalog = new CachedMetadataCatalog(
+ storage,
+ storage.schemaRegistry(),
+ storage.jsonMapper()
+ );
+ return new LiveCatalogResolver(catalog);
+ }
+
+ @Override
+ public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
+ {
+ super.finalizeTestFramework(sqlTestFramework);
+ buildFooDatasource();
+ }
+
+ private void createTableMetadata(TableMetadata table)
+ {
+ try {
+ storage.tables().create(table);
+ }
+ catch (CatalogException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void buildFooDatasource()
+ {
+ TableMetadata spec = TableBuilder.datasource("foo", "ALL")
+ .timeColumn()
+ .column("extra1", null)
+ .column("dim2", null)
+ .column("dim1", null)
+ .column("cnt", null)
+ .column("m1", Columns.DOUBLE)
+ .column("extra2", Columns.LONG)
+ .column("extra3", Columns.STRING)
+ .hiddenColumns(Arrays.asList("dim3", "unique_dim1"))
+ .build();
+ createTableMetadata(spec);
+ }
+}
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java
new file mode 100644
index 000000000000..54b0191b97ea
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sql;
+
+import org.apache.druid.catalog.CatalogException.DuplicateKeyException;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.table.TableBuilder;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.CatalogTests;
+import org.apache.druid.catalog.sync.LocalMetadataCatalog;
+import org.apache.druid.catalog.sync.MetadataCatalog;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata;
+import org.apache.druid.sql.calcite.table.DruidTable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for the datasource resolution aspects of the live catalog resolver.
+ * Insert resolution is too tedious to test in its current state.
+ */
+public class LiveCatalogTest
+{
+ @Rule
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+ private CatalogTests.DbFixture dbFixture;
+ private CatalogStorage storage;
+ private CatalogResolver resolver;
+
+ @Before
+ public void setUp()
+ {
+ dbFixture = new CatalogTests.DbFixture(derbyConnectorRule);
+ storage = dbFixture.storage;
+ MetadataCatalog catalog = new LocalMetadataCatalog(storage, storage.schemaRegistry());
+ resolver = new LiveCatalogResolver(catalog);
+ }
+
+ @After
+ public void tearDown()
+ {
+ CatalogTests.tearDown(dbFixture);
+ }
+
+ private void createTableMetadata(TableMetadata table)
+ {
+ try {
+ storage.tables().create(table);
+ }
+ catch (DuplicateKeyException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Populate the catalog with a few items using the catalog storage API.
+ */
+ private void populateCatalog(boolean withTimeCol)
+ {
+ TableMetadata table = TableBuilder.datasource("trivial", "PT1D")
+ .build();
+ createTableMetadata(table);
+
+ TableBuilder builder = TableBuilder.datasource("merge", "PT1D");
+ if (withTimeCol) {
+ builder.timeColumn();
+ }
+ table = builder
+ .column("dsa", null)
+ .column("dsb", Columns.STRING)
+ .column("dsc", Columns.LONG)
+ .column("dsd", Columns.FLOAT)
+ .column("dse", Columns.DOUBLE)
+ .column("newa", null)
+ .column("newb", Columns.STRING)
+ .column("newc", Columns.LONG)
+ .column("newd", Columns.FLOAT)
+ .column("newe", Columns.DOUBLE)
+ .hiddenColumns(Arrays.asList("dsf", "dsg"))
+ .build();
+ createTableMetadata(table);
+ }
+
+ private PhysicalDatasourceMetadata mockDatasource()
+ {
+ RowSignature sig = RowSignature.builder()
+ .add(Columns.TIME_COLUMN, ColumnType.LONG)
+ .add("dsa", ColumnType.DOUBLE)
+ .add("dsb", ColumnType.LONG)
+ .add("dsc", ColumnType.STRING)
+ .add("dsd", ColumnType.LONG)
+ .add("dse", ColumnType.FLOAT)
+ .add("dsf", ColumnType.STRING)
+ .add("dsg", ColumnType.LONG)
+ .add("dsh", ColumnType.DOUBLE)
+ .build();
+ return new PhysicalDatasourceMetadata(
+ new TableDataSource("merge"),
+ sig,
+ true,
+ true
+ );
+ }
+
+ @Test
+ public void testUnknownTable()
+ {
+ // No catalog, no datasource
+ assertNull(resolver.resolveDatasource("bogus", null));
+
+ // No catalog entry
+ PhysicalDatasourceMetadata dsMetadata = mockDatasource();
+ DruidTable table = resolver.resolveDatasource("merge", dsMetadata);
+ assertSame(dsMetadata.getRowSignature(), table.getRowSignature());
+ }
+
+ @Test
+ public void testKnownTableNoTime()
+ {
+ populateCatalog(false);
+
+ // Catalog, no datasource
+ DruidTable table = resolver.resolveDatasource("merge", null);
+ assertEquals(11, table.getRowSignature().size());
+ assertEquals("merge", ((TableDataSource) table.getDataSource()).getName());
+
+ // Spot check
+ assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG);
+ assertColumnEquals(table, 1, "dsa", ColumnType.STRING);
+ assertColumnEquals(table, 2, "dsb", ColumnType.STRING);
+ assertColumnEquals(table, 3, "dsc", ColumnType.LONG);
+
+ // Catalog, with datasource, result is merged
+ // Catalog has no time column
+ PhysicalDatasourceMetadata dsMetadata = mockDatasource();
+ table = resolver.resolveDatasource("merge", dsMetadata);
+ assertEquals(12, table.getRowSignature().size());
+ assertSame(dsMetadata.dataSource(), table.getDataSource());
+ assertEquals(dsMetadata.isBroadcast(), table.isBroadcast());
+ assertEquals(dsMetadata.isJoinable(), table.isJoinable());
+
+ // dsa uses Druid's type, others coerce the type
+ assertColumnEquals(table, 0, "dsa", ColumnType.DOUBLE);
+ assertColumnEquals(table, 1, "dsb", ColumnType.STRING);
+ assertColumnEquals(table, 2, "dsc", ColumnType.LONG);
+ assertColumnEquals(table, 3, "dsd", ColumnType.FLOAT);
+ assertColumnEquals(table, 4, "dse", ColumnType.DOUBLE);
+ assertColumnEquals(table, 5, "newa", ColumnType.STRING);
+ assertColumnEquals(table, 9, "newe", ColumnType.DOUBLE);
+ assertColumnEquals(table, 10, Columns.TIME_COLUMN, ColumnType.LONG);
+ assertColumnEquals(table, 11, "dsh", ColumnType.DOUBLE);
+ }
+
+ @Test
+ public void testKnownTableWithTime()
+ {
+ populateCatalog(true);
+
+ // Catalog, no datasource
+ DruidTable table = resolver.resolveDatasource("merge", null);
+ assertEquals(11, table.getRowSignature().size());
+ assertEquals("merge", ((TableDataSource) table.getDataSource()).getName());
+
+ // Spot check
+ assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG);
+ assertColumnEquals(table, 1, "dsa", ColumnType.STRING);
+ assertColumnEquals(table, 2, "dsb", ColumnType.STRING);
+ assertColumnEquals(table, 3, "dsc", ColumnType.LONG);
+
+ // Catalog, with datasource, result is merged
+ PhysicalDatasourceMetadata dsMetadata = mockDatasource();
+ table = resolver.resolveDatasource("merge", dsMetadata);
+ assertEquals(12, table.getRowSignature().size());
+ assertSame(dsMetadata.dataSource(), table.getDataSource());
+ assertEquals(dsMetadata.isBroadcast(), table.isBroadcast());
+ assertEquals(dsMetadata.isJoinable(), table.isJoinable());
+
+ assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG);
+ // dsa uses Druid's type, others coerce the type
+ assertColumnEquals(table, 1, "dsa", ColumnType.DOUBLE);
+ assertColumnEquals(table, 2, "dsb", ColumnType.STRING);
+ assertColumnEquals(table, 3, "dsc", ColumnType.LONG);
+ assertColumnEquals(table, 4, "dsd", ColumnType.FLOAT);
+ assertColumnEquals(table, 5, "dse", ColumnType.DOUBLE);
+ assertColumnEquals(table, 6, "newa", ColumnType.STRING);
+ assertColumnEquals(table, 10, "newe", ColumnType.DOUBLE);
+ assertColumnEquals(table, 11, "dsh", ColumnType.DOUBLE);
+ }
+
+ private void assertColumnEquals(DruidTable table, int i, String name, ColumnType type)
+ {
+ RowSignature sig = table.getRowSignature();
+ assertEquals(name, sig.getColumnName(i));
+ assertEquals(type, sig.getColumnType(i).get());
+ }
+}
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java
index 1b98ce272706..c138a71dd8cc 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java
@@ -207,8 +207,8 @@ public void testUpdateColumns() throws CatalogException
DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000
);
List<ColumnSpec> cols = Arrays.asList(
- new ColumnSpec("a", Columns.VARCHAR, null),
- new ColumnSpec("b", Columns.BIGINT, null)
+ new ColumnSpec("a", Columns.STRING, null),
+ new ColumnSpec("b", Columns.LONG, null)
);
ColumnSpec colC = new ColumnSpec("c", Columns.DOUBLE, null);
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java
new file mode 100644
index 000000000000..9027991399b4
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sync;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.catalog.CatalogException.DuplicateKeyException;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.table.TableBuilder;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.CatalogTests;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Builds on the generic {@link CatalogSyncTest} to focus on cache-specific
+ * operations.
+ */
+public class CatalogCacheTest
+{
+ @Rule
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+ private CatalogTests.DbFixture dbFixture;
+ private CatalogStorage storage;
+ private ObjectMapper jsonMapper;
+
+ @Before
+ public void setUp()
+ {
+ dbFixture = new CatalogTests.DbFixture(derbyConnectorRule);
+ storage = dbFixture.storage;
+ jsonMapper = new ObjectMapper();
+ }
+
+ @After
+ public void tearDown()
+ {
+ CatalogTests.tearDown(dbFixture);
+ }
+
+ /**
+ * Test overall cache lifecycle. Detailed checks of the contents are done
+ * in {@link CatalogSyncTest} and are not repeated here.
+ */
+ @Test
+ public void testLifecycle() throws DuplicateKeyException
+ {
+ // Create entries with no listener.
+ TableMetadata table1 = TableBuilder.datasource("table1", "P1D")
+ .timeColumn()
+ .column("a", Columns.STRING)
+ .build();
+ storage.validate(table1);
+ storage.tables().create(table1);
+
+ // Create a listener. Starts with the cache empty
+ CachedMetadataCatalog cache1 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper);
+ storage.register(cache1);
+ assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty());
+
+ // Load table on demand.
+ assertNotNull(cache1.getTable(table1.id()));
+ assertEquals(1, cache1.tableNames(TableId.DRUID_SCHEMA).size());
+
+ // Flush to empty the cache.
+ cache1.flush();
+ assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty());
+
+ // Resync to reload the cache.
+ cache1.resync();
+ assertEquals(1, cache1.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache1.getTable(table1.id()));
+
+ // Add a table: cache is updated.
+ TableMetadata table2 = TableBuilder.datasource("table2", "P1D")
+ .timeColumn()
+ .column("dim", Columns.STRING)
+ .column("measure", Columns.LONG)
+ .build();
+ storage.validate(table2);
+ storage.tables().create(table2);
+ assertEquals(2, cache1.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache1.getTable(table2.id()));
+
+ // Second listener. Starts with the cache empty.
+ CachedMetadataCatalog cache2 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper);
+ storage.register(cache2);
+ assertTrue(cache2.tableNames(TableId.DRUID_SCHEMA).isEmpty());
+
+ // Second listener resyncs.
+ cache2.resync();
+ assertEquals(2, cache2.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache2.getTable(table1.id()));
+ assertNotNull(cache2.getTable(table2.id()));
+
+ // Add a third table: both caches updated.
+ TableMetadata table3 = TableBuilder.datasource("table3", "PT1H")
+ .timeColumn()
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
+ .build();
+ storage.validate(table3);
+ storage.tables().create(table3);
+ assertEquals(3, cache1.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache1.getTable(table3.id()));
+ assertEquals(3, cache2.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache2.getTable(table3.id()));
+
+ // Another resync puts us back where we were.
+ cache1.flush();
+ assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty());
+ cache1.resync();
+ assertEquals(3, cache1.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache1.getTable(table3.id()));
+
+ cache2.resync();
+ assertEquals(3, cache2.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache2.getTable(table3.id()));
+
+ // Third cache, managed by the receiver.
+ CachedMetadataCatalog cache3 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper);
+ storage.register(cache3);
+ CatalogUpdateReceiver receiver = new CatalogUpdateReceiver(cache3);
+ receiver.start();
+ assertEquals(3, cache3.tableNames(TableId.DRUID_SCHEMA).size());
+ assertNotNull(cache3.getTable(table3.id()));
+ }
+}
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java
index fc77735e499e..c159c51c5eec 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java
@@ -96,7 +96,7 @@ public void testInputValidation()
TableMetadata table = TableBuilder.external("externTable")
.inputSource(toMap(new InlineInputSource("a\nc")))
.inputFormat(BaseExternTableTest.CSV_FORMAT)
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
storage.validate(table);
}
@@ -114,7 +114,7 @@ public void testInputValidation()
{
TableMetadata table = TableBuilder.external("externTable")
.inputSource(toMap(new InlineInputSource("a\nc")))
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
assertThrows(IAE.class, () -> storage.validate(table));
}
@@ -195,15 +195,15 @@ private void populateCatalog() throws DuplicateKeyException
{
TableMetadata table1 = TableBuilder.datasource("table1", "P1D")
.timeColumn()
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
storage.validate(table1);
storage.tables().create(table1);
TableMetadata table2 = TableBuilder.datasource("table2", "P1D")
.timeColumn()
- .column("dim", Columns.VARCHAR)
- .column("measure", "BIGINT")
+ .column("dim", Columns.STRING)
+ .column("measure", Columns.LONG)
.build();
storage.validate(table2);
storage.tables().create(table2);
@@ -211,7 +211,7 @@ private void populateCatalog() throws DuplicateKeyException
TableMetadata table3 = TableBuilder.external("table3")
.inputFormat(BaseExternTableTest.CSV_FORMAT)
.inputSource(toMap(new InlineInputSource("a\nc")))
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
storage.validate(table3);
storage.tables().create(table3);
@@ -230,9 +230,9 @@ private void verifyInitial(MetadataCatalog catalog)
List<ColumnSpec> cols = dsSpec.columns();
assertEquals(2, cols.size());
assertEquals(Columns.TIME_COLUMN, cols.get(0).name());
- assertEquals(Columns.TIMESTAMP, cols.get(0).sqlType());
+ assertEquals(Columns.LONG, cols.get(0).dataType());
assertEquals("a", cols.get(1).name());
- assertEquals(Columns.VARCHAR, cols.get(1).sqlType());
+ assertEquals(Columns.STRING, cols.get(1).dataType());
DatasourceFacade ds = new DatasourceFacade(catalog.resolveTable(id));
assertEquals("P1D", ds.segmentGranularityString());
@@ -249,11 +249,11 @@ private void verifyInitial(MetadataCatalog catalog)
assertEquals(3, cols.size());
assertEquals("__time", cols.get(0).name());
assertEquals(Columns.TIME_COLUMN, cols.get(0).name());
- assertEquals(Columns.TIMESTAMP, cols.get(0).sqlType());
+ assertEquals(Columns.LONG, cols.get(0).dataType());
assertEquals("dim", cols.get(1).name());
- assertEquals(Columns.VARCHAR, cols.get(1).sqlType());
+ assertEquals(Columns.STRING, cols.get(1).dataType());
assertEquals("measure", cols.get(2).name());
- assertEquals("BIGINT", cols.get(2).sqlType());
+ assertEquals(Columns.LONG, cols.get(2).dataType());
DatasourceFacade ds = new DatasourceFacade(catalog.resolveTable(id));
assertEquals("P1D", ds.segmentGranularityString());
@@ -273,7 +273,7 @@ private void verifyInitial(MetadataCatalog catalog)
List<ColumnSpec> cols = inputSpec.columns();
assertEquals(1, cols.size());
assertEquals("a", cols.get(0).name());
- assertEquals(Columns.VARCHAR, cols.get(0).sqlType());
+ assertEquals(Columns.STRING, cols.get(0).dataType());
assertNotNull(inputSpec.properties());
}
@@ -303,7 +303,7 @@ private void alterCatalog() throws DuplicateKeyException, NotFoundException
// Create a table 3
TableMetadata table3 = TableBuilder.datasource("table3", "P1D")
.timeColumn()
- .column("x", "FLOAT")
+ .column("x", Columns.FLOAT)
.build();
storage.tables().create(table3);
}
@@ -320,7 +320,7 @@ private void verifyAltered(MetadataCatalog catalog)
assertEquals(Columns.TIME_COLUMN, cols.get(0).name());
assertEquals("a", cols.get(1).name());
assertEquals("b", cols.get(2).name());
- assertEquals(Columns.DOUBLE, cols.get(2).sqlType());
+ assertEquals(Columns.DOUBLE, cols.get(2).dataType());
}
{
TableId id = TableId.datasource("table3");
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java
index 7a5b7f03b273..16f2dcaa50e6 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java
@@ -55,4 +55,14 @@ public MetadataCatalog catalog()
{
return catalog;
}
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void resync()
+ {
+ }
}
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java
index eb7a49f78a82..066d5a44cad1 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java
@@ -150,9 +150,9 @@ public void testCreate()
TableSpec inputSpec = TableBuilder.external("inline")
.inputSource(toMap(new InlineInputSource("a,b,1\nc,d,2\n")))
.inputFormat(BaseExternTableTest.CSV_FORMAT)
- .column("a", Columns.VARCHAR)
- .column("b", Columns.VARCHAR)
- .column("c", Columns.BIGINT)
+ .column("a", Columns.STRING)
+ .column("b", Columns.STRING)
+ .column("c", Columns.LONG)
.buildSpec();
resp = resource.postTable(TableId.EXTERNAL_SCHEMA, "inline", inputSpec, 0, false, postBy(CatalogTests.WRITER_USER));
assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
index bfb8fa02b701..5426e365f1f8 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
@@ -401,7 +401,7 @@ public void testUpdateColumns() throws CatalogException
// Add a column
cmd = new UpdateColumns(
Collections.singletonList(
- new ColumnSpec("d", Columns.VARCHAR, null)
+ new ColumnSpec("d", Columns.STRING, null)
)
);
TableMetadata revised = doEdit(tableName, cmd);
@@ -411,14 +411,14 @@ public void testUpdateColumns() throws CatalogException
);
ColumnSpec colD = revised.spec().columns().get(3);
assertEquals("d", colD.name());
- assertEquals(Columns.VARCHAR, colD.sqlType());
+ assertEquals(Columns.STRING, colD.dataType());
// Update a column
cmd = new UpdateColumns(
Collections.singletonList(
new ColumnSpec(
"a",
- Columns.BIGINT,
+ Columns.LONG,
ImmutableMap.of("foo", "bar")
)
)
@@ -430,13 +430,13 @@ public void testUpdateColumns() throws CatalogException
);
ColumnSpec colA = revised.spec().columns().get(0);
assertEquals("a", colA.name());
- assertEquals(Columns.BIGINT, colA.sqlType());
+ assertEquals(Columns.LONG, colA.dataType());
assertEquals(ImmutableMap.of("foo", "bar"), colA.properties());
// Duplicates
UpdateColumns cmd2 = new UpdateColumns(
Arrays.asList(
- new ColumnSpec("e", Columns.VARCHAR, null),
+ new ColumnSpec("e", Columns.STRING, null),
new ColumnSpec("e", null, null)
)
);
@@ -445,7 +445,7 @@ public void testUpdateColumns() throws CatalogException
// Valid time column type
cmd = new UpdateColumns(
Collections.singletonList(
- new ColumnSpec(Columns.TIME_COLUMN, Columns.TIMESTAMP, null)
+ new ColumnSpec(Columns.TIME_COLUMN, Columns.LONG, null)
)
);
revised = doEdit(tableName, cmd);
diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt
new file mode 100644
index 000000000000..fffc3d7d9874
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt
@@ -0,0 +1,3 @@
+LogicalInsert(target=[dst], partitionedBy=['ALL TIME'], clusteredBy=[])
+ LogicalProject(__time=[$0], extra1=[$1], dim2=[$2], dim1=[$3], cnt=[$4], m1=[$5], extra2=[$6], extra3=[$7], m2=[$8])
+ LogicalTableScan(table=[[druid, foo]])
diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt
new file mode 100644
index 000000000000..71df2ac7e3f9
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt
@@ -0,0 +1,4 @@
+LogicalInsert(target=[druid.clusterBy], partitionedBy=[], clusteredBy=[])
+ LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
+ LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
+ LogicalTableScan(table=[[druid, foo]])
diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt
new file mode 100644
index 000000000000..24e70f5c8bb7
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt
@@ -0,0 +1,4 @@
+LogicalInsert(target=[druid.clusterBy], partitionedBy=[], clusteredBy=[`floor_m1`, `dim1` DESC])
+ LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
+ LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
+ LogicalTableScan(table=[[druid, foo]])
diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt
new file mode 100644
index 000000000000..71df2ac7e3f9
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt
@@ -0,0 +1,4 @@
+LogicalInsert(target=[druid.clusterBy], partitionedBy=[], clusteredBy=[])
+ LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
+ LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
+ LogicalTableScan(table=[[druid, foo]])
diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt
new file mode 100644
index 000000000000..8aba08a54968
--- /dev/null
+++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt
@@ -0,0 +1,4 @@
+LogicalInsert(target=[dst], partitionedBy=[FLOOR(`__time` TO DAY)], clusteredBy=[`floor_m1`, `dim1` DESC])
+ LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
+ LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
+ LogicalTableScan(table=[[druid, foo]])
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 36301a5fe0f9..3f5367b27c50 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -520,7 +520,8 @@ public String getFormatString()
new PlannerConfig(),
viewManager,
new NoopDruidSchemaManager(),
- CalciteTests.TEST_AUTHORIZER_MAPPER
+ CalciteTests.TEST_AUTHORIZER_MAPPER,
+ CatalogResolver.NULL_RESOLVER
);
final SqlEngine engine = new MSQTaskSqlEngine(
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
index dcbbbcfb7acf..c35bc1824bc9 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
@@ -68,8 +68,8 @@
public class S3InputSourceDefnTest
{
private static final List<ColumnSpec> COLUMNS = Arrays.asList(
- new ColumnSpec("x", Columns.VARCHAR, null),
- new ColumnSpec("y", Columns.BIGINT, null)
+ new ColumnSpec("x", Columns.STRING, null),
+ new ColumnSpec("y", Columns.LONG, null)
);
/**
@@ -110,7 +110,7 @@ public void testValidateEmptyInputSource()
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -176,7 +176,7 @@ public void testValidateNoFormatWithColumns()
Collections.singletonList("s3://foo/bar/file.csv"), null, null, null);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -215,7 +215,7 @@ public void testValidateGood()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -228,7 +228,7 @@ public void testBucketOnly()
.inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "s3://foo.com")
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -243,7 +243,7 @@ public void testBucketAndUri()
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -262,7 +262,7 @@ public void testBucketAndPrefix()
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -281,7 +281,7 @@ public void testBucketAndObject()
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -298,7 +298,7 @@ public void testBucketAndGlob()
)
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -564,8 +564,8 @@ public void testFullTableSpecHappyPath()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -602,8 +602,8 @@ public void testTableSpecWithoutConfig()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -646,8 +646,8 @@ public void testTableSpecWithBucketAndFormat()
.inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
diff --git a/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java b/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java
index e66a584cae60..841d592062bf 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java
@@ -98,7 +98,7 @@ public static <T> T safeCast(Object value, Class<T> type, String key)
return type.cast(value);
}
catch (ClassCastException e) {
- throw new IAE("Value [%s] is not valid for property %s, expected type %s",
+ throw new IAE("Value [%s] is not valid for property [%s], expected type [%s]",
value,
key,
type.getSimpleName()
diff --git a/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java b/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java
index 38ac4fdbe804..cfc9fc55cf43 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java
@@ -35,8 +35,7 @@
import java.util.Objects;
/**
- * Specification of table columns. Columns have multiple types
- * represented via the type field.
+ * Specification of table columns.
*/
@UnstableApi
public class ColumnSpec
@@ -49,14 +48,10 @@ public class ColumnSpec
private final String name;
/**
- * The data type of the column expressed as a supported SQL type. The data type here must
- * directly match a Druid storage type. So, {@code BIGINT} for {code long}, say.
- * This usage does not support Druid's usual "fudging": one cannot use {@code INTEGER}
- * to mean {@code long}. The type will likely encode complex and aggregation types
- * in the future, though that is not yet supported. The set of valid mappings is
- * defined in the {@link Columns} class.
+ * The data type of the column expressed as a supported Druid type. The data type here must
+ * directly match a Druid storage type.
*/
- private final String sqlType;
+ private final String dataType;
/**
* Properties for the column. At present, these are all user and application defined.
@@ -69,18 +64,18 @@ public class ColumnSpec
@JsonCreator
public ColumnSpec(
@JsonProperty("name")final String name,
- @JsonProperty("sqlType") @Nullable final String sqlType,
+ @JsonProperty("dataType") @Nullable final String dataType,
@JsonProperty("properties") @Nullable final Map properties
)
{
this.name = name;
- this.sqlType = sqlType;
+ this.dataType = dataType;
this.properties = properties == null ? Collections.emptyMap() : properties;
}
public ColumnSpec(ColumnSpec from)
{
- this(from.name, from.sqlType, from.properties);
+ this(from.name, from.dataType, from.properties);
}
@JsonProperty("name")
@@ -89,11 +84,11 @@ public String name()
return name;
}
- @JsonProperty("sqlType")
+ @JsonProperty("dataType")
@JsonInclude(Include.NON_NULL)
- public String sqlType()
+ public String dataType()
{
- return sqlType;
+ return dataType;
}
@JsonProperty("properties")
@@ -108,6 +103,16 @@ public void validate()
if (Strings.isNullOrEmpty(name)) {
throw new IAE("Column name is required");
}
+ if (Columns.isTimeColumn(name)) {
+ if (dataType != null && !Columns.LONG.equalsIgnoreCase(dataType)) {
+ throw new IAE(
+ "[%s] column must have type [%s] or no type. Found [%s]",
+ name,
+ Columns.LONG,
+ dataType
+ );
+ }
+ }
// Validate type in the next PR
}
@@ -126,7 +131,7 @@ public ColumnSpec merge(
final ColumnSpec update
)
{
- String revisedType = update.sqlType() == null ? sqlType() : update.sqlType();
+ String revisedType = update.dataType() == null ? dataType() : update.dataType();
Map revisedProps = CatalogUtils.mergeProperties(
columnProperties,
properties(),
@@ -152,7 +157,7 @@ public boolean equals(Object o)
}
ColumnSpec other = (ColumnSpec) o;
return Objects.equals(this.name, other.name)
- && Objects.equals(this.sqlType, other.sqlType)
+ && Objects.equals(this.dataType, other.dataType)
&& Objects.equals(this.properties, other.properties);
}
@@ -161,7 +166,7 @@ public int hashCode()
{
return Objects.hash(
name,
- sqlType,
+ dataType,
properties
);
}
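Reviewer's note: renaming the JSON property from "sqlType" to "dataType" changes the wire format of stored specs, not just the Java field. A minimal round-trip sketch (not part of the patch; it assumes Druid's DefaultObjectMapper, used by the tests later in this diff, and an illustrative class name):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.catalog.model.ColumnSpec;
    import org.apache.druid.catalog.model.Columns;
    import org.apache.druid.jackson.DefaultObjectMapper;

    public class ColumnSpecJsonSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = DefaultObjectMapper.INSTANCE;
        ColumnSpec spec = new ColumnSpec("host", Columns.STRING, null);
        // Writes {"name":"host","dataType":"STRING"}: specs persisted under
        // the old "sqlType" key no longer populate this field on read.
        String json = mapper.writeValueAsString(spec);
        ColumnSpec back = mapper.readValue(json, ColumnSpec.class);
        System.out.println(json + " -> " + back.dataType());
      }
    }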
diff --git a/server/src/main/java/org/apache/druid/catalog/model/Columns.java b/server/src/main/java/org/apache/druid/catalog/model/Columns.java
index e19f0052ed5e..9d64a4ed22e9 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/Columns.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/Columns.java
@@ -20,94 +20,89 @@
package org.apache.druid.catalog.model;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class Columns
{
public static final String TIME_COLUMN = "__time";
- public static final String VARCHAR = "VARCHAR";
- public static final String BIGINT = "BIGINT";
- public static final String FLOAT = "FLOAT";
- public static final String DOUBLE = "DOUBLE";
- public static final String VARCHAR_ARRAY = "VARCHAR ARRAY";
- public static final String BIGINT_ARRAY = "BIGINT ARRAY";
- public static final String FLOAT_ARRAY = "FLOAT ARRAY";
- public static final String DOUBLE_ARRAY = "DOUBLE ARRAY";
- public static final String TIMESTAMP = "TIMESTAMP";
-
- public static final Set NUMERIC_TYPES =
- ImmutableSet.of(BIGINT, FLOAT, DOUBLE);
- public static final Set SCALAR_TYPES =
- ImmutableSet.of(TIMESTAMP, VARCHAR, BIGINT, FLOAT, DOUBLE);
+ public static final String STRING = ValueType.STRING.name();
+ public static final String LONG = ValueType.LONG.name();
+ public static final String FLOAT = ValueType.FLOAT.name();
+ public static final String DOUBLE = ValueType.DOUBLE.name();
+
+ public static final String SQL_VARCHAR = "VARCHAR";
+ public static final String SQL_BIGINT = "BIGINT";
+ public static final String SQL_FLOAT = "FLOAT";
+ public static final String SQL_DOUBLE = "DOUBLE";
+ public static final String SQL_VARCHAR_ARRAY = "VARCHAR ARRAY";
+ public static final String SQL_BIGINT_ARRAY = "BIGINT ARRAY";
+ public static final String SQL_FLOAT_ARRAY = "FLOAT ARRAY";
+ public static final String SQL_DOUBLE_ARRAY = "DOUBLE ARRAY";
+ public static final String SQL_TIMESTAMP = "TIMESTAMP";
public static final Map<String, ColumnType> SQL_TO_DRUID_TYPES =
new ImmutableMap.Builder<String, ColumnType>()
- .put(TIMESTAMP, ColumnType.LONG)
- .put(BIGINT, ColumnType.LONG)
- .put(FLOAT, ColumnType.FLOAT)
- .put(DOUBLE, ColumnType.DOUBLE)
- .put(VARCHAR, ColumnType.STRING)
- .put(VARCHAR_ARRAY, ColumnType.STRING_ARRAY)
- .put(BIGINT_ARRAY, ColumnType.LONG_ARRAY)
- .put(FLOAT_ARRAY, ColumnType.FLOAT_ARRAY)
- .put(DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY)
+ .put(SQL_TIMESTAMP, ColumnType.LONG)
+ .put(SQL_BIGINT, ColumnType.LONG)
+ .put(SQL_FLOAT, ColumnType.FLOAT)
+ .put(SQL_DOUBLE, ColumnType.DOUBLE)
+ .put(SQL_VARCHAR, ColumnType.STRING)
+ .put(SQL_VARCHAR_ARRAY, ColumnType.STRING_ARRAY)
+ .put(SQL_BIGINT_ARRAY, ColumnType.LONG_ARRAY)
+ .put(SQL_FLOAT_ARRAY, ColumnType.FLOAT_ARRAY)
+ .put(SQL_DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY)
.build();
public static final Map<ColumnType, String> DRUID_TO_SQL_TYPES =
new ImmutableMap.Builder<ColumnType, String>()
- .put(ColumnType.LONG, BIGINT)
+ .put(ColumnType.LONG, SQL_BIGINT)
.put(ColumnType.FLOAT, FLOAT)
.put(ColumnType.DOUBLE, DOUBLE)
- .put(ColumnType.STRING, VARCHAR)
- .put(ColumnType.STRING_ARRAY, VARCHAR_ARRAY)
- .put(ColumnType.LONG_ARRAY, BIGINT_ARRAY)
- .put(ColumnType.FLOAT_ARRAY, FLOAT_ARRAY)
- .put(ColumnType.DOUBLE_ARRAY, DOUBLE_ARRAY)
+ .put(ColumnType.STRING, SQL_VARCHAR)
+ .put(ColumnType.STRING_ARRAY, SQL_VARCHAR_ARRAY)
+ .put(ColumnType.LONG_ARRAY, SQL_BIGINT_ARRAY)
+ .put(ColumnType.FLOAT_ARRAY, SQL_FLOAT_ARRAY)
+ .put(ColumnType.DOUBLE_ARRAY, SQL_DOUBLE_ARRAY)
.build();
private Columns()
{
}
- public static boolean isTimestamp(String type)
- {
- return TIMESTAMP.equalsIgnoreCase(type.trim());
- }
-
- public static boolean isScalar(String type)
- {
- return SCALAR_TYPES.contains(StringUtils.toUpperCase(type.trim()));
- }
-
- public static ColumnType druidType(String sqlType)
+ public static ColumnType druidType(ColumnSpec spec)
{
- if (sqlType == null) {
+ if (isTimeColumn(spec.name())) {
+ return ColumnType.LONG;
+ }
+ String dataType = spec.dataType();
+ if (dataType == null) {
return null;
}
- ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(sqlType));
+ ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(dataType));
if (druidType != null) {
return druidType;
}
- return ColumnType.fromString(sqlType);
+ return ColumnType.fromString(dataType);
}
- public static void validateScalarColumn(String name, String type)
+ public static String sqlType(ColumnSpec spec)
{
- if (type == null) {
- return;
+ if (isTimeColumn(spec.name())) {
+ return SQL_TIMESTAMP;
}
- if (!Columns.isScalar(type)) {
- throw new IAE("Not a supported SQL type: " + type);
+ ColumnType druidType = druidType(spec);
+ if (druidType == null) {
+ return null;
}
+ String sqlType = DRUID_TO_SQL_TYPES.get(druidType);
+ return sqlType == null ? druidType.asTypeString() : sqlType;
}
public static boolean isTimeColumn(String name)
@@ -119,10 +114,7 @@ public static RowSignature convertSignature(List<ColumnSpec> columns)
{
RowSignature.Builder builder = RowSignature.builder();
for (ColumnSpec col : columns) {
- ColumnType druidType = null;
- if (col.sqlType() != null) {
- druidType = Columns.druidType(col.sqlType());
- }
+ ColumnType druidType = druidType(col);
if (druidType == null) {
druidType = ColumnType.STRING;
}
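Reviewer's note: druidType(spec) and sqlType(spec) replace the removed isTimestamp/isScalar/validateScalarColumn helpers. A sketch of the expected results, using only names defined in this file (the comments follow the logic above):

    static void columnsHelperSketch()
    {
      ColumnSpec time = new ColumnSpec(Columns.TIME_COLUMN, null, null);
      ColumnSpec host = new ColumnSpec("host", Columns.STRING, null);
      ColumnType t1 = Columns.druidType(time); // ColumnType.LONG, forced by the column name
      String s1 = Columns.sqlType(time);       // "TIMESTAMP" (SQL_TIMESTAMP)
      ColumnType t2 = Columns.druidType(host); // ColumnType.STRING, via the fromString fallback
      String s2 = Columns.sqlType(host);       // "VARCHAR", via DRUID_TO_SQL_TYPES
    }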
diff --git a/server/src/main/java/org/apache/druid/catalog/model/TableId.java b/server/src/main/java/org/apache/druid/catalog/model/TableId.java
index 7e278e34397e..55fcc797561b 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/TableId.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/TableId.java
@@ -85,6 +85,11 @@ public String sqlName()
return StringUtils.format("\"%s\".\"%s\"", schema, name);
}
+ public String unquoted()
+ {
+ return StringUtils.format("%s.%s", schema, name);
+ }
+
@Override
public String toString()
{
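Reviewer's note: unquoted() complements sqlName(): the quoted form is safe to embed in SQL, the unquoted form reads better in logs and error messages. A sketch, assuming the usual two-argument (schema, name) constructor, which this hunk does not show:

    TableId id = new TableId("druid", "wikipedia");
    String forSql = id.sqlName();    // "druid"."wikipedia"
    String forLogs = id.unquoted();  // druid.wikipedia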
diff --git a/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java b/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java
index 6a15b57716b1..7ac00d9b608a 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java
@@ -19,15 +19,21 @@
package org.apache.druid.catalog.model.facade;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.ColumnType;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
/**
* Convenience wrapper on top of a resolved table (a table spec and its corresponding
@@ -38,10 +44,77 @@ public class DatasourceFacade extends TableFacade
{
private static final Logger LOG = new Logger(DatasourceFacade.class);
+ public static class ColumnFacade
+ {
+ public enum Kind
+ {
+ ANY,
+ TIME,
+ DIMENSION,
+ MEASURE
+ }
+
+ private final ColumnSpec spec;
+ private final String sqlType;
+
+ public ColumnFacade(ColumnSpec spec)
+ {
+ this.spec = spec;
+ if (Columns.isTimeColumn(spec.name()) && spec.dataType() == null) {
+ // For __time only, force a type if type is null.
+ this.sqlType = Columns.LONG;
+ } else {
+ this.sqlType = Columns.sqlType(spec);
+ }
+ }
+
+ public ColumnSpec spec()
+ {
+ return spec;
+ }
+
+ public boolean hasType()
+ {
+ return sqlType != null;
+ }
+
+ public boolean isTime()
+ {
+ return Columns.isTimeColumn(spec.name());
+ }
+
+ public ColumnType druidType()
+ {
+ return Columns.druidType(spec);
+ }
+
+ public String sqlStorageType()
+ {
+ return sqlType;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{spec=" + spec + ", sqlTtype=" + sqlType + "}";
+ }
+ }
+
+ private final List<ColumnFacade> columns;
+ private final Map<String, ColumnFacade> columnIndex;
public DatasourceFacade(ResolvedTable resolved)
{
super(resolved);
+ this.columns = resolved.spec().columns()
+ .stream()
+ .map(ColumnFacade::new)
+ .collect(Collectors.toList());
+ ImmutableMap.Builder<String, ColumnFacade> builder = ImmutableMap.builder();
+ for (ColumnFacade col : columns) {
+ builder.put(col.spec.name(), col);
+ }
+ columnIndex = builder.build();
}
public String segmentGranularityString()
@@ -89,4 +162,14 @@ public boolean isSealed()
{
return booleanProperty(DatasourceDefn.SEALED_PROPERTY);
}
+
+ public List<ColumnFacade> columnFacades()
+ {
+ return columns;
+ }
+
+ public ColumnFacade column(String name)
+ {
+ return columnIndex.get(name);
+ }
}
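Reviewer's note: a sketch of how a planner-side consumer might walk the new facades; here `registry` and `spec` stand in for the TableDefnRegistry and datasource TableSpec used in the tests later in this diff:

    DatasourceFacade facade = new DatasourceFacade(registry.resolve(spec));
    for (DatasourceFacade.ColumnFacade col : facade.columnFacades()) {
      if (col.isTime()) {
        // __time reports hasType() == true and druidType() == LONG even
        // when the spec leaves its type null.
      }
    }
    DatasourceFacade.ColumnFacade host = facade.column("host"); // null if not declared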
diff --git a/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java b/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java
index f2571150305e..11c3d60def98 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java
@@ -22,7 +22,6 @@
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -40,7 +39,7 @@ public RowSignature rowSignature()
List<ColumnSpec> columns = spec().columns();
RowSignature.Builder builder = RowSignature.builder();
for (ColumnSpec col : columns) {
- ColumnType druidType = Columns.SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(col.sqlType()));
+ ColumnType druidType = Columns.druidType(col);
if (druidType == null) {
druidType = ColumnType.STRING;
}
diff --git a/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java b/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java
index b159ec8ebe7a..c41e24837545 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java
@@ -61,11 +61,7 @@ public List<ColumnSpec> columns()
public static ColumnType druidType(ColumnSpec col)
{
- if (Columns.isTimeColumn(col.name())) {
- return ColumnType.LONG;
- }
- final String sqlType = col.sqlType();
- return sqlType == null ? null : Columns.druidType(sqlType);
+ return Columns.druidType(col);
}
public ObjectMapper jsonMapper()
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
index 660e73c35239..b5e47e99b113 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
@@ -275,7 +275,7 @@ protected List<ColumnSpec> selectPartialTableColumns(
return columns;
} else if (!CollectionUtils.isNullOrEmpty(columns)) {
throw new IAE(
- "Catalog definition for the %s input source already contains column definitions",
+ "Catalog definition for the [%s] input source already contains column definitions",
typeValue()
);
} else {
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
index 97b080995aca..aa50dd3498e4 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
@@ -68,16 +68,6 @@ public class DatasourceDefn extends TableDefn
*/
public static final String HIDDEN_COLUMNS_PROPERTY = "hiddenColumns";
- /**
- * By default: columns are optional hints. If a datasource has columns defined,
- * well validate them, but MSQ and other tools are free to create additional columns.
- * That is, we assume "auto-discovered" columns by default. However, in some use cases,
- * the schema may be carefully designed. This is especially true for ETL use cases in
- * which multiple input schemas are mapped into a single datasource schema designed for
- * ease of end user use. In this second use case, we may want to reject an attempt to
- * ingest columns other than those in the schema. To do that, set {@code sealed = true}.
- * In other words, "sealed" mode works like a traditional RDBMS.
- */
public static final String SEALED_PROPERTY = "sealed";
public static final String TABLE_TYPE = "datasource";
@@ -148,7 +138,7 @@ public DatasourceDefn()
protected void validateColumn(ColumnSpec spec)
{
super.validateColumn(spec);
- if (Columns.isTimeColumn(spec.name()) && spec.sqlType() != null) {
+ if (Columns.isTimeColumn(spec.name()) && spec.dataType() != null) {
// Validate type in next PR
}
}
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java
index 779d50b0ddbc..09a19d118276 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java
@@ -205,6 +205,11 @@ public class ExternalTableDefn extends TableDefn
*/
public static final String TABLE_TYPE = "extern";
+ /**
+ * Column type for external tables.
+ */
+ public static final String EXTERNAL_COLUMN_TYPE = "extern";
+
/**
* Property which holds the input source specification as serialized as JSON.
*/
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
index a00e71b89174..787e929d9b5b 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
@@ -109,7 +109,7 @@ protected List addFormatParameters(
toAdd.add(prop);
} else if (existing.type() != prop.type()) {
throw new ISE(
- "Format %s, property %s of class %s conflicts with another format property of class %s",
+ "Format [%s], property [%s] of class [%s] conflicts with another format property of class [%s]",
format.typeValue(),
prop.name(),
prop.type().sqlName(),
@@ -131,7 +131,7 @@ protected InputFormat convertTableToFormat(ResolvedExternalTable table)
final InputFormatDefn formatDefn = formats.get(formatTag);
if (formatDefn == null) {
throw new IAE(
- "Format type [%s] for property %s is not valid",
+ "Format type [%s] for property [%s] is not valid",
formatTag,
InputFormat.TYPE_PROPERTY
);
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java b/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java
index 898eb53993b0..488de63190c5 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java
@@ -188,7 +188,7 @@ public TableBuilder column(ColumnSpec column)
public TableBuilder timeColumn()
{
- return column(Columns.TIME_COLUMN, Columns.TIMESTAMP);
+ return column(Columns.TIME_COLUMN, Columns.LONG);
}
public TableBuilder column(String name, String sqlType)
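Reviewer's note: with the Columns constants now holding Druid type names, builder usage mirrors the updated docExample test below ("foo" and the column names are illustrative):

    TableMetadata table = TableBuilder.datasource("foo", "P1D")
        .timeColumn()                      // __time, now registered as LONG
        .column("host", Columns.STRING)
        .column("bytesSent", Columns.LONG)
        .build();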
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java
index 2ae285b96868..b47d9487c1a5 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java
@@ -39,8 +39,8 @@ public class BaseExternTableTest
{
public static final Map CSV_FORMAT = ImmutableMap.of("type", CsvInputFormat.TYPE_KEY);
protected static final List<ColumnSpec> COLUMNS = Arrays.asList(
- new ColumnSpec("x", Columns.VARCHAR, null),
- new ColumnSpec("y", Columns.BIGINT, null)
+ new ColumnSpec("x", Columns.STRING, null),
+ new ColumnSpec("y", Columns.LONG, null)
);
protected final ObjectMapper mapper = DefaultObjectMapper.INSTANCE;
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java
index 91926210419a..b2995f1838be 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java
@@ -48,7 +48,7 @@ public void testDefaults()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(CSV_FORMAT)
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -70,8 +70,8 @@ public void testConversion()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
- .column("a", Columns.VARCHAR)
- .column("b", Columns.BIGINT)
+ .column("a", Columns.STRING)
+ .column("b", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
index 5d5f7f8e0f39..974f1b0e1c79 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
@@ -28,12 +28,13 @@
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefn;
import org.apache.druid.catalog.model.TableDefnRegistry;
-import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.ColumnType;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -49,6 +50,7 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -230,6 +232,8 @@ public void testColumns()
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
+ DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
+ assertTrue(facade.columnFacades().isEmpty());
}
// OK to have no column type
@@ -241,72 +245,45 @@ public void testColumns()
table.validate();
DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
- assertNotNull(facade.jsonMapper());
- assertEquals(1, facade.properties().size());
+ assertEquals(1, facade.columnFacades().size());
+ ColumnFacade col = facade.columnFacades().get(0);
+ assertSame(spec.columns().get(0), col.spec());
+ assertFalse(col.isTime());
+ assertFalse(col.hasType());
+ assertNull(col.druidType());
}
// Can have a legal scalar type
{
TableSpec spec = builder.copy()
- .column("foo", Columns.VARCHAR)
+ .column("foo", Columns.STRING)
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
+ DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
+ assertEquals(1, facade.columnFacades().size());
+ ColumnFacade col = facade.columnFacades().get(0);
+ assertSame(spec.columns().get(0), col.spec());
+ assertFalse(col.isTime());
+ assertTrue(col.hasType());
+ assertSame(ColumnType.STRING, col.druidType());
}
// Reject duplicate columns
{
TableSpec spec = builder.copy()
- .column("foo", Columns.VARCHAR)
- .column("bar", Columns.BIGINT)
+ .column("foo", Columns.STRING)
+ .column("bar", Columns.LONG)
.buildSpec();
expectValidationSucceeds(spec);
}
{
TableSpec spec = builder.copy()
- .column("foo", Columns.VARCHAR)
- .column("foo", Columns.BIGINT)
+ .column("foo", Columns.STRING)
+ .column("foo", Columns.LONG)
.buildSpec();
expectValidationFails(spec);
}
- {
- TableSpec spec = builder.copy()
- .column(Columns.TIME_COLUMN, null)
- .column("s", Columns.VARCHAR)
- .column("bi", Columns.BIGINT)
- .column("f", Columns.FLOAT)
- .column("d", Columns.DOUBLE)
- .buildSpec();
- ResolvedTable table = registry.resolve(spec);
- table.validate();
- }
- }
-
- @Test
- public void testRollup()
- {
- TableMetadata table = TableBuilder.datasource("foo", "P1D")
- .column(Columns.TIME_COLUMN, "TIMESTAMP('PT1M')")
- .column("a", null)
- .column("b", Columns.VARCHAR)
- .column("c", "SUM(BIGINT)")
- .build();
-
- table.validate();
- List<ColumnSpec> columns = table.spec().columns();
-
- assertEquals(4, columns.size());
- assertEquals(Columns.TIME_COLUMN, columns.get(0).name());
- assertEquals("TIMESTAMP('PT1M')", columns.get(0).sqlType());
-
- assertEquals("a", columns.get(1).name());
- assertNull(columns.get(1).sqlType());
-
- assertEquals("b", columns.get(2).name());
- assertEquals(Columns.VARCHAR, columns.get(2).sqlType());
-
- assertEquals("c", columns.get(3).name());
- assertEquals("SUM(BIGINT)", columns.get(3).sqlType());
}
@Test
@@ -321,6 +298,14 @@ public void testTimeColumn()
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
+
+ DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
+ assertEquals(1, facade.columnFacades().size());
+ ColumnFacade col = facade.columnFacades().get(0);
+ assertSame(spec.columns().get(0), col.spec());
+ assertTrue(col.isTime());
+ assertTrue(col.hasType());
+ assertSame(ColumnType.LONG, col.druidType());
}
// Time column can only have TIMESTAMP type
@@ -330,14 +315,20 @@ public void testTimeColumn()
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
+ DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
+ assertEquals(1, facade.columnFacades().size());
+ ColumnFacade col = facade.columnFacades().get(0);
+ assertSame(spec.columns().get(0), col.spec());
+ assertTrue(col.isTime());
+ assertTrue(col.hasType());
+ assertSame(ColumnType.LONG, col.druidType());
}
{
TableSpec spec = builder.copy()
- .column(Columns.TIME_COLUMN, "TIMESTAMP('PT5M')")
+ .column(Columns.TIME_COLUMN, Columns.STRING)
.buildSpec();
- ResolvedTable table = registry.resolve(spec);
- table.validate();
+ expectValidationFails(spec);
}
}
@@ -365,7 +356,7 @@ private TableSpec exampleSpec()
.property("tag1", "some value")
.property("tag2", "second value")
.column(new ColumnSpec("a", null, colProps))
- .column("b", Columns.VARCHAR)
+ .column("b", Columns.STRING)
.buildSpec();
// Sanity check
@@ -493,7 +484,7 @@ public void testMergeColsWithEmptyList()
List<ColumnSpec> colUpdates = Collections.singletonList(
new ColumnSpec(
"a",
- Columns.BIGINT,
+ Columns.LONG,
null
)
);
@@ -502,7 +493,7 @@ public void testMergeColsWithEmptyList()
List<ColumnSpec> columns = merged.columns();
assertEquals(1, columns.size());
assertEquals("a", columns.get(0).name());
- assertEquals(Columns.BIGINT, columns.get(0).sqlType());
+ assertEquals(Columns.LONG, columns.get(0).dataType());
}
@Test
@@ -521,12 +512,12 @@ public void testMergeCols()
List<ColumnSpec> colUpdates = Arrays.asList(
new ColumnSpec(
"a",
- Columns.BIGINT,
+ Columns.LONG,
updatedProps
),
new ColumnSpec(
"c",
- Columns.VARCHAR,
+ Columns.STRING,
null
)
);
@@ -537,14 +528,14 @@ public void testMergeCols()
List<ColumnSpec> columns = merged.columns();
assertEquals(3, columns.size());
assertEquals("a", columns.get(0).name());
- assertEquals(Columns.BIGINT, columns.get(0).sqlType());
+ assertEquals(Columns.LONG, columns.get(0).dataType());
Map<String, Object> colProps = columns.get(0).properties();
assertEquals(2, colProps.size());
assertEquals("new value", colProps.get("colProp1"));
assertEquals("third value", colProps.get("tag3"));
assertEquals("c", columns.get(2).name());
- assertEquals(Columns.VARCHAR, columns.get(2).sqlType());
+ assertEquals(Columns.STRING, columns.get(2).dataType());
}
/**
@@ -560,9 +551,9 @@ public void docExample()
.description("Web server performance metrics")
.property(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000)
.hiddenColumns("foo", "bar")
- .column("__time", Columns.TIMESTAMP)
- .column("host", Columns.VARCHAR, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "The web server host"))
- .column("bytesSent", Columns.BIGINT, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "Number of response bytes sent"))
+ .column("__time", Columns.LONG)
+ .column("host", Columns.STRING, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "The web server host"))
+ .column("bytesSent", Columns.LONG, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "Number of response bytes sent"))
.clusterColumns(new ClusterKeySpec("a", false), new ClusterKeySpec("b", true))
.sealed(true)
.buildSpec();
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java
index db573563eecb..04494ec7c343 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java
@@ -55,7 +55,7 @@ public void testDefaults()
DelimitedFormatDefn.DELIMITER_FIELD, "|"
)
)
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -78,8 +78,8 @@ public void testConversion()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
- .column("a", Columns.VARCHAR)
- .column("b", Columns.BIGINT)
+ .column("a", Columns.STRING)
+ .column("b", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
index 96afcad9a8cc..8c8129bf0fcc 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java
@@ -126,7 +126,7 @@ public void testValidateSourceAndFormat()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -147,16 +147,16 @@ public void wikipediaDocExample()
.inputSource(toMap(inputSource))
.inputFormat(formatToMap(format))
.description("Sample Wikipedia data")
- .column("timetamp", Columns.VARCHAR)
- .column("page", Columns.VARCHAR)
- .column("language", Columns.VARCHAR)
- .column("unpatrolled", Columns.VARCHAR)
- .column("newPage", Columns.VARCHAR)
- .column("robot", Columns.VARCHAR)
- .column("added", Columns.VARCHAR)
- .column("namespace", Columns.BIGINT)
- .column("deleted", Columns.BIGINT)
- .column("delta", Columns.BIGINT)
+ .column("timetamp", Columns.STRING)
+ .column("page", Columns.STRING)
+ .column("language", Columns.STRING)
+ .column("unpatrolled", Columns.STRING)
+ .column("newPage", Columns.STRING)
+ .column("robot", Columns.STRING)
+ .column("added", Columns.STRING)
+ .column("namespace", Columns.LONG)
+ .column("deleted", Columns.LONG)
+ .column("delta", Columns.LONG)
.build();
LOG.info(table.spec().toString());
}
@@ -179,9 +179,9 @@ public void httpDocExample() throws URISyntaxException
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "https://example.com/{}")
.description("Example parameterized external table")
- .column("timetamp", Columns.VARCHAR)
- .column("metric", Columns.VARCHAR)
- .column("value", Columns.BIGINT)
+ .column("timetamp", Columns.STRING)
+ .column("metric", Columns.STRING)
+ .column("value", Columns.LONG)
.build();
LOG.info(table.spec().toString());
}
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 215c9e0c62e6..8a6385db59ce 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -68,8 +68,8 @@ public void testEmptyInputSource()
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -83,8 +83,8 @@ public void testInvalidTemplate()
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://example.com/")
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -103,8 +103,8 @@ public void testNoFormatWithURI() throws URISyntaxException
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -221,8 +221,8 @@ public void testFullTableSpecHappyPath() throws URISyntaxException
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -256,8 +256,8 @@ public void testTemplateSpecWithFormatHappyPath()
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -305,8 +305,8 @@ public void testTemplateSpecWithFormatAndPassword()
))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
table.validate();
@@ -387,8 +387,8 @@ public void testMultipleURIsInTableSpec() throws URISyntaxException
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -421,8 +421,8 @@ public void testMultipleURIsWithTemplate() throws URISyntaxException
.inputSource(httpToMap(inputSource))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -489,8 +489,8 @@ public void testEnvPassword() throws URISyntaxException
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
index 448e353664dc..bb3b2354bac9 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
@@ -51,7 +51,7 @@ public void testValidateEmptyInputSource()
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", InlineInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -63,7 +63,7 @@ public void testValidateNoFormat()
// No format: not valid. For inline, format must be provided to match data
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -86,7 +86,7 @@ public void testValidateGood()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
+ .column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -130,8 +130,8 @@ public void testValidAdHocFn()
args.put(InlineInputSourceDefn.DATA_PROPERTY, Arrays.asList("a,b", "c,d"));
args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, CsvFormatDefn.TYPE_KEY);
final List<ColumnSpec> columns = Arrays.asList(
- new ColumnSpec("a", Columns.VARCHAR, null),
- new ColumnSpec("b", Columns.VARCHAR, null)
+ new ColumnSpec("a", Columns.STRING, null),
+ new ColumnSpec("b", Columns.STRING, null)
);
final TableFunction fn = defn.adHocTableFn();
@@ -157,8 +157,8 @@ public void testPartialTable()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a,b\nc,d\n")))
.inputFormat(CSV_FORMAT)
- .column("a", Columns.VARCHAR)
- .column("b", Columns.VARCHAR)
+ .column("a", Columns.STRING)
+ .column("b", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -183,8 +183,8 @@ public void testPartialTable()
// Cannot supply columns with the function
List<ColumnSpec> columns = Arrays.asList(
- new ColumnSpec("a", Columns.VARCHAR, null),
- new ColumnSpec("b", Columns.VARCHAR, null)
+ new ColumnSpec("a", Columns.STRING, null),
+ new ColumnSpec("b", Columns.STRING, null)
);
assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(), columns, mapper));
}
@@ -198,8 +198,8 @@ public void testDefinedTable()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a,b\nc,d")))
.inputFormat(formatToMap(format))
- .column("a", Columns.VARCHAR)
- .column("b", Columns.VARCHAR)
+ .column("a", Columns.STRING)
+ .column("b", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java
index ecc88db3f990..2c42b671da83 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java
@@ -48,7 +48,7 @@ public void testDefaults()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(ImmutableMap.of("type", JsonInputFormat.TYPE_KEY))
- .column("a", Columns.VARCHAR)
+ .column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -70,8 +70,8 @@ public void testConversion()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
- .column("a", Columns.VARCHAR)
- .column("b", Columns.BIGINT)
+ .column("a", Columns.STRING)
+ .column("b", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
index 3994bf011258..32f9b0641b87 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
@@ -61,8 +61,8 @@ public void testValidateEmptyInputSource()
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", LocalInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -75,8 +75,8 @@ public void testValidateNoFormat()
LocalInputSource inputSource = new LocalInputSource(new File("/tmp"), "*");
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -129,8 +129,8 @@ public void testValidateBaseDirWithFormat()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -150,8 +150,8 @@ public void testValidateFilesWithFormat()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@@ -169,8 +169,8 @@ public void testBaseDirAndFiles()
TableMetadata table = TableBuilder.external("foo")
.inputSource(source)
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@@ -321,8 +321,8 @@ public void testFullyDefinedBaseDirAndPattern()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -373,8 +373,8 @@ public void testFullyDefinedFiles()
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
@@ -418,8 +418,8 @@ public void testBaseDirAndFormat()
TableMetadata table = TableBuilder.external("foo")
.inputSource(BASE_DIR_ONLY)
.inputFormat(CSV_FORMAT)
- .column("x", Columns.VARCHAR)
- .column("y", Columns.BIGINT)
+ .column("x", Columns.STRING)
+ .column("y", Columns.LONG)
.build();
// Check validation
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 580b9acbb520..4e8431cd5c67 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.schema;
import org.apache.calcite.schema.Table;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import javax.inject.Inject;
@@ -29,14 +30,17 @@ public class DruidSchema extends AbstractTableSchema
{
private final BrokerSegmentMetadataCache segmentMetadataCache;
private final DruidSchemaManager druidSchemaManager;
+ private final CatalogResolver catalogResolver;
@Inject
public DruidSchema(
final BrokerSegmentMetadataCache segmentMetadataCache,
- final DruidSchemaManager druidSchemaManager
+ final DruidSchemaManager druidSchemaManager,
+ final CatalogResolver catalogResolver
)
{
this.segmentMetadataCache = segmentMetadataCache;
+ this.catalogResolver = catalogResolver;
if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) {
this.druidSchemaManager = druidSchemaManager;
} else {
@@ -56,7 +60,7 @@ public Table getTable(String name)
return druidSchemaManager.getTable(name);
} else {
DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentMetadataCache.getDatasource(name);
- return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
+ return catalogResolver.resolveDatasource(name, dsMetadata);
}
}
@@ -66,7 +70,7 @@ public Set<String> getTableNames()
if (druidSchemaManager != null) {
return druidSchemaManager.getTableNames();
} else {
- return segmentMetadataCache.getDatasourceNames();
+ return catalogResolver.getTableNames(segmentMetadataCache.getDatasourceNames());
}
}
}
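Reviewer's note: every test in this diff passes CatalogResolver.NULL_RESOLVER, so the pre-existing behavior is preserved there. Inferred from this hunk alone (the resolver's own source is not shown), the null resolver must act as:

    // resolveDatasource(name, dsMetadata):
    //     dsMetadata == null ? null : new DatasourceTable(dsMetadata)
    // getTableNames(datasourceNames):
    //     datasourceNames, unchanged
    DruidSchema schema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER);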
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java
index eeac85a2c1ed..57cecf50645e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java
@@ -23,11 +23,15 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.DataSourceInformation;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
/**
@@ -79,6 +83,24 @@ public boolean isBroadcast()
return broadcast;
}
+ public Map<String, EffectiveColumnMetadata> toEffectiveColumns()
+ {
+ Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
+ for (int i = 0; i < getRowSignature().size(); i++) {
+ String colName = getRowSignature().getColumnName(i);
+ ColumnType colType = getRowSignature().getColumnType(i).get();
+
+ EffectiveColumnMetadata colMetadata = EffectiveColumnMetadata.fromPhysical(colName, colType);
+ columns.put(colName, colMetadata);
+ }
+ return columns;
+ }
+
+ public EffectiveMetadata toEffectiveMetadata()
+ {
+ return new EffectiveMetadata(null, toEffectiveColumns(), false);
+ }
+
@Override
public boolean equals(Object o)
{
@@ -115,14 +137,107 @@ public String toString()
}
}
+ public static class EffectiveColumnMetadata
+ {
+ protected final String name;
+ protected final ColumnType type;
+
+ public EffectiveColumnMetadata(String name, ColumnType type)
+ {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String name()
+ {
+ return name;
+ }
+
+ public ColumnType druidType()
+ {
+ return type;
+ }
+
+ public static EffectiveColumnMetadata fromPhysical(String name, ColumnType type)
+ {
+ return new EffectiveColumnMetadata(name, type);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Column{" +
+ "name=" + name +
+ ", type=" + type.asTypeString() +
+ "}";
+ }
+ }
+
+ public static class EffectiveMetadata
+ {
+ private final DatasourceFacade catalogMetadata;
+ private final boolean isEmpty;
+ private final Map<String, EffectiveColumnMetadata> columns;
+
+ public EffectiveMetadata(
+ final DatasourceFacade catalogMetadata,
+ final Map<String, EffectiveColumnMetadata> columns,
+ final boolean isEmpty
+ )
+ {
+ this.catalogMetadata = catalogMetadata;
+ this.isEmpty = isEmpty;
+ this.columns = columns;
+ }
+
+ public DatasourceFacade catalogMetadata()
+ {
+ return catalogMetadata;
+ }
+
+ public EffectiveColumnMetadata column(String name)
+ {
+ return columns.get(name);
+ }
+
+ public boolean isEmpty()
+ {
+ return isEmpty;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "empty=" + isEmpty +
+ ", columns=" + columns +
+ "}";
+ }
+ }
+
private final PhysicalDatasourceMetadata physicalMetadata;
+ private final EffectiveMetadata effectiveMetadata;
public DatasourceTable(
final PhysicalDatasourceMetadata physicalMetadata
)
{
- super(physicalMetadata.getRowSignature());
+ this(
+ physicalMetadata.getRowSignature(),
+ physicalMetadata,
+ physicalMetadata.toEffectiveMetadata()
+ );
+ }
+
+ public DatasourceTable(
+ final RowSignature rowSignature,
+ final PhysicalDatasourceMetadata physicalMetadata,
+ final EffectiveMetadata effectiveMetadata
+ )
+ {
+ super(rowSignature);
this.physicalMetadata = physicalMetadata;
+ this.effectiveMetadata = effectiveMetadata;
}
@Override
@@ -143,6 +258,11 @@ public boolean isBroadcast()
return physicalMetadata.isBroadcast();
}
+ public EffectiveMetadata effectiveMetadata()
+ {
+ return effectiveMetadata;
+ }
+
@Override
public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table)
{
@@ -176,9 +296,10 @@ public int hashCode()
public String toString()
{
// Don't include the row signature: it is the same as in
- // physicalMetadata.
- return "DruidTable{" +
- physicalMetadata +
+ // effectiveMetadata.
+ return "DruidTable{physicalMetadata=" +
+ (physicalMetadata == null ? "null" : physicalMetadata.toString()) +
+ ", effectiveMetadata=" + effectiveMetadata +
'}';
}
}
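Reviewer's note: a sketch of assembling EffectiveMetadata by hand for a table with no catalog entry, mirroring toEffectiveMetadata() above (the column names are illustrative):

    Map<String, DatasourceTable.EffectiveColumnMetadata> cols = new HashMap<>();
    cols.put("__time", DatasourceTable.EffectiveColumnMetadata.fromPhysical("__time", ColumnType.LONG));
    cols.put("host", DatasourceTable.EffectiveColumnMetadata.fromPhysical("host", ColumnType.STRING));
    // A physical-only table: no catalog metadata, not empty, exactly what
    // PhysicalDatasourceMetadata.toEffectiveMetadata() produces.
    DatasourceTable.EffectiveMetadata effective =
        new DatasourceTable.EffectiveMetadata(null, cols, false);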
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index 558625a10b36..ff29a8743242 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -481,9 +481,9 @@ protected String externClauseFromSig(final ExternalDataSource externalDataSource
buf.append(sig.getColumnName(i)).append(" ");
ColumnType type = sig.getColumnType(i).get();
if (type == ColumnType.STRING) {
- buf.append(Columns.VARCHAR);
+ buf.append(Columns.SQL_VARCHAR);
} else if (type == ColumnType.LONG) {
- buf.append(Columns.BIGINT);
+ buf.append(Columns.SQL_BIGINT);
} else if (type == ColumnType.DOUBLE) {
buf.append(Columns.DOUBLE);
} else if (type == ColumnType.FLOAT) {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java
index 540b39e63899..e9c5d773fefa 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java
@@ -117,6 +117,7 @@ public SqlTestFramework get()
private SqlTestFramework createFramework(SqlTestFrameworkConfig config)
{
SqlTestFramework.Builder builder = new SqlTestFramework.Builder(testHost)
+ .catalogResolver(testHost.createCatalogResolver())
.minTopNThreshold(config.minTopNThreshold())
.mergeBufferCount(config.numMergeBuffers());
return builder.build();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
index cebb9e5446f8..08f16ae9307f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
@@ -31,6 +31,7 @@
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestTimelineServerView;
@@ -67,7 +68,7 @@ public void testInitializationWithNoData() throws Exception
cache.start();
cache.awaitInitialization();
- final DruidSchema druidSchema = new DruidSchema(cache, null);
+ final DruidSchema druidSchema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER);
Assert.assertEquals(ImmutableSet.of(), druidSchema.getTableNames());
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java
index 43551b3e0403..f2ac64826ee8 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java
@@ -38,6 +38,7 @@
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -72,7 +73,8 @@ public void setUp()
new PlannerConfig(),
null,
new NoopDruidSchemaManager(),
- CalciteTests.TEST_AUTHORIZER_MAPPER
+ CalciteTests.TEST_AUTHORIZER_MAPPER,
+ CatalogResolver.NULL_RESOLVER
);
informationSchema = new InformationSchema(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index e9cb1430a544..e0a3bb5b266a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -95,6 +95,7 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
@@ -266,7 +267,7 @@ public void setUp() throws Exception
);
cache.start();
cache.awaitInitialization();
- druidSchema = new DruidSchema(cache, null);
+ druidSchema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER);
metadataView = EasyMock.createMock(MetadataSegmentView.class);
druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java
index c3d2dd9b2416..040912fd6133 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java
@@ -50,6 +50,7 @@
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
@@ -131,14 +132,16 @@ public static DruidSchemaCatalog createMockRootSchema(
final PlannerConfig plannerConfig,
@Nullable final ViewManager viewManager,
final DruidSchemaManager druidSchemaManager,
- final AuthorizerMapper authorizerMapper
+ final AuthorizerMapper authorizerMapper,
+ final CatalogResolver catalogResolver
)
{
DruidSchema druidSchema = createMockSchema(
injector,
conglomerate,
walker,
- druidSchemaManager
+ druidSchemaManager,
+ catalogResolver
);
SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, authorizerMapper);
@@ -193,7 +196,8 @@ public static DruidSchemaCatalog createMockRootSchema(
plannerConfig,
null,
new NoopDruidSchemaManager(),
- authorizerMapper
+ authorizerMapper,
+ CatalogResolver.NULL_RESOLVER
);
}
@@ -201,7 +205,8 @@ private static DruidSchema createMockSchema(
final Injector injector,
final QueryRunnerFactoryConglomerate conglomerate,
final SpecificSegmentsQuerySegmentWalker walker,
- final DruidSchemaManager druidSchemaManager
+ final DruidSchemaManager druidSchemaManager,
+ final CatalogResolver catalog
)
{
final BrokerSegmentMetadataCache cache = new BrokerSegmentMetadataCache(
@@ -220,7 +225,8 @@ public Set<String> getDataSourceNames()
{
return ImmutableSet.of(CalciteTests.BROADCAST_DATASOURCE);
}
- }),
+ }
+ ),
null
);
@@ -233,7 +239,7 @@ public Set<String> getDataSourceNames()
}
cache.stop();
- return new DruidSchema(cache, druidSchemaManager);
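+ // Hand the caller-supplied CatalogResolver through to the schema under test.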
+ return new DruidSchema(cache, druidSchemaManager, catalog);
}
public static JoinableFactory createDefaultJoinableFactory(Injector injector)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
index 5db5090cb2a1..fa7055d3e582 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
@@ -155,6 +155,11 @@ SqlEngine createEngine(
Injector injector
);
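+ /**
+ * Catalog resolver used by the planner under test. The default returns
+ * CatalogResolver.NULL_RESOLVER (no catalog); implementations can override
+ * this to test catalog integration.
+ */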
+ default CatalogResolver createCatalogResolver()
+ {
+ return CatalogResolver.NULL_RESOLVER;
+ }
+
/**
* Configure the JSON mapper.
*
@@ -419,7 +424,8 @@ public PlannerFixture(
plannerConfig,
viewManager,
componentSupplier.createSchemaManager(),
- framework.authorizerMapper
+ framework.authorizerMapper,
+ framework.builder.catalogResolver
);
this.plannerFactory = new PlannerFactory(