353 lines
11 KiB
Python
353 lines
11 KiB
Python
import numpy as np
|
|
import pandas as pd
|
|
import statsmodels.api as sm
|
|
from pathlib import Path
|
|
from typing import Tuple, Optional
|
|
|
|
from sklearn.metrics import r2_score, roc_auc_score
|
|
|
|
import best_model_and_plots as bmp
|
|
|
|
# Наследуем константы/визуальные настройки из scatter-скрипта
|
|
X_COL = bmp.X_COL
|
|
DEFAULT_X_MAX = bmp.DEFAULT_X_MAX
|
|
DEFAULT_Y_MIN = bmp.DEFAULT_Y_MIN
|
|
DEFAULT_Y_MAX = bmp.DEFAULT_Y_MAX
|
|
DEFAULT_SCATTER_COLOR = bmp.DEFAULT_SCATTER_COLOR
|
|
DEFAULT_POINT_SIZE = bmp.DEFAULT_POINT_SIZE
|
|
DEFAULT_ALPHA = bmp.DEFAULT_ALPHA
|
|
DEFAULT_ALPHA_MIN = bmp.DEFAULT_ALPHA_MIN
|
|
DEFAULT_ALPHA_MAX = bmp.DEFAULT_ALPHA_MAX
|
|
DEFAULT_BINS_X = bmp.DEFAULT_BINS_X
|
|
DEFAULT_BINS_Y = bmp.DEFAULT_BINS_Y
|
|
DEFAULT_IQR_K = bmp.DEFAULT_IQR_K
|
|
DEFAULT_Q_LOW = bmp.DEFAULT_Q_LOW
|
|
DEFAULT_Q_HIGH = bmp.DEFAULT_Q_HIGH
|
|
DEFAULT_TREND_FRAC = bmp.DEFAULT_TREND_FRAC
|
|
DEFAULT_TREND_COLOR = bmp.DEFAULT_TREND_COLOR
|
|
DEFAULT_TREND_LINEWIDTH = bmp.DEFAULT_TREND_LINEWIDTH
|
|
BASE_OUT_DIR = bmp.BASE_OUT_DIR
|
|
|
|
|
|
def prepare_clean_data(
|
|
y_col: str,
|
|
*,
|
|
x_col: str = X_COL,
|
|
x_max: float = DEFAULT_X_MAX,
|
|
iqr_k: float = DEFAULT_IQR_K,
|
|
q_low: float = DEFAULT_Q_LOW,
|
|
q_high: float = DEFAULT_Q_HIGH,
|
|
) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
|
|
"""Готовит очищенные данные: фильтр по x и IQR, возвращает x, y и DataFrame."""
|
|
df = bmp.load_client_level(bmp.DB_PATH)
|
|
base = df[[x_col, y_col]].dropna()
|
|
in_range = bmp.filter_x_range(base, x_col, x_max)
|
|
cleaned = bmp.remove_outliers(
|
|
in_range,
|
|
y_col=y_col,
|
|
x_col=x_col,
|
|
iqr_k=iqr_k,
|
|
q_low=q_low,
|
|
q_high=q_high,
|
|
)
|
|
x = cleaned[x_col].to_numpy()
|
|
y = cleaned[y_col].to_numpy()
|
|
return x, y, cleaned
|
|
|
|
|
|
def fit_quadratic(
|
|
x: np.ndarray,
|
|
y_target: np.ndarray,
|
|
weights: Optional[np.ndarray] = None,
|
|
) -> Tuple[sm.regression.linear_model.RegressionResultsWrapper, np.ndarray]:
|
|
"""Фитим квадратику по x -> y_target (WLS), предсказываем на тех же x."""
|
|
X_design = np.column_stack([x, x**2])
|
|
X_design = sm.add_constant(X_design)
|
|
if weights is not None:
|
|
model = sm.WLS(y_target, X_design, weights=weights).fit(cov_type="HC3")
|
|
else:
|
|
model = sm.OLS(y_target, X_design).fit(cov_type="HC3")
|
|
|
|
y_hat = model.predict(X_design)
|
|
return model, y_hat
|
|
|
|
|
|
def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Tuple[Optional[float], Optional[float]]:
|
|
"""Возвращает (R2, AUC по метке y>0)."""
|
|
r2 = r2_score(y_true, y_pred)
|
|
auc = None
|
|
try:
|
|
auc = roc_auc_score((y_true > 0).astype(int), y_pred)
|
|
except ValueError:
|
|
auc = None
|
|
return r2, auc
|
|
|
|
|
|
def map_trend_to_points(x_points: np.ndarray, trend_x: np.ndarray, trend_y: np.ndarray) -> np.ndarray:
|
|
"""Интерполирует значения тренда в точках x_points."""
|
|
if len(trend_x) == 0:
|
|
return np.zeros_like(x_points)
|
|
# гарантируем отсортированность
|
|
order = np.argsort(trend_x)
|
|
tx = trend_x[order]
|
|
ty = trend_y[order]
|
|
return np.interp(x_points, tx, ty, left=ty[0], right=ty[-1])
|
|
|
|
|
|
def density_weights(
|
|
df: pd.DataFrame,
|
|
y_col: str,
|
|
*,
|
|
x_col: str = X_COL,
|
|
x_max: float = DEFAULT_X_MAX,
|
|
alpha_min: float = DEFAULT_ALPHA_MIN,
|
|
alpha_max: float = DEFAULT_ALPHA_MAX,
|
|
bins_x: int = DEFAULT_BINS_X,
|
|
bins_y: int = DEFAULT_BINS_Y,
|
|
y_min: float = DEFAULT_Y_MIN,
|
|
y_max: float = DEFAULT_Y_MAX,
|
|
) -> np.ndarray:
|
|
"""Строит веса из плотности (та же схема, что и альфы на графике)."""
|
|
alphas = bmp.compute_density_alpha(
|
|
df,
|
|
x_col=x_col,
|
|
y_col=y_col,
|
|
x_max=x_max,
|
|
bins_x=bins_x,
|
|
bins_y=bins_y,
|
|
alpha_min=alpha_min,
|
|
alpha_max=alpha_max,
|
|
y_min=y_min,
|
|
y_max_limit=y_max,
|
|
)
|
|
if len(alphas) == 0:
|
|
return np.ones(len(df))
|
|
denom = max(alpha_max - alpha_min, 1e-9)
|
|
weights = (alphas - alpha_min) / denom
|
|
weights = np.clip(weights, 0, None)
|
|
return weights
|
|
|
|
|
|
def plot_quadratic_overlay(
|
|
df: pd.DataFrame,
|
|
model: sm.regression.linear_model.RegressionResultsWrapper,
|
|
y_col: str,
|
|
out_path: Path,
|
|
*,
|
|
x_col: str = X_COL,
|
|
x_max: float = DEFAULT_X_MAX,
|
|
y_min: float = DEFAULT_Y_MIN,
|
|
y_max: float = DEFAULT_Y_MAX,
|
|
scatter_color: str = DEFAULT_SCATTER_COLOR,
|
|
point_size: int = DEFAULT_POINT_SIZE,
|
|
alpha: float = DEFAULT_ALPHA,
|
|
alpha_min: float = DEFAULT_ALPHA_MIN,
|
|
alpha_max: float = DEFAULT_ALPHA_MAX,
|
|
bins_x: int = DEFAULT_BINS_X,
|
|
bins_y: int = DEFAULT_BINS_Y,
|
|
trend_frac: float = DEFAULT_TREND_FRAC,
|
|
trend_color: str = DEFAULT_TREND_COLOR,
|
|
trend_linewidth: float = DEFAULT_TREND_LINEWIDTH,
|
|
trend_method: str = bmp.DEFAULT_TREND_METHOD,
|
|
rolling_window: int = bmp.DEFAULT_ROLLING_WINDOW,
|
|
savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW,
|
|
savgol_poly: int = bmp.DEFAULT_SAVGOL_POLY,
|
|
) -> None:
|
|
"""Рисует облако + LOWESS-тренд + линию квадр. регрессии."""
|
|
fig, ax = bmp.plt.subplots(figsize=(8, 8))
|
|
alpha_values = bmp.compute_density_alpha(
|
|
df,
|
|
x_col=x_col,
|
|
y_col=y_col,
|
|
x_max=x_max,
|
|
bins_x=bins_x,
|
|
bins_y=bins_y,
|
|
alpha_min=alpha_min,
|
|
alpha_max=alpha_max,
|
|
y_min=y_min,
|
|
y_max_limit=y_max,
|
|
)
|
|
ax.scatter(
|
|
df[x_col],
|
|
df[y_col],
|
|
color=scatter_color,
|
|
s=point_size,
|
|
alpha=alpha_values if len(alpha_values) else alpha,
|
|
linewidths=0,
|
|
label="Точки (очищено)",
|
|
)
|
|
|
|
# Тренд по выбранному методу
|
|
tx, ty = bmp.compute_trend(
|
|
df,
|
|
y_col=y_col,
|
|
x_col=x_col,
|
|
method=trend_method,
|
|
lowess_frac=trend_frac,
|
|
rolling_window=rolling_window,
|
|
savgol_window=savgol_window,
|
|
savgol_poly=savgol_poly,
|
|
)
|
|
if len(tx):
|
|
ax.plot(tx, ty, color=trend_color, linewidth=trend_linewidth, label=f"{trend_method} тренд")
|
|
|
|
# Квадратичная регрессия
|
|
x_grid = np.linspace(0, x_max, 400)
|
|
X_grid = sm.add_constant(np.column_stack([x_grid, x_grid**2]))
|
|
y_grid = model.predict(X_grid)
|
|
ax.plot(x_grid, y_grid, color="blue", linewidth=2.3, linestyle="--", label="Квадр. регрессия")
|
|
|
|
ax.set_xlim(0, x_max)
|
|
ax.set_ylim(y_min, y_max)
|
|
ax.set_yticks(range(0, int(y_max) + 1, 2))
|
|
ax.set_xlabel("Среднее число показов в день")
|
|
ax.set_ylabel(y_col)
|
|
ax.set_title(f"Квадратичная регрессия: {y_col} vs {x_col}")
|
|
ax.grid(alpha=0.3)
|
|
ax.legend()
|
|
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
fig.tight_layout()
|
|
fig.savefig(out_path, dpi=150)
|
|
bmp.plt.close(fig)
|
|
print(f"Saved {out_path}")
|
|
|
|
|
|
def report_model(
|
|
model: sm.regression.linear_model.RegressionResultsWrapper,
|
|
r2: Optional[float],
|
|
auc: Optional[float],
|
|
*,
|
|
r2_trend: Optional[float] = None,
|
|
) -> None:
|
|
params = model.params
|
|
pvals = model.pvalues
|
|
fmt_p = lambda p: f"<1e-300" if p < 1e-300 else f"{p:.4g}"
|
|
print("\n=== Квадратичная регрессия (y ~ 1 + x + x^2) ===")
|
|
print(f"const: {params[0]:.6f} (p={fmt_p(pvals[0])})")
|
|
print(f"beta1 x: {params[1]:.6f} (p={fmt_p(pvals[1])})")
|
|
print(f"beta2 x^2: {params[2]:.6f} (p={fmt_p(pvals[2])})")
|
|
print(f"R2: {r2:.4f}" if r2 is not None else "R2: n/a")
|
|
if r2_trend is not None:
|
|
print(f"R2 vs trend target: {r2_trend:.4f}")
|
|
print(f"AUC (target y>0): {auc:.4f}" if auc is not None else "AUC: n/a (один класс)")
|
|
|
|
|
|
def generate_quadratic_analysis(
|
|
y_col: str,
|
|
*,
|
|
x_col: str = X_COL,
|
|
base_out_dir: Path = BASE_OUT_DIR,
|
|
config_name: str = "default",
|
|
x_max: float = DEFAULT_X_MAX,
|
|
y_min: float = DEFAULT_Y_MIN,
|
|
y_max: float = DEFAULT_Y_MAX,
|
|
scatter_color: str = DEFAULT_SCATTER_COLOR,
|
|
point_size: int = DEFAULT_POINT_SIZE,
|
|
alpha: float = DEFAULT_ALPHA,
|
|
alpha_min: float = DEFAULT_ALPHA_MIN,
|
|
alpha_max: float = DEFAULT_ALPHA_MAX,
|
|
bins_x: int = DEFAULT_BINS_X,
|
|
bins_y: int = DEFAULT_BINS_Y,
|
|
trend_frac: float = DEFAULT_TREND_FRAC,
|
|
trend_color: str = DEFAULT_TREND_COLOR,
|
|
trend_linewidth: float = DEFAULT_TREND_LINEWIDTH,
|
|
iqr_k: float = DEFAULT_IQR_K,
|
|
q_low: float = DEFAULT_Q_LOW,
|
|
q_high: float = DEFAULT_Q_HIGH,
|
|
trend_method: str = bmp.DEFAULT_TREND_METHOD,
|
|
rolling_window: int = bmp.DEFAULT_ROLLING_WINDOW,
|
|
savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW,
|
|
savgol_poly: int = bmp.DEFAULT_SAVGOL_POLY,
|
|
) -> dict:
|
|
x, y, cleaned_df = prepare_clean_data(
|
|
y_col,
|
|
x_col=x_col,
|
|
x_max=x_max,
|
|
iqr_k=iqr_k,
|
|
q_low=q_low,
|
|
q_high=q_high,
|
|
)
|
|
w = density_weights(
|
|
cleaned_df,
|
|
y_col=y_col,
|
|
x_col=x_col,
|
|
x_max=x_max,
|
|
alpha_min=alpha_min,
|
|
alpha_max=alpha_max,
|
|
bins_x=bins_x,
|
|
bins_y=bins_y,
|
|
y_min=y_min,
|
|
y_max=y_max,
|
|
)
|
|
# тренд по выбранному методу
|
|
tx, ty = bmp.compute_trend(
|
|
cleaned_df,
|
|
y_col=y_col,
|
|
x_col=x_col,
|
|
method=trend_method,
|
|
lowess_frac=trend_frac,
|
|
rolling_window=rolling_window,
|
|
savgol_window=savgol_window,
|
|
savgol_poly=savgol_poly,
|
|
)
|
|
|
|
trend_target = map_trend_to_points(x, tx, ty)
|
|
model, y_hat = fit_quadratic(x, trend_target, weights=w)
|
|
r2_actual, auc = compute_metrics(y, y_hat)
|
|
r2_trend = r2_score(trend_target, y_hat) if len(trend_target) else None
|
|
report_model(model, r2_actual, auc, r2_trend=r2_trend)
|
|
|
|
out_dir = base_out_dir / config_name / str(y_col).replace("/", "_")
|
|
plot_quadratic_overlay(
|
|
cleaned_df,
|
|
model,
|
|
y_col=y_col,
|
|
out_path=out_dir / "quad_regression.png",
|
|
x_col=x_col,
|
|
x_max=x_max,
|
|
y_min=y_min,
|
|
y_max=y_max,
|
|
scatter_color=scatter_color,
|
|
point_size=point_size,
|
|
alpha=alpha,
|
|
alpha_min=alpha_min,
|
|
alpha_max=alpha_max,
|
|
bins_x=bins_x,
|
|
bins_y=bins_y,
|
|
trend_frac=trend_frac,
|
|
trend_color=trend_color,
|
|
trend_linewidth=trend_linewidth,
|
|
trend_method=trend_method,
|
|
rolling_window=rolling_window,
|
|
savgol_window=savgol_window,
|
|
savgol_poly=savgol_poly,
|
|
)
|
|
|
|
return {
|
|
"config": config_name,
|
|
"y_col": y_col,
|
|
"r2": r2_actual,
|
|
"r2_trend": r2_trend,
|
|
"auc": auc,
|
|
"params": {
|
|
"trend_method": trend_method,
|
|
"trend_frac": trend_frac,
|
|
"rolling_window": rolling_window,
|
|
"savgol_window": savgol_window,
|
|
"savgol_poly": savgol_poly,
|
|
"x_max": x_max,
|
|
"weights_alpha_range": (alpha_min, alpha_max),
|
|
},
|
|
"coeffs": model.params.tolist(),
|
|
"pvalues": model.pvalues.tolist(),
|
|
}
|
|
|
|
|
|
def main() -> None:
|
|
generate_quadratic_analysis("orders_amt_total")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|