diff --git a/stumpy/aamp.py b/stumpy/aamp.py index 201e4413b..82eb41639 100644 --- a/stumpy/aamp.py +++ b/stumpy/aamp.py @@ -240,7 +240,8 @@ def _aamp( return np.power(P[0, :, :], 1.0 / p), I[0, :, :] -def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0): +def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1): + # function needs to be changed to return top-k matrix profile """ Compute the non-normalized (i.e., without z-normalization) matrix profile @@ -267,6 +268,11 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0): p : float, default 2.0 The p-norm to apply for computing the Minkowski distance. + k : int, default 1 + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- out : numpy.ndarray diff --git a/stumpy/aamped.py b/stumpy/aamped.py index d6bf6d97b..4499c58b5 100644 --- a/stumpy/aamped.py +++ b/stumpy/aamped.py @@ -12,7 +12,8 @@ logger = logging.getLogger(__name__) -def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0): +def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1): + # function needs to be revised to return top-k matrix profile """ Compute the non-normalized (i.e., without z-normalization) matrix profile @@ -46,6 +47,11 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0): p : float, default 2.0 The p-norm to apply for computing the Minkowski distance. + k : int, default 1 + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- out : numpy.ndarray diff --git a/stumpy/core.py b/stumpy/core.py index 44b6f539e..535471761 100644 --- a/stumpy/core.py +++ b/stumpy/core.py @@ -7,7 +7,7 @@ import inspect import numpy as np -from numba import njit +from numba import njit, prange from scipy.signal import convolve from scipy.ndimage import maximum_filter1d, minimum_filter1d from scipy import linalg @@ -2559,3 +2559,48 @@ def _select_P_ABBA_value(P_ABBA, k, custom_func=None): MPdist = partition[k] return MPdist + + +@njit(parallel=True) +def _merge_topk_PI(PA, PB, IA, IB): + """ + Merge two top-k matrix profiles PA and PB, and update PA (in place) while + prioritizing values of PA in ties. Also, update IA accordingly. + + Parameters + ---------- + PA : numpy.ndarray + a (top-k) matrix profile, with ndim of 2, where values in each row are + sorted in ascending order. Also, it needs to be the same shape as PB. + + PB : numpy.ndarray + a (top-k) matrix profile, with ndim of 2, where values in each row are + sorted in ascending order. Also, it needs to be the same shape as PA. + + IA : numpy.ndarray + a (top-k) matrix profile indices, corresponding to PA + + IB : numpy.ndarray + a (top-k) matrix profile indices, corresponding to PB + + Returns + ------- + None + """ + for i in prange(PB.shape[0]): + start = 0 + stop = np.searchsorted(PA[i], PB[i, -1], side="right") + + for j in range(PB.shape[1]): + if PB[i, j] < PA[i, -1]: + idx = np.searchsorted(PA[i, start:stop], PB[i, j], side="right") + start + + for g in range(PB.shape[1] - 1, idx, -1): + PA[i, g] = PA[i, g - 1] + IA[i, g] = IA[i, g - 1] + + PA[i, idx] = PB[i, j] + IA[i, idx] = IB[i, j] + + start = idx + stop += 1 # because of shifting elements to the right by one diff --git a/stumpy/gpu_aamp.py b/stumpy/gpu_aamp.py index e62be7b02..0c9a21a85 100644 --- a/stumpy/gpu_aamp.py +++ b/stumpy/gpu_aamp.py @@ -339,7 +339,9 @@ def _gpu_aamp( return profile_fname, indices_fname -def gpu_aamp(T_A, m, T_B=None, ignore_trivial=True, device_id=0, p=2.0): +def gpu_aamp(T_A, m, T_B=None, ignore_trivial=True, device_id=0, p=2.0, k=1): + # function needs to be revised to return (top-k) matrix profile and + # matrix profile indices """ Compute the non-normalized (i.e., without z-normalization) matrix profile with one or more GPU devices @@ -375,6 +377,11 @@ def gpu_aamp(T_A, m, T_B=None, ignore_trivial=True, device_id=0, p=2.0): p : float, default 2.0 The p-norm to apply for computing the Minkowski distance. + k : int, default 1 + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- out : numpy.ndarray diff --git a/stumpy/gpu_stump.py b/stumpy/gpu_stump.py index 667dd8b56..8f4683388 100644 --- a/stumpy/gpu_stump.py +++ b/stumpy/gpu_stump.py @@ -15,9 +15,102 @@ logger = logging.getLogger(__name__) +@cuda.jit(device=True) +def _gpu_searchsorted_left(a, v, bfs, nlevel): + """ + Equivalent to numpy.searchsorted(a, v, side='left'), designed + to be used as device function + + Parameters + ---------- + a : numpy.ndarray + 1-dim array sorted in ascending order. + + v : float + value to insert into array `a` + + bfs : numpy.ndarray + The breadth-first-search indices where the missing leaves of its corresponding + binary search tree are filled with -1. + + nlevel : int + the number of levels in the binary search tree from which the array + `bfs` is obtained. + + Returns + ------- + idx : int + the index of the insertion point + """ + n = a.shape[0] + idx = 0 + for level in range(nlevel): + if v <= a[bfs[idx]]: + next_idx = 2 * idx + 1 + else: + next_idx = 2 * idx + 2 + + if level == nlevel - 1 or bfs[next_idx] < 0: + if v <= a[bfs[idx]]: + idx = max(bfs[idx], 0) + else: + idx = min(bfs[idx] + 1, n) + break + idx = next_idx + + return idx + + +@cuda.jit(device=True) +def _gpu_searchsorted_right(a, v, bfs, nlevel): + """ + Equivalent to numpy.searchsorted(a, v, side='right'), designed + to be used as device function + + Parameters + ---------- + a : numpy.ndarray + 1-dim array sorted in ascending order. + + v : float + value to insert into array `a` + + bfs : numpy.ndarray + The breadth-first-search indices where the missing leaves of its corresponding + binary search tree are filled with -1. + + nlevel : int + the number of levels in the binary search tree from which the array + `bfs` is obtained. + + Returns + ------- + idx : int + the index of the insertion point + """ + n = a.shape[0] + idx = 0 + for level in range(nlevel): + if v < a[bfs[idx]]: + next_idx = 2 * idx + 1 + else: + next_idx = 2 * idx + 2 + + if level == nlevel - 1 or bfs[next_idx] < 0: + if v < a[bfs[idx]]: + idx = max(bfs[idx], 0) + else: + idx = min(bfs[idx] + 1, n) + break + idx = next_idx + + return idx + + @cuda.jit( "(i8, f8[:], f8[:], i8, f8[:], f8[:], f8[:], f8[:], f8[:]," - "f8[:], f8[:], i8, b1, i8, f8[:, :], i8[:, :], b1)" + "f8[:], f8[:], i8, b1, i8, f8[:, :], f8[:], f8[:], i8[:, :], i8[:], i8[:]," + "b1, i8[:], i8, i2)" ) def _compute_and_update_PI_kernel( i, @@ -31,12 +124,19 @@ def _compute_and_update_PI_kernel( Σ_T, μ_Q, σ_Q, - k, + w, ignore_trivial, excl_zone, profile, + profile_L, + profile_R, indices, + indices_L, + indices_R, compute_QT, + bfs, + nlevel, + k, ): """ A Numba CUDA kernel to update the matrix profile and matrix profile indices @@ -79,7 +179,7 @@ def _compute_and_update_PI_kernel( σ_Q : numpy.ndarray Standard deviation of the query sequence, `Q` - k : int + w : int The total number of sliding windows to iterate over ignore_trivial : bool @@ -91,18 +191,39 @@ def _compute_and_update_PI_kernel( sliding window profile : numpy.ndarray - Matrix profile. The first column consists of the global matrix profile, - the second column consists of the left matrix profile, and the third - column consists of the right matrix profile. + The (top-k) matrix profile, sorted in ascending order per row + + profile_L : numpy.ndarray + The (top-1) left matrix profile + + profile_R : numpy.ndarray + The (top-1) right matrix profile indices : numpy.ndarray - The first column consists of the matrix profile indices, the second - column consists of the left matrix profile indices, and the third - column consists of the right matrix profile indices. + The (top-k) matrix profile indices + + indices_L : numpy.ndarray + The (top-1) left matrix profile indices + + indices_R : numpy.ndarray + The (top-1) right matrix profile indices compute_QT : bool A boolean flag for whether or not to compute QT + bfs : numpy.ndarray + The breadth-first-search indices where the missing leaves of its corresponding + binary search tree are filled with -1. + + nlevel : int + the number of levels in the binary search tree from which the array + `bfs` is obtained. + + k : int + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- None @@ -126,7 +247,7 @@ def _compute_and_update_PI_kernel( for j in range(start, QT_out.shape[0], stride): zone_start = max(0, j - excl_zone) - zone_stop = min(k, j + excl_zone) + zone_stop = min(w, j + excl_zone) if compute_QT: QT_out[j] = ( @@ -157,16 +278,21 @@ def _compute_and_update_PI_kernel( if ignore_trivial: if i <= zone_stop and i >= zone_start: p_norm = np.inf - if p_norm < profile[j, 1] and i < j: - profile[j, 1] = p_norm - indices[j, 1] = i - if p_norm < profile[j, 2] and i > j: - profile[j, 2] = p_norm - indices[j, 2] = i + if p_norm < profile_L[j] and i < j: + profile_L[j] = p_norm + indices_L[j] = i + if p_norm < profile_R[j] and i > j: + profile_R[j] = p_norm + indices_R[j] = i - if p_norm < profile[j, 0]: - profile[j, 0] = p_norm - indices[j, 0] = i + if p_norm < profile[j, -1]: + idx = _gpu_searchsorted_right(profile[j], p_norm, bfs, nlevel) + for g in range(k - 1, idx, -1): + profile[j, g] = profile[j, g - 1] + indices[j, g] = indices[j, g - 1] + + profile[j, idx] = p_norm + indices[j, idx] = i def _gpu_stump( @@ -181,10 +307,11 @@ def _gpu_stump( QT_first_fname, μ_Q_fname, σ_Q_fname, - k, + w, ignore_trivial=True, range_start=1, device_id=0, + k=1, ): """ A Numba CUDA version of STOMP for parallel computation of the @@ -235,7 +362,7 @@ def _gpu_stump( The file name for the standard deviation of the query sequence, `Q`, relative to the current sliding window - k : int + w : int The total number of sliding windows to iterate over ignore_trivial : bool @@ -249,6 +376,11 @@ def _gpu_stump( device_id : int The (GPU) device number to use. The default value is `0`. + k : int + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- profile_fname : str @@ -289,7 +421,7 @@ def _gpu_stump( Note that left and right matrix profiles are only available for self-joins. """ threads_per_block = config.STUMPY_THREADS_PER_BLOCK - blocks_per_grid = math.ceil(k / threads_per_block) + blocks_per_grid = math.ceil(w / threads_per_block) T_A = np.load(T_A_fname, allow_pickle=False) T_B = np.load(T_B_fname, allow_pickle=False) @@ -300,6 +432,10 @@ def _gpu_stump( μ_Q = np.load(μ_Q_fname, allow_pickle=False) σ_Q = np.load(σ_Q_fname, allow_pickle=False) + device_bfs = cuda.to_device(core._bfs_indices(k, fill_value=-1)) + nlevel = np.floor(np.log2(k) + 1).astype(np.int64) + # number of levels in binary seearch tree from which `bfs` is constructed. + with cuda.gpus[device_id]: device_T_A = cuda.to_device(T_A) device_QT_odd = cuda.to_device(QT) @@ -316,11 +452,22 @@ def _gpu_stump( device_M_T = cuda.to_device(M_T) device_Σ_T = cuda.to_device(Σ_T) - profile = np.full((k, 3), np.inf, dtype=np.float64) - indices = np.full((k, 3), -1, dtype=np.int64) + profile = np.full((w, k), np.inf, dtype=np.float64) + indices = np.full((w, k), -1, dtype=np.int64) + + profile_L = np.full(w, np.inf, dtype=np.float64) + indices_L = np.full(w, -1, dtype=np.int64) + + profile_R = np.full(w, np.inf, dtype=np.float64) + indices_R = np.full(w, -1, dtype=np.int64) device_profile = cuda.to_device(profile) + device_profile_L = cuda.to_device(profile_L) + device_profile_R = cuda.to_device(profile_R) device_indices = cuda.to_device(indices) + device_indices_L = cuda.to_device(indices_L) + device_indices_R = cuda.to_device(indices_R) + _compute_and_update_PI_kernel[blocks_per_grid, threads_per_block]( range_start - 1, device_T_A, @@ -333,12 +480,19 @@ def _gpu_stump( device_Σ_T, device_μ_Q, device_σ_Q, - k, + w, ignore_trivial, excl_zone, device_profile, + device_profile_L, + device_profile_R, device_indices, + device_indices_L, + device_indices_R, False, + device_bfs, + nlevel, + k, ) for i in range(range_start, range_stop): @@ -354,27 +508,52 @@ def _gpu_stump( device_Σ_T, device_μ_Q, device_σ_Q, - k, + w, ignore_trivial, excl_zone, device_profile, + device_profile_L, + device_profile_R, device_indices, + device_indices_L, + device_indices_R, True, + device_bfs, + nlevel, + k, ) profile = device_profile.copy_to_host() + profile_L = device_profile_L.copy_to_host() + profile_R = device_profile_R.copy_to_host() indices = device_indices.copy_to_host() + indices_L = device_indices_L.copy_to_host() + indices_R = device_indices_R.copy_to_host() + profile = np.sqrt(profile) + profile_L = np.sqrt(profile_L) + profile_R = np.sqrt(profile_R) profile_fname = core.array_to_temp_file(profile) + profile_L_fname = core.array_to_temp_file(profile_L) + profile_R_fname = core.array_to_temp_file(profile_R) indices_fname = core.array_to_temp_file(indices) + indices_L_fname = core.array_to_temp_file(indices_L) + indices_R_fname = core.array_to_temp_file(indices_R) - return profile_fname, indices_fname + return ( + profile_fname, + profile_L_fname, + profile_R_fname, + indices_fname, + indices_L_fname, + indices_R_fname, + ) @core.non_normalized(gpu_aamp) def gpu_stump( - T_A, m, T_B=None, ignore_trivial=True, device_id=0, normalize=True, p=2.0 + T_A, m, T_B=None, ignore_trivial=True, device_id=0, normalize=True, p=2.0, k=1 ): """ Compute the z-normalized matrix profile with one or more GPU devices @@ -417,13 +596,24 @@ def gpu_stump( The p-norm to apply for computing the Minkowski distance. This parameter is ignored when `normalize == True`. + k : int, default 1 + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- out : numpy.ndarray - The first column consists of the matrix profile, the second column - consists of the matrix profile indices, the third column consists of - the left matrix profile indices, and the fourth column consists of - the right matrix profile indices. + When k = 1 (default), the first column consists of the matrix profile, + the second column consists of the matrix profile indices, the third column + consists of the left matrix profile indices, and the fourth column consists + of the right matrix profile indices. However, when k > 1, the output array + will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k]) + consists of the top-k matrix profile, the next set of k columns + (i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile + indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or, + equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left + matrix profile indices and the top-1 right matrix profile indices, respectively. See Also -------- @@ -505,7 +695,7 @@ def gpu_stump( logger.warning("Try setting `ignore_trivial = False`.") n = T_B.shape[0] - k = T_A.shape[0] - m + 1 + w = T_A.shape[0] - m + 1 l = n - m + 1 excl_zone = int( np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM) @@ -518,8 +708,6 @@ def gpu_stump( μ_Q_fname = core.array_to_temp_file(μ_Q) σ_Q_fname = core.array_to_temp_file(σ_Q) - out = np.empty((k, 4), dtype=object) - if isinstance(device_id, int): device_ids = [device_id] else: @@ -528,6 +716,12 @@ def gpu_stump( profile = [None] * len(device_ids) indices = [None] * len(device_ids) + profile_L = [None] * len(device_ids) + indices_L = [None] * len(device_ids) + + profile_R = [None] * len(device_ids) + indices_R = [None] * len(device_ids) + for _id in device_ids: with cuda.gpus[_id]: if ( @@ -571,16 +765,24 @@ def gpu_stump( QT_first_fname, μ_Q_fname, σ_Q_fname, - k, + w, ignore_trivial, start + 1, device_ids[idx], + k, ), ) else: # Execute last chunk in parent process # Only parent process is executed when a single GPU is requested - profile[idx], indices[idx] = _gpu_stump( + ( + profile[idx], + profile_L[idx], + profile_R[idx], + indices[idx], + indices_L[idx], + indices_R[idx], + ) = _gpu_stump( T_A_fname, T_B_fname, m, @@ -592,10 +794,11 @@ def gpu_stump( QT_first_fname, μ_Q_fname, σ_Q_fname, - k, + w, ignore_trivial, start + 1, device_ids[idx], + k, ) # Clean up process pool for multi-GPU request @@ -606,7 +809,14 @@ def gpu_stump( # Collect results from spawned child processes if they exist for idx, result in enumerate(results): if result is not None: - profile[idx], indices[idx] = result.get() + ( + profile[idx], + profile_L[idx], + profile_R[idx], + indices[idx], + indices_L[idx], + indices_R[idx], + ) = result.get() os.remove(T_A_fname) os.remove(T_B_fname) @@ -621,22 +831,44 @@ def gpu_stump( for idx in range(len(device_ids)): profile_fname = profile[idx] + profile_L_fname = profile_L[idx] + profile_R_fname = profile_R[idx] indices_fname = indices[idx] + indices_L_fname = indices_L[idx] + indices_R_fname = indices_R[idx] + profile[idx] = np.load(profile_fname, allow_pickle=False) + profile_L[idx] = np.load(profile_L_fname, allow_pickle=False) + profile_R[idx] = np.load(profile_R_fname, allow_pickle=False) indices[idx] = np.load(indices_fname, allow_pickle=False) + indices_L[idx] = np.load(indices_L_fname, allow_pickle=False) + indices_R[idx] = np.load(indices_R_fname, allow_pickle=False) + os.remove(profile_fname) + os.remove(profile_L_fname) + os.remove(profile_R_fname) os.remove(indices_fname) - - for i in range(1, len(device_ids)): - # Update all matrix profiles and matrix profile indices - # (global, left, right) and store in profile[0] and indices[0] - for col in range(profile[0].shape[1]): # pragma: no cover - cond = profile[0][:, col] < profile[i][:, col] - profile[0][:, col] = np.where(cond, profile[0][:, col], profile[i][:, col]) - indices[0][:, col] = np.where(cond, indices[0][:, col], indices[i][:, col]) - - out[:, 0] = profile[0][:, 0] - out[:, 1:4] = indices[0][:, :] + os.remove(indices_L_fname) + os.remove(indices_R_fname) + + for i in range(1, len(device_ids)): # pragma: no cover + # Update (top-k) matrix profile and matrix profile indices + core._merge_topk_PI(profile[0], profile[i], indices[0], indices[i]) + + # Update (top-1) left matrix profile and matrix profile indices + cond = profile_L[0] < profile_L[i] + profile_L[0] = np.where(cond, profile_L[0], profile_L[i]) + indices_L[0] = np.where(cond, indices_L[0], indices_L[i]) + + # Update (top-1) right matrix profile and matrix profile indices + cond = profile_R[0] < profile_R[i] + profile_R[0] = np.where(cond, profile_R[0], profile_R[i]) + indices_R[0] = np.where(cond, indices_R[0], indices_R[i]) + + out = np.empty((w, 2 * k + 2), dtype=object) # last two columns are to store + # (top-1) left/right matrix profile indices + out[:, :k] = profile[0] + out[:, k:] = np.column_stack((indices[0], indices_L[0], indices_R[0])) threshold = 10e-6 if core.are_distances_too_small(out[:, 0], threshold=threshold): # pragma: no cover diff --git a/stumpy/scrump.py b/stumpy/scrump.py index 25f890bda..53d10b612 100644 --- a/stumpy/scrump.py +++ b/stumpy/scrump.py @@ -629,7 +629,7 @@ def update(self): if self._chunk_idx < self._n_chunks: start_idx, stop_idx = self._chunk_diags_ranges[self._chunk_idx] - P, I = _stump( + P, PL, PR, I, IL, IR = _stump( self._T_A, self._T_B, self._m, @@ -645,8 +645,12 @@ def update(self): self._T_B_subseq_isconstant, self._diags[start_idx:stop_idx], self._ignore_trivial, + 1, # revise module to accept parameter k for top-k matrix profile ) + P = np.column_stack((P, PL, PR)) + I = np.column_stack((I, IL, IR)) + # Update matrix profile and indices for i in range(self._P.shape[0]): if self._P[i, 0] > P[i, 0]: diff --git a/stumpy/stump.py b/stumpy/stump.py index 97334eb5a..f5a5fe811 100644 --- a/stumpy/stump.py +++ b/stumpy/stump.py @@ -40,13 +40,18 @@ def _compute_diagonal( diags_stop_idx, thread_idx, ρ, + ρL, + ρR, I, + IL, + IR, ignore_trivial, + k, ): """ - Compute (Numba JIT-compiled) and update the Pearson correlation, ρ, and I - sequentially along individual diagonals using a single thread and avoiding race - conditions + Compute (Numba JIT-compiled) and update the (top-k) Pearson correlation (ρ), + ρL, ρR, I, IL, and IR sequentially along individual diagonals using a single + thread and avoiding race conditions. Parameters ---------- @@ -116,15 +121,32 @@ def _compute_diagonal( The thread index ρ : numpy.ndarray - The Pearson correlations + The (top-k) Pearson correlations, sorted in ascending order per row + + ρL : numpy.ndarray + The top-1 left Pearson correlations + + ρR : numpy.ndarray + The top-1 right Pearson correlations I : numpy.ndarray - The matrix profile indices + The (top-k) matrix profile indices + + IL : numpy.ndarray + The top-1 left matrix profile indices + + IR : numpy.ndarray + The top-1 right matrix profile indices ignore_trivial : bool Set to `True` if this is a self-join. Otherwise, for AB-join, set this to `False`. Default is `True`. + k : int + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- None @@ -154,18 +176,18 @@ def _compute_diagonal( constant = (m - 1) * m_inverse * m_inverse # (m - 1)/(m * m) for diag_idx in range(diags_start_idx, diags_stop_idx): - k = diags[diag_idx] + g = diags[diag_idx] - if k >= 0: - iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - k)) + if g >= 0: + iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - g)) else: - iter_range = range(-k, min(n_A - m + 1, n_B - m + 1 - k)) + iter_range = range(-g, min(n_A - m + 1, n_B - m + 1 - g)) for i in iter_range: - if i == 0 or (k < 0 and i == -k): + if i == 0 or (g < 0 and i == -g): cov = ( np.dot( - (T_B[i + k : i + k + m] - M_T[i + k]), (T_A[i : i + m] - μ_Q[i]) + (T_B[i + g : i + g + m] - M_T[i + g]), (T_A[i : i + m] - μ_Q[i]) ) * m_inverse ) @@ -177,38 +199,46 @@ def _compute_diagonal( # - (T_B[i + k - 1] - M_T_m_1[i + k]) * (T_A[i - 1] - μ_Q_m_1[i]) # ) cov = cov + constant * ( - cov_a[i + k] * cov_b[i] - cov_c[i + k] * cov_d[i] + cov_a[i + g] * cov_b[i] - cov_c[i + g] * cov_d[i] ) - if T_B_subseq_isfinite[i + k] and T_A_subseq_isfinite[i]: + if T_B_subseq_isfinite[i + g] and T_A_subseq_isfinite[i]: # Neither subsequence contains NaNs - if T_B_subseq_isconstant[i + k] or T_A_subseq_isconstant[i]: + if T_B_subseq_isconstant[i + g] or T_A_subseq_isconstant[i]: pearson = 0.5 else: - pearson = cov * Σ_T_inverse[i + k] * σ_Q_inverse[i] + pearson = cov * Σ_T_inverse[i + g] * σ_Q_inverse[i] - if T_B_subseq_isconstant[i + k] and T_A_subseq_isconstant[i]: + if T_B_subseq_isconstant[i + g] and T_A_subseq_isconstant[i]: pearson = 1.0 if pearson > ρ[thread_idx, i, 0]: - ρ[thread_idx, i, 0] = pearson - I[thread_idx, i, 0] = i + k + idx = np.searchsorted(ρ[thread_idx, i], pearson) + ρ[thread_idx, i, : idx - 1] = ρ[thread_idx, i, 1:idx] + ρ[thread_idx, i, idx - 1] = pearson + + I[thread_idx, i, : idx - 1] = I[thread_idx, i, 1:idx] + I[thread_idx, i, idx - 1] = i + g if ignore_trivial: # self-joins only - if pearson > ρ[thread_idx, i + k, 0]: - ρ[thread_idx, i + k, 0] = pearson - I[thread_idx, i + k, 0] = i + if pearson > ρ[thread_idx, i + g, 0]: + idx = np.searchsorted(ρ[thread_idx, i + g], pearson) + ρ[thread_idx, i + g, : idx - 1] = ρ[thread_idx, i + g, 1:idx] + ρ[thread_idx, i + g, idx - 1] = pearson + + I[thread_idx, i + g, : idx - 1] = I[thread_idx, i + g, 1:idx] + I[thread_idx, i + g, idx - 1] = i - if i < i + k: + if i < i + g: # left pearson correlation and left matrix profile index - if pearson > ρ[thread_idx, i + k, 1]: - ρ[thread_idx, i + k, 1] = pearson - I[thread_idx, i + k, 1] = i + if pearson > ρL[thread_idx, i + g]: + ρL[thread_idx, i + g] = pearson + IL[thread_idx, i + g] = i # right pearson correlation and right matrix profile index - if pearson > ρ[thread_idx, i, 2]: - ρ[thread_idx, i, 2] = pearson - I[thread_idx, i, 2] = i + k + if pearson > ρR[thread_idx, i]: + ρR[thread_idx, i] = pearson + IR[thread_idx, i] = i + g return @@ -235,11 +265,13 @@ def _stump( T_B_subseq_isconstant, diags, ignore_trivial, + k, ): """ A Numba JIT-compiled version of STOMPopt with Pearson correlations for parallel - computation of the matrix profile, matrix profile indices, left matrix profile - indices, and right matrix profile indices. + computation of the (top-k) matrix profile, the (top-k) matrix profile indices, + the top-1 left matrix profile and its matrix profile index, and the top-1 right + matrix profile and its matrix profile index. Parameters ---------- @@ -294,15 +326,30 @@ def _stump( Set to `True` if this is a self-join. Otherwise, for AB-join, set this to `False`. Default is `True`. + k : int + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- profile : numpy.ndarray - Matrix profile + The (top-k) matrix profile indices : numpy.ndarray - The first column consists of the matrix profile indices, the second - column consists of the left matrix profile indices, and the third - column consists of the right matrix profile indices. + The (top-k) matrix profile indices + + left profile : numpy.ndarray + The (top-1) left matrix profile + + left indices : numpy.ndarray + The (top-1) left matrix profile indices + + right profile : numpy.ndarray + The (top-1) right matrix profile + + right indices : numpy.ndarray + The (top-1) right matrix profile indices Notes ----- @@ -353,8 +400,15 @@ def _stump( n_B = T_B.shape[0] l = n_A - m + 1 n_threads = numba.config.NUMBA_NUM_THREADS - ρ = np.full((n_threads, l, 3), -np.inf, dtype=np.float64) - I = np.full((n_threads, l, 3), -1, dtype=np.int64) + + ρ = np.full((n_threads, l, k), -np.inf, dtype=np.float64) + I = np.full((n_threads, l, k), -1, dtype=np.int64) + + ρL = np.full((n_threads, l), -np.inf, dtype=np.float64) + IL = np.full((n_threads, l), -1, dtype=np.int64) + + ρR = np.full((n_threads, l), -np.inf, dtype=np.float64) + IR = np.full((n_threads, l), -1, dtype=np.int64) ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B) diags_ranges = core._get_array_ranges(ndist_counts, n_threads, False) @@ -377,7 +431,8 @@ def _stump( cov_d[:] = cov_d - μ_Q_m_1 for thread_idx in prange(n_threads): - # Compute and update cov, I within a single thread to avoiding race conditions + # Compute and update pearson correlations and matrix profile indices + # within a single thread and avoiding race conditions _compute_diagonal( T_A, T_B, @@ -399,47 +454,72 @@ def _stump( diags_ranges[thread_idx, 1], thread_idx, ρ, + ρL, + ρR, I, + IL, + IR, ignore_trivial, + k, ) # Reduction of results from all threads for thread_idx in range(1, n_threads): for i in prange(l): - if ρ[0, i, 0] < ρ[thread_idx, i, 0]: - ρ[0, i, 0] = ρ[thread_idx, i, 0] - I[0, i, 0] = I[thread_idx, i, 0] - # left pearson correlation and left matrix profile indices - if ρ[0, i, 1] < ρ[thread_idx, i, 1]: - ρ[0, i, 1] = ρ[thread_idx, i, 1] - I[0, i, 1] = I[thread_idx, i, 1] - # right pearson correlation and right matrix profile indices - if ρ[0, i, 2] < ρ[thread_idx, i, 2]: - ρ[0, i, 2] = ρ[thread_idx, i, 2] - I[0, i, 2] = I[thread_idx, i, 2] - - # Convert pearson correlations to distances - p_norm = np.abs(2 * m * (1 - ρ[0, :, :])) + # top-k + for j in range( + k - 1, -1, -1 + ): # reverse iteration to preserve order in ties + if ρ[0, i, 0] < ρ[thread_idx, i, j]: + idx = np.searchsorted(ρ[0, i], ρ[thread_idx, i, j]) + ρ[0, i, : idx - 1] = ρ[0, i, 1:idx] + ρ[0, i, idx - 1] = ρ[thread_idx, i, j] + + I[0, i, : idx - 1] = I[0, i, 1:idx] + I[0, i, idx - 1] = I[thread_idx, i, j] + + if ρL[0, i] < ρL[thread_idx, i]: + ρL[0, i] = ρL[thread_idx, i] + IL[0, i] = IL[thread_idx, i] + + if ρR[0, i] < ρR[thread_idx, i]: + ρR[0, i] = ρR[thread_idx, i] + IR[0, i] = IR[thread_idx, i] + + # Reverse top-k rho (and its associated I) to be in descending order and + # then convert from Pearson correlations to Euclidean distances (ascending order) + p_norm = np.abs(2 * m * (1 - ρ[0, :, ::-1])) + I = I[0, :, ::-1] + + p_norm_L = np.abs(2 * m * (1 - ρL[0, :])) + p_norm_R = np.abs(2 * m * (1 - ρR[0, :])) + for i in prange(p_norm.shape[0]): - if p_norm[i, 0] < config.STUMPY_P_NORM_THRESHOLD: - p_norm[i, 0] = 0.0 - if p_norm[i, 1] < config.STUMPY_P_NORM_THRESHOLD: - p_norm[i, 1] = 0.0 - if p_norm[i, 2] < config.STUMPY_P_NORM_THRESHOLD: - p_norm[i, 2] = 0.0 + for j in prange(p_norm.shape[1]): + if p_norm[i, j] < config.STUMPY_P_NORM_THRESHOLD: + p_norm[i, j] = 0.0 + + if p_norm_L[i] < config.STUMPY_P_NORM_THRESHOLD: + p_norm_L[i] = 0.0 + + if p_norm_R[i] < config.STUMPY_P_NORM_THRESHOLD: + p_norm_R[i] = 0.0 + P = np.sqrt(p_norm) + PL = np.sqrt(p_norm_L) + PR = np.sqrt(p_norm_R) - return P[:, :], I[0, :, :] + return P, PL, PR, I, IL[0, :], IR[0, :] @core.non_normalized(aamp) -def stump(T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0): +def stump(T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0, k=1): """ Compute the z-normalized matrix profile This is a convenience wrapper around the Numba JIT-compiled parallelized - `_stump` function which computes the matrix profile according to STOMPopt with - Pearson correlations. + `_stump` function which computes the (top-k) matrix profile according to + STOMPopt with Pearson correlations. Parameters ---------- @@ -467,13 +547,24 @@ def stump(T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0): The p-norm to apply for computing the Minkowski distance. This parameter is ignored when `normalize == True`. + k : int, default 1 + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- out : numpy.ndarray - The first column consists of the matrix profile, the second column - consists of the matrix profile indices, the third column consists of - the left matrix profile indices, and the fourth column consists of - the right matrix profile indices. + When k = 1 (default), the first column consists of the matrix profile, + the second column consists of the matrix profile indices, the third column + consists of the left matrix profile indices, and the fourth column consists + of the right matrix profile indices. However, when k > 1, the output array + will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k]) + consists of the top-k matrix profile, the next set of k columns + (i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile + indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or, + equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left + matrix profile indices and the top-1 right matrix profile indices, respectively. See Also -------- @@ -587,14 +678,13 @@ def stump(T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0): l = n_A - m + 1 excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)) - out = np.empty((l, 4), dtype=object) if ignore_trivial: diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64) else: diags = np.arange(-(n_A - m + 1) + 1, n_B - m + 1, dtype=np.int64) - P, I = _stump( + P, PL, PR, I, IL, IR = _stump( T_A, T_B, m, @@ -610,10 +700,13 @@ def stump(T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0): T_B_subseq_isconstant, diags, ignore_trivial, + k, ) - out[:, 0] = P[:, 0] - out[:, 1:] = I + out = np.empty((l, 2 * k + 2), dtype=object) # last two columns are to + # store left and right matrix profile indices + out[:, :k] = P + out[:, k:] = np.column_stack((I, IL, IR)) threshold = 10e-6 if core.are_distances_too_small(out[:, 0], threshold=threshold): # pragma: no cover diff --git a/stumpy/stumped.py b/stumpy/stumped.py index 09557e318..f98338ce9 100644 --- a/stumpy/stumped.py +++ b/stumpy/stumped.py @@ -14,12 +14,14 @@ @core.non_normalized(aamped) -def stumped(dask_client, T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0): +def stumped( + dask_client, T_A, m, T_B=None, ignore_trivial=True, normalize=True, p=2.0, k=1 +): """ - Compute the z-normalized matrix profile with a distributed dask cluster + Compute the z-normalized (top-k) matrix profile with a distributed dask cluster This is a highly distributed implementation around the Numba JIT-compiled - parallelized `_stump` function which computes the matrix profile according + parallelized `_stump` function which computes the (top-k) matrix profile according to STOMPopt with Pearson correlations. Parameters @@ -54,13 +56,24 @@ def stumped(dask_client, T_A, m, T_B=None, ignore_trivial=True, normalize=True, The p-norm to apply for computing the Minkowski distance. This parameter is ignored when `normalize == True`. + k : int, default 1 + The number of top `k` smallest distances used to construct the matrix profile. + Note that this will increase the total computational time and memory usage + when k > 1. + Returns ------- out : numpy.ndarray - The first column consists of the matrix profile, the second column - consists of the matrix profile indices, the third column consists of - the left matrix profile indices, and the fourth column consists of - the right matrix profile indices. + When k = 1 (default), the first column consists of the matrix profile, + the second column consists of the matrix profile indices, the third column + consists of the left matrix profile indices, and the fourth column consists + of the right matrix profile indices. However, when k > 1, the output array + will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k]) + consists of the top-k matrix profile, the next set of k columns + (i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile + indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or, + equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left + matrix profile indices and the top-1 right matrix profile indices, respectively. See Also -------- @@ -183,7 +196,6 @@ def stumped(dask_client, T_A, m, T_B=None, ignore_trivial=True, normalize=True, l = n_A - m + 1 excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)) - out = np.empty((l, 4), dtype=object) hosts = list(dask_client.ncores().keys()) nworkers = len(hosts) @@ -248,20 +260,34 @@ def stumped(dask_client, T_A, m, T_B=None, ignore_trivial=True, normalize=True, T_B_subseq_isconstant_future, diags_futures[i], ignore_trivial, + k, ) ) + profile = np.empty((l, 2 * k)) + indices = np.empty((l, 2 * k)) + results = dask_client.gather(futures) - profile, indices = results[0] + profile, profile_L, profile_R, indices, indices_L, indices_R = results[0] + for i in range(1, len(hosts)): - P, I = results[i] - for col in range(P.shape[1]): # pragma: no cover - cond = P[:, col] < profile[:, col] - profile[:, col] = np.where(cond, P[:, col], profile[:, col]) - indices[:, col] = np.where(cond, I[:, col], indices[:, col]) - - out[:, 0] = profile[:, 0] - out[:, 1:4] = indices + P, PL, PR, I, IL, IR = results[i] + # Update top-k matrix profile and matrix profile indices + core._merge_topk_PI(profile, P, indices, I) + + # Update top-1 left matrix profile and matrix profile index + cond = PL < profile_L + profile_L = np.where(cond, PL, profile_L) + indices_L = np.where(cond, IL, indices_L) + + # Update top-1 right matrix profile and matrix profile index + cond = PR < profile_R + profile_R = np.where(cond, PR, profile_R) + indices_R = np.where(cond, IR, indices_R) + + out = np.empty((l, 2 * k + 2), dtype=object) + out[:, :k] = profile + out[:, k:] = np.column_stack((indices, indices_L, indices_R)) # Delete data from Dask cluster dask_client.cancel(T_A_future) diff --git a/tests/naive.py b/tests/naive.py index a34103ca3..fabe3d922 100644 --- a/tests/naive.py +++ b/tests/naive.py @@ -156,11 +156,23 @@ def stamp(T_A, m, T_B=None, exclusion_zone=None): # pragma: no cover return result -def stump(T_A, m, T_B=None, exclusion_zone=None, row_wise=False): +def searchsorted_right(a, v): """ - Traverse distance matrix diagonally and update the matrix profile and - matrix profile indices if the parameter `row_wise` is set to `False`. - If the parameter `row_wise` is set to `True`, it is a row-wise traversal. + Naive version of numpy.searchsorted(..., side='right') + """ + indices = np.flatnonzero(v < a) + if len(indices): + return indices.min() + else: # pragma: no cover + return len(a) + + +def stump(T_A, m, T_B=None, exclusion_zone=None, row_wise=False, k=1): + """ + Traverse distance matrix diagonally and update the top-k nearest neighbor + matrix profile and matrix profile indices if the parameter `row_wise` is + set to `False`. If the parameter `row_wise` is set to `True`, + it is a row-wise traversal. """ if T_B is None: # self-join: ignore_trivial = True @@ -182,35 +194,35 @@ def stump(T_A, m, T_B=None, exclusion_zone=None, row_wise=False): if exclusion_zone is None: exclusion_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)) - P = np.full((l, 3), np.inf) - I = np.full((l, 3), -1, dtype=np.int64) + P = np.full((l, k + 2), np.inf) + I = np.full((l, k + 2), -1, dtype=np.int64) # two more columns are to store + # ... left and right top-1 matrix profile indices if row_wise: # row-wise traversal in distance matrix if ignore_trivial: # self-join for i in range(l): apply_exclusion_zone(distance_matrix[i], i, exclusion_zone, np.inf) - for i, D in enumerate(distance_matrix): + for i, D in enumerate(distance_matrix): # D: distance profile # self-join / AB-join: matrix proifle and indices - idx = np.argmin(D) - P[i, 0] = D[idx] - if P[i, 0] == np.inf: - idx = -1 - I[i, 0] = idx + indices = np.argsort(D)[:k] + P[i, :k] = D[indices] + indices[P[i, :k] == np.inf] = -1 + I[i, :k] = indices - # self-join: left matrix profile + # self-join: left matrix profile index (top-1) if ignore_trivial and i > 0: IL = np.argmin(D[:i]) if D[IL] == np.inf: IL = -1 - I[i, 1] = IL + I[i, k] = IL - # self-join: right matrix profile + # self-join: right matrix profile index (top-1) if ignore_trivial and i < D.shape[0]: - IR = i + np.argmin(D[i:]) # shift argmin by `i` to get true index + IR = i + np.argmin(D[i:]) # shift arg by `i` to get true index if D[IR] == np.inf: IR = -1 - I[i, 2] = IR + I[i, k + 1] = IR else: # diagonal traversal if ignore_trivial: @@ -218,37 +230,40 @@ def stump(T_A, m, T_B=None, exclusion_zone=None, row_wise=False): else: diags = np.arange(-(n_A - m + 1) + 1, n_B - m + 1) - for k in diags: - if k >= 0: - iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - k)) + for g in diags: + if g >= 0: + iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - g)) else: - iter_range = range(-k, min(n_A - m + 1, n_B - m + 1 - k)) + iter_range = range(-g, min(n_A - m + 1, n_B - m + 1 - g)) for i in iter_range: - D = distance_matrix[i, i + k] - if D < P[i, 0]: - P[i, 0] = D - I[i, 0] = i + k + D = distance_matrix[i, i + g] # D: a single element + if D < P[i, k - 1]: + idx = searchsorted_right(P[i], D) + # to keep the top-k, we must get rid of the last element. + P[i, :k] = np.insert(P[i, :k], idx, D)[:-1] + I[i, :k] = np.insert(I[i, :k], idx, i + g)[:-1] if ignore_trivial: # Self-joins only - if D < P[i + k, 0]: - P[i + k, 0] = D - I[i + k, 0] = i + if D < P[i + g, k - 1]: + idx = searchsorted_right(P[i + g], D) + P[i + g, :k] = np.insert(P[i + g, :k], idx, D)[:-1] + I[i + g, :k] = np.insert(I[i + g, :k], idx, i)[:-1] - if i < i + k: + if i < i + g: # Left matrix profile and left matrix profile index - if D < P[i + k, 1]: - P[i + k, 1] = D - I[i + k, 1] = i + if D < P[i + g, k]: + P[i + g, k] = D + I[i + g, k] = i - if D < P[i, 2]: + if D < P[i, k + 1]: # right matrix profile and right matrix profile index - P[i, 2] = D - I[i, 2] = i + k + P[i, k + 1] = D + I[i, k + 1] = i + g - result = np.empty((l, 4), dtype=object) - result[:, 0] = P[:, 0] - result[:, 1:4] = I[:, :] + result = np.empty((l, 2 * k + 2), dtype=object) + result[:, :k] = P[:, :k] + result[:, k:] = I[:, :] return result @@ -1757,3 +1772,15 @@ def _total_diagonal_ndists(tile_lower_diag, tile_upper_diag, tile_height, tile_w ) return total_ndists + + +def merge_topk_PI(PA, PB, IA, IB): + profile = np.column_stack((PA, PB)) + indices = np.column_stack((IA, IB)) + + idx = np.argsort(profile, axis=1) + profile = np.take_along_axis(profile, idx, axis=1) + indices = np.take_along_axis(indices, idx, axis=1) + + PA[:, :] = profile[:, : PA.shape[1]] + IA[:, :] = indices[:, : PA.shape[1]] diff --git a/tests/test_core.py b/tests/test_core.py index 1d1a70716..1087f999d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1059,3 +1059,31 @@ def test_select_P_ABBA_val_inf(): p_abba.sort() ref = p_abba[k - 1] npt.assert_almost_equal(ref, comp) + + +def test_merge_topk_PI(): + n = 50 + for k in range(1, 6): + PA = np.random.rand(n * k).reshape(n, k) + PA = np.sort(PA, axis=1) # sorting each row separately + + PB = np.random.rand(n * k).reshape(n, k) + col_idx = np.random.randint(0, k, size=n) + for i in range(n): # creating ties between values of PA and PB + PB[i, col_idx[i]] = np.random.choice(PA[i], size=1, replace=False) + PB = np.sort(PB, axis=1) + + IA = np.arange(n * k).reshape(n, k) + IB = IA + n * k + + ref_P = PA.copy() + ref_I = IA.copy() + + comp_P = PA.copy() + comp_I = IA.copy() + + naive.merge_topk_PI(ref_P, PB, ref_I, IB) + core._merge_topk_PI(comp_P, PB, comp_I, IB) + + npt.assert_array_equal(ref_P, comp_P) + npt.assert_array_equal(ref_I, comp_I) diff --git a/tests/test_gpu_stump.py b/tests/test_gpu_stump.py index 508b02a56..071337cd5 100644 --- a/tests/test_gpu_stump.py +++ b/tests/test_gpu_stump.py @@ -1,7 +1,9 @@ +import math import numpy as np import numpy.testing as npt import pandas as pd -from stumpy import gpu_stump +from stumpy import core, gpu_stump +from stumpy.gpu_stump import _gpu_searchsorted_left, _gpu_searchsorted_right from stumpy import config from numba import cuda @@ -39,6 +41,59 @@ def test_gpu_stump_int_input(): gpu_stump(np.arange(10), 5, ignore_trivial=True) +@cuda.jit("(f8[:, :], f8[:], i8[:], i8, b1, i8[:])") +def _gpu_searchsorted_kernel(A, V, bfs, nlevel, is_left, IDX): + # A wrapper kernel for calling device function _gpu_searchsorted_left/right. + i = cuda.grid(1) + if i < A.shape[0]: + if is_left: + IDX[i] = _gpu_searchsorted_left(A[i], V[i], bfs, nlevel) + else: + IDX[i] = _gpu_searchsorted_right(A[i], V[i], bfs, nlevel) + + +def test_gpu_searchsorted(): + n = 5000 + threads_per_block = config.STUMPY_THREADS_PER_BLOCK + blocks_per_grid = math.ceil(n / threads_per_block) + + for k in range(1, 32): + device_bfs = cuda.to_device(core._bfs_indices(k, fill_value=-1)) + nlevel = np.floor(np.log2(k) + 1).astype(np.int64) + + A = np.sort(np.random.rand(n, k), axis=1) + device_A = cuda.to_device(A) + + V = np.random.rand(n) + for i, idx in enumerate(np.random.choice(np.arange(n), size=k, replace=False)): + V[idx] = A[idx, i] # create ties + device_V = cuda.to_device(V) + + is_left = True # test case + ref_IDX = [np.searchsorted(A[i], V[i], side="left") for i in range(n)] + ref_IDX = np.asarray(ref_IDX, dtype=np.int64) + + comp_IDX = np.full(n, -1, dtype=np.int64) + device_comp_IDX = cuda.to_device(comp_IDX) + _gpu_searchsorted_kernel[blocks_per_grid, threads_per_block]( + device_A, device_V, device_bfs, nlevel, is_left, device_comp_IDX + ) + comp_IDX = device_comp_IDX.copy_to_host() + npt.assert_array_equal(ref_IDX, comp_IDX) + + is_left = False # test case + ref_IDX = [np.searchsorted(A[i], V[i], side="right") for i in range(n)] + ref_IDX = np.asarray(ref_IDX, dtype=np.int64) + + comp_IDX = np.full(n, -1, dtype=np.int64) + device_comp_IDX = cuda.to_device(comp_IDX) + _gpu_searchsorted_kernel[blocks_per_grid, threads_per_block]( + device_A, device_V, device_bfs, nlevel, is_left, device_comp_IDX + ) + comp_IDX = device_comp_IDX.copy_to_host() + npt.assert_array_equal(ref_IDX, comp_IDX) + + @pytest.mark.filterwarnings("ignore", category=NumbaPerformanceWarning) @pytest.mark.parametrize("T_A, T_B", test_data) def test_gpu_stump_self_join(T_A, T_B): @@ -350,3 +405,32 @@ def test_gpu_stump_nan_zero_mean_self_join(): naive.replace_inf(ref_mp) naive.replace_inf(comp_mp) npt.assert_almost_equal(ref_mp, comp_mp) + + +@pytest.mark.filterwarnings("ignore", category=NumbaPerformanceWarning) +@pytest.mark.parametrize("T_A, T_B", test_data) +def test_gpu_stump_self_join_KNN(T_A, T_B): + m = 3 + zone = int(np.ceil(m / 4)) + for k in range(1, 4): + ref_mp = naive.stump(T_B, m, exclusion_zone=zone, row_wise=True, k=k) + comp_mp = gpu_stump(T_B, m, ignore_trivial=True, k=k) + naive.replace_inf(ref_mp) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) + + comp_mp = gpu_stump(pd.Series(T_B), m, ignore_trivial=True, k=k) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) + + +@pytest.mark.filterwarnings("ignore", category=NumbaPerformanceWarning) +@pytest.mark.parametrize("T_A, T_B", test_data) +def test_gpu_stump_A_B_join_KNN(T_A, T_B): + m = 3 + for k in range(1, 4): + ref_mp = naive.stump(T_A, m, T_B=T_B, row_wise=True, k=k) + comp_mp = gpu_stump(T_A, m, T_B, ignore_trivial=False, k=k) + naive.replace_inf(ref_mp) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) diff --git a/tests/test_stump.py b/tests/test_stump.py index 6feaf1598..3e0b34299 100644 --- a/tests/test_stump.py +++ b/tests/test_stump.py @@ -240,3 +240,34 @@ def test_stump_nan_zero_mean_self_join(): naive.replace_inf(ref_mp) naive.replace_inf(comp_mp) npt.assert_almost_equal(ref_mp, comp_mp) + + +@pytest.mark.parametrize("T_A, T_B", test_data) +def test_stump_self_join_KNN(T_A, T_B): + m = 3 + zone = int(np.ceil(m / 4)) + for k in range(1, 4): + ref_mp = naive.stump(T_B, m, exclusion_zone=zone, k=k) + comp_mp = stump(T_B, m, ignore_trivial=True, k=k) + naive.replace_inf(ref_mp) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) + + comp_mp = stump(pd.Series(T_B), m, ignore_trivial=True, k=k) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) + + +@pytest.mark.parametrize("T_A, T_B", test_data) +def test_stump_A_B_join_KNN(T_A, T_B): + m = 3 + for k in range(1, 4): + ref_mp = naive.stump(T_A, m, T_B=T_B, k=k) + comp_mp = stump(T_A, m, T_B, ignore_trivial=False, k=k) + naive.replace_inf(ref_mp) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) + + comp_mp = stump(pd.Series(T_A), m, pd.Series(T_B), ignore_trivial=False, k=k) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) diff --git a/tests/test_stumped.py b/tests/test_stumped.py index ca53829fc..7e8b053d3 100644 --- a/tests/test_stumped.py +++ b/tests/test_stumped.py @@ -608,3 +608,36 @@ def test_stumped_two_subsequences_nan_inf_A_B_join_swap( naive.replace_inf(ref_mp) naive.replace_inf(comp_mp) npt.assert_almost_equal(ref_mp, comp_mp) + + +@pytest.mark.filterwarnings("ignore:numpy.dtype size changed") +@pytest.mark.filterwarnings("ignore:numpy.ufunc size changed") +@pytest.mark.filterwarnings("ignore:numpy.ndarray size changed") +@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning") +@pytest.mark.parametrize("T_A, T_B", test_data) +def test_stumped_self_join_KNN(T_A, T_B, dask_cluster): + with Client(dask_cluster) as dask_client: + m = 3 + zone = int(np.ceil(m / 4)) + for k in range(1, 4): + ref_mp = naive.stump(T_B, m, exclusion_zone=zone, k=k) + comp_mp = stumped(dask_client, T_B, m, ignore_trivial=True, k=k) + naive.replace_inf(ref_mp) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp) + + +@pytest.mark.filterwarnings("ignore:numpy.dtype size changed") +@pytest.mark.filterwarnings("ignore:numpy.ufunc size changed") +@pytest.mark.filterwarnings("ignore:numpy.ndarray size changed") +@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning") +@pytest.mark.parametrize("T_A, T_B", test_data) +def test_stumped_A_B_join_KNN(T_A, T_B, dask_cluster): + with Client(dask_cluster) as dask_client: + m = 3 + for k in range(1, 4): + ref_mp = naive.stump(T_A, m, T_B=T_B, k=k) + comp_mp = stumped(dask_client, T_A, m, T_B, ignore_trivial=False, k=k) + naive.replace_inf(ref_mp) + naive.replace_inf(comp_mp) + npt.assert_almost_equal(ref_mp, comp_mp)