"""
API Controller for A/D Infrastructure.

Manages docker-compose services with authentication.

Every endpoint except ``/health`` requires an ``Authorization: Bearer <token>``
header whose token equals ``SECRET_TOKEN``.
"""
import asyncio
import logging
import os
import re
import secrets
import subprocess
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional, List

import asyncpg
from fastapi import FastAPI, HTTPException, Depends, Header, Query
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# Configuration
_DEFAULT_SECRET_TOKEN = "change-me-in-production"
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl")
SECRET_TOKEN = os.getenv("SECRET_TOKEN", _DEFAULT_SECRET_TOKEN)
SERVICES_DIR = os.getenv("SERVICES_DIR", "/services")

_BEARER_PREFIX = "Bearer "

# File names probed, in order, when looking for a compose file.
_COMPOSE_FILE_NAMES = (
    "docker-compose.yml",
    "docker-compose.yaml",
    "compose.yml",
    "compose.yaml",
)

# Maps the public action name to the docker-compose arguments that perform it.
_COMPOSE_ACTIONS = {
    "start": ["up", "-d"],
    "stop": ["down"],
    "restart": ["restart"],
}

# Whole-word status markers; substring checks would also match container or
# image names such as "Uploader".
_STATUS_UP = re.compile(r"\bUp\b")
_STATUS_EXITED = re.compile(r"\bExit(?:ed)?\b")
# docker-compose v1 prints a row of dashes under the header.
_SEPARATOR_LINE = re.compile(r"^-+$")

# Database pool (created in ``lifespan``)
db_pool = None


class ServiceCreate(BaseModel):
    # Request body for registering a service.
    name: str
    path: str  # interpreted relative to SERVICES_DIR
    git_url: Optional[str] = None


class ServiceAction(BaseModel):
    # Request body for the action endpoint.
    action: str  # start, stop, restart


class GitPullRequest(BaseModel):
    # Request body for the pull endpoint; ``auto`` triggers a restart after a
    # successful pull.
    auto: bool = False


class LogRequest(BaseModel):
    lines: int = 100


# Auth dependency
async def verify_token(authorization: Optional[str] = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` header.

    Returns the presented token on success.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            header; 403 when the token does not match ``SECRET_TOKEN``.
    """
    if not authorization or not authorization.startswith(_BEARER_PREFIX):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

    # Strip only the leading scheme; str.replace would also remove any later
    # "Bearer " occurrences inside the token itself.
    token = authorization.removeprefix(_BEARER_PREFIX)

    # Constant-time comparison avoids leaking the token through timing.
    # Bytes are compared because compare_digest rejects non-ASCII str.
    if not secrets.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token


# Database functions
async def get_db():
    """Acquire a connection from the global pool."""
    return await db_pool.acquire()


async def release_db(conn):
    """Return ``conn`` to the global pool."""
    await db_pool.release(conn)


@asynccontextmanager
async def _db_connection():
    """Yield a pooled connection and always release it afterwards."""
    conn = await get_db()
    try:
        yield conn
    finally:
        await release_db(conn)


async def log_service_action(conn, service_id: int, action: str, status: str,
                             message: Optional[str] = None):
    """Insert one row into ``service_logs`` describing an action outcome."""
    await conn.execute(
        "INSERT INTO service_logs (service_id, action, status, message) VALUES ($1, $2, $3, $4)",
        service_id, action, status, message
    )


async def _get_service_or_404(conn, service_id: int):
    """Fetch the ``services`` row for ``service_id`` or raise HTTP 404."""
    service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")
    return service


# Lifespan context
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the database pool on startup and close it on shutdown."""
    global db_pool
    if SECRET_TOKEN == _DEFAULT_SECRET_TOKEN:
        logger.warning("SECRET_TOKEN is set to the built-in default; set a real secret")
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    try:
        yield
    finally:
        # Close the pool even if the application exits with an error.
        await db_pool.close()


app = FastAPI(title="A/D Infrastructure Controller", lifespan=lifespan)


# Helper functions
def find_compose_file(service_path: str) -> Optional[str]:
    """Find docker-compose file in service directory.

    Returns the path of the first candidate name that exists as a regular
    file inside ``service_path``, or ``None`` when there is none.
    """
    for name in _COMPOSE_FILE_NAMES:
        compose_file = os.path.join(service_path, name)
        # isfile (not exists) so a directory with a matching name is ignored.
        if os.path.isfile(compose_file):
            return compose_file
    return None


def _is_within_services_dir(path: str) -> bool:
    """Return True when ``path`` lexically resolves inside ``SERVICES_DIR``.

    The check normalises ``..`` components but does not resolve symlinks.
    """
    base = os.path.abspath(SERVICES_DIR)
    candidate = os.path.abspath(path)
    try:
        return os.path.commonpath([base, candidate]) == base
    except ValueError:
        # commonpath raises for paths that cannot be compared (e.g. different
        # drives on Windows); treat those as outside.
        return False


def parse_docker_status(ps_output: str) -> dict:
    """Parse docker-compose ps output to get container status.

    The first line is treated as the header and skipped, as are blank lines
    and dash-only separator rows. Each remaining line is classified as
    ``running`` (contains the word ``Up``), ``stopped`` (contains ``Exit`` or
    ``Exited``) or ``unknown``.

    Returns a dict with keys ``overall_status`` (``running``, ``stopped``,
    ``partial`` or ``unknown``), ``running``, ``stopped`` and ``containers``.
    """
    containers = []
    running_count = 0
    stopped_count = 0

    for raw_line in ps_output.strip().split('\n')[1:]:  # Skip header
        info = raw_line.strip()
        if not info or _SEPARATOR_LINE.match(info):
            continue
        if _STATUS_UP.search(info):
            running_count += 1
            containers.append({'status': 'running', 'info': info})
        elif _STATUS_EXITED.search(info):
            stopped_count += 1
            containers.append({'status': 'stopped', 'info': info})
        else:
            containers.append({'status': 'unknown', 'info': info})

    # Determine overall status
    if running_count and stopped_count:
        overall_status = 'partial'
    elif running_count:
        overall_status = 'running'
    elif stopped_count:
        overall_status = 'stopped'
    else:
        overall_status = 'unknown'

    return {
        'overall_status': overall_status,
        'running': running_count,
        'stopped': stopped_count,
        'containers': containers
    }


async def _run_command(argv: List[str], cwd: str) -> tuple[int, str, str]:
    """Run ``argv`` in ``cwd`` and return (returncode, stdout, stderr).

    A failure to launch the process (missing executable, missing or
    inaccessible ``cwd``) is reported as return code 127 with the error text
    in stderr instead of propagating ``OSError``.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd
        )
    except OSError as exc:
        return 127, "", f"Failed to run {argv[0]}: {exc}"

    stdout, stderr = await process.communicate()
    # errors="replace" keeps non-UTF-8 output from raising UnicodeDecodeError.
    return (
        process.returncode,
        stdout.decode(errors="replace"),
        stderr.decode(errors="replace"),
    )


async def run_docker_compose_command(service_path: str, command: List[str]) -> tuple[int, str, str]:
    """Run docker-compose command and return (returncode, stdout, stderr).

    Returns ``(1, "", "docker-compose file not found")`` when ``service_path``
    contains no compose file.
    """
    compose_file = find_compose_file(service_path)
    if not compose_file:
        return 1, "", "docker-compose file not found"

    full_command = ["docker-compose", "-f", compose_file] + command
    return await _run_command(full_command, service_path)


async def run_git_command(service_path: str, command: List[str]) -> tuple[int, str, str]:
    """Run git command and return (returncode, stdout, stderr)."""
    return await _run_command(["git", *command], service_path)


# API Endpoints
@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe."""
    return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}


@app.get("/services/scan", dependencies=[Depends(verify_token)])
async def scan_services_directory():
    """Scan /services directory for available services with docker-compose files.

    Each immediate subdirectory of ``SERVICES_DIR`` that contains a compose
    file is reported, together with its registration details when a
    ``services`` row with the same path exists.

    Raises:
        HTTPException: 404 when ``SERVICES_DIR`` does not exist; 500 when the
            scan itself fails.
    """
    if not os.path.exists(SERVICES_DIR):
        raise HTTPException(status_code=404, detail=f"Services directory not found: {SERVICES_DIR}")

    try:
        # Collect candidates first so a single connection serves all lookups.
        candidates = []
        for entry in os.listdir(SERVICES_DIR):
            entry_path = os.path.join(SERVICES_DIR, entry)
            if not os.path.isdir(entry_path):
                continue
            compose_file = find_compose_file(entry_path)
            if compose_file:
                candidates.append((entry, entry_path, compose_file))

        available_services = []
        if candidates:
            async with _db_connection() as conn:
                for entry, entry_path, compose_file in candidates:
                    # Check if already registered
                    existing = await conn.fetchrow(
                        "SELECT id, name, status FROM services WHERE path = $1",
                        entry_path
                    )
                    info = {
                        "name": entry,
                        "path": entry_path,
                        "compose_file": os.path.basename(compose_file),
                        "registered": bool(existing),
                    }
                    if existing:
                        info["service_id"] = existing['id']
                        info["service_name"] = existing['name']
                        info["status"] = existing['status']
                    available_services.append(info)

        return {
            "services_dir": SERVICES_DIR,
            "available_services": available_services,
            "count": len(available_services)
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error scanning directory: {str(e)}") from e


@app.post("/services", dependencies=[Depends(verify_token)])
async def create_service(service: ServiceCreate):
    """Register a new service.

    ``service.path`` is taken relative to ``SERVICES_DIR`` (leading slashes
    are dropped) and must point at a directory containing a compose file.

    Raises:
        HTTPException: 400 when the path leaves ``SERVICES_DIR`` or is not a
            directory; 404 when the path or its compose file is missing; 409
            when the database rejects the row as a duplicate.
    """
    # Build full service path - path should be relative to SERVICES_DIR
    # Remove leading slash if present
    relative_path = service.path.lstrip('/')
    service_path = os.path.join(SERVICES_DIR, relative_path)

    # Reject "../" traversal before touching the filesystem or the database.
    if not _is_within_services_dir(service_path):
        raise HTTPException(
            status_code=400,
            detail="Service path must be inside the services directory"
        )

    # Check if service directory exists
    if not os.path.exists(service_path):
        raise HTTPException(status_code=404, detail=f"Service path not found: {service_path}")

    if not os.path.isdir(service_path):
        raise HTTPException(status_code=400, detail=f"Service path is not a directory: {service_path}")

    # Check if docker-compose file exists
    if not find_compose_file(service_path):
        raise HTTPException(status_code=404, detail=f"docker-compose file not found in {service_path}")

    async with _db_connection() as conn:
        try:
            service_id = await conn.fetchval(
                "INSERT INTO services (name, path, git_url, status) VALUES ($1, $2, $3, $4) RETURNING id",
                service.name, service_path, service.git_url, "stopped"
            )
        except asyncpg.UniqueViolationError as exc:
            # Only reachable if the schema declares a unique constraint.
            raise HTTPException(status_code=409, detail="Service already registered") from exc

        await log_service_action(conn, service_id, "register", "success", "Service registered")

    return {"id": service_id, "name": service.name, "status": "registered", "path": service_path}


@app.get("/services", dependencies=[Depends(verify_token)])
async def list_services(live_status: bool = True):
    """List all registered services with optional live status from docker.

    With ``live_status`` enabled, ``docker-compose ps`` is run per service;
    the stored status is updated when it differs from the parsed one.
    """
    async with _db_connection() as conn:
        rows = await conn.fetch("SELECT * FROM services ORDER BY name")
        services = []

        for row in rows:
            service_dict = dict(row)

            # Get live status from docker if requested
            if live_status:
                returncode, stdout, _stderr = await run_docker_compose_command(
                    service_dict['path'], ["ps"]
                )

                if returncode == 0 and stdout.strip():
                    parsed_status = parse_docker_status(stdout)
                    overall = parsed_status['overall_status']
                    service_dict['live_status'] = overall
                    service_dict['containers_running'] = parsed_status['running']
                    service_dict['containers_stopped'] = parsed_status['stopped']

                    # Update DB status if different
                    if overall != service_dict['status']:
                        # utcnow() is naive; NOTE(review): presumably
                        # last_updated is a timestamp-without-time-zone
                        # column - confirm before switching to aware datetimes.
                        await conn.execute(
                            "UPDATE services SET status = $1, last_updated = $2 WHERE id = $3",
                            overall, datetime.utcnow(), service_dict['id']
                        )
                        service_dict['status'] = overall
                else:
                    service_dict['live_status'] = 'unavailable'
                    service_dict['containers_running'] = 0
                    service_dict['containers_stopped'] = 0

            services.append(service_dict)

        return services


@app.get("/services/{service_id}", dependencies=[Depends(verify_token)])
async def get_service(service_id: int):
    """Get service details.

    Raises:
        HTTPException: 404 when no service has this id.
    """
    async with _db_connection() as conn:
        return dict(await _get_service_or_404(conn, service_id))


@app.post("/services/{service_id}/action", dependencies=[Depends(verify_token)])
async def service_action(service_id: int, action: ServiceAction):
    """Start, stop, or restart a service.

    The outcome is recorded in ``service_logs``; on success the stored status
    becomes ``running`` (start/restart) or ``stopped`` (stop).

    Raises:
        HTTPException: 404 for an unknown service, 400 for an unknown action,
            500 when the docker-compose command fails.
    """
    async with _db_connection() as conn:
        service = await _get_service_or_404(conn, service_id)

        compose_args = _COMPOSE_ACTIONS.get(action.action)
        if compose_args is None:
            raise HTTPException(status_code=400, detail=f"Invalid action: {action.action}")

        returncode, stdout, stderr = await run_docker_compose_command(
            service['path'], compose_args
        )

        if returncode != 0:
            await log_service_action(conn, service_id, action.action, "failed", stderr)
            raise HTTPException(status_code=500, detail=f"Command failed: {stderr}")

        new_status = "running" if action.action in ("start", "restart") else "stopped"
        await conn.execute(
            "UPDATE services SET status = $1, last_updated = $2 WHERE id = $3",
            new_status, datetime.utcnow(), service_id
        )
        await log_service_action(conn, service_id, action.action, "success", stdout)
        return {"status": "success", "action": action.action, "output": stdout}


@app.post("/services/{service_id}/pull", dependencies=[Depends(verify_token)])
async def git_pull(service_id: int, request: GitPullRequest):
    """Pull latest changes from git repository.

    When ``request.auto`` is set, a successful pull is followed by
    ``docker-compose restart``; a failed restart yields ``partial_success``
    rather than an error.

    Raises:
        HTTPException: 404 for an unknown service; 400 when the service has
            no git URL or its directory has no ``.git``; 500 when the pull
            fails.
    """
    async with _db_connection() as conn:
        service = await _get_service_or_404(conn, service_id)

        if not service['git_url']:
            raise HTTPException(status_code=400, detail="Service has no git URL configured")

        service_path = service['path']

        # Check if it's a git repository
        if not os.path.exists(os.path.join(service_path, ".git")):
            raise HTTPException(status_code=400, detail="Not a git repository")

        # Pull changes
        returncode, stdout, stderr = await run_git_command(service_path, ["pull"])

        if returncode != 0:
            await log_service_action(conn, service_id, "git_pull", "failed", stderr)
            raise HTTPException(status_code=500, detail=f"Git pull failed: {stderr}")

        await log_service_action(conn, service_id, "git_pull", "success", stdout)

        if not request.auto:
            return {"status": "success", "output": stdout}

        # Auto-restart if requested
        restart_returncode, restart_stdout, restart_stderr = await run_docker_compose_command(
            service_path, ["restart"]
        )

        if restart_returncode == 0:
            await log_service_action(conn, service_id, "auto_restart", "success", restart_stdout)
            return {
                "status": "success",
                "pull_output": stdout,
                "restart_output": restart_stdout
            }

        await log_service_action(conn, service_id, "auto_restart", "failed", restart_stderr)
        return {
            "status": "partial_success",
            "pull_output": stdout,
            "restart_error": restart_stderr
        }


@app.get("/services/{service_id}/logs", dependencies=[Depends(verify_token)])
async def get_logs(service_id: int, lines: int = Query(100, ge=1)):
    """Get service logs from docker-compose.

    ``lines`` is passed to ``docker-compose logs --tail`` and must be >= 1.

    Raises:
        HTTPException: 404 for an unknown service; 500 when the logs command
            fails.
    """
    async with _db_connection() as conn:
        service = await _get_service_or_404(conn, service_id)

        returncode, stdout, stderr = await run_docker_compose_command(
            service['path'], ["logs", "--tail", str(lines)]
        )

        if returncode != 0:
            raise HTTPException(status_code=500, detail=f"Failed to get logs: {stderr}")
        return {"logs": stdout}


@app.get("/services/{service_id}/status", dependencies=[Depends(verify_token)])
async def get_service_status(service_id: int):
    """Get real-time service status from docker-compose ps.

    The stored status is updated when it differs from the live one. The
    ``db_status`` field in the response is the value read before that update.

    Raises:
        HTTPException: 404 for an unknown service; 500 when the ps command
            fails.
    """
    async with _db_connection() as conn:
        service = await _get_service_or_404(conn, service_id)

        returncode, stdout, stderr = await run_docker_compose_command(
            service['path'], ["ps"]
        )

        if returncode != 0:
            raise HTTPException(status_code=500, detail=f"Failed to get status: {stderr}")

        parsed_status = parse_docker_status(stdout)

        # Update database with current status
        if parsed_status['overall_status'] != service['status']:
            await conn.execute(
                "UPDATE services SET status = $1, last_updated = $2 WHERE id = $3",
                parsed_status['overall_status'], datetime.utcnow(), service_id
            )

        return {
            "service_id": service_id,
            "service_name": service['name'],
            "live_status": parsed_status['overall_status'],
            "containers_running": parsed_status['running'],
            "containers_stopped": parsed_status['stopped'],
            "containers": parsed_status['containers'],
            "db_status": service['status'],
            "raw_output": stdout
        }


@app.get("/services/{service_id}/action-logs", dependencies=[Depends(verify_token)])
async def get_action_logs(service_id: int, limit: int = Query(50, ge=0)):
    """Get service action history, newest first.

    ``limit`` must be >= 0 because it is used directly as the SQL ``LIMIT``.
    """
    async with _db_connection() as conn:
        logs = await conn.fetch(
            "SELECT * FROM service_logs WHERE service_id = $1 ORDER BY created_at DESC LIMIT $2",
            service_id, limit
        )
        return [dict(log) for log in logs]


@app.delete("/services/{service_id}", dependencies=[Depends(verify_token)])
async def delete_service(service_id: int):
    """Unregister a service.

    Raises:
        HTTPException: 404 when no service has this id.
    """
    async with _db_connection() as conn:
        # One statement both checks existence and deletes, so a concurrent
        # delete cannot slip in between the two steps.
        deleted_id = await conn.fetchval(
            "DELETE FROM services WHERE id = $1 RETURNING id", service_id
        )
        if deleted_id is None:
            raise HTTPException(status_code=404, detail="Service not found")
        return {"status": "deleted", "service_id": service_id}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)