Skip to content

API Reference

Public entrypoints for the package. Each section below is generated from the live docstrings in the source code.

Pipeline

run_single_its(df, intervention_date, target_col=None, date_col=None, covariate_cols=None, model_name='prophet_xgb', config_path=None, config_overrides=None, output_dir=None, seed=42, split_method=None)

Run a single ITS counterfactual analysis pipeline.

Parameters:

Name Type Description Default
df DataFrame

Full time series dataset.

required
intervention_date str or Timestamp

Date of the intervention.

required
target_col str

Target column name. Defaults to config value.

None
date_col str

Date column name. Defaults to config value.

None
covariate_cols list[str]

Covariate column names. Defaults to config value.

None
model_name str

Model to use. One of: prophet_xgb, prophet_then_xgb, neuralprophet, arima.

'prophet_xgb'
config_path str or Path

Path to custom YAML config.

None
config_overrides dict

Runtime config overrides.

None
output_dir str or Path

Directory for saving outputs. If None, no files are saved.

None
seed int

Random seed for bootstrap reproducibility.

42

Returns:

Type Description
PipelineResult
Source code in its2s/pipeline.py
def run_single_its(
    df,
    intervention_date,
    target_col=None,
    date_col=None,
    covariate_cols=None,
    model_name="prophet_xgb",
    config_path=None,
    config_overrides=None,
    output_dir=None,
    seed=42,
    split_method=None,
):
    """Run a single ITS counterfactual analysis pipeline.

    Parameters
    ----------
    df : pd.DataFrame
        Full time series dataset.
    intervention_date : str or pd.Timestamp
        Date of the intervention.
    target_col : str, optional
        Target column name. Defaults to config value.
    date_col : str, optional
        Date column name. Defaults to config value.
    covariate_cols : list[str], optional
        Covariate column names. Defaults to config value.
    model_name : str
        Model to use. One of: prophet_xgb, prophet_then_xgb, neuralprophet, arima.
    config_path : str or Path, optional
        Path to custom YAML config.
    config_overrides : dict, optional
        Runtime config overrides.
    output_dir : str or Path, optional
        Directory for saving outputs. If None, no files are saved.
    seed : int
        Random seed for bootstrap reproducibility.

    Returns
    -------
    PipelineResult
    """
    # 1. Load config
    config = load_config(config_path, config_overrides)
    date_col = date_col or config["data"]["date_col"]
    target_col = target_col or config["data"]["target_col"]
    covariate_cols = covariate_cols if covariate_cols is not None else config["data"]["covariate_cols"]
    config["data"]["date_col"] = date_col
    config["data"]["target_col"] = target_col

    # Resolve split-method config (function kwarg overrides config)
    periods_cfg = config["periods"]
    if split_method is not None:
        periods_cfg["split_method"] = split_method
    split_method_resolved = periods_cfg.get("split_method", "percent")
    test_pct_resolved = periods_cfg.get("test_pct", 0.20)
    holdout_pct_resolved = periods_cfg.get("holdout_pct", 1.0)
    test_days_resolved = periods_cfg.get("test_days", 365)
    holdout_days_resolved = periods_cfg.get("holdout_days", 365)

    # 1b. Validate inputs
    validate_inputs(df, intervention_date, date_col, target_col,
                    covariate_cols, model_name,
                    split_method=split_method_resolved,
                    test_pct=test_pct_resolved,
                    holdout_pct=holdout_pct_resolved,
                    test_days=test_days_resolved,
                    holdout_days=holdout_days_resolved)

    # M2-6: Check date-sort and warn if DataFrame is unsorted
    dates_check = pd.to_datetime(df[date_col])
    if not dates_check.is_monotonic_increasing:
        warnings.warn(
            f"DataFrame is not sorted by '{date_col}'. Rows will be reordered "
            "before model fitting. If covariate columns are present, ensure "
            "their values are aligned with the date column, not with the "
            "original row positions. Pre-sort your DataFrame to suppress this warning.",
            UserWarning,
            stacklevel=2,
        )

    # 1c. Handle missing data in target column
    missing_strategy = config["data"].get("missing_data", "error")
    n_missing = df[target_col].isna().sum()
    if n_missing > 0:
        if missing_strategy == "error":
            raise ValueError(
                f"Target column '{target_col}' contains {n_missing} missing "
                f"values. Set data.missing_data to 'drop' or 'interpolate' "
                f"in config to handle them automatically."
            )
        elif missing_strategy == "drop":
            warnings.warn(
                f"Dropping {n_missing} row(s) with missing values in '{target_col}'.",
                UserWarning,
                stacklevel=2,
            )
            df = df.dropna(subset=[target_col]).copy()
        elif missing_strategy == "interpolate":
            warnings.warn(
                f"Interpolating {n_missing} missing value(s) in '{target_col}' "
                "using linear method.",
                UserWarning,
                stacklevel=2,
            )
            df = df.copy()
            df[target_col] = df[target_col].interpolate(method="linear")
            df[target_col] = df[target_col].bfill().ffill()
        else:
            raise ValueError(
                f"Unknown missing_data strategy: '{missing_strategy}'. "
                f"Expected 'error', 'drop', or 'interpolate'."
            )

    # 2. Prepare splits
    splits = prepare_splits(
        df,
        intervention_date,
        date_col=date_col,
        split_method=split_method_resolved,
        test_pct=test_pct_resolved,
        holdout_pct=holdout_pct_resolved,
        test_days=test_days_resolved,
        holdout_days=holdout_days_resolved,
    )

    logger.info(
        "Splits: train=%d, test=%d, holdout=%d",
        len(splits.train_df), len(splits.test_df), len(splits.holdout_df),
    )

    # 3. Instantiate model
    model_params = get_model_config(config, model_name)
    model = _get_model(model_name, model_params)

    # 3b. Warn about long-horizon ARIMA forecasts (B5)
    holdout_days = len(splits.holdout_df)
    if model_name == "arima" and holdout_days > 90:
        warnings.warn(
            f"ARIMA with holdout_days={holdout_days}: ARIMA point forecasts "
            "converge to the unconditional mean over long horizons, which can "
            "bias the counterfactual estimate. Consider prophet_xgb or "
            "prophet_then_xgb for holdout windows beyond 90 days.",
            UserWarning,
            stacklevel=2,
        )

    # 4. Fit model
    logger.info("Fitting %s model...", model_name)
    fit_result = model.fit(
        splits.train_df, target_col=target_col,
        date_col=date_col, covariate_cols=covariate_cols or None,
    )

    # 4b. Compute residual diagnostics
    diag = compute_diagnostics(fit_result, model_name)

    # 5. Bootstrap CIs
    boot_config = config["bootstrap"]
    mbb = MovingBlockBootstrap(
        n_sim=boot_config["n_sim"],
        block_length=boot_config["block_length"],
        ci_method=boot_config["ci_method"],
        ci_level=boot_config["ci_level"],
        n_jobs=boot_config.get("n_jobs", 1),
    )

    logger.info("Running MBB with %d simulations...", boot_config["n_sim"])
    bootstrap_result = mbb.generate_cis(
        model, splits.train_df, splits.full_predict_df,
        target_col=target_col, date_col=date_col,
        covariate_cols=covariate_cols or None, seed=seed,
    )

    # 6. Compute metrics
    train_pred = model.predict(
        splits.train_df, target_col=target_col,
        date_col=date_col, covariate_cols=covariate_cols or None,
    )
    metrics_train = compute_metrics(
        splits.train_df[target_col].values,
        train_pred.predicted,
        seasonality=config["metrics"]["seasonality"],
    )

    # Test metrics from bootstrap point predictions
    test_mask = pd.to_datetime(bootstrap_result.dates) < splits.intervention_date
    if test_mask.any():
        metrics_test = compute_metrics(
            bootstrap_result.actual[test_mask],
            bootstrap_result.predicted[test_mask],
            training_actual=splits.train_df[target_col].values,
            seasonality=config["metrics"]["seasonality"],
        )
    else:
        metrics_test = metrics_train

    # 7. Excess calculation
    excess_table = calculate_excess(
        bootstrap_result,
        intervention_date=splits.intervention_date,
        periods_config=config.get("excess_periods") or None,
        ci_level=boot_config["ci_level"],
    )

    # 8. Save outputs
    if output_dir:
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)

        result_for_plot = PipelineResult(
            model_name=model_name,
            fit_result=fit_result,
            bootstrap_result=bootstrap_result,
            metrics_train=metrics_train,
            metrics_test=metrics_test,
            excess_table=excess_table,
            config=config,
            diagnostics=diag,
        )

        plot_counterfactual(
            result_for_plot, splits,
            save_path=out / f"{model_name}_counterfactual.png",
            config=config,
        )
        save_excess_table(excess_table, out / f"{model_name}_excess.csv")
        save_metrics_table(
            {"train": metrics_train, "test": metrics_test},
            out / f"{model_name}_metrics.csv",
        )
        if not excess_table.daily_excess.empty:
            ate = calc_ate_summary(excess_table)
            save_ate_summary(ate, out / f"{model_name}_ate_summary.csv")

        logger.info("Outputs saved to %s", out)

    return PipelineResult(
        model_name=model_name,
        fit_result=fit_result,
        bootstrap_result=bootstrap_result,
        metrics_train=metrics_train,
        metrics_test=metrics_test,
        excess_table=excess_table,
        config=config,
        diagnostics=diag,
    )

run_batch(series_list, config_path=None, output_dir='output', n_jobs=1, seed=42)

Run ITS pipeline on multiple series.

Parameters:

Name Type Description Default
series_list list[dict]

Each dict has: series_id, df, intervention_date, target_col, date_col, covariate_cols, model_name (optional), config_overrides (optional).

required
config_path str or Path

Path to shared YAML config.

None
output_dir str or Path

Base output directory.

'output'
n_jobs int

Number of parallel jobs. 1 = sequential.

1
seed int

Global seed for reproducibility.

42

Returns:

Type Description
list[PipelineResult]
Source code in its2s/batch/runner.py
def run_batch(series_list, config_path=None, output_dir="output",
              n_jobs=1, seed=42):
    """Run ITS pipeline on multiple series.

    Parameters
    ----------
    series_list : list[dict]
        Each dict has: series_id, df, intervention_date, target_col,
        date_col, covariate_cols, model_name (optional), config_overrides (optional).
    config_path : str or Path, optional
        Path to shared YAML config.
    output_dir : str or Path
        Base output directory.
    n_jobs : int
        Number of parallel jobs. 1 = sequential.
    seed : int
        Global seed for reproducibility.

    Returns
    -------
    list[PipelineResult]
    """
    # M2-2: Validate series_list structure
    _REQUIRED_KEYS = {"series_id", "df", "intervention_date"}
    for i, spec in enumerate(series_list):
        missing_keys = _REQUIRED_KEYS - set(spec.keys())
        if missing_keys:
            raise ValueError(
                f"series_list[{i}] is missing required keys: {sorted(missing_keys)}. "
                f"Each entry must have 'series_id', 'df', and 'intervention_date'."
            )

    from ..settings import load_config
    config = load_config(config_path)
    n_sim = config["bootstrap"]["n_sim"]

    run_dir = _make_run_dir(output_dir, n_sim)
    logger.info("Batch output directory: %s", run_dir)

    def _process(spec):
        series_dir = run_dir / spec["series_id"]
        series_dir.mkdir(parents=True, exist_ok=True)
        return _run_one_series(spec, config_path, series_dir, seed)

    if n_jobs == 1:
        results = []
        for spec in series_list:
            logger.info("Processing series: %s", spec["series_id"])
            results.append(_process(spec))
    else:
        results = Parallel(n_jobs=n_jobs)(
            delayed(_process)(spec) for spec in series_list
        )

    logger.info("Batch complete: %d series processed.", len(results))
    return results

Cross-validation and tuning

time_series_cv(df, intervention_date, model_name='arima', n_folds=5, test_days=90, min_train_days=365, skip_days=0, cv_end_date=None, split_method='days', test_pct=0.1, min_train_pct=0.5, skip_pct=0.0, date_col=None, target_col=None, covariate_cols=None, config_path=None, config_overrides=None)

Evaluate a model using expanding-window time-series cross-validation.

Folds are non-overlapping by construction. Consecutive validation windows are separated by skip_days (matching the R reference implementation's skip parameter). The CV window can be capped at cv_end_date to prevent tuning or evaluation folds from touching the held-out test period defined by run_single_its.

Fold layout (train = expanding, test = fixed width):

|------ min_train_days ------|-- test_days --|-- skip_days --|-- test_days --|...
fold 1: train [0, T0),         test [T0, T0+test_days)
fold 2: train [0, T0+test_days+skip_days), test [T0+test_days+skip_days, ...)
...

Parameters:

Name Type Description Default
df DataFrame

Full time series dataset.

required
intervention_date str or Timestamp

CV uses only pre-intervention data (or data before cv_end_date if set).

required
model_name str

Model to evaluate.

'arima'
n_folds int

Maximum number of CV folds to attempt.

5
test_days int

Length of each validation window in days.

90
min_train_days int

Minimum training window for the first fold.

365
skip_days int

Gap in days between the end of one validation window and the start of the next. Set to 0 for adjacent non-overlapping folds. The R reference uses skip = "12 months" (365 days for daily data).

0
cv_end_date str or Timestamp

Upper bound on the data used for CV. Must be <= intervention_date. Use intervention_date - test_days to keep CV folds out of the held-out evaluation window used by run_single_its. Defaults to intervention_date (all pre-intervention data).

None
date_col str

Date column name. Defaults to config value.

None
target_col str

Target column name. Defaults to config value.

None
covariate_cols list[str]

Covariate column names.

None
config_path str or Path
None
config_overrides dict
None

Returns:

Type Description
CVResult
Source code in its2s/cross_validation.py
def time_series_cv(df, intervention_date, model_name="arima",
                   n_folds=5, test_days=90, min_train_days=365,
                   skip_days=0, cv_end_date=None,
                   split_method="days",
                   test_pct=0.10, min_train_pct=0.50, skip_pct=0.0,
                   date_col=None, target_col=None, covariate_cols=None,
                   config_path=None, config_overrides=None):
    """Evaluate a model using expanding-window time-series cross-validation.

    Folds are non-overlapping by construction. Consecutive validation windows
    are separated by `skip_days` (matching the R reference implementation's
    `skip` parameter). The CV window can be capped at `cv_end_date` to prevent
    tuning or evaluation folds from touching the held-out test period defined
    by `run_single_its`.

    Fold layout (train = expanding, test = fixed width):

        |------ min_train_days ------|-- test_days --|-- skip_days --|-- test_days --|...
        fold 1: train [0, T0),         test [T0, T0+test_days)
        fold 2: train [0, T0+test_days+skip_days), test [T0+test_days+skip_days, ...)
        ...

    Parameters
    ----------
    df : pd.DataFrame
        Full time series dataset.
    intervention_date : str or pd.Timestamp
        CV uses only pre-intervention data (or data before cv_end_date if set).
    model_name : str
        Model to evaluate.
    n_folds : int
        Maximum number of CV folds to attempt.
    test_days : int
        Length of each validation window in days.
    min_train_days : int
        Minimum training window for the first fold.
    skip_days : int
        Gap in days between the end of one validation window and the start of
        the next. Set to 0 for adjacent non-overlapping folds. The R reference
        uses skip = "12 months" (365 days for daily data).
    cv_end_date : str or pd.Timestamp, optional
        Upper bound on the data used for CV. Must be <= intervention_date.
        Use intervention_date - test_days to keep CV folds out of the
        held-out evaluation window used by run_single_its.
        Defaults to intervention_date (all pre-intervention data).
    date_col : str, optional
        Date column name. Defaults to config value.
    target_col : str, optional
        Target column name. Defaults to config value.
    covariate_cols : list[str], optional
        Covariate column names.
    config_path : str or Path, optional
    config_overrides : dict, optional

    Returns
    -------
    CVResult
    """
    # Lazy import to avoid circular dependency
    from .pipeline import _get_model

    config = load_config(config_path, config_overrides)
    date_col = date_col or config["data"]["date_col"]
    target_col = target_col or config["data"]["target_col"]
    covariate_cols = (covariate_cols if covariate_cols is not None
                      else config["data"]["covariate_cols"])
    seasonality = config["metrics"]["seasonality"]

    intervention_date = pd.Timestamp(intervention_date)
    df = df.copy()
    df[date_col] = pd.to_datetime(df[date_col])
    df = df.sort_values(date_col).reset_index(drop=True)

    # Determine upper bound for CV data
    if cv_end_date is not None:
        cv_end_date = pd.Timestamp(cv_end_date)
        if cv_end_date > intervention_date:
            raise ValueError(
                f"cv_end_date ({cv_end_date.date()}) must be <= "
                f"intervention_date ({intervention_date.date()})."
            )
        cv_df = df[df[date_col] < cv_end_date].copy()
    else:
        cv_df = df[df[date_col] < intervention_date].copy()

    n_cv = len(cv_df)

    if split_method == "percent":
        budget = min_train_pct + n_folds * test_pct
        if budget > 1.0:
            raise ValueError(
                f"CV percent budget exceeded: min_train_pct ({min_train_pct}) + "
                f"n_folds ({n_folds}) * test_pct ({test_pct}) = {budget:.3f} > 1.0. "
                "Reduce n_folds, test_pct, or min_train_pct."
            )
        if not (0 < test_pct < 1):
            raise ValueError(f"test_pct must be in (0, 1), got {test_pct}.")
        if not (0 < min_train_pct < 1):
            raise ValueError(f"min_train_pct must be in (0, 1), got {min_train_pct}.")
        if skip_pct < 0 or skip_pct >= 1:
            raise ValueError(f"skip_pct must be in [0, 1), got {skip_pct}.")
        test_days = max(1, int(round(test_pct * n_cv)))
        min_train_days = max(1, int(round(min_train_pct * n_cv)))
        skip_days = max(0, int(round(skip_pct * n_cv)))
    elif split_method != "days":
        raise ValueError(
            f"split_method must be 'percent' or 'days', got {split_method!r}."
        )

    if n_cv < min_train_days + test_days:
        raise ValueError(
            f"Not enough pre-intervention data for CV. Need at least "
            f"{min_train_days + test_days} rows, have {n_cv}."
        )

    model_params = get_model_config(config, model_name)
    fold_results = []

    # Non-overlapping fold layout: each fold's test window starts at
    # min_train_days + i * (test_days + skip_days), guaranteeing a gap of
    # skip_days between the end of fold i and the start of fold i+1.
    for i in range(n_folds):
        test_start_idx = min_train_days + i * (test_days + skip_days)
        test_end_idx = test_start_idx + test_days

        if test_end_idx > n_cv:
            break

        train_fold = cv_df.iloc[:test_start_idx].copy()
        test_fold = cv_df.iloc[test_start_idx:test_end_idx].copy()

        model = _get_model(model_name, model_params)
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                model.fit(train_fold, target_col=target_col,
                          date_col=date_col, covariate_cols=covariate_cols or None)
                pred = model.predict(test_fold, target_col=target_col,
                                     date_col=date_col,
                                     covariate_cols=covariate_cols or None)
        except Exception as e:
            logger.warning("CV fold %d failed: %s", i + 1, e)
            continue

        metrics = compute_metrics(
            test_fold[target_col].values,
            pred.predicted,
            training_actual=train_fold[target_col].values,
            seasonality=seasonality,
        )

        fold_results.append(CVFoldResult(
            fold=i + 1,
            train_end=train_fold[date_col].iloc[-1],
            test_start=test_fold[date_col].iloc[0],
            test_end=test_fold[date_col].iloc[-1],
            n_train=len(train_fold),
            n_test=len(test_fold),
            metrics=metrics,
        ))

        logger.info(
            "CV fold %d/%d: RMSE=%.4f, MAE=%.4f, R2=%.4f",
            i + 1, n_folds, metrics.rmse, metrics.mae, metrics.r2,
        )

    if not fold_results:
        raise RuntimeError("All CV folds failed. Check data and model.")

    rmses = [f.metrics.rmse for f in fold_results]
    maes = [f.metrics.mae for f in fold_results]
    mapes = [f.metrics.mape for f in fold_results]
    r2s = [f.metrics.r2 for f in fold_results]

    return CVResult(
        model_name=model_name,
        folds=fold_results,
        mean_rmse=float(np.mean(rmses)),
        mean_mae=float(np.mean(maes)),
        mean_mape=float(np.nanmean(mapes)),
        mean_r2=float(np.mean(r2s)),
        std_rmse=float(np.std(rmses, ddof=1)) if len(rmses) > 1 else 0.0,
        std_mae=float(np.std(maes, ddof=1)) if len(maes) > 1 else 0.0,
    )

tune_model(df: pd.DataFrame, intervention_date, model_name: str, n_trials: int | None = None, n_folds: int = 5, test_days: int = 365, min_train_days: int = 730, skip_days: int = 0, cv_end_date=None, split_method: str = 'percent', test_pct: float = 0.1, min_train_pct: float = 0.5, skip_pct: float = 0.0, metric: str = 'rmse', config_path=None, n_jobs: int = 1, seed: int = 42) -> TuningResult

Tune model hyperparameters via Latin hypercube grid search with time-series CV.

Mirrors the R reference implementation (Two_Stage_ITS): a one-shot space-filling sample of the parameter space is evaluated via expanding-window CV, and the combination with the lowest mean CV RMSE (or MAE) is selected.

R reference CV settings: 5 folds, 12-month validation window, 2-year initial training window, 12-month skip between folds. Matching those settings: n_folds=5, test_days=365, min_train_days=730, skip_days=365

To prevent tuning from seeing the held-out evaluation window that run_single_its uses, set cv_end_date to intervention_date minus test_days.

Parameters:

Name Type Description Default
df DataFrame

Full time series dataset.

required
intervention_date str or Timestamp

Only pre-intervention data is used for tuning CV.

required
model_name str

One of "arima", "neuralprophet", "prophet_xgb", "prophet_then_xgb".

required
n_trials int or None

Number of parameter combinations to evaluate. Defaults to model-specific values matching R reference (100 for most models, 75 for neuralprophet).

None
n_folds int

Number of expanding-window CV folds.

5
test_days int

Validation window per fold in days.

365
min_train_days int

Minimum training window for the first fold in days.

730
skip_days int

Gap in days between consecutive fold validation windows. Set to 365 to match the R reference (skip = "12 months"). Defaults to 0 (adjacent folds).

0
cv_end_date str or Timestamp

Upper bound on data used for CV folds. Must be <= intervention_date. Pass intervention_date - pd.Timedelta(days=test_days) to prevent tuning folds from overlapping with the held-out evaluation window. Defaults to None (use all pre-intervention data).

None
metric str

Objective for selecting the best parameter set. "rmse" or "mae".

'rmse'
config_path str or Path

Path to a custom base YAML config (merged before tuning overrides).

None
n_jobs int

Parallel workers for evaluating trials. -1 uses all available cores.

1
seed int

Random seed for the Latin hypercube sampler.

42

Returns:

Type Description
TuningResult

Contains best_params (inject via run_single_its config_overrides), trials_df (all evaluated combinations and their CV metrics), and summary statistics.

Examples:

Tune and apply best params (R-matched CV settings, leakage-free):

import pandas as pd
result = tune_model(
    df, "2025-01-07", "prophet_xgb",
    n_trials=100, n_folds=5,
    test_days=365, min_train_days=730, skip_days=365,
    cv_end_date=pd.Timestamp("2025-01-07") - pd.Timedelta(days=365),
)
run_single_its(
    df, "2025-01-07",
    model_name="prophet_xgb",
    config_overrides={"models": {"prophet_xgb": result.best_params}},
)
Source code in its2s/tuning.py
def tune_model(
    df: pd.DataFrame,
    intervention_date,
    model_name: str,
    n_trials: int | None = None,
    n_folds: int = 5,
    test_days: int = 365,
    min_train_days: int = 730,
    skip_days: int = 0,
    cv_end_date=None,
    split_method: str = "percent",
    test_pct: float = 0.10,
    min_train_pct: float = 0.50,
    skip_pct: float = 0.0,
    metric: str = "rmse",
    config_path=None,
    n_jobs: int = 1,
    seed: int = 42,
) -> TuningResult:
    """Tune model hyperparameters via Latin hypercube grid search with time-series CV.

    Mirrors the R reference implementation (Two_Stage_ITS): a one-shot space-filling
    sample of the parameter space is evaluated via expanding-window CV, and the
    combination with the lowest mean CV RMSE (or MAE) is selected.

    R reference CV settings: 5 folds, 12-month validation window, 2-year initial
    training window, 12-month skip between folds. Matching those settings:
        n_folds=5, test_days=365, min_train_days=730, skip_days=365

    To prevent tuning from seeing the held-out evaluation window that
    run_single_its uses, set cv_end_date to intervention_date minus test_days.

    Parameters
    ----------
    df : pd.DataFrame
        Full time series dataset.
    intervention_date : str or pd.Timestamp
        Only pre-intervention data is used for tuning CV.
    model_name : str
        One of "arima", "neuralprophet", "prophet_xgb", "prophet_then_xgb".
    n_trials : int or None
        Number of parameter combinations to evaluate. Defaults to model-specific
        values matching R reference (100 for most models, 75 for neuralprophet).
    n_folds : int
        Number of expanding-window CV folds.
    test_days : int
        Validation window per fold in days.
    min_train_days : int
        Minimum training window for the first fold in days.
    skip_days : int
        Gap in days between consecutive fold validation windows. Set to 365 to
        match the R reference (skip = "12 months"). Defaults to 0 (adjacent folds).
    cv_end_date : str or pd.Timestamp, optional
        Upper bound on data used for CV folds. Must be <= intervention_date.
        Pass intervention_date - pd.Timedelta(days=test_days) to prevent tuning
        folds from overlapping with the held-out evaluation window.
        Defaults to None (use all pre-intervention data).
    metric : str
        Objective for selecting the best parameter set. "rmse" or "mae".
    config_path : str or Path, optional
        Path to a custom base YAML config (merged before tuning overrides).
    n_jobs : int
        Parallel workers for evaluating trials. -1 uses all available cores.
    seed : int
        Random seed for the Latin hypercube sampler.

    Returns
    -------
    TuningResult
        Contains best_params (inject via run_single_its config_overrides),
        trials_df (all evaluated combinations and their CV metrics), and
        summary statistics.

    Examples
    --------
    Tune and apply best params (R-matched CV settings, leakage-free):

        import pandas as pd
        result = tune_model(
            df, "2025-01-07", "prophet_xgb",
            n_trials=100, n_folds=5,
            test_days=365, min_train_days=730, skip_days=365,
            cv_end_date=pd.Timestamp("2025-01-07") - pd.Timedelta(days=365),
        )
        run_single_its(
            df, "2025-01-07",
            model_name="prophet_xgb",
            config_overrides={"models": {"prophet_xgb": result.best_params}},
        )
    """
    if model_name not in _SEARCH_SPACES:
        raise ValueError(
            f"No search space defined for '{model_name}'. "
            f"Available: {list(_SEARCH_SPACES)}"
        )
    if metric not in ("rmse", "mae"):
        raise ValueError(f"metric must be 'rmse' or 'mae', got '{metric}'")

    n_trials = n_trials if n_trials is not None else _DEFAULT_N_TRIALS[model_name]

    # M2-3: Lower-bound parameter checks
    if n_trials < 1:
        raise ValueError(f"n_trials must be >= 1, got {n_trials}.")
    if n_folds < 2:
        raise ValueError(
            f"n_folds must be >= 2, got {n_folds}. "
            "At least 2 folds are required for meaningful cross-validation."
        )
    if split_method == "days" and test_days < 1:
        raise ValueError(f"test_days must be >= 1, got {test_days}.")
    search_space = _SEARCH_SPACES[model_name]

    flat_trials = _sample_lhs(search_space, n_trials, seed)
    nested_trials = [_unflatten_params(p) for p in flat_trials]

    cv_kwargs = {
        "n_folds":        n_folds,
        "test_days":      test_days,
        "min_train_days": min_train_days,
        "skip_days":      skip_days,
        "cv_end_date":    cv_end_date,
        "split_method":   split_method,
        "test_pct":       test_pct,
        "min_train_pct":  min_train_pct,
        "skip_pct":       skip_pct,
        "config_path":    config_path,
    }

    logger.info(
        "Tuning %s: %d trials x %d folds (metric=%s, n_jobs=%d)",
        model_name, n_trials, n_folds, metric, n_jobs,
    )

    results = Parallel(n_jobs=n_jobs)(
        delayed(_evaluate_trial)(df, intervention_date, model_name, params, cv_kwargs)
        for params in nested_trials
    )

    rows = []
    for i, (flat_p, metrics) in enumerate(zip(flat_trials, results)):
        row = {"trial_id": i}
        row.update(flat_p)
        row.update(metrics)
        rows.append(row)

    trials_df = pd.DataFrame(rows)

    metric_col = f"mean_{metric}"
    best_idx = int(trials_df[metric_col].idxmin())
    best_row = trials_df.loc[best_idx]
    best_params = nested_trials[best_idx]

    logger.info(
        "Tuning complete: %s | best %s=%.4f +/- %.4f (trial %d/%d)",
        model_name, metric_col,
        best_row[metric_col], best_row["std_rmse"],
        best_idx, n_trials,
    )

    return TuningResult(
        model_name=model_name,
        best_params=best_params,
        best_rmse=float(best_row["mean_rmse"]),
        best_std_rmse=float(best_row["std_rmse"]) if not math.isnan(best_row["std_rmse"]) else 0.0,
        trials_df=trials_df,
        n_trials=n_trials,
        n_folds=n_folds,
        metric=metric,
        seed=seed,
    )

TuningResult(model_name: str, best_params: dict, best_rmse: float, best_std_rmse: float, trials_df: pd.DataFrame, n_trials: int, n_folds: int, metric: str, seed: int) dataclass

Result from a hyperparameter tuning run.

Attributes:

Name Type Description
model_name str

Name of the tuned model (e.g. "arima").

best_params dict

Nested param dict ready to pass as config_overrides["models"][model_name].

best_rmse float

Mean CV RMSE of the best parameter combination.

best_std_rmse float

Std dev of CV RMSE across folds for the best combination.

trials_df DataFrame

One row per trial. Columns: trial_id, <param cols>, mean_rmse, std_rmse, mean_mae, mean_mape, mean_r2, n_folds_ok.

n_trials int

Number of parameter combinations evaluated.

n_folds int

Number of expanding-window CV folds used per trial.

metric str

Objective used for selection ("rmse" or "mae").

seed int

Random seed driving the Latin hypercube sample.

Comparison

compare_models(df, intervention_date, model_names=None, target_col=None, date_col=None, covariate_cols=None, config_path=None, config_overrides=None, output_dir=None, seed=42)

Run the ITS pipeline with multiple models and return a comparison table.

Parameters:

Name Type Description Default
df DataFrame

Full time series dataset.

required
intervention_date str or Timestamp

Date of the intervention.

required
model_names list[str]

Models to compare. Defaults to all available models.

None
target_col str
None
date_col str
None
covariate_cols list[str]
None
config_path str or Path
None
config_overrides dict
None
output_dir str or Path
None
seed int
42

Returns:

Type Description
tuple[DataFrame, dict[str, PipelineResult]]

(comparison_table, results_dict)

Source code in its2s/compare.py
def compare_models(df, intervention_date, model_names=None,
                   target_col=None, date_col=None, covariate_cols=None,
                   config_path=None, config_overrides=None,
                   output_dir=None, seed=42):
    """Run the ITS pipeline with multiple models and return a comparison table.

    Parameters
    ----------
    df : pd.DataFrame
        Full time series dataset.
    intervention_date : str or pd.Timestamp
        Date of the intervention.
    model_names : list[str], optional
        Models to compare. Defaults to all available models.
    target_col : str, optional
    date_col : str, optional
    covariate_cols : list[str], optional
    config_path : str or Path, optional
    config_overrides : dict, optional
    output_dir : str or Path, optional
    seed : int

    Returns
    -------
    tuple[pd.DataFrame, dict[str, PipelineResult]]
        (comparison_table, results_dict)
    """
    from .pipeline import run_single_its, _get_available_model_names

    if model_names is None:
        model_names = _get_available_model_names()

    results = {}
    rows = []

    for model_name in model_names:
        logger.info("Running model: %s", model_name)
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                result = run_single_its(
                    df, intervention_date,
                    target_col=target_col,
                    date_col=date_col,
                    covariate_cols=covariate_cols,
                    model_name=model_name,
                    config_path=config_path,
                    config_overrides=config_overrides,
                    output_dir=output_dir,
                    seed=seed,
                )
            results[model_name] = result

            row = {
                "model": model_name,
                "train_rmse": result.metrics_train.rmse,
                "train_r2": result.metrics_train.r2,
                "test_rmse": result.metrics_test.rmse,
                "test_mae": result.metrics_test.mae,
                "test_mape": result.metrics_test.mape,
                "test_r2": result.metrics_test.r2,
                "bootstrap_n_successful": result.bootstrap_result.n_successful,
            }

            if not result.excess_table.daily_excess.empty:
                ate = calc_ate_summary(result.excess_table)
                total_row = ate[ate["metric"] == "Total ATE"].iloc[0]
                daily_row = ate[ate["metric"] == "Mean Daily ATE"].iloc[0]
                row["total_ate"] = total_row["estimate"]
                row["total_ate_ci_lo"] = total_row["ci_lo"]
                row["total_ate_ci_hi"] = total_row["ci_hi"]
                row["mean_daily_ate"] = daily_row["estimate"]

            rows.append(row)

        except Exception as e:
            logger.error("Model '%s' failed: %s", model_name, e)
            rows.append({"model": model_name, "error": str(e)})

    comparison = pd.DataFrame(rows)
    return comparison, results

Configuration

load_config(path=None, overrides=None)

Load YAML config, merge with defaults, apply overrides.

Parameters:

Name Type Description Default
path str or Path

Path to a custom YAML config. Merged on top of defaults.

None
overrides dict

Runtime overrides applied last.

None

Returns:

Type Description
dict
Source code in its2s/settings.py
def load_config(path=None, overrides=None):
    """Load YAML config, merge with defaults, apply overrides.

    Parameters
    ----------
    path : str or Path, optional
        Path to a custom YAML config. Merged on top of defaults.
    overrides : dict, optional
        Runtime overrides applied last.

    Returns
    -------
    dict
    """
    with open(_DEFAULT_CONFIG_PATH) as f:
        config = yaml.safe_load(f)

    if path is not None:
        with open(path) as f:
            user_config = yaml.safe_load(f) or {}
        config = _deep_merge(config, user_config)

    if overrides:
        config = _deep_merge(config, overrides)

    return config