Coverage for /opt/conda/lib/python3.12/site-packages/medil/sample.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 05:42 +0000

1"""Generate random minimum MeDIL causal model graph or parameters.""" 

2 

3import numpy as np 

4import numpy.typing as npt 

5from numpy.random import default_rng 

6 

7from .models import GaussianMCM 

8from .ecc_algorithms import find_clique_min_cover 

9 

10 

11def mcm( 

12 rng: np.random.Generator = default_rng(0), 

13 parameterization: str = "Gaussian", 

14 biadj: npt.NDArray = np.array([]), 

15 **kwargs, 

16) -> GaussianMCM: 

17 if biadj.size == 0: 

18 biadj = _biadj(**kwargs) 

19 if parameterization == "Gaussian": 

20 mcm = GaussianMCM(biadj=biadj) 

21 params = mcm.parameters 

22 

23 num_edges = biadj.sum() 

24 weights = (rng.random(num_edges) * 1.5) + 0.5 

25 weights[rng.choice((True, False), num_edges)] *= -1 

26 params.biadj_weights = np.zeros_like(biadj, float) 

27 params.biadj_weights[biadj] = weights 

28 

29 num_meas = biadj.shape[1] 

30 params.error_means = rng.random(num_meas) * 2 

31 params.error_means[rng.choice((True, False), num_meas)] *= -1 

32 

33 params.error_variances = (rng.random(num_meas) * 1.5) + 0.5 

34 

35 elif parameterization == "VAE": 

36 raise (NotImplementedError) 

37 

38 else: 

39 raise ValueError(f"Parameterization '{parameterization}' is invalid.") 

40 

41 return mcm 

42 

43 

44def biadj( 

45 num_meas: int, 

46 density: float = 0.2, 

47 one_pure_child: bool = True, 

48 num_latent: int = 0, 

49 rng: np.random.Generator = default_rng(0), 

50) -> npt.NDArray: 

51 """Randomly generate biadjacency matrix for graphical minMCM.""" 

52 if one_pure_child: 

53 """Define a maximum independent set of size `num_latent`, and then grow these into a minimum edge clique cover with average max clique size `2 + (num_meas - num_latent) * density`.""" 

54 if num_latent == 0: 

55 num_latent = rng.integers(1, num_meas) 

56 if density is None: 

57 density = rng.random() 

58 

59 # specify pure children/independent set 

60 biadj = np.zeros((num_latent, num_meas), bool) 

61 biadj[:, :num_latent] = np.eye(num_latent) 

62 

63 # every child gets a parent; specifically L_0, until the 

64 # within-column perm below using np.permuted 

65 biadj[0, num_latent:] = True 

66 

67 # randomly fill in remaining density * (num_meas - num_latent) 

68 # * (num_latent - 1) edges 

69 max_num_edges = (num_meas - num_latent) * (num_latent - 1) 

70 num_edges = np.round(max_num_edges * density).astype(int) 

71 

72 edges = np.zeros(max_num_edges, bool) 

73 edges[:num_edges] = True 

74 edges = rng.permutation(edges).reshape(num_latent - 1, num_meas - num_latent) 

75 

76 biadj[1:][:, num_latent:] = edges 

77 

78 nonpure_children = biadj[:, num_latent:] 

79 biadj[:, num_latent:] = rng.permuted(nonpure_children, axis=0) 

80 

81 # change child order, so pure children aren't first 

82 biadj = rng.permutation(biadj, axis=1) 

83 

84 else: 

85 """Generate minMCM from Erdős–Rényi random undirected graph 

86 over observed variables.""" 

87 if num_latent != 0: 

88 msg = "`num_latent` can only be specified when `one_pure_child==True`." 

89 raise ValueError(msg) 

90 

91 udg = np.zeros((num_meas, num_meas), bool) 

92 

93 max_edges = (num_meas * (num_meas - 1)) // 2 

94 num_edges = np.round(density * max_edges).astype(int) 

95 

96 edges = np.ones(max_edges) 

97 edges[num_edges:] = 0 

98 

99 udg[np.triu_indices(num_meas, k=1)] = rng.permutation(edges) 

100 udg += udg.T 

101 np.fill_diagonal(udg, True) 

102 

103 # find latent connections (minimum edge clique cover) 

104 biadj = find_clique_min_cover(udg).astype(bool) 

105 

106 return biadj 

107 

108 

109_biadj = biadj