Coverage for anfis_toolbox/optim/pso.py: 100% (123 statements)
coverage.py v7.13.3, created at 2026-02-05 18:47 -0300
from __future__ import annotations

from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, TypedDict

import numpy as np

from ..losses import LossFunction
from .base import BaseTrainer, ModelLike


class _FlattenMeta(TypedDict):
    consequent_shape: tuple[int, ...]
    n_consequent: int
    membership_info: list[tuple[str, int, str]]


def _flatten_params(params: Any) -> tuple[np.ndarray, _FlattenMeta]:
    """Flatten model parameters into a 1D vector and return meta for reconstruction.

    The expected structure matches model.get_parameters():
    { 'consequent': np.ndarray, 'membership': { name: [ {param: val, ...}, ... ] } }
    """
    cons = params["consequent"].ravel()
    memb_info: list[tuple[str, int, str]] = []
    memb_vals: list[float] = []
    for name in params["membership"].keys():
        for i, mf_params in enumerate(params["membership"][name]):
            for key in mf_params.keys():
                memb_info.append((name, i, key))
                memb_vals.append(float(mf_params[key]))
    memb = np.asarray(memb_vals, dtype=float)
    if memb.size:
        theta = np.concatenate([cons, memb])
    else:
        theta = cons.copy()
    meta: _FlattenMeta = {
        "consequent_shape": params["consequent"].shape,
        "n_consequent": cons.size,
        "membership_info": memb_info,
    }
    return theta, meta


def _unflatten_params(theta: np.ndarray, meta: _FlattenMeta, template: Any) -> dict[str, Any]:
    """Reconstruct parameter dictionary from theta using meta and template structure."""
    n_cons = meta["n_consequent"]
    cons = theta[:n_cons].reshape(meta["consequent_shape"])
    out: dict[str, Any] = {"consequent": cons.copy(), "membership": {}}
    offset = n_cons
    # Copy structure from template membership dict
    for name in template["membership"].keys():
        out["membership"][name] = []
        for _ in range(len(template["membership"][name])):
            out["membership"][name].append({})
    # Assign values in the same order used in flatten
    for name, i, key in meta["membership_info"]:
        out["membership"][name][i][key] = float(theta[offset])
        offset += 1
    return out
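

# Illustrative round-trip sketch (comment only, not executed): assuming a model whose
# ``get_parameters()`` returns the structure documented above, the two helpers compose
# as follows:
#
#     params = {
#         "consequent": np.zeros((2, 3)),
#         "membership": {"x0": [{"mean": 0.0, "sigma": 1.0}]},
#     }
#     theta, meta = _flatten_params(params)        # theta.size == 6 + 2
#     rebuilt = _unflatten_params(theta, meta, params)
#
# ``rebuilt`` matches ``params`` up to float conversion of the membership values.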


@dataclass
class PSOTrainer(BaseTrainer):
    """Particle Swarm Optimization (PSO) trainer for ANFIS.

    Parameters:
        swarm_size: Number of particles.
        inertia: Inertia weight (w).
        cognitive: Cognitive coefficient (c1).
        social: Social coefficient (c2).
        epochs: Number of iterations of the swarm update.
        init_sigma: Std-dev for initializing particle positions around current params.
        clamp_velocity: Optional (min, max) to clip velocities element-wise.
        clamp_position: Optional (min, max) to clip positions element-wise.
        random_state: Seed for RNG to ensure determinism.
        verbose: Unused here; kept for API parity.

    Notes:
        Optimizes the loss specified by ``loss`` (defaulting to mean squared error) by searching
        directly in parameter space without gradients. With ``ANFISClassifier`` you can set
        ``loss="cross_entropy"`` to optimize categorical cross-entropy on logits.
    """

    swarm_size: int = 20
    inertia: float = 0.7
    cognitive: float = 1.5
    social: float = 1.5
    epochs: int = 100
    init_sigma: float = 0.1
    clamp_velocity: None | tuple[float, float] = None
    clamp_position: None | tuple[float, float] = None
    random_state: None | int = None
    verbose: bool = False
    loss: LossFunction | str | None = None
    _loss_fn: LossFunction = field(init=False, repr=False)

    def init_state(self, model: ModelLike, X: np.ndarray, y: np.ndarray) -> dict[str, Any]:
        """Initialize PSO swarm state and return as a dict."""
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        rng = np.random.default_rng(self.random_state)
        base_params = model.get_parameters()
        theta0, meta = _flatten_params(base_params)
        D = theta0.size
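        # Particles start as Gaussian perturbations (std = init_sigma) around the
        # current flattened model parameters; velocities start at zero.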
        positions = theta0[None, :] + self.init_sigma * rng.normal(size=(self.swarm_size, D))
        velocities = np.zeros((self.swarm_size, D), dtype=float)
        # Initialize personal/global bests on provided data
        personal_best_pos = positions.copy()
        personal_best_val = np.empty(self.swarm_size, dtype=float)
        for i in range(self.swarm_size):
            params_i = _unflatten_params(positions[i], meta, base_params)
            with self._temporary_parameters(model, params_i):
                personal_best_val[i] = self._evaluate_loss(model, X, y)
        g_idx = int(np.argmin(personal_best_val))
        global_best_pos = personal_best_pos[g_idx].copy()
        global_best_val = float(personal_best_val[g_idx])
        return {
            "meta": meta,
            "template": base_params,
            "positions": positions,
            "velocities": velocities,
            "pbest_pos": personal_best_pos,
            "pbest_val": personal_best_val,
            "gbest_pos": global_best_pos,
            "gbest_val": global_best_val,
            "rng": rng,
        }

    def train_step(
        self, model: ModelLike, Xb: np.ndarray, yb: np.ndarray, state: dict[str, Any]
    ) -> tuple[float, dict[str, Any]]:
        """Perform one PSO iteration over the swarm on a batch and return (best_loss, state)."""
        positions = state["positions"]
        velocities = state["velocities"]
        personal_best_pos = state["pbest_pos"]
        personal_best_val = state["pbest_val"]
        global_best_pos = state["gbest_pos"]
        global_best_val = state["gbest_val"]
        meta = state["meta"]
        template = state["template"]
        rng = state["rng"]

        D = positions.shape[1]
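        # Canonical PSO update, applied element-wise per particle:
        #   v <- inertia * v + cognitive * r1 * (pbest - x) + social * r2 * (gbest - x)
        #   x <- x + v
        # with r1, r2 drawn uniformly from [0, 1).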
        r1 = rng.random(size=(self.swarm_size, D))
        r2 = rng.random(size=(self.swarm_size, D))
        cognitive_term = self.cognitive * r1 * (personal_best_pos - positions)
        social_term = self.social * r2 * (global_best_pos[None, :] - positions)
        velocities = self.inertia * velocities + cognitive_term + social_term
        if self.clamp_velocity is not None:
            vmin, vmax = self.clamp_velocity
            velocities = np.clip(velocities, vmin, vmax)
        positions = positions + velocities
        if self.clamp_position is not None:
            pmin, pmax = self.clamp_position
            positions = np.clip(positions, pmin, pmax)

        # Evaluate swarm and update bests
        for i in range(self.swarm_size):
            params_i = _unflatten_params(positions[i], meta, template)
            with self._temporary_parameters(model, params_i):
                val = self._evaluate_loss(model, Xb, yb)
            if val < personal_best_val[i]:
                personal_best_val[i] = val
                personal_best_pos[i] = positions[i].copy()
            if val < global_best_val:
                global_best_val = float(val)
                global_best_pos = positions[i].copy()

        # Update state and set model to global best
        state.update(
            {
                "positions": positions,
                "velocities": velocities,
                "pbest_pos": personal_best_pos,
                "pbest_val": personal_best_val,
                "gbest_pos": global_best_pos,
                "gbest_val": global_best_val,
            }
        )
        best_params = _unflatten_params(global_best_pos, meta, template)
        model.set_parameters(best_params)
        return float(global_best_val), state

    @contextmanager
    def _temporary_parameters(self, model: Any, params: dict[str, Any]) -> Generator[None, None, None]:
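        """Temporarily apply ``params`` to ``model``, restoring the original parameters on exit."""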
        original = model.get_parameters()
        model.set_parameters(params)
        try:
            yield
        finally:
            model.set_parameters(original)

    def _evaluate_loss(self, model: Any, X: np.ndarray, y: np.ndarray) -> float:
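        """Compute the configured loss for ``model`` predictions on ``(X, y)``."""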
        loss_fn = self._get_loss_fn()
        preds = model.forward(X)
        return float(loss_fn.loss(y, preds))

    def compute_loss(self, model: Any, X: np.ndarray, y: np.ndarray) -> float:
        """Evaluate the model's current parameters on ``(X, y)`` without mutating them."""
        return self._evaluate_loss(model, X, y)
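

# Illustrative usage sketch (assumptions: an ANFIS model exposing
# ``get_parameters``/``set_parameters``/``forward`` is available, and the explicit
# loop below merely mirrors what the ``BaseTrainer`` driver would do):
#
#     trainer = PSOTrainer(swarm_size=30, epochs=200, random_state=0)
#     state = trainer.init_state(model, X_train, y_train)
#     for _ in range(trainer.epochs):
#         best_loss, state = trainer.train_step(model, X_train, y_train, state)
#     # After each step the model holds the swarm's global-best parameters.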