From 4a89c5659fc8af75c94caa9bea0dd398fd322f5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sluka?= Date: Thu, 17 Apr 2025 19:48:06 +0200 Subject: [PATCH] Refactor createCSVData I found the old code really hard to understand, so I decided to rewrite it from scratch. It should behave exactly the same (and the tests in test_ModelicaSystem prove that). --- OMPython/ModelicaSystem.py | 148 +++++++++++-------------------------- 1 file changed, 44 insertions(+), 104 deletions(-) diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index fae397ea7..1227732f4 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -801,112 +801,52 @@ def checkValidInputs(self, name): else: ModelicaSystemError('Error!!! Value must be in tuple format') - # To create csv file for inputs - def createCSVData(self): - sl = [] # Actual timestamps - skip = False - - # check for NONE in input list and replace with proper data (e.g) [(startTime, 0.0), (stopTime, 0.0)] - tmpinputlist = {} - for key, value in self.inputlist.items(): - if value is None: - tmpinputlist[key] = [(float(self.simulateOptions["startTime"]), 0.0), - (float(self.simulateOptions["stopTime"]), 0.0)] + def createCSVData(self) -> None: + start_time: float = float(self.simulateOptions["startTime"]) + stop_time: float = float(self.simulateOptions["stopTime"]) + + # Replace None inputs with a default constant zero signal + inputs: dict[str, list[tuple[float, float]]] = {} + for input_name, input_signal in self.inputlist.items(): + if input_signal is None: + inputs[input_name] = [(start_time, 0.0), (stop_time, 0.0)] else: - tmpinputlist[key] = value - - inp = list(tmpinputlist.values()) - - for i in inp: - cl = list() - el = list() - for t, x in i: - cl.append(t) - for i in cl: - if skip is True: - skip = False - continue - if i not in sl: - el.append(i) - else: - elem_no = cl.count(i) - sl_no = sl.count(i) - if elem_no == 2 and sl_no == 1: - el.append(i) - skip = True - sl = sl + el - - sl.sort() - for t in sl: - for i in inp: - for ttt in [tt[0] for tt in i]: - if t not in [tt[0] for tt in i]: - i.append((t, '?')) - inpSortedList = list() - sortedList = list() - for i in inp: - sortedList = sorted(i, key=lambda x: x[0]) - inpSortedList.append(sortedList) - for i in inpSortedList: - ind = 0 - for t, x in i: - if x == '?': - t1 = i[ind - 1][0] - u1 = i[ind - 1][1] - t2 = i[ind + 1][0] - u2 = i[ind + 1][1] - nex = 2 - while (u2 == '?'): - u2 = i[ind + nex][1] - t2 = i[ind + nex][0] - nex += 1 - x = float(u1 + (u2 - u1) * (t - t1) / (t2 - t1)) - i[ind] = (t, x) - ind += 1 - slSet = list() - slSet = set(sl) - for i in inpSortedList: - tempTime = list() - for (t, x) in i: - tempTime.append(t) - inSl = None - inI = None - for s in slSet: - inSl = sl.count(s) - inI = tempTime.count(s) - if inSl != inI: - test = list() - test = [(x, y) for x, y in i if x == s] - i.append(test[0]) - newInpList = list() - tempSorting = list() - for i in inpSortedList: - # i.sort() => just sorting might not work so need to sort according to 1st element of a tuple - tempSorting = sorted(i, key=lambda x: x[0]) - newInpList.append(tempSorting) - - interpolated_inputs_all = list() - for i in newInpList: - templist = list() - for (t, x) in i: - templist.append(x) - interpolated_inputs_all.append(templist) - - name = ','.join(list(self.inputlist.keys())) - name = f'time,{name},end' - - a = '' - l = [] - l.append(name) - for i in range(0, len(sl)): - a = f'{float(sl[i])},{",".join(str(float(inppp[i])) for inppp in interpolated_inputs_all)},0' - l.append(a) - - self.csvFile = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix() + inputs[input_name] = input_signal + + # Collect all unique timestamps across all input signals + all_times = np.array( + sorted({t for signal in inputs.values() for t, _ in signal}), + dtype=float + ) + + # Interpolate missing values + interpolated_inputs: dict[str, np.ndarray] = {} + for signal_name, signal_values in inputs.items(): + signal = np.array(signal_values) + interpolated_inputs[signal_name] = np.interp( + all_times, + signal[:, 0], # times + signal[:, 1] # values + ) + + # Write CSV file + input_names = list(interpolated_inputs.keys()) + header = ['time'] + input_names + ['end'] + + csv_rows = [header] + for i, t in enumerate(all_times): + row = [ + t, # time + *(interpolated_inputs[name][i] for name in input_names), # input values + 0 # trailing 'end' column + ] + csv_rows.append(row) + + self.csvFile: str = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix() + with open(self.csvFile, "w", newline="") as f: - writer = csv.writer(f, delimiter='\n') - writer.writerow(l) - f.close() + writer = csv.writer(f) + writer.writerows(csv_rows) # to convert Modelica model to FMU def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix="", includeResources=True): # 19