144 lines
5.4 KiB
Python
144 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
"""
|
|
Utility for running migration files from the migrations folder and recording the state of applied steps.
|
|
"""
|
|
|
|
import argparse
|
|
import importlib.util
|
|
import json
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Callable, Dict, List, Sequence
|
|
|
|
# Folder that is scanned for migration modules.
MIGRATIONS_DIR = Path("migrations")

# Default dataset folder, used when --dataset-dir is not given.
DEFAULT_DATASET_DIR = Path("dataset")

# Default state file, used when --state-file is not given.
DEFAULT_STATE_FILE = Path("migrations_state.json")
|
|
|
|
|
|
@dataclass
class MigrationContext:
    """Settings passed to a migration's entry point.

    Attributes:
        dataset_dir: Directory that holds the dataset files.
        force: Value of the ``--force`` command-line flag for this run.
    """

    dataset_dir: Path
    force: bool = False

    @property
    def csv_path(self) -> Path:
        """Return the path of ``ds.csv`` inside ``dataset_dir``."""
        return self.dataset_dir.joinpath("ds.csv")

    @property
    def sqlite_path(self) -> Path:
        """Return the path of ``ds.sqlite`` inside ``dataset_dir``."""
        return self.dataset_dir.joinpath("ds.sqlite")
|
|
|
|
|
|
@dataclass
class Migration:
    """A single migration step loaded from a file.

    Attributes:
        migration_id: Identifier under which the step is tracked.
        description: Free-text summary of the step (may be empty).
        path: File the step was loaded from.
        apply: Callable that performs the step; it receives the
            ``MigrationContext`` of the current run.
    """

    migration_id: str
    description: str
    path: Path
    apply: Callable[[MigrationContext], None]
|
|
|
|
|
|
def load_state(path: Path) -> Dict:
    """Read the migration state stored at ``path``.

    Args:
        path: Location of the JSON state file.

    Returns:
        The parsed JSON content of the file, or ``{"applied": []}`` when the
        file does not exist.
    """
    # Fetch the list of already applied migrations so that they are not run
    # again; a missing file means nothing has been applied yet.
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return {"applied": []}
|
|
|
|
|
|
def save_state(path: Path, state: Dict) -> None:
    """Write ``state`` to ``path`` as JSON without risking a half-written file.

    The state is serialized before anything on disk is touched, written to a
    temporary sibling file, and then moved over ``path`` in a single rename.
    A serialization error or an interrupted write therefore leaves the
    previous state file intact instead of truncating it.

    Args:
        path: Destination of the JSON state file; missing parent directories
            are created.
        state: JSON-serializable state to persist.

    Raises:
        TypeError: If ``state`` contains values that ``json`` cannot encode.
        OSError: If the file cannot be written or moved into place.
    """
    # Serialize first: a failure here must not affect the existing file.
    payload = json.dumps(state, ensure_ascii=True, indent=2)

    path.parent.mkdir(parents=True, exist_ok=True)

    # The temporary file lives next to the target so that the final rename
    # stays on one filesystem.
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            f.write(payload)
        tmp_path.replace(path)
    except OSError:
        # Do not leave a stray temporary file behind on failure.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
|
|
|
|
|
|
def discover_migrations(root: Path) -> List[Migration]:
    """Import the migration modules found directly inside ``root``.

    ``*.py`` files are visited in sorted path order; files whose name starts
    with an underscore are ignored. Each module must provide a callable
    ``run`` or, failing that, ``apply``. The optional module attributes
    ``MIGRATION_ID`` and ``DESCRIPTION`` override the defaults (the file stem
    and an empty string).

    Args:
        root: Directory to scan for migration modules.

    Returns:
        The discovered migrations, in sorted path order.

    Raises:
        RuntimeError: If a module cannot be loaded, has no callable entry
            point, or shares its migration id with another module.
    """
    # Load every *.py in the migrations directory and look for a run/apply
    # function in each of them.
    found: List[Migration] = []
    for file_path in sorted(root.glob("*.py")):
        # "__init__.py" also starts with an underscore, so one test covers it.
        if file_path.name.startswith("_"):
            continue

        spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
        loader = None if spec is None else spec.loader
        if loader is None:
            raise RuntimeError(f"Cannot load migration module {file_path}")

        module = importlib.util.module_from_spec(spec)
        loader.exec_module(module)

        entry_point = getattr(module, "run", None) or getattr(module, "apply", None)
        if not callable(entry_point):
            raise RuntimeError(f"Migration {file_path} must expose run(context) function")

        found.append(
            Migration(
                migration_id=getattr(module, "MIGRATION_ID", file_path.stem),
                description=getattr(module, "DESCRIPTION", ""),
                path=file_path,
                apply=entry_point,
            )
        )

    # Ids are checked only after every module has been loaded.
    known_ids = set()
    for item in found:
        if item.migration_id in known_ids:
            raise RuntimeError(f"Duplicate migration id detected: {item.migration_id}")
        known_ids.add(item.migration_id)

    return found
|
|
|
|
|
|
def record_applied(state: Dict, migration: Migration) -> None:
    """Mark ``migration`` as applied inside ``state``.

    Any existing entry with the same id is dropped, and a fresh entry with
    the id, the file name and the current UTC time is appended at the end.
    ``state`` is modified in place.

    Args:
        state: State dictionary whose ``"applied"`` list is rewritten.
        migration: The migration that has just been run.
    """
    # Update the state, recording the migration's id and file name.
    kept = []
    for old_entry in state.get("applied", []):
        if old_entry.get("id") == migration.migration_id:
            continue
        kept.append(old_entry)

    kept.append(
        {
            "id": migration.migration_id,
            "filename": migration.path.name,
            "applied_at": datetime.now(timezone.utc).isoformat(),
        }
    )
    state["applied"] = kept
|
|
|
|
|
|
def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str:
    """Render one status line per migration.

    Args:
        migrations: Migrations to list, in display order.
        applied_ids: Ids of the migrations that count as applied.

    Returns:
        Newline-joined lines of the form ``[status] id - description``, where
        status is ``applied`` or ``pending``; an empty string when there are
        no migrations.
    """
    done = set(applied_ids)

    def render(item) -> str:
        status = "pending"
        if item.migration_id in done:
            status = "applied"
        return f"[{status:7}] {item.migration_id} - {item.description}"

    return "\n".join(map(render, migrations))
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the migration runner.

    Returns:
        Namespace with ``dataset_dir`` and ``state_file`` (both ``Path``) and
        the boolean flags ``force`` and ``list``.
    """
    cli = argparse.ArgumentParser(description="Run dataset migrations")
    cli.add_argument(
        "--dataset-dir",
        type=Path,
        default=DEFAULT_DATASET_DIR,
        help="Path to dataset directory (default: dataset)",
    )
    cli.add_argument(
        "--state-file",
        type=Path,
        default=DEFAULT_STATE_FILE,
        help="Path to migrations state file (default: migrations_state.json)",
    )
    cli.add_argument(
        "--force",
        action="store_true",
        help="Re-run migrations even if marked as applied",
    )
    cli.add_argument(
        "--list",
        action="store_true",
        help="List migrations and exit",
    )
    return cli.parse_args()
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: list or apply the discovered migrations.

    Resolves the dataset directory, the migrations directory and the state
    file, loads the migrations and the saved state, and then either prints
    the migration list (``--list``) or runs every migration that is not yet
    recorded as applied. With ``--force`` nothing counts as applied, so every
    migration is listed as pending and is run again.

    Raises:
        SystemExit: If the dataset path does not exist or is not a directory.
    """
    args = parse_args()
    dataset_dir = args.dataset_dir.resolve()
    migrations_dir = MIGRATIONS_DIR.resolve()
    state_file = args.state_file.resolve()

    # is_dir() instead of exists(): a regular file at this path must be
    # rejected as well, otherwise migrations fail later on paths built
    # underneath it.
    if not dataset_dir.is_dir():
        raise SystemExit(f"Dataset directory not found: {dataset_dir}")

    ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force)
    migrations = discover_migrations(migrations_dir)
    state = load_state(state_file)

    # Under --force the list stays empty, which makes every migration pending.
    applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])]

    if args.list:
        print(format_migration_list(migrations, applied_ids))
        return

    for migration in migrations:
        if migration.migration_id in applied_ids:
            print(f"[skip] {migration.migration_id} already applied")
            continue

        print(f"[run ] {migration.migration_id} - {migration.description}")
        migration.apply(ctx)

        # Persist after every step so that a failure in a later migration
        # does not lose the record of the ones already completed.
        record_applied(state, migration)
        save_state(state_file, state)
        print(f"[done] {migration.migration_id}")
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|