206 lines
6.0 KiB
Python
206 lines
6.0 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Task:
|
|
name: str
|
|
project: str
|
|
status: str # active | waiting | completed | unknown
|
|
fraction_done: float
|
|
elapsed_seconds: float
|
|
remaining_seconds: Optional[float]
|
|
deadline: Optional[datetime]
|
|
resources: Optional[str] = None
|
|
slot: Optional[str] = None
|
|
|
|
@property
|
|
def progress_percent(self) -> int:
|
|
return max(0, min(int(round(self.fraction_done * 100)), 100))
|
|
|
|
|
|
@dataclass
|
|
class BoincSnapshot:
|
|
tasks: List[Task]
|
|
fetched_at: datetime
|
|
raw: str = ""
|
|
|
|
@property
|
|
def active(self) -> List[Task]:
|
|
return [t for t in self.tasks if t.status == "active"]
|
|
|
|
@property
|
|
def queued(self) -> List[Task]:
|
|
return [t for t in self.tasks if t.status == "waiting"]
|
|
|
|
@property
|
|
def completed(self) -> List[Task]:
|
|
return [t for t in self.tasks if t.status == "completed"]
|
|
|
|
@property
|
|
def average_progress(self) -> float:
|
|
if not self.tasks:
|
|
return 0.0
|
|
return sum(t.progress_percent for t in self.tasks) / len(self.tasks)
|
|
|
|
@property
|
|
def total_remaining_seconds(self) -> float:
|
|
remaining = [t.remaining_seconds for t in self.tasks if t.remaining_seconds]
|
|
return sum(remaining) if remaining else 0.0
|
|
|
|
|
|
class BoincClient:
|
|
def __init__(
|
|
self,
|
|
host: str = "localhost",
|
|
port: int = 31416,
|
|
password: Optional[str] = None,
|
|
boinccmd_path: str = "boinccmd",
|
|
sample_output: Optional[str] = None,
|
|
) -> None:
|
|
self.host = host
|
|
self.port = port
|
|
self.password = password
|
|
self.boinccmd_path = boinccmd_path
|
|
self.sample_output = Path(sample_output) if sample_output else None
|
|
|
|
async def fetch_snapshot(self) -> BoincSnapshot:
|
|
raw = await self._fetch_raw()
|
|
tasks = parse_boinccmd_tasks(raw)
|
|
return BoincSnapshot(tasks=tasks, fetched_at=datetime.now(timezone.utc), raw=raw)
|
|
|
|
async def _fetch_raw(self) -> str:
|
|
if self.sample_output:
|
|
return self.sample_output.read_text()
|
|
|
|
cmd = [self.boinccmd_path, "--host", f"{self.host}:{self.port}", "--get_tasks"]
|
|
if self.password:
|
|
cmd.extend(["--passwd", self.password])
|
|
|
|
try:
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
|
)
|
|
except FileNotFoundError as exc:
|
|
raise RuntimeError(f"boinccmd not found at '{self.boinccmd_path}'") from exc
|
|
|
|
stdout, stderr = await process.communicate()
|
|
if process.returncode != 0:
|
|
err_text = stderr.decode() if stderr else "unknown error"
|
|
raise RuntimeError(f"boinccmd failed: {err_text.strip()}")
|
|
|
|
return stdout.decode()
|
|
|
|
|
|
def parse_boinccmd_tasks(output: str) -> List[Task]:
|
|
tasks: List[Task] = []
|
|
current: Dict[str, str] = {}
|
|
|
|
for line in output.splitlines():
|
|
stripped = line.strip()
|
|
if not stripped:
|
|
continue
|
|
if re.match(r"^\d+\)", stripped):
|
|
if current:
|
|
task = _task_from_dict(current)
|
|
if task:
|
|
tasks.append(task)
|
|
current = {}
|
|
continue
|
|
if stripped.startswith("========"):
|
|
continue
|
|
if ":" not in stripped:
|
|
continue
|
|
|
|
key, value = stripped.split(":", 1)
|
|
current[key.strip().lower()] = value.strip()
|
|
|
|
if current:
|
|
task = _task_from_dict(current)
|
|
if task:
|
|
tasks.append(task)
|
|
|
|
return tasks
|
|
|
|
|
|
def _task_from_dict(data: Dict[str, str]) -> Optional[Task]:
|
|
name = data.get("name", "unknown")
|
|
project = data.get("project url", "unknown project")
|
|
fraction_done = _parse_float(data.get("fraction done", "0"))
|
|
elapsed = _parse_float(data.get("elapsed time", "0"))
|
|
remaining = _parse_float(data.get("estimated cpu time remaining", "0"))
|
|
deadline = _parse_deadline(data.get("report deadline"))
|
|
resources = data.get("resources")
|
|
slot = data.get("slot")
|
|
status = _deduce_status(data)
|
|
|
|
return Task(
|
|
name=name,
|
|
project=project,
|
|
status=status,
|
|
fraction_done=fraction_done,
|
|
elapsed_seconds=elapsed,
|
|
remaining_seconds=remaining,
|
|
deadline=deadline,
|
|
resources=resources,
|
|
slot=slot,
|
|
)
|
|
|
|
|
|
def _deduce_status(data: Dict[str, str]) -> str:
|
|
ready = data.get("ready to report", "").lower() == "yes"
|
|
if ready:
|
|
return "completed"
|
|
|
|
active_state = data.get("active_task_state") or data.get("active task state")
|
|
if active_state:
|
|
active_state = active_state.upper()
|
|
if active_state in {"EXECUTING", "RUNNING", "READY"}:
|
|
return "active"
|
|
if active_state in {"SUSPENDED", "UNINITIALIZED", "SUSPENDED_VIA_GUI"}:
|
|
return "waiting"
|
|
|
|
scheduler_state = data.get("scheduler state") or data.get("state")
|
|
if scheduler_state and scheduler_state.strip().isdigit():
|
|
# 1 is typically waiting to run, 2 is executing
|
|
try:
|
|
sched_val = int(scheduler_state.strip())
|
|
if sched_val >= 2:
|
|
return "active"
|
|
if sched_val == 1:
|
|
return "waiting"
|
|
except ValueError:
|
|
pass
|
|
|
|
return "waiting"
|
|
|
|
|
|
def _parse_float(value: Optional[str]) -> float:
|
|
if not value:
|
|
return 0.0
|
|
try:
|
|
return float(value.split()[0])
|
|
except ValueError:
|
|
return 0.0
|
|
|
|
|
|
def _parse_deadline(value: Optional[str]) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
try:
|
|
# Example: Tue Jan 9 05:58:07 2024
|
|
return datetime.strptime(value, "%a %b %d %H:%M:%S %Y").replace(tzinfo=timezone.utc)
|
|
except Exception:
|
|
logger.debug("Could not parse deadline: %s", value)
|
|
return None
|
|
|