Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os | |
| import json | |
| from datetime import datetime | |
| import pandas as pd | |
# Default to a writable location (on HF, /app may be read-only).
DEFAULT_WRITABLE_DIR = "/tmp/adcopy_data"
# The DATA_DIR environment variable, when set, overrides the default directory.
DATA_DIR = os.getenv("DATA_DIR", DEFAULT_WRITABLE_DIR)
LOG_PATH = os.path.join(DATA_DIR, "events.csv")
META_PATH = os.path.join(DATA_DIR, "meta.json")

# Column order used for the events CSV.
SCHEMA = [
    "ts",
    "date",
    "medium",
    "creative",
    "is_control",
    "impressions",
    "clicks",
    "conversions",
    "cost",
    "features_json",
]
def _ensure_storage():
    """Create the data directory and seed its files when they are missing.

    A header-only CSV is written to ``LOG_PATH`` and a JSON document holding
    a ``created_at`` timestamp is written to ``META_PATH``. Files that already
    exist are left untouched.
    """
    os.makedirs(DATA_DIR, exist_ok=True)

    log_missing = not os.path.exists(LOG_PATH)
    if log_missing:
        header_only = pd.DataFrame(columns=SCHEMA)
        header_only.to_csv(LOG_PATH, index=False)

    if os.path.exists(META_PATH):
        return
    with open(META_PATH, "w", encoding="utf-8") as meta_file:
        meta = {"created_at": datetime.utcnow().isoformat()}
        json.dump(meta, meta_file)
# Prepare the storage directory and files at import time.
_ensure_storage()
def read_events() -> pd.DataFrame:
    """Load the event log from ``LOG_PATH`` with normalised column types.

    Returns:
        The logged events. An empty log is returned exactly as read, with no
        type coercion. Otherwise ``date`` is rendered as a date string,
        ``is_control``, ``impressions``, ``clicks`` and ``conversions`` are
        integers, ``cost`` is a float, and a missing ``features_json`` value
        becomes ``"{}"``.
    """
    _ensure_storage()
    events = pd.read_csv(LOG_PATH)
    if events.empty:
        return events

    parsed_dates = pd.to_datetime(events["date"])
    events["date"] = parsed_dates.dt.date.astype(str)

    # Missing flags and counters are treated as zero.
    for name in ("is_control", "impressions", "clicks", "conversions"):
        events[name] = events[name].fillna(0).astype(int)

    events["cost"] = events["cost"].fillna(0.0).astype(float)
    events["features_json"] = events["features_json"].fillna("{}")
    return events
def append_events(rows: pd.DataFrame) -> None:
    """Append event rows to the CSV log at ``LOG_PATH``.

    Columns listed in ``SCHEMA`` but absent from ``rows`` are filled with
    defaults: the current UTC timestamp for ``ts``, the current UTC date for
    ``date``, ``0`` for the counters and ``is_control``, ``0.0`` for ``cost``,
    ``"{}"`` for ``features_json`` and ``None`` for anything else. Columns
    outside ``SCHEMA`` are not written.

    Args:
        rows: Events to append. The frame passed in is not modified.
    """
    _ensure_storage()

    # Work on a copy so the default columns are not added to the caller's
    # frame (and so that a sliced frame does not trigger chained-assignment
    # warnings).
    rows = rows.copy()

    # Take one timestamp so that ``ts`` and ``date`` always agree.
    now = datetime.utcnow()
    defaults = {
        "features_json": "{}",
        "ts": now.isoformat(),
        "date": now.date().isoformat(),
        "impressions": 0,
        "clicks": 0,
        "conversions": 0,
        "is_control": 0,
        "cost": 0.0,
    }
    for column in SCHEMA:
        if column not in rows.columns:
            # Schema columns without an explicit default fall back to None.
            rows[column] = defaults.get(column)

    # Enforce the schema's column order; the file already has a header row.
    rows[SCHEMA].to_csv(LOG_PATH, mode="a", header=False, index=False)
def aggregate(levels=("medium", "creative")) -> pd.DataFrame:
    """Sum impressions, clicks, conversions and cost per group.

    Args:
        levels: Column names to group by, in addition to ``is_control``.

    Returns:
        One row per (levels..., ``is_control``) combination with the summed
        metrics, or an empty frame with those columns when no events are
        logged.
    """
    _ensure_storage()
    events = read_events()

    group_keys = [*levels, "is_control"]
    metrics = ["impressions", "clicks", "conversions", "cost"]
    if events.empty:
        return pd.DataFrame(columns=group_keys + metrics)

    # Named aggregation: each metric column is summed under its own name.
    sum_spec = {metric: (metric, "sum") for metric in metrics}
    grouped = events.groupby(group_keys, dropna=False)
    totals = grouped.agg(**sum_spec)
    return totals.reset_index()