Files
firegex-traffic-viewer/tests/utils/udpserver.py

95 lines
2.8 KiB
Python

from multiprocessing import Process, Queue
import socket
import queue
import traceback
def _start_udp_server(port, server_queue: Queue, ipv6, verbose):
sock = socket.socket(socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("::1" if ipv6 else "127.0.0.1", port))
while True:
try:
bytesAddressPair = sock.recvfrom(4096)
message = bytesAddressPair[0]
address = bytesAddressPair[1]
reply = message # Default to echo
try:
# See if there is a custom reply, but don't block
custom_reply = server_queue.get(block=False)
reply = custom_reply
except queue.Empty:
pass # No custom reply, just echo
if verbose:
print(f"SERVER: sending {reply} to {address}")
sock.sendto(reply, address)
except Exception:
if verbose:
traceback.print_exc()
class UdpServer:
def __init__(self, port, ipv6, proxy_port=None, verbose=False):
self.port = port
self.ipv6 = ipv6
self.proxy_port = proxy_port
self.verbose = verbose
self._server_data_queue = Queue()
self._regen_process()
def _regen_process(self):
self.server = Process(
target=_start_udp_server,
args=[self.port, self._server_data_queue, self.ipv6, self.verbose],
)
def start(self):
self.server.start()
def stop(self):
self.server.terminate()
self.server.join()
self._regen_process()
def connect_client(self):
self.client_sock = socket.socket(
socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_DGRAM
)
self.client_sock.settimeout(1)
self.client_sock.connect(
(
"::1" if self.ipv6 else "127.0.0.1",
self.proxy_port if self.proxy_port else self.port,
)
)
def close_client(self):
if self.client_sock:
self.client_sock.close()
def send_packet(self, packet, server_reply=None):
if self.verbose:
print("CLIENT: ", packet)
if server_reply:
self._server_data_queue.put(server_reply)
self.client_sock.sendall(packet)
def recv_packet(self):
try:
return self.client_sock.recv(4096)
except (TimeoutError, ConnectionResetError):
if self.verbose:
traceback.print_exc()
return False
def sendCheckData(self, data, get_data=False):
self.connect_client()
self.send_packet(data)
received_data = self.recv_packet()
self.close_client()
if get_data:
return received_data
return received_data == data