from __future__ import annotations """ Утилита для запуска файлов миграций из папки migrations и фиксации состояния применённых шагов. """ import argparse import importlib.util import json from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Callable, Dict, List, Sequence MIGRATIONS_DIR = Path("migrations") DEFAULT_DATASET_DIR = Path("dataset") DEFAULT_STATE_FILE = Path("migrations_state.json") @dataclass class MigrationContext: dataset_dir: Path force: bool = False @property def csv_path(self) -> Path: return self.dataset_dir / "ds.csv" @property def sqlite_path(self) -> Path: return self.dataset_dir / "ds.sqlite" @dataclass class Migration: migration_id: str description: str path: Path apply: Callable[[MigrationContext], None] def load_state(path: Path) -> Dict: # Достаём список уже применённых миграций, чтобы не выполнять их повторно if not path.exists(): return {"applied": []} with path.open("r", encoding="utf-8") as f: return json.load(f) def save_state(path: Path, state: Dict) -> None: # Создаём файл состояния на диске после успешной миграции path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: json.dump(state, f, ensure_ascii=True, indent=2) def discover_migrations(root: Path) -> List[Migration]: # Подгружаем все *.py в каталоге миграций и ищем в них функцию run/apply migrations: List[Migration] = [] for module_path in sorted(root.glob("*.py")): if module_path.name.startswith("_") or module_path.name == "__init__.py": continue spec = importlib.util.spec_from_file_location(module_path.stem, module_path) if spec is None or spec.loader is None: raise RuntimeError(f"Cannot load migration module {module_path}") module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) apply_fn = getattr(module, "run", None) or getattr(module, "apply", None) if not callable(apply_fn): raise RuntimeError(f"Migration {module_path} must expose run(context) function") migration_id = getattr(module, "MIGRATION_ID", module_path.stem) description = getattr(module, "DESCRIPTION", "") migrations.append(Migration(migration_id=migration_id, description=description, path=module_path, apply=apply_fn)) seen = set() for m in migrations: if m.migration_id in seen: raise RuntimeError(f"Duplicate migration id detected: {m.migration_id}") seen.add(m.migration_id) return migrations def record_applied(state: Dict, migration: Migration) -> None: # Обновляем состояние, фиксируя идентификатор и имя файла миграции applied = [entry for entry in state.get("applied", []) if entry.get("id") != migration.migration_id] applied.append( { "id": migration.migration_id, "filename": migration.path.name, "applied_at": datetime.now(timezone.utc).isoformat(), } ) state["applied"] = applied def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str: applied_set = set(applied_ids) lines = [] for m in migrations: status = "applied" if m.migration_id in applied_set else "pending" lines.append(f"[{status:7}] {m.migration_id} - {m.description}") return "\n".join(lines) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run dataset migrations") parser.add_argument("--dataset-dir", type=Path, default=DEFAULT_DATASET_DIR, help="Path to dataset directory (default: dataset)") parser.add_argument("--state-file", type=Path, default=DEFAULT_STATE_FILE, help="Path to migrations state file (default: migrations_state.json)") parser.add_argument("--force", action="store_true", help="Re-run migrations even if marked as applied") parser.add_argument("--list", action="store_true", help="List migrations and exit") return parser.parse_args() def main() -> None: args = parse_args() dataset_dir = args.dataset_dir.resolve() migrations_dir = MIGRATIONS_DIR.resolve() state_file = args.state_file.resolve() if not dataset_dir.exists(): raise SystemExit(f"Dataset directory not found: {dataset_dir}") ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force) migrations = discover_migrations(migrations_dir) state = load_state(state_file) applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])] if args.list: print(format_migration_list(migrations, applied_ids)) return for migration in migrations: if not args.force and migration.migration_id in applied_ids: print(f"[skip] {migration.migration_id} already applied") continue print(f"[run ] {migration.migration_id} - {migration.description}") migration.apply(ctx) record_applied(state, migration) save_state(state_file, state) print(f"[done] {migration.migration_id}") if __name__ == "__main__": main()