Coverage for anfis_toolbox / model.py: 100%
190 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-05 18:47 -0300
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-05 18:47 -0300
1"""ANFIS Model Implementation.
3This module implements the complete Adaptive Neuro-Fuzzy Inference System (ANFIS)
4model that combines all the individual layers into a unified architecture.
5"""
7import logging
8from collections.abc import Mapping, Sequence
9from typing import Any, Protocol, TypeAlias, TypedDict, cast, runtime_checkable
11import numpy as np
13from .layers import ClassificationConsequentLayer, ConsequentLayer, MembershipLayer, NormalizationLayer, RuleLayer
14from .losses import LossFunction, resolve_loss
15from .membership import MembershipFunction
16from .metrics import softmax
19class TrainingHistory(TypedDict, total=False):
20 """History of loss values collected during training."""
22 train: list[float]
23 val: list[float | None]
@runtime_checkable
class TrainerProtocol(Protocol):
    """Structural type for objects able to train an ANFIS model.

    Any object exposing a compatible ``fit`` method satisfies this protocol;
    because it is ``runtime_checkable``, ``isinstance`` checks verify only
    that a ``fit`` attribute exists, not its signature.
    """

    def fit(
        self,
        model: Any,
        X: np.ndarray,
        y: np.ndarray,
        *,
        validation_data: tuple[np.ndarray, np.ndarray] | None = None,
        validation_frequency: int = 1,
    ) -> TrainingHistory:
        """Fit ``model`` on ``X``/``y`` and return the collected loss history."""
# Public alias: anything structurally matching ``TrainerProtocol`` may be
# passed wherever a trainer is expected.
TrainerLike: TypeAlias = TrainerProtocol

# Snapshot shape for membership parameters/gradients: input name -> one
# ``{param_name: value}`` dict per membership function of that input.
MembershipSnapshot: TypeAlias = dict[str, list[dict[str, float]]]
class ParameterState(TypedDict, total=False):
    """Full snapshot of the model's trainable parameters.

    Keys are optional so that partial snapshots can be fed back through
    ``set_parameters``.
    """

    # Per-input membership-function parameter dictionaries.
    membership: MembershipSnapshot
    # Consequent-layer coefficient array.
    consequent: np.ndarray
class GradientState(TypedDict):
    """Gradients from the most recent backward pass.

    Unlike :class:`ParameterState`, both entries are always present.
    """

    # Per-input membership-function gradient dictionaries.
    membership: MembershipSnapshot
    # Gradient array for the consequent-layer coefficients.
    consequent: np.ndarray
62ConsequentLike: TypeAlias = ConsequentLayer | ClassificationConsequentLayer
class _TSKANFISSharedMixin:
    """Parameter and gradient plumbing shared by the regression and classification models.

    Host classes are expected to define ``input_mfs``, ``input_names``,
    ``membership_layer``, ``rule_layer`` and ``consequent_layer``.
    """

    input_mfs: dict[str, list[MembershipFunction]]
    input_names: list[str]
    membership_layer: MembershipLayer
    rule_layer: RuleLayer
    consequent_layer: ConsequentLike

    @property
    def membership_functions(self) -> dict[str, list[MembershipFunction]]:
        """Membership functions keyed by input variable name."""
        return self.input_mfs

    @property
    def rules(self) -> list[tuple[int, ...]]:
        """Copy of the rule definitions held by the rule layer."""
        return list(self.rule_layer.rules)

    def reset_gradients(self) -> None:
        """Zero out every accumulated gradient in the model."""
        self.membership_layer.reset()
        self.consequent_layer.reset()

    def get_parameters(self) -> ParameterState:
        """Snapshot every trainable parameter.

        Returns:
            ParameterState: ``"membership"`` holds per-input parameter dicts;
            ``"consequent"`` holds a copy of the consequent coefficients.
        """
        snapshot: MembershipSnapshot = {}
        for name in self.input_names:
            snapshot[name] = [dict(mf.parameters) for mf in self.input_mfs[name]]
        return {
            "membership": snapshot,
            "consequent": self.consequent_layer.parameters.copy(),
        }

    def set_parameters(self, parameters: Mapping[str, object]) -> None:
        """Load a (possibly partial) parameter snapshot into the model.

        Args:
            parameters: Mapping shaped like the output of :meth:`get_parameters`.
                Missing or malformed entries are silently skipped.
        """
        consequent = parameters.get("consequent")
        if consequent is not None:
            self.consequent_layer.parameters = np.array(consequent, copy=True)

        membership = parameters.get("membership")
        if not isinstance(membership, Mapping):
            return

        for name in self.input_names:
            per_input = membership.get(name)
            if not isinstance(per_input, Sequence):
                continue
            # strict=False: surplus entries on either side are ignored.
            for mf, new_params in zip(self.input_mfs[name], per_input, strict=False):
                mf.parameters = {key: float(value) for key, value in new_params.items()}

    def get_gradients(self) -> GradientState:
        """Return the gradients produced by the most recent backward pass.

        Returns:
            GradientState: ``"membership"`` holds per-input gradient dicts;
            ``"consequent"`` holds a copy of the consequent gradient array.
        """
        grads: MembershipSnapshot = {}
        for name in self.input_names:
            grads[name] = [dict(mf.gradients) for mf in self.input_mfs[name]]
        return {
            "membership": grads,
            "consequent": self.consequent_layer.gradients.copy(),
        }

    def update_parameters(self, effective_learning_rate: float) -> None:
        """Take one gradient-descent step on all trainable parameters.

        Args:
            effective_learning_rate: Step size for the update.
        """
        self.update_consequent_parameters(effective_learning_rate)
        self.update_membership_parameters(effective_learning_rate)

    def update_consequent_parameters(self, effective_learning_rate: float) -> None:
        """Gradient-descent step restricted to the consequent coefficients.

        Args:
            effective_learning_rate: Step size for the update.
        """
        self.consequent_layer.parameters -= effective_learning_rate * self.consequent_layer.gradients

    def update_membership_parameters(self, effective_learning_rate: float) -> None:
        """Gradient-descent step restricted to the membership-function parameters.

        Args:
            effective_learning_rate: Step size for the update.
        """
        for name in self.input_names:
            for mf in self.input_mfs[name]:
                for key, grad in mf.gradients.items():
                    mf.parameters[key] -= effective_learning_rate * grad

    def __str__(self) -> str:
        """Delegate to :meth:`__repr__` of the concrete model class."""
        return repr(self)
# Module-level logger for the ANFIS package; handler and level configuration
# is left to the host application.
logger = logging.getLogger(__name__)
class TSKANFIS(_TSKANFISSharedMixin):
    """Adaptive Neuro-Fuzzy Inference System (legacy TSK ANFIS) model.

    Implements the classic 4-layer ANFIS architecture:

    1) MembershipLayer — fuzzification of inputs
    2) RuleLayer — rule strength computation (T-norm)
    3) NormalizationLayer — weight normalization
    4) ConsequentLayer — final output via a TSK model

    Supports forward/backward passes for training, parameter access/update,
    and a simple prediction API.

    Attributes:
        input_mfs (dict[str, list[MembershipFunction]]): Mapping from input name
            to its list of membership functions.
        membership_layer (MembershipLayer): Layer 1 — fuzzification.
        rule_layer (RuleLayer): Layer 2 — rule strength computation.
        normalization_layer (NormalizationLayer): Layer 3 — weight normalization.
        consequent_layer (ConsequentLayer): Layer 4 — final TSK output.
        input_names (list[str]): Ordered list of input variable names.
        n_inputs (int): Number of input variables (features).
        n_rules (int): Number of fuzzy rules used by the system.
    """

    def __init__(
        self,
        input_mfs: dict[str, list[MembershipFunction]],
        rules: Sequence[Sequence[int]] | None = None,
    ):
        """Initialize the ANFIS model.

        Args:
            input_mfs (dict[str, list[MembershipFunction]]): Mapping from input
                name to a list of membership functions. Example:
                ``{"x1": [GaussianMF(0,1), ...], "x2": [...]}``.
            rules: Optional explicit set of rules, each specifying one membership index per
                input. When ``None``, the Cartesian product of all membership functions is used.

        Examples:
            >>> from anfis_toolbox.membership import GaussianMF
            >>> input_mfs = {
            ...     "x1": [GaussianMF(0, 1), GaussianMF(1, 1)],
            ...     "x2": [GaussianMF(0, 1), GaussianMF(1, 1)],
            ... }
            >>> model = TSKANFIS(input_mfs)
        """
        self.input_mfs = input_mfs
        self.input_names = list(input_mfs.keys())
        self.n_inputs = len(input_mfs)

        # Calculate number of membership functions per input
        mf_per_input = [len(mfs) for mfs in input_mfs.values()]

        # Initialize all layers
        self.membership_layer = MembershipLayer(input_mfs)
        self.rule_layer = RuleLayer(self.input_names, mf_per_input, rules=rules)
        self.n_rules = self.rule_layer.n_rules
        self.normalization_layer = NormalizationLayer()
        self.consequent_layer = ConsequentLayer(self.n_rules, self.n_inputs)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Run a forward pass through the model.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Output array of shape ``(batch_size, 1)``.
        """
        normalized_weights = self.forward_antecedents(x)
        output = self.forward_consequents(x, normalized_weights)
        return output

    def forward_antecedents(self, x: np.ndarray) -> np.ndarray:
        """Run a forward pass through the antecedent layers only.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Normalized rule weights of shape ``(batch_size, n_rules)``.
        """
        membership_outputs = self.membership_layer.forward(x)
        rule_strengths = self.rule_layer.forward(membership_outputs)
        normalized_weights = self.normalization_layer.forward(rule_strengths)
        return normalized_weights

    def forward_consequents(self, x: np.ndarray, normalized_weights: np.ndarray) -> np.ndarray:
        """Run a forward pass through the consequent layer only.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.
            normalized_weights (np.ndarray): Normalized rule weights of shape
                ``(batch_size, n_rules)``.

        Returns:
            np.ndarray: Output array of shape ``(batch_size, 1)``.
        """
        output = self.consequent_layer.forward(x, normalized_weights)
        return output

    def backward(self, dL_dy: np.ndarray) -> None:
        """Run a backward pass through all layers.

        Propagates gradients from the output back through all layers and stores
        parameter gradients for a later update step.

        Args:
            dL_dy (np.ndarray): Gradient of the loss w.r.t. the model output,
                shape ``(batch_size, 1)``.
        """
        # Backward pass through Layer 4: Consequent layer
        dL_dnorm_w, _ = self.consequent_layer.backward(dL_dy)

        # Backward pass through Layer 3: Normalization layer
        dL_dw = self.normalization_layer.backward(dL_dnorm_w)

        # Backward pass through Layer 2: Rule layer
        gradients = self.rule_layer.backward(dL_dw)

        # Backward pass through Layer 1: Membership layer
        self.membership_layer.backward(gradients)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Predict using the current model parameters.

        Accepts Python lists, 1D or 2D arrays and coerces to the expected shape.

        Args:
            x (np.ndarray | list[float]): Input data. If 1D, must have
                exactly ``n_inputs`` elements; if 2D, must be
                ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Predictions of shape ``(batch_size, 1)``.

        Raises:
            ValueError: If input dimensionality or feature count does not match
                the model configuration.
        """
        # Accept Python lists or 1D arrays by coercing to correct 2D shape
        x_arr = np.asarray(x, dtype=float)
        if x_arr.ndim == 1:
            # Single sample; ensure feature count matches
            if x_arr.size != self.n_inputs:
                raise ValueError(f"Expected {self.n_inputs} features, got {x_arr.size} in 1D input")
            x_arr = x_arr.reshape(1, self.n_inputs)
        elif x_arr.ndim == 2:
            # Validate feature count
            if x_arr.shape[1] != self.n_inputs:
                raise ValueError(f"Expected input with {self.n_inputs} features, got {x_arr.shape[1]}")
        else:
            raise ValueError("Expected input with shape (batch_size, n_inputs)")

        return self.forward(x_arr)

    def fit(
        self,
        x: np.ndarray,
        y: np.ndarray,
        epochs: int = 100,
        learning_rate: float = 0.01,
        verbose: bool = False,
        trainer: TrainerLike | None = None,
        *,
        validation_data: tuple[np.ndarray, np.ndarray] | None = None,
        validation_frequency: int = 1,
    ) -> TrainingHistory:
        """Train the ANFIS model.

        If a trainer is provided (see ``anfis_toolbox.optim``), delegate training
        to it while preserving a scikit-learn-style ``fit(X, y)`` entry point. If
        no trainer is provided, a default ``HybridTrainer`` is used with the given
        hyperparameters.

        Args:
            x (np.ndarray): Training inputs of shape ``(n_samples, n_inputs)``.
            y (np.ndarray): Training targets of shape ``(n_samples, 1)`` for
                regression.
            epochs (int, optional): Number of epochs. Defaults to ``100``.
            learning_rate (float, optional): Learning rate. Defaults to ``0.01``.
            verbose (bool, optional): Whether to log progress. Defaults to ``False``.
            trainer (TrainerLike | None, optional): External trainer implementing
                ``fit(model, X, y)``. Defaults to ``None``.
            validation_data (tuple[np.ndarray, np.ndarray] | None, optional): Optional
                validation inputs and targets evaluated according to ``validation_frequency``.
            validation_frequency (int, optional): Evaluate validation loss every N epochs.

        Returns:
            TrainingHistory: Dictionary with ``"train"`` losses and optional ``"val"`` losses.
        """
        if trainer is None:
            # Lazy import to avoid unnecessary dependency at module import time
            from .optim import HybridTrainer

            trainer_instance: TrainerLike = HybridTrainer(
                learning_rate=learning_rate,
                epochs=epochs,
                verbose=verbose,
            )
        else:
            trainer_instance = trainer
            # runtime_checkable Protocol: this only verifies a ``fit`` attribute exists.
            if not isinstance(trainer_instance, TrainerProtocol):
                raise TypeError("trainer must implement fit(model, X, y)")

        # Delegate training to the provided or default trainer; only forward the
        # validation kwargs when they deviate from defaults so trainers without
        # those parameters keep working.
        fit_kwargs: dict[str, Any] = {}
        if validation_data is not None:
            fit_kwargs["validation_data"] = validation_data
        if validation_frequency != 1 or validation_data is not None:
            fit_kwargs["validation_frequency"] = validation_frequency

        history = trainer_instance.fit(self, x, y, **fit_kwargs)
        if not isinstance(history, dict):
            raise TypeError("Trainer.fit must return a TrainingHistory dictionary")
        return history

    def __repr__(self) -> str:
        """Returns detailed representation of the ANFIS model."""
        return f"TSKANFIS(n_inputs={self.n_inputs}, n_rules={self.n_rules})"
class TSKANFISClassifier(_TSKANFISSharedMixin):
    """TSK-style neuro-fuzzy classifier with a softmax output head.

    Per-rule linear consequents are aggregated into one logit per class;
    training minimizes cross-entropy by default.
    """

    def __init__(
        self,
        input_mfs: dict[str, list[MembershipFunction]],
        n_classes: int,
        random_state: int | None = None,
        rules: Sequence[Sequence[int]] | None = None,
    ):
        """Build the classification model.

        Args:
            input_mfs (dict[str, list[MembershipFunction]]): Membership functions
                keyed by input variable name.
            n_classes (int): Number of target classes; must be at least 2.
            random_state (int | None): Seed forwarded to the consequent layer for
                reproducible parameter initialization.
            rules (Sequence[Sequence[int]] | None): Explicit rule table, one
                membership-function index per input for each rule. ``None``
                selects every combination.

        Raises:
            ValueError: If ``n_classes`` is less than 2.

        Attributes:
            input_mfs (dict[str, list[MembershipFunction]]): Membership functions per input.
            input_names (list[str]): Input variable names.
            n_inputs (int): Number of input variables.
            n_classes (int): Number of classes.
            n_rules (int): Number of fuzzy rules.
            membership_layer (MembershipLayer): Computes membership degrees.
            rule_layer (RuleLayer): Evaluates rule activations.
            normalization_layer (NormalizationLayer): Normalizes rule strengths.
            consequent_layer (ClassificationConsequentLayer): Computes class logits.
        """
        if n_classes < 2:
            raise ValueError("n_classes must be >= 2")
        self.input_mfs = input_mfs
        self.input_names = list(input_mfs.keys())
        self.n_inputs = len(input_mfs)
        self.n_classes = int(n_classes)
        mf_counts = [len(mfs) for mfs in input_mfs.values()]
        self.membership_layer = MembershipLayer(input_mfs)
        self.rule_layer = RuleLayer(self.input_names, mf_counts, rules=rules)
        self.n_rules = self.rule_layer.n_rules
        self.normalization_layer = NormalizationLayer()
        self.consequent_layer = ClassificationConsequentLayer(
            self.n_rules, self.n_inputs, self.n_classes, random_state=random_state
        )

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Compute class logits for a batch of inputs.

        Args:
            x (np.ndarray): Input array of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Logits of shape ``(batch_size, n_classes)``.
        """
        memberships = self.membership_layer.forward(x)
        strengths = self.rule_layer.forward(memberships)
        weights = self.normalization_layer.forward(strengths)
        return self.consequent_layer.forward(x, weights)

    def backward(self, dL_dlogits: np.ndarray) -> None:
        """Propagate loss gradients from the logits down through every layer.

        Args:
            dL_dlogits (np.ndarray): Gradient of the loss w.r.t. the logits,
                shape ``(batch_size, n_classes)``.
        """
        dL_dweights, _ = self.consequent_layer.backward(dL_dlogits)
        dL_dstrengths = self.normalization_layer.backward(dL_dweights)
        membership_grads = self.rule_layer.backward(dL_dstrengths)
        self.membership_layer.backward(membership_grads)

    def predict_proba(self, x: np.ndarray) -> np.ndarray:
        """Return softmax class probabilities for the given samples.

        Args:
            x (np.ndarray | list[float]): A single sample (1D with exactly
                ``n_inputs`` entries) or a batch of shape ``(batch_size, n_inputs)``.

        Returns:
            np.ndarray: Probabilities of shape ``(batch_size, n_classes)``.

        Raises:
            ValueError: If the input rank or feature count is invalid.
        """
        data = np.asarray(x, dtype=float)
        if data.ndim == 1:
            if data.size != self.n_inputs:
                raise ValueError(f"Expected {self.n_inputs} features, got {data.size} in 1D input")
            data = data.reshape(1, self.n_inputs)
        elif data.ndim == 2:
            if data.shape[1] != self.n_inputs:
                raise ValueError(f"Expected input with {self.n_inputs} features, got {data.shape[1]}")
        else:
            raise ValueError("Expected input with shape (batch_size, n_inputs)")
        return softmax(self.forward(data), axis=1)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Return the most probable class label for each sample.

        Args:
            x (np.ndarray | list[float]): Same accepted shapes as
                :meth:`predict_proba`.

        Returns:
            np.ndarray: Predicted labels of shape ``(batch_size,)``.
        """
        probabilities = self.predict_proba(x)
        return cast(np.ndarray, np.argmax(probabilities, axis=1))

    def fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        epochs: int = 100,
        learning_rate: float = 0.01,
        verbose: bool = False,
        trainer: TrainerLike | None = None,
        loss: LossFunction | str | None = None,
        *,
        validation_data: tuple[np.ndarray, np.ndarray] | None = None,
        validation_frequency: int = 1,
    ) -> TrainingHistory:
        """Train the classifier on ``X``/``y``.

        Parameters:
            X (np.ndarray): Input features for training.
            y (np.ndarray): Target values for training.
            epochs (int, optional): Number of training epochs. Defaults to 100.
            learning_rate (float, optional): Optimizer learning rate. Defaults to 0.01.
            verbose (bool, optional): If True, logs training progress. Defaults to False.
            trainer (TrainerLike | None, optional): Custom trainer instance; when
                ``None`` an ``AdamTrainer`` is created. Defaults to None.
            loss (LossFunction, str, or None, optional): Loss function; ``None``
                resolves to cross-entropy.
            validation_data (tuple[np.ndarray, np.ndarray] | None, optional): Optional
                validation dataset.
            validation_frequency (int, optional): Evaluate validation metrics every N epochs.

        Returns:
            TrainingHistory: Dictionary containing ``"train"`` and optionally ``"val"`` loss curves.
        """
        resolved_loss = resolve_loss("cross_entropy" if loss is None else loss)

        if trainer is None:
            # Imported lazily so importing this module stays cheap.
            from .optim import AdamTrainer

            trainer_instance: TrainerLike = AdamTrainer(
                learning_rate=learning_rate,
                epochs=epochs,
                verbose=verbose,
                loss=resolved_loss,
            )
        else:
            trainer_instance = trainer
            if not isinstance(trainer_instance, TrainerProtocol):
                raise TypeError("trainer must implement fit(model, X, y)")
            # NOTE(review): a user-supplied trainer exposing a ``loss`` attribute
            # has it overwritten here (with cross-entropy when ``loss`` is None).
            if hasattr(trainer_instance, "loss"):
                trainer_instance.loss = resolved_loss

        # Only forward non-default validation kwargs so trainers lacking those
        # parameters keep working.
        fit_kwargs: dict[str, Any] = {}
        if validation_data is not None:
            fit_kwargs["validation_data"] = validation_data
        if validation_frequency != 1 or validation_data is not None:
            fit_kwargs["validation_frequency"] = validation_frequency

        history = trainer_instance.fit(self, X, y, **fit_kwargs)
        if not isinstance(history, dict):
            raise TypeError("Trainer.fit must return a TrainingHistory dictionary")
        return history

    def __repr__(self) -> str:
        """Return a string describing the classifier configuration."""
        return f"TSKANFISClassifier(n_inputs={self.n_inputs}, n_rules={self.n_rules}, n_classes={self.n_classes})"