"""Discover and run dataset migrations from the migrations/ directory."""

from __future__ import annotations

import argparse
import importlib.util
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Sequence

MIGRATIONS_DIR = Path("migrations")
DEFAULT_DATASET_DIR = Path("dataset")
DEFAULT_STATE_FILE = Path("migrations_state.json")


@dataclass
class MigrationContext:
    """Paths and flags handed to each migration's run(context) function."""

    dataset_dir: Path
    force: bool = False

    @property
    def csv_path(self) -> Path:
        return self.dataset_dir / "ds.csv"

    @property
    def sqlite_path(self) -> Path:
        return self.dataset_dir / "ds.sqlite"


@dataclass
class Migration:
    """A discovered migration module and its callable entry point."""

    migration_id: str
    description: str
    path: Path
    apply: Callable[[MigrationContext], None]


def load_state(path: Path) -> Dict:
    """Load the applied-migrations state file, defaulting to an empty list."""
    if not path.exists():
        return {"applied": []}
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_state(path: Path, state: Dict) -> None:
    """Write the state file, creating parent directories as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=True, indent=2)


def discover_migrations(root: Path) -> List[Migration]:
    """Import every *.py file under root (sorted by name) as a migration module."""
    migrations: List[Migration] = []
    for module_path in sorted(root.glob("*.py")):
        # Skip private modules and package markers.
        if module_path.name.startswith("_") or module_path.name == "__init__.py":
            continue
        spec = importlib.util.spec_from_file_location(module_path.stem, module_path)
        if spec is None or spec.loader is None:
            raise RuntimeError(f"Cannot load migration module {module_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # A migration may expose either run(context) or apply(context).
        apply_fn = getattr(module, "run", None) or getattr(module, "apply", None)
        if not callable(apply_fn):
            raise RuntimeError(
                f"Migration {module_path} must expose a callable run(context) or apply(context)"
            )
        migration_id = getattr(module, "MIGRATION_ID", module_path.stem)
        description = getattr(module, "DESCRIPTION", "")
        migrations.append(
            Migration(
                migration_id=migration_id,
                description=description,
                path=module_path,
                apply=apply_fn,
            )
        )

    seen = set()
    for m in migrations:
        if m.migration_id in seen:
            raise RuntimeError(f"Duplicate migration id detected: {m.migration_id}")
        seen.add(m.migration_id)
    return migrations


def record_applied(state: Dict, migration: Migration) -> None:
    """Mark a migration as applied, replacing any earlier entry with the same id."""
    applied = [
        entry for entry in state.get("applied", []) if entry.get("id") != migration.migration_id
    ]
    applied.append(
        {
            "id": migration.migration_id,
            "filename": migration.path.name,
            "applied_at": datetime.now(timezone.utc).isoformat(),
        }
    )
    state["applied"] = applied


def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str:
    applied_set = set(applied_ids)
    lines = []
    for m in migrations:
        status = "applied" if m.migration_id in applied_set else "pending"
        lines.append(f"[{status:7}] {m.migration_id} - {m.description}")
    return "\n".join(lines)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run dataset migrations")
    parser.add_argument(
        "--dataset-dir",
        type=Path,
        default=DEFAULT_DATASET_DIR,
        help="Path to dataset directory (default: dataset)",
    )
    parser.add_argument(
        "--state-file",
        type=Path,
        default=DEFAULT_STATE_FILE,
        help="Path to migrations state file (default: migrations_state.json)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-run migrations even if marked as applied",
    )
    parser.add_argument("--list", action="store_true", help="List migrations and exit")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    dataset_dir = args.dataset_dir.resolve()
    migrations_dir = MIGRATIONS_DIR.resolve()
    state_file = args.state_file.resolve()

    if not dataset_dir.exists():
        raise SystemExit(f"Dataset directory not found: {dataset_dir}")

    ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force)
    migrations = discover_migrations(migrations_dir)
    state = load_state(state_file)
    # With --force, treat everything as pending so every migration re-runs.
    applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])]

    if args.list:
        print(format_migration_list(migrations, applied_ids))
        return

    for migration in migrations:
        if not args.force and migration.migration_id in applied_ids:
            print(f"[skip] {migration.migration_id} already applied")
            continue
        print(f"[run ] {migration.migration_id} - {migration.description}")
        migration.apply(ctx)
        # Persist state after each migration so a failure leaves earlier work recorded.
        record_applied(state, migration)
        save_state(state_file, state)
        print(f"[done] {migration.migration_id}")


if __name__ == "__main__":
    main()