# -*- coding: utf-8 -*-
import os
from mpi4py import MPI
if MPI.COMM_WORLD.Get_size() > 1: # pragma: no cover
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
import numpy as np
import os.path as osp
from scipy import sparse
from Mordicus.Core.Containers import ProblemData
from mpi4py import MPI
[docs]class NoAxisDuplicateDict(dict):
"""
A dict imposing that ('a', 'b') is invalid key if 'a' is a existing key
"""
def __setitem__(self, key, value):
"""
Set self[key] to value
"""
# make an iterable from 'key'
if isinstance(key, (str, bytes)):
it = [key]
# test compatibility
for k in self.keys():
for ke in it:
try:
itk = iter(k)
if any([ke == i for i in itk]):#pragma: no cover
raise ValueError("Parameter {0} is already handled with key {1}".format(ke, k))
except TypeError:#pragma: no cover
if ke == k:
raise ValueError("Parameter {0} is already handled with key {1}".format(ke, k))
return dict.__setitem__(self, key, value)
[docs]class QuantityDefinitionDict(dict):
"""Override OrderedDict to implement visitor design pattern"""
[docs] def accept(self, visitor):
"""
Accept visitor
"""
return visitor.visitQuantityDefinitionDict(self)
[docs]class VariabilityDefinitionDict(dict):
"""Override OrderedDict to implement visitor design pattern"""
[docs] def accept(self, visitor):
"""
Accept visitor
"""
return visitor.visitVariabilityDefinitionDict(self)
[docs]class CollectionProblemData(object):
"""
Class containing a set of collection of problemData
Attributes
----------
problemDatas : dict(str: ProblemData)
dictionary with problem tags (str) as keys and problemDatas (ProblemData) as values
reducedOrderBases : dict(str: np.ndarray)
dictionary with solutionNames (str) as keys and reducedOrderBases (np.ndarray of size (numberOfModes, numberOfDOFs)) as values
solutionStructures : dict(str: SolutionStructure)
dictionary with solutionNames (str) as keys and SolutionStructure as values
variabilityDefinition : dict(str: dict)
defines the problem variability, dictionary with variability names (typically, names of parameters) as keys
variabilitySupport : dict(str or tuple: ndarray)
defines the allowed values for parameters (definition domain), dictionary with the parameter sampling as values
quantityDefinition : dict(str: (str,str))
defines a new quantity for results, dictionary with solutionNames (str) as keys and a couple for strings (solution description, unit) as values
templateDataset : SolverDataset
dataset to solve the HF problem for a new set of parameter values, contains a input_instruction_file with keywords to replace
reducedTemplateDataset : SolverDataset
dataset to solve the reduced problem for a new set of parameter values, contains a input_instruction_file with keywords to replace
specificDatasets : dict(str: SolverDataset)
dictonary with the name of the operation as key and corresponding SolverDataset as value
dataCompressionData : dict(str: custom_data_structure)
dictionary with solutionNames (str) as keys and data structure generated by the data compression step as values
operatorCompressionData : dict(str: custom_data_structure)
dictionary with solutionNames (str) as keys and data structure generated by the operator compression step as values
"""
def __init__(self):
self.problemDatas = {}
self.reducedOrderBases = {}
self.solutionStructures = {}
self.variabilityDefinition = VariabilityDefinitionDict()
self.variabilitySupport = NoAxisDuplicateDict()
self.quantityDefinition = QuantityDefinitionDict()
self.templateDataset = None
self.reducedTemplateDataset = None
self.specificDatasets = {}
self.dataCompressionData = {}
self.operatorCompressionData = {}
[docs] def AddReducedOrderBasis(self, solutionName, reducedOrderBasis):
"""
Adds a reducedOrderBasis corresponding to solution solutionName
Parameters
----------
solutionName : str
name of the solution for which reducedOrderBasis is a reduced order basis
reducedOrderBasis : np.ndarray
of size (numberOfModes, numberOfDOFs)
"""
assert isinstance(solutionName, str), "solutionName must be of type string"
assert (
isinstance(reducedOrderBasis, np.ndarray)
and len(reducedOrderBasis.shape) == 2
), "reducedOrderBasis must be a 2D np.ndarray"
self._CheckSolutionName(solutionName)
self.reducedOrderBases[solutionName] = reducedOrderBasis
[docs] def GetReducedOrderBasis(self, solutionName):
"""
Get a precomputed reducedOrderBases for solution solutionName
Parameters
----------
solutionName : str
name of the solution for which the reducedOrderBases is retrieved
Returns
-------
np.ndarray
of size (numberOfModes, numberOfDOFs)
"""
assert isinstance(solutionName, str), "name must be of type string"
if solutionName not in self.reducedOrderBases:
return None # pragma: no cover
else:
return self.reducedOrderBases[solutionName]
[docs] def GetReducedOrderBases(self):
"""
Get the dictionary of precomputed reducedOrderBases
Returns
-------
reducedOrderBases : dict
dictionary with solutionNames (str) as keys and reducedOrderBases (np.ndarray of size (numberOfModes, numberOfDOFs)) as values
"""
return self.reducedOrderBases
[docs] def GetReducedOrderBasisNumberOfModes(self, solutionName):
"""
Get the number of modes of a precomputed reducedOrderBases for solution solutionName
Parameters
----------
solutionName : str
name of the solution for which the reducedOrderBases is retrieved
Returns
-------
int :
numberOfModes
"""
assert isinstance(solutionName, str), "name must be of type string"
if solutionName not in self.reducedOrderBases:
raise RuntimeError(
"You must compute a reducedOrderBasis for solution named "
+ solutionName
+ " before trying to retrieve it"
) # pragma: no cover
return self.GetReducedOrderBasis(solutionName).shape[0]
[docs] def DefineVariabilityAxes(self, names, types, quantities=None, descriptions=None):
"""
Sets the axes of variability: names to be served as keys
to collect ProblemData instances and documenting properties
Parameters
----------
names : tuple
names of axes, to be served as keys
types : type
type of the expected values for this axis
quantities : iterable
(optional) describes the physical quantity of the parameter. For now an iterable of tuples (full name, unit)
descriptions : iterable
(optional) describes the role of the axis of variability
"""
qlist = [None]*len(names) if quantities is None else list(quantities)
dlist = [None]*len(names) if descriptions is None else list(descriptions)
if not all(l==len(names) for l in (len(list(types)), len(qlist), len(dlist))):
raise IndexError("Not all arguments to DefineVariabilityAxes have the same size") # pragma: no cover
for varname, vartype, quantity, description in zip(names, types, qlist, dlist):
self.AddVariabilityAxis(varname, vartype, quantity=quantity, description=description)
[docs] def AddVariabilityAxis(self, varname, vartype, **kwargs):
"""
Sets the axes of variability: names to be served as keys
to collect ProblemData instances and documenting properties
Parameters
----------
varname : str
name of the axis, to be served as keys
vartype : type
type of the expected values for this axis
quantity : tuple
(optional) describes the physical quantity of the parameter. For now a tuple (full name, unit)
description : str
(optional) describes the role of the axis of variability
"""
if varname in self.variabilityDefinition:
raise KeyError("Variability axis {} already exists in your CollectionProblemData".format(varname)) # pragma: no cover
self.variabilityDefinition[varname] = {'type': vartype}
for opt in ('quantity', 'description'):
if opt in kwargs and kwargs[opt] is not None:
self.variabilityDefinition[varname][opt] = kwargs[opt]
[docs] def DefineQuantity(self, key, full_name="", unit=""):
"""
Define a new quantity for results
Parameters
----------
key : str
Key to be served in ProblemData to retrieve the result
full_name : str
Full name of the quantity, e.g. "velocity"
unit : str
Measuring unit, e.g. "meter"
"""
if not isinstance(key, str):
raise TypeError("Quantity identifier should be a str") # pragma: no cover
self.quantityDefinition[key] = (full_name, unit)
[docs] def GetNumberOfVariabilityAxes(self):
"""
Gets the number of variability axes.
Returns
-------
int
number of variability axes
"""
return len(self.variabilityDefinition)
def _CheckPointInParameterSpace(self, **kwargs):
"""
Checks keys and values provided as parameter point.
Raises
------
ValueError
if the number of components provided is wrong
KeyError
if some keyword argument does not match any known axis of variability
TypeError
if some value has the wrong type
"""
if len(kwargs) != self.GetNumberOfVariabilityAxes():
raise ValueError("Provided point in parameter space has {0} components, {1} expected".format(len(kwargs), self.GetNumberOfVariabilityAxes())) # pragma: no cover
for k, v in kwargs.items():
if k not in self.variabilityDefinition:
raise KeyError("{} is not a defined axis of variability".format(k)) # pragma: no cover
if not isinstance(v, self.variabilityDefinition[k]['type']):
raise TypeError("Provided value {0} has type {1}, expected {2}".format(v, type(v), self.variabilityDefinition[k]['type'])) # pragma: no cover
def _CheckSolutionName(self, solutionName):
"""
Checks that solutionName has been defined as a quantity identifier before.
Argument
--------
solutionName:
candidate identifier
Raises
------
ValueError:
if not
"""
if solutionName not in self.quantityDefinition:
raise ValueError("Solution name {} was not defined as a quantity before".format(solutionName)) # pragma: no cover
[docs] def SetDataCompressionData(self, key, dataCompressionData):
"""
Adds a dataCompressionData corresponding to key
Parameters
----------
key : any type
key for which dataCompressionData corresponds
"""
self.dataCompressionData[key] = dataCompressionData
[docs] def GetDataCompressionData(self, key):
"""
Get a precomputed dataCompressionData for key
Parameters
----------
key : str
key for which dataCompressionData corresponds
Returns
-------
dataCompressionData type
"""
if key not in self.dataCompressionData:
return None # pragma: no cover
else:
return self.dataCompressionData[key]
[docs] def SetOperatorCompressionData(self, operatorCompressionData):
"""
Sets the operatorCompressionData
Parameters
----------
operatorCompressionData : custom data structure
"""
self.operatorCompressionData = operatorCompressionData
[docs] def GetOperatorCompressionData(self):
"""
Get the operatorCompressionData
Returns
-------
operatorCompressionData
custom data structure
"""
return self.operatorCompressionData
[docs] def AddProblemData(self, problemData, **kwargs):
"""
Adds a problemData to the structure
Parameters
----------
problemData : ProblemData
to add to the problemDatas dictionary
kwargs :
same type as the axes of variability, gives the point in parameter space at which to add data
"""
# check keys are all parameters
assert isinstance(
problemData, ProblemData.ProblemData
), "wrong type for problemData"
self._CheckPointInParameterSpace(**kwargs)
if tuple(kwargs.values()) in self.problemDatas.keys():
print("Warning: problemData at key "+str(tuple(kwargs.values()))+" already existing, overwriting it anyway")
self.problemDatas[tuple(kwargs.values())] = problemData
return
[docs] def GetProblemData(self, **kwargs):
"""
Returns a specific problemData
Parameters
----------
kwargs :
same type as the axes of variability, gives the point in parameter space at which to add data
Returns
-------
ProblemData
retrieved problemData
"""
self._CheckPointInParameterSpace(**kwargs)
return self.problemDatas[tuple(kwargs.values())]
[docs] def GetNumberOfProblemDatas(self):
"""
Returns the number of problemDatas
Returns
-------
int
the number of problemDatas
"""
return len(self.problemDatas.keys())
[docs] def GetParameterDimension(self):
"""
Asserts that the parameters of all problemDatas have the same parameterDimension
and returns this size
Returns
-------
int
parameterDimension of problemDatas
"""
listParameterDimension = [
problemData.GetParameterDimension()
for _, problemData in self.GetProblemDatas().items()
]
assert listParameterDimension.count(listParameterDimension[0]) == len(
listParameterDimension
)
return listParameterDimension[0]
[docs] def GetProblemDatas(self):
"""
Returns the complete problemDatas dictionary
Returns
-------
dict
problemDatas of the collectionProblemData
"""
return self.problemDatas
[docs] def GetProblemSampling(self):
"""
Return the tags of problemDatas
Returns
-------
list
list of tuples containing the computed parameter points of the available problemDatas
"""
return list(self.GetProblemDatas().keys())
[docs] def GetLoadingsOfType(self, type):
"""
Returns a list of loadings of a given type
Parameters
----------
type : str
type of loadings (eg. pressure) as defined in the LoadingBase structure
Returns
-------
list
of Loadings
"""
li = []
for _, problemData in self.GetProblemDatas().items():
li.extend(problemData.GetLoadingsOfType(type))
return li
[docs] def GetSolutionsNumberOfComponents(self, solutionName):
"""
Asserts that the solutions of name "solutionName" in all problemDatas have same nbeOfComponents
and return this size
Parameters
----------
solutionName : str
name of the solutions for which we want to return the nbeOfComponents
Returns
-------
int
nbeOfComponents of solutions of name "solutionName"
"""
nbeOfComponents = []
for _, problemData in self.GetProblemDatas().items():
try:
nbeOfComponents.append(problemData.GetSolution(solutionName).GetNbeOfComponents())
except KeyError:# pragma: no cover
continue
assert nbeOfComponents.count(nbeOfComponents[0]) == len(nbeOfComponents)
return nbeOfComponents[0]
[docs] def GetSolutionsNumberOfDofs(self, solutionName):
"""
Asserts that the solutions of name "solutionName" in all problemDatas have same nbeOfDofs
and return this size
Parameters
----------
solutionName : str
name of the solutions for which we want to return the nbeOfDofs
Returns
-------
int
nbeOfDofs of solutions of name "solutionName"
"""
nbeOfDofs = []
for _, problemData in self.GetProblemDatas().items():
try:
nbeOfDofs.append(problemData.GetSolution(solutionName).GetNumberOfDofs())
except KeyError:# pragma: no cover
continue
assert nbeOfDofs.count(nbeOfDofs[0]) == len(nbeOfDofs)
return nbeOfDofs[0]
[docs] def GetSolutionsNumberOfNodes(self, solutionName):
"""
Asserts that the solutions of name "solutionName" in all problemDatas have same nbeOfNodes
and return this size
Parameters
----------
solutionName : str
name of the solutions for which we want to return the nbeOfNodes
Returns
-------
int
nbeOfNodes of solutions of name "solutionName"
"""
nbeOfNodes = [
problemData.solutions[solutionName].GetNumberOfNodes()
for _, problemData in self.GetProblemDatas().items()
]
assert nbeOfNodes.count(nbeOfNodes[0]) == len(nbeOfNodes)
return nbeOfNodes[0]
[docs] def SnapshotsIterator(self, solutionName, skipFirst = False):
"""
Constructs an iterator over snapshots of solutions of name "solutionName" in all problemDatas.
Parameters
----------
solutionName : str
name of the solutions on which we want to iterate over snapshots
Returns
-------
iterator
an iterator over snapshots of solutions of name "solutionName" in all problemDatas
"""
this = self
self._CheckSolutionName(solutionName)
class iterator:
def __init__(self, solutionName, skipFirst):
self.solutionName = solutionName
self.problemDatas = this.problemDatas
if skipFirst == False:
self.startIndex = 0
else:
self.startIndex = 1
def __iter__(self):
for problemData in self.problemDatas.values():
try:
localIterator = problemData.GetSolution(self.solutionName).GetSnapshotsList()[self.startIndex:]
for snapshot in localIterator:
yield snapshot
except KeyError:# pragma: no cover
continue
res = iterator(solutionName, skipFirst)
return res
[docs] def GetSnapshots(self, solutionName, skipFirst = False):
"""
Returns snapshots in a numpy array
Parameters
----------
solutionName : str
name of the solutions on which we want to iterate over snapshots
skipFirst : bool
(optional) True for skipping first time step of each ProblemData
Returns
-------
np.ndarray
of size (nbSnapshots, numberOfDofs)
"""
self._CheckSolutionName(solutionName)
nbSnapshots = self.GetGlobalNumberOfSnapshots(solutionName, skipFirst)
numberOfDofs = self.GetSolutionsNumberOfDofs(solutionName)
snapshots = np.empty((nbSnapshots, numberOfDofs))
for i, s in enumerate(self.SnapshotsIterator(solutionName, skipFirst)):
snapshots[i,:] = s
return snapshots
[docs] def GetSnapshotsAtTimes(self, solutionName, timeSequence):
"""
Returns snapshots of solution solutionName from all problemDatas and for a given time sequence, in a numpy array
Parameters
----------
solutionName : str
name of the solutions on which we want to iterate over compressedSnapshots
timeSequence : list or numpy array
time steps for snapshot retrieving (doing time interpolation if needed)
Returns
-------
np.ndarray
of size (nbSnapshots, numberOfDofs)
"""
self._CheckSolutionName(solutionName)
nbTimeSteps = np.array(timeSequence).shape[0]
nbProblemDatas = self.GetNumberOfProblemDatas()
numberOfDofs = self.GetSolutionsNumberOfDofs(solutionName)
snapshots = np.empty((nbTimeSteps*nbProblemDatas, numberOfDofs))
count = 0
for problemData in self.problemDatas.values():
solution = problemData.GetSolution(solutionName)
for t in timeSequence:
snapshots[count,:] = solution.GetSnapshotAtTime(t)
count += 1
return snapshots
[docs] def CompressedSnapshotsIterator(self, solutionName, skipFirst = False):
"""
Constructs an iterator over compressedSnapshots of solutions of name "solutionName" in all problemDatas.
Parameters
----------
solutionName : str
name of the solutions on which we want to iterate over compressedSnapshots
skipFirst : bool
(optional) True for skipping first time step of each ProblemData
Returns
-------
iterator
an iterator over compressedSnapshots of solutions of name "solutionName" in all problemDatas
"""
this = self
self._CheckSolutionName(solutionName)
class iterator:
def __init__(self, solutionName, skipFirst):
self.solutionName = solutionName
self.problemDatas = this.problemDatas
if skipFirst == False:
self.startIndex = 0
else:
self.startIndex = 1
def __iter__(self):
for problemData in self.problemDatas.values():
localIterator = problemData.GetSolution(self.solutionName).GetCompressedSnapshotsList()[self.startIndex :]
for snapshot in localIterator:
yield snapshot
res = iterator(solutionName, skipFirst)
return res
[docs] def GetCompressedSnapshots(self, solutionName, skipFirst = False):
"""
Returns compressed snapshots of solution solutionName from all problemDatas, in a numpy array
Parameters
----------
solutionName : str
name of the solutions on which we want to iterate over compressedSnapshots
skipFirst : bool
(optional) True for skipping first time step of each ProblemData
Returns
-------
np.ndarray
of size (nbSnapshots, numberOfModes)
"""
self._CheckSolutionName(solutionName)
nbSnapshots = self.GetGlobalNumberOfSnapshots(solutionName, skipFirst)
numberOfModes = self.GetReducedOrderBasisNumberOfModes(solutionName)
compressedSnapshots = np.empty((nbSnapshots, numberOfModes))
for i, s in enumerate(self.CompressedSnapshotsIterator(solutionName, skipFirst)):
compressedSnapshots[i,:] = s
return compressedSnapshots
[docs] def GetCompressedSnapshotsAtTimes(self, solutionName, timeSequence):
"""
Returns compressed snapshots of solution solutionName from all problemDatas and for a given time sequence, in a numpy array
Parameters
----------
solutionName : str
name of the solutions on which we want to iterate over compressedSnapshots
timeSequence : list or numpy array
time steps for snapshot retrieving (doing time interpolation if needed)
Returns
-------
np.ndarray
of size (nbSnapshots, numberOfModes)
"""
self._CheckSolutionName(solutionName)
nbTimeSteps = np.array(timeSequence).shape[0]
nbProblemDatas = self.GetNumberOfProblemDatas()
numberOfModes = self.GetReducedOrderBasisNumberOfModes(solutionName)
compressedSnapshots = np.empty((nbTimeSteps*nbProblemDatas, numberOfModes))
count = 0
for problemData in self.problemDatas.values():
solution = problemData.GetSolution(solutionName)
for t in timeSequence:
compressedSnapshots[count,:] = solution.GetCompressedSnapshotsAtTime(t)
count += 1
return compressedSnapshots
[docs] def GetGlobalNumberOfSnapshots(self, solutionName, skipFirst = False):
"""
Iterates over problemDatas to return the complete number of snpashots for solutions of name "solutionName"
Parameters
----------
solutionName : str
name of the solutions for which we want to compute the total number of snapshots
Returns
-------
int
number of snpashots for solutions of name "solutionName"
"""
if skipFirst == False:
offset = 0
else:
offset = -1
number = 0
for _, problemData in self.problemDatas.items():
try:
number += problemData.GetSolution(solutionName).GetNumberOfSnapshots() + offset
except KeyError:# pragma: no cover
continue
return number
[docs] def GetSolutionTimeSteps(self, solutionName, skipFirst = False):
"""
GetTimeSteps
Parameters
----------
solutionName : str
name of the solutions for which we want to compute the total number of snapshots
"""
if skipFirst == False:
startIndex = 0
else:
startIndex = 1
solutionTimeSteps = np.array([])
for _, problemData in self.problemDatas.items():
solutionTimeSteps = np.append(solutionTimeSteps, problemData.GetSolution(solutionName).GetTimeSequenceFromSnapshots()[startIndex:], 0)
return solutionTimeSteps
[docs] def CompressSolutions(self, solutionName, snapshotCorrelationOperator = None):
"""
Compress solutions of name "solutionName" from all ProblemDatas in collectionProblemData, and update to corresponding solution.compressedSnapshots.
Parameters
----------
solutionName : str
name of the solutions to compress
snapshotCorrelationOperator : scipy.sparse.csr, optional
correlation operator between the snapshots
"""
assert isinstance(solutionName, str)
if snapshotCorrelationOperator is None:
snapshotCorrelationOperator = sparse.eye(self.GetSolutionsNumberOfDofs(solutionName))
reducedOrderBasis = self.GetReducedOrderBasis(solutionName)
for _, problemData in self.GetProblemDatas().items():
problemData.CompressSolution(solutionName, reducedOrderBasis, snapshotCorrelationOperator)
[docs] def ConvertCompressedSnapshotReducedOrderBasis(self, solutionName, projectedReducedOrderBasis):
"""
Converts the reducedSnapshot from the current reducedOrderBasis to a newReducedOrderBasis using a projectedReducedOrderBasis between the current one and a new one
Parameters
----------
solutionName : str
name of the solution whose compressedSnapshot is to convert
projectedReducedOrderBasis : np.ndarray
of size (newNumberOfModes, numberOfModes)
"""
for _, problemData in self.GetProblemDatas().items():
problemData.ConvertCompressedSnapshotReducedOrderBasis(solutionName, projectedReducedOrderBasis)
[docs] def ComputeReducedOrderBasisProjection(self, solutionName, newReducedOrderBasis, snapshotCorrelationOperator = None):
"""
Computes the projection of reducedOrderBasis[solutionName] on newReducedOrderBases
Parameters
----------
solutionName : str
name of the solutions to compress
newReducedOrderBasis : np.ndarray
of size (newNumberOfModes, numberOfDOFs)
snapshotCorrelationOperator : scipy.sparse.csr, optional
correlation operator between the snapshots
Returns
-------
np.ndarray
of size (newNumberOfModes, numberOfModes)
"""
assert isinstance(solutionName, str)
if snapshotCorrelationOperator is None:
snapshotCorrelationOperator = sparse.eye(self.GetSolutionsNumberOfDofs(solutionName))
reducedOrderBasis = self.GetReducedOrderBasis(solutionName)
numberOfModes = reducedOrderBasis.shape[0]
newNumberOfModes = newReducedOrderBasis.shape[0]
localProjectedReducedOrderBasis = np.dot(newReducedOrderBasis, snapshotCorrelationOperator.dot(reducedOrderBasis.T))
globalProjectedReducedOrderBasis = np.zeros((newNumberOfModes, numberOfModes))
MPI.COMM_WORLD.Allreduce([localProjectedReducedOrderBasis, MPI.DOUBLE], [globalProjectedReducedOrderBasis, MPI.DOUBLE])
return globalProjectedReducedOrderBasis
[docs] def SetTemplateDataset(self, templateDataset):
"""
Template dataset to compute high-fidelity solution
Parameters
----------
templateDataset : Dataset
"""
self.templateDataset = templateDataset
[docs] def SetReducedTemplateDataset(self, reducedTemplateDataset):
"""
Template dataset to compute reduced solution
Parameters
----------
reducedTemplateDataset : Dataset
"""
self.reducedTemplateDataset = reducedTemplateDataset
[docs] def Solve(self, **kwargs):
"""
New high-fidelity model evaluation
Parameters
----------
kwargs: (parameter_name=value) name of the variability as key and value stays value.
"""
extract = kwargs.pop("extract", None)
primalities = kwargs.pop("primalities", None)
solutionReaderType = kwargs.pop("solutionReaderType", None)
dataset = self.templateDataset.Instantiate(**kwargs)
# populate information for the structure
return dataset.Run(extract=extract,
solutionStructures=self.solutionStructures,
primalities=primalities,
solutionReaderType=solutionReaderType)
[docs] def SolveReduced(self, **kwargs):
"""
New high-fidelity model evaluation
Parameters
----------
kwargs: (parameter_name=value) name of the variability as key and value stays value.
"""
extract = kwargs.pop("extract", None)
primalities = kwargs.pop("primalities", None)
solutionReaderType = kwargs.pop("solutionReaderType", None)
kwargs["reduced"] = True
dataset = self.reducedTemplateDataset.Instantiate(**kwargs)
# populate information for the structure
return (dataset.Run(extract=extract,
solutionStructures=self.solutionStructures,
primalities=primalities,
solutionReaderType=solutionReaderType),
osp.join(dataset.inputData["inputRootFolder"], dataset.inputData["inputResultPath"])
)
[docs] def ComputeAPosterioriError(self, **kwargs):
"""
Computes the residual of equilibrium
Returns
-------
field
"""
dataset = self.specificDatasets["computeAPosterioriError"]
return dataset.Run(**kwargs)
[docs] def GetSolutionStructure(self, solutionName):
"""
Parameters
----------
solutionName : str
identifier of the physical quantity, e.g. "U", "sigma"
Returns
-------
SolutionStructureBase
solution structure for the demanded solution
"""
return self.solutionStructures[solutionName]
[docs] def SetSolutionStructure(self, solutionName, solutionStructure):
"""
Parameters
----------
solutionName : str
identifier of the physical quantity, e.g. "U", "sigma"
solutionStructure : SolutionStructureBase
solution structure for the demanded solution
"""
self.solutionStructures[solutionName] = solutionStructure
[docs] def DefineVariabilitySupport(self, parameters, ndarrays):
"""
Parameters
----------
parameters : iterable
iterable over the identifiers (str or tuple(str)) of parameters
ndarrays : iterable
iterable over the discrete support for each identifier
Note
----
Parameters may have a tuple of parameters as a component, when the discrete
support along these two or more parameters is not cartesian.
Then, the corresponding ndarray has shape (numberOfPoints, numerOfParametersInTuple)
The whole discrete support is generated as a cartesian product of discrete supports.
"""
if len(parameters) != len(ndarrays):#pragma: no cover
raise ValueError("Arguments 'parameters' and 'ndarrays' of defineVariabilitySupport should be of the same length.")
for k, v in zip(parameters, ndarrays):
self.variabilitySupport[k] = v
[docs] def GenerateVariabilitySupport(self):
"""
Realizes the catersian product to generate full grid for variability support
Returns
-------
ndarray
ndarray of shape(totalNumberOfPoints, numberOfParameters) with the coordinates of points
Note
----
On the last axis, parameters values are those of the parameters
in the order they were declared in defineVariabilitySupport
"""
ndarrays = self.variabilitySupport.values()
meshgrid = np.meshgrid(*ndarrays, indexing='ij')
return np.column_stack(tuple(m.flatten() for m in meshgrid))
[docs] def accept(self, visitor):
"""
Visitor design pattern
"""
return visitor.visitCPD(self)
def __str__(self):
res = "CollectionProblemData\n"
res += "number of problemDatas: " + str(self.GetNumberOfProblemDatas()) + "\n"
res += "problemDatas: " + str(self.GetProblemSampling())
return res
if __name__ == "__main__":# pragma: no cover
from Mordicus import RunTestFile
RunTestFile(__file__)