Files
ad-infr-control/tg-bot/main.py
ilyastar9999 154c0cda75 asd
2025-12-04 14:19:15 +03:00

341 lines
12 KiB
Python

"""
Telegram Bot for A/D Infrastructure
Sends notifications to group chat
"""
import asyncio
import hmac
import os
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional

import aiohttp
import asyncpg
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.error import TelegramError
# Configuration: every value can be overridden through the environment;
# the second argument is the fallback used when the variable is unset.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://adctrl:adctrl@postgres:5432/adctrl",
)
SECRET_TOKEN = os.environ.get("SECRET_TOKEN", "change-me-in-production")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
CONTROLLER_API = os.environ.get("CONTROLLER_API", "http://controller:8001")

# Shared runtime state. These start out empty and are rebound via `global`
# by the functions that own them.
db_pool = None       # database connection pool
bot = None           # Telegram Bot instance (stays None without a bot token)
update_offset = 0    # next update id to request from Telegram
polling_task = None  # background task running the update poller
async def _lookup_service_id(service_name: str, chat_id: int, message_id: int):
    """Resolve a service name to its controller id.

    Queries ``GET {CONTROLLER_API}/services`` and returns the ``id`` of the
    first entry whose ``name`` equals ``service_name``. On every failure path
    the Telegram message is edited to explain the problem and ``None`` is
    returned, so the caller only has to stop.
    """
    try:
        async with aiohttp.ClientSession() as session:
            headers = {"Authorization": f"Bearer {SECRET_TOKEN}"}
            # Use the same 10 s budget as the action request. Without an
            # explicit timeout the library default applies (five minutes per
            # the aiohttp docs), which would stall whoever awaits this call.
            async with session.get(
                f"{CONTROLLER_API}/services",
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status != 200:
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id,
                        text=f"❌ Failed to fetch services (HTTP {resp.status})"
                    )
                    return None
                services = await resp.json()
                # First entry with a matching name wins.
                service_id = next(
                    (svc.get('id') for svc in services if svc.get('name') == service_name),
                    None,
                )
                # Compare against None so a legitimate id of 0 is not
                # reported as "not registered".
                if service_id is None:
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id,
                        text=f"❌ Service '{service_name}' not registered in controller"
                    )
                return service_id
    except Exception as e:
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=f"❌ Error: {str(e)[:100]}"
        )
        await log_message(chat_id, "Service lookup error", False, str(e))
        return None


async def handle_button_click(callback_data: str, chat_id: int, message_id: int):
    """Handle an inline-button click delivered as a Telegram callback query.

    ``callback_data`` has the form ``service_{action}_{identifier}`` where
    ``identifier`` is either a numeric service id or ``name_{service_name}``.
    A name is first resolved to an id through the controller's ``/services``
    listing. The action is then POSTed to
    ``{CONTROLLER_API}/services/{service_id}/action`` and the originating
    message is edited to show the outcome.

    Args:
        callback_data: Raw payload attached to the pressed button.
        chat_id: Chat that contains the message with the button.
        message_id: Message that is overwritten with the result text.

    Returns:
        None. Payloads that do not match the expected format are ignored.
    """
    if not callback_data.startswith("service_"):
        return

    # Parse: service_{action}_{id_or_name}. maxsplit=2 keeps underscores that
    # belong to the service name inside the last part.
    parts = callback_data.split("_", 2)
    if len(parts) != 3:
        return
    _, action, identifier = parts  # action is forwarded to the controller as-is

    if identifier.startswith('name_'):
        service_id = await _lookup_service_id(identifier[len('name_'):], chat_id, message_id)
        if service_id is None:
            # The helper has already told the user what went wrong.
            return
    else:
        # Identifier is expected to be a numeric service_id.
        try:
            service_id = int(identifier)
        except ValueError:
            return

    try:
        async with aiohttp.ClientSession() as session:
            api_url = f"{CONTROLLER_API}/services/{service_id}/action"
            headers = {"Authorization": f"Bearer {SECRET_TOKEN}"}
            data = {"action": action}
            async with session.post(
                api_url,
                json=data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id,
                        text=f"✅ Service action '{action}' executed successfully"
                    )
                    await log_message(chat_id, f"Action: {action} on service {service_id}", True)
                else:
                    error_text = await resp.text()
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id,
                        text=f"❌ Failed: {error_text[:100]}"
                    )
                    await log_message(chat_id, f"Action failed: {action}", False, error_text)
    except Exception as e:
        # Network/Telegram failures are only recorded; the message is left as is.
        await log_message(chat_id, f"Action error: {action}", False, str(e))
async def poll_updates():
    """Long-poll Telegram for updates and dispatch inline-button clicks.

    Returns immediately when no bot is configured. Otherwise loops until the
    task is cancelled: each batch from ``get_updates`` is walked in order,
    callback queries are passed to ``handle_button_click`` and then
    acknowledged. Errors while fetching are printed and followed by a
    5-second pause before the next attempt.
    """
    global update_offset
    if not bot:
        return
    while True:
        try:
            updates = await bot.get_updates(offset=update_offset, timeout=30)
            for update in updates:
                # Advance the offset first so a failing update is not fetched again.
                update_offset = update.update_id + 1
                query = update.callback_query
                if not query:
                    continue
                try:
                    # NOTE(review): Telegram documents `message` as optional on
                    # callback queries; skip the handler rather than crash on None.
                    if query.message is not None:
                        await handle_button_click(
                            query.data,
                            query.message.chat_id,
                            query.message.message_id
                        )
                except Exception as e:
                    # One bad click must not abort the rest of the batch.
                    print(f"Error handling callback: {e}")
                # Acknowledge even after a handler failure so the query does
                # not stay unanswered on the client side.
                await bot.answer_callback_query(query.id)
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f"Error in polling: {e}")
            await asyncio.sleep(5)
class MessageRequest(BaseModel):
    """Request body for ``POST /send``."""

    # Text to deliver; the endpoint sends it with parse_mode="HTML".
    message: str
    # Target chat. Optional so that an omitted field and an explicit JSON null
    # both validate; the endpoint falls back to TELEGRAM_CHAT_ID.
    chat_id: Optional[str] = None
    # Either of these makes the endpoint attach Start/Stop/Restart buttons;
    # service_id takes precedence over service_name.
    service_id: Optional[int] = None
    service_name: Optional[str] = None
class BulkMessageRequest(BaseModel):
    """Request body for ``POST /send-bulk``."""

    # Messages to deliver, sent one by one in list order.
    messages: list[str]
    # Target chat. Optional so that an omitted field and an explicit JSON null
    # both validate; the endpoint falls back to TELEGRAM_CHAT_ID.
    chat_id: Optional[str] = None
# Auth dependency
async def verify_token(authorization: str = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` header.

    Args:
        authorization: Raw header value injected by FastAPI.

    Returns:
        The token extracted from the header.

    Raises:
        HTTPException: 401 when the header is missing or does not start with
            ``Bearer ``; 403 when the token differs from ``SECRET_TOKEN``.
    """
    prefix = "Bearer "
    if not authorization or not authorization.startswith(prefix):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")
    # Strip only the leading scheme. str.replace() would also delete any later
    # "Bearer " occurrence, silently altering the presented token.
    token = authorization[len(prefix):]
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token
# Database functions
async def get_db():
    """Check a connection out of the shared pool.

    The caller is responsible for handing it back with ``release_db``.
    """
    connection = await db_pool.acquire()
    return connection
async def release_db(conn):
    """Return a connection obtained from ``get_db`` to the shared pool."""
    pool = db_pool
    await pool.release(conn)
async def log_message(chat_id: int, message: str, success: bool, error_message: str = None):
    """Record one delivery attempt in the ``telegram_messages`` table.

    Args:
        chat_id: Chat the message was addressed to.
        message: Message text (or a short description of the action).
        success: Whether the attempt succeeded.
        error_message: Failure details, if any.
    """
    insert_sql = "INSERT INTO telegram_messages (chat_id, message, success, error_message) VALUES ($1, $2, $3, $4)"
    row_values = (chat_id, message, success, error_message)
    # The context manager releases the connection even if the insert fails.
    async with db_pool.acquire() as connection:
        await connection.execute(insert_sql, *row_values)
# Lifespan context
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: set up shared resources, tear them down on exit.

    Startup creates the asyncpg pool and, when ``TELEGRAM_BOT_TOKEN`` is set,
    the bot plus a background task running ``poll_updates``. Shutdown cancels
    that task, waits for it to finish and closes the pool.
    """
    global db_pool, bot, polling_task
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    try:
        if TELEGRAM_BOT_TOKEN:
            bot = Bot(token=TELEGRAM_BOT_TOKEN)
            polling_task = asyncio.create_task(poll_updates())
        yield
    finally:
        # Runs even when an exception or cancellation arrives at the yield,
        # so the poller and the pool are never left dangling.
        if polling_task:
            polling_task.cancel()
            try:
                await polling_task
            except asyncio.CancelledError:
                pass
        await db_pool.close()
# ASGI application object; startup and shutdown work is delegated to lifespan().
app = FastAPI(title="Telegram Bot API", lifespan=lifespan)
# API Endpoints
@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe.

    Reports whether a bot instance exists and the current naive UTC time in
    ISO-8601 form.
    """
    bot_ready = bot is not None
    now_iso = datetime.utcnow().isoformat()
    payload = {"status": "ok"}
    payload["bot_configured"] = bot_ready
    payload["timestamp"] = now_iso
    return payload
@app.post("/send", dependencies=[Depends(verify_token)])
async def send_message(request: MessageRequest):
    """Send a message to a Telegram chat with optional service control buttons.

    The target is ``request.chat_id`` or, failing that, ``TELEGRAM_CHAT_ID``.
    When ``service_id`` or ``service_name`` is given, a row of
    Start/Stop/Restart inline buttons is attached. Every attempt is recorded
    through ``log_message``.

    Returns:
        ``{"status": "sent", "message_id": ..., "chat_id": ...}``.

    Raises:
        HTTPException: 503 when no bot is configured; 400 when no chat id is
            available or it is not numeric; 500 when Telegram rejects the
            message.
    """
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")
    chat_id = request.chat_id or TELEGRAM_CHAT_ID
    if not chat_id:
        raise HTTPException(status_code=400, detail="No chat_id provided and no default configured")
    # Convert once, up front: a non-numeric id used to surface as an unhandled
    # ValueError (an opaque 500) instead of a client error.
    try:
        numeric_chat_id = int(chat_id)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail="chat_id must be a numeric Telegram chat ID",
        ) from None

    kwargs = {
        "chat_id": numeric_chat_id,
        "text": request.message,
        "parse_mode": "HTML"
    }
    # Add inline buttons for service control if service_id or service_name is provided
    if request.service_id or request.service_name:
        # Use service_id if available, otherwise use service_name prefixed with 'name_'
        identifier = str(request.service_id) if request.service_id else f"name_{request.service_name}"
        # NOTE(review): the Bot API limits callback_data length (64 bytes per
        # its docs), so very long service names may be rejected — confirm.
        keyboard = [
            [
                InlineKeyboardButton("▶️ Start", callback_data=f"service_start_{identifier}"),
                InlineKeyboardButton("⏹️ Stop", callback_data=f"service_stop_{identifier}"),
                InlineKeyboardButton("🔄 Restart", callback_data=f"service_restart_{identifier}")
            ]
        ]
        kwargs["reply_markup"] = InlineKeyboardMarkup(keyboard)

    try:
        message = await bot.send_message(**kwargs)
        await log_message(numeric_chat_id, request.message, True)
        return {
            "status": "sent",
            "message_id": message.message_id,
            "chat_id": chat_id
        }
    except TelegramError as e:
        await log_message(numeric_chat_id, request.message, False, str(e))
        raise HTTPException(status_code=500, detail=f"Failed to send message: {str(e)}")
@app.post("/send-bulk", dependencies=[Depends(verify_token)])
async def send_bulk_messages(request: BulkMessageRequest):
    """Send multiple messages to a Telegram chat, one after another.

    The target is ``request.chat_id`` or, failing that, ``TELEGRAM_CHAT_ID``.
    A Telegram failure on one message is recorded in the result list and does
    not stop the remaining messages. Every attempt is recorded through
    ``log_message``.

    Returns:
        ``{"results": [...], "total": n}`` with one entry per message; each
        entry carries the status and a preview of at most 50 characters.

    Raises:
        HTTPException: 503 when no bot is configured; 400 when no chat id is
            available or it is not numeric.
    """
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")
    chat_id = request.chat_id or TELEGRAM_CHAT_ID
    if not chat_id:
        raise HTTPException(status_code=400, detail="No chat_id provided and no default configured")
    # Convert once, up front: a non-numeric id used to surface as an unhandled
    # ValueError (an opaque 500) on the first message.
    try:
        numeric_chat_id = int(chat_id)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail="chat_id must be a numeric Telegram chat ID",
        ) from None

    results = []
    for msg in request.messages:
        # Truncated copy of the text echoed back to the caller.
        preview = msg[:50] + "..." if len(msg) > 50 else msg
        try:
            message = await bot.send_message(
                chat_id=numeric_chat_id,
                text=msg,
                parse_mode='HTML'
            )
            await log_message(numeric_chat_id, msg, True)
            results.append({
                "status": "sent",
                "message_id": message.message_id,
                "message": preview
            })
        except TelegramError as e:
            await log_message(numeric_chat_id, msg, False, str(e))
            results.append({
                "status": "failed",
                "error": str(e),
                "message": preview
            })
    return {"results": results, "total": len(results)}
@app.get("/messages", dependencies=[Depends(verify_token)])
async def get_message_history(limit: int = 50):
    """Return the most recent rows of ``telegram_messages``, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts, one per row.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    # A negative LIMIT is rejected by PostgreSQL and would otherwise surface
    # as an unhandled database error (HTTP 500).
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")
    conn = await get_db()
    try:
        rows = await conn.fetch(
            "SELECT * FROM telegram_messages ORDER BY sent_at DESC LIMIT $1",
            limit
        )
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats():
    """Return aggregate delivery statistics from ``telegram_messages``.

    Returns:
        A dict with ``total_messages``, ``successful``, ``failed`` and
        ``success_rate`` (a percentage; 0 when the table is empty).
    """
    conn = await get_db()
    try:
        # One statement instead of three: the counters come from the same
        # snapshot, so rows inserted in between cannot make them disagree
        # (e.g. successful > total), and it saves two round trips.
        row = await conn.fetchrow(
            "SELECT COUNT(*) AS total, "
            "COUNT(*) FILTER (WHERE success = true) AS successful, "
            "COUNT(*) FILTER (WHERE success = false) AS failed "
            "FROM telegram_messages"
        )
    finally:
        await release_db(conn)

    # An aggregate without GROUP BY always yields exactly one row.
    total = row["total"]
    successful = row["successful"]
    failed = row["failed"]
    return {
        "total_messages": total,
        "successful": successful,
        "failed": failed,
        "success_rate": (successful / total * 100) if total > 0 else 0
    }
@app.post("/test", dependencies=[Depends(verify_token)])
async def test_connection():
    """Check that the bot can reach Telegram by calling ``get_me``.

    Returns:
        The bot's username, first name and id together with ``status: ok``.

    Raises:
        HTTPException: 503 when no bot is configured; 500 when the Telegram
            call fails.
    """
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")
    try:
        identity = await bot.get_me()
    except TelegramError as exc:
        raise HTTPException(status_code=500, detail=f"Bot test failed: {str(exc)}")
    summary = {"status": "ok"}
    summary["bot_username"] = identity.username
    summary["bot_name"] = identity.first_name
    summary["bot_id"] = identity.id
    return summary
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8003.
    import uvicorn

    _server_opts = {"host": "0.0.0.0", "port": 8003}
    uvicorn.run(app, **_server_opts)