Coverage for anfis_toolbox / optim / pso.py: 100%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-05 18:47 -0300

1from __future__ import annotations 

2 

3from collections.abc import Generator 

4from contextlib import contextmanager 

5from dataclasses import dataclass, field 

6from typing import Any, TypedDict 

7 

8import numpy as np 

9 

10from ..losses import LossFunction 

11from .base import BaseTrainer, ModelLike 

12 

13 

class _FlattenMeta(TypedDict):
    """Metadata required to rebuild a structured parameter dict from a flat vector.

    Produced by ``_flatten_params`` and consumed by ``_unflatten_params``.
    """

    # Original shape of the consequent array (before ravel()).
    consequent_shape: tuple[int, ...]
    # Number of consequent entries occupying the front of the flat vector.
    n_consequent: int
    # One (input_name, mf_index, param_key) triple per membership scalar, in the
    # exact order the values were appended to the flat vector.
    membership_info: list[tuple[str, int, str]]

18 

19 

20def _flatten_params(params: Any) -> tuple[np.ndarray, _FlattenMeta]: 

21 """Flatten model parameters into a 1D vector and return meta for reconstruction. 

22 

23 The expected structure matches model.get_parameters(): 

24 { 'consequent': np.ndarray, 'membership': { name: [ {param: val, ...}, ... ] } } 

25 """ 

26 cons = params["consequent"].ravel() 

27 memb_info: list[tuple[str, int, str]] = [] 

28 memb_vals: list[float] = [] 

29 for name in params["membership"].keys(): 

30 for i, mf_params in enumerate(params["membership"][name]): 

31 for key in mf_params.keys(): 

32 memb_info.append((name, i, key)) 

33 memb_vals.append(float(mf_params[key])) 

34 memb = np.asarray(memb_vals, dtype=float) 

35 if memb.size: 

36 theta = np.concatenate([cons, memb]) 

37 else: 

38 theta = cons.copy() 

39 meta: _FlattenMeta = { 

40 "consequent_shape": params["consequent"].shape, 

41 "n_consequent": cons.size, 

42 "membership_info": memb_info, 

43 } 

44 return theta, meta 

45 

46 

47def _unflatten_params(theta: np.ndarray, meta: _FlattenMeta, template: Any) -> dict[str, Any]: 

48 """Reconstruct parameter dictionary from theta using meta and template structure.""" 

49 n_cons = meta["n_consequent"] 

50 cons = theta[:n_cons].reshape(meta["consequent_shape"]) 

51 out: dict[str, Any] = {"consequent": cons.copy(), "membership": {}} 

52 offset = n_cons 

53 # Copy structure from template membership dict 

54 for name in template["membership"].keys(): 

55 out["membership"][name] = [] 

56 for _ in range(len(template["membership"][name])): 

57 out["membership"][name].append({}) 

58 # Assign values in the same order used in flatten 

59 for name, i, key in meta["membership_info"]: 

60 out["membership"][name][i][key] = float(theta[offset]) 

61 offset += 1 

62 return out 

63 

64 

@dataclass
class PSOTrainer(BaseTrainer):
    """Particle Swarm Optimization (PSO) trainer for ANFIS.

    Parameters:
        swarm_size: Number of particles.
        inertia: Inertia weight (w).
        cognitive: Cognitive coefficient (c1).
        social: Social coefficient (c2).
        epochs: Number of iterations of the swarm update.
        init_sigma: Std-dev for initializing particle positions around current params.
        clamp_velocity: Optional (min, max) to clip velocities element-wise.
        clamp_position: Optional (min, max) to clip positions element-wise.
        random_state: Seed for RNG to ensure determinism.
        verbose: Unused here; kept for API parity.

    Notes:
        Optimizes the loss specified by ``loss`` (defaulting to mean squared error) by searching
        directly in parameter space without gradients. With ``ANFISClassifier`` you can set
        ``loss="cross_entropy"`` to optimize categorical cross-entropy on logits.
    """

    swarm_size: int = 20
    inertia: float = 0.7
    cognitive: float = 1.5
    social: float = 1.5
    epochs: int = 100
    init_sigma: float = 0.1
    clamp_velocity: None | tuple[float, float] = None
    clamp_position: None | tuple[float, float] = None
    random_state: None | int = None
    verbose: bool = False
    # Loss specification; resolved to a concrete LossFunction by _get_loss_fn()
    # (presumably provided by BaseTrainer -- not visible in this module).
    loss: LossFunction | str | None = None
    # Cached resolved loss function; excluded from __init__ and repr.
    _loss_fn: LossFunction = field(init=False, repr=False)

    def init_state(self, model: ModelLike, X: np.ndarray, y: np.ndarray) -> dict[str, Any]:
        """Initialize PSO swarm state and return as a dict.

        Positions are sampled from a Gaussian centered on the model's current
        flattened parameters (std-dev ``init_sigma``); velocities start at zero.
        Personal bests are seeded with the initial positions and evaluated on
        the full ``(X, y)`` given here; the global best is the lowest of those.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        rng = np.random.default_rng(self.random_state)
        base_params = model.get_parameters()
        theta0, meta = _flatten_params(base_params)
        D = theta0.size
        # Swarm initialized as Gaussian perturbations of the current parameters.
        positions = theta0[None, :] + self.init_sigma * rng.normal(size=(self.swarm_size, D))
        velocities = np.zeros((self.swarm_size, D), dtype=float)
        # Initialize personal/global bests on provided data
        personal_best_pos = positions.copy()
        personal_best_val = np.empty(self.swarm_size, dtype=float)
        for i in range(self.swarm_size):
            params_i = _unflatten_params(positions[i], meta, base_params)
            # Evaluate each particle without permanently disturbing the model.
            with self._temporary_parameters(model, params_i):
                personal_best_val[i] = self._evaluate_loss(model, X, y)
        g_idx = int(np.argmin(personal_best_val))
        global_best_pos = personal_best_pos[g_idx].copy()
        global_best_val = float(personal_best_val[g_idx])
        return {
            "meta": meta,  # layout info for flatten/unflatten round-trips
            "template": base_params,  # structural template for _unflatten_params
            "positions": positions,
            "velocities": velocities,
            "pbest_pos": personal_best_pos,
            "pbest_val": personal_best_val,
            "gbest_pos": global_best_pos,
            "gbest_val": global_best_val,
            "rng": rng,  # carried in state so random draws stay deterministic
        }

    def train_step(
        self, model: ModelLike, Xb: np.ndarray, yb: np.ndarray, state: dict[str, Any]
    ) -> tuple[float, dict[str, Any]]:
        """Perform one PSO iteration over the swarm on a batch and return (best_loss, state)."""
        positions = state["positions"]
        velocities = state["velocities"]
        personal_best_pos = state["pbest_pos"]
        personal_best_val = state["pbest_val"]
        global_best_pos = state["gbest_pos"]
        global_best_val = state["gbest_val"]
        meta = state["meta"]
        template = state["template"]
        rng = state["rng"]

        D = positions.shape[1]
        # Standard PSO velocity update:
        #   v <- w*v + c1*r1*(pbest - x) + c2*r2*(gbest - x)
        # with fresh uniform r1, r2 per particle per dimension.
        r1 = rng.random(size=(self.swarm_size, D))
        r2 = rng.random(size=(self.swarm_size, D))
        cognitive_term = self.cognitive * r1 * (personal_best_pos - positions)
        social_term = self.social * r2 * (global_best_pos[None, :] - positions)
        velocities = self.inertia * velocities + cognitive_term + social_term
        if self.clamp_velocity is not None:
            vmin, vmax = self.clamp_velocity
            velocities = np.clip(velocities, vmin, vmax)
        positions = positions + velocities
        if self.clamp_position is not None:
            pmin, pmax = self.clamp_position
            positions = np.clip(positions, pmin, pmax)

        # Evaluate swarm and update bests
        for i in range(self.swarm_size):
            params_i = _unflatten_params(positions[i], meta, template)
            with self._temporary_parameters(model, params_i):
                val = self._evaluate_loss(model, Xb, yb)
            if val < personal_best_val[i]:
                personal_best_val[i] = val
                personal_best_pos[i] = positions[i].copy()
            if val < global_best_val:
                # Asynchronous update: particles later in this loop already see
                # the improved global best.
                global_best_val = float(val)
                global_best_pos = positions[i].copy()

        # Update state and set model to global best
        state.update(
            {
                "positions": positions,
                "velocities": velocities,
                "pbest_pos": personal_best_pos,
                "pbest_val": personal_best_val,
                "gbest_pos": global_best_pos,
                "gbest_val": global_best_val,
            }
        )
        best_params = _unflatten_params(global_best_pos, meta, template)
        # Leave the model holding the best parameters found so far.
        model.set_parameters(best_params)
        return float(global_best_val), state

    @contextmanager
    def _temporary_parameters(self, model: Any, params: dict[str, Any]) -> Generator[None, None, None]:
        """Temporarily install ``params`` on ``model``, restoring the originals on exit."""
        original = model.get_parameters()
        model.set_parameters(params)
        try:
            yield
        finally:
            # Restore even if evaluation raised.
            model.set_parameters(original)

    def _evaluate_loss(self, model: Any, X: np.ndarray, y: np.ndarray) -> float:
        """Return the configured loss of ``model.forward(X)`` against ``y``."""
        loss_fn = self._get_loss_fn()
        preds = model.forward(X)
        return float(loss_fn.loss(y, preds))

    def compute_loss(self, model: Any, X: np.ndarray, y: np.ndarray) -> float:
        """Evaluate the model's current parameters on ``(X, y)`` without mutation."""
        return self._evaluate_loss(model, X, y)