diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0204692..3e3c6d9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -52,7 +52,7 @@ repos: hooks: - id: isort args: ["--profile", "black"] - additional_dependencies: ["toml"] + additional_dependencies: [toml] - repo: https://github.com/pre-commit/mirrors-mypy rev: v0.940 hooks: @@ -63,3 +63,7 @@ repos: - id: pydocstyle args: # google style + __init__, see http://www.pydocstyle.org/en/stable/error_codes.html - --ignore=D100,D101,D102,D103,D106,D107,D203,D204,D213,D215,D400,D401,D403,D404,D406,D407,D408,D409,D413 + exclude: | + (?x)( + __init__.py + ) diff --git a/README.md b/README.md index 796f676..54db22b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,17 @@ + + Gitmoji + + # Pytometry: Flow & mass cytometry analytics This package is in private beta at this moment! -Follow https://twitter.com/laminlabs to learn about a first public release. +This package extends scanpy to for efficient and scalable handling of flow and mass cytometry data analysis. It provides + +- the functionality to read in flow data in the fcs file format as anndata objects +- flow and mass cytometry specific preprocessing tools +- access to the entire scanpy workflow functionality + +Follow https://twitter.com/mbuttner to learn about a first public release. -For beta users: Read the [docs](https://lamin.ai/pytometry). +For beta users: Read the [docs](https://pytometry.netlify.app/pytometry/). diff --git a/pyproject.toml b/pyproject.toml index 5412c6c..f5fdddb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,13 @@ dynamic = ["version"] description = "Pytometry is a Python package for flow and mass cytometry analysis." dependencies = [ "nbproject", + "numpy", + "pandas", + "anndata", + "scipy", + "seaborn", + "matplotlib", + "FlowCytometryTools" ] [project.urls] diff --git a/pytometry/__init__.py b/pytometry/__init__.py index ec50783..7cdf4e4 100644 --- a/pytometry/__init__.py +++ b/pytometry/__init__.py @@ -15,4 +15,7 @@ __version__ = "0.0.1" # denote a pre-release for 0.1.0 with 0.1a1 +from . import preprocessing as pp +from . import read_write as io +from . import tools as tl from ._core import ExampleClass, example_function # noqa diff --git a/pytometry/preprocessing/__init__.py b/pytometry/preprocessing/__init__.py new file mode 100644 index 0000000..8841a86 --- /dev/null +++ b/pytometry/preprocessing/__init__.py @@ -0,0 +1,8 @@ +from ._process_data import ( + compensate, + create_comp_mat, + create_spillover_mat, + find_indexes, + plotdata, + split_area, +) diff --git a/pytometry/preprocessing/_process_data.py b/pytometry/preprocessing/_process_data.py new file mode 100644 index 0000000..db808a0 --- /dev/null +++ b/pytometry/preprocessing/_process_data.py @@ -0,0 +1,263 @@ +import math + +import numpy as np +import pandas as pd +import seaborn as sb + +# import FlowCytometryTools as fct +from anndata import AnnData +from matplotlib import pyplot as plt +from matplotlib import rcParams + +from ..tools import normalize_arcsinh + +# import getpass +# import os.path + + +def create_spillover_mat(fcsdata, key="$SPILLOVER"): + """Create spillover matrix from meta data of an .fcs file. + + Args: + fcsdata (dict): Meta data from .fcs file. + key (str, optional): Spillover matrix as panda dataframe. + Defaults to '$SPILLOVER'. + + Returns: + pd.DataFrame: Spillover matrix as pd.DataFrame + """ + spillover = fcsdata.meta[key].split(",") + num_col = int(spillover[0]) + channel_names = spillover[1 : (int(spillover[0]) + 1)] + channel_data = fcsdata.meta["_channels_"] + + if "$PnS" in channel_data: + channel_renames = [ + str(channel_data["$PnS"][channel_data["$PnN"] == name][0]) + for name in channel_names + ] + else: + channel_renames = channel_names + + spill_values = np.reshape( + [float(inp) for inp in spillover[(int(spillover[0]) + 1) :]], [num_col, num_col] + ) + spill_df = pd.DataFrame(spill_values, columns=channel_renames) + return spill_df + + +def create_comp_mat(spillmat, relevant_data=""): + """Creates a compensation matrix from a spillover matrix. + + Args: + spillmat (pd.DataFrame): Spillover matrix as pandas dataframe. + relevant_data (str, optional):A list of channels for customized selection. + Defaults to ''. + + Returns: + pd.DataFrame: Compensation matrix as pandas dataframe. + """ + if relevant_data == "": + comp_mat = np.linalg.inv(spillmat) + compens = pd.DataFrame(comp_mat, columns=list(spillmat.columns)) + else: + comp_mat = np.linalg.inv(spillmat) + compens = pd.DataFrame(comp_mat, columns=relevant_data) + + return compens + + +def find_indexes(adata: AnnData, key_added="signal_type", data_type="facs"): + """Find channels of interest for computing compensation. + + Args: + adata (AnnData): anndata object + key_added (str, optional): key where result vector is added to the adata.var. + Defaults to 'signal_type'. + data_type (str, optional): either 'facs' or 'cytof'. + Defaults to 'facs'. + + Returns: + AnnData: anndata object with a categorical vector in + adata.var[f'{key_added}'] + """ + index = adata.var.index + index_array = [] + + if data_type == "facs": + for item in index: + if item.endswith("-A") and not item.count("SC-"): + index_array.append("area") + elif item.endswith("-H") and not item.count("SC-"): + index_array.append("height") + else: + index_array.append("other") + + elif data_type in ["cytof", "cyTOF"]: + for item in index: + if item.endswith("Di") or item.endswith("Dd"): + index_array.append("element") + else: + index_array.append("other") + else: + print( + f"{data_type} not recognized. Must be either 'facs' or " + " 'cytof'/'cyTOF'" + ) + adata.var["signal_type"] = pd.Categorical(index_array) + return adata + + +# rename compute bleedthr to compensate +def compensate(adata: AnnData, key="signal_type"): + """Computes bleedthrough for data channels. + + Args: + adata (AnnData): AnnData object + key (str, optional): key where result vector is added + to the adata.var. Defaults to 'signal_type'. + + Returns: + AnnData: AnnData object + """ + key_in = key + compens = adata.uns["comp_mat"] + # save original data as layer + if "original" not in adata.layers: + adata.layers["original"] = adata.X + + # Ignore channels 'FSC-H', 'FSC-A', 'SSC-H', 'SSC-A', + # 'FSC-Width', 'Time' + if key_in not in adata.var_keys(): + adata = find_indexes(adata, data_type="facs") + # select non other indices + indexes = np.invert(adata.var[key_in] == "other") + + bleedthrough = np.dot(adata.X[:, indexes], compens) + adata.X[:, indexes] = bleedthrough + return adata + + +def split_area(adata: AnnData, key="signal_type", option="area", data_type="facs"): + """Method to filter out height or area data. + + Args: + adata (AnnData): AnnData object containing data. + key (str, optional): key for adata.var where the variable type is stored. + Defaults to 'signal_type'. + option (str, optional): for choosing 'area' or 'height' in case of FACS data + and 'element' for cyTOF data. Defaults to 'area'. + data_type (str, optional): either 'facs' or 'cytof'/'cyTOF'. Defaults to 'facs'. + + Returns: + AnnData: AnnData object containing area or height data + """ + option_key = option + key_in = key + + possible_options = ["area", "height", "other", "element"] + + if option_key not in possible_options: + print(f"{option_key} is not a valid category. Return all.") + return adata + # Check if indices for area and height have been computed + if key_in not in adata.var_keys(): + adata = find_indexes(adata, data_type=data_type) + + index = adata.var[key_in] == option_key + # if none of the indices is true, abort + if sum(index) < 1: + print(f"{option_key} is not in adata.var[{key_in}]. Return all.") + return adata + + non_idx = np.flatnonzero(np.invert(index)) + + # merge non-idx entries in data matrix with obs + non_cols = adata.var_names[non_idx].values + for idx, colname in enumerate(non_cols): + adata.obs[colname] = adata.X[:, non_idx[idx]].copy() + + # create new anndata object (note: removes potential objects like obsm) + adataN = AnnData( + X=adata.X[:, np.flatnonzero(index)], + obs=adata.obs, + var=adata.var.iloc[np.flatnonzero(index)], + uns=adata.uns, + ) + adataN.var_names = adata.var_names[index].values + return adataN + + +# TODO: move function to plotting module +# Plot data. Choose between Area, Height both(default) +def plotdata( + adata: AnnData, + key="signal_type", + normalize=True, + cofactor=10, + figsize=(15, 6), + option="", + save="", + **kwargs, +): + """Creating histogram plot from Anndata object. + + :param adata: AnnData object containing data. + :param cofactor: float value to normalize with in arcsinh-transform + :param option: Switch to choose directly between area and height data. + :param save: Filename to save the shown figure + :param kwargs: Passed to :func:`matplotlib.pyplot.savefig` + """ + option_key = option + key_in = key + adata_ = adata.copy() + + # Check if indices for area and height have been computed + if key_in not in adata_.var_keys(): + adata_ = find_indexes(adata_) + + if normalize: + adata_ = normalize_arcsinh(adata_, cofactor) + + if option_key not in ["area", "height", "other"]: + print(f"Option {option_key} is not a valid category. Return all.") + datax = adata_.X + var_names = adata_.var_names.values + else: + index = adata_.var[key_in] == option_key + datax = adata_.X[:, index] + var_names = adata_.var_names[index].values + + if len(var_names) == 0: + print( + f"Option {option_key} led to the selection of 0 variables. " + " Nothing to plot." + ) + return + + rcParams["figure.figsize"] = figsize + + names = var_names + number = len(names) + + columns = 3 + rows = math.ceil(number / columns) + + fig = plt.figure() + fig.subplots_adjust(hspace=0.4, wspace=0.6) + + for idx in range(number): + ax = fig.add_subplot(rows, columns, idx + 1) + sb.distplot( + datax[:, names == names[idx]], + kde=False, + norm_hist=False, + bins=400, + ax=ax, + axlabel=names[idx], + ) + if save != "": + plt.savefig(save, bbox_inches="tight", **kwargs) + plt.show() + + return diff --git a/pytometry/read_write/__init__.py b/pytometry/read_write/__init__.py new file mode 100644 index 0000000..93aa105 --- /dev/null +++ b/pytometry/read_write/__init__.py @@ -0,0 +1 @@ +from .fileconverter import readandconvert diff --git a/pytometry/read_write/fileconverter.py b/pytometry/read_write/fileconverter.py new file mode 100644 index 0000000..5c1b746 --- /dev/null +++ b/pytometry/read_write/fileconverter.py @@ -0,0 +1,108 @@ +import getpass +import os.path +from pathlib import Path + +import FlowCytometryTools as fct +import pandas as pd +from anndata import AnnData + +from ..preprocessing import _process_data + + +def __toanndata(filenamefcs, fcsfile, spillover_key="$SPILLOVER", save=False): + """Converts .fcs file to .h5ad file. + + :param filenamefcs: filename without extension + :param fcsfile: path to .fcs file + :param spillover_key: name to access the spillover matrix, if any + :return: Anndata object with additional .uns entries + """ + fcsdata = fct.FCMeasurement(ID="FCS-file", datafile=fcsfile) + adata = AnnData(X=fcsdata.data[:].values) + adata.var_names = fcsdata.channel_names + adata.uns["meta"] = fcsdata.meta + + # check for any binary format types in the .uns['meta'] dictionary + # and replace by a string + keys_all = adata.uns["meta"].keys() + for key in keys_all: + types = type(adata.uns["meta"][key]) + if types is dict: + keys_sub = adata.uns["meta"][key].keys() + for key2 in keys_sub: + types2 = type(adata.uns["meta"][key][key2]) + if types2 is bytes: + adata.uns["meta"][key][key2] = adata.uns["meta"][key][key2].decode() + elif types is bytes: + adata.uns["meta"][key] = adata.uns["meta"][key].decode() + elif types is tuple: + adata.uns["meta"][key] = list(adata.uns["meta"][key]) + # check for data frame + elif isinstance(adata.uns["meta"][key], pd.DataFrame): + dict_tmp = {} + df_tmp = adata.uns["meta"][key] + for col in df_tmp.columns: + if type(df_tmp[col].iloc[0]) is list: + # iterate over list entries + for n in range(len(df_tmp[col].iloc[0])): + dict_tmp[col + str(n + 1)] = [entry[n] for entry in df_tmp[col]] + else: + df_tmp[col] = df_tmp[col].fillna(str(0)) + dict_tmp[col] = df_tmp[col].values + adata.uns["meta"][key] = dict_tmp + + if spillover_key in fcsdata.meta: + adata.uns["spill_mat"] = _process_data.create_spillover_mat( + fcsdata, key=spillover_key + ) + adata.uns["comp_mat"] = _process_data.create_comp_mat(adata.uns["spill_mat"]) + + if save: + adata.write_h5ad(Path(filenamefcs + "_converted" + ".h5ad")) + + return adata + + +def readandconvert(datafile="", spillover_key="$SPILLOVER", save_flag=False): + """Load files and converts them according to their extension. + + :rtype: A list of loaded files. + """ + elementlist = [] + + # Path to file + if datafile != "": + file_names = [datafile] + else: + from tkinter import Tk, filedialog + + username = getpass.getuser() # current username + + file_dialog = Tk() + file_dialog.withdraw() + + file_names = filedialog.askopenfilenames( + initialdir="/home/%s/" % username, + title="Select file", + filetypes=( + ("all files", "*.*"), + ("fcs files", "*.fcs"), + ("h5ad files", ".h5ad"), + ), + ) + + for file_name in file_names: + file_path = file_name + filename, file_extension = os.path.splitext(file_path) + + if file_extension in [".fcs", ".FCS"]: + elementlist.append( + __toanndata(filename, file_path, spillover_key, save_flag) + ) + else: + print("File " + file_name + " can not be converted!") + + if len(elementlist) == 1: + return elementlist[0] + else: + return elementlist diff --git a/pytometry/tools/__init__.py b/pytometry/tools/__init__.py new file mode 100644 index 0000000..6777556 --- /dev/null +++ b/pytometry/tools/__init__.py @@ -0,0 +1 @@ +from ._normalization import normalize_arcsinh, normalize_biExp, normalize_logicle diff --git a/pytometry/tools/_normalization.py b/pytometry/tools/_normalization.py new file mode 100644 index 0000000..fc30c34 --- /dev/null +++ b/pytometry/tools/_normalization.py @@ -0,0 +1,509 @@ +import numpy as np +from anndata import AnnData +from scipy import interpolate + + +def normalize_arcsinh(adata: AnnData, cofactor: float): + """Inverse hyperbolic sine transformation. + + Args: + adata (AnnData): anndata object + cofactor (float): all values are divided by this + factor before arcsinh transformation + recommended values for cyTOF data: 5 + and for flow data: 150 + + Returns: + AnnData: normalised adata object + """ + adata.X = np.arcsinh(adata.X / cofactor) + return adata + + +def normalize_logicle(adata: AnnData, t=262144, m=4.5, w=0.5, a=0): + """Logicle transformation. + + Logicle transformation, implemented as defined in the + GatingML 2.0 specification, adapted from FlowKit and Flowutils + Python packages: + + logicle(x, T, W, M, A) = root(B(y, T, W, M, A) - x) + + where B is a modified bi-exponential function defined as: + + B(y, T, W, M, A) = ae^(by) - ce^(-dy) - f + + The Logicle transformation was originally defined in the publication: + + Moore WA and Parks DR. Update for the logicle data scale + including operational code implementations. + Cytometry A., 2012:81A(4):273-277. + + :param adata: anndata object + :param t: parameter for the top of the linear scale + (e.g. 262144) + :param m: parameter for the number of decades the true + logarithmic scale approaches at the high end of the scale + :param w: parameter for the approximate number of decades + in the linear region + :param a: parameter for the additional number of negative decades + """ + # initialise precision + taylor_length = 16 + # initialise parameter dictionary + p = dict() + + T = t + M = m + W = w + A = a + + # actual parameters + # formulas from bi-exponential paper + p["w"] = W / (M + A) + p["x2"] = A / (M + A) + p["x1"] = p["x2"] + p["w"] + p["x0"] = p["x2"] + 2 * p["w"] + p["b"] = (M + A) * np.log(10) + p["d"] = _solve(p["b"], p["w"]) + + c_a = np.exp(p["x0"] * (p["b"] + p["d"])) + mf_a = np.exp(p["b"] * p["x1"]) - c_a / np.exp(p["d"] * p["x1"]) + p["a"] = T / ((np.exp(p["b"]) - mf_a) - c_a / np.exp(p["d"])) + p["c"] = c_a * p["a"] + p["f"] = -mf_a * p["a"] + + # use Taylor series near x1, i.e., data zero to + # avoid round off problems of formal definition + p["xTaylor"] = p["x1"] + p["w"] / 4 + + # compute coefficients of the Taylor series + posCoef = p["a"] * np.exp(p["b"] * p["x1"]) + negCoef = -p["c"] / np.exp(p["d"] * p["x1"]) + + # 16 is enough for full precision of typical scales + p["taylor"] = np.zeros(taylor_length) + + for i in range(0, taylor_length): + posCoef *= p["b"] / (i + 1) + negCoef *= -p["d"] / (i + 1) + p["taylor"][i] = posCoef + negCoef + + p["taylor"][1] = 0 # exact result of Logicle condition + + # end original initialize method + + # apply scaling to each value + for i in range(0, adata.n_vars): + for j in range(0, adata.n_obs): + adata.X[j, i] = _scale(adata.X[j, i], p) + + return adata + + +def _scale(value, p): + """Scale helper function. + + Args: + value (float): Entry in the anndata matrix + p (dict): Parameter dictionary + + Returns: + float: Scaled value or -1 + """ + DBL_EPSILON = 1e-9 # from C++, + # defined as the smallest difference between 1 + # and the next larger number + # handle true zero separately + if value == 0: + return p["x1"] + + # reflect negative values + negative = value < 0 + if negative: + value = -value + + # initial guess at solution + + if value < p["f"]: + # use linear approximation in the quasi linear region + x = p["x1"] + value / p["taylor"][0] + else: + # otherwise use ordinary logarithm + x = np.log(value / p["a"]) / p["b"] + + # try for double precision unless in extended range + tolerance = 3 * DBL_EPSILON + if x > 1: + tolerance = 3 * x * DBL_EPSILON + + for i in range(0, 40): + # compute the function and its first two derivatives + ae2bx = p["a"] * np.exp(p["b"] * x) + ce2mdx = p["c"] / np.exp(p["d"] * x) + + if x < p["xTaylor"]: + # near zero use the Taylor series + y = _seriesBiexponential(p, x) - value + else: + # this formulation has better round-off behavior + y = (ae2bx + p["f"]) - (ce2mdx + value) + abe2bx = p["b"] * ae2bx + cde2mdx = p["d"] * ce2mdx + dy = abe2bx + cde2mdx + ddy = p["b"] * abe2bx - p["d"] * cde2mdx + + # this is Halley's method with cubic convergence + delta = y / (dy * (1 - y * ddy / (2 * dy * dy))) + x -= delta + + # if we've reached the desired precision we're done + if abs(delta) < tolerance: + # handle negative arguments + if negative: + return 2 * p["x1"] - x + else: + return x + + # if we get here, scale did not converge + return -1 + + +def _solve(b, w): + """Helper function for biexponential transformation. + + Args: + b (float): parameter for biex trafo + w (float): parameter for biex trafo + """ + DBL_EPSILON = 1e-9 # from C++, defined as the + # smallest difference between 1 + # and the next larger number + + # w == 0 means its really arcsinh + if w == 0: + return b + + # precision is the same as that of b + tolerance = 2 * b * DBL_EPSILON + + # based on RTSAFE from Numerical Recipes 1st Edition + # bracket the root + d_lo = 0 + d_hi = b + + # bisection first step + d = (d_lo + d_hi) / 2 + last_delta = d_hi - d_lo + + # evaluate the f(w,b) = 2 * (ln(d) - ln(b)) + w * (b + d) + # and its derivative + f_b = -2 * np.log(b) + w * b + f = 2 * np.log(d) + w * d + f_b + last_f = np.nan + + for i in range(1, 40): + # compute the derivative + df = 2 / d + w + + # if Newton's method would step outside the bracket + # or if it isn't converging quickly enough + if ((d - d_hi) * df - f) * ((d - d_lo) * df - f) >= 0 or abs(1.9 * f) > abs( + last_delta * df + ): + # take a bisection step + delta = (d_hi - d_lo) / 2 + d = d_lo + delta + if d == d_lo: + return d # nothing changed, we're done + else: + # otherwise take a Newton's method step + delta = f / df + t = d + d -= delta + if d == t: + return d # nothing changed, we're done + + # if we've reached the desired precision we're done + if abs(delta) < tolerance: + return d + last_delta = delta + + # recompute the function + f = 2 * np.log(d) + w * d + f_b + if f == 0 or f == last_f: + return d # found the root or are not going to get any closer + last_f = f + + # update the bracketing interval + if f < 0: + d_lo = d + else: + d_hi = d + + return -1 + + +def _seriesBiexponential(p, value): + """Helper function to compute biex trafo. + + Args: + p (dict): Parameter dictionary + value (float): Start value for Taylor series expansion + """ + # initialise precision + taylor_length = 16 + # Taylor series is around x1 + x = value - p["x1"] + # note that taylor[1] should be identically zero according + # to the Logicle condition so skip it here + sum1 = p["taylor"][taylor_length - 1] * x + for i in range(taylor_length - 2, 1, -1): + sum1 = (sum1 + p["taylor"][i]) * x + + return (sum1 * x + p["taylor"][0]) * x + + +def normalize_biExp( + adata: AnnData, + negative=0.0, + width=-10.0, + positive=4.418540, + max_value=262144.000029, +): + """Biexponential transformation. + + Biex transform as implemented in FlowJo 10. Adapted from FlowKit + Python package. This transform is applied exactly as the FlowJo 10 + is implemented, using lookup tables with only a limited set + of parameter values. + + Information on the input parameters from the FlowJo docs: + Adjusting width: + The value for w will determine the amount of channels to be + compressed into linear space around zero. The space of linear does + not change, but rather the number of channels or bins being + compressed into the linear space. Width should be set high enough + that all of the data in the histogram is visible on screen, but not + so high that extra white space is seen to the left hand side of your + dimmest distribution. For most practical uses, once all events have + been shifted off the axis and there is no more axis 'pile-up', then + the optimal width basis value has been reached. + Negative: + Another component in the biexponential transform calculation is the + negative decades or negative space. This is the only other value you + will probably ever need to adjust. In cases where a high width basis + may start compressing dim events into the negative cluster, you may + want to lower the width basis (less compression around zero) and + instead, increase the negative space by 0.5 - 1.0. Doing this will + expand the space around zero so the dim events are still visible, + but also expand the negative space to remove the cells from the axis + and allow you to see the full distribution. + Positive: + The presence of the positive decade adjustment is due to the + algorithm used for logicle transformation, but is not useful in + 99.9% of the cases that require adjusting the biexponential + transform. It may be appropriate to adjust this value only if you + use data that displays data with a data range greater than 5 decades. + + :param adata: anndata object representing the FCS data + :param negative: Value for the FlowJo biex option 'negative' (float) + or pd.Series + :param width: Value for the FlowJo biex option 'width' (float) or + pd.Series + :param positive: Value for the FlowJo biex option 'positive' (float) + or pd.Series + :param max_value: parameter for the top of the linear scale + (default=262144) or pd.Series + """ + # check inputs + inputs = [negative, width, positive, max_value] + len_param = 0.0 + for N in inputs: + if hasattr(N, "__len__") and (not isinstance(N, str)): + len_param += len(N) / 4 + else: # integer values do not have len attribute + len_param += 0.25 + + # transform every variable the same: + if len_param == 1: + x, y = _generate_biex_lut( + neg=negative, width_basis=width, pos=positive, max_value=max_value + ) + + # lut_func to apply for transformation + lut_func = interpolate.interp1d( + x, y, kind="linear", bounds_error=False, fill_value=(np.min(y), np.max(y)) + ) + + # transform adata values using the biexponential function + adata.X = lut_func(adata.X) + + elif len_param == adata.n_vars: + for idx, marker in enumerate(adata.var_names): + # get correct row + row_idx = negative.index == marker + + negative_tmp = negative[row_idx][0] + width_tmp = width[row_idx][0] + positive_tmp = positive[row_idx][0] + max_value_tmp = max_value[row_idx][0] + + x, y = _generate_biex_lut( + neg=negative_tmp, + width_basis=width_tmp, + pos=positive_tmp, + max_value=max_value_tmp, + ) + + # lut_func to apply for transformation + lut_func = interpolate.interp1d( + x, + y, + kind="linear", + bounds_error=False, + fill_value=(np.min(y), np.max(y)), + ) + + # transform adata values using the biexponential function + adata.X[:, idx] = lut_func(adata.X[:, idx]) + else: + print( + "One of the parameters has the incorrect length. Return adata" + " without normalising." + ) + + return adata + + +def _generate_biex_lut( + channel_range=4096, pos=4.418540, neg=0.0, width_basis=-10, max_value=262144.000029 +): + """Creates a FlowJo compatible biex lookup table. + + Code adopted from FlowKit Python package. + Creates a FlowJo compatible biex lookup table. + + Implementation ported from the R library cytolib, which claims to be + directly ported from the legacy Java code from TreeStar. + + :param channel_range: Maximum positive value of the output range + :param pos: Number of decades + :param neg: Number of extra negative decades + :param width_basis: Controls the amount of input range compressed in + the zero / linear region. A higher + width basis value will include more input values in + the zero / linear region. + :param max_value: maximum input value to scale + :return: 2-column NumPy array of the LUT (column order: input, output) + """ + ln10 = np.log(10.0) + decades = pos + low_scale = width_basis + width = np.log10(-low_scale) + + decades = decades - (width / 2) + + extra = neg + + if extra < 0: + extra = 0 + + extra = extra + (width / 2) + + zero_point = int((extra * channel_range) / (extra + decades)) + zero_point = int(np.min([zero_point, channel_range / 2])) + + if zero_point > 0: + decades = extra * channel_range / zero_point + + width = width / (2 * decades) + + maximum = max_value + positive_range = ln10 * decades + minimum = maximum / np.exp(positive_range) + + negative_range = _log_root(positive_range, width) + + max_channel_value = channel_range + 1 + n_points = max_channel_value + + step = (max_channel_value - 1) / (n_points - 1) + + values = np.arange(n_points) + positive = np.exp(values / float(n_points) * positive_range) + negative = np.exp(values / float(n_points) * -negative_range) + + # apply step to values + values = values * step + + s = np.exp((positive_range + negative_range) * (width + extra / decades)) + + negative *= s + s = positive[zero_point] - negative[zero_point] + + positive[zero_point:n_points] = ( + positive[zero_point:n_points] - negative[zero_point:n_points] + ) + positive[zero_point:n_points] = minimum * (positive[zero_point:n_points] - s) + + neg_range = np.arange(zero_point) + m = 2 * zero_point - neg_range + + positive[neg_range] = -positive[m] + + return positive, values + + +def _log_root(b, w): + """Helper function. + + Args: + b (float): Upper bound + w (float): Step parameter + + Returns: + float: Solution to interpolation + """ + # Code adopted from FlowKit Python package + x_lo = 0 + x_hi = b + d = (x_lo + x_hi) / 2 + dx = abs(int(x_lo - x_hi)) + dx_last = dx + fb = -2 * np.log(b) + w * b + f = 2.0 * np.log(d) + w * b + fb + df = 2 / d + w + + if w == 0: + return b + + for i in range(100): + if (((d - x_hi) * df - f) - ((d - x_lo) * df - f)) > 0 or abs(2 * f) > abs( + dx_last * df + ): + dx = (x_hi - x_lo) / 2 + d = x_lo + dx + if d == x_lo: + return d + else: + dx = f / df + t = d + d -= dx + # if dx is smaller than some precision threshold + if d == t: + return d + + # if dx is smaller than some precision threshold + if abs(dx) < 1.0e-12: + return d + + dx_last = dx + f = 2 * np.log(d) + w * d + fb + df = 2 / d + w + if f < 0: + x_lo = d + else: + x_hi = d + + return d