Coverage for anfis_toolbox / clustering.py: 100%
92 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-05 18:47 -0300
1"""Clustering utilities (no external deps).
3Currently includes Fuzzy C-Means (FCM).
4"""
6from __future__ import annotations
8from typing import cast
10import numpy as np
12from .metrics import classification_entropy as _ce
13from .metrics import partition_coefficient as _pc
14from .metrics import xie_beni_index as _xb
class FuzzyCMeans:
    """Fuzzy C-Means clustering.

    Parameters:
        n_clusters: Number of clusters (>= 2).
        m: Fuzzifier (> 1). Default 2.0.
        max_iter: Maximum iterations.
        tol: Convergence tolerance on centers.
        random_state: Optional seed for reproducibility.

    Attributes:
        cluster_centers_: Array of shape (k, d) set by ``fit``; ``None`` before.
        membership_: Array of shape (n, k) set by ``fit``; ``None`` before.
    """

    def __init__(
        self,
        n_clusters: int,
        m: float = 2.0,
        max_iter: int = 300,
        tol: float = 1e-4,
        random_state: int | None = None,
    ) -> None:
        """Initialize FuzzyCMeans with hyperparameters.

        Raises:
            ValueError: If ``n_clusters < 2`` or ``m <= 1``.
        """
        if n_clusters < 2:
            raise ValueError("n_clusters must be >= 2")
        if m <= 1:
            raise ValueError("m (fuzzifier) must be > 1")
        self.n_clusters = int(n_clusters)
        self.m = float(m)
        self.max_iter = int(max_iter)
        self.tol = float(tol)
        self.random_state = random_state
        self.cluster_centers_: np.ndarray | None = None
        self.membership_: np.ndarray | None = None

    # ---------------------
    # Helpers
    # ---------------------
    def _rng(self) -> np.random.RandomState:
        """Return a fresh RandomState seeded with ``random_state``."""
        return np.random.RandomState(self.random_state)

    def _check_X(self, X: np.ndarray) -> np.ndarray:
        """Convert X to a 2D float array; a 1D input becomes a single column.

        Raises:
            ValueError: If X is neither 1D nor 2D.
        """
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        if X.ndim != 2:
            raise ValueError("X must be 1D or 2D array-like")
        return X

    def _init_membership(self, n_samples: int) -> np.ndarray:
        """Draw a random (n_samples, n_clusters) matrix whose rows sum to 1."""
        rng = self._rng()
        U = rng.rand(n_samples, self.n_clusters)
        U /= np.sum(U, axis=1, keepdims=True)
        return U

    @staticmethod
    def _pairwise_sq_dists(X: np.ndarray, C: np.ndarray) -> np.ndarray:
        """Squared Euclidean distances between rows of X (n,d) and C (k,d) -> (n,k)."""
        return cast(np.ndarray, ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2))

    @staticmethod
    def _memberships_from_sq_dists(d2: np.ndarray, m: float) -> np.ndarray:
        """Turn squared distances (n,k) into membership degrees (rows sum to 1).

        Computes ``u_ik = d2_ik^(-1/(m-1)) / sum_j d2_ij^(-1/(m-1))``.
        """
        # Floor the distances so a sample sitting exactly on a center does not
        # divide by zero.
        d2 = np.maximum(d2, 1e-12)
        # Rescale each row by its smallest distance before exponentiating.  The
        # common factor cancels in the normalisation, so the result is
        # mathematically unchanged, but every ratio is >= 1 and its negative
        # power lies in (0, 1].  Without this, a fuzzifier close to 1 makes the
        # exponent so large that the powers overflow to inf or underflow to 0
        # and the normalisation yields NaN.
        ratio = d2 / np.min(d2, axis=1, keepdims=True)
        inv = ratio ** (-1.0 / (m - 1.0))
        return cast(np.ndarray, inv / np.sum(inv, axis=1, keepdims=True))

    @staticmethod
    def _weighted_centers(X: np.ndarray, Um: np.ndarray) -> np.ndarray:
        """Return the (k,d) centers as Um-weighted means of the rows of X."""
        num = Um.T @ X  # (k,d)
        # Guard against a cluster whose total weight is numerically zero.
        den = np.maximum(Um.sum(axis=0)[:, None], 1e-12)
        return cast(np.ndarray, num / den)

    # ---------------------
    # Public API
    # ---------------------
    def fit(self, X: np.ndarray) -> FuzzyCMeans:
        """Fit the FCM model.

        Sets cluster_centers_ (k,d) and membership_ (n,k). The stored
        membership_ is the partition from which cluster_centers_ was computed.

        Raises:
            ValueError: If X is not 1D/2D or has fewer samples than clusters.
        """
        X = self._check_X(X)
        n, _ = X.shape
        if n < self.n_clusters:
            raise ValueError("n_samples must be >= n_clusters")
        m = self.m

        U = self._init_membership(n)
        C = self._weighted_centers(X, U**m)
        for _ in range(self.max_iter):
            U_new = self._memberships_from_sq_dists(self._pairwise_sq_dists(X, C), m)
            C_new = self._weighted_centers(X, U_new**m)
            # Converged when no center moved by `tol` or more.
            converged = np.max(np.linalg.norm(C_new - C, axis=1)) < self.tol
            U, C = U_new, C_new
            if converged:
                break
        self.membership_ = U
        self.cluster_centers_ = C
        return self

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Fit on X, then return hard labels for X via ``predict``."""
        self.fit(X)
        return self.predict(X)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return hard labels via argmax of predict_proba."""
        U = self.predict_proba(X)
        return cast(np.ndarray, np.argmax(U, axis=1))

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return membership degrees for samples to clusters (rows sum to 1).

        Raises:
            RuntimeError: If the model has not been fitted.
            ValueError: If X is not 1D/2D, or its number of features differs
                from that of the fitted cluster centers.
        """
        if self.cluster_centers_ is None:
            raise RuntimeError("Call fit() before predict_proba().")
        X = self._check_X(X)
        C = self.cluster_centers_
        # Reject mismatched widths explicitly: when one side has a single
        # feature, NumPy broadcasting would otherwise silently produce
        # meaningless distances instead of failing.
        if X.shape[1] != C.shape[1]:
            raise ValueError(
                f"X has {X.shape[1]} feature(s) but the model was fitted with {C.shape[1]}"
            )
        return self._memberships_from_sq_dists(self._pairwise_sq_dists(X, C), self.m)

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Alias for predict_proba."""
        return self.predict_proba(X)

    # Metrics
    def partition_coefficient(self) -> float:
        """Bezdek's Partition Coefficient (PC) in [1/k, 1]. Higher is crisper.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.membership_ is None:
            raise RuntimeError("Fit the model before calling partition_coefficient().")
        return _pc(self.membership_)

    def classification_entropy(self) -> float:
        """Classification Entropy (CE). Lower is better (crisper).

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.membership_ is None:
            raise RuntimeError("Fit the model before calling classification_entropy().")
        return _ce(self.membership_)

    def xie_beni_index(self, X: np.ndarray) -> float:
        """Xie-Beni index (XB). Lower is better.

        XB = sum_i sum_k u_ik^m ||x_i - v_k||^2 / (n * min_{p!=q} ||v_p - v_q||^2)

        X is passed together with the stored membership_ and cluster_centers_,
        so it is presumably expected to be the data the model was fitted on.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.membership_ is None or self.cluster_centers_ is None:
            raise RuntimeError("Fit the model before calling xie_beni_index().")
        X = self._check_X(X)
        return _xb(X, self.membership_, self.cluster_centers_, m=self.m)