added migrations

dan
2025-12-12 20:34:40 +03:00
parent d1d016b2ed
commit dd8feb488e
5 changed files with 200 additions and 1 deletion

.gitignore vendored (+1)

@@ -1,3 +1,4 @@
 .idea
 .venv
 __pycache__
+*.sqlite

README.md

@@ -1,3 +1,9 @@
 # dano2025
 dano 2025/2026 solve by FinalTry.exe team
 
+## Dataset migrations
+- Run all migrations: `python migrate.py`
+- List migrations and their status: `python migrate.py --list`
+- Force re-run migrations that are already applied: `python migrate.py --force`
+- By default, migrations read `dataset/ds.csv` and create the SQLite database `dataset/ds.sqlite` (table `communications`).
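
For reference, `--list` prints one `[status] id - description` line per discovered migration (see `format_migration_list` in `migrate.py` below). With the migration from this commit already recorded in `migrations_state.json`, the expected output would be:

```
[applied] 0001_csv_to_sqlite - Convert dataset/ds.csv into dataset/ds.sqlite table communications
```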

migrate.py Normal file (+135)

@@ -0,0 +1,135 @@
from __future__ import annotations

import argparse
import importlib.util
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Sequence

MIGRATIONS_DIR = Path("migrations")
DEFAULT_DATASET_DIR = Path("dataset")
DEFAULT_STATE_FILE = Path("migrations_state.json")


@dataclass
class MigrationContext:
    dataset_dir: Path
    force: bool = False

    @property
    def csv_path(self) -> Path:
        return self.dataset_dir / "ds.csv"

    @property
    def sqlite_path(self) -> Path:
        return self.dataset_dir / "ds.sqlite"


@dataclass
class Migration:
    migration_id: str
    description: str
    path: Path
    apply: Callable[[MigrationContext], None]


def load_state(path: Path) -> Dict:
    if not path.exists():
        return {"applied": []}
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_state(path: Path, state: Dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=True, indent=2)


def discover_migrations(root: Path) -> List[Migration]:
    # Import each migrations/*.py module (skipping _private files and
    # __init__.py) and pick up its run()/apply() entry point.
    migrations: List[Migration] = []
    for module_path in sorted(root.glob("*.py")):
        if module_path.name.startswith("_") or module_path.name == "__init__.py":
            continue
        spec = importlib.util.spec_from_file_location(module_path.stem, module_path)
        if spec is None or spec.loader is None:
            raise RuntimeError(f"Cannot load migration module {module_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        apply_fn = getattr(module, "run", None) or getattr(module, "apply", None)
        if not callable(apply_fn):
            raise RuntimeError(f"Migration {module_path} must expose a run(context) function")
        migration_id = getattr(module, "MIGRATION_ID", module_path.stem)
        description = getattr(module, "DESCRIPTION", "")
        migrations.append(
            Migration(
                migration_id=migration_id,
                description=description,
                path=module_path,
                apply=apply_fn,
            )
        )
    seen = set()
    for m in migrations:
        if m.migration_id in seen:
            raise RuntimeError(f"Duplicate migration id detected: {m.migration_id}")
        seen.add(m.migration_id)
    return migrations


def record_applied(state: Dict, migration: Migration) -> None:
    # Replace any previous entry with the same id, then append a fresh record.
    applied = [entry for entry in state.get("applied", []) if entry.get("id") != migration.migration_id]
    applied.append(
        {
            "id": migration.migration_id,
            "filename": migration.path.name,
            "applied_at": datetime.now(timezone.utc).isoformat(),
        }
    )
    state["applied"] = applied


def format_migration_list(migrations: Sequence[Migration], applied_ids: Sequence[str]) -> str:
    applied_set = set(applied_ids)
    lines = []
    for m in migrations:
        status = "applied" if m.migration_id in applied_set else "pending"
        lines.append(f"[{status:7}] {m.migration_id} - {m.description}")
    return "\n".join(lines)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run dataset migrations")
    parser.add_argument("--dataset-dir", type=Path, default=DEFAULT_DATASET_DIR, help="Path to dataset directory (default: dataset)")
    parser.add_argument("--state-file", type=Path, default=DEFAULT_STATE_FILE, help="Path to migrations state file (default: migrations_state.json)")
    parser.add_argument("--force", action="store_true", help="Re-run migrations even if marked as applied")
    parser.add_argument("--list", action="store_true", help="List migrations and exit")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    dataset_dir = args.dataset_dir.resolve()
    migrations_dir = MIGRATIONS_DIR.resolve()
    state_file = args.state_file.resolve()
    if not dataset_dir.exists():
        raise SystemExit(f"Dataset directory not found: {dataset_dir}")
    ctx = MigrationContext(dataset_dir=dataset_dir, force=args.force)
    migrations = discover_migrations(migrations_dir)
    state = load_state(state_file)
    applied_ids = [] if args.force else [entry.get("id") for entry in state.get("applied", [])]
    if args.list:
        print(format_migration_list(migrations, applied_ids))
        return
    for migration in migrations:
        if not args.force and migration.migration_id in applied_ids:
            print(f"[skip] {migration.migration_id} already applied")
            continue
        print(f"[run ] {migration.migration_id} - {migration.description}")
        migration.apply(ctx)
        record_applied(state, migration)
        # Persist state after every successful migration so a later failure
        # does not lose progress.
        save_state(state_file, state)
        print(f"[done] {migration.migration_id}")


if __name__ == "__main__":
    main()
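
The loader contract is implicit in `discover_migrations`: a migration is any non-underscore `migrations/*.py` module exposing a callable `run(context)` (or `apply(context)`), with optional `MIGRATION_ID` and `DESCRIPTION` constants that default to the filename stem and an empty string. A minimal sketch of a conforming module (the `0002` id and the `VACUUM` body are illustrative only, not part of this commit):

```python
# migrations/0002_example.py -- hypothetical sketch of the shape the runner expects.
import sqlite3

MIGRATION_ID = "0002_example"  # optional; defaults to the filename stem
DESCRIPTION = "Illustrative placeholder migration"  # optional; defaults to ""


def run(context) -> None:
    # context is the runner's MigrationContext: dataset_dir and force,
    # plus the csv_path and sqlite_path properties.
    conn = sqlite3.connect(context.sqlite_path)
    try:
        conn.execute("VACUUM")  # placeholder work for the sketch
    finally:
        conn.close()
```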

migrations/0001_csv_to_sqlite.py Normal file (+48)

@@ -0,0 +1,48 @@
from __future__ import annotations

import sqlite3
from pathlib import Path

import pandas as pd

MIGRATION_ID = "0001_csv_to_sqlite"
DESCRIPTION = "Convert dataset/ds.csv into dataset/ds.sqlite table communications"
CHUNK_SIZE = 10000
TABLE_NAME = "communications"


def run(context) -> None:
    dataset_dir = Path(getattr(context, "dataset_dir", Path.cwd()))
    csv_path = getattr(context, "csv_path", dataset_dir / "ds.csv")
    sqlite_path = getattr(context, "sqlite_path", dataset_dir / "ds.sqlite")
    force = bool(getattr(context, "force", False))
    if not csv_path.exists():
        raise FileNotFoundError(f"CSV file not found: {csv_path}")
    if sqlite_path.exists():
        if force:
            sqlite_path.unlink()
        else:
            print(f"SQLite database already exists at {sqlite_path}, skipping migration")
            return
    sqlite_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(sqlite_path)
    try:
        first_chunk = True
        # Stream the CSV in chunks so the full dataset never has to fit in memory.
        for chunk in pd.read_csv(csv_path, chunksize=CHUNK_SIZE):
            # Normalize the business date to ISO "YYYY-MM-DD" strings for SQLite.
            chunk["business_dt"] = pd.to_datetime(chunk["business_dt"]).dt.strftime("%Y-%m-%d")
            if_exists = "replace" if first_chunk else "append"
            chunk.to_sql(TABLE_NAME, conn, if_exists=if_exists, index=False)
            first_chunk = False
        if first_chunk:
            raise RuntimeError("Source CSV is empty, no rows were written to SQLite")
        conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{TABLE_NAME}_id ON {TABLE_NAME}(id)")
        conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{TABLE_NAME}_business_dt ON {TABLE_NAME}(business_dt)")
        conn.commit()
    finally:
        conn.close()
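
A quick way to sanity-check the result after this migration runs (a sketch; only the `communications` table and its `business_dt` column are taken from this commit, nothing else is assumed):

```python
import sqlite3

conn = sqlite3.connect("dataset/ds.sqlite")
try:
    # Count rows and report the range of the normalized YYYY-MM-DD dates.
    count, lo, hi = conn.execute(
        "SELECT COUNT(*), MIN(business_dt), MAX(business_dt) FROM communications"
    ).fetchone()
    print(f"{count} rows, business_dt from {lo} to {hi}")
finally:
    conn.close()
```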

migrations_state.json Normal file (+9)

@@ -0,0 +1,9 @@
{
  "applied": [
    {
      "id": "0001_csv_to_sqlite",
      "filename": "0001_csv_to_sqlite.py",
      "applied_at": "2025-12-12T17:33:17.218732Z"
    }
  ]
}