# Source: dano2025/migrate.py
# Snapshot: 2025-12-16 01:51:05 +03:00 (144 lines, 5.4 KiB, Python)
from __future__ import annotations
"""
Утилита для запуска файлов миграций из папки migrations и фиксации состояния применённых шагов.
"""
import argparse
import importlib.util
import json
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Sequence
MIGRATIONS_DIR = Path("migrations")
DEFAULT_DATASET_DIR = Path("dataset")
DEFAULT_STATE_FILE = Path("migrations_state.json")
@dataclass
class MigrationContext:
    """Runtime context handed to every migration's entry-point function."""

    # Directory that holds the dataset files the migrations operate on.
    dataset_dir: Path
    # True when the runner was asked to re-apply already-recorded migrations.
    force: bool = False

    @property
    def csv_path(self) -> Path:
        """Path of the dataset CSV file inside ``dataset_dir``."""
        return self.dataset_dir / "ds.csv"

    @property
    def sqlite_path(self) -> Path:
        """Path of the dataset SQLite file inside ``dataset_dir``."""
        return self.dataset_dir / "ds.sqlite"
@dataclass
class Migration:
    """One discovered migration step loaded from a file in the migrations dir."""

    # Unique identifier (module-level MIGRATION_ID or the file stem).
    migration_id: str
    # Human-readable summary (module-level DESCRIPTION, may be empty).
    description: str
    # Filesystem location of the migration module.
    path: Path
    # Callable executed with a MigrationContext to perform the step.
    apply: Callable[[MigrationContext], None]
def load_state(path: Path) -> Dict:
    """Return the persisted migration state, or a fresh empty one.

    The state maps ``"applied"`` to a list of entries for migrations that
    have already been executed, so the runner does not repeat them.
    """
    if not path.exists():
        return {"applied": []}
    return json.loads(path.read_text(encoding="utf-8"))
def save_state(path: Path, state: Dict) -> None:
    """Persist *state* as indented JSON, creating parent directories as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=True keeps the on-disk format byte-stable across locales.
    payload = json.dumps(state, ensure_ascii=True, indent=2)
    path.write_text(payload, encoding="utf-8")
def discover_migrations(root: Path) -> List[Migration]:
    """Load every migration module under *root* and return them in name order.

    Each ``*.py`` file (excluding ``_``-prefixed helpers and ``__init__.py``)
    must expose a callable ``run(context)`` or ``apply(context)``.  Optional
    module attributes ``MIGRATION_ID`` and ``DESCRIPTION`` override the
    defaults (file stem / empty string).

    Raises:
        RuntimeError: if a module cannot be loaded, exposes no callable
            entry point, or two modules declare the same migration id.
    """
    migrations: List[Migration] = []
    seen_ids: set = set()
    for module_path in sorted(root.glob("*.py")):
        # Skip private helpers and package markers.
        if module_path.name.startswith("_") or module_path.name == "__init__.py":
            continue
        spec = importlib.util.spec_from_file_location(module_path.stem, module_path)
        if spec is None or spec.loader is None:
            raise RuntimeError(f"Cannot load migration module {module_path}")
        module = importlib.util.module_from_spec(spec)
        # Register the module before executing it, per the importlib recipe:
        # dataclasses, pickle and introspection inside the migration module
        # resolve it through sys.modules. Never clobber an existing module.
        registered = spec.name not in sys.modules
        if registered:
            sys.modules[spec.name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            if registered:
                sys.modules.pop(spec.name, None)
            raise
        apply_fn = getattr(module, "run", None) or getattr(module, "apply", None)
        if not callable(apply_fn):
            raise RuntimeError(f"Migration {module_path} must expose run(context) function")
        migration_id = getattr(module, "MIGRATION_ID", module_path.stem)
        # Detect duplicates while loading instead of in a second pass.
        if migration_id in seen_ids:
            raise RuntimeError(f"Duplicate migration id detected: {migration_id}")
        seen_ids.add(migration_id)
        migrations.append(
            Migration(
                migration_id=migration_id,
                description=getattr(module, "DESCRIPTION", ""),
                path=module_path,
                apply=apply_fn,
            )
        )
    return migrations
def record_applied(state: Dict, migration: Migration) -> None:
    """Mark *migration* as applied in *state*, replacing any stale entry.

    The entry stores the migration id, the source filename and a UTC
    timestamp of when it was applied; any previous entry with the same id
    is dropped so the list never contains duplicates.
    """
    fresh_entry = {
        "id": migration.migration_id,
        "filename": migration.path.name,
        "applied_at": datetime.now(timezone.utc).isoformat(),
    }
    kept = [e for e in state.get("applied", []) if e.get("id") != migration.migration_id]
    kept.append(fresh_entry)
    state["applied"] = kept
def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str:
    """Render one status line per migration, marked ``applied`` or ``pending``."""
    done = set(applied_ids)

    def render(m: Migration) -> str:
        # Pad the status to 7 chars so ids line up in the listing.
        label = "applied" if m.migration_id in done else "pending"
        return f"[{label:7}] {m.migration_id} - {m.description}"

    return "\n".join(render(m) for m in migrations)
def parse_args() -> argparse.Namespace:
    """Build and evaluate the command-line interface of the migration runner."""
    parser = argparse.ArgumentParser(description="Run dataset migrations")
    parser.add_argument(
        "--dataset-dir",
        type=Path,
        default=DEFAULT_DATASET_DIR,
        help="Path to dataset directory (default: dataset)",
    )
    parser.add_argument(
        "--state-file",
        type=Path,
        default=DEFAULT_STATE_FILE,
        help="Path to migrations state file (default: migrations_state.json)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-run migrations even if marked as applied",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List migrations and exit",
    )
    return parser.parse_args()
def main() -> None:
    """Entry point: discover migrations and apply the pending ones.

    Migrations already recorded in the state file are skipped unless
    ``--force`` is given.  The state file is re-saved after every successful
    step, so an interrupted run loses at most the in-flight migration.

    Raises:
        SystemExit: when the dataset directory does not exist.
    """
    args = parse_args()
    dataset_dir = args.dataset_dir.resolve()
    migrations_dir = MIGRATIONS_DIR.resolve()
    state_file = args.state_file.resolve()
    if not dataset_dir.exists():
        raise SystemExit(f"Dataset directory not found: {dataset_dir}")

    ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force)
    migrations = discover_migrations(migrations_dir)
    state = load_state(state_file)
    # Under --force the applied list is treated as empty so everything re-runs.
    applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])]

    if args.list:
        print(format_migration_list(migrations, applied_ids))
        return

    for migration in migrations:
        if not args.force and migration.migration_id in applied_ids:
            print(f"[skip] {migration.migration_id} already applied")
            continue
        print(f"[run ] {migration.migration_id} - {migration.description}")
        migration.apply(ctx)
        # Persist state immediately so a crash does not repeat this step.
        record_applied(state, migration)
        save_state(state_file, state)
        print(f"[done] {migration.migration_id}")


if __name__ == "__main__":
    main()