Coverage for anfis_toolbox / layers.py: 100%
184 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-05 18:47 -0300
1from collections.abc import Sequence
2from itertools import product
3from typing import Any, cast
5import numpy as np
7from .membership import MembershipFunction
class MembershipLayer:
    """Fuzzification layer of an ANFIS network.

    First ANFIS layer: evaluates every membership function attached to each
    input variable, turning crisp input values into fuzzy membership degrees
    that the rule layer consumes.

    Attributes:
        input_mfs (dict): Mapping of input name -> list of membership functions.
        input_names (list): Ordered input variable names.
        n_inputs (int): Number of input variables.
        mf_per_input (list): Membership-function count for each input.
        last (dict): Cached tensors from the most recent forward pass.
    """

    def __init__(self, input_mfs: dict[str, list[MembershipFunction]]) -> None:
        """Builds the layer from a membership-function specification.

        Parameters:
            input_mfs (dict): Mapping {input_name: [MembershipFunction, ...]}.
        """
        self.input_mfs = input_mfs
        self.input_names = list(input_mfs)
        self.n_inputs = len(input_mfs)
        self.mf_per_input = [len(mf_list) for mf_list in input_mfs.values()]
        self.last: dict[str, Any] = {}

    @property
    def membership_functions(self) -> dict[str, list[MembershipFunction]]:
        """Standardized alias for ``input_mfs``.

        Returns:
            dict: Mapping of input name -> list of membership functions.
        """
        return self.input_mfs

    def forward(self, x: np.ndarray) -> dict[str, np.ndarray]:
        """Evaluates all membership functions on a batch of inputs.

        Parameters:
            x (np.ndarray): Inputs of shape (batch_size, n_inputs).

        Returns:
            dict: {input_name: np.ndarray of shape (batch_size, n_mfs)} with
                the membership degree of each MF for every sample.
        """
        # One stacked (batch_size, n_mfs) array per input variable; column i
        # of x feeds the MFs registered for the i-th input name.
        membership_outputs = {
            name: np.stack([mf(x[:, column]) for mf in self.input_mfs[name]], axis=-1)
            for column, name in enumerate(self.input_names)
        }

        # Cache for the backward pass.
        self.last = {"x": x, "membership_outputs": membership_outputs}

        return membership_outputs

    def backward(self, gradients: dict[str, np.ndarray]) -> dict[str, dict[str, list[dict[str, float]]]]:
        """Propagates gradients into every membership function's parameters.

        Parameters:
            gradients (dict): {input_name: np.ndarray (batch_size, n_mfs)} —
                loss gradient w.r.t. each membership degree.

        Returns:
            dict: Nested parameter gradients mirroring ``model.get_gradients()``.
        """
        param_grads: dict[str, list[dict[str, float]]] = {}

        for name in self.input_names:
            grad_array = gradients[name]
            per_mf: list[dict[str, float]] = []

            for column, mf in enumerate(self.input_mfs[name]):
                # MF objects accumulate gradients internally; snapshot before
                # and after so we report only this call's contribution.
                before = {key: float(value) for key, value in mf.gradients.items()}
                mf.backward(grad_array[:, column])
                per_mf.append(
                    {key: float(value - before.get(key, 0.0)) for key, value in mf.gradients.items()}
                )

            param_grads[name] = per_mf

        return {"membership": param_grads}

    def reset(self) -> None:
        """Restores every membership function to its initial state and clears the cache.

        Returns:
            None
        """
        for mf_list in self.input_mfs.values():
            for mf in mf_list:
                mf.reset()
        self.last = {}
121class RuleLayer:
122 """Rule layer for ANFIS (Adaptive Neuro-Fuzzy Inference System).
124 This layer computes the rule strengths (firing strengths) by applying
125 the T-norm (typically product) operation to the membership degrees of
126 all input variables for each rule.
128 This is the second layer of ANFIS that takes membership degrees from
129 the MembershipLayer and computes rule activations.
131 Attributes:
132 input_names (list): List of input variable names.
133 n_inputs (int): Number of input variables.
134 mf_per_input (list): Number of membership functions per input.
135 rules (list): List of all possible rule combinations.
136 last (dict): Cache of last forward pass computations for backward pass.
137 """
139 def __init__(
140 self,
141 input_names: list[str],
142 mf_per_input: list[int],
143 rules: Sequence[Sequence[int]] | None = None,
144 ):
145 """Initializes the rule layer with input configuration.
147 Parameters:
148 input_names (list): List of input variable names.
149 mf_per_input (list): Number of membership functions per input variable.
150 rules (Sequence[Sequence[int]] | None): Optional explicit rule set where each
151 rule is a sequence of membership-function indices, one per input. When
152 ``None``, the full Cartesian product of membership functions is used.
153 """
154 self.input_names = input_names
155 self.n_inputs = len(input_names)
156 self.mf_per_input = list(mf_per_input)
158 if rules is None:
159 # Generate all possible rule combinations (Cartesian product)
160 self.rules = [tuple(rule) for rule in product(*[range(n) for n in self.mf_per_input])]
161 else:
162 validated_rules: list[tuple[int, ...]] = []
163 for idx, rule in enumerate(rules):
164 if len(rule) != self.n_inputs:
165 raise ValueError(
166 "Each rule must specify exactly one membership index per input. "
167 f"Rule at position {idx} has length {len(rule)} while {self.n_inputs} were expected."
168 )
169 normalized_rule: list[int] = []
170 for input_idx, mf_idx in enumerate(rule):
171 max_mf = self.mf_per_input[input_idx]
172 if not 0 <= mf_idx < max_mf:
173 raise ValueError(
174 "Rule membership index out of range. "
175 f"Received {mf_idx} for input {input_idx} with {max_mf} membership functions."
176 )
177 normalized_rule.append(int(mf_idx))
178 validated_rules.append(tuple(normalized_rule))
180 if not validated_rules:
181 raise ValueError("At least one rule must be provided when specifying custom rules.")
182 self.rules = validated_rules
184 self.n_rules = len(self.rules)
186 self.last: dict[str, Any] = {}
188 def forward(self, membership_outputs: dict[str, np.ndarray]) -> np.ndarray:
189 """Performs forward pass to compute rule strengths.
191 Parameters:
192 membership_outputs (dict): Dictionary mapping input names to membership degree arrays.
193 Format: {input_name: np.ndarray with shape (batch_size, n_mfs)}
195 Returns:
196 np.ndarray: Rule strengths with shape (batch_size, n_rules).
197 """
198 # Convert membership outputs to array format for easier processing
199 mu_list = []
200 for name in self.input_names:
201 mu_list.append(membership_outputs[name]) # (batch_size, n_mfs)
202 mu = np.stack(mu_list, axis=1) # (batch_size, n_inputs, n_mfs)
204 _batch_size = mu.shape[0]
206 # Compute rule activations (firing strengths)
207 rule_activations_list: list[np.ndarray] = []
208 for rule in self.rules:
209 rule_mu = []
210 # Get membership degree for each input in this rule
211 for input_idx, mf_idx in enumerate(rule):
212 rule_mu.append(mu[:, input_idx, mf_idx]) # (batch_size,)
213 # Apply T-norm (product) to get rule strength
214 rule_strength = np.prod(rule_mu, axis=0) # (batch_size,)
215 rule_activations_list.append(rule_strength)
217 rule_activations = np.stack(rule_activations_list, axis=1) # (batch_size, n_rules)
219 # Cache values for backward pass
220 self.last = {"membership_outputs": membership_outputs, "mu": mu, "rule_activations": rule_activations}
222 return rule_activations
224 def backward(self, dL_dw: np.ndarray) -> dict[str, np.ndarray]:
225 """Performs backward pass to compute gradients for membership functions.
227 Parameters:
228 dL_dw (np.ndarray): Gradient of loss with respect to rule strengths.
229 Shape: (batch_size, n_rules)
231 Returns:
232 dict: Dictionary mapping input names to gradient arrays for membership functions.
233 Format: {input_name: np.ndarray with shape (batch_size, n_mfs)}
234 """
235 batch_size = dL_dw.shape[0]
236 mu = self.last["mu"] # (batch_size, n_inputs, n_mfs)
238 # Initialize gradient accumulators for each input's membership functions
239 gradients = {}
240 for i, name in enumerate(self.input_names):
241 n_mfs = self.mf_per_input[i]
242 gradients[name] = np.zeros((batch_size, n_mfs))
244 # Compute gradients for each rule
245 for rule_idx, rule in enumerate(self.rules):
246 for input_idx, mf_idx in enumerate(rule):
247 name = self.input_names[input_idx]
249 # Compute partial derivative: d(rule_strength)/d(mu_ij)
250 # This is the product of all other membership degrees in the rule
251 other_factors = []
252 for j, j_mf in enumerate(rule):
253 if j == input_idx:
254 continue # Skip the current input
255 other_factors.append(mu[:, j, j_mf])
257 # Product of other factors (or 1 if no other factors)
258 partial = np.prod(other_factors, axis=0) if other_factors else np.ones(batch_size)
260 # Apply chain rule: dL/dmu = dL/dw * dw/dmu
261 gradients[name][:, mf_idx] += dL_dw[:, rule_idx] * partial
263 return gradients
class NormalizationLayer:
    """Normalization layer for ANFIS.

    Third ANFIS layer: rescales the rule firing strengths so that, for each
    sample, the weights across all rules sum to (approximately) 1.0:

        norm_w_i = w_i / sum_j(w_j)

    Attributes:
        last (dict): Cached tensors from the most recent forward pass.
    """

    def __init__(self) -> None:
        """Creates the layer with an empty cache."""
        self.last: dict[str, Any] = {}

    def forward(self, w: np.ndarray) -> np.ndarray:
        """Normalizes rule strengths row-wise.

        Parameters:
            w (np.ndarray): Rule strengths of shape (batch_size, n_rules).

        Returns:
            np.ndarray: Normalized weights of shape (batch_size, n_rules);
                each row sums to ~1.0 (a tiny epsilon guards against an
                all-zero row).
        """
        # Epsilon keeps the division finite when every rule fires at zero.
        total = w.sum(axis=1, keepdims=True) + 1e-8
        normalized = w / total

        # Cache for the backward pass.
        self.last = {"w": w, "sum_w": total, "norm_w": normalized}
        return cast(np.ndarray, normalized)

    def backward(self, dL_dnorm_w: np.ndarray) -> np.ndarray:
        """Backpropagates through the normalization.

        From the quotient rule, with norm_w_i = w_i / sum_w:
            d(norm_w_i)/d(w_i) = (sum_w - w_i) / sum_w²
            d(norm_w_i)/d(w_j) = -w_j / sum_w²  (j ≠ i)
        which collapses to the Jacobian-vector product
            (Jᵀ g)_j = (sum_w * g_j - g·w) / sum_w².

        Parameters:
            dL_dnorm_w (np.ndarray): Loss gradient w.r.t. normalized weights,
                shape (batch_size, n_rules).

        Returns:
            np.ndarray: Loss gradient w.r.t. raw weights, shape (batch_size, n_rules).
        """
        w = self.last["w"]          # (batch_size, n_rules)
        total = self.last["sum_w"]  # (batch_size, 1)

        # g·w per sample, kept as a column for broadcasting.
        weighted = (dL_dnorm_w * w).sum(axis=1, keepdims=True)
        result = (dL_dnorm_w * total - weighted) / np.square(total)

        return cast(np.ndarray, result)
330class ConsequentLayer:
331 """Consequent layer for ANFIS (Adaptive Neuro-Fuzzy Inference System).
333 This layer implements the consequent part of fuzzy rules in ANFIS.
334 Each rule has a linear consequent function of the form:
335 f_i(x) = p_i * x_1 + q_i * x_2 + ... + r_i (TSK model)
337 The final output is computed as a weighted sum:
338 y = Σ(w_i * f_i(x)) where w_i are normalized rule weights
340 Attributes:
341 n_rules (int): Number of fuzzy rules.
342 n_inputs (int): Number of input variables.
343 parameters (np.ndarray): Linear parameters for each rule with shape (n_rules, n_inputs + 1).
344 Each row contains [p_i, q_i, ..., r_i] for rule i.
345 gradients (np.ndarray): Accumulated gradients for parameters.
346 last (dict): Cache of last forward pass computations for backward pass.
347 """
349 def __init__(self, n_rules: int, n_inputs: int):
350 """Initializes the consequent layer with random linear parameters.
352 Parameters:
353 n_rules (int): Number of fuzzy rules.
354 n_inputs (int): Number of input variables.
355 """
356 # Each rule has (n_inputs + 1) parameters: p_i, q_i, ..., r_i (including bias)
357 self.n_rules = n_rules
358 self.n_inputs = n_inputs
359 self.parameters = np.random.randn(n_rules, n_inputs + 1)
360 self.gradients = np.zeros_like(self.parameters)
361 self.last: dict[str, Any] = {}
363 def forward(self, x: np.ndarray, norm_w: np.ndarray) -> np.ndarray:
364 """Performs forward pass to compute the final ANFIS output.
366 Parameters:
367 x (np.ndarray): Input data with shape (batch_size, n_inputs).
368 norm_w (np.ndarray): Normalized rule weights with shape (batch_size, n_rules).
370 Returns:
371 np.ndarray: Final ANFIS output with shape (batch_size, 1).
372 """
373 batch_size = x.shape[0]
375 # Augment input with bias term (column of ones)
376 X_aug = np.hstack([x, np.ones((batch_size, 1))]) # (batch_size, n_inputs + 1)
378 # Compute consequent function f_i(x) for each rule
379 # f[b, i] = p_i * x[b, 0] + q_i * x[b, 1] + ... + r_i
380 f = X_aug @ self.parameters.T # (batch_size, n_rules)
382 # Compute final output as weighted sum: y = Σ(w_i * f_i(x))
383 y_hat = np.sum(norm_w * f, axis=1, keepdims=True) # (batch_size, 1)
385 # Cache values for backward pass
386 self.last = {"X_aug": X_aug, "norm_w": norm_w, "f": f}
388 return cast(np.ndarray, y_hat)
390 def backward(self, dL_dy: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
391 """Performs backward pass to compute gradients for parameters and inputs.
393 Parameters:
394 dL_dy (np.ndarray): Gradient of loss with respect to layer output.
395 Shape: (batch_size, 1)
397 Returns:
398 tuple: (dL_dnorm_w, dL_dx) where:
399 - dL_dnorm_w: Gradient w.r.t. normalized weights, shape (batch_size, n_rules)
400 - dL_dx: Gradient w.r.t. input x, shape (batch_size, n_inputs)
401 """
402 X_aug = self.last["X_aug"] # (batch_size, n_inputs + 1)
403 norm_w = self.last["norm_w"] # (batch_size, n_rules)
404 f = self.last["f"] # (batch_size, n_rules)
406 batch_size = X_aug.shape[0]
408 # Compute gradients for consequent parameters
409 self.gradients = np.zeros_like(self.parameters)
411 for i in range(self.n_rules):
412 # Gradient of y_hat w.r.t. parameters of rule i: norm_w_i * x_aug
413 for b in range(batch_size):
414 self.gradients[i] += dL_dy[b, 0] * norm_w[b, i] * X_aug[b]
416 # Compute gradient of loss w.r.t. normalized weights
417 # dy/dnorm_w_i = f_i(x), so dL/dnorm_w_i = dL/dy * f_i(x)
418 dL_dnorm_w = dL_dy * f # (batch_size, n_rules)
420 # Compute gradient of loss w.r.t. input x (for backpropagation to previous layers)
421 dL_dx = np.zeros((batch_size, self.n_inputs))
423 for b in range(batch_size):
424 for i in range(self.n_rules):
425 # dy/dx = norm_w_i * parameters_i[:-1] (excluding bias term)
426 dL_dx[b] += dL_dy[b, 0] * norm_w[b, i] * self.parameters[i, :-1]
428 return dL_dnorm_w, dL_dx
430 def reset(self) -> None:
431 """Resets gradients and cached values.
433 Returns:
434 None
435 """
436 self.gradients = np.zeros_like(self.parameters)
437 self.last = {}
440class ClassificationConsequentLayer:
441 """Consequent layer that produces per-class logits for classification.
443 Each rule i has a vector of class logits with a linear function of inputs:
444 f_i(x) = W_i x + b_i, where W_i has shape (n_classes, n_inputs) and b_i (n_classes,).
445 We store parameters as a single array of shape (n_rules, n_classes, n_inputs + 1).
446 """
448 def __init__(self, n_rules: int, n_inputs: int, n_classes: int, random_state: int | None = None):
449 """Initializes the layer with the specified number of rules, inputs, and classes.
451 Args:
452 n_rules (int): Number of fuzzy rules in the layer.
453 n_inputs (int): Number of input features.
454 n_classes (int): Number of output classes.
455 random_state (int | None): Random seed for parameter initialization.
457 Attributes:
458 n_rules (int): Stores the number of fuzzy rules.
459 n_inputs (int): Stores the number of input features.
460 n_classes (int): Stores the number of output classes.
463 parameters (np.ndarray): Randomly initialized parameters for each rule, class, and input (including bias).
464 gradients (np.ndarray): Gradient values initialized to zeros, matching the shape of parameters.
465 last (dict): Dictionary for storing intermediate results or state.
466 """
467 self.n_rules = n_rules
468 self.n_inputs = n_inputs
469 self.n_classes = n_classes
470 if random_state is None:
471 self.parameters = np.random.randn(n_rules, n_classes, n_inputs + 1)
472 else:
473 rng = np.random.default_rng(random_state)
474 self.parameters = rng.normal(size=(n_rules, n_classes, n_inputs + 1))
475 self.gradients = np.zeros_like(self.parameters)
476 self.last: dict[str, Any] = {}
478 def forward(self, x: np.ndarray, norm_w: np.ndarray) -> np.ndarray:
479 """Computes the forward pass for the classification consequent layer."""
480 batch = x.shape[0]
481 X_aug = np.hstack([x, np.ones((batch, 1))]) # (b, d+1)
482 # Compute per-rule class logits: (b, r, k)
483 f = np.einsum("bd,rkd->brk", X_aug, self.parameters)
484 # Weighted sum over rules -> logits (b, k)
485 logits = np.einsum("br,brk->bk", norm_w, f)
486 self.last = {"X_aug": X_aug, "norm_w": norm_w, "f": f}
487 return cast(np.ndarray, logits)
489 def backward(self, dL_dlogits: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
490 """Computes the backward pass for the classification consequent layer."""
491 X_aug = self.last["X_aug"] # (b, d+1)
492 norm_w = self.last["norm_w"] # (b, r)
493 f = self.last["f"] # (b, r, k)
495 # Gradients w.r.t. per-rule parameters
496 self.gradients = np.zeros_like(self.parameters)
497 # dL/df_{brk} = dL/dlogits_{bk} * norm_w_{br}
498 dL_df = dL_dlogits[:, None, :] * norm_w[:, :, None] # (b, r, k)
499 # Accumulate over batch: grad[r,k,d] = sum_b dL_df[b,r,k] * X_aug[b,d]
500 self.gradients = np.einsum("brk,bd->rkd", dL_df, X_aug)
502 # dL/dnorm_w: sum_k dL/dlogits_{bk} * f_{brk}
503 dL_dnorm_w = np.einsum("bk,brk->br", dL_dlogits, f)
505 # dL/dx: sum_r sum_k dL/dlogits_{bk} * norm_w_{br} * W_{r,k,:}
506 W = self.parameters[:, :, :-1] # (r,k,d)
507 dL_dx = np.einsum("bk,br,rkd->bd", dL_dlogits, norm_w, W)
508 return dL_dnorm_w, dL_dx
510 def reset(self) -> None:
511 """Resets the gradients and cached values."""
512 self.gradients = np.zeros_like(self.parameters)
513 self.last = {}