315 lines
12 KiB
Python
315 lines
12 KiB
Python
"""
|
|
API Controller for A/D Infrastructure
|
|
Manages docker-compose services with authentication
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Optional, List
|
|
from fastapi import FastAPI, HTTPException, Depends, Header
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import BaseModel
|
|
import asyncpg
|
|
from contextlib import asynccontextmanager
|
|
|
|
# Configuration
|
|
# Each setting is read from the environment variable of the same name; the
# second argument is the fallback used when that variable is unset.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://adctrl:adctrl@postgres:5432/adctrl",
)
# NOTE(review): the fallback below is a publicly visible placeholder; every
# protected endpoint compares bearer tokens against this value, so a real
# SECRET_TOKEN should be supplied by the deployment.
SECRET_TOKEN = os.environ.get("SECRET_TOKEN", "change-me-in-production")
# Directory that create_service joins client-supplied service paths onto.
SERVICES_DIR = os.environ.get("SERVICES_DIR", "/services")

# Database pool
# Stays None until lifespan() assigns the asyncpg pool at application startup.
db_pool = None
|
|
|
|
class ServiceCreate(BaseModel):
    """Request body accepted by the service registration endpoint."""

    # Name stored in the services table.
    name: str
    # Service directory; create_service joins it onto SERVICES_DIR.
    path: str
    # Git remote for the service; git_pull refuses services where this is unset.
    git_url: Optional[str] = None
|
|
|
|
class ServiceAction(BaseModel):
    """Request body naming the lifecycle operation to run on a service."""

    # One of "start", "stop" or "restart"; service_action answers any other
    # value with HTTP 400.
    action: str
|
|
|
|
class GitPullRequest(BaseModel):
    """Request body for the git pull endpoint."""

    # When true, git_pull also runs a docker-compose restart after a
    # successful pull.
    auto: bool = False
|
|
|
|
class LogRequest(BaseModel):
    """Model describing how many log lines to fetch.

    NOTE(review): no endpoint visible in this file takes this model (get_logs
    reads ``lines`` as a plain parameter) -- confirm whether it is still used.
    """

    # Number of trailing log lines requested.
    lines: int = 100
|
|
|
|
# Auth dependency
|
|
async def verify_token(authorization: str = Header(None)):
    """Validate the bearer token carried in the ``Authorization`` header.

    Args:
        authorization: Raw header value, expected to look like
            ``Bearer <token>``.

    Returns:
        The token that followed the ``Bearer `` prefix.

    Raises:
        HTTPException: 401 when the header is absent or does not start with
            ``Bearer ``; 403 when the token differs from ``SECRET_TOKEN``.
    """
    import hmac

    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

    # Drop only the leading scheme. str.replace() would also delete any later
    # "Bearer " substring and so compare a corrupted token.
    token = authorization[len(scheme):]

    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Bytes are used because compare_digest only accepts
    # ASCII-only str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token
|
|
|
|
# Database functions
|
|
async def get_db():
    """Acquire a connection from the module-level pool and return it.

    The caller is expected to give the connection back with ``release_db``.
    """
    connection = await db_pool.acquire()
    return connection
|
|
|
|
async def release_db(conn):
    """Return ``conn`` to the module-level pool it was acquired from."""
    pool = db_pool
    await pool.release(conn)
|
|
|
|
async def log_service_action(
    conn,
    service_id: int,
    action: str,
    status: str,
    message: Optional[str] = None,
):
    """Insert one row into ``service_logs`` describing an action.

    Args:
        conn: Connection object exposing an awaitable ``execute``.
        service_id: Id of the service the action was performed on.
        action: Action label, e.g. ``"start"`` or ``"git_pull"``.
        status: Outcome label; callers in this file pass ``"success"`` or
            ``"failed"``.
        message: Optional free text (callers pass command output); ``None``
            is forwarded to the query unchanged.
    """
    insert_sql = "INSERT INTO service_logs (service_id, action, status, message) VALUES ($1, $2, $3, $4)"
    await conn.execute(insert_sql, service_id, action, status, message)
|
|
|
|
# Lifespan context
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared database pool for the lifetime of the application.

    The pool is stored in the module-level ``db_pool`` before control is
    handed to the application, and closed once the context exits.
    """
    global db_pool
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    try:
        yield
    finally:
        # Runs even when an exception or cancellation is thrown into the
        # generator at the yield, so the pool is not left open.
        await db_pool.close()
|
|
|
|
# Application object; the lifespan hook manages the database pool around it.
app = FastAPI(
    title="A/D Infrastructure Controller",
    lifespan=lifespan,
)
|
|
|
|
# Helper functions
|
|
async def run_docker_compose_command(service_path: str, command: List[str]) -> tuple[int, str, str]:
    """Run ``docker-compose`` against the compose file in ``service_path``.

    Args:
        service_path: Directory holding ``docker-compose.yml``; also used as
            the working directory of the child process.
        command: Sub-command and its arguments, e.g. ``["up", "-d"]``.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded to text.
    """
    compose_file = os.path.join(service_path, "docker-compose.yml")
    argv = ["docker-compose", "-f", compose_file, *command]

    process = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=service_path,
    )
    raw_out, raw_err = await process.communicate()

    # Container output is arbitrary bytes. Substitute undecodable sequences
    # instead of letting UnicodeDecodeError abort the request.
    return (
        process.returncode,
        raw_out.decode(errors="replace"),
        raw_err.decode(errors="replace"),
    )
|
|
|
|
async def run_git_command(service_path: str, command: List[str]) -> tuple[int, str, str]:
    """Run ``git`` with the given arguments inside ``service_path``.

    Args:
        service_path: Working directory for the child process.
        command: Git sub-command and its arguments, e.g. ``["pull"]``.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded to text.
    """
    process = await asyncio.create_subprocess_exec(
        "git",
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=service_path,
    )
    raw_out, raw_err = await process.communicate()

    # Git output can include bytes that are not valid UTF-8 (for example file
    # names). Replace them rather than raising UnicodeDecodeError.
    return (
        process.returncode,
        raw_out.decode(errors="replace"),
        raw_err.decode(errors="replace"),
    )
|
|
|
|
# API Endpoints
|
|
@app.get("/health")
|
|
async def health_check():
|
|
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
|
|
|
|
@app.post("/services", dependencies=[Depends(verify_token)])
|
|
async def create_service(service: ServiceCreate):
|
|
"""Register a new service"""
|
|
conn = await get_db()
|
|
try:
|
|
# Check if service directory exists
|
|
service_path = os.path.join(SERVICES_DIR, service.path)
|
|
if not os.path.exists(service_path):
|
|
raise HTTPException(status_code=404, detail=f"Service path not found: {service_path}")
|
|
|
|
# Check if docker-compose.yml exists
|
|
compose_file = os.path.join(service_path, "docker-compose.yml")
|
|
if not os.path.exists(compose_file):
|
|
raise HTTPException(status_code=404, detail=f"docker-compose.yml not found in {service_path}")
|
|
|
|
service_id = await conn.fetchval(
|
|
"INSERT INTO services (name, path, git_url, status) VALUES ($1, $2, $3, $4) RETURNING id",
|
|
service.name, service_path, service.git_url, "stopped"
|
|
)
|
|
|
|
await log_service_action(conn, service_id, "register", "success", "Service registered")
|
|
|
|
return {"id": service_id, "name": service.name, "status": "registered"}
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.get("/services", dependencies=[Depends(verify_token)])
|
|
async def list_services():
|
|
"""List all registered services"""
|
|
conn = await get_db()
|
|
try:
|
|
rows = await conn.fetch("SELECT * FROM services ORDER BY name")
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.get("/services/{service_id}", dependencies=[Depends(verify_token)])
|
|
async def get_service(service_id: int):
|
|
"""Get service details"""
|
|
conn = await get_db()
|
|
try:
|
|
service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
return dict(service)
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.post("/services/{service_id}/action", dependencies=[Depends(verify_token)])
|
|
async def service_action(service_id: int, action: ServiceAction):
|
|
"""Start, stop, or restart a service"""
|
|
conn = await get_db()
|
|
try:
|
|
service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
|
|
service_path = service['path']
|
|
|
|
# Map actions to docker-compose commands
|
|
command_map = {
|
|
"start": ["up", "-d"],
|
|
"stop": ["down"],
|
|
"restart": ["restart"]
|
|
}
|
|
|
|
if action.action not in command_map:
|
|
raise HTTPException(status_code=400, detail=f"Invalid action: {action.action}")
|
|
|
|
returncode, stdout, stderr = await run_docker_compose_command(
|
|
service_path, command_map[action.action]
|
|
)
|
|
|
|
if returncode == 0:
|
|
new_status = "running" if action.action in ["start", "restart"] else "stopped"
|
|
await conn.execute(
|
|
"UPDATE services SET status = $1, last_updated = $2 WHERE id = $3",
|
|
new_status, datetime.utcnow(), service_id
|
|
)
|
|
await log_service_action(conn, service_id, action.action, "success", stdout)
|
|
return {"status": "success", "action": action.action, "output": stdout}
|
|
else:
|
|
await log_service_action(conn, service_id, action.action, "failed", stderr)
|
|
raise HTTPException(status_code=500, detail=f"Command failed: {stderr}")
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.post("/services/{service_id}/pull", dependencies=[Depends(verify_token)])
|
|
async def git_pull(service_id: int, request: GitPullRequest):
|
|
"""Pull latest changes from git repository"""
|
|
conn = await get_db()
|
|
try:
|
|
service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
|
|
if not service['git_url']:
|
|
raise HTTPException(status_code=400, detail="Service has no git URL configured")
|
|
|
|
service_path = service['path']
|
|
|
|
# Check if it's a git repository
|
|
if not os.path.exists(os.path.join(service_path, ".git")):
|
|
raise HTTPException(status_code=400, detail="Not a git repository")
|
|
|
|
# Pull changes
|
|
returncode, stdout, stderr = await run_git_command(service_path, ["pull"])
|
|
|
|
if returncode == 0:
|
|
await log_service_action(conn, service_id, "git_pull", "success", stdout)
|
|
|
|
# Auto-restart if requested
|
|
if request.auto:
|
|
restart_returncode, restart_stdout, restart_stderr = await run_docker_compose_command(
|
|
service_path, ["restart"]
|
|
)
|
|
if restart_returncode == 0:
|
|
await log_service_action(conn, service_id, "auto_restart", "success", restart_stdout)
|
|
return {
|
|
"status": "success",
|
|
"pull_output": stdout,
|
|
"restart_output": restart_stdout
|
|
}
|
|
else:
|
|
await log_service_action(conn, service_id, "auto_restart", "failed", restart_stderr)
|
|
return {
|
|
"status": "partial_success",
|
|
"pull_output": stdout,
|
|
"restart_error": restart_stderr
|
|
}
|
|
|
|
return {"status": "success", "output": stdout}
|
|
else:
|
|
await log_service_action(conn, service_id, "git_pull", "failed", stderr)
|
|
raise HTTPException(status_code=500, detail=f"Git pull failed: {stderr}")
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.get("/services/{service_id}/logs", dependencies=[Depends(verify_token)])
|
|
async def get_logs(service_id: int, lines: int = 100):
|
|
"""Get service logs from docker-compose"""
|
|
conn = await get_db()
|
|
try:
|
|
service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
|
|
service_path = service['path']
|
|
|
|
returncode, stdout, stderr = await run_docker_compose_command(
|
|
service_path, ["logs", "--tail", str(lines)]
|
|
)
|
|
|
|
if returncode == 0:
|
|
return {"logs": stdout}
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get logs: {stderr}")
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.get("/services/{service_id}/status", dependencies=[Depends(verify_token)])
|
|
async def get_service_status(service_id: int):
|
|
"""Get real-time service status from docker-compose"""
|
|
conn = await get_db()
|
|
try:
|
|
service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
|
|
service_path = service['path']
|
|
|
|
returncode, stdout, stderr = await run_docker_compose_command(
|
|
service_path, ["ps"]
|
|
)
|
|
|
|
if returncode == 0:
|
|
return {"status": stdout, "db_status": service['status']}
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get status: {stderr}")
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.get("/services/{service_id}/action-logs", dependencies=[Depends(verify_token)])
|
|
async def get_action_logs(service_id: int, limit: int = 50):
|
|
"""Get service action history"""
|
|
conn = await get_db()
|
|
try:
|
|
logs = await conn.fetch(
|
|
"SELECT * FROM service_logs WHERE service_id = $1 ORDER BY created_at DESC LIMIT $2",
|
|
service_id, limit
|
|
)
|
|
return [dict(log) for log in logs]
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
@app.delete("/services/{service_id}", dependencies=[Depends(verify_token)])
|
|
async def delete_service(service_id: int):
|
|
"""Unregister a service"""
|
|
conn = await get_db()
|
|
try:
|
|
service = await conn.fetchrow("SELECT * FROM services WHERE id = $1", service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
|
|
await conn.execute("DELETE FROM services WHERE id = $1", service_id)
|
|
return {"status": "deleted", "service_id": service_id}
|
|
finally:
|
|
await release_db(conn)
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8001)
|