Source code for Mordicus.Core.Containers.Visitor

import json
from jschon import create_catalog, JSON, JSONSchema
import numpy as np
import os
import os.path as osp
import shutil

from Mordicus.Core.Containers.CollectionProblemData import CollectionProblemData
from Mordicus.Core.Containers import ProblemData
from Mordicus.Core.Containers.Solution import Solution

class ExportToJSONVisitor(object):
    """
    Visitor converting a collection problem data and its components into
    JSON-serializable structures. Arrays and solver input files are written
    as companion files inside ``folder`` and referenced by path.
    """

    def __init__(self, folder, solutionReader=None, reconstruct=False):
        """
        Initializes visitor

        Parameters
        ----------
        folder : str
            folder receiving the companion files (bases, operators, datasets)
        solutionReader : object, optional
            object whose WriteReducedOrderBasis, WriteOperatorCompressionData
            and WriteSolutionStructure methods are tried before falling back
            to numpy
        reconstruct : bool, optional
            if True, also export solution structures and reduced order bases
        """
        self.reconstruct = reconstruct
        self.folder = folder
        self.solutionReader = solutionReader

    def _writeWithNumpyFallback(self, filepath, array, customWriter):
        """
        Call customWriter(filepath); if it raises, save array with numpy

        Returns the path actually written, i.e. filepath itself or
        filepath + ".npy" when the numpy fallback was used.
        """
        try:
            customWriter(filepath)
        except Exception:
            # Deliberate best effort: the reader may be None or may not
            # support this object, in which case numpy format is used.
            if osp.exists(filepath):
                os.remove(filepath)
            filepath = filepath + ".npy"
            np.save(filepath, array)
        return filepath

    def visitCPD(self, cpd):
        """
        Visit Collection Problem Data Structure

        Returns
        -------
        dict
            {"CollectionProblemData": {...}} ready to be dumped to JSON
        """
        content = {}
        root = {"CollectionProblemData": content}
        content["quantityDefinitions"] = cpd.quantityDefinition.accept(self)
        content["variabilityDefinitions"] = cpd.variabilityDefinition.accept(self)

        # NOTE: export of cpd.reducedTemplateDataset (see visitDataSet) is
        # currently disabled.

        # NOTE(review): indentation was lost in the listing this code was
        # recovered from. The reconstruct guard is assumed to cover solution
        # structures and reduced bases only - confirm against upstream.
        if self.reconstruct:
            # save solution structures
            if cpd.solutionStructures:
                content["solutionStructures"] = [
                    structure.accept(self, quantity)
                    for quantity, structure in cpd.solutionStructures.items()
                ]

            # save reduced bases: custom format first, numpy otherwise
            elts = []
            for quantity, basis in cpd.reducedOrderBases.items():
                filepath = self._writeWithNumpyFallback(
                    osp.join(self.folder, "reducedOrderBasis" + quantity),
                    basis,
                    lambda path: self.solutionReader.WriteReducedOrderBasis(
                        path, cpd.solutionStructures[quantity], basis, quantity),
                )
                elts.append({"path": filepath, "quantity": quantity})
            content["reducedOrderBases"] = elts

        # save problem datas, keyed by problem name
        paramNames = list(cpd.variabilityDefinition.keys())
        elts = {}
        for paramValues, problemData in cpd.problemDatas.items():
            params = {paramNames[i]: val for i, val in enumerate(paramValues)}
            name, pbData = problemData.accept(self)
            pbData["params"] = params
            elts[name] = pbData
        content["problemDatas"] = elts

        # save operatorCompressionData: through reader first, numpy otherwise
        elts = []
        for key, reducedOperator in cpd.operatorCompressionData.items():
            filepath = self._writeWithNumpyFallback(
                osp.join(self.folder, "reducedOperator" + key),
                reducedOperator,
                lambda path: self.solutionReader.WriteOperatorCompressionData(
                    path, key, cpd),
            )
            elts.append({"path": filepath, "key": key})
        content["operatorCompressionData"] = elts

        return root

    def visitSolutionStructure(self, structure, quantity):
        """
        Visit Solution Structure

        The structure is written to "<folder>/solutionStructure<quantity>"
        only when a solutionReader is available; the path is returned anyway.
        """
        filepath = osp.join(self.folder, "solutionStructure" + quantity)
        # It would be cool to have a file extension attribute to solutionStructure
        if self.solutionReader:
            self.solutionReader.WriteSolutionStructure(filepath, structure, quantity)
        return {"path": filepath,
                "quantity": quantity,
                "derivedType": type(structure).__name__}

    @staticmethod
    def _isInsideFolder(path, rootFolder):
        """
        Tell whether path lies within rootFolder (symbolic links resolved)

        A relative path is resolved against the current working directory.
        """
        realRoot = osp.realpath(rootFolder)
        try:
            return osp.commonpath([osp.realpath(path), realRoot]) == realRoot
        except ValueError:
            # raised e.g. for paths located on different drives
            return False

    def _exportInputPath(self, path, rootFolder, allowDirectory):
        """
        Return the path to reference in the JSON for an input of the dataset

        path is interpreted relatively to rootFolder. If it lies inside
        rootFolder it is returned unchanged (the tree is copied as a whole
        by visitDataSet); otherwise it is copied into
        "<folder>/reducedDataset" and the copy is returned.
        """
        dest = osp.join(self.folder, "reducedDataset", osp.basename(path))
        # Everything depending on the working directory is resolved before
        # changing it, so that a relative export folder keeps its meaning.
        absDest = osp.abspath(dest)
        absRoot = osp.abspath(rootFolder)
        cwd = os.getcwd()
        os.chdir(absRoot)
        try:
            if self._isInsideFolder(path, absRoot):
                return path
            if allowDirectory and osp.isdir(path):
                shutil.copytree(path, absDest)
            else:
                shutil.copy2(path, absDest)
            return dest
        finally:
            os.chdir(cwd)

    def visitDataSet(self, dataset, cpd):
        """
        Visit Dataset

        Copies the input root folder of the dataset to
        "<folder>/reducedDataset" (which must not exist yet) and returns a
        dictionary describing the dataset and the location of its inputs.
        """
        attrib = {"derivedType": type(dataset).__name__,
                  "produced_object": type(dataset.produced_object).__name__,
                  "solver": dataset.solver.id}

        # Handle input_data
        input_data = dataset.input_data
        rootFolder = input_data["input_root_folder"]

        # Copy the content of input_root_folder
        dest = osp.join(self.folder, "reducedDataset")
        shutil.copytree(rootFolder, dest)
        attribData = {"input_root_folder": dest}

        # Copy input_main_file and input_instruction_file if not already in tree
        for key in ("input_main_file", "input_instruction_file"):
            if key in input_data:
                attribData[key] = self._exportInputPath(
                    input_data[key], rootFolder, allowDirectory=False)

        # Copy input_result_type and input_result_path
        for key in ("input_result_type", "input_result_path"):
            attribData[key] = str(input_data[key])

        # mordicus_input_data: loop over keys:
        #    - if key is "modes": reference reducedOrderBases
        #    - if key is in operatorCompressionData: reference reducedOperator<key>
        #    - otherwise, copy and reference original path
        inputMordicus = []
        for key, value in input_data["input_mordicus_data"].items():
            if key in ("modes", ):
                inputMordicus.append([
                    {"quantity": k,
                     "path": osp.join(self.folder, "reducedOrderBasis" + k)}
                    for k in value.keys()
                ])
            elif key in cpd.operatorCompressionData:
                inputMordicus.append(
                    {"key": key,
                     "path": osp.join(self.folder, "reducedOperator" + key)})
            else:
                if not isinstance(value, str):
                    value = value.GetInternalStorage()
                path = self._exportInputPath(value, rootFolder, allowDirectory=True)
                inputMordicus.append({"key": key, "path": path})
        attribData["inputMordicusDatas"] = inputMordicus

        attrib["inputData"] = attribData
        return attrib

    def visitQuantityDefinitionDict(self, defdict):
        """Visit quantity definition dictionary"""
        return [{"name": name, "full_name": full_name, "unit": unit}
                for name, (full_name, unit) in defdict.items()]

    def visitVariabilityDefinitionDict(self, defdict):
        """
        Visit variability definition dictionary

        The "type" entry is replaced by the name of the type and the
        "quantity" entry by its first element.
        """
        elts = []
        for name, valdict in defdict.items():
            attrib = valdict.copy()
            attrib["name"] = name
            if "type" in valdict:
                attrib["type"] = valdict["type"].__name__
            if "quantity" in valdict:
                attrib["quantity"] = valdict["quantity"][0]
            elts.append(attrib)
        return elts

    def visitProblemData(self, problemData):
        """
        Visit problemData

        Returns the problem name and a dictionary of exported solutions.
        """
        elts = {name: solution.accept(self)
                for name, solution in problemData.solutions.items()}
        return problemData.problemName, elts

    def visitSolution(self, solution):
        """
        Visit solution

        Compressed snapshots are exported as text with 8 digits of precision.
        """
        attrib = {"quantity": solution.solutionName,
                  "nbeOfComponents": solution.nbeOfComponents,
                  "numberOfNodes": solution.numberOfNodes,
                  "primality": str(solution.primality).lower()}
        elts = []
        for t, arr in solution.compressedSnapshots.items():
            # The threshold is raised above the array size: with the numpy
            # default (1000) larger arrays are summarized with "..." and the
            # elided values would be lost.
            values = np.array2string(arr, precision=8, separator=',',
                                     threshold=np.size(arr) + 1)
            elts.append({"time": str(t), "values": values})
        attrib["compressedSnapshots"] = elts
        return attrib

    def visitSolver(self, solver):
        """
        Visit Solver

        solver.solver_cfg may be a mapping or an iterable of (name, value)
        pairs.
        """
        attrib = {"id": solver.id,
                  "solver_call_procedure_type": solver.solver_call_procedure_type,
                  "call_script": solver.call_script}
        cfg = solver.solver_cfg
        # Iterating a dict directly yields its keys only, so go through items()
        pairs = cfg.items() if hasattr(cfg, "items") else cfg
        attrib["SolverConfiguration"] = [{"name": key, "value": value}
                                         for key, value in pairs]
        return attrib
def importFromJSON(folder, filename="reducedModel.json", solutionReader=None, reconstruct=False):
    """
    Import study from a json

    Builds an ImportFromJSONVisitor on "<folder>/<filename>", lets it fill a
    new CollectionProblemData and returns that collection.
    """
    importer = ImportFromJSONVisitor(folder, filename, solutionReader, reconstruct)
    collection = CollectionProblemData()
    importer.visitCPD(collection)
    return collection
def exportToJSON(folder, cpd, solutionReader=None, reconstruct=False):
    """
    Export study to a folder with directing JSON file

    The folder is created if needed; the structure returned by
    ExportToJSONVisitor.visitCPD is dumped to "<folder>/reducedModel.json".
    """
    os.makedirs(folder, exist_ok=True)
    exporter = ExportToJSONVisitor(
        folder,
        solutionReader=solutionReader,
        reconstruct=reconstruct,
    )
    document = exporter.visitCPD(cpd)
    target = osp.join(folder, "reducedModel.json")
    with open(target, "w") as out_file:
        json.dump(document, out_file, indent=2)
def checkValidity(json_path):
    """
    Checks validity of JSON file

    The document at json_path is evaluated against the "Mordicus.json" schema
    located next to this module; the result of the evaluation is returned.
    """
    create_catalog('2020-12')

    schema_path = osp.join(osp.dirname(__file__), "Mordicus.json")
    with open(schema_path) as schema_file:
        schema_content = json.load(schema_file)
    schema = JSONSchema(schema_content)

    with open(json_path) as json_file:
        document = JSON(json.load(json_file))

    return schema.evaluate(document)
class ImportFromJSONVisitor(object):
    """
    Visitor filling a collection problem data and its components from a JSON
    file, typically one written by ExportToJSONVisitor
    """

    # Builtin types recognised in the "type" entry of a variability
    # definition (the export side writes the __name__ of the type)
    _TYPES_BY_NAME = {"float": float, "int": int, "str": str, "bool": bool}

    def __init__(self, folder, filename="reducedModel.json", solutionReader=None, reconstruct=False):
        """
        Initializes visitor

        Parameters
        ----------
        folder : str
            folder containing the JSON file
        filename : str, optional
            name of the JSON file inside folder
        solutionReader : object, optional
            object whose ReadReducedOrderBasis and ReadOperatorCompressionData
            methods are tried before falling back to numpy
        reconstruct : bool, optional
            if True, also import reduced order bases
        """
        self.reconstruct = reconstruct
        self.folder = folder
        self.solutionReader = solutionReader
        self.filename = filename
        with open(osp.join(folder, filename)) as f:
            self.json_data = json.load(f)

    @staticmethod
    def _parseSnapshotValues(text):
        """
        Convert the textual form of a compressed snapshot to a numpy array

        NOTE(review): this replaces eval('np.array(' + text + ')'), which
        executed arbitrary code from the JSON file. ast.literal_eval only
        accepts Python literals (numbers, nested lists, ...).
        """
        import ast
        return np.array(ast.literal_eval(text.strip()))

    @staticmethod
    def _parsePrimality(value):
        """
        Convert the exported primality to a boolean

        The export writes str(primality).lower(), i.e. "true" or "false";
        a plain string "false" would otherwise be truthy. Values which are
        not strings are returned unchanged.
        """
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return value

    def visitCPD(self, cpd):
        """
        Visit Collection Problem Data

        Fills cpd in place from the "CollectionProblemData" entry of the JSON
        document; does nothing if that entry is missing.
        """
        if "CollectionProblemData" not in self.json_data:
            return
        root = self.json_data["CollectionProblemData"]

        if "quantityDefinitions" in root:
            cpd.quantityDefinition.accept(self)
        if "variabilityDefinitions" in root:
            cpd.variabilityDefinition.accept(self)

        # NOTE(review): indentation was lost in the listing this code was
        # recovered from. The reconstruct guard is assumed to cover solution
        # structures and reduced bases only - confirm against upstream.
        if self.reconstruct:
            # "solutionStructures" entries are not read back for now
            if "reducedOrderBases" in root:
                for item in root["reducedOrderBases"]:
                    filepath = item['path']
                    quantity = item['quantity']
                    try:
                        basis = self.solutionReader.ReadReducedOrderBasis(
                            filepath, cpd.solutionStructures[quantity], quantity)
                    except Exception:
                        # best effort: no usable reader, assume numpy format
                        basis = np.load(filepath)
                    cpd.AddReducedOrderBasis(quantity, basis)

        if "problemDatas" in root:
            for pb_name, pb_data in root['problemDatas'].items():
                problemData = ProblemData.ProblemData(pb_name)
                # Looked up explicitly so that a missing entry can neither
                # fail on an unbound name nor reuse the previous parameters
                params = pb_data.get("params", {})
                for key, data in pb_data.items():
                    if key == "params":
                        continue
                    solution = Solution(data['quantity'],
                                        data['nbeOfComponents'],
                                        data['numberOfNodes'],
                                        self._parsePrimality(data['primality']))
                    for item in data.get('compressedSnapshots', []):
                        values = self._parseSnapshotValues(item['values'])
                        solution.AddCompressedSnapshots(values, float(item['time']))
                    problemData.AddSolution(solution)
                cpd.AddProblemData(problemData, **params)

        if 'operatorCompressionData' in root:
            datas = {}
            for item in root['operatorCompressionData']:
                filepath = item['path']
                key = item['key']
                try:
                    data = self.solutionReader.ReadOperatorCompressionData(filepath, key)
                except Exception:
                    # best effort: no usable reader, assume numpy format
                    data = np.load(filepath)
                datas[key] = data
            cpd.SetOperatorCompressionData(datas)

    def visitQuantityDefinitionDict(self, defdict):
        """
        Visit quantity definition dictionary

        Fills defdict in place with name -> (full_name, unit).
        """
        for item in self.json_data["CollectionProblemData"]["quantityDefinitions"]:
            defdict[item['name']] = (item['full_name'], item['unit'])

    def visitVariabilityDefinitionDict(self, defdict):
        """
        Visit variability definition dictionary

        Fills defdict in place with name -> definition. Only the "type",
        "description" and "quantity" entries are restored; a "type" whose
        name is not a known builtin is left out.
        """
        for item in self.json_data["CollectionProblemData"]["variabilityDefinitions"]:
            definition = {}
            defdict[item['name']] = definition
            for k, v in item.items():
                if k == 'type':
                    if v in self._TYPES_BY_NAME:
                        definition['type'] = self._TYPES_BY_NAME[v]
                elif k == 'description':
                    definition['description'] = v
                elif k == 'quantity':
                    definition['quantity'] = (v,)