"""Generate Altair HTML charts (scatter + trend + quadratic fit, correlation heatmaps) under ``new_plots``."""
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
import sys
|
|
from typing import Dict, Iterable, Optional, Tuple
|
|
|
|
import altair as alt
|
|
import numpy as np
|
|
import pandas as pd
|
|
import statsmodels.api as sm
|
|
from sklearn.metrics import roc_auc_score, r2_score
|
|
|
|
# Directory that contains this script; base for the relative paths used below.
PROJECT_ROOT = Path(__file__).resolve().parent
# Put <PROJECT_ROOT>/main_hypot on the import path; presumably this is where the
# project-local modules imported right after this line live.
sys.path.append(str(PROJECT_ROOT / "main_hypot"))
|
|
|
|
import best_model_and_plots as bmp
|
|
from category_quadreg import (
|
|
BASE_COLUMNS,
|
|
CATEGORIES,
|
|
COMBINED,
|
|
add_combined_category,
|
|
build_client_by_category,
|
|
)
|
|
|
|
# Root directory for every generated HTML chart.
OUTPUT_DIR = PROJECT_ROOT / "new_plots"
# Absolute, machine-specific location of the Segoe UI variable font file.
# inject_font_css() silently does nothing when this file does not exist.
FONT_PATH = Path("/Users/dan/Downloads/AyuGram Desktop/SegoeUIVF.ttf")
|
|
|
|
def inject_font_css(html_path: Path) -> None:
    """Embed a ``@font-face`` rule for the local Segoe UI font into a saved HTML file.

    Nothing happens when ``FONT_PATH`` is missing on disk, or when the exact
    ``<style>`` block is already present in the file. Otherwise the block is
    inserted right before the first ``</head>``; a document without ``</head>``
    gets the block prepended. The file is rewritten in place as UTF-8.
    """
    if not FONT_PATH.exists():
        return

    font_uri = FONT_PATH.as_uri()
    style_block = (
        "<style>"
        "@font-face{font-family:'Segoe UI Variable'; "
        f"src: url('{font_uri}') format('truetype'); "
        "font-weight:100 900; font-style:normal;}\n"
        "body, text, .vega-bindings "
        "{font-family:'Segoe UI Variable','Segoe UI',sans-serif;}"
        "</style>"
    )

    document = html_path.read_text(encoding="utf-8")
    if style_block in document:
        # Already injected on a previous run.
        return

    head_close = "</head>"
    if head_close in document:
        patched = document.replace(head_close, style_block + "\n" + head_close, 1)
    else:
        patched = style_block + document
    html_path.write_text(patched, encoding="utf-8")
|
|
|
|
|
|
# Use the theme/fonts from the example
def configure_chart(chart: alt.Chart, title: str, width: int = 700, height: int = 500) -> alt.Chart:
    """Apply the shared look (dark theme, size, padding, Segoe UI fonts) to a chart.

    Calls ``alt.theme.enable("dark")`` on every invocation and returns the chart
    with its title, dimensions and title/axis/legend typography configured.
    """
    alt.theme.enable("dark")

    font_family = "Segoe UI Variable"

    sized = chart.properties(title=title, width=width, height=height, padding=30)
    titled = sized.configure_title(
        fontSize=18,
        font=font_family,
        fontWeight=600,
        anchor="start",
    )
    with_axes = titled.configure_axis(
        grid=True,
        labelFont=font_family,
        titleFont=font_family,
        labelFontSize=16,
        titleFontSize=18,
        labelFontWeight=400,
        titleFontWeight=600,
    )
    return with_axes.configure_legend(labelFont=font_family, titleFont=font_family)
|
|
|
|
|
|
def prepare_client_data() -> pd.DataFrame:
    """Load the per-client aggregates through the existing ``best_model_and_plots`` script."""
    client_level = bmp.load_client_level(bmp.DB_PATH)
    return client_level
|
|
|
|
|
|
def prepare_category_client_data() -> pd.DataFrame:
    """Build the per-client, per-category table, including the combined categories.

    Reads the whole ``communications`` table from the SQLite database at
    ``bmp.DB_PATH`` (parsing ``business_dt`` as dates), aggregates it with
    ``build_client_by_category`` and then adds one combined category for every
    entry of ``COMBINED``.

    Returns:
        The client-level frame produced by those helpers.
    """
    from contextlib import closing  # local import: only this function needs it

    # The sqlite3 module is reached through bmp (presumably bmp imports it).
    # closing() guarantees the connection is released even if the query fails;
    # a sqlite3 connection used as a context manager only handles transactions.
    with closing(bmp.sqlite3.connect(bmp.DB_PATH)) as conn:
        raw = pd.read_sql_query(
            "select * from communications",
            conn,
            parse_dates=["business_dt"],
        )

    client = build_client_by_category(raw)
    for combo_name, cats in COMBINED.items():
        client = add_combined_category(client, combo_name, cats)
    return client
|
|
|
|
|
|
def filter_and_trend(
    df: pd.DataFrame,
    y_col: str,
    *,
    x_col: str = bmp.X_COL,
    x_max: float = bmp.DEFAULT_X_MAX,
    y_max: float = bmp.DEFAULT_Y_MAX,
    q_low: float = bmp.DEFAULT_Q_LOW,
    q_high: float = bmp.DEFAULT_Q_HIGH,
    iqr_k: float = bmp.DEFAULT_IQR_K,
    trend_method: str = bmp.DEFAULT_TREND_METHOD,
    trend_frac: float = bmp.DEFAULT_TREND_FRAC,
    savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW,
) -> Tuple[pd.DataFrame, Tuple[np.ndarray, np.ndarray]]:
    """Clean the ``(x_col, y_col)`` pairs and compute a trend line over them.

    Steps, in order: drop rows with missing values, restrict x through
    ``bmp.filter_x_range``, drop outliers through ``bmp.remove_outliers``,
    keep only rows with ``y <= y_max``, then call ``bmp.compute_trend``.

    Returns:
        ``(cleaned, (tx, ty))`` where ``cleaned`` is a copy of the filtered
        rows and ``(tx, ty)`` is whatever ``bmp.compute_trend`` returned.
    """
    pairs = df[[x_col, y_col]].dropna()
    within_x = bmp.filter_x_range(pairs, x_col, x_max)
    without_outliers = bmp.remove_outliers(
        within_x,
        y_col=y_col,
        x_col=x_col,
        iqr_k=iqr_k,
        q_low=q_low,
        q_high=q_high,
    )

    # Clip by y_max to keep the visualisation readable
    below_cap = without_outliers[y_col] <= y_max
    cleaned = without_outliers[below_cap].copy()

    trend_x, trend_y = bmp.compute_trend(
        cleaned,
        y_col=y_col,
        x_col=x_col,
        method=trend_method,
        lowess_frac=trend_frac,
        savgol_window=savgol_window,
    )
    return cleaned, (trend_x, trend_y)
|
|
|
|
|
|
def compute_density_alpha(df: pd.DataFrame, x_col: str, y_col: str, x_max: float, y_max: float) -> pd.Series:
    """Return per-row opacity values aligned to ``df.index``.

    Delegates to ``bmp.compute_density_alpha`` with the module's default bin
    counts and alpha bounds. When that helper returns an empty result, every
    row gets ``bmp.DEFAULT_ALPHA`` instead.
    """
    density_options = {
        "x_col": x_col,
        "y_col": y_col,
        "x_max": x_max,
        "bins_x": bmp.DEFAULT_BINS_X,
        "bins_y": bmp.DEFAULT_BINS_Y,
        "alpha_min": bmp.DEFAULT_ALPHA_MIN,
        "alpha_max": bmp.DEFAULT_ALPHA_MAX,
        "y_min": bmp.DEFAULT_Y_MIN,
        "y_max_limit": y_max,
    }
    alphas = bmp.compute_density_alpha(df, **density_options)

    values = alphas if len(alphas) != 0 else [bmp.DEFAULT_ALPHA] * len(df)
    return pd.Series(values, index=df.index)
|
|
|
|
|
|
def fit_quadratic(
    df: pd.DataFrame,
    y_col: str,
    trend_data: Tuple[np.ndarray, np.ndarray],
    *,
    x_col: str = bmp.X_COL,
    x_max: float = bmp.DEFAULT_X_MAX,
    force_negative_b2: bool = False,
) -> Tuple[Optional[sm.regression.linear_model.RegressionResultsWrapper], dict]:
    """Fit ``y ~ const + b1*x + b2*q`` by OLS (HC3 covariance) and score the fit.

    ``q`` is ``x**2``, or ``-x**2`` when ``force_negative_b2`` is set. In the
    latter case ``params[2]`` is the coefficient on ``-x**2``, so the
    coefficient on ``x**2`` is ``-params[2]``; the flag only flips the
    regressor and does not constrain the sign of the estimate.

    Args:
        df: Rows to fit; must contain ``x_col`` and ``y_col``.
        y_col: Name of the response column.
        trend_data: ``(tx, ty)`` trend line; ``tx`` may be ``None``.
        x_col: Name of the regressor column.
        x_max: Trend points with ``tx > x_max`` are ignored for ``r2_trend``.
        force_negative_b2: Use ``-x**2`` instead of ``x**2`` as the quadratic term.

    Returns:
        ``(model, metrics)``. ``metrics`` has ``"auc"`` (ROC AUC of the fitted
        values against the ``y > 0`` flag, NaN when only one class is present)
        and ``"r2_trend"`` (R² of the model evaluated on the trend line, NaN
        when fewer than 3 usable trend points remain or the trend is flat).
        With fewer than 3 rows the result is ``(None, {})``.
    """
    if len(df) < 3:
        return None, {}

    def design_matrix(values: np.ndarray) -> np.ndarray:
        """Return the three-column [const, x, +/-x**2] matrix for ``values``."""
        squared = values**2
        quad = -squared if force_negative_b2 else squared
        # has_constant="add" always prepends the intercept column. The default
        # ("skip") can omit it when an input column is itself constant, which
        # would leave the fit and the trend matrices with different widths.
        return sm.add_constant(np.column_stack([values, quad]), has_constant="add")

    x = df[x_col].to_numpy()
    y = df[y_col].to_numpy()
    X_design = design_matrix(x)
    model = sm.OLS(y, X_design).fit(cov_type="HC3")

    # AUC against the binary "has an order" flag
    auc = np.nan
    binary = (y > 0).astype(int)
    if len(np.unique(binary)) > 1:
        auc = roc_auc_score(binary, model.predict(X_design))

    # R2 against the trend line
    r2_trend = np.nan
    tx, ty = trend_data
    if tx is not None and len(tx) >= 3:
        # Normalise to arrays so the boolean mask below works for any sequence type.
        tx = np.asarray(tx)
        ty = np.asarray(ty)
        usable = (tx <= x_max) & ~np.isnan(ty)
        tx, ty = tx[usable], ty[usable]
        if len(tx) >= 3 and np.nanvar(ty) > 0:
            r2_trend = r2_score(ty, model.predict(design_matrix(tx)))

    return model, {"auc": auc, "r2_trend": r2_trend}
|
|
|
|
|
|
def build_annotation(
    params: np.ndarray,
    pvals: np.ndarray,
    metrics: dict,
    n: int,
    *,
    b2_effective: Optional[float] = None,
    x_pos: float = 0.5,
    line_step: float = 0.4,
) -> pd.DataFrame:
    """Build a small frame of text labels describing a quadratic fit.

    Args:
        params: Fitted coefficients; index 1 is b1 and index 2 is b2.
        pvals: P-values indexed like ``params``.
        metrics: May contain ``"r2_trend"``, ``"auc"`` (both default to NaN)
            and ``"y_max_for_anno"`` (y coordinate of the first line, default 0).
        n: Number of observations shown in the last label.
        b2_effective: Value to display for b2 instead of ``params[2]``.
        x_pos: x coordinate shared by all labels.
        line_step: Vertical distance between consecutive labels, in y-axis
            units. Lower it for charts with a small y range so the labels do
            not run off the plot.

    Returns:
        A frame with columns ``x``, ``y`` and ``label``, one row per line,
        ordered top to bottom.
    """
    b2_shown = params[2] if b2_effective is None else b2_effective
    labels = [
        f"R2_trend={metrics.get('r2_trend', np.nan):.3f}",
        f"AUC={metrics.get('auc', np.nan):.3f}",
        f"b1={params[1]:.3f} (p={pvals[1]:.3g})",
        f"b2={b2_shown:.3f} (p={pvals[2]:.3g})",
        f"n={n}",
    ]
    top = metrics.get("y_max_for_anno", 0)
    return pd.DataFrame(
        {
            "x": [x_pos] * len(labels),
            "y": [top - i * line_step for i, _ in enumerate(labels)],
            "label": labels,
        }
    )
|
|
|
|
|
|
def save_scatter_trend_quad(
    df: pd.DataFrame,
    y_col: str,
    out_path: Path,
    *,
    x_col: str = bmp.X_COL,
    x_max: float = bmp.DEFAULT_X_MAX,
    y_max: float = bmp.DEFAULT_Y_MAX,
    force_negative_b2: bool = False,
    savgol_window: int = bmp.DEFAULT_SAVGOL_WINDOW,
    title: str = "",
    q_low: float = bmp.DEFAULT_Q_LOW,
    q_high: float = bmp.DEFAULT_Q_HIGH,
    iqr_k: float = bmp.DEFAULT_IQR_K,
) -> None:
    """Save an HTML chart with the cleaned scatter, its trend line and a quadratic fit.

    The data is cleaned with ``filter_and_trend``, point opacity comes from
    ``compute_density_alpha`` and the dashed curve from ``fit_quadratic``.
    Fit metrics (R2 against the trend, AUC, b1, b2 with p-values, n) are
    appended to the chart title. Nothing is written, and a message is printed,
    when no trend is available or there are too few points to fit.

    Args:
        df: Source frame containing ``x_col`` and ``y_col``.
        y_col: Response column.
        out_path: Target HTML file; parent directories are created.
        x_col: Regressor column.
        x_max: Upper x limit for filtering, the fit grid and the x axis.
        y_max: Upper y limit for filtering and the y axis.
        force_negative_b2: Passed to ``fit_quadratic``; fits on ``-x**2``.
        savgol_window: Passed to ``filter_and_trend``.
        title: Chart title; defaults to ``"{y_col} vs {x_col}"``.
        q_low: Outlier filter setting passed to ``filter_and_trend``.
        q_high: Outlier filter setting passed to ``filter_and_trend``.
        iqr_k: Outlier filter setting passed to ``filter_and_trend``.
    """
    cleaned, trend_data = filter_and_trend(
        df,
        y_col=y_col,
        x_col=x_col,
        x_max=x_max,
        y_max=y_max,
        q_low=q_low,
        q_high=q_high,
        iqr_k=iqr_k,
        trend_method=bmp.DEFAULT_TREND_METHOD,
        trend_frac=bmp.DEFAULT_TREND_FRAC,
        savgol_window=savgol_window,
    )
    if trend_data[0] is None:
        print(f"[{y_col}] нет тренда/данных для построения")
        return

    cleaned = cleaned.copy()
    cleaned["alpha"] = compute_density_alpha(cleaned, x_col, y_col, x_max, y_max)

    model, metrics = fit_quadratic(
        cleaned,
        y_col,
        trend_data,
        x_col=x_col,
        x_max=x_max,
        force_negative_b2=force_negative_b2,
    )
    if model is None:
        print(f"[{y_col}] недостаточно точек для квадрата")
        return

    params = model.params
    pvals = model.pvalues
    # With force_negative_b2 the regressor is -x**2, so the coefficient on x**2
    # is exactly -params[2]. Reporting that value keeps the label consistent
    # with the curve drawn below, whatever sign the estimate turns out to have.
    b2_effective = -params[2] if force_negative_b2 else params[2]

    # Evaluate the fitted curve on an even grid over the visible x range.
    x_grid = np.linspace(0, x_max, 400)
    quad_term = -x_grid**2 if force_negative_b2 else x_grid**2
    grid_design = sm.add_constant(np.column_stack([x_grid, quad_term]), has_constant="add")
    quad_df = pd.DataFrame({x_col: x_grid, "quad": model.predict(grid_design)})

    trend_df = pd.DataFrame({x_col: trend_data[0], "trend": trend_data[1]})
    metrics_text = [
        f"R2_trend={metrics['r2_trend']:.3f}",
        f"AUC={metrics['auc']:.3f}",
        f"b1={params[1]:.3f} (p={pvals[1]:.3g})",
        f"b2={b2_effective:.3f} (p={pvals[2]:.3g})",
        f"n={len(cleaned)}",
    ]

    x_scale = alt.Scale(domain=(0, x_max), clamp=True, nice=False, domainMin=0, domainMax=x_max)
    y_scale = alt.Scale(domain=(bmp.DEFAULT_Y_MIN, y_max), clamp=True, nice=False)

    points = alt.Chart(cleaned).mark_circle(size=40).encode(
        x=alt.X(x_col, title="Среднее число показов в день", scale=x_scale),
        y=alt.Y(y_col, title=y_col, scale=y_scale),
        opacity=alt.Opacity("alpha:Q", scale=alt.Scale(domain=(0, 1), clamp=True)),
        color=alt.value(bmp.DEFAULT_SCATTER_COLOR),
        tooltip=[x_col, y_col],
    )

    trend_line = alt.Chart(trend_df).mark_line(color=bmp.DEFAULT_TREND_COLOR, strokeWidth=2.5).encode(
        x=alt.X(x_col, scale=x_scale),
        y=alt.Y("trend", scale=y_scale),
    )
    quad_line = alt.Chart(quad_df).mark_line(color="blue", strokeWidth=2.2, strokeDash=[6, 4]).encode(
        x=alt.X(x_col, scale=x_scale),
        y=alt.Y("quad", scale=y_scale),
    )

    subtitle = " • ".join(metrics_text)

    chart = alt.layer(points, trend_line, quad_line).resolve_scale(opacity="independent")
    chart = configure_chart(chart, (title or f"{y_col} vs {x_col}") + f" — {subtitle}", width=800, height=600)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    chart.save(out_path)
    inject_font_css(out_path)
    print(f"Saved {out_path}")
|
|
|
|
|
|
def save_correlation_heatmap(df: pd.DataFrame, cols: Iterable[str], title: str, out_path: Path) -> None:
    """Save a correlation heatmap of the given columns as an HTML chart.

    Args:
        df: Source frame.
        cols: Columns whose pairwise correlations (``DataFrame.corr``) are drawn.
        title: Chart title.
        out_path: Target HTML file; parent directories are created.
    """
    corr = df[list(cols)].corr()
    # Name both axes explicitly before reshaping. Relying on reset_index()
    # producing a column called "index" breaks when the frame's column axis
    # already carries a name.
    corr_long = (
        corr.rename_axis(index="row", columns="col")
        .reset_index()
        .melt(id_vars="row", var_name="col", value_name="corr")
    )

    chart = (
        alt.Chart(corr_long)
        .mark_rect()
        .encode(
            x=alt.X("col:N", title=""),
            y=alt.Y("row:N", title=""),
            color=alt.Color(
                "corr:Q",
                scale=alt.Scale(domain=(-1, 1), scheme="redblue"),
                legend=alt.Legend(title="corr"),
            ),
            tooltip=["row", "col", alt.Tooltip("corr:Q", format=".3f")],
        )
    )
    chart = configure_chart(chart, title, width=400, height=400)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    chart.save(out_path)
    inject_font_css(out_path)
    print(f"Saved {out_path}")
|
|
|
|
|
|
def generate_total_plots() -> None:
    """Render the scatter + trend + quadratic chart for ``orders_amt_total`` over all clients."""
    clients = prepare_client_data()
    target_dir = OUTPUT_DIR / "orders_amt_total"
    plot_options = {
        "y_col": "orders_amt_total",
        "out_path": target_dir / "scatter_trend_quad.html",
        "x_max": bmp.DEFAULT_X_MAX,
        "y_max": bmp.DEFAULT_Y_MAX,
        "savgol_window": bmp.DEFAULT_SAVGOL_WINDOW,
        "title": "Заказы vs средние показы (все клиенты)",
    }
    save_scatter_trend_quad(clients, **plot_options)
|
|
|
|
|
|
def generate_category_plots() -> None:
    """Render correlation heatmaps and scatter/trend/quadratic charts for every category.

    Categories are ``CATEGORIES`` followed by the keys of ``COMBINED``. All
    heatmaps are written first, then all scatter charts.
    """
    client = prepare_category_client_data()

    # Per-category axis limits and smoothing windows; categories that are not
    # listed fall back to the bmp defaults.
    x_limits = {"ent": 4, "transport": 6, "super": 4, "avia": 4, "shopping": 4, "avia_hotel": 5}
    y_limits = {"ent": 2.5, "transport": 8, "avia": 1.5, "shopping": 2.5, "super": 5.5, "avia_hotel": 2.0}
    smoothing_windows = {"ent": 301, "transport": 401, "avia": 301, "shopping": 201, "avia_hotel": 301}
    # NOTE(review): the two tables below are never read in this function, so
    # they currently have no effect - confirm whether they were meant to be
    # forwarded to save_scatter_trend_quad.
    q_high_overrides = {"avia_hotel": 0.9}
    iqr_overrides = {"avia_hotel": 1.2}

    categories = CATEGORIES + list(COMBINED)

    # Correlations
    heatmap_dir = OUTPUT_DIR / "correlations"
    for category in categories:
        save_correlation_heatmap(
            client,
            [f"{base}_{category}" for base in BASE_COLUMNS],
            title=f"Корреляции показов/кликов/заказов: {category}",
            out_path=heatmap_dir / f"corr_{category}.html",
        )

    # Scatter clouds + quadratic fit
    for category in categories:
        orders_col = f"orders_amt_{category}"
        impressions_col = f"avg_imp_per_day_{category}"
        save_scatter_trend_quad(
            client,
            y_col=orders_col,
            out_path=OUTPUT_DIR / orders_col / "scatter_trend_quad.html",
            x_col=impressions_col,
            x_max=x_limits.get(category, bmp.DEFAULT_X_MAX),
            y_max=y_limits.get(category, bmp.DEFAULT_Y_MAX),
            force_negative_b2=category == "avia_hotel",
            savgol_window=smoothing_windows.get(category, bmp.DEFAULT_SAVGOL_WINDOW),
            title=f"{orders_col} vs {impressions_col}",
        )
|
|
|
|
|
|
def generate_basic_scatters() -> None:
    """Recreate the basic chart set from best_model_and_plots for ``orders_amt_total``.

    Writes three HTML files under ``OUTPUT_DIR / "orders_amt_total"``:
    all points (``scatter_all.html``), points without outliers
    (``scatter_clean.html``) and points without outliers plus the trend line
    (``scatter_clean_trend.html``).
    """
    df = prepare_client_data()
    y_col = "orders_amt_total"
    x_col = bmp.X_COL
    x_max = bmp.DEFAULT_X_MAX
    y_max = bmp.DEFAULT_Y_MAX
    out_dir = OUTPUT_DIR / y_col
    # Create the target directory up front, as the other save_* functions do
    # before calling chart.save(); main() runs this function first, so the
    # sub-directory does not exist yet on a fresh run.
    out_dir.mkdir(parents=True, exist_ok=True)

    base = df[[x_col, y_col]].dropna()
    base = bmp.filter_x_range(base, x_col, x_max)
    base = base.copy()
    base["alpha"] = compute_density_alpha(base, x_col, y_col, x_max, y_max)

    def scatter_chart(
        data: pd.DataFrame,
        title: str,
        trend: Optional[Tuple[np.ndarray, np.ndarray]] = None,
    ) -> alt.Chart:
        """Build the scatter layer and, when a trend is given, a trend line on top."""
        x_scale = alt.Scale(domain=(0, x_max), clamp=True, nice=False, domainMin=0, domainMax=x_max)
        y_scale = alt.Scale(domain=(bmp.DEFAULT_Y_MIN, y_max), clamp=True, nice=False)
        points = alt.Chart(data).mark_circle(size=40).encode(
            x=alt.X(x_col, title="Среднее число показов в день", scale=x_scale),
            y=alt.Y(y_col, title=y_col, scale=y_scale),
            opacity=alt.Opacity("alpha:Q", scale=alt.Scale(domain=(0, 1), clamp=True)),
            color=alt.value(bmp.DEFAULT_SCATTER_COLOR),
            tooltip=[x_col, y_col],
        )
        layers = [points]
        if trend is not None and trend[0] is not None:
            trend_df = pd.DataFrame({x_col: trend[0], "trend": trend[1]})
            layers.append(
                alt.Chart(trend_df).mark_line(color=bmp.DEFAULT_TREND_COLOR, strokeWidth=2.5).encode(
                    x=alt.X(x_col, scale=x_scale),
                    y=alt.Y("trend", scale=y_scale),
                )
            )
        chart = alt.layer(*layers).resolve_scale(opacity="independent")
        return configure_chart(chart, title, width=800, height=600)

    def save_chart(chart: alt.Chart, filename: str) -> None:
        """Write the chart into ``out_dir`` and embed the font CSS into the file."""
        target = out_dir / filename
        chart.save(target)
        inject_font_css(target)

    # 1) all points
    save_chart(scatter_chart(base, "Облако: все точки"), "scatter_all.html")

    # 2) without outliers
    cleaned = bmp.remove_outliers(
        base,
        y_col=y_col,
        x_col=x_col,
        iqr_k=bmp.DEFAULT_IQR_K,
        q_low=bmp.DEFAULT_Q_LOW,
        q_high=bmp.DEFAULT_Q_HIGH,
    )
    cleaned = cleaned.copy()
    cleaned["alpha"] = compute_density_alpha(cleaned, x_col, y_col, x_max, y_max)
    save_chart(scatter_chart(cleaned, "Облако: без выбросов"), "scatter_clean.html")

    # 3) without outliers + trend
    tx, ty = bmp.compute_trend(
        cleaned,
        y_col=y_col,
        x_col=x_col,
        method=bmp.DEFAULT_TREND_METHOD,
        lowess_frac=bmp.DEFAULT_TREND_FRAC,
        savgol_window=bmp.DEFAULT_SAVGOL_WINDOW,
    )
    save_chart(
        scatter_chart(cleaned, "Облако: без выбросов + тренд", trend=(tx, ty)),
        "scatter_clean_trend.html",
    )
|
|
|
|
|
|
def main() -> None:
    """Create the output directory, then run every plot generator in sequence."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    pipeline = (generate_basic_scatters, generate_total_plots, generate_category_plots)
    for step in pipeline:
        step()
|
|
|
|
|
|
# Run the full plot generation only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|