""" Scoreboard Injector for ForcAD Monitors Socket.IO events for attacks and alerts on critical situations """ import os import json import asyncio from datetime import datetime, timedelta from typing import Optional, Dict, Any import socketio from fastapi import FastAPI, HTTPException, Depends, Header from pydantic import BaseModel import asyncpg from contextlib import asynccontextmanager # Configuration DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl") SECRET_TOKEN = os.getenv("SECRET_TOKEN", "change-me-in-production") SCOREBOARD_URL = os.getenv("SCOREBOARD_URL", "http://10.60.0.1:8080") OUR_TEAM_ID = int(os.getenv("OUR_TEAM_ID", "1")) ALERT_THRESHOLD_POINTS = float(os.getenv("ALERT_THRESHOLD_POINTS", "100")) ALERT_THRESHOLD_TIME = int(os.getenv("ALERT_THRESHOLD_TIME", "300")) # seconds TELEGRAM_API_URL = os.getenv("TELEGRAM_API_URL", "http://tg-bot:8003/send") # Database pool db_pool = None ws_task = None class AttackStats(BaseModel): total_attacks: int attacks_by_us: int attacks_to_us: int recent_attacks: int critical_alerts: int # Auth dependency async def verify_token(authorization: str = Header(None)): if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing or invalid authorization header") token = authorization.replace("Bearer ", "") if token != SECRET_TOKEN: raise HTTPException(status_code=403, detail="Invalid token") return token # Database functions async def get_db(): return await db_pool.acquire() async def release_db(conn): await db_pool.release(conn) async def process_attack_event(event: Dict[str, Any]): """Process attack event from scoreboard""" conn = await db_pool.acquire() try: # Extract attack information from event # Handle multiple possible event formats from ForcAD event_type = event.get('type', 'unknown') # Try to extract attacker/victim IDs from various possible fields attacker_id = event.get('attacker_id') or event.get('team_id') or event.get('attacker') victim_id = event.get('victim_id') or event.get('target_id') or event.get('victim') or event.get('target') # Skip if we don't have both attacker and victim if attacker_id is None or victim_id is None: print(f"Skipping event with missing attacker/victim: {event}") return # Convert to integers if they're strings try: attacker_id = int(attacker_id) victim_id = int(victim_id) except (ValueError, TypeError): print(f"Invalid team IDs in event: attacker={attacker_id}, victim={victim_id}") return service_name = event.get('service') or event.get('service_name') or event.get('task_name') or 'unknown' flag = event.get('flag', '') # Handle timestamp time_str = event.get('time') or event.get('timestamp') if time_str: try: # Try parsing ISO format timestamp = datetime.fromisoformat(time_str.replace('Z', '+00:00')) except (ValueError, AttributeError): # Try parsing as Unix timestamp try: timestamp = datetime.fromtimestamp(float(time_str)) except (ValueError, TypeError): timestamp = datetime.utcnow() else: timestamp = datetime.utcnow() # Extract points (might be in different fields) points = float(event.get('points', 0) or event.get('score', 0) or 1.0) # Generate unique attack ID round_num = event.get('round', event.get('round_id', 0)) attack_id = event.get('id') or f"{round_num}_{attacker_id}_{victim_id}_{service_name}_{int(timestamp.timestamp())}" is_our_attack = attacker_id == OUR_TEAM_ID is_attack_to_us = victim_id == OUR_TEAM_ID # Only log if it involves our team if is_our_attack or is_attack_to_us: # Store attack in database inserted = await conn.fetchval(""" INSERT INTO attacks (attack_id, attacker_team_id, victim_team_id, service_name, flag, timestamp, points, is_our_attack, is_attack_to_us) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (attack_id) DO NOTHING RETURNING id """, attack_id, attacker_id, victim_id, service_name, flag, timestamp, points, is_our_attack, is_attack_to_us) if inserted: print(f"[{event_type}] Logged attack: Team {attacker_id} -> Team {victim_id} | {service_name} | {points} pts") # Check for alert conditions if attack is against us if is_attack_to_us: await check_and_create_alerts(conn, attacker_id, service_name) except Exception as e: print(f"Error processing attack event: {e}") print(f"Event data: {event}") finally: await db_pool.release(conn) async def check_and_create_alerts(conn, attacker_id: int, service_name: str): """Check if we should create an alert for attacks against us""" threshold_time = datetime.utcnow() - timedelta(seconds=ALERT_THRESHOLD_TIME) # Check total points lost from this attacker in threshold time result = await conn.fetchrow(""" SELECT COUNT(*) as attack_count, COALESCE(SUM(points), 0) as total_points FROM attacks WHERE is_attack_to_us = true AND attacker_team_id = $1 AND service_name = $2 AND timestamp > $3 """, attacker_id, service_name, threshold_time) if result and result['total_points'] >= ALERT_THRESHOLD_POINTS: # Create alert alert_message = f"CRITICAL: Team {attacker_id} has stolen {result['total_points']:.2f} points from service {service_name} in the last {ALERT_THRESHOLD_TIME}s ({result['attack_count']} attacks)" # Check if we already alerted recently recent_alert = await conn.fetchrow(""" SELECT id FROM attack_alerts WHERE alert_type = 'high_point_loss' AND message LIKE $1 AND created_at > $2 """, f"%Team {attacker_id}%{service_name}%", threshold_time) if not recent_alert: alert_id = await conn.fetchval(""" INSERT INTO attack_alerts (attack_id, alert_type, severity, message) VALUES ( (SELECT id FROM attacks WHERE attacker_team_id = $1 AND service_name = $2 ORDER BY timestamp DESC LIMIT 1), 'high_point_loss', 'critical', $3 ) RETURNING id """, attacker_id, service_name, alert_message) # Send to telegram await send_telegram_alert(alert_message) # Mark as notified await conn.execute("UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id) async def send_telegram_alert(message: str): """Send alert to telegram bot""" import aiohttp try: async with aiohttp.ClientSession() as session: async with session.post( TELEGRAM_API_URL, json={"message": message}, headers={"Authorization": f"Bearer {SECRET_TOKEN}"} ) as resp: if resp.status != 200: print(f"Failed to send telegram alert: {await resp.text()}") except Exception as e: print(f"Error sending telegram alert: {e}") async def socketio_listener(): """Listen to ForcAD scoreboard using Socket.IO""" sio = socketio.AsyncClient(logger=False, engineio_logger=False) # Cache for task names task_names = {} @sio.event(namespace='/game_events') async def update_scoreboard(data): """Handle scoreboard update - compare with previous state to detect NEW attacks""" try: event_data = data.get('data', {}) round_num = event_data.get('round', 0) round_start = event_data.get('round_start', 0) team_tasks = event_data.get('team_tasks', []) print(f"📊 Round {round_num} - Processing {len(team_tasks)} team updates") conn = await db_pool.acquire() try: for team_task in team_tasks: team_id = team_task.get('team_id') task_id = team_task.get('task_id') current_stolen = team_task.get('stolen', 0) current_lost = team_task.get('lost', 0) service_name = task_names.get(task_id, f"task_{task_id}") # Get previous state from database prev_state = await conn.fetchrow( "SELECT stolen_flags, lost_flags FROM scoreboard_state WHERE team_id = $1 AND service_name = $2", team_id, service_name ) prev_stolen = prev_state['stolen_flags'] if prev_state else 0 prev_lost = prev_state['lost_flags'] if prev_state else 0 # Calculate NEW flags (difference from previous state) new_stolen = current_stolen - prev_stolen new_lost = current_lost - prev_lost # Update current state in database await conn.execute(""" INSERT INTO scoreboard_state (team_id, service_name, stolen_flags, lost_flags, last_updated) VALUES ($1, $2, $3, $4, NOW()) ON CONFLICT (team_id, service_name) DO UPDATE SET stolen_flags = $3, lost_flags = $4, last_updated = NOW() """, team_id, service_name, current_stolen, current_lost) # Create attack record only for NEW stolen flags if new_stolen > 0: timestamp = datetime.utcnow() attack_id = f"r{round_num}_stolen_team{team_id}_{service_name}_{int(timestamp.timestamp())}" is_our_attack = team_id == OUR_TEAM_ID await conn.execute(""" INSERT INTO attacks (attack_id, attacker_team_id, service_name, timestamp, points, is_our_attack, is_attack_to_us) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (attack_id) DO NOTHING """, attack_id, team_id, service_name, timestamp, float(new_stolen), is_our_attack, False) if is_our_attack: print(f" ✅ We stole {new_stolen} NEW flags from {service_name} (total: {current_stolen})") else: print(f" 📌 Team {team_id} stole {new_stolen} NEW flags from {service_name} (total: {current_stolen})") # Create attack record only for NEW lost flags if new_lost > 0: timestamp = datetime.utcnow() attack_id = f"r{round_num}_lost_team{team_id}_{service_name}_{int(timestamp.timestamp())}" is_attack_to_us = team_id == OUR_TEAM_ID await conn.execute(""" INSERT INTO attacks (attack_id, victim_team_id, service_name, timestamp, points, is_our_attack, is_attack_to_us) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (attack_id) DO NOTHING """, attack_id, team_id, service_name, timestamp, float(new_lost), False, is_attack_to_us) if is_attack_to_us: print(f" ⚠️ We LOST {new_lost} NEW flags on {service_name} (total lost: {current_lost})") # Check for alerts on high-value attacks if new_lost >= ALERT_THRESHOLD_POINTS: await check_and_create_alerts(conn, 0, service_name) else: print(f" 📌 Team {team_id} lost {new_lost} NEW flags on {service_name} (total: {current_lost})") finally: await db_pool.release(conn) except Exception as e: print(f"Error processing update_scoreboard: {e}") import traceback traceback.print_exc() @sio.event(namespace='/game_events') async def init_scoreboard(data): """Handle initial scoreboard data""" try: print("📡 Received initial scoreboard data") event_data = data.get('data', {}) teams = event_data.get('teams', []) tasks = event_data.get('tasks', []) # Cache task names for task in tasks: task_names[task.get('id')] = task.get('name') team_names_str = ', '.join([f"{t.get('name')} (ID:{t.get('id')})" for t in teams]) task_names_str = ', '.join([t.get('name') for t in tasks]) print(f" Teams: {team_names_str}") print(f" Tasks: {task_names_str}") except Exception as e: print(f"Error processing init_scoreboard: {e}") @sio.event async def connect(): print("✅ Connected to ForcAD scoreboard Socket.IO") @sio.event async def disconnect(): print("❌ Disconnected from scoreboard") while True: try: print(f"Connecting to {SCOREBOARD_URL}/socket.io ...") await sio.connect( SCOREBOARD_URL, namespaces=['/game_events'], transports=['websocket'] ) await sio.wait() except Exception as e: print(f"Socket.IO error: {e}") print("Reconnecting in 5 seconds...") await asyncio.sleep(5) # Lifespan context @asynccontextmanager async def lifespan(app: FastAPI): global db_pool, ws_task db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10) print(f"Starting Socket.IO listener") print(f"Scoreboard URL: {SCOREBOARD_URL}") print(f"Our team ID: {OUR_TEAM_ID}") ws_task = asyncio.create_task(socketio_listener()) yield # Cleanup if ws_task: ws_task.cancel() try: await ws_task except asyncio.CancelledError: pass await db_pool.close() app = FastAPI(title="Scoreboard Injector", lifespan=lifespan) # API Endpoints @app.get("/health") async def health_check(): return { "status": "ok", "timestamp": datetime.utcnow().isoformat(), "team_id": OUR_TEAM_ID, "mode": "socketio", "scoreboard_url": SCOREBOARD_URL } @app.get("/stats", dependencies=[Depends(verify_token)]) async def get_stats(): """Get attack statistics""" conn = await get_db() try: total = await conn.fetchval("SELECT COUNT(*) FROM attacks") attacks_by_us = await conn.fetchval("SELECT COUNT(*) FROM attacks WHERE is_our_attack = true") attacks_to_us = await conn.fetchval("SELECT COUNT(*) FROM attacks WHERE is_attack_to_us = true") threshold_time = datetime.utcnow() - timedelta(minutes=5) recent = await conn.fetchval("SELECT COUNT(*) FROM attacks WHERE timestamp > $1", threshold_time) critical_alerts = await conn.fetchval( "SELECT COUNT(*) FROM attack_alerts WHERE severity = 'critical' AND created_at > $1", threshold_time ) return { "total_attacks": total, "attacks_by_us": attacks_by_us, "attacks_to_us": attacks_to_us, "recent_attacks_5min": recent, "critical_alerts_5min": critical_alerts } finally: await release_db(conn) @app.get("/attacks", dependencies=[Depends(verify_token)]) async def get_attacks(limit: int = 100, our_attacks: Optional[bool] = None, attacks_to_us: Optional[bool] = None): """Get recent attacks""" conn = await get_db() try: query = "SELECT * FROM attacks WHERE 1=1" params = [] param_count = 0 if our_attacks is not None: param_count += 1 query += f" AND is_our_attack = ${param_count}" params.append(our_attacks) if attacks_to_us is not None: param_count += 1 query += f" AND is_attack_to_us = ${param_count}" params.append(attacks_to_us) param_count += 1 query += f" ORDER BY timestamp DESC LIMIT ${param_count}" params.append(limit) rows = await conn.fetch(query, *params) return [dict(row) for row in rows] finally: await release_db(conn) @app.get("/alerts", dependencies=[Depends(verify_token)]) async def get_alerts(limit: int = 50, unnotified: bool = False): """Get alerts""" conn = await get_db() try: if unnotified: query = "SELECT * FROM attack_alerts WHERE notified = false ORDER BY created_at DESC LIMIT $1" else: query = "SELECT * FROM attack_alerts ORDER BY created_at DESC LIMIT $1" rows = await conn.fetch(query, limit) return [dict(row) for row in rows] finally: await release_db(conn) @app.post("/alerts/{alert_id}/acknowledge", dependencies=[Depends(verify_token)]) async def acknowledge_alert(alert_id: int): """Mark alert as acknowledged""" conn = await get_db() try: await conn.execute("UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id) return {"status": "acknowledged", "alert_id": alert_id} finally: await release_db(conn) @app.get("/attacks/by-service", dependencies=[Depends(verify_token)]) async def get_attacks_by_service(): """Get attack statistics grouped by service""" conn = await get_db() try: rows = await conn.fetch(""" SELECT service_name, COUNT(*) as total_attacks, COUNT(*) FILTER (WHERE is_our_attack = true) as our_attacks, COUNT(*) FILTER (WHERE is_attack_to_us = true) as attacks_to_us, COALESCE(SUM(points) FILTER (WHERE is_our_attack = true), 0) as points_gained, COALESCE(SUM(points) FILTER (WHERE is_attack_to_us = true), 0) as points_lost FROM attacks GROUP BY service_name ORDER BY total_attacks DESC """) return [dict(row) for row in rows] finally: await release_db(conn) @app.post("/settings/team-id", dependencies=[Depends(verify_token)]) async def set_team_id(team_id: int): """Update our team ID""" global OUR_TEAM_ID OUR_TEAM_ID = team_id conn = await get_db() try: await conn.execute( "INSERT INTO settings (key, value) VALUES ('our_team_id', $1) ON CONFLICT (key) DO UPDATE SET value = $1", str(team_id) ) return {"team_id": team_id} finally: await release_db(conn) @app.get("/settings/team-id", dependencies=[Depends(verify_token)]) async def get_team_id(): """Get current team ID setting""" return {"team_id": OUR_TEAM_ID} @app.post("/test/inject-attack", dependencies=[Depends(verify_token)]) async def inject_test_attack(attacker_id: int, victim_id: int, service: str = "test-service", points: float = 10.0): """Manually inject a test attack event for debugging""" test_event = { "type": "attack", "attacker_id": attacker_id, "victim_id": victim_id, "service": service, "flag": "TEST_FLAG_" + datetime.utcnow().isoformat(), "points": points, "time": datetime.utcnow().isoformat(), "round": 1 } await process_attack_event(test_event) return {"status": "injected", "event": test_event} @app.get("/debug/scoreboard", dependencies=[Depends(verify_token)]) async def debug_scoreboard(): """Check if scoreboard is reachable and show connection info""" import aiohttp results = { "mode": "socketio", "config": { "scoreboard_url": SCOREBOARD_URL, "our_team_id": OUR_TEAM_ID }, "endpoints_tested": [] } try: async with aiohttp.ClientSession() as session: # Test Socket.IO endpoint socketio_url = f"{SCOREBOARD_URL}/socket.io/?EIO=4&transport=polling" try: async with session.get(socketio_url, timeout=aiohttp.ClientTimeout(total=5)) as resp: results["socketio_status"] = { "url": socketio_url, "status": resp.status, "reachable": resp.status == 200, "response_preview": (await resp.text())[:200] if resp.status == 200 else None } except Exception as e: results["socketio_status"] = { "url": socketio_url, "reachable": False, "error": str(e) } # Test base scoreboard URL try: async with session.get(SCOREBOARD_URL, timeout=aiohttp.ClientTimeout(total=5)) as resp: results["base_url_status"] = { "url": SCOREBOARD_URL, "status": resp.status, "reachable": resp.status == 200 } except Exception as e: results["base_url_status"] = { "url": SCOREBOARD_URL, "reachable": False, "error": str(e) } # Test attack_data endpoint (for reference only) attack_data_url = f"{SCOREBOARD_URL}/api/client/attack_data" try: async with session.get(attack_data_url, timeout=aiohttp.ClientTimeout(total=5)) as resp: result = { "url": attack_data_url, "status": resp.status, "reachable": resp.status == 200, "content_type": resp.headers.get('Content-Type', ''), "note": "Contains exploit credentials, not attack events" } if resp.status == 200 and 'application/json' in resp.headers.get('Content-Type', ''): data = await resp.json() result["services"] = list(data.keys()) if isinstance(data, dict) else None results["endpoints_tested"].append(result) except Exception as e: results["endpoints_tested"].append({ "url": attack_data_url, "reachable": False, "error": str(e) }) except Exception as e: results["error"] = str(e) return results if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8002)