Coverage for anfis_toolbox/model.py: 100%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-05 18:47 -0300

1"""ANFIS Model Implementation. 

2 

3This module implements the complete Adaptive Neuro-Fuzzy Inference System (ANFIS) 

4model that combines all the individual layers into a unified architecture. 

5""" 

6 

7import logging 

8from collections.abc import Mapping, Sequence 

9from typing import Any, Protocol, TypeAlias, TypedDict, cast, runtime_checkable 

10 

11import numpy as np 

12 

13from .layers import ClassificationConsequentLayer, ConsequentLayer, MembershipLayer, NormalizationLayer, RuleLayer 

14from .losses import LossFunction, resolve_loss 

15from .membership import MembershipFunction 

16from .metrics import softmax 

17 

18 

class TrainingHistory(TypedDict, total=False):
    """History of loss values collected during training.

    Returned by the ``fit`` methods of the ANFIS models; keys are optional
    (``total=False``) because validation losses are only recorded when
    validation data is supplied.
    """

    # One training-loss value per epoch.
    train: list[float]
    # One entry per epoch; ``None`` for epochs where validation was skipped.
    val: list[float | None]

24 

25 

# ``runtime_checkable`` allows the ``fit`` methods below to validate a
# user-supplied trainer with ``isinstance(trainer, TrainerProtocol)``.
@runtime_checkable
class TrainerProtocol(Protocol):
    """Minimal interface required for external trainers."""

    def fit(
        self,
        model: Any,
        X: np.ndarray,
        y: np.ndarray,
        *,
        validation_data: tuple[np.ndarray, np.ndarray] | None = None,
        validation_frequency: int = 1,
    ) -> TrainingHistory:
        """Train ``model`` using ``X`` and ``y`` and return a history mapping."""

40 

41 

# Public alias for anything structurally satisfying the trainer protocol.
TrainerLike: TypeAlias = TrainerProtocol

43 

44 

# Maps input-variable name -> list of per-membership-function parameter dicts
# (one ``{param_name: value}`` dict per membership function, in input order).
MembershipSnapshot: TypeAlias = dict[str, list[dict[str, float]]]

46 

47 

class ParameterState(TypedDict, total=False):
    """Structure holding consequent and membership parameters.

    Emitted by ``get_parameters`` and accepted by ``set_parameters``.
    """

    # Snapshot of every membership function's parameters, grouped by input.
    membership: MembershipSnapshot
    # Copy of the consequent layer's parameter array.
    consequent: np.ndarray

53 

54 

class GradientState(TypedDict):
    """Structure holding consequent and membership gradients.

    Emitted by ``get_gradients``; mirrors the layout of ``ParameterState``.
    """

    # Gradients of every membership function's parameters, grouped by input.
    membership: MembershipSnapshot
    # Copy of the consequent layer's gradient array.
    consequent: np.ndarray

60 

61 

# Either consequent-layer flavor (regression or classification) may back a model.
ConsequentLike: TypeAlias = ConsequentLayer | ClassificationConsequentLayer

63 

64 

class _TSKANFISSharedMixin:
    """Common ANFIS utilities shared between regression and classification models."""

    # Attributes supplied by the concrete model subclasses.
    input_mfs: dict[str, list[MembershipFunction]]
    input_names: list[str]
    membership_layer: MembershipLayer
    rule_layer: RuleLayer
    consequent_layer: ConsequentLike

    @property
    def membership_functions(self) -> dict[str, list[MembershipFunction]]:
        """Return the membership functions grouped by input."""
        return self.input_mfs

    @property
    def rules(self) -> list[tuple[int, ...]]:
        """Return the fuzzy rule definitions used by the model."""
        return list(self.rule_layer.rules)

    def reset_gradients(self) -> None:
        """Reset all accumulated gradients to zero."""
        self.membership_layer.reset()
        self.consequent_layer.reset()

    def get_parameters(self) -> ParameterState:
        """Return a snapshot of all trainable parameters.

        Returns:
            ParameterState: Dictionary with ``"membership"`` and ``"consequent"`` keys.
        """
        snapshot: MembershipSnapshot = {}
        for name in self.input_names:
            snapshot[name] = [dict(mf.parameters) for mf in self.input_mfs[name]]
        return {
            "membership": snapshot,
            "consequent": self.consequent_layer.parameters.copy(),
        }

    def set_parameters(self, parameters: Mapping[str, object]) -> None:
        """Load parameters into the model.

        Args:
            parameters: Mapping matching the structure emitted by :meth:`get_parameters`.
        """
        consequent = parameters.get("consequent")
        if consequent is not None:
            self.consequent_layer.parameters = np.array(consequent, copy=True)

        membership = parameters.get("membership")
        if not isinstance(membership, Mapping):
            return

        for name in self.input_names:
            per_mf_params = membership.get(name)
            if not isinstance(per_mf_params, Sequence):
                continue
            # Surplus entries on either side are silently ignored (strict=False).
            for mf, new_params in zip(self.input_mfs[name], per_mf_params, strict=False):
                mf.parameters = {key: float(val) for key, val in new_params.items()}

    def get_gradients(self) -> GradientState:
        """Return the latest computed gradients.

        Returns:
            GradientState: Dictionary with ``"membership"`` and ``"consequent"`` entries.
        """
        grads: MembershipSnapshot = {}
        for name in self.input_names:
            grads[name] = [dict(mf.gradients) for mf in self.input_mfs[name]]
        return {
            "membership": grads,
            "consequent": self.consequent_layer.gradients.copy(),
        }

    def update_parameters(self, effective_learning_rate: float) -> None:
        """Apply a single gradient descent update step.

        Args:
            effective_learning_rate: Step size used to update parameters.
        """
        self.update_consequent_parameters(effective_learning_rate)
        self.update_membership_parameters(effective_learning_rate)

    def update_consequent_parameters(self, effective_learning_rate: float) -> None:
        """Apply gradient descent to consequent parameters only.

        Args:
            effective_learning_rate: Step size used to update parameters.
        """
        layer = self.consequent_layer
        layer.parameters -= effective_learning_rate * layer.gradients

    def update_membership_parameters(self, effective_learning_rate: float) -> None:
        """Apply gradient descent to membership function parameters only.

        Args:
            effective_learning_rate: Step size used to update parameters.
        """
        for name in self.input_names:
            for mf in self.input_mfs[name]:
                for param_name, grad in mf.gradients.items():
                    mf.parameters[param_name] -= effective_learning_rate * grad

    def __str__(self) -> str:
        """Returns string representation of the ANFIS model."""
        return repr(self)

169 

170 

# Module-level logger for the ANFIS models (named after this module).
logger = logging.getLogger(__name__)

173 

174 

class TSKANFIS(_TSKANFISSharedMixin):
    """Adaptive Neuro-Fuzzy Inference System (legacy TSK ANFIS) model.

    Implements the classic 4-layer ANFIS architecture:

    1) MembershipLayer — fuzzification of inputs
    2) RuleLayer — rule strength computation (T-norm)
    3) NormalizationLayer — weight normalization
    4) ConsequentLayer — final output via a TSK model

    Supports forward/backward passes for training, parameter access/update,
    and a simple prediction API.

    Attributes:
        input_mfs (dict[str, list[MembershipFunction]]): Mapping from input name
            to its list of membership functions.
        membership_layer (MembershipLayer): Layer 1 — fuzzification.
        rule_layer (RuleLayer): Layer 2 — rule strength computation.
        normalization_layer (NormalizationLayer): Layer 3 — weight normalization.
        consequent_layer (ConsequentLayer): Layer 4 — final TSK output.
        input_names (list[str]): Ordered list of input variable names.
        n_inputs (int): Number of input variables (features).
        n_rules (int): Number of fuzzy rules used by the system.
    """

    def __init__(
        self,
        input_mfs: dict[str, list[MembershipFunction]],
        rules: Sequence[Sequence[int]] | None = None,
    ):
        """Initialize the ANFIS model.

        Args:
            input_mfs (dict[str, list[MembershipFunction]]): Mapping from input
                name to a list of membership functions. Example:
                ``{"x1": [GaussianMF(0,1), ...], "x2": [...]}``.
            rules: Optional explicit set of rules, each specifying one membership index per
                input. When ``None``, the Cartesian product of all membership functions is used.

        Examples:
            >>> from anfis_toolbox.membership import GaussianMF
            >>> input_mfs = {
            ...     "x1": [GaussianMF(0, 1), GaussianMF(1, 1)],
            ...     "x2": [GaussianMF(0, 1), GaussianMF(1, 1)],
            ... }
            >>> model = TSKANFIS(input_mfs)
        """
        self.input_mfs = input_mfs
        self.input_names = list(input_mfs.keys())
        self.n_inputs = len(input_mfs)

        # Number of membership functions per input, in input order.
        mf_per_input = [len(mfs) for mfs in input_mfs.values()]

        # Initialize all layers.
        self.membership_layer = MembershipLayer(input_mfs)
        self.rule_layer = RuleLayer(self.input_names, mf_per_input, rules=rules)
        self.n_rules = self.rule_layer.n_rules
        self.normalization_layer = NormalizationLayer()
        self.consequent_layer = ConsequentLayer(self.n_rules, self.n_inputs)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Run a forward pass through the model.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Output array of shape ``(batch_size, 1)``.
        """
        normalized_weights = self.forward_antecedents(x)
        output = self.forward_consequents(x, normalized_weights)
        return output

    def forward_antecedents(self, x: np.ndarray) -> np.ndarray:
        """Run a forward pass through the antecedent layers only.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Normalized rule weights of shape ``(batch_size, n_rules)``.
        """
        membership_outputs = self.membership_layer.forward(x)
        rule_strengths = self.rule_layer.forward(membership_outputs)
        normalized_weights = self.normalization_layer.forward(rule_strengths)
        return normalized_weights

    def forward_consequents(self, x: np.ndarray, normalized_weights: np.ndarray) -> np.ndarray:
        """Run a forward pass through the consequent layer only.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.
            normalized_weights (np.ndarray): Normalized rule weights of shape
                ``(batch_size, n_rules)``.

        Returns:
            np.ndarray: Output array of shape ``(batch_size, 1)``.
        """
        output = self.consequent_layer.forward(x, normalized_weights)
        return output

    def backward(self, dL_dy: np.ndarray) -> None:
        """Run a backward pass through all layers.

        Propagates gradients from the output back through all layers and stores
        parameter gradients for a later update step.

        Args:
            dL_dy (np.ndarray): Gradient of the loss w.r.t. the model output,
                shape ``(batch_size, 1)``.
        """
        # Backward pass through Layer 4: Consequent layer
        dL_dnorm_w, _ = self.consequent_layer.backward(dL_dy)

        # Backward pass through Layer 3: Normalization layer
        dL_dw = self.normalization_layer.backward(dL_dnorm_w)

        # Backward pass through Layer 2: Rule layer
        gradients = self.rule_layer.backward(dL_dw)

        # Backward pass through Layer 1: Membership layer
        self.membership_layer.backward(gradients)

    def _coerce_input(self, x: np.ndarray) -> np.ndarray:
        """Coerce ``x`` into a float 2D array of shape ``(batch_size, n_inputs)``.

        Args:
            x (np.ndarray | list[float]): Input data; 1D inputs are treated as a
                single sample.

        Returns:
            np.ndarray: Validated 2D float array.

        Raises:
            ValueError: If input dimensionality or feature count does not match
                the model configuration.
        """
        x_arr = np.asarray(x, dtype=float)
        if x_arr.ndim == 1:
            # Single sample; ensure feature count matches
            if x_arr.size != self.n_inputs:
                raise ValueError(f"Expected {self.n_inputs} features, got {x_arr.size} in 1D input")
            x_arr = x_arr.reshape(1, self.n_inputs)
        elif x_arr.ndim == 2:
            # Validate feature count
            if x_arr.shape[1] != self.n_inputs:
                raise ValueError(f"Expected input with {self.n_inputs} features, got {x_arr.shape[1]}")
        else:
            raise ValueError("Expected input with shape (batch_size, n_inputs)")
        return x_arr

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Predict using the current model parameters.

        Accepts Python lists, 1D or 2D arrays and coerces to the expected shape.

        Args:
            x (np.ndarray | list[float]): Input data. If 1D, must have
                exactly ``n_inputs`` elements; if 2D, must be
                ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Predictions of shape ``(batch_size, 1)``.

        Raises:
            ValueError: If input dimensionality or feature count does not match
                the model configuration.
        """
        return self.forward(self._coerce_input(x))

    def fit(
        self,
        x: np.ndarray,
        y: np.ndarray,
        epochs: int = 100,
        learning_rate: float = 0.01,
        verbose: bool = False,
        trainer: TrainerLike | None = None,
        *,
        validation_data: tuple[np.ndarray, np.ndarray] | None = None,
        validation_frequency: int = 1,
    ) -> TrainingHistory:
        """Train the ANFIS model.

        If a trainer is provided (see ``anfis_toolbox.optim``), delegate training
        to it while preserving a scikit-learn-style ``fit(X, y)`` entry point. If
        no trainer is provided, a default ``HybridTrainer`` is used with the given
        hyperparameters.

        Args:
            x (np.ndarray): Training inputs of shape ``(n_samples, n_inputs)``.
            y (np.ndarray): Training targets of shape ``(n_samples, 1)`` for
                regression.
            epochs (int, optional): Number of epochs. Defaults to ``100``.
            learning_rate (float, optional): Learning rate. Defaults to ``0.01``.
            verbose (bool, optional): Whether to log progress. Defaults to ``False``.
            trainer (TrainerLike | None, optional): External trainer implementing
                ``fit(model, X, y)``. Defaults to ``None``.
            validation_data (tuple[np.ndarray, np.ndarray] | None, optional): Optional
                validation inputs and targets evaluated according to ``validation_frequency``.
            validation_frequency (int, optional): Evaluate validation loss every N epochs.

        Returns:
            TrainingHistory: Dictionary with ``"train"`` losses and optional ``"val"`` losses.
        """
        if trainer is None:
            # Lazy import to avoid unnecessary dependency at module import time
            from .optim import HybridTrainer

            trainer_instance: TrainerLike = HybridTrainer(
                learning_rate=learning_rate,
                epochs=epochs,
                verbose=verbose,
            )
        else:
            trainer_instance = trainer
            if not isinstance(trainer_instance, TrainerProtocol):
                raise TypeError("trainer must implement fit(model, X, y)")

        # Delegate training to the provided or default trainer; only forward
        # validation kwargs when they deviate from the defaults so trainers
        # without those parameters keep working.
        fit_kwargs: dict[str, Any] = {}
        if validation_data is not None:
            fit_kwargs["validation_data"] = validation_data
        if validation_frequency != 1 or validation_data is not None:
            fit_kwargs["validation_frequency"] = validation_frequency

        history = trainer_instance.fit(self, x, y, **fit_kwargs)
        if not isinstance(history, dict):
            raise TypeError("Trainer.fit must return a TrainingHistory dictionary")
        return history

    def __repr__(self) -> str:
        """Returns detailed representation of the ANFIS model."""
        return f"TSKANFIS(n_inputs={self.n_inputs}, n_rules={self.n_rules})"

396 

397 

class TSKANFISClassifier(_TSKANFISSharedMixin):
    """Adaptive Neuro-Fuzzy classifier with a softmax head (TSK variant).

    Per-rule linear consequents are aggregated into per-class logits and the
    model is trained with cross-entropy loss by default.
    """

    def __init__(
        self,
        input_mfs: dict[str, list[MembershipFunction]],
        n_classes: int,
        random_state: int | None = None,
        rules: Sequence[Sequence[int]] | None = None,
    ):
        """Initialize the ANFIS model for classification.

        Args:
            input_mfs (dict[str, list[MembershipFunction]]): Mapping from input
                variable name to its list of membership functions.
            n_classes (int): Number of output classes (>= 2).
            random_state (int | None): Optional random seed for parameter init.
            rules (Sequence[Sequence[int]] | None): Optional explicit rule definitions
                where each inner sequence lists the membership-function index per input.
                When ``None``, all combinations are used.

        Raises:
            ValueError: If ``n_classes < 2``.

        Attributes:
            input_mfs (dict[str, list[MembershipFunction]]): Membership functions per input.
            input_names (list[str]): Input variable names.
            n_inputs (int): Number of input variables.
            n_classes (int): Number of classes.
            n_rules (int): Number of fuzzy rules (product of MFs per input).
            membership_layer (MembershipLayer): Computes membership degrees.
            rule_layer (RuleLayer): Evaluates rule activations.
            normalization_layer (NormalizationLayer): Normalizes rule strengths.
            consequent_layer (ClassificationConsequentLayer): Computes class logits.
        """
        if n_classes < 2:
            raise ValueError("n_classes must be >= 2")
        self.input_mfs = input_mfs
        self.input_names = list(input_mfs)
        self.n_inputs = len(input_mfs)
        self.n_classes = int(n_classes)
        mf_counts = [len(mf_list) for mf_list in input_mfs.values()]
        self.membership_layer = MembershipLayer(input_mfs)
        self.rule_layer = RuleLayer(self.input_names, mf_counts, rules=rules)
        self.n_rules = self.rule_layer.n_rules
        self.normalization_layer = NormalizationLayer()
        self.consequent_layer = ClassificationConsequentLayer(
            self.n_rules, self.n_inputs, self.n_classes, random_state=random_state
        )

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Run a forward pass through the classifier.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Logits of shape ``(batch_size, n_classes)``.
        """
        degrees = self.membership_layer.forward(x)
        strengths = self.rule_layer.forward(degrees)
        weights = self.normalization_layer.forward(strengths)
        # Consequent layer emits one logit per class: shape (batch, n_classes).
        return self.consequent_layer.forward(x, weights)

    def backward(self, dL_dlogits: np.ndarray) -> None:
        """Backpropagate gradients through all layers.

        Args:
            dL_dlogits (np.ndarray): Gradient of the loss w.r.t. logits,
                shape ``(batch_size, n_classes)``.
        """
        grad_norm_w, _ = self.consequent_layer.backward(dL_dlogits)
        grad_w = self.normalization_layer.backward(grad_norm_w)
        grad_memberships = self.rule_layer.backward(grad_w)
        self.membership_layer.backward(grad_memberships)

    def predict_proba(self, x: np.ndarray) -> np.ndarray:
        """Predict per-class probabilities for the given inputs.

        Args:
            x (np.ndarray | list[float]): Inputs. If 1D, must have exactly
                ``n_inputs`` elements; if 2D, must be ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Probabilities of shape ``(batch_size, n_classes)``.

        Raises:
            ValueError: If input dimensionality or feature count is invalid.
        """
        data = np.asarray(x, dtype=float)
        if data.ndim == 1:
            if data.size != self.n_inputs:
                raise ValueError(f"Expected {self.n_inputs} features, got {data.size} in 1D input")
            data = data.reshape(1, self.n_inputs)
        elif data.ndim == 2:
            if data.shape[1] != self.n_inputs:
                raise ValueError(f"Expected input with {self.n_inputs} features, got {data.shape[1]}")
        else:
            raise ValueError("Expected input with shape (batch_size, n_inputs)")
        return softmax(self.forward(data), axis=1)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Predict the most likely class label for each sample.

        Args:
            x (np.ndarray | list[float]): Inputs. If 1D, must have exactly
                ``n_inputs`` elements; if 2D, must be ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Predicted labels of shape ``(batch_size,)``.
        """
        return cast(np.ndarray, np.argmax(self.predict_proba(x), axis=1))

    def fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        epochs: int = 100,
        learning_rate: float = 0.01,
        verbose: bool = False,
        trainer: TrainerLike | None = None,
        loss: LossFunction | str | None = None,
        *,
        validation_data: tuple[np.ndarray, np.ndarray] | None = None,
        validation_frequency: int = 1,
    ) -> TrainingHistory:
        """Fits the ANFIS model to the provided training data using the specified optimization strategy.

        Parameters:
            X (np.ndarray): Input features for training.
            y (np.ndarray): Target values for training.
            epochs (int, optional): Number of training epochs. Defaults to 100.
            learning_rate (float, optional): Learning rate for the optimizer. Defaults to 0.01.
            verbose (bool, optional): If True, prints training progress. Defaults to False.
            trainer (TrainerLike | None, optional): Custom trainer instance. If None,
                uses AdamTrainer. Defaults to None.
            loss (LossFunction, str, or None, optional): Loss function to use.
                If None, defaults to cross-entropy for classification.
            validation_data (tuple[np.ndarray, np.ndarray] | None, optional): Optional validation dataset.
            validation_frequency (int, optional): Evaluate validation metrics every N epochs.

        Returns:
            TrainingHistory: Dictionary containing ``"train"`` and optionally ``"val"`` loss curves.
        """
        resolved_loss = resolve_loss("cross_entropy" if loss is None else loss)

        if trainer is not None:
            trainer_instance: TrainerLike = trainer
            if not isinstance(trainer_instance, TrainerProtocol):
                raise TypeError("trainer must implement fit(model, X, y)")
            # Push the resolved loss onto trainers that expose a ``loss`` attribute.
            if hasattr(trainer_instance, "loss"):
                trainer_instance.loss = resolved_loss
        else:
            # Lazy import keeps the optimizer dependency out of module import time.
            from .optim import AdamTrainer

            trainer_instance = AdamTrainer(
                learning_rate=learning_rate,
                epochs=epochs,
                verbose=verbose,
                loss=resolved_loss,
            )

        # Only forward validation kwargs when they deviate from the defaults.
        fit_kwargs: dict[str, Any] = {}
        if validation_data is not None:
            fit_kwargs["validation_data"] = validation_data
        if validation_data is not None or validation_frequency != 1:
            fit_kwargs["validation_frequency"] = validation_frequency

        history = trainer_instance.fit(self, X, y, **fit_kwargs)
        if not isinstance(history, dict):
            raise TypeError("Trainer.fit must return a TrainingHistory dictionary")
        return history

    def __repr__(self) -> str:
        """Return a string representation of the ANFISClassifier.

        Returns:
            str: A formatted string describing the classifier configuration.
        """
        return f"TSKANFISClassifier(n_inputs={self.n_inputs}, n_rules={self.n_rules}, n_classes={self.n_classes})"