""" API Controller for A/D Infrastructure Manages docker-compose services with authentication """ import os import subprocess import asyncio from datetime import datetime from typing import Optional, List from fastapi import FastAPI, HTTPException, Depends, Header from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import asyncpg from contextlib import asynccontextmanager # Configuration DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl") SECRET_TOKEN = os.getenv("SECRET_TOKEN", "change-me-in-production") SERVICES_DIR = os.getenv("SERVICES_DIR", "/services") # Database pool db_pool = None class ServiceCreate(BaseModel): name: str path: str git_url: Optional[str] = None class ServiceAction(BaseModel): action: str # start, stop, restart class GitPullRequest(BaseModel): auto: bool = False class LogRequest(BaseModel): lines: int = 100 # Auth dependency async def verify_token(authorization: str = Header(None)): if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing or invalid authorization header") token = authorization.replace("Bearer ", "") if token != SECRET_TOKEN: raise HTTPException(status_code=403, detail="Invalid token") return token # Database functions async def get_db(): return await db_pool.acquire() async def release_db(conn): await db_pool.release(conn) async def log_service_action(conn, service_id: int, action: str, status: str, message: str = None): await conn.execute( "INSERT INTO service_logs (service_id, action, status, message) VALUES ($1, $2, $3, $4)", service_id, action, status, message ) # Lifespan context @asynccontextmanager async def lifespan(app: FastAPI): global db_pool db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10) yield await db_pool.close() app = FastAPI(title="A/D Infrastructure Controller", lifespan=lifespan) # Helper functions async def run_docker_compose_command(service_path: str, command: List[str]) -> tuple[int, str, str]: """Run docker-compose command and return (returncode, stdout, stderr)""" full_command = ["docker-compose", "-f", os.path.join(service_path, "docker-compose.yml")] + command process = await asyncio.create_subprocess_exec( *full_command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=service_path ) stdout, stderr = await process.communicate() return process.returncode, stdout.decode(), stderr.decode() async def run_git_command(service_path: str, command: List[str]) -> tuple[int, str, str]: """Run git command and return (returncode, stdout, stderr)""" process = await asyncio.create_subprocess_exec( "git", *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=service_path ) stdout, stderr = await process.communicate() return process.returncode, stdout.decode(), stderr.decode() # API Endpoints @app.get("/health") async def health_check(): return {"status": "ok", "timestamp": datetime.utcnow().isoformat()} @app.post("/services", dependencies=[Depends(verify_token)]) async def create_service(service: ServiceCreate): """Register a new service""" conn = await get_db() try: # Check if service directory exists service_path = os.path.join(SERVICES_DIR, service.path) if not os.path.exists(service_path): raise HTTPException(status_code=404, detail=f"Service path not found: {service_path}") # Check if docker-compose file exists (yml or yaml) compose_file = os.path.join(service_path, "docker-compose.yml") if not os.path.exists(compose_file): compose_file = os.path.join(service_path, "docker-compose.yaml") if not os.path.exists(compose_file): compose_file = os.path.join(service_path, "compose.yml") if not os.path.exists(compose_file): raise HTTPException(status_code=404, detail=f"docker-compose file not found in {service_path}") service_id = await conn.fetchval( "INSERT INTO services (name, path, git_url, status) VALUES ($1, $2, $3, $4) RETURNING id", service.name, service_path, service.git_url, "stopped" ) await log_service_action(conn, service_id, "register", "success", "Service registered") return {"id": service_id, "name": service.name, "status": "registered"} finally: await release_db(conn) @app.get("/services", dependencies=[Depends(verify_token)]) async def list_services(): """List all registered services""" conn = await get_db() try: rows = await conn.fetch("SELECT * FROM services ORDER BY name") return [dict(row) for row in rows] finally: await release_db(conn) @app.get("/services/{service_id}", dependencies=[Depends(verify_token)]) async def get_service(service_id: int): """Get service details""" conn = await get_db() try: service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") return dict(service) finally: await release_db(conn) @app.post("/services/{service_id}/action", dependencies=[Depends(verify_token)]) async def service_action(service_id: int, action: ServiceAction): """Start, stop, or restart a service""" conn = await get_db() try: service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") service_path = service['path'] # Map actions to docker-compose commands command_map = { "start": ["up", "-d"], "stop": ["down"], "restart": ["restart"] } if action.action not in command_map: raise HTTPException(status_code=400, detail=f"Invalid action: {action.action}") returncode, stdout, stderr = await run_docker_compose_command( service_path, command_map[action.action] ) if returncode == 0: new_status = "running" if action.action in ["start", "restart"] else "stopped" await conn.execute( "UPDATE services SET status = $1, last_updated = $2 WHERE id = $3", new_status, datetime.utcnow(), service_id ) await log_service_action(conn, service_id, action.action, "success", stdout) return {"status": "success", "action": action.action, "output": stdout} else: await log_service_action(conn, service_id, action.action, "failed", stderr) raise HTTPException(status_code=500, detail=f"Command failed: {stderr}") finally: await release_db(conn) @app.post("/services/{service_id}/pull", dependencies=[Depends(verify_token)]) async def git_pull(service_id: int, request: GitPullRequest): """Pull latest changes from git repository""" conn = await get_db() try: service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") if not service['git_url']: raise HTTPException(status_code=400, detail="Service has no git URL configured") service_path = service['path'] # Check if it's a git repository if not os.path.exists(os.path.join(service_path, ".git")): raise HTTPException(status_code=400, detail="Not a git repository") # Pull changes returncode, stdout, stderr = await run_git_command(service_path, ["pull"]) if returncode == 0: await log_service_action(conn, service_id, "git_pull", "success", stdout) # Auto-restart if requested if request.auto: restart_returncode, restart_stdout, restart_stderr = await run_docker_compose_command( service_path, ["restart"] ) if restart_returncode == 0: await log_service_action(conn, service_id, "auto_restart", "success", restart_stdout) return { "status": "success", "pull_output": stdout, "restart_output": restart_stdout } else: await log_service_action(conn, service_id, "auto_restart", "failed", restart_stderr) return { "status": "partial_success", "pull_output": stdout, "restart_error": restart_stderr } return {"status": "success", "output": stdout} else: await log_service_action(conn, service_id, "git_pull", "failed", stderr) raise HTTPException(status_code=500, detail=f"Git pull failed: {stderr}") finally: await release_db(conn) @app.get("/services/{service_id}/logs", dependencies=[Depends(verify_token)]) async def get_logs(service_id: int, lines: int = 100): """Get service logs from docker-compose""" conn = await get_db() try: service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") service_path = service['path'] returncode, stdout, stderr = await run_docker_compose_command( service_path, ["logs", "--tail", str(lines)] ) if returncode == 0: return {"logs": stdout} else: raise HTTPException(status_code=500, detail=f"Failed to get logs: {stderr}") finally: await release_db(conn) @app.get("/services/{service_id}/status", dependencies=[Depends(verify_token)]) async def get_service_status(service_id: int): """Get real-time service status from docker-compose""" conn = await get_db() try: service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") service_path = service['path'] returncode, stdout, stderr = await run_docker_compose_command( service_path, ["ps"] ) if returncode == 0: return {"status": stdout, "db_status": service['status']} else: raise HTTPException(status_code=500, detail=f"Failed to get status: {stderr}") finally: await release_db(conn) @app.get("/services/{service_id}/action-logs", dependencies=[Depends(verify_token)]) async def get_action_logs(service_id: int, limit: int = 50): """Get service action history""" conn = await get_db() try: logs = await conn.fetch( "SELECT * FROM service_logs WHERE service_id = $1 ORDER BY created_at DESC LIMIT $2", service_id, limit ) return [dict(log) for log in logs] finally: await release_db(conn) @app.delete("/services/{service_id}", dependencies=[Depends(verify_token)]) async def delete_service(service_id: int): """Unregister a service""" conn = await get_db() try: service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") await conn.execute("DELETE FROM services WHERE id = $1", service_id) return {"status": "deleted", "service_id": service_id} finally: await release_db(conn) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8001)