some refactoring
This commit is contained in:
@@ -1,154 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
# Paths and column groups
|
||||
DATA_PATH = Path("dataset/ds.csv")
|
||||
CATEGORIES: List[str] = ["ent", "super", "transport", "shopping", "hotel", "avia"]
|
||||
|
||||
ACTIVE_IMP_COLS = [f"active_imp_{c}" for c in CATEGORIES]
|
||||
PASSIVE_IMP_COLS = [f"passive_imp_{c}" for c in CATEGORIES]
|
||||
ACTIVE_CLICK_COLS = [f"active_click_{c}" for c in CATEGORIES]
|
||||
PASSIVE_CLICK_COLS = [f"passive_click_{c}" for c in CATEGORIES]
|
||||
ORDER_COLS = [f"orders_amt_{c}" for c in CATEGORIES]
|
||||
|
||||
NUMERIC_COLS = (
|
||||
ACTIVE_IMP_COLS
|
||||
+ PASSIVE_IMP_COLS
|
||||
+ ACTIVE_CLICK_COLS
|
||||
+ PASSIVE_CLICK_COLS
|
||||
+ ORDER_COLS
|
||||
+ ["age"]
|
||||
)
|
||||
CAT_COLS = ["gender_cd", "device_platform_cd"]
|
||||
|
||||
|
||||
def safe_divide(numerator: pd.Series | float, denominator: pd.Series | float) -> pd.Series:
|
||||
"""Divide with protection against zero (works for Series and scalars)."""
|
||||
if isinstance(denominator, pd.Series):
|
||||
denom = denominator.replace(0, np.nan)
|
||||
else:
|
||||
denom = np.nan if float(denominator) == 0 else denominator
|
||||
return numerator / denom
|
||||
|
||||
|
||||
def normalize_gender(series: pd.Series) -> pd.Series:
|
||||
cleaned = series.fillna("UNKNOWN").astype(str).str.strip().str.upper()
|
||||
mapping = {"M": "M", "MALE": "M", "F": "F", "FEMALE": "F"}
|
||||
return cleaned.map(mapping).fillna("UNKNOWN")
|
||||
|
||||
|
||||
def normalize_device(series: pd.Series) -> pd.Series:
|
||||
cleaned = series.fillna("unknown").astype(str).str.strip()
|
||||
lowered = cleaned.str.lower().str.replace(" ", "").str.replace("_", "")
|
||||
mapping = {"android": "Android", "ios": "iOS", "ipados": "iPadOS", "ipad": "iPadOS"}
|
||||
mapped = lowered.map(mapping)
|
||||
fallback = cleaned.str.title()
|
||||
return mapped.fillna(fallback)
|
||||
|
||||
|
||||
def add_age_group(df: pd.DataFrame) -> pd.DataFrame:
|
||||
bins = [0, 25, 35, 45, 55, np.inf]
|
||||
labels = ["<25", "25-34", "35-44", "45-54", "55+"]
|
||||
df["age_group"] = pd.cut(df["age"], bins=bins, labels=labels, right=False)
|
||||
return df
|
||||
|
||||
|
||||
def add_totals(df: pd.DataFrame) -> pd.DataFrame:
|
||||
df["active_imp_total"] = df[ACTIVE_IMP_COLS].sum(axis=1)
|
||||
df["passive_imp_total"] = df[PASSIVE_IMP_COLS].sum(axis=1)
|
||||
df["active_click_total"] = df[ACTIVE_CLICK_COLS].sum(axis=1)
|
||||
df["passive_click_total"] = df[PASSIVE_CLICK_COLS].sum(axis=1)
|
||||
df["orders_amt_total"] = df[ORDER_COLS].sum(axis=1)
|
||||
df["click_total"] = df["active_click_total"] + df["passive_click_total"]
|
||||
df["imp_total"] = df["active_imp_total"] + df["passive_imp_total"]
|
||||
df["active_ctr"] = safe_divide(df["active_click_total"], df["active_imp_total"])
|
||||
df["passive_ctr"] = safe_divide(df["passive_click_total"], df["passive_imp_total"])
|
||||
df["ctr_all"] = safe_divide(df["click_total"], df["imp_total"])
|
||||
df["cr_click2order"] = safe_divide(df["orders_amt_total"], df["click_total"])
|
||||
df["cr_imp2order"] = safe_divide(df["orders_amt_total"], df["imp_total"])
|
||||
return df
|
||||
|
||||
|
||||
def add_flags(df: pd.DataFrame) -> pd.DataFrame:
|
||||
df["has_active_comm"] = (df[ACTIVE_IMP_COLS + ACTIVE_CLICK_COLS].sum(axis=1) > 0).astype(int)
|
||||
df["has_passive_comm"] = (df[PASSIVE_IMP_COLS + PASSIVE_CLICK_COLS].sum(axis=1) > 0).astype(int)
|
||||
df["has_any_order"] = (df[ORDER_COLS].sum(axis=1) > 0).astype(int)
|
||||
df["order_categories_count"] = (df[ORDER_COLS] > 0).sum(axis=1)
|
||||
return df
|
||||
|
||||
|
||||
def load_data(path: Path | str = DATA_PATH) -> pd.DataFrame:
|
||||
df = pd.read_csv(path)
|
||||
df["business_dt"] = pd.to_datetime(df["business_dt"])
|
||||
df["gender_cd"] = normalize_gender(df["gender_cd"])
|
||||
df["device_platform_cd"] = normalize_device(df["device_platform_cd"])
|
||||
df = add_age_group(df)
|
||||
df = add_totals(df)
|
||||
df = add_flags(df)
|
||||
return df
|
||||
|
||||
|
||||
def describe_zero_share(df: pd.DataFrame, cols: Iterable[str]) -> pd.DataFrame:
|
||||
stats = []
|
||||
for col in cols:
|
||||
series = df[col]
|
||||
stats.append(
|
||||
{
|
||||
"col": col,
|
||||
"count": series.count(),
|
||||
"mean": series.mean(),
|
||||
"median": series.median(),
|
||||
"std": series.std(),
|
||||
"min": series.min(),
|
||||
"q25": series.quantile(0.25),
|
||||
"q75": series.quantile(0.75),
|
||||
"max": series.max(),
|
||||
"share_zero": (series == 0).mean(),
|
||||
"p95": series.quantile(0.95),
|
||||
"p99": series.quantile(0.99),
|
||||
}
|
||||
)
|
||||
return pd.DataFrame(stats)
|
||||
|
||||
|
||||
def build_daily(df: pd.DataFrame) -> pd.DataFrame:
|
||||
agg_cols = ACTIVE_IMP_COLS + PASSIVE_IMP_COLS + ACTIVE_CLICK_COLS + PASSIVE_CLICK_COLS + ORDER_COLS
|
||||
daily = df.groupby("business_dt")[agg_cols].sum().reset_index()
|
||||
daily = add_totals(daily)
|
||||
daily["day_of_week"] = daily["business_dt"].dt.day_name()
|
||||
return daily
|
||||
|
||||
|
||||
def build_client(df: pd.DataFrame) -> pd.DataFrame:
|
||||
agg_spec: Dict[str, str] = {col: "sum" for col in ACTIVE_IMP_COLS + PASSIVE_IMP_COLS + ACTIVE_CLICK_COLS + PASSIVE_CLICK_COLS + ORDER_COLS}
|
||||
meta_spec: Dict[str, str | callable] = {
|
||||
"age": "median",
|
||||
"gender_cd": lambda s: s.mode().iat[0] if not s.mode().empty else "UNKNOWN",
|
||||
"age_group": lambda s: s.mode().iat[0] if not s.mode().empty else np.nan,
|
||||
"device_platform_cd": lambda s: s.mode().iat[0] if not s.mode().empty else "Other",
|
||||
}
|
||||
agg_spec.update(meta_spec)
|
||||
client = df.groupby("id").agg(agg_spec).reset_index()
|
||||
contact_days = df.groupby("id")["business_dt"].nunique().rename("contact_days")
|
||||
imp_day = df.copy()
|
||||
imp_day["imp_day_total"] = imp_day[ACTIVE_IMP_COLS + PASSIVE_IMP_COLS].sum(axis=1)
|
||||
max_imp_day = imp_day.groupby("id")["imp_day_total"].max().rename("max_impressions_per_day")
|
||||
client = add_totals(client)
|
||||
client = add_flags(client)
|
||||
client = client.merge(contact_days, on="id", how="left")
|
||||
client = client.merge(max_imp_day, on="id", how="left")
|
||||
client = add_contact_density(client)
|
||||
return client
|
||||
|
||||
|
||||
def add_contact_density(df: pd.DataFrame) -> pd.DataFrame:
|
||||
# contact_days must already be present
|
||||
if "contact_days" in df.columns:
|
||||
df["avg_impressions_per_contact_day"] = safe_divide(df["imp_total"], df["contact_days"])
|
||||
return df
|
||||
return df
|
||||
Reference in New Issue
Block a user