docs completed
This commit is contained in:
@@ -12,6 +12,8 @@ from watchfiles import awatch, Change
|
||||
fake_ip_header = b"FAKE:IP:TCP:HEADERS:"
|
||||
fake_ip_header_len = len(fake_ip_header)
|
||||
|
||||
MANGLE_WARNING = True
|
||||
|
||||
class LogLevels:
|
||||
INFO = "INFO"
|
||||
WARNING = "WARNING"
|
||||
@@ -36,7 +38,7 @@ def load_level_str(level:str):
|
||||
def log_print(module:str, *args, level:str = LogLevels.INFO, **kwargs):
|
||||
return print(f"{load_level_str(level)}[deep_pink4 bold]\\[nfproxy][/][medium_orchid3 bold]\\[{escape(module)}][/]", *args, **kwargs)
|
||||
|
||||
async def watch_filter_file(filter_file: str, reload_action):
|
||||
async def _watch_filter_file(filter_file: str, reload_action):
|
||||
abs_path = os.path.abspath(filter_file)
|
||||
directory = os.path.dirname(abs_path)
|
||||
# Immediately call the reload action on startup.
|
||||
@@ -57,7 +59,7 @@ async def watch_filter_file(filter_file: str, reload_action):
|
||||
except asyncio.CancelledError:
|
||||
log_print("observer", "Watcher cancelled, stopping.")
|
||||
|
||||
async def forward_and_filter(filter_ctx: dict,
|
||||
async def _forward_and_filter(filter_ctx: dict,
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
is_input: bool,
|
||||
@@ -66,6 +68,7 @@ async def forward_and_filter(filter_ctx: dict,
|
||||
has_to_filter: bool = True):
|
||||
"""Asynchronously forward data from reader to writer applying filters."""
|
||||
try:
|
||||
has_to_drop = False
|
||||
while True:
|
||||
try:
|
||||
data = await reader.read(4096)
|
||||
@@ -73,7 +76,8 @@ async def forward_and_filter(filter_ctx: dict,
|
||||
break
|
||||
if not data:
|
||||
break
|
||||
|
||||
if has_to_drop:
|
||||
continue
|
||||
if has_to_filter:
|
||||
filter_ctx["__firegex_packet_info"] = {
|
||||
"data": data,
|
||||
@@ -128,8 +132,9 @@ async def forward_and_filter(filter_ctx: dict,
|
||||
continue
|
||||
|
||||
if action == DROP.value:
|
||||
log_print("drop-action", "Dropping packet can't be simulated, so the connection will be rejected", level=LogLevels.WARNING)
|
||||
action = REJECT.value
|
||||
log_print("drop-action", "Dropping connection caused by {escape(filter_name)} pyfilter")
|
||||
has_to_drop = True
|
||||
continue
|
||||
|
||||
if action == REJECT.value:
|
||||
log_print("reject-action", f"Rejecting connection caused by {escape(filter_name)} pyfilter")
|
||||
@@ -146,9 +151,10 @@ async def forward_and_filter(filter_ctx: dict,
|
||||
await writer.drain()
|
||||
continue
|
||||
log_print("mangle", f"Mangling packet caused by {escape(filter_name)} pyfilter")
|
||||
log_print("mangle",
|
||||
"In the real execution mangling is not so stable as the simulation does, l4_data can be different by data",
|
||||
level=LogLevels.WARNING)
|
||||
if MANGLE_WARNING:
|
||||
log_print("mangle",
|
||||
"In the real execution mangling is not so stable as the simulation does, l4_data can be different by data",
|
||||
level=LogLevels.WARNING)
|
||||
writer.write(mangled_packet[fake_ip_header_len:])
|
||||
await writer.drain()
|
||||
continue
|
||||
@@ -170,7 +176,7 @@ async def forward_and_filter(filter_ctx: dict,
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def handle_connection(
|
||||
async def _handle_connection(
|
||||
reader: asyncio.StreamReader, writer: asyncio.StreamWriter, filter_code: str,
|
||||
target_ip: str, target_port: int, ipv6: bool):
|
||||
"""Handle a new incoming connection and create a remote connection."""
|
||||
@@ -198,8 +204,8 @@ async def handle_connection(
|
||||
traceback.print_exc()
|
||||
filter_ctx = {}
|
||||
# Create asynchronous tasks for bidirectional forwarding.
|
||||
task1 = asyncio.create_task(forward_and_filter(filter_ctx, reader, remote_writer, True, ipv6, True, True))
|
||||
task2 = asyncio.create_task(forward_and_filter(filter_ctx, remote_reader, writer, False, ipv6, True, True))
|
||||
task1 = asyncio.create_task(_forward_and_filter(filter_ctx, reader, remote_writer, True, ipv6, True, True))
|
||||
task2 = asyncio.create_task(_forward_and_filter(filter_ctx, remote_reader, writer, False, ipv6, True, True))
|
||||
try:
|
||||
await asyncio.gather(task1, task2)
|
||||
except (KeyboardInterrupt, asyncio.CancelledError):
|
||||
@@ -219,7 +225,7 @@ async def _execute_proxy(
|
||||
"""Start the asyncio-based TCP proxy server."""
|
||||
addr_family = socket.AF_INET6 if ipv6 else socket.AF_INET
|
||||
server = await asyncio.start_server(
|
||||
lambda r, w: handle_connection(r, w, filter_code, target_ip, target_port, ipv6),
|
||||
lambda r, w: _handle_connection(r, w, filter_code, target_ip, target_port, ipv6),
|
||||
local_ip, local_port, family=addr_family)
|
||||
log_print("listener", f"TCP proxy listening on {escape(local_ip)}:{local_port} and forwarding to -> {escape(target_ip)}:{target_port}")
|
||||
async with server:
|
||||
@@ -286,7 +292,7 @@ def run_proxy_simulation(filter_file:str, proto:str, target_ip:str, target_port:
|
||||
proxy_process.start()
|
||||
|
||||
try:
|
||||
asyncio.run(watch_filter_file(filter_file, reload_proxy_proc))
|
||||
asyncio.run(_watch_filter_file(filter_file, reload_proxy_proc))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user