diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index 4eb8adff..9e2c46e1 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -34,6 +34,7 @@ import csv import logging +import numbers import os import platform import re @@ -46,6 +47,7 @@ from dataclasses import dataclass import textwrap from typing import Optional +import warnings from OMPython.OMCSession import OMCSessionZMQ, OMCSessionException @@ -108,6 +110,207 @@ def __getitem__(self, index: int): return {0: self.A, 1: self.B, 2: self.C, 3: self.D}[index] +class ModelicaSystemCmd: + """ + Execute a simulation by running the compiled model. + """ + + def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[int] = None) -> None: + """ + Initialisation + + Parameters + ---------- + runpath : pathlib.Path + modelname : str + timeout : Optional[int], None + """ + self._runpath = pathlib.Path(runpath).resolve().absolute() + self._modelname = modelname + self._timeout = timeout + self._args: dict[str, str | None] = {} + self._arg_override: dict[str, str] = {} + + def arg_set(self, key: str, val: Optional[str | dict] = None) -> None: + """ + Set one argument for the executable model. + + Parameters + ---------- + key : str + val : str, None + """ + if not isinstance(key, str): + raise ModelicaSystemError(f"Invalid argument key: {repr(key)} (type: {type(key)})") + key = key.strip() + if val is None: + argval = None + elif isinstance(val, str): + argval = val.strip() + elif isinstance(val, numbers.Number): + argval = str(val) + elif key == 'override' and isinstance(val, dict): + for okey in val: + if not isinstance(okey, str) or not isinstance(val[okey], (str, numbers.Number)): + raise ModelicaSystemError("Invalid argument for 'override': " + f"{repr(okey)} = {repr(val[okey])}") + self._arg_override[okey] = val[okey] + + argval = ','.join([f"{okey}={str(self._arg_override[okey])}" for okey in self._arg_override]) + else: + raise ModelicaSystemError(f"Invalid argument value for {repr(key)}: {repr(val)} (type: {type(val)})") + + if key in self._args: + logger.warning(f"Overwrite model executable argument: {repr(key)} = {repr(argval)} " + f"(was: {repr(self._args[key])})") + self._args[key] = argval + + def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: + """ + Define arguments for the model executable. + + Parameters + ---------- + args : dict[str, Optional[str | dict[str, str]]] + """ + for arg in args: + self.arg_set(key=arg, val=args[arg]) + + def get_exe(self) -> pathlib.Path: + """ + Get the path to the executable / complied model. + + Returns + ------- + pathlib.Path + """ + if platform.system() == "Windows": + path_exe = self._runpath / f"{self._modelname}.exe" + else: + path_exe = self._runpath / self._modelname + + if not path_exe.exists(): + raise ModelicaSystemError(f"Application file path not found: {path_exe}") + + return path_exe + + def get_cmd(self) -> list: + """ + Run the requested simulation + + Returns + ------- + list + """ + + path_exe = self.get_exe() + + cmdl = [path_exe.as_posix()] + for key in self._args: + if self._args[key] is None: + cmdl.append(f"-{key}") + else: + cmdl.append(f"-{key}={self._args[key]}") + + return cmdl + + def run(self) -> int: + """ + Run the requested simulation + + Returns + ------- + int + """ + + cmdl: list = self.get_cmd() + + logger.debug("Run OM command %s in %s", repr(cmdl), self._runpath.as_posix()) + + if platform.system() == "Windows": + path_dll = "" + + # set the process environment from the generated .bat file in windows which should have all the dependencies + path_bat = self._runpath / f"{self._modelname}.bat" + if not path_bat.exists(): + ModelicaSystemError("Batch file (*.bat) does not exist " + str(path_bat)) + + with open(path_bat, 'r') as file: + for line in file: + match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE) + if match: + path_dll = match.group(1).strip(';') # Remove any trailing semicolons + my_env = os.environ.copy() + my_env["PATH"] = path_dll + os.pathsep + my_env["PATH"] + else: + # TODO: how to handle path to resources of external libraries for any system not Windows? + my_env = None + + try: + cmdres = subprocess.run(cmdl, capture_output=True, text=True, env=my_env, cwd=self._runpath, + timeout=self._timeout) + stdout = cmdres.stdout.strip() + stderr = cmdres.stderr.strip() + returncode = cmdres.returncode + + logger.debug("OM output for command %s:\n%s", repr(cmdl), stdout) + + if stderr: + raise ModelicaSystemError(f"Error running command {repr(cmdl)}: {stderr}") + except subprocess.TimeoutExpired: + raise ModelicaSystemError(f"Timeout running command {repr(cmdl)}") + except subprocess.CalledProcessError as ex: + raise ModelicaSystemError(f"Error running command {repr(cmdl)}") from ex + + return returncode + + @staticmethod + def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]: + """ + Parse a simflag definition; this is depreciated! + + The return data can be used as input for self.args_set(). + + Parameters + ---------- + simflags : str + + Returns + ------- + dict + """ + warnings.warn("The argument 'simflags' is depreciated and will be removed in future versions; " + "please use 'simargs' instead", DeprecationWarning, stacklevel=2) + + simargs: dict[str, Optional[str | dict[str, str]]] = {} + + args = [s for s in simflags.split(' ') if s] + for arg in args: + if arg[0] != '-': + raise ModelicaSystemError(f"Invalid simulation flag: {arg}") + arg = arg[1:] + parts = arg.split('=') + if len(parts) == 1: + simargs[parts[0]] = None + elif parts[0] == 'override': + override = '='.join(parts[1:]) + + override_dict = {} + for item in override.split(','): + kv = item.split('=') + if not (0 < len(kv) < 3): + raise ModelicaSystemError(f"Invalid value for '-override': {override}") + if kv[0]: + try: + override_dict[kv[0]] = kv[1] + except (KeyError, IndexError) as ex: + raise ModelicaSystemError(f"Invalid value for '-override': {override}") from ex + + simargs[parts[0]] = override_dict + + return simargs + + class ModelicaSystem: def __init__( self, @@ -116,7 +319,7 @@ def __init__( lmodel: Optional[list[str | tuple[str, str]]] = None, commandLineOptions: Optional[str] = None, variableFilter: Optional[str] = None, - customBuildDirectory: Optional[str | os.PathLike] = None, + customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None, omhome: Optional[str] = None, session: Optional[OMCSessionZMQ] = None, build: Optional[bool] = True @@ -174,7 +377,6 @@ def __init__( self.linearinputs = [] # linearization input list self.linearoutputs = [] # linearization output list self.linearstates = [] # linearization states list - self.tempdir = "" if session is not None: if not isinstance(session, OMCSessionZMQ): @@ -200,7 +402,7 @@ def __init__( self.simulationFlag = False # if the model is simulated? self.outputFlag = False self.csvFile = '' # for storing inputs condition - self.resultfile = "" # for storing result file + self.resultfile = None # for storing result file self.variableFilter = variableFilter if self.fileName is not None and not self.fileName.is_file(): # if file does not exist @@ -212,7 +414,7 @@ def __init__( self.setCommandLineOptions("--linearizationDumpLanguage=python") self.setCommandLineOptions("--generateSymbolicLinearization") - self.setTempDirectory(customBuildDirectory) + self.tempdir = self.setTempDirectory(customBuildDirectory) if self.fileName is not None: self.loadLibrary(lmodel=self.lmodel) @@ -260,62 +462,25 @@ def loadLibrary(self, lmodel: list): '1)["Modelica"]\n' '2)[("Modelica","3.2.3"), "PowerSystems"]\n') - def setTempDirectory(self, customBuildDirectory): + def setTempDirectory(self, customBuildDirectory) -> pathlib.Path: # create a unique temp directory for each session and build the model in that directory if customBuildDirectory is not None: if not os.path.exists(customBuildDirectory): raise IOError(customBuildDirectory, " does not exist") - self.tempdir = customBuildDirectory + tempdir = pathlib.Path(customBuildDirectory) else: - self.tempdir = tempfile.mkdtemp() - if not os.path.exists(self.tempdir): - raise IOError(self.tempdir, " cannot be created") + tempdir = pathlib.Path(tempfile.mkdtemp()) + if not tempdir.is_dir(): + raise IOError(tempdir, " cannot be created") - logger.info("Define tempdir as %s", self.tempdir) - exp = f'cd("{pathlib.Path(self.tempdir).absolute().as_posix()}")' + logger.info("Define tempdir as %s", tempdir) + exp = f'cd("{tempdir.absolute().as_posix()}")' self.sendExpression(exp) - def getWorkDirectory(self): - return self.tempdir - - def _run_cmd(self, cmd: list, timeout: Optional[int] = None): - logger.debug("Run OM command %s in %s", cmd, self.tempdir) - - if platform.system() == "Windows": - dllPath = "" - - # set the process environment from the generated .bat file in windows which should have all the dependencies - batFilePath = pathlib.Path(self.tempdir) / f"{self.modelName}.bat" - if not batFilePath.exists(): - raise ModelicaSystemError("Batch file (*.bat) does not exist " + str(batFilePath)) - - with open(batFilePath, 'r') as file: - for line in file: - match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE) - if match: - dllPath = match.group(1).strip(';') # Remove any trailing semicolons - my_env = os.environ.copy() - my_env["PATH"] = dllPath + os.pathsep + my_env["PATH"] - else: - # TODO: how to handle path to resources of external libraries for any system not Windows? - my_env = None - - try: - cmdres = subprocess.run(cmd, capture_output=True, text=True, env=my_env, cwd=self.tempdir, - timeout=timeout) - stdout = cmdres.stdout.strip() - stderr = cmdres.stderr.strip() - - logger.debug("OM output for command %s:\n%s", cmd, stdout) + return tempdir - if cmdres.returncode != 0: - raise ModelicaSystemError(f"Error running command {cmd}: return code = {cmdres.returncode}") - if stderr: - raise ModelicaSystemError(f"Error running command {cmd}: {stderr}") - except subprocess.TimeoutExpired: - raise ModelicaSystemError(f"Timeout running command {repr(cmd)}") - except Exception as ex: - raise ModelicaSystemError(f"Error running command {cmd}") from ex + def getWorkDirectory(self) -> pathlib.Path: + return self.tempdir def buildModel(self, variableFilter=None): if variableFilter is not None: @@ -646,38 +811,38 @@ def getOptimizationOptions(self, names=None): # 10 raise ModelicaSystemError("Unhandled input for getOptimizationOptions()") - def get_exe_file(self) -> pathlib.Path: - """Get path to model executable.""" - if platform.system() == "Windows": - return pathlib.Path(self.tempdir) / f"{self.modelName}.exe" - else: - return pathlib.Path(self.tempdir) / self.modelName - - def simulate(self, resultfile=None, simflags=None, timeout: Optional[int] = None): # 11 + def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, + timeout: Optional[int] = None): # 11 """ This method simulates model according to the simulation options. usage >>> simulate() >>> simulate(resultfile="a.mat") >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags + >>> simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "e=0.3,g=10"}) # using simargs """ + + om_cmd = ModelicaSystemCmd(runpath=self.tempdir, modelname=self.modelName, timeout=timeout) + if resultfile is None: # default result file generated by OM - self.resultfile = (pathlib.Path(self.tempdir) / f"{self.modelName}_res.mat").as_posix() + self.resultfile = self.tempdir / f"{self.modelName}_res.mat" elif os.path.exists(resultfile): - self.resultfile = resultfile + self.resultfile = pathlib.Path(resultfile) else: - self.resultfile = (pathlib.Path(self.tempdir) / resultfile).as_posix() + self.resultfile = self.tempdir / resultfile # always define the resultfile to use - resultfileflag = " -r=" + self.resultfile + om_cmd.arg_set(key="r", val=self.resultfile.as_posix()) # allow runtime simulation flags from user input - if simflags is None: - simflags = "" - else: - simflags = " " + simflags + if simflags is not None: + om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) + + if simargs: + om_cmd.args_set(args=simargs) - overrideFile = pathlib.Path(self.tempdir) / f"{self.modelName}_override.txt" + overrideFile = self.tempdir / f"{self.modelName}_override.txt" if self.overridevariables or self.simoptionsoverride: tmpdict = self.overridevariables.copy() tmpdict.update(self.simoptionsoverride) @@ -685,9 +850,8 @@ def simulate(self, resultfile=None, simflags=None, timeout: Optional[int] = None with open(overrideFile, "w") as file: for key, value in tmpdict.items(): file.write(f"{key}={value}\n") - override = " -overrideFile=" + overrideFile.as_posix() - else: - override = "" + + om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix()) if self.inputFlag: # if model has input quantities for i in self.inputlist: @@ -702,17 +866,18 @@ def simulate(self, resultfile=None, simflags=None, timeout: Optional[int] = None if float(self.simulateOptions["stopTime"]) != val[-1][0]: raise ModelicaSystemError(f"stopTime not matched for Input {i}!") self.csvFile = self.createCSVData() # create csv file - csvinput = " -csvInput=" + self.csvFile.as_posix() - else: - csvinput = "" - exe_file = self.get_exe_file() - if not exe_file.exists(): - raise ModelicaSystemError(f"Application file path not found: {exe_file}") + om_cmd.arg_set(key="csvInput", val=self.csvFile.as_posix()) + + # delete resultfile ... + if self.resultfile.is_file(): + self.resultfile.unlink() + # ... run simulation ... + returncode = om_cmd.run() + # and check returncode *AND* resultfile + if returncode != 0 and self.resultfile.is_file(): + logger.warning(f"Return code = {returncode} but result file exists!") - cmd = exe_file.as_posix() + override + csvinput + resultfileflag + simflags - cmd = [s for s in cmd.split(' ') if s] - self._run_cmd(cmd=cmd, timeout=timeout) self.simulationFlag = True # to extract simulation results @@ -729,7 +894,7 @@ def getSolutions(self, varList=None, resultfile=None): # 12 >>> getSolutions(["Name1","Name2"],resultfile=""c:/a.mat"") """ if resultfile is None: - resFile = self.resultfile + resFile = self.resultfile.as_posix() else: resFile = resultfile @@ -961,7 +1126,7 @@ def createCSVData(self) -> pathlib.Path: ] csv_rows.append(row) - csvFile = pathlib.Path(self.tempdir) / f'{self.modelName}.csv' + csvFile = self.tempdir / f'{self.modelName}.csv' with open(csvFile, "w", newline="") as f: writer = csv.writer(f) @@ -1028,13 +1193,15 @@ def optimize(self): # 21 return optimizeResult def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, timeout: Optional[int] = None) -> LinearizationResult: """Linearize the model according to linearOptions. Args: lintime: Override linearOptions["stopTime"] value. simflags: A string of extra command line flags for the model - binary. + binary. - depreciated in favor of simargs + simargs: A dict with command line flags and possible options; example: "simargs={'csvInput': 'a.csv'}" timeout: Possible timeout for the execution of OM. Returns: @@ -1051,7 +1218,9 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N raise IOError("Linearization cannot be performed as the model is not build, " "use ModelicaSystem() to build the model first") - overrideLinearFile = pathlib.Path(self.tempdir) / f'{self.modelName}_override_linear.txt' + om_cmd = ModelicaSystemCmd(runpath=self.tempdir, modelname=self.modelName, timeout=timeout) + + overrideLinearFile = self.tempdir / f'{self.modelName}_override_linear.txt' with open(overrideLinearFile, "w") as file: for key, value in self.overridevariables.items(): @@ -1059,8 +1228,7 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N for key, value in self.linearOptions.items(): file.write(f"{key}={value}\n") - override = " -overrideFile=" + overrideLinearFile.as_posix() - logger.debug(f"overwrite = {override}") + om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix()) if self.inputFlag: nameVal = self.getInputs() @@ -1071,29 +1239,25 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N if l[0] < float(self.simulateOptions["startTime"]): raise ModelicaSystemError('Input time value is less than simulation startTime') self.csvFile = self.createCSVData() - csvinput = " -csvInput=" + self.csvFile.as_posix() - else: - csvinput = "" + om_cmd.arg_set(key="csvInput", val=self.csvFile.as_posix()) - # prepare the linearization runtime command - exe_file = self.get_exe_file() + om_cmd.arg_set(key="l", val=str(lintime or self.linearOptions["stopTime"])) - linruntime = f' -l={lintime or self.linearOptions["stopTime"]}' + # allow runtime simulation flags from user input + if simflags is not None: + om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) - if simflags is None: - simflags = "" - else: - simflags = " " + simflags + if simargs: + om_cmd.args_set(args=simargs) - if not exe_file.exists(): - raise ModelicaSystemError(f"Application file path not found: {exe_file}") - else: - cmd = exe_file.as_posix() + linruntime + override + csvinput + simflags - cmd = [s for s in cmd.split(' ') if s] - self._run_cmd(cmd=cmd, timeout=timeout) + returncode = om_cmd.run() + if returncode != 0: + raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") + + self.simulationFlag = True # code to get the matrix and linear inputs, outputs and states - linearFile = pathlib.Path(self.tempdir) / "linearized_model.py" + linearFile = self.tempdir / "linearized_model.py" # support older openmodelica versions before OpenModelica v1.16.2 where linearize() generates "linear_modelname.mo" file if not linearFile.exists(): diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 0d4ab686..53368a03 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -37,11 +37,12 @@ """ from OMPython.OMCSession import OMCSessionCmd, OMCSessionZMQ, OMCSessionException -from OMPython.ModelicaSystem import ModelicaSystem, ModelicaSystemError, LinearizationResult +from OMPython.ModelicaSystem import ModelicaSystem, ModelicaSystemCmd, ModelicaSystemError, LinearizationResult # global names imported if import 'from OMPython import *' is used __all__ = [ 'ModelicaSystem', + 'ModelicaSystemCmd', 'ModelicaSystemError', 'LinearizationResult', diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index 145aa526..66dfd90d 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -100,7 +100,7 @@ def test_customBuildDirectory(self): tmpdir = self.tmp / "tmpdir1" tmpdir.mkdir() m = OMPython.ModelicaSystem(filePath, "M", customBuildDirectory=tmpdir) - assert pathlib.Path(m.getWorkDirectory()).resolve() == tmpdir.resolve() + assert m.getWorkDirectory().resolve() == tmpdir.resolve() result_file = tmpdir / "a.mat" assert not result_file.exists() m.simulate(resultfile="a.mat") diff --git a/tests/test_ModelicaSystemCmd.py b/tests/test_ModelicaSystemCmd.py new file mode 100644 index 00000000..6257a2a6 --- /dev/null +++ b/tests/test_ModelicaSystemCmd.py @@ -0,0 +1,42 @@ +import OMPython +import pathlib +import shutil +import tempfile +import unittest + + +import logging +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + + +class ModelicaSystemCmdTester(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(ModelicaSystemCmdTester, self).__init__(*args, **kwargs) + self.tmp = pathlib.Path(tempfile.mkdtemp(prefix='tmpOMPython.tests')) + self.model = self.tmp / "M.mo" + with open(self.model, "w") as fout: + fout.write("""model M + Real x(start = 1, fixed = true); + parameter Real a = -1; +equation + der(x) = x*a; +end M; + """) + self.mod = OMPython.ModelicaSystem(self.model.as_posix(), "M") + + def __del__(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_simflags(self): + mscmd = OMPython.ModelicaSystemCmd(runpath=self.mod.tempdir, modelname=self.mod.modelName) + mscmd.args_set(args={"noEventEmit": None, "noRestart": None, "override": {'b': 2}}) + mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) + + logger.info(mscmd.get_cmd()) + + assert mscmd.get_cmd() == [mscmd.get_exe().as_posix(), '-noEventEmit', '-noRestart', '-override=b=2,a=1,x=3'] + + +if __name__ == '__main__': + unittest.main()