"""Independence testing on samples of random variables."""

from typing import NamedTuple, Optional

from dcor.independence import distance_correlation_t_test
from multiprocessing import Pool, cpu_count
import numpy as np
import numpy.typing as npt
from scipy.spatial.distance import pdist, squareform
from scipy.stats import chi2, norm, rankdata

[docs] def dcov(samples): r"""Compute sample distance covariance matrix. Parameters ---------- samples : 2d numpy array of floats A :math:`N \times M` matrix with :math:`N` samples of :math:`M` random variables. Returns ------- 2d numpy array A square matrix :math:`C`, where :math:`C_{i,j}` is the sample distance covariance between random variables :math:`R_i` and :math:`R_j`. """ num_samps, num_feats = samples.shape num_pairs = num_samps * (num_samps - 1) // 2 dists = np.zeros((num_feats, num_pairs)) d_bars = np.zeros(num_feats) # compute doubly centered distance matrix for every feature: for feat_idx in range(num_feats): n = num_samps t = np.tile # raw distance matrix: d = squareform(pdist(samples[:, feat_idx].reshape(-1, 1), "cityblock")) # doubly centered: d_bar = d.mean() d -= t(d.mean(0), (n, 1)) + t(d.mean(1), (n, 1)).T - t(d_bar, (n, n)) d = squareform(d, checks=False) # ignore assymmetry due to numerical error dists[feat_idx] = d d_bars[feat_idx] = d_bar return dists @ dists.T / num_samps**2, d_bars
[docs] def estimate_UDG(sample, method="dcov_fast", significance_level=0.05): samp_size, num_feats = sample.shape if isinstance(method, np.ndarray): p_vals = method udg = p_vals < significance_level elif method == "dcov_fast": cov, d_bars = dcov(sample) crit_val = chi2(1).ppf(1 - significance_level) test_val = samp_size * cov / np.outer(d_bars, d_bars) udg = test_val >= crit_val p_vals = None elif method == "g-test": pass else: p_vals = np.zeros((num_feats, num_feats), float) idxs, jdxs = np.triu_indices(num_feats, 1) zipped = zip(idxs, jdxs) sample_iter = (sample[:, i_j].T for i_j in zipped) if method == "dcov_big": test = dcor_test elif method == "xicor": test = xicor_test with Pool(max(1, int(0.75 * cpu_count()))) as p: p_vals[idxs, jdxs] = p_vals[jdxs, idxs] = np.fromiter( p.imap(test, sample_iter, 100), float ) udg = p_vals < significance_level np.fill_diagonal(udg, False) return udg, p_vals
[docs] def dcor_test(x_y): x, y = x_y return distance_correlation_t_test(x, y).pvalue
[docs] def xicor_test(x_y): x, y = x_y xi, pvalue = xicorr(x, y) return pvalue
# The following is a modification of the xicorrelation source code, # Copyright 2021 Nikolay Novik (, # for compatibility with numpy 2.0+ class _XiCorr(NamedTuple): xi: float fr: npt.NDArray[np.float64] cu: float
[docs] class XiCorrResult(NamedTuple): correlation: float pvalue: Optional[float]
def _xicorr(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> _XiCorr: # Ported from original R implementation. # n = x.size PI = rankdata(x, method="average") fr = rankdata(y, method="average") / n gr = rankdata(-y, method="average") / n cu = np.mean(gr * (1 - gr)) A1 = np.abs(np.diff(fr[np.argsort(PI, kind="quicksort")])).sum() / (2 * n) xi = 1.0 - A1 / cu return _XiCorr(xi, fr, cu)
[docs] def xicorr(x: npt.ArrayLike, y: npt.ArrayLike, ties: bool = True) -> XiCorrResult: """Compute the cross rank increment correlation coefficient xi [1]. Parameters ---------- x, y : array_like Arrays of rankings, of the same shape. If arrays are not 1-D, they will be flattened to 1-D. ties : bool, optional If ties is True, the algorithm assumes that the data has ties and employs the more elaborated theory for calculating s.d. and P-value. Otherwise, it uses the simpler theory. There is no harm in putting ties = True even if there are no ties. Returns ------- correlation : float The tau statistic. pvalue : float P-values computed by the asymptotic theory. See Also -------- spearmanr : Calculates a Spearman rank-order correlation coefficient. Example: >>> from xicorrelation import xicorr >>> x = [1, 2, 3, 4, 5] >>> y = [1, 4, 9, 16, 25] >>> xi, pvalue = xicorr(x, y) >>> print(xi, pvalue) References ---------- .. [1] Chatterjee, S., "A new coefficient of correlation",, 2020. Examples -------- >>> from scipy import stats >>> x1 = [12, 2, 1, 12, 2] >>> x2 = [1, 4, 7, 1, 0] >>> xi, p_value, _ = xicorr(x1, x2) >>> tau -0.47140452079103173 >>> p_value 0.2827454599327748 """ # x = np.asarray(x).ravel() y = np.asarray(y).ravel() if x.size != y.size: raise ValueError( "All inputs to `xicorr` must be of the same " f"size, found x-size {x.size} and y-size {y.size}" ) elif not x.size or not y.size: # Return NaN if arrays are empty return XiCorrResult(np.nan, np.nan) r = _xicorr(x, y) xi = r.xi fr = CU = pvalue: Optional[float] = None # n = x.size if not ties: # sd = np.sqrt(2.0 / (5.0 * n)) pvalue = 1.0 - norm.cdf(np.sqrt(n) * xi / np.sqrt(2.0 / 5.0)) else: qfr = np.sort(fr) ind = np.arange(1, n + 1) ind2 = 2 * n - 2 * ind + 1 ai = np.mean(ind2 * qfr * qfr) / n ci = np.mean(ind2 * qfr) / n cq = np.cumsum(qfr) m = (cq + (n - ind) * qfr) / n b = np.mean(m**2) v = (ai - 2.0 * b + ci**2) / (CU**2) # sd = np.sqrt(v / n) pvalue = 1.0 - norm.cdf(np.sqrt(n) * xi / np.sqrt(v)) return XiCorrResult(xi, pvalue)