Coverage for /opt/conda/lib/python3.12/site-packages/medil/independence_testing.py: 20%

91 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 05:42 +0000

1"""Independence testing on samples of random variables.""" 

2 

3from typing import NamedTuple, Optional 

4 

5from multiprocessing import Pool, cpu_count 

6import numpy as np 

7import numpy.typing as npt 

8from scipy.spatial.distance import pdist, squareform 

9from scipy.stats import chi2, norm, rankdata 

10 

11 

12def dcov(samples): 

13 r"""Compute sample distance covariance matrix. 

14 Parameters 

15 ---------- 

16 samples : 2d numpy array of floats 

17 A :math:`N \times M` matrix with :math:`N` samples of 

18 :math:`M` random variables. 

19 Returns 

20 ------- 

21 2d numpy array 

22 A square matrix :math:`C`, where :math:`C_{i,j}` is the sample 

23 distance covariance between random variables :math:`R_i` and 

24 :math:`R_j`. 

25 

26 Notes 

27 ----- 

28 Trades time complexity for space complexity, so it can be too 

29 memory-intensive for larger datasets, in which case recommend to 

30 use xicor or distance_correlation_t_test from the dcor package. 

31 """ 

32 num_samps, num_feats = samples.shape 

33 num_pairs = num_samps * (num_samps - 1) // 2 

34 dists = np.zeros((num_feats, num_pairs)) 

35 d_bars = np.zeros(num_feats) 

36 # compute doubly centered distance matrix for every feature: 

37 for feat_idx in range(num_feats): 

38 n = num_samps 

39 t = np.tile 

40 # raw distance matrix: 

41 d = squareform(pdist(samples[:, feat_idx].reshape(-1, 1), "cityblock")) 

42 # doubly centered: 

43 d_bar = d.mean() 

44 d -= t(d.mean(0), (n, 1)) + t(d.mean(1), (n, 1)).T - t(d_bar, (n, n)) 

45 d = squareform(d, checks=False) # ignore assymmetry due to numerical error 

46 dists[feat_idx] = d 

47 d_bars[feat_idx] = d_bar 

48 return dists @ dists.T / num_samps**2, d_bars 

49 

50 

51def estimate_UDG(sample, method="dcov_fast", significance_level=0.05): 

52 samp_size, num_feats = sample.shape 

53 

54 if isinstance(method, np.ndarray): 

55 p_vals = method 

56 udg = p_vals < significance_level 

57 elif method == "dcov_fast": 

58 cov, d_bars = dcov(sample) 

59 crit_val = chi2(1).ppf(1 - significance_level) 

60 test_val = samp_size * cov / np.outer(d_bars, d_bars) 

61 udg = test_val >= crit_val 

62 p_vals = None 

63 elif method == "g-test": 

64 pass 

65 else: 

66 p_vals = np.zeros((num_feats, num_feats), float) 

67 idxs, jdxs = np.triu_indices(num_feats, 1) 

68 zipped = zip(idxs, jdxs) 

69 sample_iter = (sample[:, i_j].T for i_j in zipped) 

70 # if method == "dcov_big": 

71 # can use distance_correlation_t_test from dcor package 

72 if method == "xicor": 

73 test = xicor_test 

74 with Pool(max(1, int(0.75 * cpu_count()))) as p: 

75 p_vals[idxs, jdxs] = p_vals[jdxs, idxs] = np.fromiter( 

76 p.imap(test, sample_iter, 100), float 

77 ) 

78 udg = p_vals < significance_level 

79 np.fill_diagonal(udg, False) 

80 return udg, p_vals 

81 

82 

83def xicor_test(x_y): 

84 x, y = x_y 

85 xi, pvalue = xicorr(x, y) 

86 return pvalue 

87 

88 

89# The following is a modification of the xicorrelation source code, 

90# Copyright 2021 Nikolay Novik (https://github.com/jettify), 

91# for compatibility with numpy 2.0+ 

92class _XiCorr(NamedTuple): 

93 xi: float 

94 fr: npt.NDArray[np.float64] 

95 cu: float 

96 

97 

98class XiCorrResult(NamedTuple): 

99 correlation: float 

100 pvalue: Optional[float] 

101 

102 

103def _xicorr(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> _XiCorr: 

104 # Ported from original R implementation. 

105 # https://github.com/cran/XICOR/blob/master/R/calculateXI.R 

106 n = x.size 

107 PI = rankdata(x, method="average") 

108 fr = rankdata(y, method="average") / n 

109 gr = rankdata(-y, method="average") / n 

110 cu = np.mean(gr * (1 - gr)) 

111 A1 = np.abs(np.diff(fr[np.argsort(PI, kind="quicksort")])).sum() / (2 * n) 

112 xi = 1.0 - A1 / cu 

113 return _XiCorr(xi, fr, cu) 

114 

115 

116def xicorr(x: npt.ArrayLike, y: npt.ArrayLike, ties: bool = True) -> XiCorrResult: 

117 """Compute the cross rank increment correlation coefficient xi [1]. 

118 

119 Parameters 

120 ---------- 

121 x, y : array_like 

122 Arrays of rankings, of the same shape. If arrays are not 1-D, they 

123 will be flattened to 1-D. 

124 ties : bool, optional 

125 If ties is True, the algorithm assumes that the data has ties and 

126 employs the more elaborated theory for calculating s.d. and P-value. 

127 Otherwise, it uses the simpler theory. There is no harm in putting 

128 ties = True even if there are no ties. 

129 

130 Returns 

131 ------- 

132 correlation : float 

133 The tau statistic. 

134 pvalue : float 

135 P-values computed by the asymptotic theory. 

136 

137 See Also 

138 -------- 

139 spearmanr : Calculates a Spearman rank-order correlation coefficient. 

140 

141 

142 Example: 

143 >>> from xicorrelation import xicorr 

144 >>> x = [1, 2, 3, 4, 5] 

145 >>> y = [1, 4, 9, 16, 25] 

146 >>> xi, pvalue = xicorr(x, y) 

147 >>> print(xi, pvalue) 

148 

149 References 

150 ---------- 

151 .. [1] Chatterjee, S., "A new coefficient of correlation", 

152 https://arxiv.org/abs/1909.10140, 2020. 

153 

154 Examples 

155 -------- 

156 >>> from scipy import stats 

157 >>> x1 = [12, 2, 1, 12, 2] 

158 >>> x2 = [1, 4, 7, 1, 0] 

159 >>> xi, p_value, _ = xicorr(x1, x2) 

160 >>> tau 

161 -0.47140452079103173 

162 >>> p_value 

163 0.2827454599327748 

164 """ 

165 # https://git.io/JSIlN 

166 x = np.asarray(x).ravel() 

167 y = np.asarray(y).ravel() 

168 

169 if x.size != y.size: 

170 raise ValueError( 

171 "All inputs to `xicorr` must be of the same " 

172 f"size, found x-size {x.size} and y-size {y.size}" 

173 ) 

174 elif not x.size or not y.size: 

175 # Return NaN if arrays are empty 

176 return XiCorrResult(np.nan, np.nan) 

177 

178 r = _xicorr(x, y) 

179 xi = r.xi 

180 fr = r.fr 

181 CU = r.cu 

182 

183 pvalue: Optional[float] = None 

184 # https://git.io/JSIlM 

185 n = x.size 

186 if not ties: 

187 # sd = np.sqrt(2.0 / (5.0 * n)) 

188 pvalue = 1.0 - norm.cdf(np.sqrt(n) * xi / np.sqrt(2.0 / 5.0)) 

189 else: 

190 qfr = np.sort(fr) 

191 ind = np.arange(1, n + 1) 

192 ind2 = 2 * n - 2 * ind + 1 

193 

194 ai = np.mean(ind2 * qfr * qfr) / n 

195 ci = np.mean(ind2 * qfr) / n 

196 cq = np.cumsum(qfr) 

197 m = (cq + (n - ind) * qfr) / n 

198 b = np.mean(m**2) 

199 v = (ai - 2.0 * b + ci**2) / (CU**2) 

200 

201 # sd = np.sqrt(v / n) 

202 pvalue = 1.0 - norm.cdf(np.sqrt(n) * xi / np.sqrt(v)) 

203 return XiCorrResult(xi, pvalue)