Allow broker to use catalog for datasource schemas for SQL queries #15469
jon-wei merged 3 commits into apache:master
Conversation
```java
SqlSchema schema = SqlSchema.builder()
    .column("__time", "TIMESTAMP(3) NOT NULL")
    .column("extra1", "VARCHAR")
    .column("dim2", "VARCHAR")
    .column("dim1", "VARCHAR")
    .column("cnt", "BIGINT NOT NULL")
    .column("m1", "DOUBLE NOT NULL")
    .column("extra2", "BIGINT NOT NULL")
    .column("extra3", "VARCHAR")
    .column("m2", "DOUBLE NOT NULL")
    .build();
```
Code scanning / CodeQL notice: Unread local variable
Force-pushed from 8eb5d50 to 7f2bf29
```java
import java.util.Map;
import java.util.Set;

public class LiveCatalogResolver implements CatalogResolver
```
```java
/**
 * Create a {@link DruidTable} based on the physical segments, catalog entry, or both.
 */
@Override
```
```java
  columns.put(col.name(), colMetadata);
}

// Mark any hidden columns. Assumes that the hidden columns are a disjoint set
```
Reviewer: Does anything enforce this disjoint property? What if hidden columns are also defined?

Author: There isn't enforcement of this right now. As-is, if hidden columns overlap with the declared columns, the hiding would be ignored; the user would need to remove the column from the declared set for the hiding to take effect.
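The behavior the author describes (a hidden column that is also declared is effectively not hidden) could be made explicit with a small filtering step. This is a hypothetical sketch, not the actual Druid code; the class and method names are illustrative:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class HiddenColumns
{
  // Hypothetical helper: keep only hidden-column names that do not
  // clash with declared columns, so declared columns always win.
  public static List<String> effectiveHidden(Set<String> declared, List<String> hidden)
  {
    return hidden.stream()
                 .filter(name -> !declared.contains(name))
                 .collect(Collectors.toList());
  }
}
```

Filtering up front like this would keep the disjointness assumption true by construction rather than relying on later code to ignore the overlap.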
```java
// Merge columns. All catalog-defined columns come first,
// in the order defined in the catalog.
final RowSignature.Builder builder = RowSignature.builder();
Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
```
Reviewer: Does insertion of keys and values into the HashMap preserve the order in which they were added? I assume the column order of the effective metadata is important?

Author: columns is used only as a column-name lookup; the order-preserving component in a DatasourceTable is the RowSignature, which stores the columns in a list.
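The split the author describes — an order-preserving list of columns alongside an unordered map used purely for lookup — can be sketched as follows. This is an illustrative stand-in, not Druid's RowSignature implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnIndex
{
  // Order-preserving structure, analogous to the column list in RowSignature.
  private final List<String> ordered = new ArrayList<>();

  // Pure name-based lookup; HashMap iteration order is irrelevant here
  // because nothing iterates this map to recover column order.
  private final Map<String, String> typeByName = new HashMap<>();

  public void add(String name, String type)
  {
    ordered.add(name);
    typeByName.put(name, type);
  }

  public List<String> columnNames()
  {
    return ordered;
  }

  public String typeOf(String name)
  {
    return typeByName.get(name);
  }
}
```

Because order lives only in the list, the HashMap's lack of insertion-order guarantees is harmless in this design.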
```diff
 public List<TableMetadata> tablesForSchema(String dbSchema)
 {
-  String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{dbSchema}", dbSchema);
+  String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{schema}", dbSchema);
```
Reviewer (nit): Good catch. Should we move "{schema}" to a constant here?
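The reviewer's suggestion might look like the sketch below. The path value and the use of plain String.replace are illustrative assumptions, not the actual Druid constants or its StringUtils API:

```java
public class SyncPaths
{
  // Hypothetical constant for the placeholder, so the template and the
  // replacement call can never drift apart again.
  public static final String SCHEMA_PLACEHOLDER = "{schema}";

  // Illustrative path; the real SCHEMA_SYNC_PATH lives in the Druid source.
  public static final String SCHEMA_SYNC_PATH = "/catalog/sync/schemas/{schema}";

  public static String schemaSyncUrl(String dbSchema)
  {
    return SCHEMA_SYNC_PATH.replace(SCHEMA_PLACEHOLDER, dbSchema);
  }
}
```

A shared constant also prevents the `{dbSchema}` vs `{schema}` mismatch that this diff fixes.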
```diff
 public TableMetadata table(TableId id)
 {
-  String url = StringUtils.replace(TABLE_SYNC_PATH, "{dbSchema}", id.schema());
+  String url = StringUtils.replace(TABLE_SYNC_PATH, "{schema}", id.schema());
```
```java
 * Coordinator. To prevent slowing initial queries, this class loads the
 * current catalog contents into the local cache on lifecycle start, which
 * avoids the on-demand reads that would otherwise occur. After the first load,
 * events from the Coordinator keep the local cache evergreen.
```
Reviewer: Where are events from the Coordinator pushed after this receiver loads on lifecycle start? It may be good to note that here.
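The load-on-start-then-listen pattern the javadoc describes can be sketched like this. All names here are hypothetical stand-ins, not the actual cache class in this PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CatalogCache
{
  private final Map<String, String> tables = new ConcurrentHashMap<>();

  // On lifecycle start: bulk-load a full catalog snapshot so the first
  // queries never block on an on-demand read to the Coordinator.
  public void start(Map<String, String> initialSnapshot)
  {
    tables.putAll(initialSnapshot);
  }

  // After the initial load, pushed update events keep the cache evergreen.
  public void onUpdate(String tableName, String spec)
  {
    tables.put(tableName, spec);
  }

  public void onDelete(String tableName)
  {
    tables.remove(tableName);
  }

  public String table(String tableName)
  {
    return tables.get(tableName);
  }
}
```

The key design point is that reads after startup are always served locally; the Coordinator only pushes deltas.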
```diff
  * directly match a Druid storage type.
  */
- private final String sqlType;
+ private final String dataType;
```
Reviewer: What is the reasoning behind changing from the SQL type to the Druid type here?

Author: It now accepts both SQL and Druid native type strings in the catalog definition.
```diff
-    .put(VARCHAR, ColumnType.STRING)
-    .build();
+    .put(SQL_BIGINT, ColumnType.LONG)
+    .put(SQL_VARCHAR, ColumnType.STRING)
```
Reviewer: Did you mean to remove the mappings for SQL_FLOAT, SQL_DOUBLE, and TIMESTAMP here?

Author: This part wasn't actually being used right now, so I removed it.
```diff
   return null;
 }
-ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(sqlType));
+ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(dataType));
```
Reviewer: It looks like the dataType of the ColumnSpec is now the Druid type instead of the SQL type, so should we take the type as-is instead of looking up the mapping here?

Author: It checks whether it's a SQL type first and attempts to convert, then falls back to ColumnType.fromString(dataType) to treat it as a Druid native type.
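The two-step resolution the author describes — try the SQL-type mapping first, then treat the string as a Druid native type — can be sketched as below. The map contents and the string fallback are illustrative; the real code maps to ColumnType objects and uses ColumnType.fromString:

```java
import java.util.Locale;
import java.util.Map;

public class TypeResolution
{
  // Illustrative subset of the SQL-to-Druid type mapping.
  private static final Map<String, String> SQL_TO_DRUID_TYPES = Map.of(
      "BIGINT", "LONG",
      "VARCHAR", "STRING"
  );

  // Step 1: try to interpret the string as a SQL type.
  // Step 2: otherwise assume it is already a Druid native type name
  // (stand-in for the ColumnType.fromString fallback).
  public static String resolve(String dataType)
  {
    String mapped = SQL_TO_DRUID_TYPES.get(dataType.toUpperCase(Locale.ROOT));
    return mapped != null ? mapped : dataType;
  }
}
```

This ordering lets catalog authors write either `VARCHAR` or `STRING` for the same column and get the same effective type.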
```java
this.spec = spec;
if (Columns.isTimeColumn(spec.name()) && spec.dataType() == null) {
  // For __time only, force a type if type is null.
  this.sqlType = Columns.LONG;
```
Reviewer: Should this be Columns.SQL_BIGINT?

Author: No, this is using the native type.
```java
public static final String HIDDEN_COLUMNS_PROPERTY = "hiddenColumns";

/**
 * By default: columns are optional hints. If a datasource has columns defined,
```
Reviewer: Why does this description no longer apply?

Author: I'm not sure why this was removed in Paul's original PR.
```java
/**
 * Column type for external tables.
 */
public static final String EXTERNAL_COLUMN_TYPE = "extern";
```
Author: This is from another part of the original PR related to external table handling; it's not used currently, but future changes will use it.
```java
  return columns.get(name);
}

public boolean isEmpty()
```
Reviewer: Does isEmpty signify whether or not the table had any data in it?

Author: Yes, it's set to true when there are no physical segments.
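As the author confirms, the flag simply reflects the absence of physical segments. A minimal sketch of that semantics, with hypothetical class and field names:

```java
import java.util.List;

public class EffectiveMetadata
{
  // Illustrative stand-in for the datasource's physical segments.
  private final List<String> segments;

  public EffectiveMetadata(List<String> segments)
  {
    this.segments = segments;
  }

  // True when the datasource has no physical segments yet, i.e. its
  // effective schema comes entirely from the catalog definition.
  public boolean isEmpty()
  {
    return segments.isEmpty();
  }
}
```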
Force-pushed from 7f2bf29 to 998898e
This PR contains a portion of the changes from the inactive draft PR for integrating the catalog with the Calcite planner (#13686) from @paul-rogers, allowing the datasource table schemas defined in the catalog to be synced to the broker.
It does not include the MSQ INSERT and REPLACE validation using the catalog functionality from #13686; this PR covers just the portion of changes needed to sync the queryable schemas to the broker.