Coverage for /opt/conda/lib/python3.12/site-packages/medil/graph.py: 89%

176 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 05:42 +0000

1"""Various types and representations of graphs.""" 

2 

3import numpy as np 

4from numpy.random import default_rng 

5 

6 

class UndirectedDependenceGraph(object):
    r"""Adjacency matrix representation using a 2d numpy array.

    Upon initialization, this class is a fairly standard implementation
    of an undirected graph. However, upon calling the :meth:`make_aux`
    method, an auxiliary data structure in the form of several new
    attributes is created, which are used by
    :meth:`medil.ecc_algorithms.find_clique_min_cover` according to
    the algorithm in :cite:`Gramm_2009`.

    Attributes
    ----------
    adj_matrix : numpy.ndarray
        Square adjacency matrix. An ndarray passed to the constructor
        is stored without copying and is modified in place by
        :meth:`add_edges` and :meth:`rm_edges`.
    num_vertices
        Trace of ``adj_matrix`` at initialization.
    max_num_verts : int
        ``len(adj_matrix)``.
    num_edges
        Sum of the strict upper triangle of ``adj_matrix``.
    verbose : bool
        Flag stored for use by subclasses when printing progress.
    common_neighbors, nbrhood_edge_counts, extant_edges_idx
        Arrays created by :meth:`make_aux`, indexed by edge row.
    get_idx, get_edge, nbrs
        Callables created by :meth:`make_aux`.

    Notes
    -----
    The algorithms for finding the minMCM via ECC contain many
    algebraic operations, so adjacency matrix representation (via
    NumPy) is most convenient.

    The diag is used to store information when the graph is reduced,
    not to indicate self loops, so it is important that diag = 1 at
    init.

    """

    def __init__(self, adj_matrix, verbose=False):
        # The fancy indexing in add_edges/rm_edges needs an ndarray;
        # np.asarray leaves an existing ndarray untouched (no copy), so
        # callers that share the array with this object still do.
        adj_matrix = np.asarray(adj_matrix)
        self.adj_matrix = adj_matrix
        self.num_vertices = np.trace(adj_matrix)
        self.max_num_verts = len(adj_matrix)
        self.num_edges = np.triu(adj_matrix, 1).sum()
        self.verbose = verbose

    def add_edges(self, edges):
        """Set both orientations of each ``(v_1, v_2)`` row of `edges` to 1.

        `edges` is indexed as ``edges[:, 0]`` and ``edges[:, 1]``, so it
        must be a 2d array with one edge per row.
        """
        firsts, seconds = edges[:, 0], edges[:, 1]
        self.adj_matrix[firsts, seconds] = 1
        self.adj_matrix[seconds, firsts] = 1
        self.num_edges = np.triu(self.adj_matrix, 1).sum()

    def rm_edges(self, edges):
        """Set both orientations of each ``(v_1, v_2)`` row of `edges` to 0.

        `edges` is indexed as ``edges[:, 0]`` and ``edges[:, 1]``, so it
        must be a 2d array with one edge per row.
        """
        firsts, seconds = edges[:, 0], edges[:, 1]
        self.adj_matrix[firsts, seconds] = 0
        self.adj_matrix[seconds, firsts] = 0
        self.num_edges = np.triu(self.adj_matrix, 1).sum()

    def make_aux(self):
        """Build the auxiliary structure (INITIALIZATION in the paper).

        Creates ``common_neighbors``, ``nbrhood_edge_counts``,
        ``extant_edges_idx`` and the callables ``get_idx``, ``get_edge``
        and ``nbrs`` as attributes.
        """
        num_verts = self.max_num_verts
        max_num_edges = self.n_choose_2(num_verts)

        # one row per possible edge (u, v) with u < v
        self.common_neighbors = np.zeros((max_num_edges, num_verts), int)

        # mapping of edges to unique row idx; only the upper triangle is
        # filled, so edges must be given as (smaller, larger)
        nghbrhd_idx = np.zeros((num_verts, num_verts), int)
        nghbrhd_idx[np.triu_indices(num_verts, 1)] = np.arange(max_num_edges)
        self.get_idx = lambda edge: nghbrhd_idx[edge[0], edge[1]]

        # reverse mapping: row idx -> (u, v)
        u, v = np.where(np.triu(np.ones_like(self.adj_matrix), 1))
        self.get_edge = lambda idx: (u[idx], v[idx])

        # common neighborhood of an edge = (v_1, v_2); reads the current
        # adj_matrix each time it is called
        self.nbrs = lambda edge: np.logical_and(
            self.adj_matrix[edge[0]], self.adj_matrix[edge[1]]
        )

        extant_edges = np.transpose(np.triu(self.adj_matrix, 1).nonzero())
        self.extant_edges_idx = np.fromiter(
            {self.get_idx(edge) for edge in extant_edges}, dtype=int
        )
        extant_nbrs = np.array([self.nbrs(edge) for edge in extant_edges], int)
        extant_nbrs_idx = np.array(
            [self.get_idx(edge) for edge in extant_edges], int
        )

        # from paper: set of N_{u, v} for all edges (u, v)
        # NOTE(review): with no extant edges both arrays are 1d and empty,
        # and this assignment appears to raise ValueError (shape mismatch);
        # callers may rely on that, so it is left as is -- confirm.
        self.common_neighbors[extant_nbrs_idx] = extant_nbrs

        # from paper: set of c_{u, v} for all edges (u, v).  For each row,
        # take the submatrix of adj_matrix induced by the common
        # neighborhood, subtract its size (the diag term in the original
        # derivation) and halve to count the edges inside it.
        edge_counts = np.zeros(max_num_edges, int)
        for row, members in enumerate(self.common_neighbors.astype(bool)):
            induced = self.adj_matrix[members][:, members]
            edge_counts[row] = (induced.sum() - members.sum()) // 2
        self.nbrhood_edge_counts = edge_counts

    @staticmethod
    def n_choose_2(n):
        """Return ``n * (n - 1) // 2``; works elementwise on arrays."""
        return n * (n - 1) // 2

    def reducible_copy(self):
        """Return a :class:`ReducibleUndDepGraph` built from this graph."""
        return ReducibleUndDepGraph(self)

    def convert_to_nde(self, name="temp"):
        """Write the graph to the file ``name + ".nde"``.

        The file holds ``max_num_verts`` on the first line, then one
        ``"<idx> <row sum>"`` line per row of ``adj_matrix``, then one
        ``"<v1> <v2>"`` line per nonzero entry of the upper triangle
        (diagonal included).
        """
        with open(name + ".nde", "w") as f:
            f.write(f"{self.max_num_verts}\n")
            for idx, node in enumerate(self.adj_matrix):
                f.write(f"{idx} {node.sum()}\n")
            for v1, v2 in np.argwhere(np.triu(self.adj_matrix)):
                f.write(f"{v1} {v2}\n")

139 

140 

class ReducibleUndDepGraph(UndirectedDependenceGraph):
    """Working copy of a dependence graph that the reduction rules modify.

    The instance keeps a reference to the original graph in
    ``unreduced`` and its own copies of the adjacency matrix and of the
    auxiliary arrays, so applying the rules does not change the source
    graph's arrays.
    """

    def __init__(self, udg):
        # always point at the original graph, even when copying a copy
        self.unreduced = udg.unreduced if hasattr(udg, "unreduced") else udg
        self.adj_matrix = udg.adj_matrix.copy()
        self.num_vertices = udg.num_vertices
        # needed by the inherited make_aux() and convert_to_nde()
        self.max_num_verts = self.unreduced.max_num_verts
        self.num_edges = udg.num_edges

        self.the_cover = None
        self.verbose = udg.verbose

        # build the auxiliary structure on the source graph if needed
        if not hasattr(udg, "get_idx"):
            udg.make_aux()
        self.get_idx = udg.get_idx

        # per-edge arrays are copied because rule_1 updates them in place
        self.common_neighbors = udg.common_neighbors.copy()
        self.nbrhood_edge_counts = udg.nbrhood_edge_counts.copy()

        # shrinks whenever cover_edges() is called
        self.extant_edges_idx = udg.extant_edges_idx.copy()
        self.nbrs = udg.nbrs
        self.get_edge = udg.get_edge

        if hasattr(udg, "reduced_away"):  # then rule_3 was applied to udg
            # NOTE(review): shared by reference, so a later rule_3 on this
            # copy also writes into udg.reduced_away -- confirm intended.
            self.reduced_away = udg.reduced_away

    def reset(self):
        """Re-run ``__init__`` from the stored unreduced graph."""
        self.__init__(self.unreduced)

    def reduzieren(self, k_num_cliques):
        """Apply the reduction rules until none of them applies.

        Rule 1 is applied first, then rule 2 or rule 3, each followed by
        rule 1 again.  Returns early as soon as ``k_num_cliques`` (stored
        on the instance and decremented by rule 2) drops below zero.
        """
        if self.verbose:
            print("\t\treducing:")
        self.k_num_cliques = k_num_cliques
        self.reducing = True
        while self.reducing:
            self.reducing = False
            self.rule_1()
            self.rule_2()
            if self.k_num_cliques < 0:
                return
            if self.reducing:
                # rule 2 fired; start over so rule 1 can clean up
                continue
            self.rule_3()

    def rule_1(self):
        """Remove isolated vertices and vertices only adjacent to covered edges.

        A vertex qualifies when its row sum plus column sum equals 2,
        i.e. only its diagonal entry is left.
        """
        degree_twice = self.adj_matrix.sum(0) + self.adj_matrix.sum(1)
        isolated_verts = np.flatnonzero(degree_twice == 2)
        if len(isolated_verts) == 0:
            return

        if self.verbose:
            print("\t\t\tapplying Rule 1...")

        # update auxiliary attributes; LEMMA 2
        self.adj_matrix[isolated_verts, isolated_verts] = 0
        self.num_vertices -= len(isolated_verts)

        # decrease nbrhood edge counts: for every common neighborhood
        # containing the vertex, subtract the number of its (original)
        # neighbors that lie in that neighborhood
        for vert in isolated_verts:
            open_nbrhood = self.unreduced.adj_matrix[vert].copy()
            open_nbrhood[vert] = 0
            rows = np.flatnonzero(self.common_neighbors[:, vert] == 1)
            # broadcasting open_nbrhood against the selected rows
            to_subtract = np.logical_and(
                open_nbrhood, self.common_neighbors[rows]
            ).sum(1)
            self.nbrhood_edge_counts[rows] -= to_subtract

        # remove isolated_verts from common neighborhoods; done after the
        # loop so every vertex above sees the same common_neighbors
        self.common_neighbors[:, isolated_verts] = 0

    def rule_2(self):
        """Add a clique to the cover when an uncovered edge lies in exactly one.

        If the common neighbors of an uncovered edge {u, v} induce a
        clique C, C is appended to ``the_cover``, its edges are marked
        covered, and ``k_num_cliques`` is decreased by one.
        """
        # score of n implies edge is in exactly n+1 maximal cliques,
        # so we want edges with score 0
        score = (
            self.n_choose_2(self.common_neighbors.sum(1))
            - self.nbrhood_edge_counts
        )
        candidates = np.flatnonzero(score[self.extant_edges_idx] == 0)
        if candidates.size == 0:
            return

        if self.verbose:
            print("\t\t\tapplying Rule 2...")
        chosen_row = self.extant_edges_idx[candidates[-1]]
        clique = self.common_neighbors[chosen_row].copy()
        if self.the_cover is None:
            self.the_cover = clique.reshape(1, -1)
        else:
            self.the_cover = np.vstack((self.the_cover, clique))
        self.cover_edges()
        self.k_num_cliques -= 1
        # start the loop over so Rule 1 can 'clean up'
        self.reducing = True

    def rule_3(self):
        """Delete a vertex whose prisoners dominate its exits.

        Consider a vertex v that has at least one prisoner. If each
        prisoner is connected to at least one vertex other than v via an
        uncovered edge (automatically given if instance is reduced
        w.r.t. rules 1 and 2), and the prisoners dominate the exits,
        then delete v. To reconstruct a solution for the unreduced
        instance, add v to every clique containing a prisoner of v.

        At most one vertex is handled per call.
        """
        for vert, nbrhood in enumerate(self.adj_matrix):
            if nbrhood[vert] == 0:  # then nbrhood is empty
                continue

            # exits[j] == True iff j is an exit for vert, i.e. a neighbor
            # j that is adjacent to some vertex vert is not adjacent to
            exits = np.zeros(self.unreduced.max_num_verts, bool)
            own_row = nbrhood.astype(int)
            for nbr in np.flatnonzero(nbrhood):
                if (own_row - self.adj_matrix[nbr].astype(int) == -1).any():
                    exits[nbr] = True

            # prisoners[j] == True iff j is a prisoner of vert
            prisoners = np.logical_and(~exits, self.adj_matrix[vert])
            prisoners[vert] = False  # a vert isn't its own prisoner

            # the rule needs at least one prisoner; without this guard a
            # vertex with no other neighbors passes the (vacuous) check
            # below and cover_edges fails on an empty edge array
            if not prisoners.any():
                continue

            # check if each exit is adjacent to at least one prisoner
            prisoners_dominate_exits = self.adj_matrix[prisoners][:, exits].sum(0)
            if not prisoners_dominate_exits.all():
                continue

            # apply the rule
            self.reducing = True
            if self.verbose:
                print("\t\t\tapplying Rule 3...")

            # mark edges covered so rule_1 will delete vertex
            edges = np.array(
                [
                    [vert, u]
                    for u in np.flatnonzero(self.adj_matrix[vert])
                    if vert != u
                ]
            )
            edges.sort()  # so they're in triu, so that self.get_idx works
            self.cover_edges(edges)

            # keep track of deleted nodes and their prisoners for
            # reconstructing solution
            if not hasattr(self, "reduced_away"):
                self.reduced_away = np.zeros_like(self.adj_matrix, bool)
            self.reduced_away[vert] = prisoners
            break

    def choose_nbrhood(self):
        """Return the adjacency matrix restricted to the lowest-score neighborhood.

        Only extant edges are considered.  Rows and columns outside the
        chosen common neighborhood are zeroed in a copy of
        ``adj_matrix``.

        Raises
        ------
        ValueError
            If the chosen neighborhood contains no edges.
        """
        # only compute for existing edges
        common_neighbors = self.common_neighbors[self.extant_edges_idx]
        nbrhood_edge_counts = self.nbrhood_edge_counts[self.extant_edges_idx]
        score = self.n_choose_2(common_neighbors.sum(1)) - nbrhood_edge_counts

        # idx in reduced idx list, not from full edge list
        inside = common_neighbors[score.argmin()].astype(bool)

        chosen_nbrhood = self.adj_matrix.copy()
        chosen_nbrhood[~inside, :] = 0
        chosen_nbrhood[:, ~inside] = 0
        if chosen_nbrhood.sum() <= inside.sum():
            raise ValueError("This nbrhood has no edges")
        return chosen_nbrhood

    def cover_edges(self, edges=None):
        """Mark edges as covered by removing them from the reduced graph.

        Parameters
        ----------
        edges : array, optional
            One ``(u, v)`` row per edge with ``u < v`` (``get_idx`` only
            maps the upper triangle).  When omitted, all pairs of
            vertices in the most recently added clique of ``the_cover``
            are covered; nothing happens if ``the_cover`` is ``None``.
        """
        if edges is None:
            # always call after updating the cover; only on single,
            # recently added clique
            if self.the_cover is None:
                return

            covered = np.flatnonzero(self.the_cover[-1])

            # all pairwise combinations (v_i, v_j), i < j, of the clique
            first, second = np.triu_indices(len(covered), 1)
            covered_edges = np.empty((len(first), 2), int)
            covered_edges[:, 0] = covered[first]
            covered_edges[:, 1] = covered[second]
        else:
            covered_edges = edges

        # cover (remove from reduced_graph) edges
        self.rm_edges(covered_edges)

        # drop the covered edges' row indices from extant_edges_idx
        rmed_edges_idx = [self.get_idx(edge) for edge in covered_edges]
        still_extant = ~np.isin(self.extant_edges_idx, rmed_edges_idx)
        self.extant_edges_idx = self.extant_edges_idx[still_extant]

        # common_neighbors rows of covered edges are intentionally left
        # as they are; they are only read through extant_edges_idx

        if self.verbose:
            print("\t\t\t{} uncovered edges remaining".format(self.num_edges))

    def reconstruct_cover(self, the_cover):
        """Add vertices deleted by rule 3 back into the cliques of `the_cover`.

        Each reduced-away vertex is added to every clique containing at
        least one of its prisoners.  `the_cover` is modified in place
        and returned; it is returned unchanged when rule 3 was never
        applied.
        """
        if not hasattr(self, "reduced_away"):  # then rule_3 wasn't applied
            return the_cover

        for vert in np.flatnonzero(self.reduced_away.sum(1)):
            prisoners = self.reduced_away[vert]
            # broadcast prisoners against every clique (row) of the cover
            holds_prisoner = np.logical_and(prisoners, the_cover).any(axis=1)
            the_cover[holds_prisoner, vert] = 1

        return the_cover

398 

399 

400# class MCM(object): 

401# TODO: implement as large DAG adj matrix over L and M, or smaller bigraph for L-M connections and DAG adj matrix for L->L connections