#
# SPDX-License-Identifier: LGPL-3.0-or-later
# Copyright (c) 2024-2025, QUEENS contributors.
#
# This file is part of QUEENS.
#
# QUEENS is free software: you can redistribute it and/or modify it under the terms of the GNU
# Lesser General Public License as published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version. QUEENS is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You
# should have received a copy of the GNU Lesser General Public License along with QUEENS. If not,
# see <https://www.gnu.org/licenses/>.
#
"""Fourier Random fields class."""
from typing import TypeAlias
import numpy as np
from numpy.typing import ArrayLike
from scipy.spatial import ConvexHull # pylint: disable=no-name-in-module
from scipy.spatial.distance import pdist
from queens.distributions.mean_field_normal import MeanFieldNormal
from queens.parameters.parameters import HasGradLogPDF
from queens.parameters.random_fields._random_field import RandomField
DimensionMethods: TypeAlias = (
type["DimensionMethods1D"] | type["DimensionMethods2D"] | type["DimensionMethods3D"]
)
[docs]
class Fourier(RandomField):
"""FOURIER expansion of random fields class.
Attributes:
mean: Mean vector at nodes
std: Hyperparameter for standard-deviation of random field
corr_length: Hyperparameter for the correlation length
variability: Explained variance by the fourier decomposition
trunc_threshold: Truncation threshold for Fourier series
basis_dimension: Dimension of the complete Fourier basis up to the truncation threshold
(not the latent space)
latent_index: Index array mapping latent space variables to covariance values
covariance_index: Array indexing the covariance values below the truncation threshold
covariance: Fourier transformed covariance kernel
basis: Inverse cosine transformed fourier basis
coordinates: Vector of all coordinates in random field
field_dimension: Physical dimension of the random field
number_expansion_terms: Number of frequencies in all directions
dimension: Dimension of latent space
convex_hull_size: Euclidean distance between furthest apart coordinates in the field
"""
[docs]
def __init__(
self,
coords: dict,
mean: ArrayLike = 0.0,
std: float = 1.0,
corr_length: float = 0.3,
variability: float = 0.98,
trunc_threshold: int = 64,
):
"""Initialize Fourier object.
Args:
coords: Dictionary with coordinates of discretized random field and the corresponding
keys
mean: Mean vector at nodes
std: Hyperparameter for standard-deviation of random field
corr_length: Hyperparameter for the correlation length
variability: Explained variance of by the eigen
trunc_threshold: Truncation threshold for Fourier series.
"""
self.trunc_threshold = trunc_threshold
coords = self._convert_coords_to_2d_array(coords)
self.field_dimension = coords["coords"].shape[1]
dimension_methods_class: DimensionMethods
if self.field_dimension == 1:
dimension_methods_class = DimensionMethods1D
elif self.field_dimension == 2:
dimension_methods_class = DimensionMethods2D
elif self.field_dimension == 3:
dimension_methods_class = DimensionMethods3D
else:
raise ValueError("Only 1D, 2D or 3D fields are supported by Fourier expansion")
self.number_expansion_terms = int(np.sqrt(self.trunc_threshold)) + 1
(
self.covariance_index,
self.latent_index,
self.basis_dimension,
dimension,
) = dimension_methods_class.get_dim(self.trunc_threshold, self.number_expansion_terms)
distribution = MeanFieldNormal(mean=0, variance=1, dimension=dimension)
super().__init__(coords, distribution, dimension)
self.coordinates = self.coords["coords"]
self.mean = mean
self.std = std
self.corr_length = corr_length
self.variability = variability
# find max length in coords
if self.field_dimension == 1:
self.convex_hull_size = self.coordinates.max() - self.coordinates.min()
else:
convex_hull = ConvexHull(self.coordinates)
hull_coords = self.coordinates[convex_hull.vertices]
self.convex_hull_size = pdist(hull_coords).max()
if self.corr_length / self.convex_hull_size > 0.35:
raise ValueError("Correlation length too large, not a good approximation.")
self.covariance = dimension_methods_class.calculate_covariance(
self.number_expansion_terms, self.corr_length, self.convex_hull_size
)
self.check_convergence()
self.basis = dimension_methods_class.calculate_basis(
self.coordinates,
self.basis_dimension,
self.number_expansion_terms,
self.convex_hull_size,
self.covariance,
self.latent_index,
)
[docs]
def draw(self, num_samples: int) -> np.ndarray:
"""Draw samples from the latent representation of the random field.
Args:
num_samples: Number of draws of latent random samples
Returns:
Drawn samples
"""
return self.distribution.draw(num_samples)
[docs]
def logpdf(self, samples: np.ndarray) -> np.ndarray:
"""Get joint log-PDF of latent space.
Args:
samples: Samples for evaluating the log-PDF
Returns:
Log-PDF of the samples
"""
logpdf = self.distribution.logpdf(samples)
return logpdf
[docs]
def grad_logpdf(self, samples: np.ndarray) -> np.ndarray:
"""Get gradient of joint log-PDF of latent space.
Args:
samples: Samples for evaluating the gradient of the log-PDF
Returns:
Gradient of the log-PDF
"""
if not isinstance(self.distribution, HasGradLogPDF):
raise TypeError(
f"The distribution {self.distribution} does not have a grad_logpdf function."
)
return self.distribution.grad_logpdf(samples)
[docs]
def expanded_representation(self, samples: np.ndarray) -> np.ndarray:
"""Expand latent representation of samples.
Args:
samples: Latent representation of samples
Returns:
Expanded representation of samples
"""
sample_expanded = self.mean + self.std * np.matmul(samples, self.basis.T)
return sample_expanded
[docs]
def latent_gradient(self, upstream_gradient: np.ndarray) -> np.ndarray:
"""Gradient with respect to the latent parameters.
Args:
upstream_gradient: Gradient with respect to all coords of the field
Returns:
Gradient of the realization of the random field with respect to the latent space
variables
"""
latent_grad = self.std * np.matmul(upstream_gradient, self.basis)
return latent_grad
[docs]
def check_convergence(self) -> None:
"""Check if truncated terms converge to variability."""
if np.sum(self.covariance[self.covariance_index]) < self.variability:
raise ValueError("Variability not covered, increase number of expansion terms")
[docs]
class DimensionMethods1D:
"""1D FOURIER expansion helper methods."""
[docs]
@staticmethod
def get_dim(
trunc_threshold: int, number_expansion_terms: int
) -> tuple[np.ndarray, np.ndarray, int, int]:
"""Calculate dimension of latent space.
Args:
trunc_threshold: Truncation threshold
number_expansion_terms: Number of frequencies in the expansion
Returns:
Array indexing the covariance values below the truncation limit
Array indexing the basis terms corresponding to valid covariance values
Dimension of the complete Fourier basis up to the truncation threshold
Dimension of the latent space
"""
basis_dimension = int(2 * number_expansion_terms)
wave_numbers = (
np.linspace(0, number_expansion_terms - 1, number_expansion_terms, dtype=int) ** 2
)
dim_one_wave_numbers = wave_numbers
index = dim_one_wave_numbers
covariance_index = index <= trunc_threshold
latent_index = np.kron(covariance_index, np.ones(shape=(2,))).astype(bool)
dimension = np.sum(latent_index, dtype=int)
return covariance_index, latent_index, basis_dimension, dimension
[docs]
@staticmethod
def calculate_covariance(
number_expansion_terms: int, corr_length: float, convex_hull_size: float
) -> np.ndarray:
"""Calculate discrete fourier transform of the covariance kernel.
Based on the kernel description of the random field, build its
covariance matrix using the external geometry and coordinates.
Args:
number_expansion_terms: Number of frequencies
corr_length: Typical length in the field
convex_hull_size: Max distance on the grid
Returns:
Cosine transform of covariance matrix
"""
c_k = np.linspace(0, number_expansion_terms - 1, number_expansion_terms)
c_k = (
corr_length
* np.sqrt(np.pi)
/ convex_hull_size
* np.exp(-((c_k * np.pi * corr_length) ** 2) / (2 * convex_hull_size) ** 2)
)
c_k[0] = corr_length * np.sqrt(np.pi) / (2 * convex_hull_size)
covariance = np.sqrt(c_k)
return covariance
[docs]
@staticmethod
def calculate_basis(
coordinates: np.ndarray,
basis_dimension: int,
number_expansion_terms: int,
convex_hull_size: float,
covariance: np.ndarray,
index: np.ndarray,
) -> np.ndarray:
"""Calculate the fourier basis.
Args:
coordinates: Vector with coordinates of field
basis_dimension: Dimension of the complete Fourier basis (not the latent space)
number_expansion_terms: Number of frequencies
convex_hull_size: Maximum length on the mesh
covariance: Transform of covariance matrix
index: Array indexing valid basis terms in accordance to the truncation threshold
Returns:
Transformed and truncated fourier basis
"""
basis = np.zeros(shape=(coordinates.shape[0], basis_dimension))
k = (
np.linspace(0, number_expansion_terms - 1, number_expansion_terms)
* np.pi
/ convex_hull_size
)
arguements = np.outer(coordinates, k)
cos_terms = np.multiply(np.cos(arguements), covariance)
sin_terms = np.multiply(np.sin(arguements), covariance)
basis[:, 0::2] = cos_terms
basis[:, 1::2] = sin_terms
return basis[:, index]
[docs]
class DimensionMethods2D:
"""2D FOURIER expansion helper methods."""
[docs]
@staticmethod
def get_dim(
trunc_threshold: int, number_expansion_terms: int
) -> tuple[np.ndarray, np.ndarray, int, int]:
"""Calculate dimension of latent space.
Args:
trunc_threshold: Truncation threshold
number_expansion_terms: Number of frequencies in the expansion
Returns:
Array indexing the covariance values below the truncation limit
Array indexing the basis terms corresponding to valid covariance values
Dimension of the complete Fourier basis up to the truncation threshold
Dimension of the latent space
"""
basis_dimension = int(4 * (number_expansion_terms) ** 2)
wave_numbers = (
np.linspace(0, number_expansion_terms - 1, number_expansion_terms, dtype=int) ** 2
)
dim_one_wave_numbers = np.kron(wave_numbers, np.ones(shape=(number_expansion_terms,)))
dim_two_wave_numbers = np.kron(np.ones(shape=(number_expansion_terms,)), wave_numbers)
index = dim_one_wave_numbers + dim_two_wave_numbers
covariance_index = index <= trunc_threshold
latent_index = np.kron(covariance_index, np.ones(shape=(4,))).astype(bool)
dimension = np.sum(latent_index, dtype=int)
return covariance_index, latent_index, basis_dimension, dimension
[docs]
@staticmethod
def calculate_covariance(
number_expansion_terms: int, corr_length: float, convex_hull_size: float
) -> np.ndarray:
"""Calculate discrete fourier transform of the covariance kernel.
Based on the kernel description of the random field, build its
covariance matrix using the external geometry and coordinates.
Args:
number_expansion_terms: Number of frequencies
corr_length: Typical length in the field
convex_hull_size: Max distance on the grid
Returns:
Cosine transform of covariance matrix
"""
c_k = np.linspace(0, number_expansion_terms - 1, number_expansion_terms)
c_k = (
corr_length
* np.sqrt(np.pi)
/ convex_hull_size
* np.exp(-((c_k * np.pi * corr_length) ** 2) / (2 * convex_hull_size) ** 2)
)
c_k[0] = corr_length * np.sqrt(np.pi) / (2 * convex_hull_size)
cov_vector = np.kron(c_k, c_k)
covariance = np.sqrt(cov_vector)
return covariance
[docs]
@staticmethod
def calculate_basis(
coordinates: np.ndarray,
basis_dimension: int,
number_expansion_terms: int,
convex_hull_size: float,
covariance: np.ndarray,
index: np.ndarray,
) -> np.ndarray:
"""Calculate the fourier basis.
Args:
coordinates: Vector with coordinates of field
basis_dimension: Dimension of the complete Fourier basis (not the latent space)
number_expansion_terms: Number of frequencies
convex_hull_size: Maximum length on the mesh
covariance: Transform of covariance matrix
index: Array indexing valid basis terms in accordance to the truncation threshold
Returns:
Transformed and truncated fourier basis
"""
basis = np.zeros(shape=(coordinates.shape[0], basis_dimension))
k = (
np.linspace(0, number_expansion_terms - 1, number_expansion_terms)
* np.pi
/ convex_hull_size
)
arguements = [np.outer(coordinates[:, 0], k), np.outer(coordinates[:, 1], k)]
cosine_x0 = np.cos(arguements[0])
cosine_x1 = np.cos(arguements[1])
sin_x0 = np.sin(arguements[0])
sin_x1 = np.sin(arguements[1])
terms = np.zeros(shape=(4, len(coordinates), len(covariance)))
for i in range(len(coordinates)):
terms[:, i, :] = np.array(
[
np.kron(cosine_x0[i, :], cosine_x1[i, :]),
np.kron(sin_x0[i, :], sin_x1[i, :]),
np.kron(cosine_x0[i, :], sin_x1[i, :]),
np.kron(sin_x0[i, :], cosine_x1[i, :]),
]
)
terms = np.multiply(terms, covariance)
basis[:, 0::4] = terms[0]
basis[:, 1::4] = terms[1]
basis[:, 2::4] = terms[2]
basis[:, 3::4] = terms[3]
return basis[:, index]
[docs]
class DimensionMethods3D:
"""3D FOURIER expansion helper methods."""
[docs]
@staticmethod
def get_dim(
trunc_threshold: int, number_expansion_terms: int
) -> tuple[np.ndarray, np.ndarray, int, int]:
"""Calculate dimension of latent space.
Args:
trunc_threshold: Truncation threshold
number_expansion_terms: Number of frequencies in the expansion
Returns:
Array indexing the covariance values below the truncation limit
Array indexing the basis terms corresponding to valid covariance values
Dimension of the complete Fourier basis up to the truncation threshold
Dimension of the latent space
"""
basis_dimension = int(8 * (number_expansion_terms) ** 3)
wave_numbers = (
np.linspace(0, number_expansion_terms - 1, number_expansion_terms, dtype=int) ** 2
)
dim_one_wave_numbers = np.kron(wave_numbers, np.ones(shape=(number_expansion_terms**2,)))
dim_two_wave_numbers = np.kron(
np.kron(np.ones(shape=(number_expansion_terms,)), wave_numbers),
np.ones(shape=(number_expansion_terms,)),
)
dim_three_wave_numbers = np.kron(np.ones(shape=(number_expansion_terms**2,)), wave_numbers)
index = dim_one_wave_numbers + dim_two_wave_numbers + dim_three_wave_numbers
covarance_index = index <= trunc_threshold
latent_index = np.kron(covarance_index, np.ones(shape=(8,))).astype(bool)
dimension = np.sum(latent_index, dtype=int)
return covarance_index, latent_index, basis_dimension, dimension
[docs]
@staticmethod
def calculate_covariance(
number_expansion_terms: int, corr_length: float, convex_hull_size: float
) -> np.ndarray:
"""Calculate discrete fourier transform of the covariance kernel.
Based on the kernel description of the random field, build its
covariance matrix using the external geometry and coordinates.
Args:
number_expansion_terms: Number of frequencies
corr_length: Typical length in the field
convex_hull_size: Max distance on the grid
Returns:
Cosine transform of covariance matrix
"""
c_k = np.linspace(0, number_expansion_terms - 1, number_expansion_terms)
c_k = (
corr_length
* np.sqrt(np.pi)
/ convex_hull_size
* np.exp(-((c_k * np.pi * corr_length) ** 2) / (2 * convex_hull_size) ** 2)
)
c_k[0] = corr_length * np.sqrt(np.pi) / (2 * convex_hull_size)
cov_vector = np.kron(c_k, c_k)
cov_vector3d = np.kron(c_k, cov_vector)
covariance = np.sqrt(cov_vector3d)
return covariance
[docs]
@staticmethod
def calculate_basis(
coordinates: np.ndarray,
basis_dimension: int,
number_expansion_terms: int,
convex_hull_size: float,
covariance: np.ndarray,
index: np.ndarray,
) -> np.ndarray:
"""Calculate the fourier basis.
Args:
coordinates: Vector with coordinates of field
basis_dimension: Dimension of the complete Fourier basis (not the latent space)
number_expansion_terms: Number of frequencies
convex_hull_size: Maximum length on the mesh
covariance: Transform of covariance matrix
index: Array indexing valid basis terms in accordance to the truncation threshold
Returns:
Transformed and truncated fourier basis
"""
basis = np.zeros(shape=(coordinates.shape[0], basis_dimension))
k = (
np.linspace(0, number_expansion_terms - 1, number_expansion_terms)
* np.pi
/ convex_hull_size
)
arguements = [
np.outer(coordinates[:, 0], k),
np.outer(coordinates[:, 1], k),
np.outer(coordinates[:, 2], k),
]
cosine_x0 = np.cos(arguements[0])
cosine_x1 = np.cos(arguements[1])
cosine_x2 = np.cos(arguements[2])
sin_x0 = np.sin(arguements[0])
sin_x1 = np.sin(arguements[1])
sin_x2 = np.sin(arguements[2])
terms = np.zeros(shape=(8, len(coordinates), len(covariance)))
for i in range(len(coordinates)):
terms[:, i, :] = np.array(
[
np.kron(
np.kron(cosine_x0[i, :], cosine_x1[i, :]),
cosine_x2[i, :],
),
np.kron(
np.kron(sin_x0[i, :], sin_x1[i, :]),
cosine_x2[i, :],
),
np.kron(
np.kron(cosine_x0[i, :], sin_x1[i, :]),
cosine_x2[i, :],
),
np.kron(
np.kron(sin_x0[i, :], cosine_x1[i, :]),
cosine_x2[i, :],
),
np.kron(
np.kron(cosine_x0[i, :], cosine_x1[i, :]),
sin_x2[i, :],
),
np.kron(
np.kron(sin_x0[i, :], sin_x1[i, :]),
sin_x2[i, :],
),
np.kron(
np.kron(cosine_x0[i, :], sin_x1[i, :]),
sin_x2[i, :],
),
np.kron(
np.kron(sin_x0[i, :], cosine_x1[i, :]),
sin_x2[i, :],
),
]
)
terms = np.multiply(terms, covariance)
basis[:, 0::8] = terms[0]
basis[:, 1::8] = terms[1]
basis[:, 2::8] = terms[2]
basis[:, 3::8] = terms[3]
basis[:, 4::8] = terms[4]
basis[:, 5::8] = terms[5]
basis[:, 6::8] = terms[6]
basis[:, 7::8] = terms[7]
return basis[:, index]