Files
BOINC-report-bot/bot/boinc.py
dan 55cfd73c24
All checks were successful
publish-images / build (push) Successful in 1m3s
Add BOINC Telegram bot, CI, and deploy compose
2026-01-06 09:44:48 +03:00

206 lines
6.0 KiB
Python

from __future__ import annotations
import asyncio
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class Task:
name: str
project: str
status: str # active | waiting | completed | unknown
fraction_done: float
elapsed_seconds: float
remaining_seconds: Optional[float]
deadline: Optional[datetime]
resources: Optional[str] = None
slot: Optional[str] = None
@property
def progress_percent(self) -> int:
return max(0, min(int(round(self.fraction_done * 100)), 100))
@dataclass
class BoincSnapshot:
tasks: List[Task]
fetched_at: datetime
raw: str = ""
@property
def active(self) -> List[Task]:
return [t for t in self.tasks if t.status == "active"]
@property
def queued(self) -> List[Task]:
return [t for t in self.tasks if t.status == "waiting"]
@property
def completed(self) -> List[Task]:
return [t for t in self.tasks if t.status == "completed"]
@property
def average_progress(self) -> float:
if not self.tasks:
return 0.0
return sum(t.progress_percent for t in self.tasks) / len(self.tasks)
@property
def total_remaining_seconds(self) -> float:
remaining = [t.remaining_seconds for t in self.tasks if t.remaining_seconds]
return sum(remaining) if remaining else 0.0
class BoincClient:
def __init__(
self,
host: str = "localhost",
port: int = 31416,
password: Optional[str] = None,
boinccmd_path: str = "boinccmd",
sample_output: Optional[str] = None,
) -> None:
self.host = host
self.port = port
self.password = password
self.boinccmd_path = boinccmd_path
self.sample_output = Path(sample_output) if sample_output else None
async def fetch_snapshot(self) -> BoincSnapshot:
raw = await self._fetch_raw()
tasks = parse_boinccmd_tasks(raw)
return BoincSnapshot(tasks=tasks, fetched_at=datetime.now(timezone.utc), raw=raw)
async def _fetch_raw(self) -> str:
if self.sample_output:
return self.sample_output.read_text()
cmd = [self.boinccmd_path, "--host", f"{self.host}:{self.port}", "--get_tasks"]
if self.password:
cmd.extend(["--passwd", self.password])
try:
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
except FileNotFoundError as exc:
raise RuntimeError(f"boinccmd not found at '{self.boinccmd_path}'") from exc
stdout, stderr = await process.communicate()
if process.returncode != 0:
err_text = stderr.decode() if stderr else "unknown error"
raise RuntimeError(f"boinccmd failed: {err_text.strip()}")
return stdout.decode()
def parse_boinccmd_tasks(output: str) -> List[Task]:
tasks: List[Task] = []
current: Dict[str, str] = {}
for line in output.splitlines():
stripped = line.strip()
if not stripped:
continue
if re.match(r"^\d+\)", stripped):
if current:
task = _task_from_dict(current)
if task:
tasks.append(task)
current = {}
continue
if stripped.startswith("========"):
continue
if ":" not in stripped:
continue
key, value = stripped.split(":", 1)
current[key.strip().lower()] = value.strip()
if current:
task = _task_from_dict(current)
if task:
tasks.append(task)
return tasks
def _task_from_dict(data: Dict[str, str]) -> Optional[Task]:
name = data.get("name", "unknown")
project = data.get("project url", "unknown project")
fraction_done = _parse_float(data.get("fraction done", "0"))
elapsed = _parse_float(data.get("elapsed time", "0"))
remaining = _parse_float(data.get("estimated cpu time remaining", "0"))
deadline = _parse_deadline(data.get("report deadline"))
resources = data.get("resources")
slot = data.get("slot")
status = _deduce_status(data)
return Task(
name=name,
project=project,
status=status,
fraction_done=fraction_done,
elapsed_seconds=elapsed,
remaining_seconds=remaining,
deadline=deadline,
resources=resources,
slot=slot,
)
def _deduce_status(data: Dict[str, str]) -> str:
ready = data.get("ready to report", "").lower() == "yes"
if ready:
return "completed"
active_state = data.get("active_task_state") or data.get("active task state")
if active_state:
active_state = active_state.upper()
if active_state in {"EXECUTING", "RUNNING", "READY"}:
return "active"
if active_state in {"SUSPENDED", "UNINITIALIZED", "SUSPENDED_VIA_GUI"}:
return "waiting"
scheduler_state = data.get("scheduler state") or data.get("state")
if scheduler_state and scheduler_state.strip().isdigit():
# 1 is typically waiting to run, 2 is executing
try:
sched_val = int(scheduler_state.strip())
if sched_val >= 2:
return "active"
if sched_val == 1:
return "waiting"
except ValueError:
pass
return "waiting"
def _parse_float(value: Optional[str]) -> float:
if not value:
return 0.0
try:
return float(value.split()[0])
except ValueError:
return 0.0
def _parse_deadline(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
# Example: Tue Jan 9 05:58:07 2024
return datetime.strptime(value, "%a %b %d %H:%M:%S %Y").replace(tzinfo=timezone.utc)
except Exception:
logger.debug("Could not parse deadline: %s", value)
return None