Add sort_actor to cudf-polars + rapidsmpf #21690
rapids-bot[bot] merged 70 commits into rapidsai:main
Conversation
```python
)
if sort_ir.stable:
    nrows = df.table.num_rows()
    base = seq_num * (1 << 32)
```
This is incorrect in a multi-rank setting. I think what you're trying to do is give each row in the (concatenated) stream of local table chunks a unique id.
`seq_num * (1 << 32)` is a good offset for this because, by construction, no sequence numbers from chunks on the same rank can overlap.
But, it seems unnecessary (you have the number of rows when adding the column, so just keep track of that locally).
However, what this doesn't do is ensure that two different ranks have different sequence numbers. For that, you need to incorporate the rank in the high bits of the id, such that when you sort globally, rows from rank-1 (say) come after rows from rank-0 if their keys are equal.
Okay, thanks for explaining - I updated the logic here (hopefully in line with your suggestion).
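The suggestion above can be sketched in plain Python (illustrative only; all names here are hypothetical, not the actual implementation):

```python
# Hedged sketch: assigning globally unique, order-preserving row ids
# across ranks, as the review suggests.
def make_row_ids(rank: int, rows_seen_so_far: int, num_rows: int) -> list[int]:
    # Put the rank in the high bits so that, when sort keys tie, rows
    # from rank 0 sort before rows from rank 1, and so on.
    # The low bits carry a running local row count, which is already
    # unique per rank without a per-chunk seq_num offset.
    base = (rank << 48) | rows_seen_so_far
    return [base + i for i in range(num_rows)]
```

The shift width (48 here) is an arbitrary choice for the sketch; it only needs to leave enough low bits for the local row count.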
```python
df = DataFrame.from_table(
    out_table,
    cast(Sequence[str], column_names_list),
    dtypes_list,
    stream,
)
sort_order = [
    list(column_order)[by.index(n)]
    if n in by
    else plc.types.Order.ASCENDING
    for n in column_names_list
]
nulls = [
    list(null_order)[by.index(n)]
    if n in by
    else plc.types.NullOrder.AFTER
    for n in column_names_list
]
sorted_tbl = plc.sorting.sort(
    df.table, sort_order, nulls, stream=stream
)
out_table = plc.Table(sorted_tbl.columns()[:-1])
```
This sort is wrong, because it sorts by all the columns in the table, whereas you only want to sort by the key columns (and the disambiguating seq_id column).
You want to be using `sort_by_key`.
Makes sense - I decided to construct a `Sort` so I can use `do_evaluate`, but I can move to an explicit `sort_by_key` if you'd prefer.
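The distinction the reviewer is pointing at can be shown in plain Python (not pylibcudf): sorting by the whole row also compares payload columns, whereas a sort-by-key only inspects the key columns and, if stable, leaves tied rows in their existing order.

```python
# Two rows with equal keys (columns 0 and 1) but different payloads.
rows = [("a", 1, "z"), ("a", 1, "x")]

# Sorting by the whole row lets the payload "x"/"z" break the tie:
sorted_all = sorted(rows)

# A stable sort by key columns only keeps tied rows in input order,
# which is what sort_by_key-style semantics give you:
sorted_by_key = sorted(rows, key=lambda r: (r[0], r[1]))
```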
```python
by: list[str],
by_dtypes: list[DataType],
```
These arguments are only used to construct the schema of the "empty" table for the case that the local candidates are empty, which is then used to provide the schema for `chunk_to_frame`.
Two things:
- we only need the schema for all of those operations, so we should redo the utilities to take the schema.
- Let's construct the empty table outside if `local_candidates` is empty.
So this function becomes:
```python
async def _compute_sort_boundaries(
    context: Context,
    comm: Communicator,
    ir_context: IRExecutionContext,
    local_candidates: list[TableChunk],
    schema: Schema,
    num_partitions: int,
    column_order: list[plc.types.Order],
    null_order: list[plc.types.NullOrder],
    allgather_id: int,
) -> plc.Table:
    stream = ...
    boundaries = _get_final_sort_boundaries(
        chunk_to_frame(
            await concat_batch(local_candidates, context, schema, ir_context)
        )
    )
    if comm.nranks > 1:
        ...
        boundaries = _get_final_sort_boundaries(
            ...
        )
    return boundaries.table
```
Okay, yeah - I revised this a bit.
mroeschke left a comment
Just some non-blocking ideas/questions around sortedness:
- Are there any simplifications to be made to computing sort boundaries if the incoming data is already sorted?
- When processing the resulting message of the sorted TableChunk, is there a practical way to construct the cudf_polars DataFrame with sorted metadata, i.e. set the `is_sorted` flag on the Columns of the cudf_polars DataFrame? Maybe the message sends metadata with that sorted information?
The sort-boundary calculation requires the incoming chunks to be sorted. We guarantee this at lowering time by ensuring the child of a ShuffleSorted node is always a Sort (which is executed chunk-wise). In the future, we should have metadata like rapidsai/rapidsmpf#853 to skip those local sort operations when the data is already sorted.
I'm hoping to use rapidsai/rapidsmpf#853 to track this kind of information in the ChannelMetadata.
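Why the boundary calculation needs sorted input can be sketched in plain Python (hypothetical helper, not the actual `_get_final_sort_boundaries`): splitters are taken as evenly spaced order statistics of the local keys, which are only valid if the keys are already sorted.

```python
# Hedged sketch: pick one splitter per internal partition edge
# (num_partitions - 1 splitters total) from an already-sorted chunk.
def candidate_boundaries(sorted_keys: list[int], num_partitions: int) -> list[int]:
    n = len(sorted_keys)
    # If sorted_keys were unsorted, these positions would not be
    # meaningful order statistics and the partitioning would be skewed.
    return [sorted_keys[(i * n) // num_partitions] for i in range(1, num_partitions)]
```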
wence- left a comment
Tiny changes, but I think the core logic looks good.
/merge
Should close #21824

See test case for repro, but basically if you had

```python
df = pl.LazyFrame({"a": [1, 2, 3]})
# Create two filters, both of which will give empty results
q = pl.concat([df.filter(pl.col("a") == 0), df.filter(pl.col("a") == 4)]).sort("a")
```

then `q.collect(engine="gpu")` would give a CUDA exception because we'd try to do an out-of-bounds access on an empty table.

edit: turns out #21690 fixed this issue. This PR now only contributes a test case.

Authors:
- J Berg (https://github.com/jberg5)

Approvers:
- Matthew Roeschke (https://github.com/mroeschke)

URL: #21825
Description
Closes #20486
Depends on rapidsai/rapidsmpf#891 (or similar)
Adds `sort_actor` for the "rapidsmpf" runtime.