Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/biocutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@

from .biocobject import BiocObject
from .table import table
from .split import split
132 changes: 132 additions & 0 deletions src/biocutils/split.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from typing import Any, Sequence, Union
from functools import singledispatch

import numpy

from .NamedList import NamedList
from .Factor import Factor
from .match import match
from .subset import subset
from .get_height import get_height


@singledispatch
def split(
    x: Any,
    f: Sequence,
    skip: Union[set, Sequence] = (None, numpy.ma.masked),
    drop: bool = False,
    as_NamedList: bool = False,
) -> Union[dict, NamedList]:
    """
    Split a sequence ``x`` into groups defined by a categorical factor ``f``.

    Args:
        x:
            Values to be divided into groups.
            Any object that supports :py:func:`~biocutils.subset.subset` can be used here.

        f:
            A sequence of categorical variables defining the groupings.
            This should have length equal to the "height" of ``x`` (see :py:func:`~biocutils.get_height.get_height`).

            The order of groups is defined by sorting all unique variables in ``f``.
            If a :py:class:`~biocutils.Factor.Factor` is provided, the order of groups is defined by the existing levels.

        skip:
            Values of ``f`` to be skipped.
            The corresponding entries of ``x`` are also omitted from the output.
            Defaults to skipping ``None`` and ``numpy.ma.masked``.
            An empty collection disables skipping.

        drop:
            Whether to drop unused levels, if ``f`` is a ``Factor``.

        as_NamedList:
            Whether to return the results as a :py:class:`~biocutils.NamedList.NamedList`.
            This automatically converts all groups into strings.

    Returns:
        A dictionary where each key is a unique group and each value contains that group's entries from ``x``.
        If ``as_NamedList = True``, this is a ``NamedList`` instead.

    Raises:
        ValueError: If the heights of ``x`` and ``f`` are not the same.

    Examples:
        >>> import numpy
        >>> x = numpy.random.rand(10)
        >>> f = numpy.random.choice(["A", "B", "C"], 10)
        >>> import biocutils
        >>> biocutils.split(x, f)
        >>> biocutils.split(x, f, as_NamedList=True)
        >>> biocutils.split(
        ...     x,
        ...     biocutils.Factor.from_sequence(
        ...         f,
        ...         ["X", "A", "Y", "B", "Z", "C"],
        ...     ),
        ...     drop=False,
        ... )
    """
    # Validate up front so that a mismatch is reported before any grouping work is done.
    if get_height(x) != get_height(f):
        raise ValueError("heights of 'x' and 'f' should be the same")

    if isinstance(f, Factor):
        if drop:
            f = f.drop_unused_levels()
        if len(skip) > 0:
            # Compact the levels by removing skipped ones; 'reindex' maps each old
            # code to its position among the retained levels, or -1 if skipped.
            levels = []
            reindex = []
            for lev in f.get_levels():
                if lev in skip:
                    reindex.append(-1)
                else:
                    reindex.append(len(levels))
                    levels.append(lev)
            # Negative codes are passed through untouched and ignored further below.
            indices = [reindex[code] if code >= 0 else code for code in f.get_codes()]
        else:
            levels = f.get_levels()
            indices = f.get_codes()
    else:
        if len(skip) > 0:
            levels = {y for y in f if y not in skip}
        else:
            levels = set(f)
        levels = sorted(levels)
        indices = match(f, levels)

    # Gather the positions in 'x' that belong to each group; negative indices
    # denote entries that do not belong to any retained group.
    members = [[] for _ in levels]
    for i, j in enumerate(indices):
        if j >= 0:
            members[j].append(i)
    collected = [subset(x, m) for m in members]

    if as_NamedList:
        return NamedList(collected, levels)
    return dict(zip(levels, collected))
62 changes: 62 additions & 0 deletions tests/test_split.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import numpy
import biocutils


def test_split_basic():
    """Check splitting by a plain list of strings, with default and empty ``skip`` and as a NamedList."""
    values = numpy.random.rand(10)
    groups = ["B", "A"] * 5

    # Keys are expected in sorted order; "B" sits at even positions, "A" at odd ones.
    by_group = biocutils.split(values, groups)
    assert list(by_group.keys()) == ["A", "B"]
    assert (by_group["A"] == values[1::2]).all()
    assert (by_group["B"] == values[::2]).all()

    # With nothing to skip in 'groups', an empty skip list should give the same result.
    unskipped = biocutils.split(values, groups, skip=[])
    assert list(by_group.keys()) == list(unskipped.keys())
    for key in ("A", "B"):
        assert (by_group[key] == unskipped[key]).all()

    named = biocutils.split(values, groups, as_NamedList=True)
    assert named.get_names().as_list() == ["A", "B"]


def test_split_basic_none():
    """Check that ``None`` entries in a plain list are left out of the output under the default ``skip``."""
    values = numpy.random.rand(15)
    groups = ["A", "B", None] * 5

    by_group = biocutils.split(values, groups)

    # Only the non-None groups are expected as keys.
    assert list(by_group.keys()) == ["A", "B"]
    for offset, key in enumerate(["A", "B"]):
        assert (by_group[key] == values[offset::3]).all()


def test_split_Factor():
    """Check splitting by a Factor, with and without dropping unused levels."""
    all_levels = ["E", "D", "C", "B", "A"]
    values = numpy.random.rand(10)
    fac = biocutils.Factor.from_sequence(["B", "D"] * 5, levels=all_levels)

    # Dropping unused levels keeps the used ones in their existing level order.
    dropped = biocutils.split(values, fac, drop=True)
    assert list(dropped.keys()) == ["D", "B"]
    assert (dropped["B"] == values[::2]).all()
    assert (dropped["D"] == values[1::2]).all()

    # An empty skip list should not change the outcome here.
    dropped_noskip = biocutils.split(values, fac, skip=[], drop=True)
    assert list(dropped.keys()) == list(dropped_noskip.keys())
    for key in ("B", "D"):
        assert (dropped[key] == dropped_noskip[key]).all()

    # Without dropping, every level is expected as a key.
    kept = biocutils.split(values, fac, drop=False)
    assert list(kept.keys()) == ["E", "D", "C", "B", "A"]


def test_split_Factor_none():
    """Check splitting by a Factor built from a sequence with ``None``, plus a custom ``skip`` set."""
    values = numpy.random.rand(15)
    fac = biocutils.Factor.from_sequence(["A", "B", None] * 5)
    expected_a = values[0::3]
    expected_b = values[1::3]

    # Default skip: only "A" and "B" are expected as keys.
    by_group = biocutils.split(values, fac)
    assert list(by_group.keys()) == ["A", "B"]
    assert (by_group["A"] == expected_a).all()
    assert (by_group["B"] == expected_b).all()

    # Skipping the "A" level as well should leave only "B".
    only_b = biocutils.split(values, fac, skip={None, "A"})
    assert list(only_b.keys()) == ["B"]
    assert (only_b["B"] == expected_b).all()