-
Notifications
You must be signed in to change notification settings - Fork 17.4k
Adding support for Pinot #6719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding support for Pinot #6719
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| # pylint: disable=C,R,W | ||
| from collections import OrderedDict | ||
| from collections import namedtuple, OrderedDict | ||
| from datetime import datetime | ||
| import logging | ||
|
|
||
|
|
@@ -46,6 +46,9 @@ | |
| config = app.config | ||
| metadata = Model.metadata # pylint: disable=no-member | ||
|
|
||
| SqlaQuery = namedtuple('SqlaQuery', ['sqla_query', 'labels_expected']) | ||
| QueryStringExtended = namedtuple('QueryStringExtended', ['sql', 'labels_expected']) | ||
|
|
||
|
|
||
| class AnnotationDatasource(BaseDatasource): | ||
| """ Dummy object so we can query annotations using 'Viz' objects just like | ||
|
|
@@ -141,23 +144,20 @@ def get_timestamp_expression(self, time_grain): | |
| """Getting the time component of the query""" | ||
| label = self.table.get_label(utils.DTTM_ALIAS) | ||
|
|
||
| db = self.table.database | ||
| pdf = self.python_date_format | ||
| is_epoch = pdf in ('epoch_s', 'epoch_ms') | ||
| if not self.expression and not time_grain and not is_epoch: | ||
| return column(self.column_name, type_=DateTime).label(label) | ||
|
|
||
| expr = self.expression or self.column_name | ||
| if is_epoch: | ||
| # if epoch, translate to DATE using db specific conf | ||
| db_spec = self.table.database.db_engine_spec | ||
| if pdf == 'epoch_s': | ||
| expr = db_spec.epoch_to_dttm().format(col=expr) | ||
| elif pdf == 'epoch_ms': | ||
| expr = db_spec.epoch_ms_to_dttm().format(col=expr) | ||
| grain = None | ||
| if time_grain: | ||
| grain = self.table.database.grains_dict().get(time_grain) | ||
| if grain: | ||
| expr = grain.function.format(col=expr) | ||
| grain = db.grains_dict().get(time_grain) | ||
| if not grain: | ||
| raise NotImplementedError( | ||
| f'No grain spec for {time_grain} for database {db.database_name}') | ||
| expr = db.db_engine_spec.get_time_expr( | ||
| self.expression or self.column_name, | ||
| pdf, time_grain, grain) | ||
| return literal_column(expr, type_=DateTime).label(label) | ||
|
|
||
| @classmethod | ||
|
|
@@ -476,15 +476,18 @@ def get_template_processor(self, **kwargs): | |
| return get_template_processor( | ||
| table=self, database=self.database, **kwargs) | ||
|
|
||
| def get_query_str(self, query_obj): | ||
| qry = self.get_sqla_query(**query_obj) | ||
| sql = self.database.compile_sqla_query(qry) | ||
| def get_query_str_extended(self, query_obj): | ||
| sqlaq = self.get_sqla_query(**query_obj) | ||
| sql = self.database.compile_sqla_query(sqlaq.sqla_query) | ||
| logging.info(sql) | ||
| sql = sqlparse.format(sql, reindent=True) | ||
| if query_obj['is_prequery']: | ||
| query_obj['prequeries'].append(sql) | ||
| sql = self.mutate_query_from_config(sql) | ||
| return sql | ||
| return QueryStringExtended(labels_expected=sqlaq.labels_expected, sql=sql) | ||
|
|
||
| def get_query_str(self, query_obj): | ||
| return self.get_query_str_extended(query_obj).sql | ||
|
|
||
| def get_sqla_table(self): | ||
| tbl = table(self.table_name) | ||
|
|
@@ -517,12 +520,11 @@ def adhoc_metric_to_sqla(self, metric, cols): | |
|
|
||
| if expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SIMPLE']: | ||
| column_name = metric.get('column').get('column_name') | ||
| sqla_column = column(column_name) | ||
| table_column = cols.get(column_name) | ||
|
|
||
| if table_column: | ||
| sqla_column = table_column.get_sqla_col() | ||
|
|
||
| else: | ||
| sqla_column = column(column_name) | ||
| sqla_metric = self.sqla_aggregations[metric.get('aggregate')](sqla_column) | ||
| sqla_metric = sqla_metric.label(label) | ||
| return sqla_metric | ||
|
|
@@ -551,7 +553,7 @@ def get_sqla_query( # sqla | |
| order_desc=True, | ||
| prequeries=None, | ||
| is_prequery=False, | ||
| ): | ||
| ): | ||
| """Querying any sqla table from this common interface""" | ||
| template_kwargs = { | ||
| 'from_dttm': from_dttm, | ||
|
|
@@ -640,6 +642,12 @@ def get_sqla_query( # sqla | |
| time_filters.append(dttm_col.get_time_filter(from_dttm, to_dttm)) | ||
|
|
||
| select_exprs += metrics_exprs | ||
|
|
||
| labels_expected = [str(c.name) for c in select_exprs] | ||
|
|
||
| select_exprs = db_engine_spec.make_select_compatible( | ||
| groupby_exprs_with_timestamp.values(), | ||
| select_exprs) | ||
| qry = sa.select(select_exprs) | ||
|
|
||
| tbl = self.get_from_clause(template_processor) | ||
|
|
@@ -793,7 +801,8 @@ def get_sqla_query( # sqla | |
| groupby_exprs_sans_timestamp) | ||
| qry = qry.where(top_groups) | ||
|
|
||
| return qry.select_from(tbl) | ||
| return SqlaQuery(sqla_query=qry.select_from(tbl), | ||
| labels_expected=labels_expected) | ||
|
|
||
| def _get_top_groups(self, df, dimensions, groupby_exprs): | ||
| groups = [] | ||
|
|
@@ -807,19 +816,21 @@ def _get_top_groups(self, df, dimensions, groupby_exprs): | |
|
|
||
| def query(self, query_obj): | ||
| qry_start_dttm = datetime.now() | ||
| sql = self.get_query_str(query_obj) | ||
| query_str_ext = self.get_query_str_extended(query_obj) | ||
| sql = query_str_ext.sql | ||
| status = utils.QueryStatus.SUCCESS | ||
| error_message = None | ||
| df = None | ||
| db_engine_spec = self.database.db_engine_spec | ||
| try: | ||
| df = self.database.get_df(sql, self.schema) | ||
| if self.mutated_labels: | ||
| df = df.rename(index=str, columns=self.mutated_labels) | ||
| db_engine_spec.mutate_df_columns(df, sql, query_str_ext.labels_expected) | ||
| except Exception as e: | ||
| status = utils.QueryStatus.FAILED | ||
| logging.exception(e) | ||
| error_message = ( | ||
| self.database.db_engine_spec.extract_error_message(e)) | ||
| logging.exception(f'Query {sql} on schema {self.schema} failed') | ||
| error_message = db_engine_spec.extract_error_message(e) | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems very specific to Pinot. I wonder if this could be dealt with in
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I considered that (after looking at your diff :-) ). But I decided against that for this reason: I think that the "labels" you want in a query are a property of the query and not a property of the table/db. In your case, you were creating unique labels mapping to real labels. But I don't have unique labels. Further, the problem is that Pinot does not return the labels to me. So if I do "select foo as bar from baz", I am going to get a column named "foo" from Pinot instead of "bar" :(. I am simply working around that fact here. Basically, this "labels_expected" has to be passed as another return value from the get_query_str. I feel it shouldn't be some "state" in either the "db" or the "table". An alternative I did try was to re-parse the query in Pinot and extract the labels there, but it was proving a bit difficult to reliably do this re-parsing with all the query strings that superset can concoct. So I gave up there :)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I understand it, P.S. The rename logic I have done is rather controversial and does bend the rules, and could probably have been solved more elegantly, but would have required a bigger refactor that would have been unfeasible. But as it was a common problem for several dbs, the current solution was deemed a good compromise.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that the logic belongs in the DB API driver instead of Superset — it would also benefit other tools that use the driver. I wrote a Google Spreadsheets connector that has the same problem, since the SQL API supported is very crude and doesn't accept aliases. Maybe you can reuse the helper function I created? It's based on https://github.com/betodealmeida/gsheets-db-api/blob/master/gsheetsdb/query.py#L93
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am wondering if I could do that as part of future work :)? I agree with you that the ideal way to do this is to have the DBAPI deal with this, and that requires doing a re-parse of the query to infer the required label names. Given all the difficulty of parsing (or unparsing) PQL, I would rather do this more properly than hack something in here to unblock this diff. I have to rework the pinot-dbapi anyway to integrate it with dbapihelper, so I can also consider integrating it with this sql parser then too. I am just anxious about hanging onto this PR in our internal branch here, since that increases our merge burden. Thanks |
||
| # if this is a main query with prequeries, combine them together | ||
| if not query_obj['is_prequery']: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,6 +114,18 @@ class BaseEngineSpec(object): | |
| force_column_alias_quotes = False | ||
| arraysize = None | ||
|
|
||
| @classmethod | ||
| def get_time_expr(cls, expr, pdf, time_grain, grain): | ||
| # if epoch, translate to DATE using db specific conf | ||
| if pdf == 'epoch_s': | ||
| expr = cls.epoch_to_dttm().format(col=expr) | ||
| elif pdf == 'epoch_ms': | ||
| expr = cls.epoch_ms_to_dttm().format(col=expr) | ||
|
|
||
| if grain: | ||
| expr = grain.function.format(col=expr) | ||
| return expr | ||
|
|
||
| @classmethod | ||
| def get_time_grains(cls): | ||
| blacklist = config.get('TIME_GRAIN_BLACKLIST', []) | ||
|
|
@@ -124,6 +136,16 @@ def get_time_grains(cls): | |
| grain_functions.update(grain_addon_functions.get(cls.engine, {})) | ||
| return _create_time_grains_tuple(grains, grain_functions, blacklist) | ||
|
|
||
| @classmethod | ||
| def make_select_compatible(cls, groupby_exprs, select_exprs): | ||
| # Some databases will just return the group-by field into the select, but don't | ||
| # allow the group-by field to be put into the select list. | ||
| return select_exprs | ||
|
|
||
| @classmethod | ||
| def mutate_df_columns(cls, df, sql, labels_expected): | ||
| pass | ||
|
|
||
| @classmethod | ||
| def fetch_data(cls, cursor, limit): | ||
| if cls.arraysize: | ||
|
|
@@ -1413,6 +1435,64 @@ def epoch_to_dttm(cls): | |
| return 'from_unixtime({col})' | ||
|
|
||
|
|
||
| class PinotEngineSpec(BaseEngineSpec): | ||
| engine = 'pinot' | ||
| allows_subquery = False | ||
| inner_joins = False | ||
|
|
||
| _time_grain_to_datetimeconvert = { | ||
| 'PT1S': '1:SECONDS', | ||
| 'PT1M': '1:MINUTES', | ||
| 'PT1H': '1:HOURS', | ||
| 'P1D': '1:DAYS', | ||
| 'P1Y': '1:YEARS', | ||
| 'P1M': '1:MONTHS', | ||
| } | ||
|
|
||
| # Pinot does its own conversion below | ||
| time_grain_functions = {k: None for k in _time_grain_to_datetimeconvert.keys()} | ||
|
|
||
| @classmethod | ||
| def get_time_expr(cls, expr, pdf, time_grain, grain): | ||
| is_epoch = pdf in ('epoch_s', 'epoch_ms') | ||
| if not is_epoch: | ||
| raise NotImplementedError('Pinot currently only supports epochs') | ||
| # The DATETIMECONVERT pinot udf is documented at | ||
| # Per https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF | ||
| # We are not really converting any time units, just bucketing them. | ||
| seconds_or_ms = 'MILLISECONDS' if pdf == 'epoch_ms' else 'SECONDS' | ||
| tf = f'1:{seconds_or_ms}:EPOCH' | ||
| granularity = cls._time_grain_to_datetimeconvert.get(time_grain) | ||
| if not granularity: | ||
| raise NotImplementedError('No pinot grain spec for ' + str(time_grain)) | ||
| # In pinot the output is a string since there is no timestamp column like pg | ||
| return f'DATETIMECONVERT({expr}, "{tf}", "{tf}", "{granularity}")' | ||
|
|
||
| @classmethod | ||
| def make_select_compatible(cls, groupby_exprs, select_exprs): | ||
| # Pinot does not want the group by expr's to appear in the select clause | ||
| select_sans_groupby = [] | ||
| # We want identity and not equality, so doing the filtering manually | ||
| for s in select_exprs: | ||
| for gr in groupby_exprs: | ||
| if s is gr: | ||
| break | ||
| else: | ||
| select_sans_groupby.append(s) | ||
| return select_sans_groupby | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is slightly difficult to follow; a comment explaining what is happening here might be helpful.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay. This UDF is weird. I will add a link to its description and some rationale.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's super weird, I never understood how it works. |
||
| @classmethod | ||
| def mutate_df_columns(cls, df, sql, labels_expected): | ||
| if df is not None and \ | ||
| not df.empty and \ | ||
| labels_expected is not None: | ||
| if len(df.columns) != len(labels_expected): | ||
| raise Exception(f'For {sql}, df.columns: {df.columns}' | ||
| f' differs from {labels_expected}') | ||
| else: | ||
| df.columns = labels_expected | ||
|
|
||
|
|
||
| class ClickHouseEngineSpec(BaseEngineSpec): | ||
| """Dialect for ClickHouse analytical DB.""" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,6 +96,8 @@ def __init__(self, datasource, form_data, force=False): | |
| self.time_shift = timedelta() | ||
|
|
||
| self.status = None | ||
| self.error_msg = '' | ||
| self.results = None | ||
| self.error_message = None | ||
| self.force = force | ||
|
|
||
|
|
@@ -226,7 +228,22 @@ def get_df(self, query_obj=None): | |
| if DTTM_ALIAS in df.columns: | ||
| if timestamp_format in ('epoch_s', 'epoch_ms'): | ||
| # Column has already been formatted as a timestamp. | ||
| df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp) | ||
| dttm_col = df[DTTM_ALIAS] | ||
| one_ts_val = dttm_col[0] | ||
|
|
||
| # convert time column to pandas Timestamp, but different | ||
| # ways to convert depending on string or int types | ||
| try: | ||
| int(one_ts_val) | ||
| is_integral = True | ||
| except ValueError: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI there's a bug around this, fixed it here #7375 |
||
| is_integral = False | ||
| if is_integral: | ||
| unit = 's' if timestamp_format == 'epoch_s' else 'ms' | ||
| df[DTTM_ALIAS] = pd.to_datetime(dttm_col, utc=False, unit=unit, | ||
| origin='unix') | ||
| else: | ||
| df[DTTM_ALIAS] = dttm_col.apply(pd.Timestamp) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems quite specific to Pinot, and is slightly difficult to follow.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will try to fix and/or comment. Thanks. |
||
| else: | ||
| df[DTTM_ALIAS] = pd.to_datetime( | ||
| df[DTTM_ALIAS], utc=False, format=timestamp_format) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually also fixes a bug I found with Postgres, where the query is:
SELECT SUM(cnt) AS "SUM(cnt)" FROM ...
But the cursor returns a column named sum(cnt), lowercase, and then Pandas errors out when pivoting the table.