import numpy as np import pandas as pd import statsmodels.api as sm from pathlib import Path from typing import Tuple, Optional from sklearn.metrics import r2_score, roc_auc_score import best_model_and_plots as bmp # Наследуем константы/визуальные настройки из scatter-скрипта X_COL = bmp.X_COL DEFAULT_X_MAX = bmp.DEFAULT_X_MAX DEFAULT_Y_MIN = bmp.DEFAULT_Y_MIN DEFAULT_Y_MAX = bmp.DEFAULT_Y_MAX DEFAULT_SCATTER_COLOR = bmp.DEFAULT_SCATTER_COLOR DEFAULT_POINT_SIZE = bmp.DEFAULT_POINT_SIZE DEFAULT_ALPHA = bmp.DEFAULT_ALPHA DEFAULT_ALPHA_MIN = bmp.DEFAULT_ALPHA_MIN DEFAULT_ALPHA_MAX = bmp.DEFAULT_ALPHA_MAX DEFAULT_BINS_X = bmp.DEFAULT_BINS_X DEFAULT_BINS_Y = bmp.DEFAULT_BINS_Y DEFAULT_IQR_K = bmp.DEFAULT_IQR_K DEFAULT_Q_LOW = bmp.DEFAULT_Q_LOW DEFAULT_Q_HIGH = bmp.DEFAULT_Q_HIGH DEFAULT_TREND_FRAC = bmp.DEFAULT_TREND_FRAC DEFAULT_TREND_COLOR = bmp.DEFAULT_TREND_COLOR DEFAULT_TREND_LINEWIDTH = bmp.DEFAULT_TREND_LINEWIDTH BASE_OUT_DIR = bmp.BASE_OUT_DIR def prepare_clean_data( y_col: str, *, x_col: str = X_COL, x_max: float = DEFAULT_X_MAX, iqr_k: float = DEFAULT_IQR_K, q_low: float = DEFAULT_Q_LOW, q_high: float = DEFAULT_Q_HIGH, ) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]: """Готовит очищенные данные: фильтр по x и IQR, возвращает x, y и DataFrame.""" df = bmp.load_client_level(bmp.DB_PATH) base = df[[x_col, y_col]].dropna() in_range = bmp.filter_x_range(base, x_col, x_max) cleaned = bmp.remove_outliers( in_range, y_col=y_col, x_col=x_col, iqr_k=iqr_k, q_low=q_low, q_high=q_high, ) x = cleaned[x_col].to_numpy() y = cleaned[y_col].to_numpy() return x, y, cleaned def fit_quadratic( x: np.ndarray, y_target: np.ndarray, weights: Optional[np.ndarray] = None, ) -> Tuple[sm.regression.linear_model.RegressionResultsWrapper, np.ndarray]: """Фитим квадратику по x -> y_target (WLS), предсказываем на тех же x.""" X_design = np.column_stack([x, x**2]) X_design = sm.add_constant(X_design) if weights is not None: model = sm.WLS(y_target, X_design, weights=weights).fit(cov_type="HC3") else: model = sm.OLS(y_target, X_design).fit(cov_type="HC3") y_hat = model.predict(X_design) return model, y_hat def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Tuple[Optional[float], Optional[float]]: """Возвращает (R2, AUC по метке y>0).""" r2 = r2_score(y_true, y_pred) auc = None try: auc = roc_auc_score((y_true > 0).astype(int), y_pred) except ValueError: auc = None return r2, auc def map_trend_to_points(x_points: np.ndarray, trend_x: np.ndarray, trend_y: np.ndarray) -> np.ndarray: """Интерполирует значения тренда в точках x_points.""" if len(trend_x) == 0: return np.zeros_like(x_points) # гарантируем отсортированность order = np.argsort(trend_x) tx = trend_x[order] ty = trend_y[order] return np.interp(x_points, tx, ty, left=ty[0], right=ty[-1]) def density_weights( df: pd.DataFrame, y_col: str, *, x_col: str = X_COL, x_max: float = DEFAULT_X_MAX, alpha_min: float = DEFAULT_ALPHA_MIN, alpha_max: float = DEFAULT_ALPHA_MAX, bins_x: int = DEFAULT_BINS_X, bins_y: int = DEFAULT_BINS_Y, y_min: float = DEFAULT_Y_MIN, y_max: float = DEFAULT_Y_MAX, ) -> np.ndarray: """Строит веса из плотности (та же схема, что и альфы на графике).""" alphas = bmp.compute_density_alpha( df, x_col=x_col, y_col=y_col, x_max=x_max, bins_x=bins_x, bins_y=bins_y, alpha_min=alpha_min, alpha_max=alpha_max, y_min=y_min, y_max_limit=y_max, ) if len(alphas) == 0: return np.ones(len(df)) denom = max(alpha_max - alpha_min, 1e-9) weights = (alphas - alpha_min) / denom weights = np.clip(weights, 0, None) return weights def plot_quadratic_overlay( df: pd.DataFrame, model: sm.regression.linear_model.RegressionResultsWrapper, y_col: str, out_path: Path, *, x_col: str = X_COL, x_max: float = DEFAULT_X_MAX, y_min: float = DEFAULT_Y_MIN, y_max: float = DEFAULT_Y_MAX, scatter_color: str = DEFAULT_SCATTER_COLOR, point_size: int = DEFAULT_POINT_SIZE, alpha: float = DEFAULT_ALPHA, alpha_min: float = DEFAULT_ALPHA_MIN, alpha_max: float = DEFAULT_ALPHA_MAX, bins_x: int = DEFAULT_BINS_X, bins_y: int = DEFAULT_BINS_Y, trend_frac: float = DEFAULT_TREND_FRAC, trend_color: str = DEFAULT_TREND_COLOR, trend_linewidth: float = DEFAULT_TREND_LINEWIDTH, trend_method: str = bmp.DEFAULT_TREND_METHOD, rolling_window: int = bmp.DEFAULT_ROLLING_WINDOW, savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW, savgol_poly: int = bmp.DEFAULT_SAVGOL_POLY, ) -> None: """Рисует облако + LOWESS-тренд + линию квадр. регрессии.""" fig, ax = bmp.plt.subplots(figsize=(8, 8)) alpha_values = bmp.compute_density_alpha( df, x_col=x_col, y_col=y_col, x_max=x_max, bins_x=bins_x, bins_y=bins_y, alpha_min=alpha_min, alpha_max=alpha_max, y_min=y_min, y_max_limit=y_max, ) ax.scatter( df[x_col], df[y_col], color=scatter_color, s=point_size, alpha=alpha_values if len(alpha_values) else alpha, linewidths=0, label="Точки (очищено)", ) # Тренд по выбранному методу tx, ty = bmp.compute_trend( df, y_col=y_col, x_col=x_col, method=trend_method, lowess_frac=trend_frac, rolling_window=rolling_window, savgol_window=savgol_window, savgol_poly=savgol_poly, ) if len(tx): ax.plot(tx, ty, color=trend_color, linewidth=trend_linewidth, label=f"{trend_method} тренд") # Квадратичная регрессия x_grid = np.linspace(0, x_max, 400) X_grid = sm.add_constant(np.column_stack([x_grid, x_grid**2])) y_grid = model.predict(X_grid) ax.plot(x_grid, y_grid, color="blue", linewidth=2.3, linestyle="--", label="Квадр. регрессия") ax.set_xlim(0, x_max) ax.set_ylim(y_min, y_max) ax.set_yticks(range(0, int(y_max) + 1, 2)) ax.set_xlabel("Среднее число показов в день") ax.set_ylabel(y_col) ax.set_title(f"Квадратичная регрессия: {y_col} vs {x_col}") ax.grid(alpha=0.3) ax.legend() out_path.parent.mkdir(parents=True, exist_ok=True) fig.tight_layout() fig.savefig(out_path, dpi=150) bmp.plt.close(fig) print(f"Saved {out_path}") def report_model( model: sm.regression.linear_model.RegressionResultsWrapper, r2: Optional[float], auc: Optional[float], *, r2_trend: Optional[float] = None, ) -> None: params = model.params pvals = model.pvalues fmt_p = lambda p: f"<1e-300" if p < 1e-300 else f"{p:.4g}" print("\n=== Квадратичная регрессия (y ~ 1 + x + x^2) ===") print(f"const: {params[0]:.6f} (p={fmt_p(pvals[0])})") print(f"beta1 x: {params[1]:.6f} (p={fmt_p(pvals[1])})") print(f"beta2 x^2: {params[2]:.6f} (p={fmt_p(pvals[2])})") print(f"R2: {r2:.4f}" if r2 is not None else "R2: n/a") if r2_trend is not None: print(f"R2 vs trend target: {r2_trend:.4f}") print(f"AUC (target y>0): {auc:.4f}" if auc is not None else "AUC: n/a (один класс)") def generate_quadratic_analysis( y_col: str, *, x_col: str = X_COL, base_out_dir: Path = BASE_OUT_DIR, config_name: str = "default", x_max: float = DEFAULT_X_MAX, y_min: float = DEFAULT_Y_MIN, y_max: float = DEFAULT_Y_MAX, scatter_color: str = DEFAULT_SCATTER_COLOR, point_size: int = DEFAULT_POINT_SIZE, alpha: float = DEFAULT_ALPHA, alpha_min: float = DEFAULT_ALPHA_MIN, alpha_max: float = DEFAULT_ALPHA_MAX, bins_x: int = DEFAULT_BINS_X, bins_y: int = DEFAULT_BINS_Y, trend_frac: float = DEFAULT_TREND_FRAC, trend_color: str = DEFAULT_TREND_COLOR, trend_linewidth: float = DEFAULT_TREND_LINEWIDTH, iqr_k: float = DEFAULT_IQR_K, q_low: float = DEFAULT_Q_LOW, q_high: float = DEFAULT_Q_HIGH, trend_method: str = bmp.DEFAULT_TREND_METHOD, rolling_window: int = bmp.DEFAULT_ROLLING_WINDOW, savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW, savgol_poly: int = bmp.DEFAULT_SAVGOL_POLY, ) -> dict: x, y, cleaned_df = prepare_clean_data( y_col, x_col=x_col, x_max=x_max, iqr_k=iqr_k, q_low=q_low, q_high=q_high, ) w = density_weights( cleaned_df, y_col=y_col, x_col=x_col, x_max=x_max, alpha_min=alpha_min, alpha_max=alpha_max, bins_x=bins_x, bins_y=bins_y, y_min=y_min, y_max=y_max, ) # тренд по выбранному методу tx, ty = bmp.compute_trend( cleaned_df, y_col=y_col, x_col=x_col, method=trend_method, lowess_frac=trend_frac, rolling_window=rolling_window, savgol_window=savgol_window, savgol_poly=savgol_poly, ) trend_target = map_trend_to_points(x, tx, ty) model, y_hat = fit_quadratic(x, trend_target, weights=w) r2_actual, auc = compute_metrics(y, y_hat) r2_trend = r2_score(trend_target, y_hat) if len(trend_target) else None report_model(model, r2_actual, auc, r2_trend=r2_trend) out_dir = base_out_dir / config_name / str(y_col).replace("/", "_") plot_quadratic_overlay( cleaned_df, model, y_col=y_col, out_path=out_dir / "quad_regression.png", x_col=x_col, x_max=x_max, y_min=y_min, y_max=y_max, scatter_color=scatter_color, point_size=point_size, alpha=alpha, alpha_min=alpha_min, alpha_max=alpha_max, bins_x=bins_x, bins_y=bins_y, trend_frac=trend_frac, trend_color=trend_color, trend_linewidth=trend_linewidth, trend_method=trend_method, rolling_window=rolling_window, savgol_window=savgol_window, savgol_poly=savgol_poly, ) return { "config": config_name, "y_col": y_col, "r2": r2_actual, "r2_trend": r2_trend, "auc": auc, "params": { "trend_method": trend_method, "trend_frac": trend_frac, "rolling_window": rolling_window, "savgol_window": savgol_window, "savgol_poly": savgol_poly, "x_max": x_max, "weights_alpha_range": (alpha_min, alpha_max), }, "coeffs": model.params.tolist(), "pvalues": model.pvalues.tolist(), } def main() -> None: generate_quadratic_analysis("orders_amt_total") if __name__ == "__main__": main()