Files
ad-infr-control/scoreboard_injector/main.py
ilyastar9999 c5bcfab44c Update main.py
2025-12-03 10:50:18 +03:00

591 lines
25 KiB
Python

"""
Scoreboard Injector for ForcAD
Monitors WebSocket for attacks and alerts on critical situations
"""
import asyncio
import hmac
import json
import os
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import aiohttp
import asyncpg
from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel
# Configuration
# Every setting is read once from the environment at import time; the second
# argument of os.getenv is the fallback used when the variable is unset.

# asyncpg connection string used to create the pool in lifespan().
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl")
# Bearer token checked by verify_token() and sent to the Telegram bot service.
SECRET_TOKEN = os.getenv("SECRET_TOKEN", "change-me-in-production")
# Scoreboard WebSocket endpoint consumed by websocket_listener().
SCOREBOARD_WS_URL = os.getenv("SCOREBOARD_WS_URL", "ws://scoreboard:8080/api/events")
# Scoreboard HTTP API root used by http_polling_listener() and /debug/scoreboard.
SCOREBOARD_API_URL = os.getenv("SCOREBOARD_API_URL", "http://10.60.0.1:8080/api")
# When "true" (case-insensitive) the HTTP poller is started instead of the WebSocket listener.
USE_HTTP_POLLING = os.getenv("USE_HTTP_POLLING", "false").lower() == "true"
POLLING_INTERVAL = int(os.getenv("POLLING_INTERVAL", "10")) # seconds
# Team ID treated as "us"; can be changed at runtime via POST /settings/team-id.
OUR_TEAM_ID = int(os.getenv("OUR_TEAM_ID", "1"))
# An alert is raised once one attacker takes at least this many points from one
# service within ALERT_THRESHOLD_TIME seconds (see check_and_create_alerts).
ALERT_THRESHOLD_POINTS = float(os.getenv("ALERT_THRESHOLD_POINTS", "100"))
ALERT_THRESHOLD_TIME = int(os.getenv("ALERT_THRESHOLD_TIME", "300")) # seconds
# Endpoint that send_telegram_alert() POSTs alert messages to.
TELEGRAM_API_URL = os.getenv("TELEGRAM_API_URL", "http://tg-bot:8003/send")
# Database pool
# Both globals are populated by lifespan() on application startup.
db_pool = None
# Background listener task (WebSocket or HTTP polling, despite the name).
ws_task = None
class AttackStats(BaseModel):
    """Aggregate attack counters.

    NOTE(review): nothing in the visible part of this file references this
    model, and the /stats handler returns keys named ``recent_attacks_5min``
    and ``critical_alerts_5min`` rather than the last two field names below.
    Confirm whether it is still needed or should become the response model.
    """
    # Number of rows in the attacks table.
    total_attacks: int
    # Attacks where our team is the attacker.
    attacks_by_us: int
    # Attacks where our team is the victim.
    attacks_to_us: int
    # Attacks inside a recent time window (window length is not defined here).
    recent_attacks: int
    # Alerts with critical severity.
    critical_alerts: int
# Auth dependency
async def verify_token(authorization: str = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` header against SECRET_TOKEN.

    Args:
        authorization: Raw value of the Authorization header, or None when
            the header is absent.

    Returns:
        The bearer token that was presented.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            header, 403 when the token does not match SECRET_TOKEN.
    """
    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")
    # Slice off only the leading scheme: str.replace would also delete any
    # later "Bearer " occurrence that happens to be part of the token.
    token = authorization[len(scheme):]
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token
# Database functions
async def get_db():
    """Check a connection out of the module-level pool.

    The caller owns the returned connection and must give it back with
    release_db() when finished.
    """
    connection = await db_pool.acquire()
    return connection
async def release_db(conn):
    """Hand a connection previously checked out of the pool back to it."""
    pool = db_pool
    await pool.release(conn)
def _first_present(event: Dict[str, Any], *keys: str) -> Any:
    """Return the first value among *keys* that is neither None nor an empty string."""
    for key in keys:
        value = event.get(key)
        # Explicit checks instead of truthiness so that a team ID of 0 is kept.
        if value is not None and value != "":
            return value
    return None


def _parse_event_timestamp(raw: Any) -> datetime:
    """Convert an event time (ISO-8601 string or Unix epoch) to a naive UTC datetime.

    Falls back to the current UTC time when the value is missing or unparseable.
    """
    if not raw:
        return datetime.utcnow()
    parsed = None
    if isinstance(raw, str):
        try:
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except ValueError:
            parsed = None
    if parsed is None:
        try:
            parsed = datetime.fromtimestamp(float(raw), tz=timezone.utc)
        except (ValueError, TypeError, OverflowError, OSError):
            return datetime.utcnow()
    # The rest of this module compares against naive datetime.utcnow() values,
    # so normalise aware datetimes to naive UTC rather than mixing the two.
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


async def process_attack_event(event: Dict[str, Any]):
    """Store one scoreboard attack event and trigger the alert check.

    Several field spellings are accepted for attacker, victim, service, time
    and points. Events that lack an attacker or victim, carry non-integer
    team IDs, or do not involve OUR_TEAM_ID are dropped before any database
    connection is taken from the pool.

    Args:
        event: Decoded JSON object describing the attack.

    Returns:
        None. Failures are printed rather than raised so that the listener
        loops calling this function keep running.
    """
    try:
        event_type = event.get('type', 'unknown')
        attacker_id = _first_present(event, 'attacker_id', 'team_id', 'attacker')
        victim_id = _first_present(event, 'victim_id', 'target_id', 'victim', 'target')
        # Skip if we don't have both attacker and victim
        if attacker_id is None or victim_id is None:
            print(f"Skipping event with missing attacker/victim: {event}")
            return
        # Convert to integers if they're strings
        try:
            attacker_id = int(attacker_id)
            victim_id = int(victim_id)
        except (ValueError, TypeError):
            print(f"Invalid team IDs in event: attacker={attacker_id}, victim={victim_id}")
            return

        is_our_attack = attacker_id == OUR_TEAM_ID
        is_attack_to_us = victim_id == OUR_TEAM_ID
        # Only attacks involving our team are stored; bail out early so that
        # unrelated events never occupy a pooled connection.
        if not (is_our_attack or is_attack_to_us):
            return

        service_name = event.get('service') or event.get('service_name') or event.get('task_name') or 'unknown'
        flag = event.get('flag', '')
        timestamp = _parse_event_timestamp(event.get('time') or event.get('timestamp'))
        # Points may arrive under different keys; a missing or zero value counts as 1.0.
        points = float(event.get('points', 0) or event.get('score', 0) or 1.0)

        round_num = event.get('round', event.get('round_id', 0))
        # `timestamp` is naive UTC, so attach UTC explicitly to get the true epoch second.
        epoch_seconds = int(timestamp.replace(tzinfo=timezone.utc).timestamp())
        # str() because the scoreboard may supply a numeric id, while the
        # fallback key generated here is always a string.
        attack_id = str(
            event.get('id')
            or f"{round_num}_{attacker_id}_{victim_id}_{service_name}_{epoch_seconds}"
        )

        async with db_pool.acquire() as conn:
            # RETURNING yields no row when ON CONFLICT skipped a duplicate,
            # so `inserted` is None for events that were already recorded.
            inserted = await conn.fetchval("""
                INSERT INTO attacks (attack_id, attacker_team_id, victim_team_id, service_name, flag, timestamp, points, is_our_attack, is_attack_to_us)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (attack_id) DO NOTHING
                RETURNING id
            """, attack_id, attacker_id, victim_id, service_name, flag, timestamp, points, is_our_attack, is_attack_to_us)
            if inserted:
                print(f"[{event_type}] Logged attack: Team {attacker_id} -> Team {victim_id} | {service_name} | {points} pts")
                # Check for alert conditions if attack is against us
                if is_attack_to_us:
                    await check_and_create_alerts(conn, attacker_id, service_name)
    except Exception as e:
        print(f"Error processing attack event: {e}")
        print(f"Event data: {event}")
def _escape_like(text: str) -> str:
    """Escape LIKE wildcards so *text* matches literally (backslash is the default LIKE escape in PostgreSQL)."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


async def check_and_create_alerts(conn, attacker_id: int, service_name: str):
    """Raise a critical alert when one attacker drains too many points from one service.

    Sums the points of attacks against us by ``attacker_id`` on
    ``service_name`` during the last ALERT_THRESHOLD_TIME seconds. When the
    sum reaches ALERT_THRESHOLD_POINTS and no matching alert was created in
    the same window, an ``attack_alerts`` row is inserted and the message is
    sent to Telegram.

    Args:
        conn: Open database connection to run the queries on.
        attacker_id: Team ID of the attacker.
        service_name: Name of the attacked service.
    """
    threshold_time = datetime.utcnow() - timedelta(seconds=ALERT_THRESHOLD_TIME)
    # Check total points lost from this attacker in threshold time
    result = await conn.fetchrow("""
        SELECT COUNT(*) as attack_count, COALESCE(SUM(points), 0) as total_points
        FROM attacks
        WHERE is_attack_to_us = true
        AND attacker_team_id = $1
        AND service_name = $2
        AND timestamp > $3
    """, attacker_id, service_name, threshold_time)
    if not result or result['total_points'] < ALERT_THRESHOLD_POINTS:
        return

    alert_message = f"CRITICAL: Team {attacker_id} has stolen {result['total_points']:.2f} points from service {service_name} in the last {ALERT_THRESHOLD_TIME}s ({result['attack_count']} attacks)"

    # Check if we already alerted recently. The pattern anchors on the words
    # that follow the team ID and the service name in alert_message, so
    # "Team 1" does not match "Team 12" and "web" does not match "web2".
    # Wildcard characters in the service name are escaped to match literally.
    dedupe_pattern = (
        f"%Team {attacker_id} has stolen %"
        f"service {_escape_like(service_name)} in the last %"
    )
    recent_alert = await conn.fetchrow("""
        SELECT id FROM attack_alerts
        WHERE alert_type = 'high_point_loss'
        AND message LIKE $1
        AND created_at > $2
    """, dedupe_pattern, threshold_time)
    if recent_alert:
        return

    # Link the alert to the newest attack against us from this attacker on
    # this service.
    alert_id = await conn.fetchval("""
        INSERT INTO attack_alerts (attack_id, alert_type, severity, message)
        VALUES (
            (SELECT id FROM attacks
             WHERE attacker_team_id = $1 AND service_name = $2 AND is_attack_to_us = true
             ORDER BY timestamp DESC LIMIT 1),
            'high_point_loss',
            'critical',
            $3
        )
        RETURNING id
    """, attacker_id, service_name, alert_message)

    # Send to telegram
    delivered = await send_telegram_alert(alert_message)
    # Only an explicit False counts as a failed delivery; a sender that
    # reports nothing (returns None) is treated as having delivered. Failed
    # alerts stay un-notified so they still show up under /alerts?unnotified=true.
    if delivered is not False:
        await conn.execute("UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id)
async def send_telegram_alert(message: str) -> bool:
    """POST an alert message to the Telegram bot service.

    Args:
        message: Text of the alert.

    Returns:
        True when the bot service answered with HTTP 200, False on any other
        status or on a transport error. Errors are printed, never raised.
    """
    # Bound the whole request: without an explicit timeout a hung bot service
    # would stall the caller for aiohttp's default of several minutes.
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(
                TELEGRAM_API_URL,
                json={"message": message},
                headers={"Authorization": f"Bearer {SECRET_TOKEN}"}
            ) as resp:
                if resp.status != 200:
                    print(f"Failed to send telegram alert: {await resp.text()}")
                    return False
                return True
    except Exception as e:
        print(f"Error sending telegram alert: {e}")
        return False
def _strip_api_suffix(url: str) -> str:
    """Return *url* without a trailing ``/api`` path segment or trailing slashes."""
    base = url.rstrip('/')
    # str.rstrip('/api') would strip any trailing run of the characters
    # '/', 'a', 'p', 'i' (mangling hosts such as "scoreboard.pi"), so the
    # suffix is removed explicitly instead.
    if base.endswith('/api'):
        base = base[:-len('/api')]
    return base.rstrip('/')


async def _process_scoreboard_payload(data: Any) -> None:
    """Pass every attack-like record found in a decoded JSON payload to process_attack_event."""
    if isinstance(data, list):
        # List of attacks/events
        for item in data:
            if isinstance(item, dict):
                await process_attack_event(item)
        return
    if not isinstance(data, dict):
        return
    # Try common keys for attack data; the first one holding a list wins.
    for key in ('attacks', 'stolen_flags', 'flags', 'events', 'data', 'rows'):
        items = data.get(key)
        if isinstance(items, list):
            print(f" Found {len(items)} items in '{key}'")
            for item in items:
                if isinstance(item, dict):
                    await process_attack_event(item)
            return
    # Might be a single event or game state: process it only if it carries
    # attack-like fields.
    if any(k in data for k in ('attacker_id', 'victim_id', 'team_id', 'flag')):
        await process_attack_event(data)


async def http_polling_listener():
    """Poll the scoreboard HTTP API for attacks as an alternative to the WebSocket.

    Every POLLING_INTERVAL seconds the candidate endpoints are tried in order.
    The first one that answers 200 with a JSON body is processed and the
    remaining endpoints are skipped for that round. Runs until cancelled.
    """
    base_url = _strip_api_suffix(SCOREBOARD_API_URL)
    endpoints_to_try = [
        # Try direct API access patterns
        f'{base_url}/flags',
        f'{base_url}/stolen_flags',
        f'{base_url}/flag_stats',
        f'{base_url}/game_state',
        f'{base_url}/scoreboard',
        # Try with /api/ prefix variations
        f'{base_url}/api/flags/',
        f'{base_url}/api/stolen_flags/',
        f'{base_url}/api/game_state/',
    ]
    request_timeout = aiohttp.ClientTimeout(total=5)
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                data_found = False
                for endpoint in endpoints_to_try:
                    try:
                        async with session.get(endpoint, timeout=request_timeout) as resp:
                            if resp.status != 200:
                                continue
                            # Skip HTML responses
                            if 'text/html' in resp.headers.get('Content-Type', ''):
                                continue
                            try:
                                data = await resp.json()
                            except (json.JSONDecodeError, aiohttp.ContentTypeError):
                                # Not JSON, skip
                                continue
                        print(f"✓ Fetched JSON from {endpoint}")
                        data_found = True
                        await _process_scoreboard_payload(data)
                        break  # Found valid data, no need to try other endpoints
                    except aiohttp.ClientError:
                        continue
                    except Exception as e:
                        print(f"Error fetching {endpoint}: {e}")
                        continue
                if not data_found:
                    print(f"No valid JSON data found from scoreboard API (last check: {datetime.utcnow().strftime('%H:%M:%S')})")
        except Exception as e:
            print(f"HTTP polling error: {e}")
        await asyncio.sleep(POLLING_INTERVAL)
async def websocket_listener():
    """Consume scoreboard WebSocket events forever, reconnecting on failure.

    Attack-related text messages are forwarded to process_attack_event. After
    the connection ends, for whatever reason, the loop sleeps and reconnects.
    The delay starts at 5 seconds, grows by a factor of 1.5 per attempt up to
    60 seconds, and is reset after a successful connection.
    """
    # Event types treated as attacks
    # (ForcAD can send: 'attack', 'flag_stolen', 'stolen_flag', 'flag_submit').
    attack_event_types = ('attack', 'flag_stolen', 'stolen_flag', 'flag_submit', 'flag')
    initial_delay = 5
    delay_cap = 60
    delay = initial_delay
    while True:
        try:
            print(f"Attempting to connect to scoreboard WebSocket: {SCOREBOARD_WS_URL}")
            async with aiohttp.ClientSession() as session:
                try:
                    # NOTE(review): confirm that the installed aiohttp version
                    # accepts a ClientTimeout for ws_connect's `timeout` argument.
                    async with session.ws_connect(
                        SCOREBOARD_WS_URL,
                        timeout=aiohttp.ClientTimeout(total=10)
                    ) as ws:
                        print(f"✓ Connected to scoreboard WebSocket")
                        delay = initial_delay  # connected: start the backoff over
                        async for msg in ws:
                            kind = msg.type
                            if kind == aiohttp.WSMsgType.CLOSED:
                                print("WebSocket connection closed by server")
                                break
                            if kind == aiohttp.WSMsgType.ERROR:
                                print(f"WebSocket error: {ws.exception()}")
                                break
                            if kind != aiohttp.WSMsgType.TEXT:
                                continue
                            try:
                                event = json.loads(msg.data)
                                if event.get('type', 'unknown') in attack_event_types:
                                    await process_attack_event(event)
                            except json.JSONDecodeError:
                                print(f"Failed to decode WebSocket message: {msg.data[:200]}")
                            except Exception as e:
                                print(f"Error processing event: {e}")
                except aiohttp.ClientError as e:
                    print(f"WebSocket client error: {e}")
                except asyncio.TimeoutError:
                    print(f"Connection timeout to {SCOREBOARD_WS_URL}")
        except Exception as e:
            print(f"WebSocket connection error: {e}")
        print(f"Reconnecting in {delay} seconds...")
        await asyncio.sleep(delay)
        # Exponential backoff, capped
        delay = min(delay * 1.5, delay_cap)
# Lifespan context
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: open the DB pool and run the scoreboard listener.

    On startup the asyncpg pool is created and either the HTTP poller or the
    WebSocket listener is started as a background task, depending on
    USE_HTTP_POLLING. On shutdown the task is cancelled and awaited, then the
    pool is closed.
    """
    global db_pool, ws_task
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)

    # Pick the listener coroutine according to configuration.
    if not USE_HTTP_POLLING:
        print(f"Starting WebSocket listener mode")
        print(f"Scoreboard WS: {SCOREBOARD_WS_URL}")
        listener = websocket_listener()
    else:
        print(f"Starting HTTP polling mode (interval: {POLLING_INTERVAL}s)")
        print(f"Scoreboard API: {SCOREBOARD_API_URL}")
        listener = http_polling_listener()
    ws_task = asyncio.create_task(listener)
    print(f"Our team ID: {OUR_TEAM_ID}")

    yield

    # Shutdown: stop the background listener before closing the pool it uses.
    running = ws_task
    if running:
        running.cancel()
        try:
            await running
        except asyncio.CancelledError:
            pass
    await db_pool.close()
# ASGI application object; `lifespan` creates the DB pool and starts the
# scoreboard listener task on startup and tears both down on shutdown.
app = FastAPI(title="Scoreboard Injector", lifespan=lifespan)
# API Endpoints
@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe reporting the active listener mode and team ID."""
    if USE_HTTP_POLLING:
        mode, scoreboard_url = "http_polling", SCOREBOARD_API_URL
    else:
        mode, scoreboard_url = "websocket", SCOREBOARD_WS_URL
    payload = {
        "status": "ok",
        "timestamp": datetime.utcnow().isoformat(),
        "team_id": OUR_TEAM_ID,
        "mode": mode,
        "scoreboard_url": scoreboard_url,
    }
    return payload
@app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats():
    """Return overall attack counters plus activity counts for the last five minutes."""
    stats = {}
    async with db_pool.acquire() as conn:
        stats["total_attacks"] = await conn.fetchval("SELECT COUNT(*) FROM attacks")
        stats["attacks_by_us"] = await conn.fetchval(
            "SELECT COUNT(*) FROM attacks WHERE is_our_attack = true"
        )
        stats["attacks_to_us"] = await conn.fetchval(
            "SELECT COUNT(*) FROM attacks WHERE is_attack_to_us = true"
        )
        # Both "recent" counters share the same five-minute window start.
        window_start = datetime.utcnow() - timedelta(minutes=5)
        stats["recent_attacks_5min"] = await conn.fetchval(
            "SELECT COUNT(*) FROM attacks WHERE timestamp > $1", window_start
        )
        stats["critical_alerts_5min"] = await conn.fetchval(
            "SELECT COUNT(*) FROM attack_alerts WHERE severity = 'critical' AND created_at > $1",
            window_start,
        )
    return stats
@app.get("/attacks", dependencies=[Depends(verify_token)])
async def get_attacks(limit: int = 100, our_attacks: Optional[bool] = None, attacks_to_us: Optional[bool] = None):
    """Return the most recent attacks, newest first.

    Args:
        limit: Maximum number of rows to return; must not be negative.
        our_attacks: When given, keep only rows whose ``is_our_attack`` equals it.
        attacks_to_us: When given, keep only rows whose ``is_attack_to_us`` equals it.

    Returns:
        A list of attack rows as dictionaries.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    # PostgreSQL rejects a negative LIMIT, which would otherwise surface as a
    # 500; report it as a client error instead.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")

    # Each filter appends its value first, so len(params) is its placeholder number.
    filters = []
    params = []
    if our_attacks is not None:
        params.append(our_attacks)
        filters.append(f"is_our_attack = ${len(params)}")
    if attacks_to_us is not None:
        params.append(attacks_to_us)
        filters.append(f"is_attack_to_us = ${len(params)}")
    params.append(limit)

    where_clause = f" WHERE {' AND '.join(filters)}" if filters else ""
    query = f"SELECT * FROM attacks{where_clause} ORDER BY timestamp DESC LIMIT ${len(params)}"

    conn = await get_db()
    try:
        rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.get("/alerts", dependencies=[Depends(verify_token)])
async def get_alerts(limit: int = 50, unnotified: bool = False):
    """Return alerts, newest first.

    Args:
        limit: Maximum number of rows to return; must not be negative.
        unnotified: When true, return only alerts whose ``notified`` flag is false.

    Returns:
        A list of alert rows as dictionaries.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    # PostgreSQL rejects a negative LIMIT, which would otherwise surface as a
    # 500; report it as a client error instead.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")

    if unnotified:
        query = "SELECT * FROM attack_alerts WHERE notified = false ORDER BY created_at DESC LIMIT $1"
    else:
        query = "SELECT * FROM attack_alerts ORDER BY created_at DESC LIMIT $1"

    conn = await get_db()
    try:
        rows = await conn.fetch(query, limit)
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.post("/alerts/{alert_id}/acknowledge", dependencies=[Depends(verify_token)])
async def acknowledge_alert(alert_id: int):
    """Mark an alert as acknowledged by setting its ``notified`` flag.

    Args:
        alert_id: Primary key of the alert.

    Returns:
        ``{"status": "acknowledged", "alert_id": alert_id}``.

    Raises:
        HTTPException: 404 when no alert with this ID exists.
    """
    conn = await get_db()
    try:
        # RETURNING yields no row when nothing matched, which distinguishes an
        # unknown ID from a real acknowledgement.
        updated_id = await conn.fetchval(
            "UPDATE attack_alerts SET notified = true WHERE id = $1 RETURNING id",
            alert_id,
        )
        if updated_id is None:
            raise HTTPException(status_code=404, detail="Alert not found")
        return {"status": "acknowledged", "alert_id": alert_id}
    finally:
        await release_db(conn)
@app.get("/attacks/by-service", dependencies=[Depends(verify_token)])
async def get_attacks_by_service():
    """Return per-service attack counts and point totals, busiest service first."""
    per_service_sql = """
        SELECT
        service_name,
        COUNT(*) as total_attacks,
        COUNT(*) FILTER (WHERE is_our_attack = true) as our_attacks,
        COUNT(*) FILTER (WHERE is_attack_to_us = true) as attacks_to_us,
        COALESCE(SUM(points) FILTER (WHERE is_our_attack = true), 0) as points_gained,
        COALESCE(SUM(points) FILTER (WHERE is_attack_to_us = true), 0) as points_lost
        FROM attacks
        GROUP BY service_name
        ORDER BY total_attacks DESC
    """
    async with db_pool.acquire() as conn:
        records = await conn.fetch(per_service_sql)
        return list(map(dict, records))
@app.post("/settings/team-id", dependencies=[Depends(verify_token)])
async def set_team_id(team_id: int):
    """Change the team ID treated as "us" and upsert it into the settings table."""
    global OUR_TEAM_ID
    # The in-memory value is switched first, then stored under 'our_team_id'.
    OUR_TEAM_ID = team_id
    upsert_sql = (
        "INSERT INTO settings (key, value) VALUES ('our_team_id', $1) "
        "ON CONFLICT (key) DO UPDATE SET value = $1"
    )
    async with db_pool.acquire() as conn:
        await conn.execute(upsert_sql, str(team_id))
    return {"team_id": team_id}
@app.get("/settings/team-id", dependencies=[Depends(verify_token)])
async def get_team_id():
    """Report the team ID currently held in memory."""
    current_team_id = OUR_TEAM_ID
    return dict(team_id=current_team_id)
@app.post("/test/inject-attack", dependencies=[Depends(verify_token)])
async def inject_test_attack(attacker_id: int, victim_id: int, service: str = "test-service", points: float = 10.0):
    """Build a synthetic attack event and run it through process_attack_event (debug aid)."""
    # The flag and the event time each take their own utcnow() reading.
    flag_value = "TEST_FLAG_" + datetime.utcnow().isoformat()
    event_time = datetime.utcnow().isoformat()
    test_event = dict(
        type="attack",
        attacker_id=attacker_id,
        victim_id=victim_id,
        service=service,
        flag=flag_value,
        points=points,
        time=event_time,
        round=1,
    )
    await process_attack_event(test_event)
    return dict(status="injected", event=test_event)
@app.get("/debug/scoreboard", dependencies=[Depends(verify_token)])
async def debug_scoreboard():
    """Probe the scoreboard WebSocket and HTTP endpoints and report what each returns.

    Returns:
        A dictionary with the listener mode, the WebSocket reachability
        status, and one entry per probed HTTP endpoint (status, content type
        and a short preview of the body). Probe failures are reported inside
        the result rather than raised.
    """
    results = {
        "mode": "http_polling" if USE_HTTP_POLLING else "websocket",
        "endpoints_tested": []
    }
    try:
        async with aiohttp.ClientSession() as session:
            # Test WebSocket
            results["websocket_url"] = SCOREBOARD_WS_URL
            try:
                # NOTE(review): confirm that the installed aiohttp version
                # accepts a ClientTimeout for ws_connect's `timeout` argument.
                async with session.ws_connect(SCOREBOARD_WS_URL, timeout=aiohttp.ClientTimeout(total=5)):
                    results["websocket_status"] = "reachable"
            except Exception as e:
                results["websocket_status"] = f"unreachable: {str(e)}"

            # Test HTTP endpoints - both HTML and API paths.
            # Remove a trailing "/api" segment explicitly: str.rstrip('/api')
            # strips any trailing run of the characters '/', 'a', 'p', 'i'
            # and can eat into the host name.
            base_url = SCOREBOARD_API_URL.rstrip('/')
            if base_url.endswith('/api'):
                base_url = base_url[:-len('/api')].rstrip('/')
            endpoints = [
                f"{base_url}/flags",
                f"{base_url}/stolen_flags",
                f"{base_url}/flag_stats",
                f"{base_url}/game_state",
                f"{base_url}/scoreboard",
                f"{base_url}/api/flags/",
                f"{base_url}/api/stolen_flags/",
                f"{base_url}/api/game_state/",
                f"{base_url}/api/client/attack_data",
            ]
            request_timeout = aiohttp.ClientTimeout(total=5)
            for endpoint in endpoints:
                try:
                    async with session.get(endpoint, timeout=request_timeout) as resp:
                        content_type = resp.headers.get('Content-Type', '')
                        is_json = 'application/json' in content_type
                        is_html = 'text/html' in content_type
                        result = {
                            "url": endpoint,
                            "status": resp.status,
                            "reachable": resp.status == 200,
                            "content_type": content_type,
                            "is_json": is_json,
                            "is_html": is_html
                        }
                        if resp.status == 200:
                            if is_json:
                                try:
                                    data = await resp.json()
                                    result["json_preview"] = str(data)[:500]
                                    result["json_keys"] = list(data.keys()) if isinstance(data, dict) else "list"
                                except Exception as e:
                                    # Surface the decode failure instead of
                                    # silently dropping it.
                                    result["json_error"] = str(e)
                            else:
                                text = await resp.text()
                                result["data_preview"] = text[:200]
                        results["endpoints_tested"].append(result)
                except Exception as e:
                    results["endpoints_tested"].append({
                        "url": endpoint,
                        "reachable": False,
                        "error": str(e)
                    })
    except Exception as e:
        results["error"] = str(e)
    return results
# Start uvicorn only when this module is executed as a script.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed for this direct-run path.
    import uvicorn
    # Serve the app on all interfaces, port 8002.
    uvicorn.run(app, host="0.0.0.0", port=8002)