dano2025/migrate.py

from __future__ import annotations
import argparse
import importlib.util
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Sequence

MIGRATIONS_DIR = Path("migrations")
DEFAULT_DATASET_DIR = Path("dataset")
DEFAULT_STATE_FILE = Path("migrations_state.json")


@dataclass
class MigrationContext:
    """Paths and flags shared by every migration."""

    dataset_dir: Path
    force: bool = False

    @property
    def csv_path(self) -> Path:
        return self.dataset_dir / "ds.csv"

    @property
    def sqlite_path(self) -> Path:
        return self.dataset_dir / "ds.sqlite"


@dataclass
class Migration:
    """A discovered migration module and its entry point."""

    migration_id: str
    description: str
    path: Path
    apply: Callable[[MigrationContext], None]


def load_state(path: Path) -> Dict:
    """Read the migrations state file, returning an empty state if it does not exist."""
    if not path.exists():
        return {"applied": []}
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_state(path: Path, state: Dict) -> None:
    """Write the state file, creating parent directories as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=True, indent=2)


def discover_migrations(root: Path) -> List[Migration]:
    """Import every migration module under *root* and collect its metadata.

    Each module must expose a callable ``run(context)`` (or ``apply(context)``)
    and may optionally define ``MIGRATION_ID`` and ``DESCRIPTION``.
    """
    migrations: List[Migration] = []
    for module_path in sorted(root.glob("*.py")):
        if module_path.name.startswith("_") or module_path.name == "__init__.py":
            continue
        spec = importlib.util.spec_from_file_location(module_path.stem, module_path)
        if spec is None or spec.loader is None:
            raise RuntimeError(f"Cannot load migration module {module_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        apply_fn = getattr(module, "run", None) or getattr(module, "apply", None)
        if not callable(apply_fn):
            raise RuntimeError(f"Migration {module_path} must expose a run(context) or apply(context) function")
        migration_id = getattr(module, "MIGRATION_ID", module_path.stem)
        description = getattr(module, "DESCRIPTION", "")
        migrations.append(Migration(migration_id=migration_id, description=description, path=module_path, apply=apply_fn))

    seen = set()
    for m in migrations:
        if m.migration_id in seen:
            raise RuntimeError(f"Duplicate migration id detected: {m.migration_id}")
        seen.add(m.migration_id)
    return migrations
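
# A minimal sketch of the module contract discover_migrations() expects.
# The file name and transformation below are hypothetical; any migrations/*.py
# file that exposes run(context) would be picked up the same way.
#
#     # migrations/0001_example.py
#     MIGRATION_ID = "0001_example"
#     DESCRIPTION = "Hypothetical example: create a marker file in the dataset dir"
#
#     def run(context):
#         # `context` is the MigrationContext defined above, so the dataset
#         # paths are available as context.csv_path and context.sqlite_path.
#         (context.dataset_dir / "example.marker").touch()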


def record_applied(state: Dict, migration: Migration) -> None:
    """Drop any prior entry for this migration and append a fresh timestamped one."""
    applied = [entry for entry in state.get("applied", []) if entry.get("id") != migration.migration_id]
    applied.append(
        {
            "id": migration.migration_id,
            "filename": migration.path.name,
            "applied_at": datetime.now(timezone.utc).isoformat(),
        }
    )
    state["applied"] = applied


def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str:
    """Render one "[status ] id - description" line per migration."""
    applied_set = set(applied_ids)
    lines = []
    for m in migrations:
        status = "applied" if m.migration_id in applied_set else "pending"
        lines.append(f"[{status:7}] {m.migration_id} - {m.description}")
    return "\n".join(lines)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run dataset migrations")
    parser.add_argument("--dataset-dir", type=Path, default=DEFAULT_DATASET_DIR, help="Path to dataset directory (default: dataset)")
    parser.add_argument("--state-file", type=Path, default=DEFAULT_STATE_FILE, help="Path to migrations state file (default: migrations_state.json)")
    parser.add_argument("--force", action="store_true", help="Re-run migrations even if marked as applied")
    parser.add_argument("--list", action="store_true", help="List migrations and exit")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    dataset_dir = args.dataset_dir.resolve()
    migrations_dir = MIGRATIONS_DIR.resolve()
    state_file = args.state_file.resolve()
    if not dataset_dir.exists():
        raise SystemExit(f"Dataset directory not found: {dataset_dir}")
    ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force)
    migrations = discover_migrations(migrations_dir)
    state = load_state(state_file)
    applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])]
    if args.list:
        print(format_migration_list(migrations, applied_ids))
        return
    for migration in migrations:
        if not args.force and migration.migration_id in applied_ids:
            print(f"[skip] {migration.migration_id} already applied")
            continue
        print(f"[run ] {migration.migration_id} - {migration.description}")
        migration.apply(ctx)
        record_applied(state, migration)
        save_state(state_file, state)
        print(f"[done] {migration.migration_id}")


if __name__ == "__main__":
    main()
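
# Typical invocations (the paths below are illustrative; defaults come from the
# constants at the top of this file):
#
#     python migrate.py --list
#     python migrate.py
#     python migrate.py --dataset-dir data/2025 --force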