From dd8feb488e2e64de76c9bf5795525f4a5cba5130 Mon Sep 17 00:00:00 2001
From: dan
Date: Fri, 12 Dec 2025 20:34:40 +0300
Subject: [PATCH] Add dataset migrations

---
 .gitignore                       |   1 +
 README.md                        |  31 +++++-
 migrate.py                       | 135 +++++++++++++++++++++++++++++++
 migrations/0001_csv_to_sqlite.py |  49 ++++++++++++
 migrations_state.json            |   9 +++
 5 files changed, 224 insertions(+), 1 deletion(-)
 create mode 100644 migrate.py
 create mode 100644 migrations/0001_csv_to_sqlite.py
 create mode 100644 migrations_state.json

diff --git a/.gitignore b/.gitignore
index 82cf965..9648f0b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 .idea
 .venv
 __pycache__
+*.sqlite
\ No newline at end of file
diff --git a/README.md b/README.md
index 39d0efb..d248363 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,32 @@
 # dano2025
 
-dano 2025/2026 solve by FinalTry.exe team
\ No newline at end of file
+dano 2025/2026 solve by FinalTry.exe team
+
+## Dataset migrations
+- Run all migrations: `python migrate.py`
+- List migrations and their status: `python migrate.py --list`
+- Force re-run migrations that have already been applied: `python migrate.py --force`
+- By default, migrations read `dataset/ds.csv` and create the SQLite database `dataset/ds.sqlite` (table `communications`); see the example below.
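+
+### Example: querying the migrated data
+
+A minimal sketch of reading the migrated table back with the standard `sqlite3`
+module. The path and table name below are the migration defaults, and
+`business_dt` is a column carried over from the source CSV:
+
+```python
+import sqlite3
+
+# Open the database produced by the 0001_csv_to_sqlite migration.
+conn = sqlite3.connect("dataset/ds.sqlite")
+try:
+    # Count rows per day via the indexed business_dt column.
+    rows = conn.execute(
+        "SELECT business_dt, COUNT(*) AS n FROM communications "
+        "GROUP BY business_dt ORDER BY business_dt LIMIT 5"
+    ).fetchall()
+    for dt, n in rows:
+        print(dt, n)
+finally:
+    conn.close()
+```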
diff --git a/migrate.py b/migrate.py
new file mode 100644
index 0000000..4e4ee6f
--- /dev/null
+++ b/migrate.py
@@ -0,0 +1,135 @@
+from __future__ import annotations
+
+import argparse
+import importlib.util
+import json
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Callable, Dict, List, Sequence
+
+MIGRATIONS_DIR = Path("migrations")
+DEFAULT_DATASET_DIR = Path("dataset")
+DEFAULT_STATE_FILE = Path("migrations_state.json")
+
+
+@dataclass
+class MigrationContext:
+    dataset_dir: Path
+    force: bool = False
+
+    @property
+    def csv_path(self) -> Path:
+        return self.dataset_dir / "ds.csv"
+
+    @property
+    def sqlite_path(self) -> Path:
+        return self.dataset_dir / "ds.sqlite"
+
+
+@dataclass
+class Migration:
+    migration_id: str
+    description: str
+    path: Path
+    apply: Callable[[MigrationContext], None]
+
+
+def load_state(path: Path) -> Dict:
+    if not path.exists():
+        return {"applied": []}
+    with path.open("r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def save_state(path: Path, state: Dict) -> None:
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", encoding="utf-8") as f:
+        json.dump(state, f, ensure_ascii=True, indent=2)
+
+
+def discover_migrations(root: Path) -> List[Migration]:
+    migrations: List[Migration] = []
+    for module_path in sorted(root.glob("*.py")):
+        if module_path.name.startswith("_") or module_path.name == "__init__.py":
+            continue
+        spec = importlib.util.spec_from_file_location(module_path.stem, module_path)
+        if spec is None or spec.loader is None:
+            raise RuntimeError(f"Cannot load migration module {module_path}")
+        module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(module)
+        apply_fn = getattr(module, "run", None) or getattr(module, "apply", None)
+        if not callable(apply_fn):
+            raise RuntimeError(f"Migration {module_path} must expose a run(context) or apply(context) function")
+        migration_id = getattr(module, "MIGRATION_ID", module_path.stem)
+        description = getattr(module, "DESCRIPTION", "")
+        migrations.append(Migration(migration_id=migration_id, description=description, path=module_path, apply=apply_fn))
+    seen = set()
+    for m in migrations:
+        if m.migration_id in seen:
+            raise RuntimeError(f"Duplicate migration id detected: {m.migration_id}")
+        seen.add(m.migration_id)
+    return migrations
+
+
+def record_applied(state: Dict, migration: Migration) -> None:
+    applied = [entry for entry in state.get("applied", []) if entry.get("id") != migration.migration_id]
+    applied.append(
+        {
+            "id": migration.migration_id,
+            "filename": migration.path.name,
+            "applied_at": datetime.now(timezone.utc).isoformat(),
+        }
+    )
+    state["applied"] = applied
+
+
+def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str:
+    applied_set = set(applied_ids)
+    lines = []
+    for m in migrations:
+        status = "applied" if m.migration_id in applied_set else "pending"
+        lines.append(f"[{status:7}] {m.migration_id} - {m.description}")
+    return "\n".join(lines)
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="Run dataset migrations")
+    parser.add_argument("--dataset-dir", type=Path, default=DEFAULT_DATASET_DIR, help="Path to dataset directory (default: dataset)")
+    parser.add_argument("--state-file", type=Path, default=DEFAULT_STATE_FILE, help="Path to migrations state file (default: migrations_state.json)")
+    parser.add_argument("--force", action="store_true", help="Re-run migrations even if marked as applied")
+    parser.add_argument("--list", action="store_true", help="List migrations and exit")
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    dataset_dir = args.dataset_dir.resolve()
+    migrations_dir = MIGRATIONS_DIR.resolve()
+    state_file = args.state_file.resolve()
+
+    if not dataset_dir.exists():
+        raise SystemExit(f"Dataset directory not found: {dataset_dir}")
+
+    ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force)
+    migrations = discover_migrations(migrations_dir)
+    state = load_state(state_file)
+    applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])]
+
+    if args.list:
+        print(format_migration_list(migrations, applied_ids))
+        return
+
+    for migration in migrations:
+        if not args.force and migration.migration_id in applied_ids:
+            print(f"[skip] {migration.migration_id} already applied")
+            continue
+        print(f"[run ] {migration.migration_id} - {migration.description}")
+        migration.apply(ctx)
+        record_applied(state, migration)
+        save_state(state_file, state)
+        print(f"[done] {migration.migration_id}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/migrations/0001_csv_to_sqlite.py b/migrations/0001_csv_to_sqlite.py
new file mode 100644
index 0000000..254f8a1
--- /dev/null
+++ b/migrations/0001_csv_to_sqlite.py
@@ -0,0 +1,49 @@
+from __future__ import annotations
+
+import sqlite3
+from pathlib import Path
+
+import pandas as pd
+
+MIGRATION_ID = "0001_csv_to_sqlite"
+DESCRIPTION = "Convert dataset/ds.csv into dataset/ds.sqlite table communications"
+
+CHUNK_SIZE = 10000
+TABLE_NAME = "communications"
+
+
+def run(context) -> None:
+    dataset_dir = Path(getattr(context, "dataset_dir", Path.cwd()))
+    csv_path = getattr(context, "csv_path", dataset_dir / "ds.csv")
+    sqlite_path = getattr(context, "sqlite_path", dataset_dir / "ds.sqlite")
+    force = bool(getattr(context, "force", False))
+
+    if not csv_path.exists():
+        raise FileNotFoundError(f"CSV file not found: {csv_path}")
+
+    if sqlite_path.exists():
+        if force:
+            sqlite_path.unlink()
+        else:
+            print(f"SQLite database already exists at {sqlite_path}, skipping migration")
+            return
+
+    sqlite_path.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(sqlite_path)
+    try:
+        first_chunk = True
+        # Stream the CSV in chunks so the whole file never has to fit in memory.
+        for chunk in pd.read_csv(csv_path, chunksize=CHUNK_SIZE):
+            chunk["business_dt"] = pd.to_datetime(chunk["business_dt"]).dt.strftime("%Y-%m-%d")
+            if_exists = "replace" if first_chunk else "append"
+            chunk.to_sql(TABLE_NAME, conn, if_exists=if_exists, index=False)
+            first_chunk = False
+
+        if first_chunk:
+            raise RuntimeError("Source CSV is empty; no rows were written to SQLite")
+
+        conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{TABLE_NAME}_id ON {TABLE_NAME}(id)")
+        conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{TABLE_NAME}_business_dt ON {TABLE_NAME}(business_dt)")
+        conn.commit()
+    finally:
+        conn.close()
diff --git a/migrations_state.json b/migrations_state.json
new file mode 100644
index 0000000..2eaa93e
--- /dev/null
+++ b/migrations_state.json
@@ -0,0 +1,9 @@
+{
+  "applied": [
+    {
+      "id": "0001_csv_to_sqlite",
+      "filename": "0001_csv_to_sqlite.py",
+      "applied_at": "2025-12-12T17:33:17.218732+00:00"
+    }
+  ]
+}
\ No newline at end of file
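
Note on extending the pipeline: discover_migrations() picks up every
non-underscore *.py file in migrations/ and expects it to expose a callable
run(context) (or apply(context)), plus optional MIGRATION_ID and DESCRIPTION
constants. As a sketch only, a hypothetical follow-up migration conforming to
that contract could look like the module below; the 0002 name, the dedup
action, and the assumption that id uniquely identifies a row are all
illustrative, not part of this patch.

    from __future__ import annotations

    import sqlite3

    MIGRATION_ID = "0002_example_cleanup"  # hypothetical id, not in this patch
    DESCRIPTION = "Illustrative sketch: drop duplicate rows by id"


    def run(context) -> None:
        # context is the MigrationContext defined in migrate.py; sqlite_path
        # defaults to dataset/ds.sqlite.
        conn = sqlite3.connect(context.sqlite_path)
        try:
            # Keep the first physical row for each id and delete the rest
            # (assumes id is meant to be unique; purely illustrative).
            conn.execute(
                "DELETE FROM communications WHERE rowid NOT IN "
                "(SELECT MIN(rowid) FROM communications GROUP BY id)"
            )
            conn.commit()
        finally:
            conn.close()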