rust/datafusion/src/execution/context.rs (40 additions, 0 deletions)
@@ -1721,6 +1721,46 @@ mod tests {
        Ok(())
    }

    #[tokio::test]
    async fn limit() -> Result<()> {
        let tmp_dir = TempDir::new()?;
        let mut ctx = create_ctx(&tmp_dir, 1)?;
        ctx.register_table("t", table_with_sequence(1, 1000).unwrap())
Contributor:
I was looking at Limit yesterday - it seems to poll new batches even when the limit has been reached (and throw away the result in the end)? Not wrong - but quite inefficient of course :)
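For illustration, the pattern being described would look something like the sketch below: drain the input completely, then discard everything past the limit. This is a toy over `Vec<i32>` stand-in batches using the `futures` crate, not DataFusion's actual operator; the name `limit_after_the_fact` is invented here.

```rust
use futures::stream::{Stream, StreamExt};

// Hypothetical "limit after the fact": every input batch is polled and
// concatenated, even once `n` rows are already available; the surplus
// is only thrown away at the end.
async fn limit_after_the_fact<S>(input: S, n: usize) -> Vec<i32>
where
    S: Stream<Item = Vec<i32>>,
{
    let all_rows: Vec<i32> = input.concat().await;
    all_rows.into_iter().take(n).collect()
}
```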

Contributor Author:

I am not sure. I have a program (https://github.com/influxdata/influxdb_iox/pull/1117) that I made for testing some things as we roll out IOx, and there is definitely a problem with limit, though I haven't figured out whether the problem is in DataFusion or not.

For this plan:

> explain select * from chunks limit 1;
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                                                    |
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: 1                                                                                                                                |
|              |   Projection: #database_name, #id, #partition_key, #storage, #estimated_bytes, #time_of_first_write, #time_of_last_write, #time_closing |
|              |     TableScan: chunks projection=None                                                                                                   |
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------+
1 rows in set. Query took 0 seconds.

Produces no rows (and no columns) when there is a limit:

> select * from chunks limit 1;
++
||
++
++
0 rows in set. Query took 0 seconds.

But there is definitely data in that table:

> select * from chunks;
+-----------------------------------+-----+---------------------+---------------------+-----------------+-------------------------------+-------------------------------+-------------------------------+
| database_name                     | id  | partition_key       | storage             | estimated_bytes | time_of_first_write           | time_of_last_write            | time_closing                  |
+-----------------------------------+-----+---------------------+---------------------+-----------------+-------------------------------+-------------------------------+-------------------------------+
| 844910ece80be8bc_05a7a51565539000 | 0   | 2021-04-05 21:00:00 | OpenMutableBuffer   | 259733          | 2021-04-05 21:29:38.978576237 | 2021-04-05 21:49:47.995408514 |                               |
....
| 844910ece80be8bc_eaec8df57a81a1e9 | 1   | 2021-04-05 21:00:00 | OpenMutableBuffer   | 6408933         | 2021-04-05 21:44:31.507950286 | 2021-04-05 21:55:37.226659960 |                               |
+-----------------------------------+-----+---------------------+---------------------+-----------------+-------------------------------+-------------------------------+-------------------------------+
226 rows in set. Query took 0 seconds.

I am still looking into it

Contributor Author:

https://issues.apache.org/jira/browse/ARROW-12235 is one possible problem I found

Contributor Author:

> I was looking at Limit yesterday - it seems to poll new batches even when the limit has been reached (and throw away the result in the end)? Not wrong - but quite inefficient of course :)

@Dandandan I spent some time looking into this and you were absolutely correct. I have a PR (draft) up with a proposed fix: #9926
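For the record, the early-termination shape looks roughly like the sketch below: once the limit is satisfied, the wrapper reports end-of-stream instead of polling, and discarding, more input. It is a toy using `Vec<i32>` as a stand-in for `RecordBatch`; the names `LimitStream` and `Batch` are invented here, and this is not necessarily what #9926 does.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{self, Stream, StreamExt};

// Stand-in for arrow::record_batch::RecordBatch, to keep the sketch
// self-contained.
type Batch = Vec<i32>;

// Wraps an input stream and stops polling it once `remaining` rows
// have been produced.
struct LimitStream<S> {
    input: S,
    remaining: usize,
}

impl<S: Stream<Item = Batch> + Unpin> Stream for LimitStream<S> {
    type Item = Batch;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Batch>> {
        let this = self.get_mut();
        // Limit already satisfied: report end-of-stream immediately
        // rather than polling (and discarding output from) the input.
        if this.remaining == 0 {
            return Poll::Ready(None);
        }
        match Pin::new(&mut this.input).poll_next(cx) {
            Poll::Ready(Some(mut batch)) => {
                // Truncate the batch that crosses the limit boundary.
                batch.truncate(this.remaining.min(batch.len()));
                this.remaining -= batch.len();
                Poll::Ready(Some(batch))
            }
            other => other, // Pending or Ready(None) pass through unchanged
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        // Three 4-row batches with a limit of 6: the second batch is
        // truncated to 2 rows and the third is never polled.
        let input = stream::iter(vec![
            vec![1, 2, 3, 4],
            vec![5, 6, 7, 8],
            vec![9, 10, 11, 12],
        ]);
        let limited = LimitStream { input, remaining: 6 };
        let out: Vec<Batch> = limited.collect().await;
        assert_eq!(out, vec![vec![1, 2, 3, 4], vec![5, 6]]);
    });
}
```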

            .unwrap();

        let results =
            plan_and_collect(&mut ctx, "SELECT i FROM t ORDER BY i DESC limit 3")
                .await
                .unwrap();

        let expected = vec![
            "+------+", "| i    |", "+------+", "| 1000 |", "| 999  |", "| 998  |",
            "+------+",
        ];

        assert_batches_eq!(expected, &results);

        let results = plan_and_collect(&mut ctx, "SELECT i FROM t ORDER BY i limit 3")
            .await
            .unwrap();

        let expected = vec![
            "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];

        assert_batches_eq!(expected, &results);

        let results = plan_and_collect(&mut ctx, "SELECT i FROM t limit 3")
            .await
            .unwrap();

        // the actual rows are not guaranteed, so only check the count (should be 3)
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 3);

        Ok(())
    }

    #[tokio::test]
    async fn case_sensitive_identifiers_functions() {
        let mut ctx = ExecutionContext::new();