Refresh web UI and make ML imports optional
All checks were successful
ci / tests (push) Successful in 21s
All checks were successful
ci / tests (push) Successful in 21s
This commit is contained in:
@@ -52,19 +52,20 @@ def test_api_game_flow() -> None:
|
||||
token = response.json()["token"]
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
response = client.post("/api/games", json={"name": "Arena", "max_players": 2}, headers=headers)
|
||||
response = client.post("/api/games", json={"name": "Arena", "max_players": 3}, headers=headers)
|
||||
assert response.status_code == 200
|
||||
game_id = response.json()["id"]
|
||||
|
||||
response = client.post(f"/api/games/{game_id}/join", headers=headers)
|
||||
assert response.status_code == 200
|
||||
|
||||
response = client.post(
|
||||
f"/api/games/{game_id}/add-ai",
|
||||
json={"ai_type": "random"},
|
||||
headers=headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
for _ in range(2):
|
||||
response = client.post(
|
||||
f"/api/games/{game_id}/add-ai",
|
||||
json={"ai_type": "random"},
|
||||
headers=headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
response = client.post(f"/api/games/{game_id}/start", headers=headers)
|
||||
assert response.status_code == 200
|
||||
|
||||
86
tests/e2e/test_user_lifecycle.py
Normal file
86
tests/e2e/test_user_lifecycle.py
Normal file
@@ -0,0 +1,86 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import List
|
||||
|
||||
import httpx
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from services.api import app as api_app
|
||||
from services.analytics import app as analytics_app
|
||||
from services.game import app as game_app
|
||||
|
||||
|
||||
def _install_asgi_clients() -> List[httpx.AsyncClient]:
|
||||
created: List[httpx.AsyncClient] = []
|
||||
|
||||
def make_client(app: FastAPI, base_url: str) -> httpx.AsyncClient:
|
||||
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url=base_url)
|
||||
created.append(client)
|
||||
return client
|
||||
|
||||
api_app.clients["game"] = make_client(game_app.app, "http://game")
|
||||
api_app.clients["analytics"] = make_client(analytics_app.app, "http://analytics")
|
||||
|
||||
dummy_ai = FastAPI()
|
||||
|
||||
@dummy_ai.get("/models")
|
||||
def _models():
|
||||
return {"models": []}
|
||||
|
||||
@dummy_ai.post("/act")
|
||||
def _act():
|
||||
return {"action": {"type": "end_turn", "payload": {}}, "debug": {}}
|
||||
|
||||
api_app.clients["ai"] = make_client(dummy_ai, "http://ai")
|
||||
return created
|
||||
|
||||
|
||||
def _close_clients(clients: List[httpx.AsyncClient]) -> None:
|
||||
for client in clients:
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
except RuntimeError:
|
||||
asyncio.run(client.aclose())
|
||||
else:
|
||||
loop.run_until_complete(client.aclose())
|
||||
|
||||
|
||||
def test_full_user_flow() -> None:
|
||||
with TestClient(api_app.app) as client:
|
||||
created = _install_asgi_clients()
|
||||
try:
|
||||
# Register + login
|
||||
response = client.post("/api/auth/register", json={"username": "user_flow", "password": "secret"})
|
||||
assert response.status_code == 200
|
||||
token = response.json()["token"]
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
# Create lobby and seat host
|
||||
response = client.post("/api/games", json={"name": "Flow", "max_players": 3}, headers=headers)
|
||||
assert response.status_code == 200
|
||||
game_id = response.json()["id"]
|
||||
|
||||
response = client.post(f"/api/games/{game_id}/join", headers=headers)
|
||||
assert response.status_code == 200
|
||||
payload = response.json()
|
||||
assert payload["players"][0]["name"] == "user_flow"
|
||||
|
||||
# Seat AI opponent and start the game
|
||||
for _ in range(2):
|
||||
response = client.post(
|
||||
f"/api/games/{game_id}/add-ai",
|
||||
json={"ai_type": "random"},
|
||||
headers=headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
response = client.post(f"/api/games/{game_id}/start", headers=headers)
|
||||
assert response.status_code == 200
|
||||
state = response.json()
|
||||
assert state["status"] == "running"
|
||||
assert state["game"]["phase"] == "setup_round_one"
|
||||
assert len(state["legal_actions"]) > 0
|
||||
finally:
|
||||
_close_clients(created)
|
||||
@@ -9,7 +9,7 @@ from tests.utils import find_initial_spot, find_connected_edge, grant_resources
|
||||
|
||||
|
||||
def make_controller(seed: int = 7) -> CLIController:
|
||||
game = Game(GameConfig(player_names=["Alice", "Bob"], seed=seed))
|
||||
game = Game(GameConfig(player_names=["Alice", "Bob", "Cara"], seed=seed))
|
||||
controller = CLIController(game)
|
||||
_complete_setup(controller)
|
||||
return controller
|
||||
|
||||
@@ -7,10 +7,26 @@ from catan.ml.agents import RandomAgent
|
||||
from tests.utils import find_initial_spot
|
||||
|
||||
|
||||
def _resolve_robber(controller: CLIController) -> None:
|
||||
game = controller.game
|
||||
board = game.board
|
||||
for hex_id, tile in board.hexes.items():
|
||||
if hex_id == board.robber_hex:
|
||||
continue
|
||||
victims = {
|
||||
board.corners[corner].owner
|
||||
for corner in tile.corners
|
||||
if board.corners[corner].owner and board.corners[corner].owner != game.current_player.name
|
||||
}
|
||||
victim = next(iter(victims)) if victims else None
|
||||
game.move_robber(hex_id, victim)
|
||||
return
|
||||
|
||||
|
||||
def test_human_vs_random_bot_session() -> None:
|
||||
controller = CLIController(
|
||||
Game(GameConfig(player_names=["Human", "Bot"], seed=9)),
|
||||
bots={"Bot": RandomAgent(seed=3)},
|
||||
Game(GameConfig(player_names=["Human", "Bot", "Companion"], seed=9)),
|
||||
bots={"Bot": RandomAgent(seed=3), "Companion": RandomAgent(seed=5)},
|
||||
)
|
||||
|
||||
# Complete setup: human manually places, bot uses random agent.
|
||||
@@ -22,13 +38,16 @@ def test_human_vs_random_bot_session() -> None:
|
||||
else:
|
||||
controller._run_bot_turn(current) # noqa: SLF001 - exercising CLI internals
|
||||
|
||||
# Play a few turns: human rolls and ends, bot plays automatically.
|
||||
for _ in range(3):
|
||||
# Play a few rounds: human rolls and ends, bots play automatically.
|
||||
for _ in range(2):
|
||||
assert controller.game.current_player.name == "Human"
|
||||
controller.handle_command("roll")
|
||||
if controller.game.robber_move_required:
|
||||
_resolve_robber(controller)
|
||||
controller.handle_command("end")
|
||||
assert controller.game.current_player.name == "Bot"
|
||||
controller._run_bot_turn("Bot") # noqa: SLF001
|
||||
for name in ("Bot", "Companion"):
|
||||
assert controller.game.current_player.name == name
|
||||
controller._run_bot_turn(name) # noqa: SLF001
|
||||
|
||||
# Ensure bot activity is recorded in history.
|
||||
assert any(log.player == "Bot" for log in controller.game.history)
|
||||
assert any(log.player in {"Bot", "Companion"} for log in controller.game.history)
|
||||
|
||||
51
tests/test_dev_rules.py
Normal file
51
tests/test_dev_rules.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from catan.data import DevelopmentCard
|
||||
from catan.game import Game, GameConfig, Phase
|
||||
|
||||
|
||||
def _prepare_turn(game: Game) -> None:
|
||||
game.phase = Phase.MAIN
|
||||
game.has_rolled = True
|
||||
game.robber_move_required = False
|
||||
game.pending_discards.clear()
|
||||
|
||||
|
||||
def _other_hex(game: Game, exclude: int) -> int:
|
||||
return next(h for h in game.board.hexes if h != exclude)
|
||||
|
||||
|
||||
def test_only_one_dev_card_per_turn() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C", "D"]))
|
||||
_prepare_turn(game)
|
||||
player = game.current_player
|
||||
player.dev_cards = [DevelopmentCard.KNIGHT, DevelopmentCard.KNIGHT]
|
||||
player.new_dev_cards.clear()
|
||||
|
||||
first_target = _other_hex(game, game.board.robber_hex)
|
||||
game.play_knight(first_target, None)
|
||||
|
||||
second_target = _other_hex(game, first_target)
|
||||
with pytest.raises(ValueError, match="development card"):
|
||||
game.play_knight(second_target, None)
|
||||
|
||||
|
||||
def test_dev_card_limit_resets_next_turn() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C", "D"]))
|
||||
_prepare_turn(game)
|
||||
player = game.current_player
|
||||
player.dev_cards = [DevelopmentCard.KNIGHT]
|
||||
player.new_dev_cards.clear()
|
||||
target_hex = _other_hex(game, game.board.robber_hex)
|
||||
game.play_knight(target_hex, None)
|
||||
game.end_turn()
|
||||
|
||||
_prepare_turn(game)
|
||||
next_player = game.current_player
|
||||
next_player.dev_cards = [DevelopmentCard.KNIGHT]
|
||||
next_player.new_dev_cards.clear()
|
||||
next_target = _other_hex(game, game.board.robber_hex)
|
||||
# Should not raise now that it's a new turn.
|
||||
game.play_knight(next_target, None)
|
||||
@@ -15,7 +15,7 @@ def _auto_setup(env: CatanEnv) -> None:
|
||||
|
||||
|
||||
def test_bank_trade_action_and_port_ratio() -> None:
|
||||
env = CatanEnv(GameConfig(player_names=["Alice", "Bob"], seed=5))
|
||||
env = CatanEnv(GameConfig(player_names=["Alice", "Bob", "Cara"], seed=5))
|
||||
_auto_setup(env)
|
||||
game = env.game
|
||||
game.phase = Phase.MAIN
|
||||
@@ -58,12 +58,13 @@ def test_bank_trade_action_and_port_ratio() -> None:
|
||||
|
||||
|
||||
def test_player_trade_actions_available() -> None:
|
||||
env = CatanEnv(GameConfig(player_names=["Alice", "Bob"], seed=11))
|
||||
env = CatanEnv(GameConfig(player_names=["Alice", "Bob", "Cara"], seed=11))
|
||||
_auto_setup(env)
|
||||
game = env.game
|
||||
game.phase = Phase.MAIN
|
||||
game.has_rolled = True
|
||||
alice, bob = game.players
|
||||
alice = game.player_by_name("Alice")
|
||||
bob = game.player_by_name("Bob")
|
||||
alice.resources[Resource.WOOL] = 1
|
||||
bob.resources[Resource.ORE] = 1
|
||||
|
||||
|
||||
152
tests/test_rulebook.py
Normal file
152
tests/test_rulebook.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from catan.data import Resource
|
||||
from catan.game import Game, GameConfig, Phase
|
||||
|
||||
|
||||
def _prepare_main_phase(game: Game) -> None:
|
||||
game.phase = Phase.MAIN
|
||||
game.has_rolled = True
|
||||
game.pending_discards.clear()
|
||||
game.robber_move_required = False
|
||||
|
||||
|
||||
def _assign_settlement(game: Game, player, hex_id: int, corner_index: int = 0) -> None:
|
||||
corner_id = game.board.hexes[hex_id].corners[corner_index]
|
||||
corner = game.board.corners[corner_id]
|
||||
corner.owner = player.name
|
||||
corner.building = "settlement"
|
||||
player.settlements.add(corner_id)
|
||||
|
||||
|
||||
def _find_resource_hex(game: Game) -> Tuple[int, Resource, int]:
|
||||
for tile in game.board.hexes.values():
|
||||
if tile.resource != Resource.DESERT and tile.number_token:
|
||||
return tile.id, tile.resource, tile.number_token
|
||||
raise RuntimeError("No producible hex found")
|
||||
|
||||
|
||||
def _find_edge_path(game: Game, length: int) -> Tuple[List[Tuple[int, int, int]], List[Tuple[int, int, int]]]:
|
||||
board = game.board
|
||||
|
||||
def dfs(corner_id, path_edges, path_corners, visited_edges):
|
||||
if len(path_edges) == length:
|
||||
return list(path_edges), list(path_corners)
|
||||
for edge_id in board.corners[corner_id].edges:
|
||||
if edge_id in visited_edges:
|
||||
continue
|
||||
visited_edges.add(edge_id)
|
||||
edge = board.edges[edge_id]
|
||||
next_corner = next(c for c in edge.corners if c != corner_id)
|
||||
path_edges.append(edge_id)
|
||||
path_corners.append(next_corner)
|
||||
result = dfs(next_corner, path_edges, path_corners, visited_edges)
|
||||
if result:
|
||||
return result
|
||||
path_edges.pop()
|
||||
path_corners.pop()
|
||||
visited_edges.remove(edge_id)
|
||||
return None
|
||||
|
||||
for start in board.corners:
|
||||
result = dfs(start, [], [start], set())
|
||||
if result:
|
||||
return result
|
||||
raise RuntimeError("Unable to find connected edge path")
|
||||
|
||||
|
||||
def test_resource_shortage_blocks_multiple_players() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C"]))
|
||||
hex_id, resource, token = _find_resource_hex(game)
|
||||
_assign_settlement(game, game.players[0], hex_id, 0)
|
||||
_assign_settlement(game, game.players[1], hex_id, 1)
|
||||
game.bank[resource] = 1
|
||||
|
||||
game._distribute_resources(token) # noqa: SLF001 - internal rule helper
|
||||
|
||||
assert game.players[0].resources[resource] == 0
|
||||
assert game.players[1].resources[resource] == 0
|
||||
assert game.bank[resource] == 1
|
||||
|
||||
|
||||
def test_resource_shortage_single_player_gets_remaining() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C"]))
|
||||
hex_id, resource, token = _find_resource_hex(game)
|
||||
_assign_settlement(game, game.players[0], hex_id, 0)
|
||||
game.bank[resource] = 1
|
||||
|
||||
game._distribute_resources(token) # noqa: SLF001
|
||||
|
||||
assert game.players[0].resources[resource] == 1
|
||||
assert game.bank[resource] == 0
|
||||
|
||||
|
||||
def test_robber_requires_valid_victim() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C"]))
|
||||
_prepare_main_phase(game)
|
||||
other = game.players[1]
|
||||
target_hex = next(h for h in game.board.hexes if h != game.board.robber_hex)
|
||||
_assign_settlement(game, other, target_hex, 0)
|
||||
other.resources[Resource.BRICK] = 1
|
||||
game.robber_move_required = True
|
||||
|
||||
with pytest.raises(ValueError, match="victim"):
|
||||
game.move_robber(target_hex)
|
||||
with pytest.raises(ValueError, match="adjacent"):
|
||||
game.move_robber(target_hex, victim=game.players[2].name)
|
||||
|
||||
empty_hex = next(
|
||||
hid
|
||||
for hid in game.board.hexes
|
||||
if hid not in {target_hex, game.board.robber_hex}
|
||||
and all(game.board.corners[c].owner is None for c in game.board.hexes[hid].corners)
|
||||
)
|
||||
game.robber_move_required = True
|
||||
with pytest.raises(ValueError, match="No valid victim"):
|
||||
game.move_robber(empty_hex, victim=other.name)
|
||||
|
||||
game.robber_move_required = True
|
||||
game.move_robber(target_hex, victim=other.name)
|
||||
assert not game.robber_move_required
|
||||
|
||||
|
||||
def test_trade_restrictions_enforced() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C"]))
|
||||
_prepare_main_phase(game)
|
||||
player = game.current_player
|
||||
opponent = game.players[1]
|
||||
player.resources[Resource.BRICK] = 2
|
||||
opponent.resources[Resource.WOOL] = 2
|
||||
|
||||
with pytest.raises(ValueError, match="yourself"):
|
||||
game.trade_with_player(player.name, {Resource.BRICK: 1}, {Resource.WOOL: 1})
|
||||
with pytest.raises(ValueError, match="both ways"):
|
||||
game.trade_with_player(opponent.name, {Resource.BRICK: 1}, {})
|
||||
with pytest.raises(ValueError, match="positive"):
|
||||
game.trade_with_player(opponent.name, {Resource.BRICK: 0}, {Resource.WOOL: 1})
|
||||
with pytest.raises(ValueError, match="matching resources"):
|
||||
game.trade_with_player(opponent.name, {Resource.BRICK: 1}, {Resource.BRICK: 1})
|
||||
|
||||
game.trade_with_player(opponent.name, {Resource.BRICK: 1}, {Resource.WOOL: 1})
|
||||
assert player.resources[Resource.WOOL] == 1
|
||||
assert opponent.resources[Resource.BRICK] == 1
|
||||
|
||||
|
||||
def test_settlement_breaks_longest_road() -> None:
|
||||
game = Game(GameConfig(player_names=["A", "B", "C"]))
|
||||
player = game.players[0]
|
||||
rival = game.players[1]
|
||||
edges, corners = _find_edge_path(game, 5)
|
||||
for edge_id in edges:
|
||||
game.board.edges[edge_id].owner = player.name
|
||||
player.roads.add(edge_id)
|
||||
game._update_longest_road() # noqa: SLF001
|
||||
assert player.longest_road
|
||||
|
||||
block_corner = corners[2]
|
||||
game._build_settlement(rival, block_corner, require_connection=False) # noqa: SLF001
|
||||
assert not player.longest_road
|
||||
Reference in New Issue
Block a user