Coverage for /opt/conda/lib/python3.12/site-packages/medil/independence_testing.py: 20%
91 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-25 05:42 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-25 05:42 +0000
1"""Independence testing on samples of random variables."""
3from typing import NamedTuple, Optional
5from multiprocessing import Pool, cpu_count
6import numpy as np
7import numpy.typing as npt
8from scipy.spatial.distance import pdist, squareform
9from scipy.stats import chi2, norm, rankdata
12def dcov(samples):
13 r"""Compute sample distance covariance matrix.
14 Parameters
15 ----------
16 samples : 2d numpy array of floats
17 A :math:`N \times M` matrix with :math:`N` samples of
18 :math:`M` random variables.
19 Returns
20 -------
21 2d numpy array
22 A square matrix :math:`C`, where :math:`C_{i,j}` is the sample
23 distance covariance between random variables :math:`R_i` and
24 :math:`R_j`.
26 Notes
27 -----
28 Trades time complexity for space complexity, so it can be too
29 memory-intensive for larger datasets, in which case recommend to
30 use xicor or distance_correlation_t_test from the dcor package.
31 """
32 num_samps, num_feats = samples.shape
33 num_pairs = num_samps * (num_samps - 1) // 2
34 dists = np.zeros((num_feats, num_pairs))
35 d_bars = np.zeros(num_feats)
36 # compute doubly centered distance matrix for every feature:
37 for feat_idx in range(num_feats):
38 n = num_samps
39 t = np.tile
40 # raw distance matrix:
41 d = squareform(pdist(samples[:, feat_idx].reshape(-1, 1), "cityblock"))
42 # doubly centered:
43 d_bar = d.mean()
44 d -= t(d.mean(0), (n, 1)) + t(d.mean(1), (n, 1)).T - t(d_bar, (n, n))
45 d = squareform(d, checks=False) # ignore assymmetry due to numerical error
46 dists[feat_idx] = d
47 d_bars[feat_idx] = d_bar
48 return dists @ dists.T / num_samps**2, d_bars
51def estimate_UDG(sample, method="dcov_fast", significance_level=0.05):
52 samp_size, num_feats = sample.shape
54 if isinstance(method, np.ndarray):
55 p_vals = method
56 udg = p_vals < significance_level
57 elif method == "dcov_fast":
58 cov, d_bars = dcov(sample)
59 crit_val = chi2(1).ppf(1 - significance_level)
60 test_val = samp_size * cov / np.outer(d_bars, d_bars)
61 udg = test_val >= crit_val
62 p_vals = None
63 elif method == "g-test":
64 pass
65 else:
66 p_vals = np.zeros((num_feats, num_feats), float)
67 idxs, jdxs = np.triu_indices(num_feats, 1)
68 zipped = zip(idxs, jdxs)
69 sample_iter = (sample[:, i_j].T for i_j in zipped)
70 # if method == "dcov_big":
71 # can use distance_correlation_t_test from dcor package
72 if method == "xicor":
73 test = xicor_test
74 with Pool(max(1, int(0.75 * cpu_count()))) as p:
75 p_vals[idxs, jdxs] = p_vals[jdxs, idxs] = np.fromiter(
76 p.imap(test, sample_iter, 100), float
77 )
78 udg = p_vals < significance_level
79 np.fill_diagonal(udg, False)
80 return udg, p_vals
83def xicor_test(x_y):
84 x, y = x_y
85 xi, pvalue = xicorr(x, y)
86 return pvalue
89# The following is a modification of the xicorrelation source code,
90# Copyright 2021 Nikolay Novik (https://github.com/jettify),
91# for compatibility with numpy 2.0+
92class _XiCorr(NamedTuple):
93 xi: float
94 fr: npt.NDArray[np.float64]
95 cu: float
98class XiCorrResult(NamedTuple):
99 correlation: float
100 pvalue: Optional[float]
103def _xicorr(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> _XiCorr:
104 # Ported from original R implementation.
105 # https://github.com/cran/XICOR/blob/master/R/calculateXI.R
106 n = x.size
107 PI = rankdata(x, method="average")
108 fr = rankdata(y, method="average") / n
109 gr = rankdata(-y, method="average") / n
110 cu = np.mean(gr * (1 - gr))
111 A1 = np.abs(np.diff(fr[np.argsort(PI, kind="quicksort")])).sum() / (2 * n)
112 xi = 1.0 - A1 / cu
113 return _XiCorr(xi, fr, cu)
116def xicorr(x: npt.ArrayLike, y: npt.ArrayLike, ties: bool = True) -> XiCorrResult:
117 """Compute the cross rank increment correlation coefficient xi [1].
119 Parameters
120 ----------
121 x, y : array_like
122 Arrays of rankings, of the same shape. If arrays are not 1-D, they
123 will be flattened to 1-D.
124 ties : bool, optional
125 If ties is True, the algorithm assumes that the data has ties and
126 employs the more elaborated theory for calculating s.d. and P-value.
127 Otherwise, it uses the simpler theory. There is no harm in putting
128 ties = True even if there are no ties.
130 Returns
131 -------
132 correlation : float
133 The tau statistic.
134 pvalue : float
135 P-values computed by the asymptotic theory.
137 See Also
138 --------
139 spearmanr : Calculates a Spearman rank-order correlation coefficient.
142 Example:
143 >>> from xicorrelation import xicorr
144 >>> x = [1, 2, 3, 4, 5]
145 >>> y = [1, 4, 9, 16, 25]
146 >>> xi, pvalue = xicorr(x, y)
147 >>> print(xi, pvalue)
149 References
150 ----------
151 .. [1] Chatterjee, S., "A new coefficient of correlation",
152 https://arxiv.org/abs/1909.10140, 2020.
154 Examples
155 --------
156 >>> from scipy import stats
157 >>> x1 = [12, 2, 1, 12, 2]
158 >>> x2 = [1, 4, 7, 1, 0]
159 >>> xi, p_value, _ = xicorr(x1, x2)
160 >>> tau
161 -0.47140452079103173
162 >>> p_value
163 0.2827454599327748
164 """
165 # https://git.io/JSIlN
166 x = np.asarray(x).ravel()
167 y = np.asarray(y).ravel()
169 if x.size != y.size:
170 raise ValueError(
171 "All inputs to `xicorr` must be of the same "
172 f"size, found x-size {x.size} and y-size {y.size}"
173 )
174 elif not x.size or not y.size:
175 # Return NaN if arrays are empty
176 return XiCorrResult(np.nan, np.nan)
178 r = _xicorr(x, y)
179 xi = r.xi
180 fr = r.fr
181 CU = r.cu
183 pvalue: Optional[float] = None
184 # https://git.io/JSIlM
185 n = x.size
186 if not ties:
187 # sd = np.sqrt(2.0 / (5.0 * n))
188 pvalue = 1.0 - norm.cdf(np.sqrt(n) * xi / np.sqrt(2.0 / 5.0))
189 else:
190 qfr = np.sort(fr)
191 ind = np.arange(1, n + 1)
192 ind2 = 2 * n - 2 * ind + 1
194 ai = np.mean(ind2 * qfr * qfr) / n
195 ci = np.mean(ind2 * qfr) / n
196 cq = np.cumsum(qfr)
197 m = (cq + (n - ind) * qfr) / n
198 b = np.mean(m**2)
199 v = (ai - 2.0 * b + ci**2) / (CU**2)
201 # sd = np.sqrt(v / n)
202 pvalue = 1.0 - norm.cdf(np.sqrt(n) * xi / np.sqrt(v))
203 return XiCorrResult(xi, pvalue)