import datetime as dt
import shutil
import tempfile
import warnings
from contextlib import contextmanager
from pathlib import Path
import fmpy
import pandas as pd
from fmpy import simulate_fmu
from sklearn.pipeline import Pipeline
from corrai.base.model import Model
MODEL_VARIABLES_TYPES_MAP = {
"Real": float,
"Float32": float,
"Float64": float,
"Integer": int,
"Int8": int,
"UInt8": int,
"Int16": int,
"UInt16": int,
"Int32": int,
"UInt32": int,
"Int64": int,
"UInt64": int,
"Boolean": bool,
"String": str,
# Not yet implemented
# ('Enumeration'),
# ('Binary'),
# ('Clock'),
}
DEFAULT_SIMULATION_OPTIONS = {
"startTime": 0,
"stopTime": 24 * 3600,
"stepSize": 60,
"solver": "CVode",
"tolerance": 1e-6,
"fmi_type": "ModelExchange",
}
@contextmanager
def simulation_workspace(fmu_path: Path, boundary_path: Path | None):
"""
Create an isolated temporary workspace with a copy of the FMU and optional
boundary file. Cleans up everything automatically at exit.
"""
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
local_fmu = tmpdir / fmu_path.name
shutil.copy(fmu_path, local_fmu)
local_boundary = None
if boundary_path is not None:
local_boundary = tmpdir / boundary_path.name
shutil.copy(boundary_path, local_boundary)
yield local_fmu, local_boundary
def parse_simulation_times(start, stop, step, output_int):
if all(isinstance(elmt, int) for elmt in (start, stop, step, output_int)):
return start, stop, step, output_int
elif (
isinstance(start, (pd.Timestamp, dt.datetime))
and isinstance(stop, (pd.Timestamp, dt.datetime))
and isinstance(step, (pd.Timedelta, dt.timedelta))
and isinstance(output_int, (pd.Timedelta, dt.timedelta))
):
# Handle 2 years with datetime_index_to_seconds_index function
start_s, stop_s = datetime_index_to_seconds_index(
pd.date_range(start, stop, periods=2)
).astype(int)
step_s, output_int_s = map(datetime_to_second, (step, output_int))
return start_s, stop_s, step_s, output_int_s
raise ValueError("Invalid 'startTime', 'stopTime', 'stepSize', or 'outputInterval")
def seconds_index_to_datetime_index(
index_second: pd.Index, ref_year: int
) -> pd.DatetimeIndex:
"""
Convert an index of seconds into a pandas DatetimeIndex.
Parameters
----------
index_second : pd.Index
Index representing time in seconds since January 1st of `ref_year`.
ref_year : int
The reference year used to compute the origin (January 1st at 00:00).
Returns
-------
pd.DatetimeIndex
A naive datetime index corresponding to the seconds offset from the reference year.
Examples
--------
>>> import pandas as pd
>>> seconds = pd.Index([0, 3600, 7200])
>>> seconds_index_to_datetime_index(seconds, 2020)
DatetimeIndex(['2020-01-01 00:00:00',
'2020-01-01 01:00:00',
'2020-01-01 02:00:00'],
dtype='datetime64[ns]', freq=None)
"""
since = dt.datetime(ref_year, 1, 1, tzinfo=dt.timezone.utc)
diff_seconds = index_second + since.timestamp()
return pd.DatetimeIndex(pd.to_datetime(diff_seconds, unit="s"))
def datetime_to_second(datetime_in: dt.datetime | pd.Timestamp | pd.Timedelta):
"""
Convert a datetime or timestamp into the number of seconds since the beginning of its year.
Parameters
----------
datetime_in : datetime.datetime or pd.Timestamp
The datetime object to convert.
Returns
-------
float
Seconds elapsed since January 1st of the same year as `datetime_in`.
Examples
--------
>>> import datetime as dt
>>> datetime_to_second(dt.datetime(2020, 1, 1, 1, 0, 0))
3600.0
"""
if isinstance(datetime_in, (dt.datetime | pd.Timestamp)):
year = datetime_in.year
origin = dt.datetime(year, 1, 1, tzinfo=datetime_in.tz)
return int((datetime_in - origin).total_seconds())
return int(datetime_in.total_seconds())
def datetime_index_to_seconds_index(index_datetime: pd.DatetimeIndex) -> pd.Index:
"""
Convert a DatetimeIndex into a cumulative seconds index starting from January 1st.
Parameters
----------
index_datetime : pd.DatetimeIndex
The datetime index to convert.
Returns
-------
pd.Index
An index of seconds relative to the start of the year.
Examples
--------
>>> import pandas as pd
>>> idx = pd.date_range("2020-01-01 00:00:00", periods=3, freq="H")
>>> datetime_index_to_seconds_index(idx)
0 0.0
1 3600.0
2 7200.0
dtype: float64
"""
time_start = dt.datetime(index_datetime[0].year, 1, 1, tzinfo=dt.timezone.utc)
new_index = index_datetime.to_frame().diff().squeeze()
new_index.iloc[0] = dt.timedelta(
seconds=index_datetime[0].timestamp() - time_start.timestamp()
)
sec_dt = [elmt.total_seconds() for elmt in new_index]
return pd.Series(sec_dt).cumsum()
def df_to_combitimetable(df: pd.DataFrame, filename):
"""
Export a pandas DataFrame with a DatetimeIndex into a Modelica-compatible
CombiTimeTable text file.
Parameters
----------
df : pd.DataFrame
DataFrame containing boundary conditions with a DatetimeIndex or seconds index.
filename : str or Path
Path to the output file.
Raises
------
ValueError
If the datetime index is not monotonically increasing.
Examples
--------
>>> import pandas as pd
>>> df = pd.DataFrame(
... {"val": [1, 2]}, index=pd.date_range("2020-01-01", periods=2, freq="H")
... )
>>> df_to_combitimetable(df, "boundaries.txt")
# produces a text file compatible with Modelica
"""
if not df.index.is_monotonic_increasing:
raise ValueError(
"df DateTimeIndex is not monotonically increasing, this will"
"cause Modelica to crash."
)
df = df.copy()
with open(filename, "w") as file:
file.write("#1 \n")
line = ""
line += f"double table1({df.shape[0]}, {df.shape[1] + 1})\n"
line += "\t# Time (s)"
for i, col in enumerate(df.columns):
line += f"\t({i + 1}){col}"
file.write(f"{line} \n")
if isinstance(df.index, pd.DatetimeIndex):
df.index = datetime_index_to_seconds_index(df.index)
file.write(df.to_csv(header=False, sep="\t", lineterminator="\n"))
[docs]
class ModelicaFmuModel(Model):
"""
Wrapper for a Modelica FMU (Functional Mock-up Unit) in the corrai ``Model``
formalism.
Provides functionality to:
- Load an FMU and its metadata.
- Query property initial values.
- Run simulations with configurable options.
- Handle boundary conditions using a CombiTimeTable if defined.
Parameters
----------
fmu_path : Path or str
Path to the FMU file.
simulation_dir : Path, optional
Directory for simulation files. A temporary directory is created if not
provided.
output_list : list of str, optional
Names of FMU variables to record during simulation.
boundary_table_name : str or None, optional
Name of the CombiTimeTable object in the FMU used for boundary conditions.
If provided, boundary data can be passed through
``simulation_options["boundary"]`` or ``property_dict["boundary"]``. If
``None`` (default), boundaries are ignored. Boundaries specified in
``property_dict`` will always override ``simulation_options`` boundaries
Examples
--------
>>> import pandas as pd
>>> from corrai.fmu import ModelicaFmuModel
>>> simu = ModelicaFmuModel(
... fmu_path=fmu_path,
... output_list=["Boundaries.y[1]", "Boundaries.y[2]"],
... boundary_table_name="Boundaries",
... )
>>> new_bounds = pd.DataFrame(
... {"Boundaries.y[1]": [1, 2, 3], "Boundaries.y[2]": [3, 4, 5]},
... index=range(3, 6),
... )
>>> res = simu.simulate(
... simulation_options={
... "solver": "CVode",
... "startTime": 3,
... "stopTime": 5,
... "stepSize": 1,
... "boundary": new_bounds,
... },
... solver_duplicated_keep="last",
... )
Boundaries.y[1] Boundaries.y[2]
time
3.0 1.0 3.0
4.0 2.0 4.0
5.0 3.0 5.0
"""
[docs]
def __init__(
self,
fmu_path: Path | str,
simulation_dir: Path = None,
output_list: list[str] = None,
boundary_table_name: str | None = None,
):
super().__init__(is_dynamic=True)
fmu_path = Path(fmu_path) if isinstance(fmu_path, str) else fmu_path
if not fmu_path.exists() or not fmu_path.is_file():
raise FileNotFoundError(f"FMU file not found at {fmu_path}")
self.fmu_path = fmu_path
self.simulation_dir = (
Path(tempfile.mkdtemp()) if simulation_dir is None else simulation_dir
)
self.output_list = output_list
self.boundary_table_name = boundary_table_name
self.boundary_file_path = None
if self.boundary_table_name is not None:
model_description = fmpy.read_model_description(self.fmu_path.as_posix())
var_map = {var.name: var.start for var in model_description.modelVariables}
try:
combi_tab_property_name = f"{self.boundary_table_name}.fileName"
self.boundary_file_path = Path(rf"{var_map[combi_tab_property_name]}")
except KeyError:
warnings.warn(
f"Boundary combitimetable '{self.boundary_table_name}' "
f"not found in FMU -> ignoring boundary.",
UserWarning,
stacklevel=2,
)
self.property_dict = self._get_init_property_dict()
def _get_init_property_dict(self):
model_description = fmpy.read_model_description(self.fmu_path.as_posix())
fmu_param_dict = {
var.name: var
for var in model_description.modelVariables
if var.causality == "parameter"
}
prop_dict = {}
for name, var in fmu_param_dict.items():
try:
prop_dict[name] = MODEL_VARIABLES_TYPES_MAP[var.type](var.start)
except TypeError:
prop_dict[name] = var.start
except KeyError:
pass
return prop_dict
[docs]
def get_property_values(
self, property_list: str | tuple[str, ...] | list[str]
) -> list[str | int | float | None]:
"""
Retrieve initial values of FMU properties.
Parameters
----------
property_list : str or tuple of str or list of str
Name(s) of FMU properties to query.
Returns
-------
list
List of default values or None if unavailable.
Examples
--------
>>> from corrai.fmu import ModelicaFmuModel
>>> model = ModelicaFmuModel("rosen.fmu", output_list=["res.showNumber"])
>>> model.get_property_values("x.k")
['2.0']
>>> model.get_property_values(["x.k", "y.k"])
['2.0', '2.0']
"""
property_list = (
[property_list] if isinstance(property_list, str) else property_list
)
return [self.property_dict[prop] for prop in property_list]
[docs]
def set_property_values(self, property_dict: dict):
self.property_dict.update(property_dict.items())
[docs]
def simulate(
self,
property_dict: dict[str, float | int | str] = None,
simulation_options: dict = None,
solver_duplicated_keep: str = "last",
post_process_pipeline: Pipeline = None,
debug_param: bool = False,
debug_logging: bool = False,
logger=None,
) -> pd.DataFrame:
"""
Run an FMU simulation with properties and boundary configuration.
Parameters
----------
property_dict : dict, optional
Dictionary of FMU parameter values to set before simulation.
May include a key ``"boundary"`` with a DataFrame of boundary conditions.
If both ``property_dict`` and ``simulation_options`` specify boundaries,
the one in ``property_dict`` takes precedence.
simulation_options : dict, optional
Simulation settings. Supported keys include:
- ``startTime`` : float or pandas.Timestamp
- ``stopTime`` : float or pandas.Timestamp
- ``stepSize`` : float or pandas.TimeDelta
- ``outputInterval`` : float or pandas.TimeDelta. If not provided, it will
be set equal to ``stepSize``
- ``solver`` : str
- ``tolerance`` : float
- ``fmi_type`` : {"CoSimulation", "ModelExchange"}
- ``boundary`` : pandas.DataFrame of boundary conditions
solver_duplicated_keep : {"first", "last"}, default "last"
Which entry to keep if solver outputs duplicated indices.
post_process_pipeline : sklearn.Pipeline, optional
Transformation pipeline applied to simulation results before returning.
debug_param : bool, default False
If True, prints the property dictionary before simulation.
debug_logging : bool, default False
Enable verbose logging from fmpy.
logger : callable, optional
Custom logger for fmpy.
Returns
-------
pandas.DataFrame
Simulation results indexed by time. If ``startTime`` is a
:class:`pandas.Timestamp`, the index is a DateTimeIndex; otherwise,
a numeric index is used.
Raises
------
ValueError
If ``startTime`` or ``stopTime`` are outside the boundary DataFrame.
Notes
-----
- Duplicate time indices are resolved using ``solver_duplicated_keep``.
Examples
--------
Run a basic simulation with default options:
>>> model = ModelicaFmuModel("simple.fmu", output_list=["y"])
>>> res = model.simulate(
... simulation_options={"startTime": 0, "stopTime": 10, "stepSize": 1}
... )
>>> res.head()
y
0.0 0.0
1.0 1.1
2.0 2.3
...
Run a simulation with boundary conditions:
>>> import pandas as pd
>>> x = pd.DataFrame({"Boundaries.y[1]": [1, 2, 3]}, index=[0, 1, 2])
>>> model = ModelicaFmuModel(
... "boundary_test.fmu",
... output_list=["Boundaries.y[1]"],
... boundary_table_name="Boundaries",
... )
>>> res = model.simulate(
... simulation_options={
... "boundary": x,
... "startTime": 0,
... "stopTime": 2,
... "stepSize": 1,
... }
... )
>>> res.head()
Boundaries.y[1]
time
0.0 1.0
1.0 2.0
2.0 3.0
"""
simu_property = self.property_dict.copy()
simu_property.update(dict(property_dict or {}))
if property_dict and debug_param:
print(simu_property)
simulation_options = {
**DEFAULT_SIMULATION_OPTIONS,
**(simulation_options or {}),
}
start, stop, step, output_int = (
simulation_options.get(it, None)
for it in ["startTime", "stopTime", "stepSize", "outputInterval"]
)
if output_int is None:
output_int = step
start_sec, stop_sec, step_sec, output_int_sec = parse_simulation_times(
start, stop, step, output_int
)
boundary_df = None
if simu_property:
boundary_df = simu_property.pop("boundary", boundary_df)
if simulation_options:
sim_boundary = simulation_options.pop("boundary", boundary_df)
if boundary_df is None and sim_boundary is not None:
boundary_df = sim_boundary
elif boundary_df is not None and sim_boundary is not None:
warnings.warn(
"Boundary specified in both property_dict and "
"simulation_options. The one in property_dict will be used.",
UserWarning,
stacklevel=2,
)
if boundary_df is not None:
boundary_df = boundary_df.copy()
if isinstance(boundary_df.index, pd.DatetimeIndex):
boundary_df.index = datetime_index_to_seconds_index(boundary_df.index)
if not (
boundary_df.index[0] <= start_sec <= boundary_df.index[-1]
and boundary_df.index[0] <= stop_sec <= boundary_df.index[-1]
):
raise ValueError(
"'startTime' and 'stopTime' are outside boundary DataFrame"
)
self.boundary_file_path = self.simulation_dir / "boundaries.txt"
df_to_combitimetable(boundary_df, self.boundary_file_path)
with simulation_workspace(self.fmu_path, self.boundary_file_path) as (
local_fmu,
local_boundary,
):
if local_boundary is not None and self.boundary_table_name:
simu_property[f"{self.boundary_table_name}.fileName"] = (
local_boundary.as_posix()
)
result = simulate_fmu(
filename=local_fmu,
start_time=start_sec,
stop_time=stop_sec,
step_size=step_sec,
relative_tolerance=simulation_options["tolerance"],
start_values=simu_property,
output=self.output_list,
solver=simulation_options["solver"],
output_interval=output_int_sec,
fmi_type=simulation_options["fmi_type"],
debug_logging=debug_logging,
logger=logger,
)
columns = ["time"] + self.output_list if self.output_list else None
df = pd.DataFrame(result, columns=columns)
if isinstance(start, (pd.Timestamp, dt.datetime)):
df.index = seconds_index_to_datetime_index(df["time"], start.year)
df.index = df.index.round("s")
df = df.tz_localize(start.tz)
df.index.freq = df.index.inferred_freq
else:
df.index = round(df["time"], 2)
df.drop(columns=["time"], inplace=True)
df = df.loc[~df.index.duplicated(keep=solver_duplicated_keep)]
if post_process_pipeline is not None:
df = post_process_pipeline.fit_transform(df)
return df
[docs]
def save(self, file_path: Path):
"""
Save the FMU file to a new location.
Parameters
----------
file_path : Path
Destination path.
"""
shutil.copyfile(self.fmu_path, file_path)
[docs]
def __repr__(self):
"""
Return a string representation of the FMU model.
Returns
-------
str
A formatted string containing FMU name, description, version,
and available parameters.
Examples
--------
>>> print(model)
Model Name: rosen
Description: ModelDescription(
fmiVersion='2.0', modelName='rosen',
coSimulation=CoSimulation(modelIdentifier='rosen'),
modelExchange=ModelExchange(modelIdentifier='rosen'),
scheduledExecution=None
)
Version: 2.0
Parameters:
Name: x.k, Default Value: 2.0, Description: Constant output value
Name: y.k, Default Value: 2.0, Description: Constant output value
Name: res.significantDigits, Default Value: 2,
Description: Number of significant digits to be shown
"""
model_description = fmpy.read_model_description(self.fmu_path.as_posix())
model_info = f"Model Name: {model_description.modelName}\n"
model_info += (
f"Description: {fmpy.read_model_description(self.fmu_path.as_posix())}\n"
)
model_info += f"Version: {model_description.fmiVersion}\n"
model_info += "Parameters:\n"
parameters = [
var
for var in model_description.modelVariables
if var.causality == "parameter"
]
if not parameters:
model_info += " No parameters available.\n"
else:
for param in parameters:
default_value = (
param.start if param.start is not None else "Not specified"
)
desc = (
param.description
if param.description
else "No description available."
)
model_info += (
f" Name: {param.name}, "
f"Default Value: {default_value}, "
f"Description: {desc}\n"
)
return model_info