From d29394379d7868824010c26213d1bffb173a82a7 Mon Sep 17 00:00:00 2001 From: syntron <32058823+syntron@users.noreply.github.com> Date: Wed, 13 Nov 2024 14:32:11 +0100 Subject: [PATCH] Use logging in modelica system syntron (#228) * [ModelicaSystem] raise IOError if files are missing * [ModelicaSystem] define verbose as a class variables * this simplifies a lot of functions! * [ModelicaSystem] add class ModelicaSystemError and use it * class variable _raiseerrors * functions _check_error() & _raise_error() * [ModelicaSystem] use _check_error() * [ModelicaSystem] use _raise_error() * [ModelicaSystem] use 'raise ModelicaSystemError' * [ModelicaSystem] convert remaining print() to log messages using logger.*() * [ModelicaSystem] fix comment * [ModelicaSystem] use function setCommandLineOptions() * [ModelicaSystem] raise error if loadLibrary() fails * [ModelicaSystem] fix default argument - PyCharm: default argument value is mutable * [PyCharm:ModelicaSystem] style cleanup * [ModelicaSystem] add missing check for self-_verbose in loadFile() --- OMPython/__init__.py | 536 +++++++++++++++++++++++-------------------- 1 file changed, 286 insertions(+), 250 deletions(-) mode change 100755 => 100644 OMPython/__init__.py diff --git a/OMPython/__init__.py b/OMPython/__init__.py old mode 100755 new mode 100644 index 6e07b04..5e9ec47 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -788,8 +788,13 @@ def sendExpression(self, command, parsed=True): raise Exception("Process Exited, No connection with OMC. Create a new instance of OMCSession") +class ModelicaSystemError(Exception): + pass + + class ModelicaSystem(object): - def __init__(self, fileName=None, modelName=None, lmodel=[], useCorba=False, commandLineOptions=None, variableFilter=None, customBuildDirectory=None, verbose=True): # 1 + def __init__(self, fileName=None, modelName=None, lmodel=None, useCorba=False, commandLineOptions=None, + variableFilter=None, customBuildDirectory=None, verbose=True, raiseerrors=False): # 1 """ "constructor" It initializes to load file and build a model, generating object, exe, xml, mat, and json files. etc. It can be called : @@ -807,21 +812,24 @@ def __init__(self, fileName=None, modelName=None, lmodel=[], useCorba=False, com return self.tree = None - self.quantitiesList=[] - self.paramlist={} - self.inputlist={} - self.outputlist={} - self.continuouslist={} - self.simulateOptions={} - self.overridevariables={} - self.simoptionsoverride={} - self.linearOptions={'startTime':0.0, 'stopTime': 1.0, 'stepSize':0.002, 'tolerance':1e-8} - self.optimizeOptions={'startTime':0.0, 'stopTime': 1.0, 'numberOfIntervals':500, 'stepSize':0.002, 'tolerance':1e-8} + self.quantitiesList = [] + self.paramlist = {} + self.inputlist = {} + self.outputlist = {} + self.continuouslist = {} + self.simulateOptions = {} + self.overridevariables = {} + self.simoptionsoverride = {} + self.linearOptions = {'startTime': 0.0, 'stopTime': 1.0, 'stepSize': 0.002, 'tolerance': 1e-8} + self.optimizeOptions = {'startTime': 0.0, 'stopTime': 1.0, 'numberOfIntervals': 500, 'stepSize': 0.002, + 'tolerance': 1e-8} self.linearinputs = [] # linearization input list self.linearoutputs = [] # linearization output list self.linearstates = [] # linearization states list self.tempdir = "" + self._verbose = verbose + if useCorba: self.getconn = OMCSession() else: @@ -832,9 +840,10 @@ def __init__(self, fileName=None, modelName=None, lmodel=[], useCorba=False, com self._omc_process = self.getconn._omc_process ## set commandLineOptions if provided by users - if commandLineOptions is not None: - exp="".join(["setCommandLineOptions(","\"",commandLineOptions,"\"",")"]) - self.getconn.sendExpression(exp) + self.setCommandLineOptions(commandLineOptions=commandLineOptions) + + if lmodel is None: + lmodel = [] self.xmlFile = None self.lmodel = lmodel # may be needed if model is derived from other model @@ -844,12 +853,13 @@ def __init__(self, fileName=None, modelName=None, lmodel=[], useCorba=False, com self.simulationFlag = False # if the model is simulated? self.outputFlag = False self.csvFile = '' # for storing inputs condition - self.resultfile="" # for storing result file + self.resultfile = "" # for storing result file self.variableFilter = variableFilter - if fileName is not None and not os.path.exists(self.fileName): # if file does not eixt - print("File Error:" + os.path.abspath(self.fileName) + " does not exist!!!") - return + self._raiseerrors = raiseerrors + + if fileName is not None and not os.path.exists(self.fileName): # if file does not exist + raise IOError("File Error:" + os.path.abspath(self.fileName) + " does not exist!!!") ## set default command Line Options for linearization as ## linearize() will use the simulation executable and runtime @@ -860,36 +870,36 @@ def __init__(self, fileName=None, modelName=None, lmodel=[], useCorba=False, com self.setTempDirectory(customBuildDirectory) if fileName is not None: - self.loadLibrary(verbose) - self.loadFile(verbose) + self.loadLibrary() + self.loadFile() ## allow directly loading models from MSL without fileName if fileName is None and modelName is not None: - self.loadLibrary(verbose) + self.loadLibrary() - self.buildModel(variableFilter, verbose) + self.buildModel(variableFilter) def __del__(self): OMCSessionBase.__del__(self) - def setCommandLineOptions(self): + def setCommandLineOptions(self, commandLineOptions: str): ## set commandLineOptions if provided by users if commandLineOptions is not None: - exp="".join(["setCommandLineOptions(","\"",commandLineOptions,"\"",")"]) + exp = "".join(["setCommandLineOptions(", "\"", commandLineOptions, "\"", ")"]) cmdexp = self.getconn.sendExpression(exp) if not cmdexp: - return print(self.getconn.sendExpression("getErrorString()")) + self._check_error() - def loadFile(self, verbose): + def loadFile(self): # load file - loadFileExp="".join(["loadFile(","\"",self.fileName,"\"",")"]).replace("\\","/") + loadFileExp = "".join(["loadFile(", "\"", self.fileName, "\"", ")"]).replace("\\", "/") loadMsg = self.getconn.sendExpression(loadFileExp) ## Show notification or warnings to the user when verbose=True OR if some error occurred i.e., not result - if verbose or not loadMsg: - return print(self.getconn.sendExpression("getErrorString()")) + if self._verbose or not loadMsg: + self._check_error() # for loading file/package, loading model and building model - def loadLibrary(self, verbose): + def loadLibrary(self): # load Modelica standard libraries or Modelica files if needed for element in self.lmodel: if element is not None: @@ -906,29 +916,33 @@ def loadLibrary(self, verbose): libname = "".join(["loadModel(", element[0], ", ", "{", "\"", element[1], "\"", "}", ")"]) result = self.sendExpression(libname) else: - print("| info | loadLibrary() failed, Unknown type detected: ", element , " is of type ", type(element), ", The following patterns are supported\n1)[\"Modelica\"]\n2)[(\"Modelica\",\"3.2.3\"), \"PowerSystems\"]\n") + raise ModelicaSystemError("loadLibrary() failed, Unknown type detected: " + + "{} is of type {}, ".format(element, type(element)) + + "The following patterns are supported:\n" + + "1)[\"Modelica\"]\n" + + "2)[(\"Modelica\",\"3.2.3\"), \"PowerSystems\"]\n") ## Show notification or warnings to the user when verbose=True OR if some error occurred i.e., not result - if verbose or not result: - print(self.requestApi('getErrorString')) + if self._verbose or not result: + self._check_error() def setTempDirectory(self, customBuildDirectory): # create a unique temp directory for each session and build the model in that directory if customBuildDirectory is not None: if not os.path.exists(customBuildDirectory): - print(customBuildDirectory, " does not exist") + raise IOError(customBuildDirectory, " does not exist") self.tempdir = customBuildDirectory else: self.tempdir = tempfile.mkdtemp() if not os.path.exists(self.tempdir): - print(self.tempdir, " cannot be created") + raise IOError(self.tempdir, " cannot be created") - exp="".join(["cd(","\"",self.tempdir,"\"",")"]).replace("\\","/") + exp = "".join(["cd(", "\"", self.tempdir, "\"", ")"]).replace("\\", "/") self.getconn.sendExpression(exp) def getWorkDirectory(self): return self.tempdir - def _run_cmd(self, cmd: list, verbose: bool = True): + def _run_cmd(self, cmd: list): logger.debug("Run OM command {} in {}".format(cmd, self.tempdir)) if platform.system() == "Windows": @@ -960,41 +974,49 @@ def _run_cmd(self, cmd: list, verbose: bool = True): stdout = stdout.decode('ascii').strip() stderr = stderr.decode('ascii').strip() if stderr: - logger.warning("OM error: {}".format(stderr)) - if verbose and stdout: - logger.info("OM output:\n{}".format(stdout)) + raise ModelicaSystemError("Error running command {}: {}".format(cmd, stderr)) + if self._verbose and stdout: + logger.info("OM output for command {}:\n{}".format(cmd, stdout)) p.wait() p.terminate() os.chdir(currentDir) except Exception as e: os.chdir(currentDir) - raise Exception("Error running command {}: {}".format(repr(cmd), e)) + raise ModelicaSystemError("Exception {} running command {}: {}".format(type(e), cmd, e)) - def buildModel(self, variableFilter=None, verbose=True): + def _check_error(self): + errstr = self.getconn.sendExpression("getErrorString()") + if errstr is None or not errstr: + return + + self._raise_error(errstr=errstr) + + def _raise_error(self, errstr: str): + if self._raiseerrors: + raise ModelicaSystemError("OM error: {}".format(errstr)) + else: + logger.error(errstr) + + def buildModel(self, variableFilter=None): if variableFilter is not None: self.variableFilter = variableFilter if self.variableFilter is not None: varFilter = "variableFilter=" + "\"" + self.variableFilter + "\"" else: - varFilter = "variableFilter=" + "\".*""\"" - # print(varFilter) + varFilter = "variableFilter=" + "\".*""\"" + logger.debug(varFilter) # buildModelResult=self.getconn.sendExpression("buildModel("+ mName +")") buildModelResult = self.requestApi("buildModel", self.modelName, properties=varFilter) - buildModelError = self.requestApi("getErrorString") - - if ('' in buildModelResult): - print(buildModelError) + if self._verbose: + logger.info("OM model build result: {}".format(buildModelResult)) + self._check_error() - # Issue #145. Always print the getErrorString since it might contains build warnings. - if verbose: - print(buildModelError) - - self.xmlFile=os.path.join(os.path.dirname(buildModelResult[0]),buildModelResult[1]).replace("\\","/") + self.xmlFile = os.path.join(os.path.dirname(buildModelResult[0]), buildModelResult[1]).replace("\\", "/") self.xmlparse() - def sendExpression(self,expr,parsed=True): - return self.getconn.sendExpression(expr,parsed) + def sendExpression(self, expr, parsed=True): + return self.getconn.sendExpression(expr, parsed) # request to OMC def requestApi(self, apiName, entity=None, properties=None): # 2 @@ -1010,18 +1032,18 @@ def requestApi(self, apiName, entity=None, properties=None): # 2 try: res = self.getconn.sendExpression(exp) except Exception as e: - print(e) + errstr = "Exception {} raised: {}".format(type(e), e) + self._raise_error(errstr=errstr) res = None return res - def xmlparse(self): - if(os.path.exists(self.xmlFile)): + if (os.path.exists(self.xmlFile)): self.tree = ET.parse(self.xmlFile) self.root = self.tree.getroot() rootCQ = self.root for attr in rootCQ.iter('DefaultExperiment'): - self.simulateOptions["startTime"]= attr.get('startTime') + self.simulateOptions["startTime"] = attr.get('startTime') self.simulateOptions["stopTime"] = attr.get('stopTime') self.simulateOptions["stepSize"] = attr.get('stepSize') self.simulateOptions["tolerance"] = attr.get('tolerance') @@ -1029,7 +1051,7 @@ def xmlparse(self): self.simulateOptions["outputFormat"] = attr.get('outputFormat') for sv in rootCQ.iter('ScalarVariable'): - scalar={} + scalar = {} scalar["name"] = sv.get('name') scalar["changeable"] = sv.get('isValueChangeable') scalar["description"] = sv.get('description') @@ -1047,28 +1069,27 @@ def xmlparse(self): min = att.get('min') max = att.get('max') unit = att.get('unit') - scalar["start"] =start + scalar["start"] = start scalar["min"] = min scalar["max"] = max scalar["unit"] = unit - if(scalar["variability"]=="parameter"): + if (scalar["variability"] == "parameter"): if scalar["name"] in self.overridevariables: self.paramlist[scalar["name"]] = self.overridevariables[scalar["name"]] else: self.paramlist[scalar["name"]] = scalar["start"] - if(scalar["variability"]=="continuous"): - self.continuouslist[scalar["name"]]=scalar["start"] - if(scalar["causality"]=="input"): - self.inputlist[scalar["name"]]=scalar["start"] - if(scalar["causality"]=="output"): - self.outputlist[scalar["name"]]=scalar["start"] + if (scalar["variability"] == "continuous"): + self.continuouslist[scalar["name"]] = scalar["start"] + if (scalar["causality"] == "input"): + self.inputlist[scalar["name"]] = scalar["start"] + if (scalar["causality"] == "output"): + self.outputlist[scalar["name"]] = scalar["start"] self.quantitiesList.append(scalar) else: - print("Error: ! XML file not generated: " + self.xmlFile) - return - + errstr = "XML file not generated: " + self.xmlFile + self._raise_error(errstr=errstr) def getQuantities(self, names=None): # 3 """ @@ -1078,13 +1099,12 @@ def getQuantities(self, names=None): # 3 >>> getQuantities("Name1") >>> getQuantities(["Name1","Name2"]) """ - if(names==None): + if (names == None): return self.quantitiesList - elif(isinstance(names, str)): + elif (isinstance(names, str)): return [x for x in self.quantitiesList if x["name"] == names] elif isinstance(names, list): - return [x for y in names for x in self.quantitiesList if x["name"]==y] - + return [x for y in names for x in self.quantitiesList if x["name"] == y] def getContinuous(self, names=None): # 4 """ @@ -1095,39 +1115,39 @@ def getContinuous(self, names=None): # 4 >>> getContinuous(["Name1","Name2"]) """ if not self.simulationFlag: - if(names==None): + if names is None: return self.continuouslist - elif(isinstance(names, str)): - return [self.continuouslist.get(names ,"NotExist")] - elif(isinstance(names, list)): - return ([self.continuouslist.get(x ,"NotExist") for x in names]) + elif isinstance(names, str): + return [self.continuouslist.get(names, "NotExist")] + elif isinstance(names, list): + return [self.continuouslist.get(x, "NotExist") for x in names] else: - if(names==None): + if names is None: for i in self.continuouslist: try: value = self.getSolutions(i) - self.continuouslist[i]=value[0][-1] + self.continuouslist[i] = value[0][-1] except Exception: - print(i,"could not be computed") + raise ModelicaSystemError("OM error: {} could not be computed".format(i)) return self.continuouslist - elif(isinstance(names, str)): + elif (isinstance(names, str)): if names in self.continuouslist: value = self.getSolutions(names) - self.continuouslist[names]=value[0][-1] + self.continuouslist[names] = value[0][-1] return [self.continuouslist.get(names)] else: - return (names, " is not continuous") + raise ModelicaSystemError("OM error: {} is not continuous".format(names)) - elif(isinstance(names, list)): - valuelist=[] + elif (isinstance(names, list)): + valuelist = [] for i in names: if i in self.continuouslist: - value=self.getSolutions(i) - self.continuouslist[i]=value[0][-1] + value = self.getSolutions(i) + self.continuouslist[i] = value[0][-1] valuelist.append(value[0][-1]) else: - return (i," is not continuous") + raise ModelicaSystemError("OM error: {} is not continuous".format(i)) return valuelist def getParameters(self, names=None): # 5 @@ -1139,12 +1159,12 @@ def getParameters(self, names=None): # 5 >>> getParameters("Name1") >>> getParameters(["Name1","Name2"]) """ - if(names==None): + if (names == None): return self.paramlist - elif(isinstance(names, str)): - return [self.paramlist.get(names,"NotExist")] - elif(isinstance(names, list)): - return ([self.paramlist.get(x,"NotExist") for x in names]) + elif (isinstance(names, str)): + return [self.paramlist.get(names, "NotExist")] + elif (isinstance(names, list)): + return ([self.paramlist.get(x, "NotExist") for x in names]) def getlinearParameters(self, names=None): # 5 """ @@ -1152,12 +1172,12 @@ def getlinearParameters(self, names=None): # 5 If *name is None then the function will return dict which contain all parameter names as key and value as corresponding values. eg., getParameters() Otherwise variable number of arguments can be passed as parameter name in string format separated by commas. eg., getParameters('paraName1', 'paraName2') """ - if(names==0): + if (names == 0): return self.linearparameters - elif(isinstance(names, str)): - return [self.linearparameters.get(names,"NotExist")] + elif (isinstance(names, str)): + return [self.linearparameters.get(names, "NotExist")] else: - return ([self.linearparameters.get(x,"NotExist") for x in names]) + return ([self.linearparameters.get(x, "NotExist") for x in names]) def getInputs(self, names=None): # 6 """ @@ -1165,12 +1185,12 @@ def getInputs(self, names=None): # 6 If *name is None then the function will return dict which contain all input names as key and value as corresponding values. eg., getInputs() Otherwise variable number of arguments can be passed as input name in string format separated by commas. eg., getInputs('iName1', 'iName2') """ - if(names==None): + if (names == None): return self.inputlist - elif(isinstance(names, str)): - return [self.inputlist.get(names,"NotExist")] - elif(isinstance(names, list)): - return ([self.inputlist.get(x,"NotExist") for x in names]) + elif (isinstance(names, str)): + return [self.inputlist.get(names, "NotExist")] + elif (isinstance(names, list)): + return ([self.inputlist.get(x, "NotExist") for x in names]) def getOutputs(self, names=None): # 7 """ @@ -1182,31 +1202,31 @@ def getOutputs(self, names=None): # 7 >>> getOutputs(["Name1","Name2"]) """ if not self.simulationFlag: - if(names==None): + if (names == None): return self.outputlist - elif(isinstance(names, str)): - return [self.outputlist.get(names,"NotExist")] + elif (isinstance(names, str)): + return [self.outputlist.get(names, "NotExist")] else: - return ([self.outputlist.get(x,"NotExist") for x in names]) + return ([self.outputlist.get(x, "NotExist") for x in names]) else: - if (names== None): + if (names == None): for i in self.outputlist: value = self.getSolutions(i) - self.outputlist[i]=value[0][-1] + self.outputlist[i] = value[0][-1] return self.outputlist - elif(isinstance(names, str)): - if names in self.outputlist: - value = self.getSolutions(names) - self.outputlist[names]=value[0][-1] - return [self.outputlist.get(names)] - else: - return (names, " is not Output") - elif(isinstance(names, list)): - valuelist=[] + elif (isinstance(names, str)): + if names in self.outputlist: + value = self.getSolutions(names) + self.outputlist[names] = value[0][-1] + return [self.outputlist.get(names)] + else: + return (names, " is not Output") + elif (isinstance(names, list)): + valuelist = [] for i in names: if i in self.outputlist: - value=self.getSolutions(i) - self.outputlist[i]=value[0][-1] + value = self.getSolutions(i) + self.outputlist[i] = value[0][-1] valuelist.append(value[0][-1]) else: return (i, "is not Output") @@ -1221,12 +1241,12 @@ def getSimulationOptions(self, names=None): # 8 >>> getSimulationOptions("Name1") >>> getSimulationOptions(["Name1","Name2"]) """ - if(names==None): + if (names == None): return self.simulateOptions - elif(isinstance(names, str)): - return [self.simulateOptions.get(names,"NotExist")] - elif(isinstance(names, list)): - return ([self.simulateOptions.get(x,"NotExist") for x in names]) + elif (isinstance(names, str)): + return [self.simulateOptions.get(names, "NotExist")] + elif (isinstance(names, list)): + return ([self.simulateOptions.get(x, "NotExist") for x in names]) def getLinearizationOptions(self, names=None): # 9 """ @@ -1237,12 +1257,12 @@ def getLinearizationOptions(self, names=None): # 9 >>> getLinearizationOptions("Name1") >>> getLinearizationOptions(["Name1","Name2"]) """ - if(names==None): + if (names == None): return self.linearOptions - elif(isinstance(names, str)): - return [self.linearOptions.get(names,"NotExist")] - elif(isinstance(names, list)): - return ([self.linearOptions.get(x,"NotExist") for x in names]) + elif (isinstance(names, str)): + return [self.linearOptions.get(names, "NotExist")] + elif (isinstance(names, list)): + return ([self.linearOptions.get(x, "NotExist") for x in names]) def getOptimizationOptions(self, names=None): # 10 """ @@ -1251,42 +1271,43 @@ def getOptimizationOptions(self, names=None): # 10 >>> getOptimizationOptions("Name1") >>> getOptimizationOptions(["Name1","Name2"]) """ - if(names==None): + if (names == None): return self.optimizeOptions - elif(isinstance(names, str)): - return [self.optimizeOptions.get(names,"NotExist")] - elif(isinstance(names, list)): - return ([self.optimizeOptions.get(x,"NotExist") for x in names]) + elif (isinstance(names, str)): + return [self.optimizeOptions.get(names, "NotExist")] + elif (isinstance(names, list)): + return ([self.optimizeOptions.get(x, "NotExist") for x in names]) # to simulate or re-simulate model - def simulate(self, resultfile=None, simflags=None, verbose=True): # 11 + def simulate(self, resultfile=None, simflags=None): # 11 """ This method simulates model according to the simulation options. usage >>> simulate() >>> simulate(resultfile="a.mat") - >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10) set runtime simulation flags + >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags """ - if(resultfile is None): - r="" + if (resultfile is None): + r = "" self.resultfile = os.path.join(self.tempdir, self.modelName + "_res.mat").replace("\\", "/") else: if os.path.exists(resultfile): - r=" -r=" + resultfile + r = " -r=" + resultfile self.resultfile = resultfile else: - r=" -r=" + os.path.join(self.tempdir, resultfile).replace("\\", "/") + r = " -r=" + os.path.join(self.tempdir, resultfile).replace("\\", "/") self.resultfile = os.path.join(self.tempdir, resultfile).replace("\\", "/") # allow runtime simulation flags from user input - if(simflags is None): - simflags="" + if (simflags is None): + simflags = "" else: - simflags=" " + simflags + simflags = " " + simflags - overrideFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName + "_override", "txt")).replace("\\", "/") + overrideFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName + "_override", "txt")).replace("\\", + "/") if (self.overridevariables or self.simoptionsoverride): - tmpdict=self.overridevariables.copy() + tmpdict = self.overridevariables.copy() tmpdict.update(self.simoptionsoverride) # write to override file file = open(overrideFile, "w") @@ -1294,29 +1315,34 @@ def simulate(self, resultfile=None, simflags=None, verbose=True): # 11 name = key + "=" + value + "\n" file.write(name) file.close() - override =" -overrideFile=" + overrideFile + override = " -overrideFile=" + overrideFile else: - override ="" + override = "" if (self.inputFlag): # if model has input quantities for i in self.inputlist: - val=self.inputlist[i] - if(val==None): - val=[(float(self.simulateOptions["startTime"]), 0.0), (float(self.simulateOptions["stopTime"]), 0.0)] - self.inputlist[i]=[(float(self.simulateOptions["startTime"]), 0.0), (float(self.simulateOptions["stopTime"]), 0.0)] + val = self.inputlist[i] + if (val == None): + val = [(float(self.simulateOptions["startTime"]), 0.0), + (float(self.simulateOptions["stopTime"]), 0.0)] + self.inputlist[i] = [(float(self.simulateOptions["startTime"]), 0.0), + (float(self.simulateOptions["stopTime"]), 0.0)] if float(self.simulateOptions["startTime"]) != val[0][0]: - print("!!! startTime not matched for Input ",i) + errstr = "!!! startTime not matched for Input {}".format(i) + self._raise_error(errstr=errstr) return if float(self.simulateOptions["stopTime"]) != val[-1][0]: - print("!!! stopTime not matched for Input ",i) + errstr = "!!! stopTime not matched for Input {}".format(i) + self._raise_error(errstr=errstr) return if val[0][0] < float(self.simulateOptions["startTime"]): - print('Input time value is less than simulation startTime for inputs', i) + errstr = "Input time value is less than simulation startTime for inputs {}".format(i) + self._raise_error(errstr=errstr) return self.createCSVData() # create csv file - csvinput=" -csvInput=" + self.csvFile + csvinput = " -csvInput=" + self.csvFile else: - csvinput="" + csvinput = "" if (platform.system() == "Windows"): getExeFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName, "exe")).replace("\\", "/") @@ -1326,7 +1352,7 @@ def simulate(self, resultfile=None, simflags=None, verbose=True): # 11 if os.path.exists(getExeFile): cmd = getExeFile + override + csvinput + r + simflags cmd = cmd.split(" ") - self._run_cmd(cmd=cmd, verbose=verbose) + self._run_cmd(cmd=cmd) self.simulationFlag = True else: @@ -1352,17 +1378,19 @@ def getSolutions(self, varList=None, resultfile=None): # 12 # check for result file exits if (not os.path.exists(resFile)): - print("Error: Result file does not exist " + resFile) + errstr = "Error: Result file does not exist {}".format(resFile) + self._raise_error(errstr=errstr) return - #exit() + # exit() else: resultVars = self.getconn.sendExpression("readSimulationResultVars(\"" + resFile + "\")") self.getconn.sendExpression("closeSimulationResultFile()") if (varList == None): return resultVars - elif (isinstance(varList,str)): - if (varList not in resultVars and varList!="time"): - print('!!! ', varList, ' does not exist\n') + elif (isinstance(varList, str)): + if (varList not in resultVars and varList != "time"): + errstr = '!!! ' + varList + ' does not exist' + self._raise_error(errstr=errstr) return exp = "readSimulationResult(\"" + resFile + '",{' + varList + "})" res = self.getconn.sendExpression(exp) @@ -1371,12 +1399,13 @@ def getSolutions(self, varList=None, resultfile=None): # 12 self.getconn.sendExpression(exp2) return npRes elif (isinstance(varList, list)): - #varList, = varList + # varList, = varList for v in varList: if v == "time": continue if v not in resultVars: - print('!!! ', v, ' does not exist\n') + errstr = '!!! ' + v + ' does not exist' + self._raise_error(errstr=errstr) return variables = ",".join(varList) exp = "readSimulationResult(\"" + resFile + '",{' + variables + "})" @@ -1386,13 +1415,13 @@ def getSolutions(self, varList=None, resultfile=None): # 12 self.getconn.sendExpression(exp2) return npRes - def strip_space(self,name): - if(isinstance(name,str)): - return name.replace(" ","") - elif(isinstance(name,list)): - return [x.replace(" ","") for x in name] + def strip_space(self, name): + if (isinstance(name, str)): + return name.replace(" ", "") + elif (isinstance(name, list)): + return [x.replace(" ", "") for x in name] - def setMethodHelper(self,args1,args2,args3,args4=None,verbose=None): + def setMethodHelper(self, args1, args2, args3, args4=None): """ Helper function for setParameter(),setContinuous(),setSimulationOptions(),setLinearizationOption(),setOptimizationOption() args1 - string or list of string given by user @@ -1401,31 +1430,31 @@ def setMethodHelper(self,args1,args2,args3,args4=None,verbose=None): args4 - dict() which stores the new override variables list, """ def apply_single(args1): - args1=self.strip_space(args1) - value=args1.split("=") + args1 = self.strip_space(args1) + value = args1.split("=") if value[0] in args2: - if (args3 == "parameter" and self.isParameterChangeable(value[0], value[1], verbose)): - args2[value[0]]=value[1] - if(args4!=None): - args4[value[0]]=value[1] + if (args3 == "parameter" and self.isParameterChangeable(value[0], value[1])): + args2[value[0]] = value[1] + if (args4 != None): + args4[value[0]] = value[1] elif (args3 != "parameter"): - args2[value[0]]=value[1] - if(args4!=None): - args4[value[0]]=value[1] + args2[value[0]] = value[1] + if (args4 != None): + args4[value[0]] = value[1] return True else: - print("\"" + value[0] + "\"" + " is not a " + args3 + " variable") - return False + errstr = "\"" + value[0] + "\"" + " is not a" + args3 + " variable" + self._raise_error(errstr=errstr) result = [] if (isinstance(args1, str)): result = [apply_single(args1)] - elif(isinstance(args1,list)): + elif (isinstance(args1, list)): result = [] - args1=self.strip_space(args1) + args1 = self.strip_space(args1) for var in args1: result.append(apply_single(var)) @@ -1439,9 +1468,9 @@ def setContinuous(self, cvals): # 13 >>> setContinuous("Name=value") >>> setContinuous(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(cvals,self.continuouslist,"continuous",self.overridevariables) + return self.setMethodHelper(cvals, self.continuouslist, "continuous", self.overridevariables) - def setParameters(self, pvals, verbose=True): # 14 + def setParameters(self, pvals): # 14 """ This method is used to set parameter values. It can be called: with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: @@ -1449,13 +1478,17 @@ def setParameters(self, pvals, verbose=True): # 14 >>> setParameters("Name=value") >>> setParameters(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(pvals,self.paramlist,"parameter",self.overridevariables, verbose) + return self.setMethodHelper(pvals, self.paramlist, "parameter", self.overridevariables) - def isParameterChangeable(self, name, value, verbose): + def isParameterChangeable(self, name, value): q = self.getQuantities(name) if (q[0]["changeable"] == "false"): - if verbose: - print("| info | setParameters() failed : It is not possible to set the following signal " + "\"" + name + "\"" + ", It seems to be structural, final, protected or evaluated or has a non-constant binding, use sendExpression(setParameterValue("+ self.modelName + ", " + name + ", " + value + "), parsed=false)" + " and rebuild the model using buildModel() API") + if self._verbose: + logger.info("setParameters() failed : It is not possible to set " + + "the following signal \"{}\", ".format(name) + "It seems to be structural, final, " + + "protected or evaluated or has a non-constant binding, use sendExpression(" + + "setParameterValue({}, {}, {}), ".format(self.modelName, name, value) + + "parsed=false) and rebuild the model using buildModel() API") return False return True @@ -1467,7 +1500,7 @@ def setSimulationOptions(self, simOptions): # 16 >>> setSimulationOptions("Name=value") >>> setSimulationOptions(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(simOptions,self.simulateOptions,"simulation-option",self.simoptionsoverride) + return self.setMethodHelper(simOptions, self.simulateOptions, "simulation-option", self.simoptionsoverride) def setLinearizationOptions(self, linearizationOptions): # 18 """ @@ -1477,7 +1510,7 @@ def setLinearizationOptions(self, linearizationOptions): # 18 >>> setLinearizationOptions("Name=value") >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(linearizationOptions,self.linearOptions,"Linearization-option",None) + return self.setMethodHelper(linearizationOptions, self.linearOptions, "Linearization-option", None) def setOptimizationOptions(self, optimizationOptions): # 17 """ @@ -1487,7 +1520,7 @@ def setOptimizationOptions(self, optimizationOptions): # 17 >>> setOptimizationOptions("Name=value") >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(optimizationOptions,self.optimizeOptions,"optimization-option",None) + return self.setMethodHelper(optimizationOptions, self.optimizeOptions, "optimization-option", None) def setInputs(self, name): # 15 """ @@ -1497,50 +1530,50 @@ def setInputs(self, name): # 15 >>> setInputs("Name=value") >>> setInputs(["Name1=value1","Name2=value2"]) """ - if (isinstance(name,str)): - name=self.strip_space(name) - value=name.split("=") + if (isinstance(name, str)): + name = self.strip_space(name) + value = name.split("=") if value[0] in self.inputlist: - tmpvalue=eval(value[1]) - if(isinstance(tmpvalue,int) or isinstance(tmpvalue, float)): - self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), (float(self.simulateOptions["stopTime"]), float(value[1]))] - elif(isinstance(tmpvalue,list)): + tmpvalue = eval(value[1]) + if (isinstance(tmpvalue, int) or isinstance(tmpvalue, float)): + self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), + (float(self.simulateOptions["stopTime"]), float(value[1]))] + elif (isinstance(tmpvalue, list)): self.checkValidInputs(tmpvalue) self.inputlist[value[0]] = tmpvalue - self.inputFlag=True + self.inputFlag = True else: - print(value[0], "!is not an input") - elif (isinstance(name,list)): - name=self.strip_space(name) + errstr = value[0] + " is not an input" + self._raise_error(errstr=errstr) + elif (isinstance(name, list)): + name = self.strip_space(name) for var in name: - value=var.split("=") + value = var.split("=") if value[0] in self.inputlist: - tmpvalue=eval(value[1]) - if(isinstance(tmpvalue,int) or isinstance(tmpvalue, float)): - self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), (float(self.simulateOptions["stopTime"]), float(value[1]))] - elif(isinstance(tmpvalue,list)): + tmpvalue = eval(value[1]) + if (isinstance(tmpvalue, int) or isinstance(tmpvalue, float)): + self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), + (float(self.simulateOptions["stopTime"]), float(value[1]))] + elif (isinstance(tmpvalue, list)): self.checkValidInputs(tmpvalue) self.inputlist[value[0]] = tmpvalue - self.inputFlag=True + self.inputFlag = True else: - print(value[0], "!is not an input") + errstr = value[0] + " is not an input" + self._raise_error(errstr=errstr) - def checkValidInputs(self,name): + def checkValidInputs(self, name): if name != sorted(name, key=lambda x: x[0]): - print('Time value should be in increasing order') - return + raise ModelicaSystemError('Time value should be in increasing order') for l in name: if isinstance(l, tuple): - #if l[0] < float(self.simValuesList[0]): + # if l[0] < float(self.simValuesList[0]): if l[0] < float(self.simulateOptions["startTime"]): - print('Input time value is less than simulation startTime') - return + ModelicaSystemError('Input time value is less than simulation startTime') if len(l) != 2: - print('Value for ' + l + ' is in incorrect format!') - return + ModelicaSystemError('Value for ' + l + ' is in incorrect format!') else: - print('Error!!! Value must be in tuple format') - return + ModelicaSystemError('Error!!! Value must be in tuple format') # To create csv file for inputs def createCSVData(self): @@ -1551,7 +1584,8 @@ def createCSVData(self): tmpinputlist = {} for (key, value) in self.inputlist.items(): if (value is None): - tmpinputlist[key] = [(float(self.simulateOptions["startTime"]), 0.0),(float(self.simulateOptions["stopTime"]), 0.0)] + tmpinputlist[key] = [(float(self.simulateOptions["startTime"]), 0.0), + (float(self.simulateOptions["stopTime"]), 0.0)] else: tmpinputlist[key] = value @@ -1633,15 +1667,16 @@ def createCSVData(self): interpolated_inputs_all.append(templist) name_ = 'time' - #name = ','.join(self.__getInputNames()) - name=','.join(list(self.inputlist.keys())) + # name = ','.join(self.__getInputNames()) + name = ','.join(list(self.inputlist.keys())) name = '{},{},{}'.format(name_, name, 'end') a = '' l = [] l.append(name) for i in range(0, len(sl)): - a = ("%s,%s" % (str(float(sl[i])), ",".join(list(str(float(inppp[i])) for inppp in interpolated_inputs_all)))) + ',0' + a = ("%s,%s" % (str(float(sl[i])), ",".join(list(str(float(inppp[i])) \ + for inppp in interpolated_inputs_all)))) + ',0' l.append(a) self.csvFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName, "csv")).replace("\\", "/") @@ -1662,17 +1697,19 @@ def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix="": - fileNamePrefix = self.modelName + fileNamePrefix = self.modelName if includeResources: - includeResourcesStr = "true" + includeResourcesStr = "true" else: - includeResourcesStr = "false" - properties = 'version="{}", fmuType="{}", fileNamePrefix="{}", includeResources={}'.format(version, fmuType, fileNamePrefix,includeResourcesStr) + includeResourcesStr = "false" + properties = 'version="{}", fmuType="{}", fileNamePrefix="{}", includeResources={}'.format(version, fmuType, + fileNamePrefix, + includeResourcesStr) fmu = self.requestApi('buildModelFMU', self.modelName, properties) ## report proper error message if not os.path.exists(fmu): - return print(self.getconn.sendExpression("getErrorString()")) + self._check_error() return fmu @@ -1689,7 +1726,7 @@ def convertFmu2Mo(self, fmuName): # 20 ## report proper error message if not os.path.exists(fileName): - return print(self.getconn.sendExpression("getErrorString()")) + self._check_error() return fileName @@ -1706,14 +1743,12 @@ def optimize(self): # 21 optimizeError = '' self.getconn.sendExpression("setCommandLineOptions(\"-g=Optimica\")") optimizeResult = self.requestApi('optimize', cName, properties) - optimizeError = self.requestApi('getErrorString') - if optimizeError: - print(optimizeError) + self._check_error() return optimizeResult # to linearize model - def linearize(self, lintime = None, simflags= None): # 22 + def linearize(self, lintime=None, simflags=None): # 22 """ This method linearizes model according to the linearized options. This will generate a linear model that consists of matrices A, B, C and D. It can be called: only without any arguments @@ -1722,9 +1757,11 @@ def linearize(self, lintime = None, simflags= None): # 22 """ if self.xmlFile is None: - return print("Linearization cannot be performed as the model is not build, use ModelicaSystem() to build the model first") + raise IOError("Linearization cannot be performed as the model is not build, " + "use ModelicaSystem() to build the model first") - overrideLinearFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName + "_override_linear", "txt")).replace("\\", "/") + overrideLinearFile = os.path.join(self.tempdir, + '{}.{}'.format(self.modelName + "_override_linear", "txt")).replace("\\", "/") file = open(overrideLinearFile, "w") for (key, value) in self.overridevariables.items(): @@ -1735,8 +1772,8 @@ def linearize(self, lintime = None, simflags= None): # 22 file.write(name) file.close() - override =" -overrideFile=" + overrideLinearFile - # print(override) + override = " -overrideFile=" + overrideLinearFile + logger.debug(f"overwrite = {override}") if self.inputFlag: nameVal = self.getInputs() @@ -1745,12 +1782,11 @@ def linearize(self, lintime = None, simflags= None): # 22 if tupleList is not None: for l in tupleList: if l[0] < float(self.simulateOptions["startTime"]): - print('Input time value is less than simulation startTime') - return + raise ModelicaSystemError('Input time value is less than simulation startTime') self.createCSVData() - csvinput =" -csvInput=" + self.csvFile + csvinput = " -csvInput=" + self.csvFile else: - csvinput="" + csvinput = "" ## prepare the linearization runtime command if (platform.system() == "Windows"): @@ -1771,10 +1807,10 @@ def linearize(self, lintime = None, simflags= None): # 22 cmd = cmd.split(' ') self._run_cmd(cmd=cmd) else: - raise Exception("Error: Application file path not found: " + getExeFile) + raise Exception("Error: Application file path not found: " + getExeFile) # code to get the matrix and linear inputs, outputs and states - linearFile = os.path.join(self.tempdir, "linearized_model.py").replace("\\","/") + linearFile = os.path.join(self.tempdir, "linearized_model.py").replace("\\", "/") # support older openmodelica versions before OpenModelica v1.16.2 where linearize() generates "linear_modelname.mo" file if not os.path.exists(linearFile): @@ -1793,12 +1829,11 @@ def linearize(self, lintime = None, simflags= None): # 22 self.linearoutputs = outputVars self.linearstates = stateVars return [A, B, C, D] - except: + except ModuleNotFoundError: raise Exception("ModuleNotFoundError: No module named 'linearized_model'") else: errormsg = self.getconn.sendExpression("getErrorString()") - return print("Linearization failed: ", "\"" , linearFile,"\"" ," not found \n", errormsg) - + raise ModelicaSystemError("Linearization failed: {} not found: {}".format(repr(linearFile), errormsg)) def getLinearInputs(self): """ @@ -1824,6 +1859,7 @@ def getLinearStates(self): """ return self.linearstates + def FindBestOMCSession(*args, **kwargs): """ Analyzes the OMC executable version string to find a suitable selection