Source code for corrai.surrogate

import time

import numpy as np
import pandas as pd

from scipy.stats import loguniform, randint

from sklearn.base import BaseEstimator, RegressorMixin, clone
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import (
    train_test_split,
    cross_val_score,
    RandomizedSearchCV,
)
from sklearn.neural_network import MLPRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.svm import SVR
from sklearn.utils.validation import check_is_fitted

from corrai.base.metrics import nmbe, cv_rmse
from corrai.base.utils import as_1_column_dataframe
from corrai.base.model import Model

GRID_DICT = {
    "TREE_REGRESSOR": [
        {
            "n_estimators": [100, 200, 300],
            "max_depth": [10, 20, 30, None],
            "min_samples_split": [2, 5, 10],
            "min_samples_leaf": [1, 2, 4],
            "max_features": ["sqrt", "log2"],
            "bootstrap": [True],
        }
    ],
    "RANDOM_FOREST": [
        {
            "n_estimators": [100, 200, 300],
            "max_depth": [10, 20, 30, None],
            "max_features": ["sqrt", "log2"],
            "min_samples_leaf": [1, 2, 5],
            "min_samples_split": [2, 5],
        }
    ],
    "LINEAR_REGRESSION": [{"fit_intercept": [True, False]}],
    "LINEAR_SECOND_ORDER": [{"poly__interaction_only": [True, False]}],
    "LINEAR_THIRD_ORDER": [{"poly__interaction_only": [True, False]}],
    "SUPPORT_VECTOR": [
        {
            "kernel": ["rbf", "linear"],
            "C": [0.1, 1, 10, 100],
            "gamma": ["scale", "auto", 0.001, 0.01],
            "epsilon": [0.01, 0.1],
        }
    ],
    "MULTI_LAYER_PERCEPTRON": [
        {
            "MLP__hidden_layer_sizes": [
                (100,),
                (150,),
                (100, 50),
                (150, 100),
            ],
            "MLP__activation": ["relu", "tanh"],
            "MLP__solver": ["adam"],
            "MLP__alpha": [0.0001, 0.001, 0.01],
            "MLP__learning_rate": ["adaptive"],
            "MLP__max_iter": [3000, 5000],
            "MLP__early_stopping": [True],
            "MLP__validation_fraction": [0.15],
            "MLP__n_iter_no_change": [15],
            "MLP__tol": [1e-4],
        }
    ],
}

# For use with RandomizedSearchCV - continuous distributions
# This allows exploring MORE distinct values
GRID_DICT_CONTINUOUS = {
    "TREE_REGRESSOR": {
        "n_estimators": randint(100, 400),
        "max_depth": [10, 20, 30, 40, None],
        "min_samples_split": randint(2, 15),
        "min_samples_leaf": randint(1, 8),
        "max_features": ["sqrt", "log2"],
        "bootstrap": [True],
    },
    "RANDOM_FOREST": {
        "n_estimators": randint(100, 400),
        "max_depth": [10, 20, 30, 40, None],
        "max_features": ["sqrt", "log2"],
        "min_samples_leaf": randint(1, 10),
        "min_samples_split": randint(2, 15),
    },
    "LINEAR_REGRESSION": {"fit_intercept": [True, False]},
    "LINEAR_SECOND_ORDER": {"poly__interaction_only": [True, False]},
    "LINEAR_THIRD_ORDER": {"poly__interaction_only": [True, False]},
    "SUPPORT_VECTOR": {
        "kernel": ["rbf", "linear"],
        "C": loguniform(0.1, 100),  # Continuous log scale
        "gamma": ["scale", "auto"]
        + list(loguniform(1e-4, 1e-1).rvs(5, random_state=42)),
        "epsilon": loguniform(0.001, 0.5),
    },
    "MULTI_LAYER_PERCEPTRON": {
        "MLP__hidden_layer_sizes": [
            (50,),
            (100,),
            (150,),
            (200,),
            (50, 50),
            (100, 50),
            (150, 100),
            (200, 100),
            (100, 50, 25),
            # (200, 50, 25),
        ],
        "MLP__activation": ["relu", "tanh"],
        "MLP__solver": ["adam", "lbfgs"],
        "MLP__alpha": loguniform(1e-5, 1e-2),
        "MLP__learning_rate": ["constant", "adaptive"],
        "MLP__max_iter": [3000, 5000],
        "MLP__early_stopping": [True],
        "MLP__validation_fraction": [0.05, 0.1],
        "MLP__n_iter_no_change": [15, 30, 50],
        "MLP__tol": [1e-4, 1e-5],
    },
}


# RECOMMENDED: Different n_iter for different model complexities
TUNING_N_ITER_BY_MODEL = {
    "TREE_REGRESSOR": 30,
    "RANDOM_FOREST": 30,
    "LINEAR_REGRESSION": 2,
    "LINEAR_SECOND_ORDER": 2,
    "LINEAR_THIRD_ORDER": 2,
    "SUPPORT_VECTOR": 25,
    "MULTI_LAYER_PERCEPTRON": 20,
}


MODEL_MAP = {
    "TREE_REGRESSOR": RandomForestRegressor(),
    "RANDOM_FOREST": RandomForestRegressor(),
    "LINEAR_REGRESSION": LinearRegression(),
    "LINEAR_SECOND_ORDER": Pipeline(
        [
            ("poly", PolynomialFeatures(2)),
            # Intercept is already added by PolynomialFeatures
            ("Line_reg", LinearRegression(fit_intercept=False)),
        ]
    ),
    "LINEAR_THIRD_ORDER": Pipeline(
        [
            ("poly", PolynomialFeatures(3)),
            ("Line_reg", LinearRegression(fit_intercept=False)),
        ]
    ),
    "SUPPORT_VECTOR": SVR(),
    "MULTI_LAYER_PERCEPTRON": Pipeline(
        [("scaler", StandardScaler()), ("MLP", MLPRegressor(max_iter=5000))]
    ),
}


[docs] class ModelTrainer:
[docs] def __init__(self, model, test_size: float = 0.2, random_state: float = 42): """ Initialize a ModelTrainer instance for training a machine learning model. :param model_pipe: A scikit-learn compatible model pipeline for training and prediction. :param test_size: The proportion of the dataset to set aside as the test set (default: 0.2). :param random_state: Seed for random number generation to ensure reproducibility (default: 42). The ModelTrainer prepares data for training and evaluation of the specified model. Attributes: - test_size: The proportion of data to be used as the test set. - model_pipe: The machine learning model pipeline to be trained. - random_state: Seed for random number generation. - x_train: Training data features. - x_test: Test data features. - y_train: Training data labels. - y_test: Test data labels. - _is_trained: A boolean indicating if the model has been trained. """ self.test_size = test_size self.model = model self.random_state = random_state self.x_train = None self.x_test = None self.y_train = None self.y_test = None self._is_trained = False
[docs] def train(self, X: pd.DataFrame, y: pd.DataFrame | pd.Series): if isinstance(y, pd.Series): y = as_1_column_dataframe(y) (self.x_train, self.x_test, self.y_train, self.y_test) = train_test_split( X, y, test_size=self.test_size, random_state=self.random_state ) self.model.fit(self.x_train, self.y_train) self._is_trained = True
@property def test_nmbe_score(self): if self._is_trained: return nmbe(self.model.predict(self.x_test), self.y_test) else: raise ValueError("Model is not trained yet. use train() method") @property def test_cvrmse_score(self): if self._is_trained: return cv_rmse(self.model.predict(self.x_test), self.y_test) else: raise ValueError("Model is not trained yet. use train() method")
[docs] class MultiModelSO(BaseEstimator, RegressorMixin): """ Multi-model selection and optimization wrapper for scikit-learn regressors. This class automates model training, cross-validation scoring, model selection, and optional fine-tuning via grid search. It compares multiple candidate models and selects the one with the best cross-validation performance according to a specified scoring metric. Parameters ---------- models : list of str, optional List of model keys to evaluate. Must be a subset of ``MODEL_MAP``. "TREE_REGRESSOR", "RANDOM_FOREST", "LINEAR_REGRESSION", "LINEAR_SECOND_ORDER", "LINEAR_THIRD_ORDER", "SUPPORT_VECTOR", "MULTI_LAYER_PERCEPTRON" If ``None`` (default), all models in ``MODEL_MAP`` are evaluated. cv : int, default=3 Number of cross-validation folds to use for model comparison. fine_tuning : bool, default=True If True, perform a grid search on the best model to fine-tune its hyperparameters. scoring : str, default="neg_mean_squared_error" Scoring function to evaluate models. Should be a valid scikit-learn scorer string (e.g. ``"r2"``, ``"neg_mean_absolute_error"``). n_jobs : int, default=-1 Number of parallel jobs for cross-validation and grid search. ``-1`` means using all available cores. random_state : int, optional Random seed for reproducibility. Attributes ---------- model_map : dict Dictionary mapping model keys to fitted estimator instances. best_model_key : str The key of the best-performing model after training. _is_fitted : bool Whether the estimator has been fitted. Methods ------- fit(X, y, verbose=True) Train and evaluate all models, selecting the best one. predict(X, model=None) Predict using the best model (or a specified model). get_model(model=None) Retrieve the fitted estimator by key. fine_tune(X, y, model=None, verbose=3) Perform grid search hyperparameter tuning on a given model. Examples -------- >>> import pandas as pd >>> from sklearn.datasets import load_diabetes >>> from sklearn.model_selection import train_test_split >>> from corrai.surrogate import MultiModelSO >>> >>> data = load_diabetes(as_frame=True) >>> X = data.data >>> y = data.target >>> X_train, X_test, y_train, y_test = train_test_split( ... X, y, test_size=0.2, random_state=42 ... ) >>> >>> model = MultiModelSO( ... models=["LINEAR_REGRESSION", "RANDOM_FOREST"], cv=5, fine_tuning=False ... ) >>> model.fit(X_train, y_train, verbose=True) === Training results === Cross validation neg_mean_squared_error scores of 5 folds mean(neg_mean_squared_error) std(neg_mean_squared_error) RANDOM_FOREST -3143.015307 355.466814 LINEAR_REGRESSION -3425.368758 525.460964 >>> y_pred = model.predict(X_test) >>> y_pred.head() 0 287 139.547558 211 179.517208 72 134.038756 321 291.417029 73 123.789659 >>> # Fast configuration and training (development) >>> model = MultiModelSO( ... models=["LINEAR_REGRESSION", "RANDOM_FOREST", "MULTI_LAYER_PERCEPTRON"], ... cv=3, ... fine_tuning=True, ... tuning_n_iter=TUNING_N_ITER_BY_MODEL, ... use_continuous_distributions=False, ... n_jobs=-1, ... ) >>> # Optimal configuration (production) >>> model = MultiModelSO( ... models=None, ... cv=5, ... fine_tuning=True, ... tuning_n_iter=TUNING_N_ITER_BY_MODEL, ... use_continuous_distributions=True, ... n_jobs=-1, ... random_state=42, ... ) """
[docs] def __init__( self, models: list[str] = None, cv: int = 3, scoring: str = "neg_mean_squared_error", fine_tuning: bool = True, tuning_n_iter: int | dict = None, use_continuous_distributions: bool = False, n_jobs: int = -1, random_state: int = None, ): self.cv = cv self.fine_tuning = fine_tuning self.tuning_n_iter = tuning_n_iter self.use_continuous_distributions = use_continuous_distributions self.scoring = scoring self.n_jobs = n_jobs self.random_state = random_state self.best_model_key = None self.models = models if models is not None else list(MODEL_MAP.keys()) self.model_map = {mod: clone(MODEL_MAP[mod]) for mod in self.models} self.training_times_ = {} self.cv_scores_ = {}
@property def feature_names_in_(self): check_is_fitted(self, ["_is_fitted"]) return self.get_model().feature_names_in_ def __sklearn_is_fitted__(self): return hasattr(self, "_is_fitted") and self._is_fitted
[docs] def fit(self, X: pd.DataFrame, y: pd.DataFrame | pd.Series, verbose=True): y = y.squeeze() if isinstance(y, pd.DataFrame) else y # Train all models with timing for mod_name, mod in self.model_map.items(): start_time = time.time() try: mod.fit(X, y) self.training_times_[mod_name] = time.time() - start_time except Exception as e: if verbose: print(f"Warning: {mod_name} failed to train: {e}") self.training_times_[mod_name] = None # Cross-validation scores score_frame = pd.DataFrame( columns=[ f"mean({self.scoring})", f"std({self.scoring})", "train_time_sec", ], index=self.models, ) for mod_name, mod in self.model_map.items(): if self.training_times_[mod_name] is None: continue cv_scores = cross_val_score( mod, X, y, scoring=self.scoring, cv=self.cv, n_jobs=self.n_jobs, # NEW: parallel CV ) score_frame.loc[mod_name, f"mean({self.scoring})"] = np.mean(cv_scores) score_frame.loc[mod_name, f"std({self.scoring})"] = np.std(cv_scores) score_frame.loc[mod_name, "train_time_sec"] = self.training_times_[mod_name] self.cv_scores_[mod_name] = cv_scores # Remove failed models score_frame = score_frame.dropna() score_frame.sort_values(f"mean({self.scoring})", ascending=False, inplace=True) if verbose: print( "=== Training results === \n" f"Cross validation {self.scoring} scores of {self.cv} folds\n" f"{score_frame}" ) if len(score_frame) == 0: raise ValueError("All models failed to train!") self.best_model_key = score_frame.index[0] if self.fine_tuning: if verbose: print(f"\n === Fine tuning === \n" f"Fine tuning {self.best_model_key}") self.fine_tune(X, y, self.best_model_key, verbose=verbose) self.target_name_ = y.name self._is_fitted = True return self
[docs] def predict( self, X: pd.DataFrame | np.ndarray | pd.Series, model: str = None ) -> pd.DataFrame: check_is_fitted(self) model_for_prediction = self.get_model(model) if X.ndim == 1: X = as_1_column_dataframe(X) if isinstance(X, pd.DataFrame): return pd.DataFrame( model_for_prediction.predict(X), index=X.index, columns=[self.target_name_], ) else: return pd.DataFrame( model_for_prediction.predict(X), columns=[self.target_name_] )
[docs] def get_model(self, model: str = None): if model is None: model = self.best_model_key if model not in self.model_map.keys(): raise ValueError(f"{model} is not found in 'models' attribute") return self.model_map[model]
[docs] def fine_tune( self, X: pd.DataFrame, y: pd.DataFrame | pd.Series, model: str = None, verbose=True, ): if model is None: model = self.best_model_key if self.use_continuous_distributions: param_dist = GRID_DICT_CONTINUOUS.get(model) else: param_dist = GRID_DICT.get(model) if param_dist is None: if verbose: print(f"Fine tuning for {model} not available - skipping") return model_to_tune = self.get_model(model) if isinstance(self.tuning_n_iter, dict): n_iter = self.tuning_n_iter.get(model, 20) elif self.tuning_n_iter is not None: n_iter = self.tuning_n_iter else: n_iter = TUNING_N_ITER_BY_MODEL.get(model, 20) verbose_level = 2 if verbose else 0 grid_search = RandomizedSearchCV( model_to_tune, param_distributions=param_dist, cv=self.cv, n_iter=n_iter, scoring=self.scoring, return_train_score=True, verbose=verbose_level, n_jobs=self.n_jobs, random_state=self.random_state, ) start_time = time.time() grid_search.fit(X, y) tuning_time = time.time() - start_time self.model_map[model] = grid_search.best_estimator_ if verbose: print(f"\nFine tuning completed in {tuning_time:.2f}s") print(f"Best params: {grid_search.best_params_}") print(f"Best CV score: {grid_search.best_score_:.4f}") # Show top 5 configurations cvres = grid_search.cv_results_ results = sorted( zip(cvres["mean_test_score"], cvres["params"]), reverse=True )[:5] print("\nTop 5 configurations:") for i, (score, params) in enumerate(results, 1): print(f"{i}. Score: {score:.4f} | Params: {params}")
[docs] def get_feature_importance(self, model: str = None, top_n: int = 10): check_is_fitted(self) mod = self.get_model(model) # Handle Pipeline if isinstance(mod, Pipeline): mod = mod.steps[-1][1] if not hasattr(mod, "feature_importances_"): raise ValueError( f"Model {model or self.best_model_key} doesn't support feature_importances_" ) importance_df = pd.DataFrame( {"feature": self.feature_names_in_, "importance": mod.feature_importances_} ).sort_values("importance", ascending=False) return importance_df.head(top_n) if top_n else importance_df
[docs] class StaticScikitModel(Model): """ Wrapper class for static surrogate MultiModelSingleOutput class and scikit-learn regressors within the Corrai framework. This class adapts corrai's `MultiModelSO` and scikit-learn models to the :class:`Model` interface, enabling parameter-to-property mapping and simulation execution. It is intended for non-dynamic (static) models where outputs are single values or vectors rather than time-dependent series. Parameters ---------- scikit_model : MultiModelSO or RegressorMixin The underlying scikit-learn model or a Corrai :class:`MultiModelSO` meta-estimator. target_name : str, optional Name of the output variable. Required when ``scikit_model`` is not an instance of :class:`MultiModelSO`. Attributes ---------- is_dynamic : bool Always ``False`` for this wrapper, since it represents static models. scikit_model : MultiModelSO or RegressorMixin The wrapped scikit-learn model used for predictions. target_name : str Output variable name. Raises ------ ValueError If ``target_name`` cannot be inferred and is not provided. """
[docs] def __init__( self, scikit_model: MultiModelSO | RegressorMixin, target_name: str = None ): super().__init__(is_dynamic=False) self.scikit_model = scikit_model self.target_name = self._resolve_target_name(target_name)
def _resolve_target_name(self, target_name: str = None) -> str: if target_name is not None: return target_name if hasattr(self.scikit_model, "target_name_"): return self.scikit_model.target_name_ raise ValueError( "target_name must be specified when scikit_model " "is not an instance of MultiModelSO" ) def _build_feature_dataframe( self, property_dict: dict[str, str | int | float], simulation_options: dict ) -> pd.DataFrame: if not property_dict and not simulation_options: return pd.DataFrame() # Merge dictionaries (simulation_options override property_dict) merged = {**(property_dict or {}), **(simulation_options or {})} return pd.DataFrame([merged]) def _validate_features(self, features: pd.DataFrame) -> None: unknown = set(features.columns) - set(self.scikit_model.feature_names_in_) if unknown: raise ValueError(f"Unknown features: {unknown}")
[docs] def simulate( self, property_dict: dict[str, str | int | float] = None, simulation_options: dict = None, **simulation_kwargs, ) -> pd.Series: """ Run the scikit-learn model prediction. Combines provided parameter values and simulation options into a feature vector, validates compatibility with the underlying model, and returns predictions as a pandas Series. Parameters ---------- property_dict : dict of {str: int, float, or str}, optional Mapping from feature names to values to use for prediction. simulation_options : dict, optional Additional feature overrides or configuration parameters to include in the feature vector. These values override those in ``property_dict`` if keys overlap. **simulation_kwargs Extra keyword arguments for future extensions (currently unused). Returns ------- pd.Series Prediction results with index ``[self.target_name]``. Raises ------ ValueError If unknown feature names are provided. """ features = self._build_feature_dataframe(property_dict, simulation_options) self._validate_features(features) pred = self.scikit_model.predict(features) if isinstance(pred, pd.DataFrame): pred = pred.squeeze() return pd.Series(pred, index=[self.target_name])