oh shit im scared, but its alive

This commit is contained in:
dan
2025-12-15 18:38:10 +03:00
parent b850d4459b
commit e2a36c74a3
51 changed files with 4956 additions and 578 deletions

View File

@@ -1,351 +1,151 @@
import numpy as np
import pandas as pd
import statsmodels.api as sm
from pathlib import Path
from typing import Tuple, Optional
from typing import Optional, Tuple
import numpy as np
import statsmodels.api as sm
from sklearn.metrics import r2_score, roc_auc_score
import best_model_and_plots as bmp
# Наследуем константы/визуальные настройки из scatter-скрипта
# Константы из scatter-скрипта
X_COL = bmp.X_COL
DEFAULT_X_MAX = bmp.DEFAULT_X_MAX
DEFAULT_Y_MIN = bmp.DEFAULT_Y_MIN
DEFAULT_Y_MAX = bmp.DEFAULT_Y_MAX
DEFAULT_SCATTER_COLOR = bmp.DEFAULT_SCATTER_COLOR
DEFAULT_POINT_SIZE = bmp.DEFAULT_POINT_SIZE
DEFAULT_ALPHA = bmp.DEFAULT_ALPHA
DEFAULT_ALPHA_MIN = bmp.DEFAULT_ALPHA_MIN
DEFAULT_ALPHA_MAX = bmp.DEFAULT_ALPHA_MAX
DEFAULT_BINS_X = bmp.DEFAULT_BINS_X
DEFAULT_BINS_Y = bmp.DEFAULT_BINS_Y
DEFAULT_IQR_K = bmp.DEFAULT_IQR_K
DEFAULT_Q_LOW = bmp.DEFAULT_Q_LOW
DEFAULT_Q_HIGH = bmp.DEFAULT_Q_HIGH
DEFAULT_TREND_FRAC = bmp.DEFAULT_TREND_FRAC
DEFAULT_TREND_COLOR = bmp.DEFAULT_TREND_COLOR
DEFAULT_TREND_LINEWIDTH = bmp.DEFAULT_TREND_LINEWIDTH
BASE_OUT_DIR = bmp.BASE_OUT_DIR
def prepare_clean_data(
y_col: str,
*,
x_col: str = X_COL,
x_max: float = DEFAULT_X_MAX,
iqr_k: float = DEFAULT_IQR_K,
q_low: float = DEFAULT_Q_LOW,
q_high: float = DEFAULT_Q_HIGH,
) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
"""Готовит очищенные данные: фильтр по x и IQR, возвращает x, y и DataFrame."""
df = bmp.load_client_level(bmp.DB_PATH)
base = df[[x_col, y_col]].dropna()
in_range = bmp.filter_x_range(base, x_col, x_max)
cleaned = bmp.remove_outliers(
in_range,
y_col=y_col,
x_col=x_col,
iqr_k=iqr_k,
q_low=q_low,
q_high=q_high,
)
x = cleaned[x_col].to_numpy()
y = cleaned[y_col].to_numpy()
return x, y, cleaned
Y_COL = "orders_amt_total"
X_MAX = bmp.DEFAULT_X_MAX
Y_MIN = bmp.DEFAULT_Y_MIN
Y_MAX = bmp.DEFAULT_Y_MAX
def fit_quadratic(
x: np.ndarray,
y_target: np.ndarray,
weights: Optional[np.ndarray] = None,
) -> Tuple[sm.regression.linear_model.RegressionResultsWrapper, np.ndarray]:
"""Фитим квадратику по x -> y_target (WLS), предсказываем на тех же x."""
X_design = np.column_stack([x, x**2])
X_design = sm.add_constant(X_design)
if weights is not None:
model = sm.WLS(y_target, X_design, weights=weights).fit(cov_type="HC3")
cleaned: bmp.pd.DataFrame,
trend_data: Optional[Tuple[np.ndarray, np.ndarray]],
*,
x_col: str = X_COL,
y_col: str = Y_COL,
x_max: float = X_MAX,
) -> Tuple[Optional[sm.regression.linear_model.RegressionResultsWrapper], dict]:
"""Фитит y ~ 1 + x + x^2. Если есть тренд, использует его как целевое для r2_trend."""
df = cleaned[[x_col, y_col]].dropna()
if len(df) < 3:
return None, {}
if trend_data is not None and trend_data[0] is not None:
tx, ty = trend_data
tx = np.asarray(tx)
ty = np.asarray(ty)
mask = (tx <= x_max) & ~np.isnan(ty)
tx = tx[mask]
ty = ty[mask]
else:
model = sm.OLS(y_target, X_design).fit(cov_type="HC3")
tx = ty = None
y_hat = model.predict(X_design)
return model, y_hat
x = df[x_col].to_numpy()
y = df[y_col].to_numpy()
X_design = sm.add_constant(np.column_stack([x, x**2]))
model = sm.OLS(y, X_design).fit(cov_type="HC3")
auc = np.nan
binary = (y > 0).astype(int)
if len(np.unique(binary)) > 1:
auc = roc_auc_score(binary, model.predict(X_design))
r2_trend = np.nan
if tx is not None and len(tx) >= 3:
X_trend = sm.add_constant(np.column_stack([tx, tx**2]))
y_hat_trend = model.predict(X_trend)
if np.nanvar(ty) > 0:
r2_trend = r2_score(ty, y_hat_trend)
metrics = {
"auc": auc,
"r2_trend": r2_trend,
}
return model, metrics
def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Tuple[Optional[float], Optional[float]]:
"""Возвращает (R2, AUC по метке y>0)."""
r2 = r2_score(y_true, y_pred)
auc = None
try:
auc = roc_auc_score((y_true > 0).astype(int), y_pred)
except ValueError:
auc = None
return r2, auc
def map_trend_to_points(x_points: np.ndarray, trend_x: np.ndarray, trend_y: np.ndarray) -> np.ndarray:
"""Интерполирует значения тренда в точках x_points."""
if len(trend_x) == 0:
return np.zeros_like(x_points)
# гарантируем отсортированность
order = np.argsort(trend_x)
tx = trend_x[order]
ty = trend_y[order]
return np.interp(x_points, tx, ty, left=ty[0], right=ty[-1])
def density_weights(
df: pd.DataFrame,
y_col: str,
*,
x_col: str = X_COL,
x_max: float = DEFAULT_X_MAX,
alpha_min: float = DEFAULT_ALPHA_MIN,
alpha_max: float = DEFAULT_ALPHA_MAX,
bins_x: int = DEFAULT_BINS_X,
bins_y: int = DEFAULT_BINS_Y,
y_min: float = DEFAULT_Y_MIN,
y_max: float = DEFAULT_Y_MAX,
) -> np.ndarray:
"""Строит веса из плотности (та же схема, что и альфы на графике)."""
alphas = bmp.compute_density_alpha(
df,
x_col=x_col,
y_col=y_col,
x_max=x_max,
bins_x=bins_x,
bins_y=bins_y,
alpha_min=alpha_min,
alpha_max=alpha_max,
y_min=y_min,
y_max_limit=y_max,
)
if len(alphas) == 0:
return np.ones(len(df))
denom = max(alpha_max - alpha_min, 1e-9)
weights = (alphas - alpha_min) / denom
weights = np.clip(weights, 0, None)
return weights
def plot_quadratic_overlay(
df: pd.DataFrame,
model: sm.regression.linear_model.RegressionResultsWrapper,
y_col: str,
out_path: Path,
*,
x_col: str = X_COL,
x_max: float = DEFAULT_X_MAX,
y_min: float = DEFAULT_Y_MIN,
y_max: float = DEFAULT_Y_MAX,
scatter_color: str = DEFAULT_SCATTER_COLOR,
point_size: int = DEFAULT_POINT_SIZE,
alpha: float = DEFAULT_ALPHA,
alpha_min: float = DEFAULT_ALPHA_MIN,
alpha_max: float = DEFAULT_ALPHA_MAX,
bins_x: int = DEFAULT_BINS_X,
bins_y: int = DEFAULT_BINS_Y,
trend_frac: float = DEFAULT_TREND_FRAC,
trend_color: str = DEFAULT_TREND_COLOR,
trend_linewidth: float = DEFAULT_TREND_LINEWIDTH,
trend_method: str = bmp.DEFAULT_TREND_METHOD,
rolling_window: int = bmp.DEFAULT_ROLLING_WINDOW,
def plot_overall_quad(
x_max: float = X_MAX,
y_min: float = Y_MIN,
y_max: float = Y_MAX,
savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW,
savgol_poly: int = bmp.DEFAULT_SAVGOL_POLY,
) -> None:
"""Рисует облако + LOWESS-тренд + линию квадр. регрессии."""
fig, ax = bmp.plt.subplots(figsize=(8, 8))
alpha_values = bmp.compute_density_alpha(
df,
x_col=x_col,
y_col=y_col,
out_dir = bmp.BASE_OUT_DIR / Y_COL
res = bmp.plot_clean_trend_scatter(
bmp.load_client_level(bmp.DB_PATH),
y_col=Y_COL,
out_dir=out_dir,
x_col=X_COL,
x_max=x_max,
bins_x=bins_x,
bins_y=bins_y,
alpha_min=alpha_min,
alpha_max=alpha_max,
scatter_color=bmp.DEFAULT_SCATTER_COLOR,
point_size=bmp.DEFAULT_POINT_SIZE,
alpha=bmp.DEFAULT_TREND_ALPHA,
iqr_k=bmp.DEFAULT_IQR_K,
q_low=bmp.DEFAULT_Q_LOW,
q_high=bmp.DEFAULT_Q_HIGH,
alpha_min=bmp.DEFAULT_ALPHA_MIN,
alpha_max=bmp.DEFAULT_ALPHA_MAX,
bins_x=bmp.DEFAULT_BINS_X,
bins_y=bmp.DEFAULT_BINS_Y,
y_min=y_min,
y_max_limit=y_max,
)
ax.scatter(
df[x_col],
df[y_col],
color=scatter_color,
s=point_size,
alpha=alpha_values if len(alpha_values) else alpha,
linewidths=0,
label="Точки (очищено)",
)
# Тренд по выбранному методу
tx, ty = bmp.compute_trend(
df,
y_col=y_col,
x_col=x_col,
method=trend_method,
lowess_frac=trend_frac,
rolling_window=rolling_window,
y_max=y_max,
trend_frac=bmp.DEFAULT_TREND_FRAC,
trend_color=bmp.DEFAULT_TREND_COLOR,
trend_linewidth=bmp.DEFAULT_TREND_LINEWIDTH,
trend_method=bmp.DEFAULT_TREND_METHOD,
savgol_window=savgol_window,
savgol_poly=savgol_poly,
return_components=True,
)
if len(tx):
ax.plot(tx, ty, color=trend_color, linewidth=trend_linewidth, label=f"{trend_method} тренд")
# Квадратичная регрессия
if res is None:
print("Нет данных для построения графика")
return
fig, ax, cleaned, trend_data = res
model, metrics = fit_quadratic(cleaned, trend_data, x_col=X_COL, y_col=Y_COL, x_max=x_max)
if model is None:
print("Недостаточно точек для квадратичной регрессии")
fig.savefig(out_dir / "scatter_trend.png", dpi=150)
bmp.plt.close(fig)
return
# Квадратичная линия поверх существующего тренда
x_grid = np.linspace(0, x_max, 400)
X_grid = sm.add_constant(np.column_stack([x_grid, x_grid**2]))
y_grid = model.predict(X_grid)
ax.plot(x_grid, y_grid, color="blue", linewidth=2.3, linestyle="--", label="Квадр. регрессия")
ax.set_xlim(0, x_max)
ax.set_ylim(y_min, y_max)
ax.set_yticks(range(0, int(y_max) + 1, 2))
ax.set_xlabel("Среднее число показов в день")
ax.set_ylabel(y_col)
ax.set_title(f"Квадратичная регрессия: {y_col} vs {x_col}")
ax.grid(alpha=0.3)
ax.plot(x_grid, y_grid, color="blue", linewidth=2.2, linestyle="--", label="Квадр. регрессия")
ax.legend()
out_path.parent.mkdir(parents=True, exist_ok=True)
fig.tight_layout()
fig.savefig(out_path, dpi=150)
bmp.plt.close(fig)
print(f"Saved {out_path}")
def report_model(
model: sm.regression.linear_model.RegressionResultsWrapper,
r2: Optional[float],
auc: Optional[float],
*,
r2_trend: Optional[float] = None,
) -> None:
params = model.params
pvals = model.pvalues
fmt_p = lambda p: f"<1e-300" if p < 1e-300 else f"{p:.4g}"
print("\n=== Квадратичная регрессия (y ~ 1 + x + x^2) ===")
print(f"const: {params[0]:.6f} (p={fmt_p(pvals[0])})")
print(f"beta1 x: {params[1]:.6f} (p={fmt_p(pvals[1])})")
print(f"beta2 x^2: {params[2]:.6f} (p={fmt_p(pvals[2])})")
print(f"R2: {r2:.4f}" if r2 is not None else "R2: n/a")
if r2_trend is not None:
print(f"R2 vs trend target: {r2_trend:.4f}")
print(f"AUC (target y>0): {auc:.4f}" if auc is not None else "AUC: n/a (один класс)")
def generate_quadratic_analysis(
y_col: str,
*,
x_col: str = X_COL,
base_out_dir: Path = BASE_OUT_DIR,
config_name: str = "default",
x_max: float = DEFAULT_X_MAX,
y_min: float = DEFAULT_Y_MIN,
y_max: float = DEFAULT_Y_MAX,
scatter_color: str = DEFAULT_SCATTER_COLOR,
point_size: int = DEFAULT_POINT_SIZE,
alpha: float = DEFAULT_ALPHA,
alpha_min: float = DEFAULT_ALPHA_MIN,
alpha_max: float = DEFAULT_ALPHA_MAX,
bins_x: int = DEFAULT_BINS_X,
bins_y: int = DEFAULT_BINS_Y,
trend_frac: float = DEFAULT_TREND_FRAC,
trend_color: str = DEFAULT_TREND_COLOR,
trend_linewidth: float = DEFAULT_TREND_LINEWIDTH,
iqr_k: float = DEFAULT_IQR_K,
q_low: float = DEFAULT_Q_LOW,
q_high: float = DEFAULT_Q_HIGH,
trend_method: str = bmp.DEFAULT_TREND_METHOD,
rolling_window: int = bmp.DEFAULT_ROLLING_WINDOW,
savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW,
savgol_poly: int = bmp.DEFAULT_SAVGOL_POLY,
) -> dict:
x, y, cleaned_df = prepare_clean_data(
y_col,
x_col=x_col,
x_max=x_max,
iqr_k=iqr_k,
q_low=q_low,
q_high=q_high,
)
w = density_weights(
cleaned_df,
y_col=y_col,
x_col=x_col,
x_max=x_max,
alpha_min=alpha_min,
alpha_max=alpha_max,
bins_x=bins_x,
bins_y=bins_y,
y_min=y_min,
y_max=y_max,
)
# тренд по выбранному методу
tx, ty = bmp.compute_trend(
cleaned_df,
y_col=y_col,
x_col=x_col,
method=trend_method,
lowess_frac=trend_frac,
rolling_window=rolling_window,
savgol_window=savgol_window,
savgol_poly=savgol_poly,
summary_lines = [
f"R2_trend={metrics['r2_trend']:.3f}",
f"AUC={metrics['auc']:.3f}",
f"b1={params[1]:.3f} (p={pvals[1]:.3g})",
f"b2={params[2]:.3f} (p={pvals[2]:.3g})",
f"n={len(cleaned)}",
]
ax.text(
0.02,
0.95,
"\n".join(summary_lines),
transform=ax.transAxes,
ha="left",
va="top",
fontsize=9,
bbox=dict(boxstyle="round,pad=0.2", facecolor="white", alpha=0.65, edgecolor="gray"),
)
trend_target = map_trend_to_points(x, tx, ty)
model, y_hat = fit_quadratic(x, trend_target, weights=w)
r2_actual, auc = compute_metrics(y, y_hat)
r2_trend = r2_score(trend_target, y_hat) if len(trend_target) else None
report_model(model, r2_actual, auc, r2_trend=r2_trend)
out_dir = base_out_dir / config_name / str(y_col).replace("/", "_")
plot_quadratic_overlay(
cleaned_df,
model,
y_col=y_col,
out_path=out_dir / "quad_regression.png",
x_col=x_col,
x_max=x_max,
y_min=y_min,
y_max=y_max,
scatter_color=scatter_color,
point_size=point_size,
alpha=alpha,
alpha_min=alpha_min,
alpha_max=alpha_max,
bins_x=bins_x,
bins_y=bins_y,
trend_frac=trend_frac,
trend_color=trend_color,
trend_linewidth=trend_linewidth,
trend_method=trend_method,
rolling_window=rolling_window,
savgol_window=savgol_window,
savgol_poly=savgol_poly,
)
return {
"config": config_name,
"y_col": y_col,
"r2": r2_actual,
"r2_trend": r2_trend,
"auc": auc,
"params": {
"trend_method": trend_method,
"trend_frac": trend_frac,
"rolling_window": rolling_window,
"savgol_window": savgol_window,
"savgol_poly": savgol_poly,
"x_max": x_max,
"weights_alpha_range": (alpha_min, alpha_max),
},
"coeffs": model.params.tolist(),
"pvalues": model.pvalues.tolist(),
}
quad_path = out_dir / "scatter_trend_quad.png"
fig.tight_layout()
fig.savefig(quad_path, dpi=150)
bmp.plt.close(fig)
print(f"Saved {quad_path}")
def main() -> None:
generate_quadratic_analysis("orders_amt_total")
plot_overall_quad()
if __name__ == "__main__":