"""
Telegram Bot for A/D Infrastructure
Sends notifications to group chat
"""
import asyncio
import hmac
import os
import traceback
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlsplit, urlunsplit

import aiohttp
import asyncpg
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.error import TelegramError

# Configuration
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl")
SECRET_TOKEN = os.getenv("SECRET_TOKEN", "change-me-in-production")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
CONTROLLER_API = os.getenv("CONTROLLER_API", "http://controller:8001")

# Upper bound for every HTTP call to the controller, so a hung controller
# cannot stall the polling loop indefinitely.
CONTROLLER_TIMEOUT = aiohttp.ClientTimeout(total=10)

# Telegram rejects inline buttons whose callback_data exceeds this many bytes.
MAX_CALLBACK_DATA_BYTES = 64

# Database pool and bot
db_pool = None
bot = None
update_offset = 0
polling_task = None


def _redact_url(url: str) -> str:
    """Return url with any password in the userinfo part replaced by '***'."""
    try:
        parts = urlsplit(url)
    except ValueError:
        return "<unparseable URL>"
    userinfo, sep, hostinfo = parts.netloc.rpartition("@")
    if not sep or ":" not in userinfo:
        return url
    user = userinfo.split(":", 1)[0]
    return urlunsplit(parts._replace(netloc=f"{user}:***@{hostinfo}"))


def _preview(text: str) -> str:
    """Return text truncated to 50 characters, with '...' appended when cut."""
    return text[:50] + "..." if len(text) > 50 else text


def _parse_chat_id(raw) -> int:
    """Convert a chat id to int, raising HTTP 400 instead of an unhandled ValueError."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail=f"Invalid chat_id: {raw!r}") from None


async def _edit_message(chat_id: int, message_id: int, text: str) -> None:
    """Replace the text of a sent message; Telegram failures are logged, not raised."""
    try:
        await bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text)
    except TelegramError as e:
        print(f"[BUTTON] Failed to edit message {message_id}: {e}")


async def _lookup_service_id(service_name: str) -> tuple[Optional[int], Optional[str]]:
    """Resolve a service name to its id via the controller's /services endpoint.

    Returns (service_id, None) on success, or (None, error_message) when the
    controller answers with a non-200 status or has no service of that name.
    Network/parsing exceptions propagate to the caller.
    """
    print(f"[BUTTON] Contacting controller at {CONTROLLER_API}/services")
    headers = {"Authorization": f"Bearer {SECRET_TOKEN}"}
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{CONTROLLER_API}/services", headers=headers, timeout=CONTROLLER_TIMEOUT
        ) as resp:
            print(f"[BUTTON] Controller response: HTTP {resp.status}")
            if resp.status != 200:
                return None, f"❌ Failed to fetch services (HTTP {resp.status})"
            services = await resp.json()

    print(f"[BUTTON] Found {len(services)} services: {[s.get('name') for s in services]}")
    for svc in services:
        if svc.get('name') == service_name:
            service_id = svc.get('id')
            # Compare against None so that a legitimate id of 0 is not treated as missing.
            if service_id is not None:
                print(f"[BUTTON] Matched service '{service_name}' to ID {service_id}")
                return service_id, None
            break
    return None, f"❌ Service '{service_name}' not registered in controller"


async def handle_button_click(callback_data: str, chat_id: int, message_id: int):
    """Handle an inline button click.

    callback_data has the form ``service_{action}_{identifier}`` where the
    identifier is either a numeric service id or ``name_{service_name}``.
    The action is forwarded to the controller and the originating message is
    edited to show the outcome. Malformed callback data is logged and ignored.

    NOTE(review): the originating chat is not checked against TELEGRAM_CHAT_ID,
    so any chat that can see the buttons can trigger actions - confirm this is intended.
    """
    print(f"[BUTTON] Received callback: {callback_data} from chat {chat_id}")

    if not callback_data or not callback_data.startswith("service_"):
        print(f"[BUTTON] Invalid callback prefix: {callback_data}")
        return

    # Parse: service_{action}_{id_or_name}
    parts = callback_data.split("_", 2)
    if len(parts) != 3:
        print(f"[BUTTON] Invalid callback format: {callback_data} (got {len(parts)} parts)")
        return

    _, action, identifier = parts  # action: start, stop, restart
    print(f"[BUTTON] Action: {action}, Identifier: {identifier}")

    if identifier.startswith('name_'):
        service_name = identifier.removeprefix('name_')
        print(f"[BUTTON] Looking up service by name: {service_name}")
        try:
            service_id, lookup_error = await _lookup_service_id(service_name)
        except Exception as e:
            error_msg = f"❌ Error: {(str(e) or type(e).__name__)[:100]}"
            print(f"[BUTTON] Exception during service lookup: {str(e)}")
            await _edit_message(chat_id, message_id, error_msg)
            await log_message(chat_id, "Service lookup error", False, str(e))
            return
        if lookup_error:
            print(f"[BUTTON] {lookup_error}")
            await _edit_message(chat_id, message_id, lookup_error)
            return
    else:
        # Identifier is numeric service_id
        try:
            service_id = int(identifier)
        except ValueError:
            print(f"[BUTTON] Failed to parse service_id from: {identifier}")
            return
        print(f"[BUTTON] Using numeric service_id: {service_id}")

    api_url = f"{CONTROLLER_API}/services/{service_id}/action"
    print(f"[BUTTON] Executing {action} on service {service_id} at {api_url}")
    headers = {"Authorization": f"Bearer {SECRET_TOKEN}"}
    data = {"action": action}

    # Only the controller call lives in the try block, so that a failure while
    # reporting the result is not mistaken for a failure of the action itself.
    try:
        print(f"[BUTTON] Sending POST request with data: {data}")
        async with aiohttp.ClientSession() as session:
            async with session.post(
                api_url, json=data, headers=headers, timeout=CONTROLLER_TIMEOUT
            ) as resp:
                response_text = await resp.text()
                status = resp.status
    except Exception as e:
        # str() of a timeout error is empty, so fall back to the exception type name.
        detail = str(e) or type(e).__name__
        error_msg = f"❌ Exception: {detail[:100]}"
        print(f"[BUTTON] Exception during action execution: {detail}")
        traceback.print_exc()
        await _edit_message(chat_id, message_id, error_msg)
        await log_message(chat_id, f"Action error: {action}", False, detail)
        return

    print(f"[BUTTON] Controller response: HTTP {status}")
    print(f"[BUTTON] Response body: {response_text[:200]}")

    if status == 200:
        success_msg = f"✅ Service action '{action}' executed successfully"
        print(f"[BUTTON] Success: {success_msg}")
        await _edit_message(chat_id, message_id, success_msg)
        await log_message(chat_id, f"Action: {action} on service {service_id}", True)
    else:
        error_msg = f"❌ Failed: {response_text[:100]}"
        print(f"[BUTTON] Failed: {error_msg}")
        await _edit_message(chat_id, message_id, error_msg)
        await log_message(chat_id, f"Action failed: {action}", False, response_text)


async def _process_update(update) -> None:
    """Dispatch one Telegram update; a failure here never aborts the polling loop."""
    query = update.callback_query
    if not query:
        print(f"[POLLING] Update ID {update.update_id} is not a callback_query")
        return

    print(f"[POLLING] Processing callback query: {query.data}")
    try:
        if query.message is None:
            # Telegram omits the message for some callback queries (e.g. inline-mode messages).
            print("[POLLING] Callback query has no message attached, skipping")
        else:
            await handle_button_click(
                query.data,
                query.message.chat_id,
                query.message.message_id,
            )
    except Exception as e:
        print(f"[POLLING] Error handling callback: {type(e).__name__}: {e}")
        traceback.print_exc()

    # Always acknowledge the click so the client stops showing its loading indicator.
    try:
        await bot.answer_callback_query(query.id)
    except TelegramError as e:
        print(f"[POLLING] Failed to answer callback query: {e}")
    else:
        print("[POLLING] Callback processed successfully")


async def poll_updates():
    """Poll Telegram for updates (button clicks) until the task is cancelled.

    Errors from get_updates are logged and retried after a delay; once more
    than ``max_retries`` requests fail in a row the delay is lengthened.
    """
    global update_offset

    if not bot:
        print("[POLLING] Bot not configured, skipping polling")
        return

    print("[POLLING] Starting update polling")
    retry_count = 0
    max_retries = 5

    while True:
        try:
            print("[POLLING] Calling get_updates...")
            updates = await bot.get_updates(
                offset=update_offset, timeout=30, allowed_updates=["callback_query"]
            )
            retry_count = 0  # Reset only after a successful request

            if updates:
                print(f"[POLLING] Received {len(updates)} updates")

            for update in updates:
                # Advance the offset first so a failing update is not re-delivered forever.
                update_offset = update.update_id + 1
                await _process_update(update)
        except asyncio.CancelledError:
            print("[POLLING] Polling task cancelled")
            break
        except Exception as e:
            retry_count += 1
            print(f"[POLLING] Error in polling (consecutive failure {retry_count}): {type(e).__name__}: {e}")
            traceback.print_exc()

            if "Conflict" in str(e) or "terminated by other getUpdates" in str(e):
                print("[POLLING] CONFLICT: Another bot instance is polling the same token!")
                print("[POLLING] Check if another tg-bot container/process is running")
                delay = 10
            else:
                delay = 5
            if retry_count > max_retries:
                # Persistent failure: slow down instead of retrying at full rate.
                delay = 30
            print(f"[POLLING] Waiting {delay} seconds before retry...")
            await asyncio.sleep(delay)


class MessageRequest(BaseModel):
    """Body of POST /send; service_id or service_name adds control buttons."""
    message: str
    chat_id: Optional[str] = None
    service_id: Optional[int] = None
    service_name: Optional[str] = None


class BulkMessageRequest(BaseModel):
    """Body of POST /send-bulk."""
    messages: list[str]
    chat_id: Optional[str] = None


# Auth dependency
async def verify_token(authorization: Optional[str] = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` header against SECRET_TOKEN.

    Raises HTTPException 401 when the header is missing or malformed and 403
    when the token does not match. Returns the token on success.
    """
    prefix = "Bearer "
    if not authorization or not authorization.startswith(prefix):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

    # Strip only the leading scheme; str.replace would also alter the token itself.
    token = authorization[len(prefix):]
    # Constant-time comparison (on bytes, so non-ASCII input cannot raise).
    if not hmac.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token


# Database functions
async def get_db():
    """Acquire a connection from the pool; pair with release_db()."""
    return await db_pool.acquire()


async def release_db(conn):
    """Return a connection obtained from get_db() to the pool."""
    await db_pool.release(conn)


async def log_message(chat_id: int, message: str, success: bool, error_message: str = None):
    """Log sent message to database"""
    async with db_pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO telegram_messages (chat_id, message, success, error_message) VALUES ($1, $2, $3, $4)",
            chat_id, message, success, error_message
        )


# Lifespan context
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the DB pool and start polling on startup; tear both down on shutdown."""
    global db_pool, bot, polling_task

    print("[STARTUP] Initializing Telegram Bot API")
    # The URL embeds credentials, so the password is masked before logging.
    print(f"[STARTUP] DATABASE_URL: {_redact_url(DATABASE_URL)}")
    print(f"[STARTUP] CONTROLLER_API: {CONTROLLER_API}")
    print(f"[STARTUP] TELEGRAM_BOT_TOKEN configured: {bool(TELEGRAM_BOT_TOKEN)}")
    print(f"[STARTUP] TELEGRAM_CHAT_ID: {TELEGRAM_CHAT_ID}")

    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    print("[STARTUP] Database pool created")

    # try/finally guarantees the pool is closed even if the rest of startup fails.
    try:
        if TELEGRAM_BOT_TOKEN:
            bot = Bot(token=TELEGRAM_BOT_TOKEN)
            print("[STARTUP] Telegram Bot initialized")
            polling_task = asyncio.create_task(poll_updates())
            print("[STARTUP] Polling task created")
        else:
            print("[STARTUP] WARNING: TELEGRAM_BOT_TOKEN not configured!")

        yield
    finally:
        print("[SHUTDOWN] Shutting down Telegram Bot API")
        if polling_task:
            polling_task.cancel()
            try:
                await polling_task
            except asyncio.CancelledError:
                pass
        await db_pool.close()
        print("[SHUTDOWN] Database pool closed")


app = FastAPI(title="Telegram Bot API", lifespan=lifespan)


# API Endpoints
@app.get("/health")
async def health_check():
    """Report liveness and whether a bot token was configured."""
    # Naive-UTC ISO string, same format datetime.utcnow() produced, without the deprecated call.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "status": "ok",
        "bot_configured": bot is not None,
        "timestamp": now_utc.isoformat()
    }


@app.post("/send", dependencies=[Depends(verify_token)])
async def send_message(request: MessageRequest):
    """Send a message to telegram chat with optional service control buttons

    Raises HTTPException 503 when the bot is not configured, 400 for a
    missing/invalid chat id or oversized button data, and 500 when Telegram
    rejects the message.
    """
    print(f"[SEND] Received message request: {request.message[:50]}...")
    print(f"[SEND] service_id={request.service_id}, service_name={request.service_name}")

    if not bot:
        print("[SEND] ERROR: Bot not configured")
        raise HTTPException(status_code=503, detail="Telegram bot not configured")

    chat_id = request.chat_id or TELEGRAM_CHAT_ID
    if not chat_id:
        print("[SEND] ERROR: No chat_id provided")
        raise HTTPException(status_code=400, detail="No chat_id provided and no default configured")
    numeric_chat_id = _parse_chat_id(chat_id)

    has_buttons = bool(request.service_id or request.service_name)
    kwargs = {
        "chat_id": numeric_chat_id,
        "text": request.message,
        "parse_mode": "HTML"
    }

    # Add inline buttons for service control if service_id or service_name is provided
    if has_buttons:
        # Use service_id if available, otherwise use service_name prefixed with 'name_'
        identifier = str(request.service_id) if request.service_id else f"name_{request.service_name}"
        print(f"[SEND] Adding buttons with identifier: {identifier}")

        callbacks = {
            action: f"service_{action}_{identifier}"
            for action in ("start", "stop", "restart")
        }
        # Fail early with a clear message rather than a generic Telegram error.
        if any(len(cb.encode("utf-8")) > MAX_CALLBACK_DATA_BYTES for cb in callbacks.values()):
            raise HTTPException(
                status_code=400,
                detail=f"service_name too long for button callback data (max {MAX_CALLBACK_DATA_BYTES} bytes)"
            )

        keyboard = [
            [
                InlineKeyboardButton("▶️ Start", callback_data=callbacks["start"]),
                InlineKeyboardButton("⏹️ Stop", callback_data=callbacks["stop"]),
                InlineKeyboardButton("🔄 Restart", callback_data=callbacks["restart"])
            ]
        ]
        kwargs["reply_markup"] = InlineKeyboardMarkup(keyboard)
        print("[SEND] Buttons added to message")
    else:
        print("[SEND] No buttons - no service_id or service_name provided")

    try:
        print(f"[SEND] Sending message to chat {chat_id}")
        message = await bot.send_message(**kwargs)
    except TelegramError as e:
        print(f"[SEND] TelegramError: {str(e)}")
        await log_message(numeric_chat_id, request.message, False, str(e))
        raise HTTPException(status_code=500, detail=f"Failed to send message: {str(e)}")

    await log_message(numeric_chat_id, request.message, True)
    print(f"[SEND] Message sent successfully, message_id={message.message_id}")

    return {
        "status": "sent",
        "message_id": message.message_id,
        "chat_id": chat_id,
        "has_buttons": has_buttons
    }


@app.post("/send-bulk", dependencies=[Depends(verify_token)])
async def send_bulk_messages(request: BulkMessageRequest):
    """Send multiple messages to telegram chat

    Each message is attempted independently; the response lists a per-message
    status. Raises HTTPException 503/400 for configuration or chat id problems.
    """
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")

    chat_id = request.chat_id or TELEGRAM_CHAT_ID
    if not chat_id:
        raise HTTPException(status_code=400, detail="No chat_id provided and no default configured")
    # Validate once up front instead of hitting an unhandled ValueError per message.
    numeric_chat_id = _parse_chat_id(chat_id)

    results = []
    for msg in request.messages:
        try:
            message = await bot.send_message(
                chat_id=numeric_chat_id,
                text=msg,
                parse_mode='HTML'
            )
        except TelegramError as e:
            await log_message(numeric_chat_id, msg, False, str(e))
            results.append({
                "status": "failed",
                "error": str(e),
                "message": _preview(msg)
            })
        else:
            await log_message(numeric_chat_id, msg, True)
            results.append({
                "status": "sent",
                "message_id": message.message_id,
                "message": _preview(msg)
            })

    return {"results": results, "total": len(results)}


@app.get("/messages", dependencies=[Depends(verify_token)])
async def get_message_history(limit: int = 50):
    """Get message sending history

    Returns up to ``limit`` rows, newest first. Raises HTTPException 400 for a
    negative limit (which the database would otherwise reject with an error).
    """
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must not be negative")

    async with db_pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT * FROM telegram_messages ORDER BY sent_at DESC LIMIT $1",
            limit
        )
    return [dict(row) for row in rows]


@app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats():
    """Get message statistics"""
    # One query so all three counts come from the same snapshot of the table.
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT COUNT(*) AS total, "
            "COUNT(*) FILTER (WHERE success = true) AS successful, "
            "COUNT(*) FILTER (WHERE success = false) AS failed "
            "FROM telegram_messages"
        )

    total = row["total"]
    successful = row["successful"]
    return {
        "total_messages": total,
        "successful": successful,
        "failed": row["failed"],
        "success_rate": (successful / total * 100) if total > 0 else 0
    }


@app.post("/test", dependencies=[Depends(verify_token)])
async def test_connection():
    """Test telegram bot connection"""
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")

    try:
        me = await bot.get_me()
    except TelegramError as e:
        raise HTTPException(status_code=500, detail=f"Bot test failed: {str(e)}")

    return {
        "status": "ok",
        "bot_username": me.username,
        "bot_name": me.first_name,
        "bot_id": me.id
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8003)