-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Is your feature request related to a problem or challenge?
When debugging something downstream, I was quite confused by the following:
Make input:
echo "x,y" > /tmp/test.csv
echo "a,1" >> /tmp/test.csv
echo "a,2" >> /tmp/test.csv
echo "b,3" >> /tmp/test.csvRun in datafusion-cli:
Create table:
DROP TABLE IF EXISTS test ;
CREATE EXTERNAL TABLE test(x varchar, y bigint)
STORED AS CSV
WITH HEADER ROW
WITH ORDER (x ASC)
LOCATION '/tmp/test.csv'
;Then run a query:
❯ explain select * from test order by x ASC;
+---------------+----------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------+
| logical_plan | Sort: test.x ASC NULLS LAST |
| | TableScan: test projection=[x, y] |
| physical_plan | CsvExec: files={1 group: [[private/tmp/test.csv]]}, has_header=true, limit=None, projection=[x, y] |
| | |
+---------------+----------------------------------------------------------------------------------------------------+
Note that the CSV exec does NOT show output_ordering in the plan but the optimizer has used it (there is no sort in the actual plan)
Here is an example of a parquet file showing output_ordering=[tag0@0 ASC, time@1 ASC]
2023-05-02T13:39:55.659173Z TRACE datafusion::physical_plan::planner: Optimized physical plan by parquet_sortness:
SortExec: expr=[iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
ProjectionExec: expr=[select_test as iox::measurement, tag0 as key, tag0@0 as value]
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[], ordering_mode=FullyOrdered
AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[], ordering_mode=FullyOrdered
UnionExec
ProjectionExec: expr=[tag0@0 as tag0]
FilterExec: time@1 >= 631152000000000000
ParquetExec: limit=None, partitions={1 group: [[1/1/1/3a820ed1-c0a1-468d-b4de-edd49f2fef50.parquet]]}, predicate=time@12 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, output_ordering=[tag0@0 ASC, time@1 ASC], projection=[tag0, time]
Describe the solution you'd like
I would like all the listing tables (e.g. CsvExec, AvroExec, JsonExec, etc to have fmt_as that include output_ordering when it has one
Here is the relevant part in CsvExec:
Describe alternatives you've considered
The simple solution would be to copy the code from ParquetExec in https://github.com/apache/arrow-datafusion/blob/cda00b545e1b4492269f76f65545c82264f79b88/datafusion/core/src/physical_plan/file_format/parquet.rs#L422-L435
The (better) solution would be to make a generic way to format the base_config field that is used across all of the executors.
base_config: FileScanConfig,A generic solution would be better as it would be far more likely to remain in sync if additional fields are added
Additional context
I think this is a good first issue as it is a relatively straightforward coding exercise (and test output update exercise) that would help someone understand the codebase