Coverage for anfis_toolbox / clustering.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-05 18:47 -0300

1"""Clustering utilities (no external deps). 

2 

3Currently includes Fuzzy C-Means (FCM). 

4""" 

5 

6from __future__ import annotations 

7 

8from typing import cast 

9 

10import numpy as np 

11 

12from .metrics import classification_entropy as _ce 

13from .metrics import partition_coefficient as _pc 

14from .metrics import xie_beni_index as _xb 

15 

16 

class FuzzyCMeans:
    """Fuzzy C-Means clustering.

    Alternates between updating soft memberships from the current centers
    and recomputing centers as membership-weighted means, until the largest
    center displacement falls below ``tol`` or ``max_iter`` is reached.

    Parameters:
        n_clusters: Number of clusters (>= 2).
        m: Fuzzifier (> 1). Default 2.0.
        max_iter: Maximum iterations.
        tol: Convergence tolerance on centers.
        random_state: Optional seed for reproducibility.

    Attributes:
        cluster_centers_: Centers of shape (k, d); ``None`` until ``fit``.
        membership_: Training memberships of shape (n, k); ``None`` until
            ``fit``.
    """

    # Floor for squared distances and per-cluster weight sums, so a sample
    # sitting exactly on a center (or an empty cluster) never divides by zero.
    _EPS = 1e-12

    def __init__(
        self,
        n_clusters: int,
        m: float = 2.0,
        max_iter: int = 300,
        tol: float = 1e-4,
        random_state: int | None = None,
    ) -> None:
        """Initialize FuzzyCMeans with hyperparameters.

        Raises:
            ValueError: If ``n_clusters < 2`` or ``m <= 1``.
        """
        if n_clusters < 2:
            raise ValueError("n_clusters must be >= 2")
        if m <= 1:
            raise ValueError("m (fuzzifier) must be > 1")
        self.n_clusters = int(n_clusters)
        self.m = float(m)
        self.max_iter = int(max_iter)
        self.tol = float(tol)
        self.random_state = random_state
        self.cluster_centers_: np.ndarray | None = None
        self.membership_: np.ndarray | None = None

    # ---------------------
    # Helpers
    # ---------------------
    def _rng(self) -> np.random.RandomState:
        """Return a new RandomState seeded with ``random_state``."""
        return np.random.RandomState(self.random_state)

    def _check_X(self, X: np.ndarray) -> np.ndarray:
        """Coerce ``X`` to a 2D float array; 1D input becomes one column.

        Raises:
            ValueError: If ``X`` is neither 1D nor 2D.
        """
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        if X.ndim != 2:
            raise ValueError("X must be 1D or 2D array-like")
        return X

    def _init_membership(self, n_samples: int) -> np.ndarray:
        """Draw a random (n_samples, n_clusters) matrix whose rows sum to 1."""
        rng = self._rng()
        U = rng.rand(n_samples, self.n_clusters)
        U /= np.sum(U, axis=1, keepdims=True)
        return U

    @staticmethod
    def _pairwise_sq_dists(X: np.ndarray, C: np.ndarray) -> np.ndarray:
        """Squared Euclidean distances: (n, d) vs (k, d) -> (n, k)."""
        return cast(np.ndarray, ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2))

    def _memberships_from_sq_dists(self, d2: np.ndarray) -> np.ndarray:
        """Turn squared distances (n, k) into memberships whose rows sum to 1.

        Each row is divided by its own minimum before exponentiation. The
        row-wise normalisation makes this mathematically a no-op, but it
        keeps every term in (0, 1] with at least one term equal to 1, so a
        small fuzzifier or widely separated data cannot overflow to inf or
        underflow a whole row to 0 (either of which would yield NaN).
        """
        d2 = np.maximum(d2, self._EPS)
        ratio = d2 / d2.min(axis=1, keepdims=True)
        inv = ratio ** (-1.0 / (self.m - 1.0))
        return cast(np.ndarray, inv / np.sum(inv, axis=1, keepdims=True))

    def _update_centers(self, X: np.ndarray, U: np.ndarray) -> np.ndarray:
        """Return centers (k, d) as means of ``X`` weighted by ``U ** m``."""
        Um = U**self.m
        num = Um.T @ X  # (k, d)
        den = np.maximum(Um.sum(axis=0)[:, None], self._EPS)
        return cast(np.ndarray, num / den)

    # ---------------------
    # Public API
    # ---------------------
    def fit(self, X: np.ndarray) -> FuzzyCMeans:
        """Fit the FCM model.

        Sets cluster_centers_ (k,d) and membership_ (n,k).

        Raises:
            ValueError: If ``X`` is not 1D/2D or has fewer samples than
                ``n_clusters``.
        """
        X = self._check_X(X)
        n_samples = X.shape[0]
        if n_samples < self.n_clusters:
            raise ValueError("n_samples must be >= n_clusters")

        U = self._init_membership(n_samples)
        C = self._update_centers(X, U)
        for _ in range(self.max_iter):
            U = self._memberships_from_sq_dists(self._pairwise_sq_dists(X, C))
            C_new = self._update_centers(X, U)
            # Converged once no center moved further than tol.
            shift = np.max(np.linalg.norm(C_new - C, axis=1))
            C = C_new
            if shift < self.tol:
                break

        self.membership_ = U
        self.cluster_centers_ = C
        return self

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Fit and return hard labels via argmax of membership."""
        self.fit(X)
        return self.predict(X)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return hard labels via argmax of predict_proba."""
        U = self.predict_proba(X)
        return cast(np.ndarray, np.argmax(U, axis=1))

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return membership degrees for samples to clusters (rows sum to 1).

        Raises:
            RuntimeError: If the model has not been fitted.
            ValueError: If ``X`` is not 1D/2D or its feature count differs
                from that of the fitted centers.
        """
        if self.cluster_centers_ is None:
            raise RuntimeError("Call fit() before predict_proba().")
        X = self._check_X(X)
        C = self.cluster_centers_
        # Without this check a mismatched X can silently broadcast against
        # the centers and produce meaningless memberships.
        if X.shape[1] != C.shape[1]:
            raise ValueError(
                f"X has {X.shape[1]} features, but FuzzyCMeans was fitted with {C.shape[1]}"
            )
        return self._memberships_from_sq_dists(self._pairwise_sq_dists(X, C))

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Alias for predict_proba."""
        return self.predict_proba(X)

    # Metrics
    def partition_coefficient(self) -> float:
        """Bezdek's Partition Coefficient (PC) in [1/k, 1]. Higher is crisper.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.membership_ is None:
            raise RuntimeError("Fit the model before calling partition_coefficient().")
        return _pc(self.membership_)

    def classification_entropy(self) -> float:
        """Classification Entropy (CE). Lower is better (crisper).

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.membership_ is None:
            raise RuntimeError("Fit the model before calling classification_entropy().")
        return _ce(self.membership_)

    def xie_beni_index(self, X: np.ndarray) -> float:
        """Xie-Beni index (XB). Lower is better.

        XB = sum_i sum_k u_ik^m ||x_i - v_k||^2 / (n * min_{p!=q} ||v_p - v_q||^2)

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.membership_ is None or self.cluster_centers_ is None:
            raise RuntimeError("Fit the model before calling xie_beni_index().")
        X = self._check_X(X)
        return _xb(X, self.membership_, self.cluster_centers_, m=self.m)