mirror of
https://github.com/umbra2728/ctfd-mcp.git
synced 2026-02-07 22:08:12 +03:00
761 lines
29 KiB
Python
761 lines
29 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from html import unescape
|
|
from html.parser import HTMLParser
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from .config import Config
|
|
|
|
|
|
class CTFdClientError(Exception):
|
|
"""Base error for CTFd client issues."""
|
|
|
|
|
|
class AuthError(CTFdClientError):
|
|
"""Authentication or authorization error."""
|
|
|
|
|
|
class NotFoundError(CTFdClientError):
|
|
"""Requested resource not found."""
|
|
|
|
|
|
class RateLimitError(CTFdClientError):
|
|
"""CTFd rate limit hit."""
|
|
|
|
def __init__(self, message: str, retry_after: str | None = None):
|
|
super().__init__(message)
|
|
self.retry_after = retry_after
|
|
|
|
|
|
class CTFdClient:
|
|
"""Thin async wrapper over the CTFd REST API for user-level actions."""
|
|
|
|
def __init__(self, config: Config, timeout: float | httpx.Timeout | None = None):
|
|
self.config = config
|
|
if timeout is None:
|
|
# generous defaults to avoid TLS stalls while still failing eventually
|
|
timeout = httpx.Timeout(
|
|
config.total_timeout if config.total_timeout is not None else 20.0,
|
|
connect=config.connect_timeout
|
|
if config.connect_timeout is not None
|
|
else 10.0,
|
|
read=config.read_timeout if config.read_timeout is not None else 15.0,
|
|
)
|
|
# Force h1 and send explicit Accept/UA to reduce chances of HTML/redirect responses.
|
|
headers = {
|
|
"Accept": "application/json",
|
|
"User-Agent": "ctfd-mcp/0.1 (+https://github.com/)",
|
|
"X-Requested-With": "XMLHttpRequest",
|
|
**config.auth_header,
|
|
}
|
|
cookies = {}
|
|
if config.session_cookie:
|
|
cookies["session"] = config.session_cookie
|
|
self._client = httpx.AsyncClient(
|
|
base_url=config.base_url,
|
|
headers=headers,
|
|
timeout=timeout,
|
|
http2=False, # some CTFd deployments can stall on HTTP/2; force h1
|
|
follow_redirects=True,
|
|
cookies=cookies or None,
|
|
)
|
|
self._csrf_token = config.csrf_token
|
|
self._username = config.username
|
|
self._password = config.password
|
|
self._has_logged_in = False
|
|
|
|
async def aclose(self) -> None:
|
|
await self._client.aclose()
|
|
|
|
async def _refresh_csrf_token(self) -> None:
|
|
"""Fetch a fresh CSRF token after login; many CTFd installs require it for attempts."""
|
|
try:
|
|
resp = await self._client.get("/api/v1/csrf_token")
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
token = data.get("data", {}).get("csrf_token") or data.get("csrf_token")
|
|
if token:
|
|
self._csrf_token = token
|
|
except Exception:
|
|
pass
|
|
# parse nonce from challenges page (some deployers use this specific nonce)
|
|
try:
|
|
resp = await self._client.get("/challenges")
|
|
resp.raise_for_status()
|
|
token = self._extract_nonce(resp.text)
|
|
if token:
|
|
self._csrf_token = token
|
|
except Exception:
|
|
return
|
|
|
|
def _extract_nonce(self, html: str) -> str:
|
|
for pat in [r'name="nonce" value="([^"]+)"', r"'csrfNonce': \"([^\"]+)\""]:
|
|
m = re.search(pat, html)
|
|
if m:
|
|
return m.group(1)
|
|
return ""
|
|
|
|
async def _login(self) -> None:
|
|
if not (self._username and self._password):
|
|
return
|
|
try:
|
|
# Fetch login page to grab nonce like a browser.
|
|
page = await self._client.get("/login")
|
|
nonce = self._extract_nonce(page.text)
|
|
headers = {"Referer": f"{self.config.base_url}/login"}
|
|
if nonce:
|
|
headers["CSRF-Token"] = nonce
|
|
resp = await self._client.post(
|
|
"/login",
|
|
data={
|
|
"name": self._username,
|
|
"password": self._password,
|
|
"nonce": nonce,
|
|
},
|
|
headers=headers,
|
|
follow_redirects=True,
|
|
)
|
|
if resp.status_code not in (200, 302, 303, 403):
|
|
raise AuthError(
|
|
f"Login failed with provided credentials (code {resp.status_code})."
|
|
)
|
|
if nonce:
|
|
self._csrf_token = nonce
|
|
await self._refresh_csrf_token()
|
|
self._has_logged_in = True
|
|
except AuthError:
|
|
raise
|
|
except Exception as exc: # noqa: BLE001
|
|
raise AuthError(f"Login failed: {exc}")
|
|
|
|
async def _ensure_login(self) -> None:
|
|
"""
|
|
Ensure username/password login is performed when no token/session is present.
|
|
A no-op when token or session cookie is configured.
|
|
"""
|
|
if (
|
|
not self.config.token
|
|
and not self.config.session_cookie
|
|
and self._username
|
|
and self._password
|
|
and not self._has_logged_in
|
|
):
|
|
await self._login()
|
|
|
|
async def _ensure_csrf_token(self) -> None:
|
|
"""Refresh CSRF token if missing."""
|
|
if not self._csrf_token:
|
|
await self._refresh_csrf_token()
|
|
|
|
def _csrf_required(self) -> bool:
|
|
"""
|
|
CSRF is typically enforced for cookie-authenticated requests (session or browser login).
|
|
API-token authenticated requests generally do not require CSRF.
|
|
"""
|
|
return bool(self.config.session_cookie or self._has_logged_in)
|
|
|
|
async def list_challenges(
|
|
self, category: str | None = None, only_unsolved: bool = False
|
|
) -> list[dict[str, Any]]:
|
|
"""List challenges the user can see, optionally filtered by category and solve state."""
|
|
payload = await self._request("GET", "/api/v1/challenges")
|
|
challenges = payload.get("data") or []
|
|
|
|
normalized = []
|
|
for item in challenges:
|
|
if category and (item.get("category") or "").lower() != category.lower():
|
|
continue
|
|
|
|
solved = item.get("solved")
|
|
if solved is None:
|
|
solved = item.get("solved_by_me")
|
|
if only_unsolved and solved:
|
|
continue
|
|
|
|
normalized.append(
|
|
{
|
|
"id": item.get("id"),
|
|
"name": item.get("name"),
|
|
"category": item.get("category"),
|
|
"value": item.get("value"),
|
|
"solved": solved if solved is not None else item.get("solved"),
|
|
"type": item.get("type"),
|
|
"tags": item.get("tags") or [],
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
async def get_challenge(self, challenge_id: int) -> dict[str, Any]:
|
|
"""Get challenge details including description and attachment URLs."""
|
|
payload = await self._request("GET", f"/api/v1/challenges/{challenge_id}")
|
|
data = payload.get("data") or {}
|
|
|
|
description = data.get("description")
|
|
|
|
return {
|
|
"id": data.get("id"),
|
|
"name": data.get("name"),
|
|
"category": data.get("category"),
|
|
"value": data.get("value"),
|
|
"solved": data.get("solved")
|
|
if data.get("solved") is not None
|
|
else data.get("solved_by_me"),
|
|
"solved_by_me": data.get("solved_by_me"),
|
|
"description": description,
|
|
"description_text": _html_to_text(description) if description else None,
|
|
"connection_info": data.get("connection_info"),
|
|
"state": data.get("state"),
|
|
"type": data.get("type"),
|
|
"tags": data.get("tags") or [],
|
|
"files": [self._full_url(f) for f in data.get("files") or []],
|
|
"requirements": data.get("requirements"),
|
|
}
|
|
|
|
async def submit_flag(self, challenge_id: int, flag: str) -> dict[str, Any]:
|
|
"""Attempt a flag submission for a challenge."""
|
|
payload = await self._request(
|
|
"POST",
|
|
"/api/v1/challenges/attempt",
|
|
json={"challenge_id": challenge_id, "submission": flag},
|
|
)
|
|
data = payload.get("data") or {}
|
|
return {
|
|
"status": data.get("status"),
|
|
"message": data.get("message"),
|
|
}
|
|
|
|
async def start_dynamic_container(self, challenge_id: int) -> dict[str, Any]:
|
|
"""Start a dynamic_docker instance (ctfd-whale)."""
|
|
payload = await self._request(
|
|
"POST",
|
|
"/api/v1/containers",
|
|
json={"challenge_id": challenge_id},
|
|
)
|
|
# Some ctfd-whale deployments return connection details at the top level; prefer data if present.
|
|
data = payload.get("data") or payload or {}
|
|
host = data.get("host") or data.get("docker_server")
|
|
port = data.get("port")
|
|
connection_info = data.get("connection_info")
|
|
if not connection_info and host and port:
|
|
connection_info = f"{host}:{port}"
|
|
return self._trim_none(
|
|
{
|
|
"id": data.get("id"),
|
|
"challenge_id": data.get("challenge_id"),
|
|
"state": data.get("state"),
|
|
"connection_info": connection_info,
|
|
"ip": data.get("ip"),
|
|
"port": port,
|
|
"host": host,
|
|
"container_id": data.get("container_id"),
|
|
"created": data.get("created"),
|
|
"raw": data,
|
|
}
|
|
)
|
|
|
|
async def stop_dynamic_container(self, container_id: int) -> dict[str, Any]:
|
|
"""Stop a dynamic_docker instance."""
|
|
payload = await self._request(
|
|
"DELETE",
|
|
f"/api/v1/containers/{container_id}",
|
|
)
|
|
data = payload.get("data") or payload or {}
|
|
message = data.get("message") or data or "stopped"
|
|
if isinstance(message, dict):
|
|
message = message.get("message") or str(message)
|
|
return self._trim_none(
|
|
{
|
|
"status": data.get("status") or payload.get("success"),
|
|
"message": message,
|
|
"raw": data,
|
|
}
|
|
)
|
|
|
|
async def start_k8s_container(self, challenge_id: int) -> dict[str, Any]:
|
|
"""
|
|
Start a Kubernetes-backed instance exposed via /api/v1/k8s endpoints.
|
|
Uses multipart form with nonce/CSRF token when available, then polls the GET endpoint
|
|
for connection details.
|
|
"""
|
|
files = {"challenge_id": (None, str(challenge_id))}
|
|
if self._csrf_token:
|
|
files["nonce"] = (None, self._csrf_token)
|
|
|
|
await self._k8s_request(
|
|
"POST", "/api/v1/k8s/create", files=files, action="start"
|
|
)
|
|
status = await self._get_k8s_container(challenge_id)
|
|
return self._trim_none(status)
|
|
|
|
async def stop_k8s_container(self, challenge_id: int) -> dict[str, Any]:
|
|
"""
|
|
Stop a Kubernetes-backed instance exposed via /api/v1/k8s endpoints.
|
|
"""
|
|
files = {"challenge_id": (None, str(challenge_id))}
|
|
if self._csrf_token:
|
|
files["nonce"] = (None, self._csrf_token)
|
|
|
|
await self._k8s_request(
|
|
"POST", "/api/v1/k8s/delete", files=files, action="stop"
|
|
)
|
|
status = await self._get_k8s_container(challenge_id)
|
|
status["status"] = (
|
|
"stopped" if not status.get("instance_running") else "running"
|
|
)
|
|
return self._trim_none(status)
|
|
|
|
async def _get_k8s_container(self, challenge_id: int) -> dict[str, Any]:
|
|
resp = await self._k8s_request(
|
|
"GET",
|
|
"/api/v1/k8s/get",
|
|
params={"challenge_id": challenge_id},
|
|
action="get",
|
|
)
|
|
try:
|
|
payload = resp.json()
|
|
except ValueError as exc: # noqa: BLE001
|
|
snippet = resp.text[:500] if resp.text else "<empty>"
|
|
raise CTFdClientError(
|
|
f"CTFd returned non-JSON response from k8s get (status {resp.status_code}): {snippet}"
|
|
) from exc
|
|
if not isinstance(payload, dict):
|
|
raise CTFdClientError("Unexpected response shape from k8s get endpoint.")
|
|
return self._parse_k8s_payload(payload, challenge_id)
|
|
|
|
def _parse_k8s_payload(
|
|
self, data: dict[str, Any], challenge_id: int
|
|
) -> dict[str, Any]:
|
|
connection_url = data.get("ConnectionURL")
|
|
port = data.get("ConnectionPort")
|
|
connection_info = connection_url
|
|
if not connection_info and port:
|
|
host = data.get("ConnectionHost") or data.get("host")
|
|
if host:
|
|
connection_info = f"{host}:{port}"
|
|
|
|
expires_at = data.get("ExpireTime")
|
|
state = "running" if data.get("InstanceRunning") else "stopped"
|
|
|
|
return {
|
|
"challenge_id": challenge_id,
|
|
"connection_info": connection_info,
|
|
"connection_url": connection_url,
|
|
"connection_port": port,
|
|
"expires_at": expires_at,
|
|
"instance_running": data.get("InstanceRunning"),
|
|
"is_current_instance": data.get("ThisChallengeInstance"),
|
|
"extend_available": data.get("ExtendAvailable"),
|
|
"state": state,
|
|
"raw": data,
|
|
}
|
|
|
|
def _raise_for_k8s_response(self, response: httpx.Response, action: str) -> None:
|
|
if response.status_code in (401, 403):
|
|
raise AuthError(
|
|
"Unauthorized for k8s API. Check session cookie and CSRF token."
|
|
)
|
|
if response.status_code == 404:
|
|
raise NotFoundError("K8s API not found on this CTFd instance.")
|
|
if response.status_code == 429:
|
|
raise RateLimitError(
|
|
"Rate limit reached on k8s API. Retry later.",
|
|
retry_after=response.headers.get("Retry-After"),
|
|
)
|
|
if response.status_code >= 400:
|
|
raise CTFdClientError(
|
|
f"k8s {action} failed with status {response.status_code}: {response.text}"
|
|
)
|
|
|
|
async def _k8s_request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
*,
|
|
params: dict[str, Any] | None = None,
|
|
files: dict[str, tuple[str | None, str]] | None = None,
|
|
headers: dict[str, str] | None = None,
|
|
action: str = "request",
|
|
_retried: bool = False,
|
|
) -> httpx.Response:
|
|
await self._ensure_login()
|
|
# Refresh CSRF each time to align with per-page nonce used by some k8s deployers.
|
|
await self._refresh_csrf_token()
|
|
request_headers = self._k8s_headers()
|
|
if headers:
|
|
request_headers.update(headers)
|
|
files_with_nonce = files.copy() if files else None
|
|
if files_with_nonce is not None and self._csrf_token:
|
|
files_with_nonce.setdefault("nonce", (None, self._csrf_token))
|
|
# Ensure Referer/Origin headers include /challenges to match browser flows.
|
|
request_headers.setdefault("Referer", f"{self.config.base_url}/challenges")
|
|
request_headers.setdefault("Origin", self.config.base_url)
|
|
response = await self._client.request(
|
|
method,
|
|
path,
|
|
params=params,
|
|
files=files_with_nonce,
|
|
headers=request_headers,
|
|
follow_redirects=False,
|
|
)
|
|
if response.status_code in (401, 403):
|
|
if _retried:
|
|
self._raise_for_k8s_response(response, action=action)
|
|
# refresh CSRF token and retry once (also re-login if credentials exist)
|
|
if self._username and self._password:
|
|
await self._login()
|
|
else:
|
|
await self._refresh_csrf_token()
|
|
if files_with_nonce is not None and self._csrf_token:
|
|
files_with_nonce["nonce"] = (None, self._csrf_token)
|
|
return await self._k8s_request(
|
|
method,
|
|
path,
|
|
params=params,
|
|
files=files_with_nonce,
|
|
headers=headers,
|
|
action=action,
|
|
_retried=True,
|
|
)
|
|
# k8s create/delete endpoints respond with 302 back to /challenges; treat as success.
|
|
if 300 <= response.status_code < 400:
|
|
return response
|
|
self._raise_for_k8s_response(response, action=action)
|
|
return response
|
|
|
|
def _k8s_headers(self) -> dict[str, str]:
|
|
headers: dict[str, str] = {
|
|
"Origin": self.config.base_url,
|
|
"Referer": f"{self.config.base_url}/challenges",
|
|
}
|
|
if self._csrf_token:
|
|
headers["CSRF-Token"] = self._csrf_token
|
|
return headers
|
|
|
|
async def start_owl_container(self, challenge_id: int) -> dict[str, Any]:
|
|
"""
|
|
Start a dynamic_check_docker (ctfd-owl) instance.
|
|
Requires session cookie; may require CSRF token depending on server config.
|
|
"""
|
|
headers = self._owl_headers()
|
|
|
|
initial_payload = await self._request(
|
|
"POST",
|
|
"/plugins/ctfd-owl/container",
|
|
params={"challenge_id": challenge_id},
|
|
json={},
|
|
headers=headers,
|
|
)
|
|
data = initial_payload.get("data") or initial_payload or {}
|
|
|
|
parsed = self._parse_owl_container(data, challenge_id)
|
|
|
|
# Some ctfd-owl versions return only {"success": true} on POST; fetch details with GET.
|
|
if not self._has_owl_details(parsed) and data.get("success"):
|
|
poll_payload = await self._request(
|
|
"GET",
|
|
"/plugins/ctfd-owl/container",
|
|
params={"challenge_id": challenge_id},
|
|
headers=headers,
|
|
)
|
|
poll_data = poll_payload.get("data") or poll_payload or {}
|
|
polled = self._parse_owl_container(poll_data, challenge_id)
|
|
if self._has_owl_details(polled):
|
|
polled["raw"] = poll_data
|
|
return self._trim_none(polled)
|
|
|
|
return self._trim_none(parsed)
|
|
|
|
def _parse_owl_container(
|
|
self, data: dict[str, Any], challenge_id: int
|
|
) -> dict[str, Any]:
|
|
# ctfd-owl returns connection details inside containers_data.
|
|
containers = data.get("containers_data") or data.get("containers") or []
|
|
container_info = containers[0] if containers else {}
|
|
|
|
host = container_info.get("lan_domain") or container_info.get("host")
|
|
ip = data.get("ip") or container_info.get("ip") or host
|
|
port = container_info.get("port") or data.get("port")
|
|
conntype = container_info.get("conntype") or data.get("conntype")
|
|
connect_host = data.get("ip") or container_info.get("ip") or host
|
|
# Prefer the externally reachable IP/host over LAN domain in connection_info.
|
|
connection_info = data.get("connection_info") or (
|
|
f"{connect_host}:{port}" if connect_host and port else None
|
|
)
|
|
if not connection_info and conntype and host and port:
|
|
connection_info = f"{conntype}://{host}:{port}"
|
|
|
|
return {
|
|
"id": data.get("id") or container_info.get("id"),
|
|
"challenge_id": data.get("challenge_id") or challenge_id,
|
|
"state": data.get("state") or container_info.get("state"),
|
|
"connection_info": connection_info,
|
|
"ip": ip,
|
|
"port": port,
|
|
"host": host,
|
|
"conntype": conntype,
|
|
"remaining_time": container_info.get("remaining_time")
|
|
or data.get("remaining_time"),
|
|
"container_id": container_info.get("container_id")
|
|
or data.get("container_id"),
|
|
"created": data.get("created") or container_info.get("created"),
|
|
"raw": data,
|
|
}
|
|
|
|
@staticmethod
|
|
def _has_owl_details(parsed: dict[str, Any]) -> bool:
|
|
return any(
|
|
parsed.get(key)
|
|
for key in (
|
|
"connection_info",
|
|
"host",
|
|
"ip",
|
|
"port",
|
|
"container_id",
|
|
"remaining_time",
|
|
)
|
|
)
|
|
|
|
@staticmethod
|
|
def _is_k8s_type(ctype: str | None) -> bool:
|
|
if not ctype:
|
|
return False
|
|
lowered = ctype.lower()
|
|
return "k8s" in lowered or "kube" in lowered
|
|
|
|
@staticmethod
|
|
def _trim_none(values: dict[str, Any]) -> dict[str, Any]:
|
|
"""Drop keys with None to keep responses compact."""
|
|
return {k: v for k, v in values.items() if v is not None}
|
|
|
|
async def stop_owl_container(self, challenge_id: int) -> dict[str, Any]:
|
|
"""
|
|
Stop a dynamic_check_docker (ctfd-owl) instance.
|
|
Uses the same endpoint with DELETE and challenge_id.
|
|
"""
|
|
headers = self._owl_headers()
|
|
payload = await self._request(
|
|
"DELETE",
|
|
"/plugins/ctfd-owl/container",
|
|
params={"challenge_id": challenge_id},
|
|
headers=headers,
|
|
)
|
|
data = payload.get("data") or payload
|
|
message = data.get("message") or data or "stopped"
|
|
if isinstance(message, dict):
|
|
message = message.get("message") or str(message)
|
|
return self._trim_none(
|
|
{
|
|
"status": data.get("status") or payload.get("success"),
|
|
"message": message,
|
|
"raw": data,
|
|
}
|
|
)
|
|
|
|
async def start_container(self, challenge_id: int) -> dict[str, Any]:
|
|
"""
|
|
Unified start: detects challenge type and calls the appropriate backend.
|
|
- dynamic_docker -> ctfd-whale /api/v1/containers
|
|
- dynamic_check_docker -> ctfd-owl /plugins/ctfd-owl/container
|
|
- k8s-backed -> /api/v1/k8s (form-based)
|
|
"""
|
|
details = await self.get_challenge(challenge_id)
|
|
ctype = (details.get("type") or "").lower()
|
|
if self._is_k8s_type(ctype):
|
|
return await self.start_k8s_container(challenge_id)
|
|
if ctype == "dynamic_docker":
|
|
try:
|
|
return await self.start_dynamic_container(challenge_id)
|
|
except NotFoundError:
|
|
# Some events expose dynamic challenges via /api/v1/k8s while keeping the same type.
|
|
return await self.start_k8s_container(challenge_id)
|
|
if ctype == "dynamic_check_docker":
|
|
return await self.start_owl_container(challenge_id)
|
|
raise CTFdClientError(
|
|
f"Unsupported challenge type '{ctype}' for container start."
|
|
)
|
|
|
|
async def stop_container(
|
|
self, *, container_id: int | None = None, challenge_id: int | None = None
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Unified stop:
|
|
- dynamic_docker: requires container_id
|
|
- dynamic_check_docker (owl): uses challenge_id
|
|
- k8s-backed: uses challenge_id
|
|
If no challenge_id is given, assumes dynamic_docker and stops by container_id.
|
|
"""
|
|
if container_id is None and challenge_id is None:
|
|
raise CTFdClientError(
|
|
"Provide container_id or challenge_id to stop a container."
|
|
)
|
|
|
|
ctype: str | None = None
|
|
if challenge_id is not None:
|
|
details = await self.get_challenge(challenge_id)
|
|
ctype = (details.get("type") or "").lower()
|
|
|
|
if self._is_k8s_type(ctype):
|
|
if challenge_id is None:
|
|
raise CTFdClientError("k8s stop requires challenge_id.")
|
|
return await self.stop_k8s_container(challenge_id)
|
|
if ctype == "dynamic_docker":
|
|
if container_id is None:
|
|
raise CTFdClientError("dynamic_docker stop requires container_id.")
|
|
try:
|
|
return await self.stop_dynamic_container(container_id)
|
|
except NotFoundError:
|
|
if challenge_id is not None:
|
|
return await self.stop_k8s_container(challenge_id)
|
|
raise
|
|
if ctype == "dynamic_check_docker":
|
|
if challenge_id is None:
|
|
raise CTFdClientError("ctfd-owl stop requires challenge_id.")
|
|
return await self.stop_owl_container(challenge_id)
|
|
|
|
if ctype:
|
|
raise CTFdClientError(
|
|
f"Unsupported challenge type '{ctype}' for container stop."
|
|
)
|
|
|
|
# No challenge_id was provided; default to whale-style stop by container_id.
|
|
if container_id is None:
|
|
raise CTFdClientError("dynamic_docker stop requires container_id.")
|
|
return await self.stop_dynamic_container(container_id)
|
|
|
|
def _owl_headers(self) -> dict[str, str]:
|
|
headers: dict[str, str] = {
|
|
"Origin": self.config.base_url,
|
|
"Referer": f"{self.config.base_url}/challenges",
|
|
}
|
|
if self._csrf_token:
|
|
headers["CSRF-Token"] = self._csrf_token
|
|
return headers
|
|
|
|
async def _request(
|
|
self, method: str, path: str, _retried: bool = False, **kwargs: Any
|
|
) -> dict[str, Any]:
|
|
# Lazy login if we only have username/password and no session/token.
|
|
await self._ensure_login()
|
|
|
|
method_upper = method.upper()
|
|
if (
|
|
method_upper in {"POST", "PUT", "PATCH", "DELETE"}
|
|
and self._csrf_required()
|
|
):
|
|
await self._ensure_csrf_token()
|
|
|
|
# Attach CSRF token to all requests when available (some CTFd deployments require it even for API).
|
|
headers = kwargs.setdefault("headers", {})
|
|
if self._csrf_token and "CSRF-Token" not in headers:
|
|
headers["CSRF-Token"] = self._csrf_token
|
|
if self._csrf_token and "Referer" not in headers:
|
|
headers["Referer"] = f"{self.config.base_url}/challenges"
|
|
|
|
try:
|
|
response = await self._client.request(method, path, **kwargs)
|
|
except httpx.RequestError as exc:
|
|
raise CTFdClientError(
|
|
f"Network error talking to CTFd ({exc.__class__.__name__}): {exc}"
|
|
) from exc
|
|
|
|
if (
|
|
response.status_code == 403
|
|
and self._csrf_required()
|
|
and not _retried
|
|
and method_upper in {"POST", "PUT", "PATCH", "DELETE"}
|
|
):
|
|
# Cookie-authenticated flows can require a per-session/per-page CSRF token/nonce.
|
|
# Refresh once and retry to avoid hard failures on CSRF-enforced deployments.
|
|
await self._refresh_csrf_token()
|
|
headers = kwargs.setdefault("headers", {})
|
|
if self._csrf_token:
|
|
headers["CSRF-Token"] = self._csrf_token
|
|
headers.setdefault("Referer", f"{self.config.base_url}/challenges")
|
|
response = await self._client.request(method, path, **kwargs)
|
|
|
|
if response.status_code == 401 and self._username and self._password:
|
|
# For username/password flows, re-login and retry once to avoid infinite recursion.
|
|
if _retried:
|
|
raise AuthError("Unauthorized after re-login. Check credentials.")
|
|
await self._login()
|
|
return await self._request(method, path, _retried=True, **kwargs)
|
|
|
|
if 300 <= response.status_code < 400:
|
|
raise AuthError(
|
|
f"Unexpected redirect ({response.status_code}) to {response.headers.get('Location')}. "
|
|
"Check token and CTFD_URL."
|
|
)
|
|
if response.status_code in (401, 403):
|
|
raise AuthError("Unauthorized. Check CTFD_TOKEN permissions or value.")
|
|
if response.status_code == 404:
|
|
raise NotFoundError("Resource not found.")
|
|
if response.status_code == 429:
|
|
raise RateLimitError(
|
|
"Rate limit reached. Retry later.",
|
|
retry_after=response.headers.get("Retry-After"),
|
|
)
|
|
if response.status_code >= 400:
|
|
raise CTFdClientError(f"CTFd error {response.status_code}: {response.text}")
|
|
|
|
try:
|
|
payload = response.json()
|
|
except ValueError as exc:
|
|
snippet = response.text[:500] if response.text else "<empty>"
|
|
ctype = response.headers.get("Content-Type")
|
|
raise CTFdClientError(
|
|
f"CTFd returned non-JSON response (status {response.status_code}, "
|
|
f"content-type={ctype}): {snippet}"
|
|
) from exc
|
|
|
|
if isinstance(payload, dict) and payload.get("success") is False:
|
|
message = (
|
|
payload.get("message")
|
|
or payload.get("data")
|
|
or "CTFd API reported failure"
|
|
)
|
|
raise CTFdClientError(str(message))
|
|
|
|
if not isinstance(payload, dict):
|
|
raise CTFdClientError("Unexpected response shape from CTFd.")
|
|
|
|
return payload
|
|
|
|
def _full_url(self, path: str) -> str:
|
|
if path.startswith("http://") or path.startswith("https://"):
|
|
return path
|
|
return f"{self.config.base_url}/{path.lstrip('/')}"
|
|
|
|
|
|
class _HTMLToTextParser(HTMLParser):
|
|
"""Lightweight HTML -> text converter to keep responses chat-friendly."""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._parts: list[str] = []
|
|
|
|
def handle_starttag(self, tag: str, attrs: list[tuple[str, str]]):
|
|
if tag in {"p", "div", "br", "li"}:
|
|
self._parts.append("\n")
|
|
|
|
def handle_endtag(self, tag: str):
|
|
if tag in {"p", "div", "li"}:
|
|
self._parts.append("\n")
|
|
|
|
def handle_data(self, data: str):
|
|
self._parts.append(data)
|
|
|
|
def get_text(self) -> str:
|
|
text = unescape("".join(self._parts))
|
|
# Collapse excessive blank lines but keep intentional spacing.
|
|
lines = [line.strip() for line in text.splitlines()]
|
|
return "\n".join([line for line in lines if line])
|
|
|
|
|
|
def _html_to_text(value: str) -> str:
|
|
parser = _HTMLToTextParser()
|
|
parser.feed(value)
|
|
return parser.get_text().strip()
|