Coverage for anfis_toolbox/layers.py: 100%

184 statements

from collections.abc import Sequence
from itertools import product
from typing import Any, cast

import numpy as np

from .membership import MembershipFunction


class MembershipLayer:
    """Membership layer for ANFIS (Adaptive Neuro-Fuzzy Inference System).

    This is the first layer of ANFIS that applies membership functions to
    input variables. Each input variable has multiple membership functions
    that transform crisp input values into fuzzy membership degrees.

    This layer serves as the fuzzification stage, converting crisp inputs
    into fuzzy sets that can be processed by subsequent ANFIS layers.

    Attributes:
        input_mfs (dict): Dictionary mapping input names to lists of membership functions.
        input_names (list): List of input variable names.
        n_inputs (int): Number of input variables.
        mf_per_input (list): Number of membership functions per input.
        last (dict): Cache of last forward pass computations for backward pass.
    """

    def __init__(self, input_mfs: dict[str, list[MembershipFunction]]) -> None:
        """Initializes the membership layer with input membership functions.

        Parameters:
            input_mfs (dict): Dictionary mapping input names to lists of membership functions.
                Format: {input_name: [MembershipFunction, ...]}
        """
        self.input_mfs = input_mfs
        self.input_names = list(input_mfs.keys())
        self.n_inputs = len(input_mfs)
        self.mf_per_input = [len(mfs) for mfs in input_mfs.values()]
        self.last: dict[str, Any] = {}

    @property
    def membership_functions(self) -> dict[str, list[MembershipFunction]]:
        """Alias for input_mfs to provide a standardized interface.

        Returns:
            dict: Dictionary mapping input names to lists of membership functions.
        """
        return self.input_mfs

    def forward(self, x: np.ndarray) -> dict[str, np.ndarray]:
        """Performs forward pass to compute membership degrees for all inputs.

        Parameters:
            x (np.ndarray): Input data with shape (batch_size, n_inputs).

        Returns:
            dict: Dictionary mapping input names to membership degree arrays.
                Format: {input_name: np.ndarray with shape (batch_size, n_mfs)}
        """
        _batch_size = x.shape[0]
        membership_outputs = {}

        # Compute membership degrees for each input variable
        for i, name in enumerate(self.input_names):
            mfs = self.input_mfs[name]
            # Apply each membership function to the i-th input
            mu_values = []
            for mf in mfs:
                mu = mf(x[:, i])  # (batch_size,)
                mu_values.append(mu)

            # Stack membership values for all MFs of this input
            membership_outputs[name] = np.stack(mu_values, axis=-1)  # (batch_size, n_mfs)

        # Cache values for backward pass
        self.last = {"x": x, "membership_outputs": membership_outputs}

        return membership_outputs

    def backward(self, gradients: dict[str, np.ndarray]) -> dict[str, dict[str, list[dict[str, float]]]]:
        """Performs backward pass to compute gradients for membership functions.

        Parameters:
            gradients (dict): Dictionary mapping input names to gradient arrays.
                Format: {input_name: np.ndarray with shape (batch_size, n_mfs)}

        Returns:
            dict: Nested structure with parameter gradients mirroring ``model.get_gradients()``.
        """
        param_grads: dict[str, list[dict[str, float]]] = {}

        for name in self.input_names:
            mfs = self.input_mfs[name]
            grad_array = gradients[name]
            mf_param_grads: list[dict[str, float]] = []

            for mf_idx, mf in enumerate(mfs):
                prev = {key: float(value) for key, value in mf.gradients.items()}
                mf_gradient = grad_array[:, mf_idx]
                mf.backward(mf_gradient)
                updated = mf.gradients
                delta = {key: float(updated[key] - prev.get(key, 0.0)) for key in updated}
                mf_param_grads.append(delta)

            param_grads[name] = mf_param_grads

        return {"membership": param_grads}

    def reset(self) -> None:
        """Resets all membership functions to their initial state.

        Returns:
            None
        """
        for name in self.input_names:
            for mf in self.input_mfs[name]:
                mf.reset()
        self.last = {}
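

# Usage sketch (illustrative only, not part of the module API): assuming a
# Gaussian membership class such as `GaussianMF(mean, sigma)` exists in
# anfis_toolbox.membership (the concrete class name is an assumption; any
# MembershipFunction subclass works), a two-input fuzzification step would
# look like:
#
#     mfs = {
#         "x1": [GaussianMF(0.0, 1.0), GaussianMF(2.0, 1.0)],
#         "x2": [GaussianMF(0.0, 1.0), GaussianMF(2.0, 1.0)],
#     }
#     layer = MembershipLayer(mfs)
#     mu = layer.forward(np.zeros((4, 2)))
#     # mu["x1"].shape == (4, 2); values are membership degrees in [0, 1]
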


class RuleLayer:
    """Rule layer for ANFIS (Adaptive Neuro-Fuzzy Inference System).

    This layer computes the rule strengths (firing strengths) by applying
    the T-norm (typically product) operation to the membership degrees of
    all input variables for each rule.

    This is the second layer of ANFIS that takes membership degrees from
    the MembershipLayer and computes rule activations.

    Attributes:
        input_names (list): List of input variable names.
        n_inputs (int): Number of input variables.
        mf_per_input (list): Number of membership functions per input.
        rules (list): List of rules, each a tuple of membership-function indices, one per input.
        n_rules (int): Number of rules.
        last (dict): Cache of last forward pass computations for backward pass.
    """

    def __init__(
        self,
        input_names: list[str],
        mf_per_input: list[int],
        rules: Sequence[Sequence[int]] | None = None,
    ) -> None:
        """Initializes the rule layer with input configuration.

        Parameters:
            input_names (list): List of input variable names.
            mf_per_input (list): Number of membership functions per input variable.
            rules (Sequence[Sequence[int]] | None): Optional explicit rule set where each
                rule is a sequence of membership-function indices, one per input. When
                ``None``, the full Cartesian product of membership functions is used.
        """
        self.input_names = input_names
        self.n_inputs = len(input_names)
        self.mf_per_input = list(mf_per_input)

        if rules is None:
            # Generate all possible rule combinations (Cartesian product)
            self.rules = [tuple(rule) for rule in product(*[range(n) for n in self.mf_per_input])]
        else:
            validated_rules: list[tuple[int, ...]] = []
            for idx, rule in enumerate(rules):
                if len(rule) != self.n_inputs:
                    raise ValueError(
                        "Each rule must specify exactly one membership index per input. "
                        f"Rule at position {idx} has length {len(rule)} while {self.n_inputs} were expected."
                    )
                normalized_rule: list[int] = []
                for input_idx, mf_idx in enumerate(rule):
                    max_mf = self.mf_per_input[input_idx]
                    if not 0 <= mf_idx < max_mf:
                        raise ValueError(
                            "Rule membership index out of range. "
                            f"Received {mf_idx} for input {input_idx} with {max_mf} membership functions."
                        )
                    normalized_rule.append(int(mf_idx))
                validated_rules.append(tuple(normalized_rule))

            if not validated_rules:
                raise ValueError("At least one rule must be provided when specifying custom rules.")
            self.rules = validated_rules

        self.n_rules = len(self.rules)

        self.last: dict[str, Any] = {}

    def forward(self, membership_outputs: dict[str, np.ndarray]) -> np.ndarray:
        """Performs forward pass to compute rule strengths.

        Parameters:
            membership_outputs (dict): Dictionary mapping input names to membership degree arrays.
                Format: {input_name: np.ndarray with shape (batch_size, n_mfs)}

        Returns:
            np.ndarray: Rule strengths with shape (batch_size, n_rules).
        """
        # Collect per-input membership arrays. Inputs may carry different
        # numbers of MFs, so keep a list rather than stacking into one array
        # (a single np.stack would fail for ragged MF counts).
        mu = [membership_outputs[name] for name in self.input_names]  # each (batch_size, n_mfs_i)

        # Compute rule activations (firing strengths)
        rule_activations_list: list[np.ndarray] = []
        for rule in self.rules:
            rule_mu = []
            # Get membership degree for each input in this rule
            for input_idx, mf_idx in enumerate(rule):
                rule_mu.append(mu[input_idx][:, mf_idx])  # (batch_size,)
            # Apply T-norm (product) to get rule strength
            rule_strength = np.prod(rule_mu, axis=0)  # (batch_size,)
            rule_activations_list.append(rule_strength)

        rule_activations = np.stack(rule_activations_list, axis=1)  # (batch_size, n_rules)

        # Cache values for backward pass
        self.last = {"membership_outputs": membership_outputs, "mu": mu, "rule_activations": rule_activations}

        return rule_activations

    def backward(self, dL_dw: np.ndarray) -> dict[str, np.ndarray]:
        """Performs backward pass to compute gradients for membership functions.

        Parameters:
            dL_dw (np.ndarray): Gradient of loss with respect to rule strengths.
                Shape: (batch_size, n_rules)

        Returns:
            dict: Dictionary mapping input names to gradient arrays for membership functions.
                Format: {input_name: np.ndarray with shape (batch_size, n_mfs)}
        """
        batch_size = dL_dw.shape[0]
        mu = self.last["mu"]  # list of per-input arrays, each (batch_size, n_mfs_i)

        # Initialize gradient accumulators for each input's membership functions
        gradients = {}
        for i, name in enumerate(self.input_names):
            n_mfs = self.mf_per_input[i]
            gradients[name] = np.zeros((batch_size, n_mfs))

        # Compute gradients for each rule
        for rule_idx, rule in enumerate(self.rules):
            for input_idx, mf_idx in enumerate(rule):
                name = self.input_names[input_idx]

                # Compute partial derivative: d(rule_strength)/d(mu_ij)
                # This is the product of all other membership degrees in the rule
                other_factors = []
                for j, j_mf in enumerate(rule):
                    if j == input_idx:
                        continue  # Skip the current input
                    other_factors.append(mu[j][:, j_mf])

                # Product of other factors (or 1 if no other factors)
                partial = np.prod(other_factors, axis=0) if other_factors else np.ones(batch_size)

                # Apply chain rule: dL/dmu = dL/dw * dw/dmu
                gradients[name][:, mf_idx] += dL_dw[:, rule_idx] * partial

        return gradients
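

# Usage sketch (illustrative only): with two inputs carrying two membership
# functions each, the default rule base is the Cartesian product
# [(0, 0), (0, 1), (1, 0), (1, 1)], i.e. n_rules == 4. A reduced rule base
# can be supplied explicitly:
#
#     full = RuleLayer(["x1", "x2"], [2, 2])                  # 4 rules
#     sparse = RuleLayer(["x1", "x2"], [2, 2], rules=[(0, 0), (1, 1)])
#     w = full.forward(mu)  # mu from MembershipLayer.forward; w: (batch, 4)
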


class NormalizationLayer:
    """Normalization layer for ANFIS (Adaptive Neuro-Fuzzy Inference System).

    This layer normalizes the rule strengths (firing strengths) so that
    they sum to 1.0 for each sample in the batch. This is a crucial step
    in ANFIS as it converts rule strengths into normalized rule weights.

    The normalization formula is: norm_w_i = w_i / sum(w_j for all j)

    Attributes:
        last (dict): Cache of last forward pass computations for backward pass.
    """

    def __init__(self) -> None:
        """Initializes the normalization layer."""
        self.last: dict[str, Any] = {}

    def forward(self, w: np.ndarray) -> np.ndarray:
        """Performs forward pass to normalize rule weights.

        Parameters:
            w (np.ndarray): Rule strengths with shape (batch_size, n_rules).

        Returns:
            np.ndarray: Normalized rule weights with shape (batch_size, n_rules).
                Each row sums to 1.0.
        """
        # Add small epsilon to avoid division by zero
        sum_w = np.sum(w, axis=1, keepdims=True) + 1e-8
        norm_w = w / sum_w

        # Cache values for backward pass
        self.last = {"w": w, "sum_w": sum_w, "norm_w": norm_w}
        return cast(np.ndarray, norm_w)

    def backward(self, dL_dnorm_w: np.ndarray) -> np.ndarray:
        """Performs backward pass to compute gradients for original rule weights.

        The gradient computation uses the quotient rule for derivatives:
        If norm_w_i = w_i / sum_w, then:
        - d(norm_w_i)/d(w_i) = (sum_w - w_i) / sum_w²
        - d(norm_w_i)/d(w_j) = -w_i / sum_w² for j ≠ i

        Parameters:
            dL_dnorm_w (np.ndarray): Gradient of loss with respect to normalized weights.
                Shape: (batch_size, n_rules)

        Returns:
            np.ndarray: Gradient of loss with respect to original weights.
                Shape: (batch_size, n_rules)
        """
        w = self.last["w"]  # (batch_size, n_rules)
        sum_w = self.last["sum_w"]  # (batch_size, 1)

        # Jacobian-vector product without building the full Jacobian:
        # (J^T g)_j = (sum_w * g_j - (g · w)) / sum_w^2
        g = dL_dnorm_w  # (batch_size, n_rules)
        s = sum_w  # (batch_size, 1)
        gw_dot = np.sum(g * w, axis=1, keepdims=True)  # (batch_size, 1)
        dL_dw = (s * g - gw_dot) / (s**2)  # (batch_size, n_rules)

        return cast(np.ndarray, dL_dw)
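

# Worked check (illustrative only, ignoring the 1e-8 epsilon): for a single
# sample with w = [1, 3] (so sum_w = 4) and upstream gradient g = [1, 0],
# the Jacobian-vector product above gives
#     dL/dw_1 = (4 * 1 - (1*1 + 0*3)) / 4**2 = 3/16
#     dL/dw_2 = (4 * 0 - 1) / 16 = -1/16
# which matches the quotient rule applied directly:
# (sum_w - w_1)/sum_w**2 = 3/16 and -w_1/sum_w**2 = -1/16.
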


class ConsequentLayer:
    """Consequent layer for ANFIS (Adaptive Neuro-Fuzzy Inference System).

    This layer implements the consequent part of fuzzy rules in ANFIS.
    Each rule has a linear consequent function of the form:
    f_i(x) = p_i * x_1 + q_i * x_2 + ... + r_i (TSK model)

    The final output is computed as a weighted sum:
    y = Σ(w_i * f_i(x)) where w_i are normalized rule weights

    Attributes:
        n_rules (int): Number of fuzzy rules.
        n_inputs (int): Number of input variables.
        parameters (np.ndarray): Linear parameters for each rule with shape (n_rules, n_inputs + 1).
            Each row contains [p_i, q_i, ..., r_i] for rule i.
        gradients (np.ndarray): Accumulated gradients for parameters.
        last (dict): Cache of last forward pass computations for backward pass.
    """

    def __init__(self, n_rules: int, n_inputs: int) -> None:
        """Initializes the consequent layer with random linear parameters.

        Parameters:
            n_rules (int): Number of fuzzy rules.
            n_inputs (int): Number of input variables.
        """
        # Each rule has (n_inputs + 1) parameters: p_i, q_i, ..., r_i (including bias)
        self.n_rules = n_rules
        self.n_inputs = n_inputs
        self.parameters = np.random.randn(n_rules, n_inputs + 1)
        self.gradients = np.zeros_like(self.parameters)
        self.last: dict[str, Any] = {}

    def forward(self, x: np.ndarray, norm_w: np.ndarray) -> np.ndarray:
        """Performs forward pass to compute the final ANFIS output.

        Parameters:
            x (np.ndarray): Input data with shape (batch_size, n_inputs).
            norm_w (np.ndarray): Normalized rule weights with shape (batch_size, n_rules).

        Returns:
            np.ndarray: Final ANFIS output with shape (batch_size, 1).
        """
        batch_size = x.shape[0]

        # Augment input with bias term (column of ones)
        X_aug = np.hstack([x, np.ones((batch_size, 1))])  # (batch_size, n_inputs + 1)

        # Compute consequent function f_i(x) for each rule
        # f[b, i] = p_i * x[b, 0] + q_i * x[b, 1] + ... + r_i
        f = X_aug @ self.parameters.T  # (batch_size, n_rules)

        # Compute final output as weighted sum: y = Σ(w_i * f_i(x))
        y_hat = np.sum(norm_w * f, axis=1, keepdims=True)  # (batch_size, 1)

        # Cache values for backward pass
        self.last = {"X_aug": X_aug, "norm_w": norm_w, "f": f}

        return cast(np.ndarray, y_hat)

    def backward(self, dL_dy: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Performs backward pass to compute gradients for parameters and inputs.

        Parameters:
            dL_dy (np.ndarray): Gradient of loss with respect to layer output.
                Shape: (batch_size, 1)

        Returns:
            tuple: (dL_dnorm_w, dL_dx) where:
                - dL_dnorm_w: Gradient w.r.t. normalized weights, shape (batch_size, n_rules)
                - dL_dx: Gradient w.r.t. input x, shape (batch_size, n_inputs)
        """
        X_aug = self.last["X_aug"]  # (batch_size, n_inputs + 1)
        norm_w = self.last["norm_w"]  # (batch_size, n_rules)
        f = self.last["f"]  # (batch_size, n_rules)

        batch_size = X_aug.shape[0]

        # Compute gradients for consequent parameters
        self.gradients = np.zeros_like(self.parameters)

        for i in range(self.n_rules):
            # Gradient of y_hat w.r.t. parameters of rule i: norm_w_i * x_aug
            for b in range(batch_size):
                self.gradients[i] += dL_dy[b, 0] * norm_w[b, i] * X_aug[b]

        # Compute gradient of loss w.r.t. normalized weights
        # dy/dnorm_w_i = f_i(x), so dL/dnorm_w_i = dL/dy * f_i(x)
        dL_dnorm_w = dL_dy * f  # (batch_size, n_rules)

        # Compute gradient of loss w.r.t. input x (for backpropagation to previous layers)
        dL_dx = np.zeros((batch_size, self.n_inputs))

        for b in range(batch_size):
            for i in range(self.n_rules):
                # dy/dx = norm_w_i * parameters_i[:-1] (excluding bias term)
                dL_dx[b] += dL_dy[b, 0] * norm_w[b, i] * self.parameters[i, :-1]

        return dL_dnorm_w, dL_dx

    def reset(self) -> None:
        """Resets gradients and cached values.

        Returns:
            None
        """
        self.gradients = np.zeros_like(self.parameters)
        self.last = {}
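

# Worked sketch (illustrative only): with two rules over one input and
# hand-set parameters f_0(x) = 1.0*x + 0.0 and f_1(x) = -1.0*x + 2.0,
# the weighted TSK output at x = 2.0 with norm_w = [0.25, 0.75] is
# 0.25 * 2.0 + 0.75 * 0.0 = 0.5:
#
#     layer = ConsequentLayer(n_rules=2, n_inputs=1)
#     layer.parameters = np.array([[1.0, 0.0], [-1.0, 2.0]])
#     layer.forward(np.array([[2.0]]), np.array([[0.25, 0.75]]))  # [[0.5]]
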


class ClassificationConsequentLayer:
    """Consequent layer that produces per-class logits for classification.

    Each rule i has a vector of class logits given by a linear function of the inputs:
    f_i(x) = W_i x + b_i, where W_i has shape (n_classes, n_inputs) and b_i (n_classes,).
    Parameters are stored as a single array of shape (n_rules, n_classes, n_inputs + 1).
    """

    def __init__(self, n_rules: int, n_inputs: int, n_classes: int, random_state: int | None = None) -> None:
        """Initializes the layer with the specified number of rules, inputs, and classes.

        Args:
            n_rules (int): Number of fuzzy rules in the layer.
            n_inputs (int): Number of input features.
            n_classes (int): Number of output classes.
            random_state (int | None): Random seed for parameter initialization.

        Attributes:
            n_rules (int): Stores the number of fuzzy rules.
            n_inputs (int): Stores the number of input features.
            n_classes (int): Stores the number of output classes.
            parameters (np.ndarray): Randomly initialized parameters for each rule, class, and input (including bias).
            gradients (np.ndarray): Gradient values initialized to zeros, matching the shape of parameters.
            last (dict): Cache of last forward pass computations for backward pass.
        """
        self.n_rules = n_rules
        self.n_inputs = n_inputs
        self.n_classes = n_classes
        if random_state is None:
            self.parameters = np.random.randn(n_rules, n_classes, n_inputs + 1)
        else:
            rng = np.random.default_rng(random_state)
            self.parameters = rng.normal(size=(n_rules, n_classes, n_inputs + 1))
        self.gradients = np.zeros_like(self.parameters)
        self.last: dict[str, Any] = {}

    def forward(self, x: np.ndarray, norm_w: np.ndarray) -> np.ndarray:
        """Computes the forward pass for the classification consequent layer.

        Parameters:
            x (np.ndarray): Input data with shape (batch_size, n_inputs).
            norm_w (np.ndarray): Normalized rule weights with shape (batch_size, n_rules).

        Returns:
            np.ndarray: Class logits with shape (batch_size, n_classes).
        """
        batch = x.shape[0]
        X_aug = np.hstack([x, np.ones((batch, 1))])  # (b, d+1)
        # Compute per-rule class logits: (b, r, k)
        f = np.einsum("bd,rkd->brk", X_aug, self.parameters)
        # Weighted sum over rules -> logits (b, k)
        logits = np.einsum("br,brk->bk", norm_w, f)
        self.last = {"X_aug": X_aug, "norm_w": norm_w, "f": f}
        return cast(np.ndarray, logits)

    def backward(self, dL_dlogits: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Computes the backward pass for the classification consequent layer.

        Parameters:
            dL_dlogits (np.ndarray): Gradient of loss with respect to the logits.
                Shape: (batch_size, n_classes)

        Returns:
            tuple: (dL_dnorm_w, dL_dx) where:
                - dL_dnorm_w: Gradient w.r.t. normalized weights, shape (batch_size, n_rules)
                - dL_dx: Gradient w.r.t. input x, shape (batch_size, n_inputs)
        """
        X_aug = self.last["X_aug"]  # (b, d+1)
        norm_w = self.last["norm_w"]  # (b, r)
        f = self.last["f"]  # (b, r, k)

        # Gradients w.r.t. per-rule parameters
        # dL/df_{brk} = dL/dlogits_{bk} * norm_w_{br}
        dL_df = dL_dlogits[:, None, :] * norm_w[:, :, None]  # (b, r, k)
        # Accumulate over batch: grad[r,k,d] = sum_b dL_df[b,r,k] * X_aug[b,d]
        self.gradients = np.einsum("brk,bd->rkd", dL_df, X_aug)

        # dL/dnorm_w: sum_k dL/dlogits_{bk} * f_{brk}
        dL_dnorm_w = np.einsum("bk,brk->br", dL_dlogits, f)

        # dL/dx: sum_r sum_k dL/dlogits_{bk} * norm_w_{br} * W_{r,k,:}
        W = self.parameters[:, :, :-1]  # (r,k,d)
        dL_dx = np.einsum("bk,br,rkd->bd", dL_dlogits, norm_w, W)
        return dL_dnorm_w, dL_dx

    def reset(self) -> None:
        """Resets the gradients and cached values."""
        self.gradients = np.zeros_like(self.parameters)
        self.last = {}
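

# End-to-end sketch (illustrative only): the layers in this module compose
# into the standard ANFIS forward pass; `mfs` is a membership dict as in the
# MembershipLayer sketch above, and `x` has shape (batch_size, n_inputs).
#
#     membership = MembershipLayer(mfs)
#     rule_layer = RuleLayer(membership.input_names, membership.mf_per_input)
#     norm = NormalizationLayer()
#     consequent = ConsequentLayer(rule_layer.n_rules, membership.n_inputs)
#
#     mu = membership.forward(x)             # layer 1: fuzzification
#     w = rule_layer.forward(mu)             # layer 2: rule firing strengths
#     norm_w = norm.forward(w)               # layer 3: weight normalization
#     y_hat = consequent.forward(x, norm_w)  # layers 4-5: weighted TSK output
#
# For classification, swap in ClassificationConsequentLayer(rule_layer.n_rules,
# membership.n_inputs, n_classes) and feed the returned logits to a softmax
# or cross-entropy loss.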