from __future__ import annotations import asyncio import logging import re from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional logger = logging.getLogger(__name__) @dataclass class Task: name: str project: str status: str # active | waiting | completed | unknown fraction_done: float elapsed_seconds: float remaining_seconds: Optional[float] deadline: Optional[datetime] resources: Optional[str] = None slot: Optional[str] = None @property def progress_percent(self) -> int: return max(0, min(int(round(self.fraction_done * 100)), 100)) @dataclass class BoincSnapshot: tasks: List[Task] fetched_at: datetime raw: str = "" @property def active(self) -> List[Task]: return [t for t in self.tasks if t.status == "active"] @property def queued(self) -> List[Task]: return [t for t in self.tasks if t.status == "waiting"] @property def completed(self) -> List[Task]: return [t for t in self.tasks if t.status == "completed"] @property def average_progress(self) -> float: if not self.tasks: return 0.0 return sum(t.progress_percent for t in self.tasks) / len(self.tasks) @property def total_remaining_seconds(self) -> float: remaining = [t.remaining_seconds for t in self.tasks if t.remaining_seconds] return sum(remaining) if remaining else 0.0 class BoincClient: def __init__( self, host: str = "localhost", port: int = 31416, password: Optional[str] = None, boinccmd_path: str = "boinccmd", sample_output: Optional[str] = None, ) -> None: self.host = host self.port = port self.password = password self.boinccmd_path = boinccmd_path self.sample_output = Path(sample_output) if sample_output else None async def fetch_snapshot(self) -> BoincSnapshot: raw = await self._fetch_raw() tasks = parse_boinccmd_tasks(raw) return BoincSnapshot(tasks=tasks, fetched_at=datetime.now(timezone.utc), raw=raw) async def _fetch_raw(self) -> str: if self.sample_output: return self.sample_output.read_text() cmd = [self.boinccmd_path, "--host", f"{self.host}:{self.port}", "--get_tasks"] if self.password: cmd.extend(["--passwd", self.password]) try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) except FileNotFoundError as exc: raise RuntimeError(f"boinccmd not found at '{self.boinccmd_path}'") from exc stdout, stderr = await process.communicate() if process.returncode != 0: err_text = stderr.decode() if stderr else "unknown error" raise RuntimeError(f"boinccmd failed: {err_text.strip()}") return stdout.decode() def parse_boinccmd_tasks(output: str) -> List[Task]: tasks: List[Task] = [] current: Dict[str, str] = {} for line in output.splitlines(): stripped = line.strip() if not stripped: continue if re.match(r"^\d+\)", stripped): if current: task = _task_from_dict(current) if task: tasks.append(task) current = {} continue if stripped.startswith("========"): continue if ":" not in stripped: continue key, value = stripped.split(":", 1) current[key.strip().lower()] = value.strip() if current: task = _task_from_dict(current) if task: tasks.append(task) return tasks def _task_from_dict(data: Dict[str, str]) -> Optional[Task]: name = data.get("name", "unknown") project = data.get("project url", "unknown project") fraction_done = _parse_float(data.get("fraction done", "0")) elapsed = _parse_float(data.get("elapsed time", "0")) remaining = _parse_float(data.get("estimated cpu time remaining", "0")) deadline = _parse_deadline(data.get("report deadline")) resources = data.get("resources") slot = data.get("slot") status = _deduce_status(data) return Task( name=name, project=project, status=status, fraction_done=fraction_done, elapsed_seconds=elapsed, remaining_seconds=remaining, deadline=deadline, resources=resources, slot=slot, ) def _deduce_status(data: Dict[str, str]) -> str: ready = data.get("ready to report", "").lower() == "yes" if ready: return "completed" active_state = data.get("active_task_state") or data.get("active task state") if active_state: active_state = active_state.upper() if active_state in {"EXECUTING", "RUNNING", "READY"}: return "active" if active_state in {"SUSPENDED", "UNINITIALIZED", "SUSPENDED_VIA_GUI"}: return "waiting" scheduler_state = data.get("scheduler state") or data.get("state") if scheduler_state and scheduler_state.strip().isdigit(): # 1 is typically waiting to run, 2 is executing try: sched_val = int(scheduler_state.strip()) if sched_val >= 2: return "active" if sched_val == 1: return "waiting" except ValueError: pass return "waiting" def _parse_float(value: Optional[str]) -> float: if not value: return 0.0 try: return float(value.split()[0]) except ValueError: return 0.0 def _parse_deadline(value: Optional[str]) -> Optional[datetime]: if not value: return None try: # Example: Tue Jan 9 05:58:07 2024 return datetime.strptime(value, "%a %b %d %H:%M:%S %Y").replace(tzinfo=timezone.utc) except Exception: logger.debug("Could not parse deadline: %s", value) return None