from __future__ import annotations import sqlite3 import sys from pathlib import Path from typing import Optional, Tuple import altair as alt import numpy as np import pandas as pd import statsmodels.api as sm from sklearn.metrics import roc_auc_score, r2_score PROJECT_ROOT = Path(__file__).resolve().parent OUT_DIR = PROJECT_ROOT / "new_plots" / "final_result" sys.path.append(str(PROJECT_ROOT / "main_hypot")) import best_model_and_plots as bmp # noqa: E402 sys.path.append(str(PROJECT_ROOT)) import new_plots as npplots # noqa: E402 CATEGORIES = ["ent", "super", "transport", "shopping", "hotel", "avia"] FONT_PATH = Path("/Users/dan/Downloads/AyuGram Desktop/SegoeUIVF.ttf") BASE_WINDOW = bmp.DEFAULT_SAVGOL_WINDOW ACTIVE_WINDOW_FACTOR = 0.5 PASSIVE_WINDOW_FACTOR = 0.95 COMBINED_WINDOW_FACTOR = 1.0 def load_raw() -> pd.DataFrame: conn = sqlite3.connect(bmp.DB_PATH) df = pd.read_sql_query("select * from communications", conn, parse_dates=["business_dt"]) conn.close() return df def build_client(df: pd.DataFrame) -> pd.DataFrame: agg_spec = { **{f"active_imp_{c}": "sum" for c in CATEGORIES}, **{f"passive_imp_{c}": "sum" for c in CATEGORIES}, **{f"orders_amt_{c}": "sum" for c in CATEGORIES}, "business_dt": "nunique", } client = df.groupby("id").agg(agg_spec).reset_index() client = client.rename(columns={"business_dt": "contact_days"}) client["active_imp_total"] = client[[f"active_imp_{c}" for c in CATEGORIES]].sum(axis=1) client["passive_imp_total"] = client[[f"passive_imp_{c}" for c in CATEGORIES]].sum(axis=1) client["orders_amt_total"] = client[[f"orders_amt_{c}" for c in CATEGORIES]].sum(axis=1) for c in CATEGORIES: client[f"active_imp_per_day_{c}"] = bmp.safe_divide(client[f"active_imp_{c}"], client["contact_days"]) client[f"passive_imp_per_day_{c}"] = bmp.safe_divide(client[f"passive_imp_{c}"], client["contact_days"]) client[f"total_imp_per_day_{c}"] = bmp.safe_divide( client[f"active_imp_{c}"] + client[f"passive_imp_{c}"], client["contact_days"] ) client["active_imp_per_day_total"] = bmp.safe_divide(client["active_imp_total"], client["contact_days"]) client["passive_imp_per_day_total"] = bmp.safe_divide(client["passive_imp_total"], client["contact_days"]) client["total_imp_per_day_total"] = bmp.safe_divide( client["active_imp_total"] + client["passive_imp_total"], client["contact_days"] ) return client def compute_limits(df: pd.DataFrame, x_col: str, y_col: str) -> Tuple[float, float]: x_q = df[x_col].quantile(0.99) y_q = df[y_col].quantile(0.99) x_max = float(max(0.1, x_q + 2.0)) y_max = float(max(0.1, y_q + 3.0)) return x_max, y_max def fit_quadratic( df: pd.DataFrame, trend_data: Tuple[np.ndarray, np.ndarray], *, x_col: str, y_col: str, x_max: float, ) -> Tuple[Optional[sm.regression.linear_model.RegressionResultsWrapper], dict]: if len(df) < 3: return None, {} x = df[x_col].to_numpy() y = df[y_col].to_numpy() X = sm.add_constant(np.column_stack([x, x**2])) model = sm.OLS(y, X).fit(cov_type="HC3") auc = np.nan binary = (y > 0).astype(int) if len(np.unique(binary)) > 1: auc = roc_auc_score(binary, model.predict(X)) tx, ty = trend_data r2_trend = np.nan if tx is not None and len(tx) >= 3: mask = (tx <= x_max) & ~np.isnan(ty) txm = tx[mask] tym = ty[mask] if len(txm) >= 3 and np.nanvar(tym) > 0: X_trend = sm.add_constant(np.column_stack([txm, txm**2])) y_hat_trend = model.predict(X_trend) r2_trend = r2_score(tym, y_hat_trend) return model, {"auc": auc, "r2_trend": r2_trend} def scatter_trend_quad( df: pd.DataFrame, x_col: str, y_col: str, out_path: Path, *, title: str, window: int, x_override: float | None = None, y_override: float | None = None, x_scale_factor: float | None = None, ) -> None: # Авто-лимиты x_max, y_max = compute_limits(df, x_col, y_col) if x_override is not None: x_max = x_override + 5.5 if y_override is not None: y_max = y_override + 3.0 if x_scale_factor is not None: x_max = x_max * x_scale_factor # Фильтр base = df[[x_col, y_col]].dropna() base = bmp.filter_x_range(base, x_col, x_max) base = base[base[y_col] <= y_max].copy() base["alpha"] = bmp.compute_density_alpha( base, x_col=x_col, y_col=y_col, x_max=x_max, bins_x=bmp.DEFAULT_BINS_X, bins_y=bmp.DEFAULT_BINS_Y, alpha_min=bmp.DEFAULT_ALPHA_MIN, alpha_max=bmp.DEFAULT_ALPHA_MAX, y_min=bmp.DEFAULT_Y_MIN, y_max_limit=y_max, ) tx, ty = bmp.compute_trend( base, y_col=y_col, x_col=x_col, method=bmp.DEFAULT_TREND_METHOD, lowess_frac=bmp.DEFAULT_TREND_FRAC, savgol_window=window, ) trend_data = (tx, ty) model, metrics = fit_quadratic(base, trend_data, x_col=x_col, y_col=y_col, x_max=x_max) if model is None: print(f"[{y_col}] not enough data") return params = model.params pvals = model.pvalues x_scale = alt.Scale(domain=(0, x_max), clamp=True, nice=False, domainMin=0, domainMax=x_max) y_scale = alt.Scale(domain=(bmp.DEFAULT_Y_MIN, y_max), clamp=True, nice=False) points = alt.Chart(base).mark_circle(size=40).encode( x=alt.X(x_col, title="Показы в день", scale=x_scale), y=alt.Y(y_col, title="Заказы", scale=y_scale), opacity=alt.Opacity("alpha:Q", scale=alt.Scale(domain=(0, 1), clamp=True), legend=None), color=alt.value("#FFDD2D"), tooltip=[x_col, y_col], ) layers = [points] if tx is not None and len(tx): trend_df = pd.DataFrame({x_col: tx, "trend": ty}) layers.append( alt.Chart(trend_df).mark_line(color="#EB4146", strokeWidth=2.5).encode( x=alt.X(x_col, scale=x_scale), y=alt.Y("trend", scale=y_scale), ) ) x_grid = np.linspace(0, x_max, 400) quad_df = pd.DataFrame( { x_col: x_grid, "quad": model.predict(sm.add_constant(np.column_stack([x_grid, x_grid**2]))), } ) layers.append( alt.Chart(quad_df).mark_line(color="#198F51", strokeWidth=2.2).encode( x=alt.X(x_col, scale=x_scale), y=alt.Y("quad", scale=y_scale), ) ) metrics_text = [ f"R2_trend={metrics['r2_trend']:.3f}", f"AUC={metrics['auc']:.3f}", f"b1={params[1]:.3f} (p={pvals[1]:.3g})", f"b2={params[2]:.3f} (p={pvals[2]:.3g})", f"n={len(base)}", ] subtitle = " • ".join(metrics_text) chart = alt.layer(*layers).resolve_scale(opacity="independent") chart = npplots.configure_chart(chart, f"{title} — {subtitle}", width=800, height=600) out_path.parent.mkdir(parents=True, exist_ok=True) chart.save(out_path) npplots.inject_font_css(out_path) print(f"Saved {out_path}") def main() -> None: raw = load_raw() client = build_client(raw) targets = [("total", "orders_amt_total")] + [(c, f"orders_amt_{c}") for c in CATEGORIES] for name, y_col in targets: out_dir = OUT_DIR / name # активные x_col_active = f"active_imp_per_day_{name}" if name != "total" else "active_imp_per_day_total" scatter_trend_quad( client, x_col=x_col_active, y_col=y_col, out_path=out_dir / f"{name}_active_scatter.html", title=f"{y_col}: активные показы/день", window=int(max(5, BASE_WINDOW * ACTIVE_WINDOW_FACTOR)), y_override=6.5 if name == "total" else None, x_scale_factor=0.5 if name == "total" else None, ) # пассивные x_col_passive = f"passive_imp_per_day_{name}" if name != "total" else "passive_imp_per_day_total" scatter_trend_quad( client, x_col=x_col_passive, y_col=y_col, out_path=out_dir / f"{name}_passive_scatter.html", title=f"{y_col}: пассивные показы/день", window=int(max(5, BASE_WINDOW * PASSIVE_WINDOW_FACTOR)), y_override=6.5 if name == "total" else None, x_scale_factor=0.95 if name == "total" else None, ) # комбинированные x_col_total = f"total_imp_per_day_{name}" if name != "total" else "total_imp_per_day_total" scatter_trend_quad( client, x_col=x_col_total, y_col=y_col, out_path=out_dir / f"{name}_combined_scatter.html", title=f"{y_col}: все показы/день", window=int(max(5, BASE_WINDOW * COMBINED_WINDOW_FACTOR)), x_override=13 if name == "total" else None, y_override=6.5 if name == "total" else None, ) if __name__ == "__main__": main()