from collections.abc import Callable
from typing import Any, Optional, Union
import keras
import keras.ops as K
import numpy as np
from keras.config import epsilon
from keras.layers import Lambda, Layer
from decomon.core import (
BallDomain,
BoxDomain,
ForwardMode,
PerturbationDomain,
get_mode,
)
from decomon.layers.activations import softmax as softmax_
from decomon.layers.core import DecomonLayer
from decomon.models.models import DecomonModel
from decomon.models.utils import Convert2Mode
from decomon.types import BackendTensor, Tensor
def get_model(model: DecomonModel) -> DecomonModel:
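    """Fuse the list of outputs of a decomon model into a single tensor.

    Depending on the propagation mode, the constant bounds (IBP), the affine
    bound coefficients (affine), or both (hybrid) are packed along the last
    axes so that a single Keras loss function can consume them.

    Args:
        model: decomon model whose outputs should be fused.

    Returns:
        A DecomonModel with the same inputs and one concatenated output.
    """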
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    inputs = model.inputs
    outputs = model.outputs
    new_output: keras.KerasTensor

    if mode == ForwardMode.IBP:

        def func(outputs: list[Tensor]) -> Tensor:
            u_c, l_c = outputs
            return K.concatenate([K.expand_dims(u_c, -1), K.expand_dims(l_c, -1)], -1)

        new_output = Lambda(func)(outputs)
    elif mode == ForwardMode.AFFINE:

        def func(outputs: list[Tensor]) -> Tensor:
            x_0, w_u, b_u, w_l, b_l = outputs
            if len(x_0.shape) == 2:
                x_0_reshaped = x_0[:, :, None]
            else:
                x_0_reshaped = K.transpose(x_0, (0, 2, 1))
            x_fake = K.sum(x_0_reshaped, 1)[:, None]
            x_0_reshaped = K.concatenate([x_0_reshaped, x_fake], 1)  # (None, n_in+1, n_comp)
            w_b_u = K.concatenate([w_u, b_u[:, None]], 1)  # (None, n_in+1, n_out)
            w_b_l = K.concatenate([w_l, b_l[:, None]], 1)
            w_b = K.concatenate([w_b_u, w_b_l], -1)  # (None, n_in+1, 2*n_out)
            return K.concatenate([x_0_reshaped, w_b], -1)  # (None, n_in+1, n_comp+2*n_out)

        new_output = Lambda(func)(outputs)
    elif mode == ForwardMode.HYBRID:

        def func(outputs: list[Tensor]) -> Tensor:
            x_0, u_c, w_u, b_u, l_c, w_l, b_l = outputs
            if len(x_0.shape) == 2:
                x_0_reshaped = x_0[:, :, None]
            else:
                x_0_reshaped = K.transpose(x_0, (0, 2, 1))
            x_fake = K.sum(x_0_reshaped, 1)[:, None]
            x_0_reshaped = K.concatenate([x_0_reshaped, x_fake, x_fake], 1)  # (None, n_in+2, n_comp)
            w_b_u = K.concatenate([w_u, b_u[:, None]], 1)  # (None, n_in+1, n_out)
            w_b_l = K.concatenate([w_l, b_l[:, None]], 1)
            w_b = K.concatenate([w_b_u, w_b_l], -1)  # (None, n_in+1, 2*n_out)
            u_l = K.concatenate([u_c, l_c], 1)[:, None]  # (None, 1, 2*n_out)
            w_b_u_l = K.concatenate([w_b, u_l], 1)  # (None, n_in+2, 2*n_out)
            return K.concatenate([x_0_reshaped, w_b_u_l], -1)  # (None, n_in+2, n_comp+2*n_out)
        new_output = Lambda(func)(outputs)
    else:
        raise ValueError(f"Unknown mode {mode}")

    return DecomonModel(
        inputs=inputs,
        outputs=new_output,
        perturbation_domain=model.perturbation_domain,
        ibp=ibp,
        affine=affine,
        finetune=model.finetune,
    )

def get_upper_loss(model: DecomonModel) -> Callable[[Tensor, Tensor], Tensor]:
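    """Build a loss penalizing how far the decomon upper bound lies above the reference output.

    In IBP mode the constant upper bound is used, in affine mode the upper
    bound is reconstructed through the perturbation domain, and in hybrid
    mode the tighter (minimum) of the two scores is kept. `y_pred` is
    expected to be the fused tensor produced by `get_model`.

    Minimal usage sketch (assuming `decomon_model` is an already-converted
    DecomonModel):

        fused = get_model(decomon_model)
        fused.compile(optimizer="adam", loss=get_upper_loss(decomon_model))
    """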
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    n_comp = perturbation_domain.get_nb_x_components()
    n_out = np.prod(model.output[-1].shape[1:])

    def upper_ibp(u_c: Tensor, u_ref: Tensor) -> Tensor:
        # minimize the upper bound compared to the reference
        return K.max(u_c - u_ref, -1)

    def upper_affine(x: Tensor, w_u: Tensor, b_u: Tensor, u_ref: Tensor) -> Tensor:
        upper = perturbation_domain.get_upper(x, w_u, b_u)
        return K.max(upper - u_ref, -1)

    def loss_upper(y_true: Tensor, y_pred: Tensor) -> Tensor:
        if mode == ForwardMode.IBP:
            u_c = y_pred[:, :, 0]
        elif mode == ForwardMode.AFFINE:
            if len(y_pred.shape) == 3:
                x_0 = K.transpose(y_pred[:, :-1, :n_comp], (0, 2, 1))
            else:
                x_0 = y_pred[:, :-1, 0]
            w_u = y_pred[:, :-1, n_comp : n_comp + n_out]
            b_u = y_pred[:, -1, n_comp : n_comp + n_out]
        elif mode == ForwardMode.HYBRID:
            if len(y_pred.shape) == 3:
                x_0 = K.transpose(y_pred[:, :-2, :n_comp], (0, 2, 1))
            else:
                x_0 = y_pred[:, :-2, 0]
            w_u = y_pred[:, :-2, n_comp : n_comp + n_out]
            b_u = y_pred[:, -2, n_comp : n_comp + n_out]
            u_c = y_pred[:, -1, n_comp : n_comp + n_out]
        else:
            raise ValueError(f"Unknown mode {mode}")

        if ibp:
            score_ibp = upper_ibp(u_c, y_true)
        if affine:
            score_affine = upper_affine(x_0, w_u, b_u, y_true)

        if mode == ForwardMode.IBP:
            return K.mean(score_ibp)
        elif mode == ForwardMode.AFFINE:
            return K.mean(score_affine)
        elif mode == ForwardMode.HYBRID:
            return K.mean(K.minimum(score_ibp, score_affine))
        raise NotImplementedError()

    return loss_upper

def get_lower_loss(model: DecomonModel) -> Callable[[Tensor, Tensor], Tensor]:
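    """Build a loss penalizing how far the decomon lower bound lies below the reference output.

    Mirror of `get_upper_loss`: the constant lower bound (IBP), the affine
    lower bound (affine), or the tighter of both (hybrid) is compared to
    `y_true`, with `y_pred` being the fused tensor produced by `get_model`.
    """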
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    n_comp = perturbation_domain.get_nb_x_components()
    n_out = np.prod(model.output[-1].shape[1:])

    def lower_ibp(l_c: Tensor, l_ref: Tensor) -> Tensor:
        # maximize the lower bound compared to the reference
        return K.max(l_ref - l_c, -1)

    def lower_affine(x: Tensor, w_l: Tensor, b_l: Tensor, l_ref: Tensor) -> Tensor:
        lower = perturbation_domain.get_lower(x, w_l, b_l)
        return K.max(l_ref - lower, -1)

    def loss_lower(y_true: Tensor, y_pred: Tensor) -> Tensor:
        if mode == ForwardMode.IBP:
            l_c = y_pred[:, :, 1]
        elif mode == ForwardMode.AFFINE:
            if len(y_pred.shape) == 3:
                x_0 = K.transpose(y_pred[:, :-1, :n_comp], (0, 2, 1))
            else:
                x_0 = y_pred[:, :-1, 0]
            w_l = y_pred[:, :-1, n_comp + n_out :]
            b_l = y_pred[:, -1, n_comp + n_out :]
        elif mode == ForwardMode.HYBRID:
            if len(y_pred.shape) == 3:
                x_0 = K.transpose(y_pred[:, :-2, :n_comp], (0, 2, 1))
            else:
                x_0 = y_pred[:, :-2, 0]
            w_l = y_pred[:, :-2, n_comp + n_out :]
            b_l = y_pred[:, -2, n_comp + n_out :]
            l_c = y_pred[:, -1, n_comp + n_out :]
        else:
            raise ValueError(f"Unknown mode {mode}")

        if ibp:
            score_ibp = lower_ibp(l_c, y_true)
        if affine:
            score_affine = lower_affine(x_0, w_l, b_l, y_true)

        if mode == ForwardMode.IBP:
            return K.mean(score_ibp)
        elif mode == ForwardMode.AFFINE:
            return K.mean(score_affine)
        elif mode == ForwardMode.HYBRID:
            return K.mean(K.minimum(score_ibp, score_affine))
        raise NotImplementedError()

    return loss_lower

def get_adv_loss(
    model: DecomonModel, sigmoid: bool = False, clip_value: Optional[float] = None, softmax: bool = False
) -> Callable[[Tensor, Tensor], Tensor]:
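    """Build an adversarial-robustness loss from the fused decomon outputs.

    For a one-hot `y_true`, the score is the largest gap `u_i - l_j` between
    the upper bound of a wrong class `i` and the lower bound of the true
    class `j`; a negative score certifies robustness. Optionally the bounds
    first go through a softmax, the score is clipped from below at
    `clip_value`, and a sigmoid is applied before averaging.
    """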
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    n_comp = perturbation_domain.get_nb_x_components()
    n_out = np.prod(model.output[-1].shape[1:])

    def adv_ibp(u_c: Tensor, l_c: Tensor, y_tensor: Tensor) -> Tensor:
        t_tensor: Tensor = 1 - y_tensor
        s_tensor = y_tensor
        t_tensor = t_tensor[:, :, None]
        s_tensor = s_tensor[:, None, :]
        M = t_tensor * s_tensor
        upper = K.expand_dims(u_c, -1) - K.expand_dims(l_c, 1)
        const = (K.max(upper, (-1, -2)) - K.min(upper, (-1, -2)))[:, None, None]
        upper = upper - (const + K.cast(1, const.dtype)) * (1 - M)
        return K.max(upper, (-1, -2))

    def adv_affine(
        x: Tensor,
        w_u: Tensor,
        b_u: Tensor,
        w_l: Tensor,
        b_l: Tensor,
        y_tensor: Tensor,
    ) -> Tensor:
        w_u_reshaped = K.expand_dims(w_u, -1)
        w_l_reshaped = K.expand_dims(w_l, -2)
        w_adv = w_u_reshaped - w_l_reshaped
        b_adv = K.expand_dims(b_u, -1) - K.expand_dims(b_l, 1)
        upper = perturbation_domain.get_upper(x, w_adv, b_adv)
        t_tensor: Tensor = 1 - y_tensor
        s_tensor = y_tensor
        t_tensor = t_tensor[:, :, None]
        s_tensor = s_tensor[:, None, :]
        M = t_tensor * s_tensor
        const = (K.max(upper, (-1, -2)) - K.min(upper, (-1, -2)))[:, None, None]
        upper = upper - (const + K.cast(1, const.dtype)) * (1 - M)
        return K.max(upper, (-1, -2))

    def loss_adv(y_true: Tensor, y_pred: Tensor) -> Tensor:
        if mode == ForwardMode.IBP:
            u_c = y_pred[:, :, 0]
            l_c = y_pred[:, :, 1]
            if softmax:
                u_c, l_c = softmax_([u_c, l_c], mode=mode, perturbation_domain=perturbation_domain, clip=False)
        elif mode == ForwardMode.AFFINE:
            if len(y_pred.shape) == 3:
                x_0 = K.transpose(y_pred[:, :-1, :n_comp], (0, 2, 1))
            else:
                x_0 = y_pred[:, :-1, 0]
            w_u = y_pred[:, :-1, n_comp : n_comp + n_out]
            b_u = y_pred[:, -1, n_comp : n_comp + n_out]
            w_l = y_pred[:, :-1, n_comp + n_out :]
            b_l = y_pred[:, -1, n_comp + n_out :]
            if softmax:
                _, w_u, b_u, w_l, b_l = softmax_(
                    [x_0, w_u, b_u, w_l, b_l], mode=mode, perturbation_domain=perturbation_domain, clip=False
                )
        elif mode == ForwardMode.HYBRID:
            if len(y_pred.shape) == 3:
                x_0 = K.transpose(y_pred[:, :-2, :n_comp], (0, 2, 1))
            else:
                x_0 = y_pred[:, :-2, 0]
            w_u = y_pred[:, :-2, n_comp : n_comp + n_out]
            b_u = y_pred[:, -2, n_comp : n_comp + n_out]
            w_l = y_pred[:, :-2, n_comp + n_out :]
            b_l = y_pred[:, -2, n_comp + n_out :]
            u_c = y_pred[:, -1, n_comp : n_comp + n_out]
            l_c = y_pred[:, -1, n_comp + n_out :]
            if softmax:
                _, u_c, w_u, b_u, l_c, w_l, b_l = softmax_(
                    [x_0, u_c, w_u, b_u, l_c, w_l, b_l], mode=mode, perturbation_domain=perturbation_domain, clip=False
                )
        else:
            raise ValueError(f"Unknown mode {mode}")

        if ibp:
            score_ibp = adv_ibp(u_c, l_c, y_true)
            if clip_value is not None:
                score_ibp = K.maximum(score_ibp, K.cast(clip_value, dtype=score_ibp.dtype))
        if affine:
            score_affine = adv_affine(x_0, w_u, b_u, w_l, b_l, y_true)
            if clip_value is not None:
                score_affine = K.maximum(score_affine, K.cast(clip_value, dtype=score_affine.dtype))

        if mode == ForwardMode.IBP:
            if sigmoid:
                return K.mean(K.sigmoid(score_ibp))
            else:
                return K.mean(score_ibp)
        elif mode == ForwardMode.AFFINE:
            if sigmoid:
                return K.mean(K.sigmoid(score_affine))
            else:
                return K.mean(score_affine)
        elif mode == ForwardMode.HYBRID:
            if sigmoid:
                return K.mean(K.sigmoid(K.minimum(score_ibp, score_affine)))
            else:
                return K.mean(K.minimum(score_ibp, score_affine))
        raise NotImplementedError()

    return loss_adv

def _create_identity_tensor_like(x: Tensor) -> BackendTensor:
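    """Return one identity matrix per batch entry, shaped like `x` plus one axis.

    The result `t` satisfies `t[..., i, :] == one_hot(i)` and is used to
    enumerate every possible source label at once.
    """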
    identity_tensor = K.identity(x.shape[-1])
    n_repeat = int(np.prod(x.shape[:-1]))
    return K.reshape(K.repeat(identity_tensor[None], n_repeat, axis=0), tuple(x.shape) + (-1,))

# create a layer
class DecomonLossFusion(DecomonLayer):
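    """Layer fusing decomon bounds into per-label loss values.

    With `asymptotic=False`, it outputs an upper bound on the categorical
    cross-entropy for every possible label, computed from the constant bounds
    (inputs are first converted to IBP mode). With `asymptotic=True`, it
    outputs a clipped adversarial score for every possible label.
    """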

    original_keras_layer_class = Layer

    def __init__(
        self,
        asymptotic: bool = False,
        backward: bool = False,
        perturbation_domain: Optional[PerturbationDomain] = None,
        dc_decomp: bool = False,
        mode: Union[str, ForwardMode] = ForwardMode.HYBRID,
        finetune: bool = False,
        shared: bool = False,
        fast: bool = True,
        **kwargs: Any,
    ):
        super().__init__(
            perturbation_domain=perturbation_domain,
            dc_decomp=dc_decomp,
            mode=mode,
            finetune=finetune,
            shared=shared,
            fast=fast,
            **kwargs,
        )
        self.convert2mode_layer = Convert2Mode(
            mode_from=mode,
            mode_to=ForwardMode.IBP,
            perturbation_domain=self.perturbation_domain,
        )
        self.asymptotic = asymptotic
        self.backward = backward

    def get_config(self) -> dict[str, Any]:
        config = super().get_config()
        config.update(
            {
                "asymptotic": self.asymptotic,
                "backward": self.backward,
            }
        )
        return config

    def call_no_backward(self, inputs: list[BackendTensor], **kwargs: Any) -> BackendTensor:
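        """Compute the fused loss from the forward bounds.

        Without `asymptotic`, returns `-l_c + logsumexp(u_c)`, an upper bound
        on the cross-entropy for each possible label; with `asymptotic`,
        returns the pairwise adversarial score per label, clipped below at -1.
        """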
        if not self.asymptotic:
            u_c, l_c = self.convert2mode_layer(inputs)
            return -l_c + K.log(K.sum(K.exp(u_c - K.max(u_c, -1)[:, None]), -1))[:, None] + K.max(u_c, -1)[:, None]
        else:
            u_c, l_c = self.convert2mode_layer(inputs)
            shape = u_c.shape[-1]

            def adv_ibp(u_c: BackendTensor, l_c: BackendTensor, y_tensor: BackendTensor) -> BackendTensor:
                t_tensor = 1 - y_tensor
                s_tensor = y_tensor
                t_tensor = t_tensor[:, :, None]
                s_tensor = s_tensor[:, None, :]
                M = t_tensor * s_tensor
                upper = K.expand_dims(u_c, -1) - K.expand_dims(l_c, 1)
                const = (K.max(upper, (-1, -2)) - K.min(upper, (-1, -2)))[:, None, None]
                upper = upper - (const + K.cast(1, const.dtype)) * (1 - M)
                return K.max(upper, (-1, -2))

            source_tensor = _create_identity_tensor_like(l_c)
            score = K.concatenate([adv_ibp(u_c, l_c, source_tensor[:, i])[:, None] for i in range(shape)], -1)
            return K.maximum(
                score, K.cast(-1, dtype=score.dtype)
            )  # + 1e-3*K.maximum(K.max(K.abs(u_c), -1)[:,None], K.abs(l_c))

    def call_backward(self, inputs: list[BackendTensor], **kwargs: Any) -> BackendTensor:
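        """Backward-bounds variant: return the softmax of the constant upper bound."""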
        if not self.asymptotic:
            u_c, l_c = self.convert2mode_layer(inputs)
            return K.softmax(u_c)
        else:
            raise NotImplementedError()

    def call(self, inputs: list[BackendTensor], **kwargs: Any) -> BackendTensor:
        if self.backward:
            return self.call_backward(inputs, **kwargs)
        else:
            return self.call_no_backward(inputs, **kwargs)

    def compute_output_shape(self, input_shape: list[tuple[Optional[int], ...]]) -> tuple[Optional[int], ...]:  # type: ignore
        return input_shape[-1]

    def build(self, input_shape: list[tuple[Optional[int], ...]]) -> None:
        return None

# new layer for new loss functions
class DecomonRadiusRobust(DecomonLayer):
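    """Layer scoring, for every possible label, the fraction of the box domain
    at which the prediction can flip: a differentiable proxy of the robust
    radius. Only affine and hybrid modes over a BoxDomain are supported.
    """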

    original_keras_layer_class = Layer

    def __init__(
        self,
        backward: bool = False,
        perturbation_domain: Optional[PerturbationDomain] = None,
        dc_decomp: bool = False,
        mode: Union[str, ForwardMode] = ForwardMode.HYBRID,
        finetune: bool = False,
        shared: bool = False,
        fast: bool = True,
        **kwargs: Any,
    ):
        super().__init__(
            perturbation_domain=perturbation_domain,
            dc_decomp=dc_decomp,
            mode=mode,
            finetune=finetune,
            shared=shared,
            fast=fast,
            **kwargs,
        )
        if self.mode == ForwardMode.IBP:
            raise NotImplementedError()
        if not isinstance(self.perturbation_domain, BoxDomain):
            raise NotImplementedError()
        self.backward = backward

    def get_config(self) -> dict[str, Any]:
        config = super().get_config()
        config.update(
            {
                "backward": self.backward,
            }
        )
        return config

    def call_no_backward(self, inputs: list[BackendTensor], **kwargs: Any) -> BackendTensor:
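        """Compute the per-label robustness score from the forward affine bounds.

        For each one-hot label, the worst-case margin between attacking
        classes and the true class is evaluated at the box center and scaled
        by its radius-weighted L1 norm.
        """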
        if self.mode == ForwardMode.HYBRID:
            x, _, w_u, b_u, _, w_l, b_l = inputs
        else:
            x, w_u, b_u, w_l, b_l = inputs

        # compute center
        x_0 = (x[:, 0] + x[:, 1]) / 2.0
        radius = K.maximum((x[:, 1] - x[:, 0]) / 2.0, K.cast(epsilon(), dtype=x.dtype))
        source_tensor = _create_identity_tensor_like(b_l)
        shape = b_l.shape[-1]

        def radius_label(y_tensor: BackendTensor, backward: bool = False) -> BackendTensor:
            t_tensor = 1 - y_tensor
            s_tensor = y_tensor
            W_adv = (
                K.sum(-w_l * (s_tensor[:, None]), -1, keepdims=True) + w_u * t_tensor[:, None] + w_l * y_tensor[:, None]
            )  # (None, n_in, n_out)
            b_adv = K.sum(-b_l * s_tensor, -1, keepdims=True) + b_u * t_tensor + (b_l - 1e6) * y_tensor  # (None, n_out)
            score = K.sum(W_adv * x_0[:, :, None], 1) + b_adv  # (None, n_out)
            denum = K.maximum(
                K.sum(K.abs(W_adv * radius[:, :, None]), 1), K.cast(epsilon(), dtype=W_adv.dtype)
            )  # (None, n_out)
            eps_adv = K.minimum(-score / denum + y_tensor, K.cast(2.0, dtype=score.dtype))
            adv_volume = 1.0 - eps_adv
            return K.max(adv_volume, -1, keepdims=True)

        return K.concatenate([radius_label(source_tensor[:, i]) for i in range(shape)], -1)

    def call_backward(self, inputs: list[BackendTensor], **kwargs: Any) -> BackendTensor:
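        """Backward-bounds variant: assumes `w_u`/`b_u` already encode the per-label margins, so they are used directly instead of recombining upper and lower coefficients."""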
        if self.mode == ForwardMode.HYBRID:
            x, _, w_u, b_u, _, w_l, b_l = inputs
        else:
            x, w_u, b_u, w_l, b_l = inputs

        # compute center
        x_0 = (x[:, 0] + x[:, 1]) / 2.0
        radius = K.maximum((x[:, 1] - x[:, 0]) / 2.0, K.cast(epsilon(), dtype=x.dtype))
        source_tensor = _create_identity_tensor_like(b_l)
        shape = b_l.shape[-1]

        def radius_label(y_tensor: BackendTensor) -> BackendTensor:
            W_adv = w_u
            b_adv = b_u - 1e6 * y_tensor
            score = K.sum(W_adv * x_0[:, :, None], 1) + b_adv  # (None, n_out)
            denum = K.maximum(
                K.sum(K.abs(W_adv * radius[:, :, None]), 1), K.cast(epsilon(), dtype=W_adv.dtype)
            )  # (None, n_out)
            eps_adv = K.minimum(-score / denum + y_tensor, K.cast(2.0, dtype=score.dtype))
            adv_volume = 1.0 - eps_adv
            return K.max(adv_volume, -1, keepdims=True)

        return K.concatenate([radius_label(source_tensor[:, i]) for i in range(shape)], -1)

    def call(self, inputs: list[BackendTensor], **kwargs: Any) -> BackendTensor:
        if self.backward:
            return self.call_backward(inputs, **kwargs)
        else:
            return self.call_no_backward(inputs, **kwargs)

    def compute_output_shape(self, input_shape: list[tuple[Optional[int], ...]]) -> tuple[Optional[int], ...]:  # type: ignore
        return input_shape[-1]

    def build(self, input_shape: list[tuple[Optional[int], ...]]) -> None:
        return None

def build_radius_robust_model(model: DecomonModel) -> DecomonModel:
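    """Append a DecomonRadiusRobust layer so the model outputs per-label robustness scores."""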
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    inputs = model.input
    output = model.output

    layer_robust = DecomonRadiusRobust(
        mode=mode, perturbation_domain=perturbation_domain, backward=model.backward_bounds
    )
    output_robust = layer_robust(output)

    return DecomonModel(
        inputs=inputs,
        outputs=output_robust,
        perturbation_domain=model.perturbation_domain,
        ibp=ibp,
        affine=affine,
        finetune=model.finetune,
    )

##### DESIGN LOSS FUNCTIONS
def build_crossentropy_model(model: DecomonModel) -> DecomonModel:
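    """Append a DecomonLossFusion layer so the model outputs, per label, an upper bound on the cross-entropy.

    Minimal usage sketch (assuming `decomon_model` is an already-converted
    DecomonModel and labels are one-hot; the selection loss below is an
    illustrative assumption, not part of the library):

        ce_model = build_crossentropy_model(decomon_model)
        loss = lambda y_true, y_pred: K.mean(K.sum(y_true * y_pred, -1))
        ce_model.compile(optimizer="adam", loss=loss)
    """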
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    inputs = model.input
    output = model.output

    layer_fusion = DecomonLossFusion(mode=mode, backward=model.backward_bounds)
    output_fusion = layer_fusion(output, mode=mode)

    return DecomonModel(
        inputs=inputs,
        outputs=output_fusion,
        perturbation_domain=model.perturbation_domain,
        ibp=ibp,
        affine=affine,
        finetune=model.finetune,
    )

def build_asymptotic_crossentropy_model(model: DecomonModel) -> DecomonModel:
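    """Append a DecomonLossFusion layer in asymptotic mode, so the model outputs the clipped adversarial score per label."""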
    ibp = model.ibp
    affine = model.affine
    mode = get_mode(ibp, affine)
    perturbation_domain = model.perturbation_domain
    inputs = model.input
    output = model.output

    layer_fusion = DecomonLossFusion(mode=mode, asymptotic=True)
    output_fusion = layer_fusion(output, mode=mode)

    return DecomonModel(
        inputs=inputs,
        outputs=output_fusion,
        perturbation_domain=model.perturbation_domain,
        ibp=ibp,
        affine=affine,
        finetune=model.finetune,
    )