""" Telegram Bot for A/D Infrastructure Sends notifications to group chat """ import os from datetime import datetime from fastapi import FastAPI, HTTPException, Depends, Header from pydantic import BaseModel import asyncpg from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update from telegram.error import TelegramError from telegram.ext import Application, CallbackQueryHandler, ContextTypes from contextlib import asynccontextmanager # Configuration DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl") SECRET_TOKEN = os.getenv("SECRET_TOKEN", "change-me-in-production") TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "") # Database pool and bot db_pool = None bot = None app_telegram = None async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle inline button clicks""" query = update.callback_query await query.answer() callback_data = query.data chat_id = query.message.chat_id if callback_data.startswith("service_"): action, service_id = callback_data.rsplit("_", 1) action = action.replace("service_", "") try: import aiohttp controller_url = os.getenv("CONTROLLER_API", "http://controller:8001") async with aiohttp.ClientSession() as session: api_url = f"{controller_url}/services/{service_id}/action" headers = {"Authorization": f"Bearer {SECRET_TOKEN}"} data = {"action": action} async with session.post(api_url, json=data, headers=headers) as resp: if resp.status == 200: result = await resp.json() await query.edit_message_text( text=f"✅ Service action '{action}' executed successfully\n{query.message.text}" ) await log_message(chat_id, f"Button action: {action} service {service_id}", True) else: error_text = await resp.text() await query.edit_message_text( text=f"❌ Failed to execute action: {error_text}\n{query.message.text}" ) await log_message(chat_id, f"Button action failed: {action} service {service_id}", False, error_text) except Exception as e: await query.edit_message_text( text=f"❌ Error: {str(e)}\n{query.message.text}" ) await log_message(chat_id, f"Button action error: {callback_data}", False, str(e)) class MessageRequest(BaseModel): message: str chat_id: str = None service_id: int = None service_name: str = None class BulkMessageRequest(BaseModel): messages: list[str] chat_id: str = None # Auth dependency async def verify_token(authorization: str = Header(None)): if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing or invalid authorization header") token = authorization.replace("Bearer ", "") if token != SECRET_TOKEN: raise HTTPException(status_code=403, detail="Invalid token") return token # Database functions async def get_db(): return await db_pool.acquire() async def release_db(conn): await db_pool.release(conn) async def log_message(chat_id: int, message: str, success: bool, error_message: str = None): """Log sent message to database""" conn = await db_pool.acquire() try: await conn.execute( "INSERT INTO telegram_messages (chat_id, message, success, error_message) VALUES ($1, $2, $3, $4)", chat_id, message, success, error_message ) finally: await db_pool.release(conn) # Lifespan context @asynccontextmanager async def lifespan(app: FastAPI): global db_pool, bot, app_telegram db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10) if TELEGRAM_BOT_TOKEN: bot = Bot(token=TELEGRAM_BOT_TOKEN) app_telegram = Application.builder().token(TELEGRAM_BOT_TOKEN).build() app_telegram.add_handler(CallbackQueryHandler(button_handler)) await app_telegram.initialize() yield if app_telegram: await app_telegram.shutdown() await db_pool.close() app = FastAPI(title="Telegram Bot API", lifespan=lifespan) # API Endpoints @app.get("/health") async def health_check(): return { "status": "ok", "bot_configured": bot is not None, "timestamp": datetime.utcnow().isoformat() } @app.post("/send", dependencies=[Depends(verify_token)]) async def send_message(request: MessageRequest): """Send a message to telegram chat with optional service control buttons""" if not bot: raise HTTPException(status_code=503, detail="Telegram bot not configured") chat_id = request.chat_id or TELEGRAM_CHAT_ID if not chat_id: raise HTTPException(status_code=400, detail="No chat_id provided and no default configured") try: kwargs = { "chat_id": int(chat_id), "text": request.message, "parse_mode": "HTML" } # Add inline buttons for service control if service_id is provided if request.service_id and request.service_name: keyboard = [ [ InlineKeyboardButton("▶️ Start", callback_data=f"service_start_{request.service_id}"), InlineKeyboardButton("⏹️ Stop", callback_data=f"service_stop_{request.service_id}"), InlineKeyboardButton("🔄 Restart", callback_data=f"service_restart_{request.service_id}") ] ] kwargs["reply_markup"] = InlineKeyboardMarkup(keyboard) message = await bot.send_message(**kwargs) await log_message(int(chat_id), request.message, True) return { "status": "sent", "message_id": message.message_id, "chat_id": chat_id } except TelegramError as e: await log_message(int(chat_id), request.message, False, str(e)) raise HTTPException(status_code=500, detail=f"Failed to send message: {str(e)}") @app.post("/send-bulk", dependencies=[Depends(verify_token)]) async def send_bulk_messages(request: BulkMessageRequest): """Send multiple messages to telegram chat""" if not bot: raise HTTPException(status_code=503, detail="Telegram bot not configured") chat_id = request.chat_id or TELEGRAM_CHAT_ID if not chat_id: raise HTTPException(status_code=400, detail="No chat_id provided and no default configured") results = [] for msg in request.messages: try: message = await bot.send_message( chat_id=int(chat_id), text=msg, parse_mode='HTML' ) await log_message(int(chat_id), msg, True) results.append({ "status": "sent", "message_id": message.message_id, "message": msg[:50] + "..." if len(msg) > 50 else msg }) except TelegramError as e: await log_message(int(chat_id), msg, False, str(e)) results.append({ "status": "failed", "error": str(e), "message": msg[:50] + "..." if len(msg) > 50 else msg }) return {"results": results, "total": len(results)} @app.get("/messages", dependencies=[Depends(verify_token)]) async def get_message_history(limit: int = 50): """Get message sending history""" conn = await get_db() try: rows = await conn.fetch( "SELECT * FROM telegram_messages ORDER BY sent_at DESC LIMIT $1", limit ) return [dict(row) for row in rows] finally: await release_db(conn) @app.get("/stats", dependencies=[Depends(verify_token)]) async def get_stats(): """Get message statistics""" conn = await get_db() try: total = await conn.fetchval("SELECT COUNT(*) FROM telegram_messages") successful = await conn.fetchval("SELECT COUNT(*) FROM telegram_messages WHERE success = true") failed = await conn.fetchval("SELECT COUNT(*) FROM telegram_messages WHERE success = false") return { "total_messages": total, "successful": successful, "failed": failed, "success_rate": (successful / total * 100) if total > 0 else 0 } finally: await release_db(conn) @app.post("/webhook") async def webhook(update_data: dict): """Handle Telegram webhook updates for button callbacks""" if not app_telegram: raise HTTPException(status_code=503, detail="Telegram app not configured") try: update = Update.de_json(update_data, bot) if update: await app_telegram.process_update(update) return {"status": "ok"} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/test", dependencies=[Depends(verify_token)]) async def test_connection(): """Test telegram bot connection""" if not bot: raise HTTPException(status_code=503, detail="Telegram bot not configured") try: me = await bot.get_me() return { "status": "ok", "bot_username": me.username, "bot_name": me.first_name, "bot_id": me.id } except TelegramError as e: raise HTTPException(status_code=500, detail=f"Bot test failed: {str(e)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8003)