diff --git a/backend/requirements.txt b/backend/requirements.txt
index 556bd93..55d6516 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -4,5 +4,5 @@ uvicorn[standard]
psutil
python-jose[cryptography]
python-socketio
-git+https://github.com/google/brotli.git
+brotli
#git+https://salsa.debian.org/pkg-netfilter-team/pkg-nftables#egg=nftables&subdirectory=py
diff --git a/frontend/src/components/NavBar/index.tsx b/frontend/src/components/NavBar/index.tsx
index bd7aec4..13d84f1 100644
--- a/frontend/src/components/NavBar/index.tsx
+++ b/frontend/src/components/NavBar/index.tsx
@@ -9,14 +9,14 @@ import { getMainPath } from "../../js/utils";
import { BsRegex } from "react-icons/bs";
function NavBarButton({ navigate, closeNav, name, icon, color, disabled, onClick }:
- { navigate?: string, closeNav: () => void, name:string, icon:any, color:MantineColor, disabled?:boolean, onClick?:CallableFunction }) {
+ { navigate?: string, closeNav: () => void, name: string, icon: any, color: MantineColor, disabled?: boolean, onClick?: CallableFunction }) {
const navigator = useNavigate()
return {
- if(navigate){navigator(`/${navigate}`);closeNav()}
- if (onClick) onClick()
- }} disabled={disabled}>
+ className={`firegex__navbar__unstyled_button${navigate == getMainPath() ? " selected" : ""}${disabled ? " disabled" : ""}`}
+ onClick={() => {
+ if (navigate) { navigator(`/${navigate}`); closeNav() }
+ if (onClick) onClick()
+ }} disabled={disabled}>
{icon}
@@ -24,7 +24,7 @@ function NavBarButton({ navigate, closeNav, name, icon, color, disabled, onClick
{name}
-}
+}
export default function NavBar() {
const [toggle, setToggleState] = useState(false);
@@ -35,17 +35,17 @@ export default function NavBar() {
Options βοΈ
-
+
} />
} />
} />
-
+ } />
+ {/*
Experimental Features π§ͺ
-
- } />
+ */}
-
+
}
diff --git a/tests/api_test.py b/tests/api_test.py
index c4ece76..71baf7b 100644
--- a/tests/api_test.py
+++ b/tests/api_test.py
@@ -4,92 +4,126 @@ from utils.firegexapi import FiregexAPI
import argparse
import secrets
-parser = argparse.ArgumentParser()
-parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/")
-parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password')
-args = parser.parse_args()
-sep()
-puts("Testing will start on ", color=colors.cyan, end="")
-puts(f"{args.address}", color=colors.yellow)
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--address",
+ "-a",
+ type=str,
+ required=False,
+ help="Address of firegex backend",
+ default="http://127.0.0.1:4444/",
+ )
+ parser.add_argument(
+ "--password", "-p", type=str, required=True, help="Firegex password"
+ )
+ args = parser.parse_args()
+ sep()
+ puts("Testing will start on ", color=colors.cyan, end="")
+ puts(f"{args.address}", color=colors.yellow)
-firegex = FiregexAPI(args.address)
+ firegex = FiregexAPI(args.address)
-#Connect to Firegex
-if firegex.status()["status"] == "init":
- if (firegex.set_password(args.password)):
- puts(f"Sucessfully set password to {args.password} β", color=colors.green)
+ # Connect to Firegex
+ if firegex.status()["status"] == "init":
+ if firegex.set_password(args.password):
+ puts(f"Sucessfully set password to {args.password} β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Unknown response or password already put β",
+ color=colors.red,
+ )
+ exit(1)
else:
- puts("Test Failed: Unknown response or password already put β", color=colors.red)
- exit(1)
-else:
- if (firegex.login(args.password)):
- puts("Sucessfully logged in β", color=colors.green)
+ if firegex.login(args.password):
+ puts("Sucessfully logged in β", color=colors.green)
+ else:
+ puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
+ exit(1)
+
+ if firegex.status()["loggined"]:
+ puts("Correctly received status β", color=colors.green)
else:
- puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
+ puts("Test Failed: Unknown response or not logged inβ", color=colors.red)
exit(1)
-if(firegex.status()["loggined"]):
- puts("Correctly received status β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or not logged inβ", color=colors.red)
- exit(1)
+ # Prepare second instance
+ firegex2 = FiregexAPI(args.address)
+ if firegex2.login(args.password):
+ puts("Sucessfully logged in on second instance β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Unknown response or wrong passowrd on second instance β",
+ color=colors.red,
+ )
+ exit(1)
-#Prepare second instance
-firegex2 = FiregexAPI(args.address)
-if (firegex2.login(args.password)):
- puts("Sucessfully logged in on second instance β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or wrong passowrd on second instance β", color=colors.red)
- exit(1)
+ if firegex2.status()["loggined"]:
+ puts("Correctly received status on second instanceβ", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Unknown response or not logged in on second instanceβ",
+ color=colors.red,
+ )
+ exit(1)
-if(firegex2.status()["loggined"]):
- puts("Correctly received status on second instanceβ", color=colors.green)
-else:
- puts("Test Failed: Unknown response or not logged in on second instanceβ", color=colors.red)
- exit(1)
+ # Change password
+ new_password = secrets.token_hex(10)
+ if firegex.change_password(new_password, expire=True):
+ puts(f"Sucessfully changed password to {new_password} β", color=colors.green)
+ else:
+ puts("Test Failed: Coundl't change the password β", color=colors.red)
+ exit(1)
-#Change password
-new_password = secrets.token_hex(10)
-if (firegex.change_password(new_password,expire=True)):
- puts(f"Sucessfully changed password to {new_password} β", color=colors.green)
-else:
- puts("Test Failed: Coundl't change the password β", color=colors.red)
- exit(1)
+ # Check if we are still logged in
+ if firegex.status()["loggined"]:
+ puts("Correctly received status after password change β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Unknown response or not logged after password change β",
+ color=colors.red,
+ )
+ exit(1)
-#Check if we are still logged in
-if(firegex.status()["loggined"]):
- puts("Correctly received status after password change β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or not logged after password change β", color=colors.red)
- exit(1)
+ # Check if second session expired and relog
-#Check if second session expired and relog
+ if not firegex2.status()["loggined"]:
+ puts("Second instance was expired currectly β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Still logged in on second instance, expire expected β",
+ color=colors.red,
+ )
+ exit(1)
+ if firegex2.login(new_password):
+ puts("Sucessfully logged in on second instance β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Unknown response or wrong passowrd on second instance β",
+ color=colors.red,
+ )
+ exit(1)
-if(not firegex2.status()["loggined"]):
- puts("Second instance was expired currectly β", color=colors.green)
-else:
- puts("Test Failed: Still logged in on second instance, expire expected β", color=colors.red)
- exit(1)
-if (firegex2.login(new_password)):
- puts("Sucessfully logged in on second instance β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or wrong passowrd on second instance β", color=colors.red)
- exit(1)
+ # Change it back
+ if firegex.change_password(args.password, expire=False):
+ puts("Sucessfully restored the password β", color=colors.green)
+ else:
+ puts("Test Failed: Coundl't change the password β", color=colors.red)
+ exit(1)
-#Change it back
-if (firegex.change_password(args.password,expire=False)):
- puts("Sucessfully restored the password β", color=colors.green)
-else:
- puts("Test Failed: Coundl't change the password β", color=colors.red)
- exit(1)
+ # Check if we are still logged in
+ if firegex2.status()["loggined"]:
+ puts("Correctly received status after password change β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Unknown response or not logged after password change β",
+ color=colors.red,
+ )
+ exit(1)
-#Check if we are still logged in
-if(firegex2.status()["loggined"]):
- puts("Correctly received status after password change β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or not logged after password change β", color=colors.red)
- exit(1)
-
-puts("List of available interfaces:", color=colors.yellow)
-for interface in firegex.get_interfaces():
- puts("name: {}, address: {}".format(interface["name"], interface["addr"]), color=colors.yellow)
\ No newline at end of file
+ puts("List of available interfaces:", color=colors.yellow)
+ for interface in firegex.get_interfaces():
+ puts(
+ "name: {}, address: {}".format(interface["name"], interface["addr"]),
+ color=colors.yellow,
+ )
diff --git a/tests/nfproxy_test.py b/tests/nfproxy_test.py
index f9c4175..05efd6c 100644
--- a/tests/nfproxy_test.py
+++ b/tests/nfproxy_test.py
@@ -5,862 +5,877 @@ from utils.tcpserver import TcpServer
import argparse
import secrets
import time
+import textwrap
-parser = argparse.ArgumentParser()
-parser.add_argument(
- "--address",
- "-a",
- type=str,
- required=False,
- help="Address of firegex backend",
- default="http://127.0.0.1:4444/",
-)
-parser.add_argument("--password", "-p", type=str, required=True, help="Firegex password")
-parser.add_argument(
- "--service_name",
- "-n",
- type=str,
- required=False,
- help="Name of the test service",
- default="Test Service",
-)
-parser.add_argument(
- "--port",
- "-P",
- type=int,
- required=False,
- help="Port of the test service",
- default=1337,
-)
-parser.add_argument("--ipv6", "-6", action="store_true", help="Test Ipv6", default=False)
-parser.add_argument(
- "--verbose", "-V", action="store_true", help="Verbose output", default=False
-)
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--address",
+ "-a",
+ type=str,
+ required=False,
+ help="Address of firegex backend",
+ default="http://127.0.0.1:4444/",
+ )
+ parser.add_argument(
+ "--password", "-p", type=str, required=True, help="Firegex password"
+ )
+ parser.add_argument(
+ "--service_name",
+ "-n",
+ type=str,
+ required=False,
+ help="Name of the test service",
+ default="Test Service",
+ )
+ parser.add_argument(
+ "--port",
+ "-P",
+ type=int,
+ required=False,
+ help="Port of the test service",
+ default=1337,
+ )
+ parser.add_argument(
+ "--ipv6", "-6", action="store_true", help="Test Ipv6", default=False
+ )
+ parser.add_argument(
+ "--verbose", "-V", action="store_true", help="Verbose output", default=False
+ )
-args = parser.parse_args()
-sep()
-puts("Testing will start on ", color=colors.cyan, end="")
-puts(f"{args.address}", color=colors.yellow)
+ args = parser.parse_args()
+ sep()
+ puts("Testing will start on ", color=colors.cyan, end="")
+ puts(f"{args.address}", color=colors.yellow)
-firegex = FiregexAPI(args.address)
+ firegex = FiregexAPI(args.address)
-# Login
-if firegex.login(args.password):
- puts("Sucessfully logged in β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
- exit(1)
+ # Login
+ if firegex.login(args.password):
+ puts("Sucessfully logged in β", color=colors.green)
+ else:
+ puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
+ exit(1)
-# Create server
-server = TcpServer(args.port, ipv6=args.ipv6, verbose=args.verbose)
+ # Create server
+ server = TcpServer(args.port, ipv6=args.ipv6, verbose=args.verbose)
-srvs = firegex.nfproxy_get_services()
-for ele in srvs:
- if ele["name"] == args.service_name:
- firegex.nfproxy_delete_service(ele["service_id"])
+ srvs = firegex.nfproxy_get_services()
+ for ele in srvs:
+ if ele["name"] == args.service_name:
+ firegex.nfproxy_delete_service(ele["service_id"])
-service_id = firegex.nfproxy_add_service(
- args.service_name, args.port, "http", "::1" if args.ipv6 else "127.0.0.1"
-)
-if service_id:
- puts(f"Sucessfully created service {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Failed to create service β", color=colors.red)
- exit(1)
-
-
-def exit_test(code):
+ service_id = firegex.nfproxy_add_service(
+ args.service_name, args.port, "http", "::1" if args.ipv6 else "127.0.0.1"
+ )
if service_id:
- server.stop()
- """
- if firegex.nfproxy_delete_service(service_id):
- puts("Sucessfully deleted service β", color=colors.green)
- else:
- puts("Test Failed: Coulnd't delete serivce β", color=colors.red)
- """
- exit(code)
-
-
-if firegex.nfproxy_start_service(service_id):
- puts("Sucessfully started service β", color=colors.green)
-else:
- puts("Test Failed: Failed to start service β", color=colors.red)
- exit_test(1)
-
-server.start()
-time.sleep(0.5)
-try:
- if server.sendCheckData(secrets.token_bytes(432)):
- puts("Successfully tested first proxy with no filters β", color=colors.green)
+ puts(f"Sucessfully created service {service_id} β", color=colors.green)
else:
- puts("Test Failed: Data was corrupted ", color=colors.red)
+ puts("Test Failed: Failed to create service β", color=colors.red)
+ exit(1)
+
+ def exit_test(code):
+ if service_id:
+ server.stop()
+ if firegex.nfproxy_delete_service(service_id):
+ puts("Sucessfully deleted service β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't delete serivce β", color=colors.red)
+ exit(code)
+
+ if firegex.nfproxy_start_service(service_id):
+ puts("Sucessfully started service β", color=colors.green)
+ else:
+ puts("Test Failed: Failed to start service β", color=colors.red)
exit_test(1)
-except Exception:
- puts("Test Failed: Couldn't send data to the server ", color=colors.red)
- exit_test(1)
-BASE_FILTER_VERDICT_TEST = """
-from firegex.nfproxy.models import RawPacket
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def verdict_test(packet:RawPacket):
- if b"%%TEST%%" in packet.data:
- packet.l4_data = packet.l4_data.replace(b"%%TEST%%", b"%%MANGLE%%")
- return %%ACTION%%
-"""
-
-BASE_FILTER_VERDICT_NAME = "verdict_test"
-
-
-def get_vedict_test(to_match: str, action: str, mangle_to: str = "REDACTED"):
- return (
- BASE_FILTER_VERDICT_TEST.replace("%%TEST%%", to_match)
- .replace("%%ACTION%%", action)
- .replace("%%MANGLE%%", mangle_to)
- )
-
-
-# Check if filter is present in the service
-n_blocked = 0
-n_mangled = 0
-
-
-def checkFilter(match_bytes, filter_name, should_work=True, mangle_with=None):
- if mangle_with:
- if should_work:
- global n_mangled
- for r in firegex.nfproxy_get_service_pyfilters(service_id):
- if r["name"] == filter_name:
- # Test the filter
- pre_packet = secrets.token_bytes(40)
- post_packet = secrets.token_bytes(40)
- server.connect_client()
- server.send_packet(pre_packet + match_bytes + post_packet)
- real_response = server.recv_packet()
- expected_response = pre_packet + mangle_with + post_packet
- if real_response == expected_response:
- puts(
- "The malicious request was successfully mangled β",
- color=colors.green,
- )
- n_mangled += 1
- time.sleep(1)
- if (
- firegex.nfproxy_get_pyfilter(service_id, filter_name)[
- "edited_packets"
- ]
- == n_mangled
- ):
- puts(
- "The packet was reported as mangled in the API β",
- color=colors.green,
- )
- else:
- puts(
- "Test Failed: The packet wasn't reported as mangled in the API β",
- color=colors.red,
- )
- exit_test(1)
- server.send_packet(pre_packet)
- if server.recv_packet() == pre_packet:
- puts(
- "Is able to communicate after mangle β",
- color=colors.green,
- )
- else:
- puts(
- "Test Failed: Couldn't communicate after mangle β",
- color=colors.red,
- )
- exit_test(1)
- else:
- puts(
- "Test Failed: The request wasn't mangled β", color=colors.red
- )
- exit_test(1)
- server.close_client()
- return
- puts("Test Failed: The filter wasn't found β", color=colors.red)
- else:
- if server.sendCheckData(
- secrets.token_bytes(40) + match_bytes + secrets.token_bytes(40)
- ):
- puts("The request wasn't mangled β", color=colors.green)
- else:
- puts(
- "Test Failed: The request was mangled when it shouldn't have",
- color=colors.red,
- )
- exit_test(1)
- else:
- if should_work:
- global n_blocked
- for r in firegex.nfproxy_get_service_pyfilters(service_id):
- if r["name"] == filter_name:
- # Test the filter
- if not server.sendCheckData(
- secrets.token_bytes(40) + match_bytes + secrets.token_bytes(40)
- ):
- puts(
- "The malicious request was successfully blocked β",
- color=colors.green,
- )
- n_blocked += 1
- time.sleep(1)
- if (
- firegex.nfproxy_get_pyfilter(service_id, filter_name)[
- "blocked_packets"
- ]
- == n_blocked
- ):
- puts(
- "The packet was reported as blocked in the API β",
- color=colors.green,
- )
- else:
- puts(
- "Test Failed: The packet wasn't reported as blocked in the API β",
- color=colors.red,
- )
- exit_test(1)
- else:
- puts(
- "Test Failed: The request wasn't blocked β", color=colors.red
- )
- exit_test(1)
- return
- puts("Test Failed: The filter wasn't found β", color=colors.red)
- exit_test(1)
- else:
- if server.sendCheckData(
- secrets.token_bytes(40) + match_bytes + secrets.token_bytes(40)
- ):
- puts("The request wasn't blocked β", color=colors.green)
- else:
- puts(
- "Test Failed: The request was blocked when it shouldn't have",
- color=colors.red,
- )
- exit_test(1)
-
-
-# Add new filter
-secret = bytes(secrets.token_hex(16).encode())
-
-if firegex.nfproxy_set_code(service_id, get_vedict_test(secret.decode(), "REJECT")):
- puts(
- f"Sucessfully added filter for {str(secret)} in REJECT mode β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-checkFilter(secret, BASE_FILTER_VERDICT_NAME)
-
-# Pause the proxy
-if firegex.nfproxy_stop_service(service_id):
- puts(f"Sucessfully paused service with id {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't pause the service β", color=colors.red)
- exit_test(1)
-
-# Check if it's actually paused
-checkFilter(secret, BASE_FILTER_VERDICT_NAME, should_work=False)
-
-# Start firewall
-if firegex.nfproxy_start_service(service_id):
- puts(f"Sucessfully started service with id {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't start the service β", color=colors.red)
- exit_test(1)
-
-checkFilter(secret, BASE_FILTER_VERDICT_NAME)
-
-# Disable filter
-if firegex.nfproxy_disable_pyfilter(service_id, BASE_FILTER_VERDICT_NAME):
- puts(f"Sucessfully disabled filter {BASE_FILTER_VERDICT_NAME} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't disable the filter β", color=colors.red)
- exit_test(1)
-
-# Check if it's actually disabled
-checkFilter(secret, BASE_FILTER_VERDICT_NAME, should_work=False)
-
-# Enable filter
-if firegex.nfproxy_enable_pyfilter(service_id, BASE_FILTER_VERDICT_NAME):
- puts(f"Sucessfully enabled filter {BASE_FILTER_VERDICT_NAME} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't enable the regex β", color=colors.red)
- exit_test(1)
-
-checkFilter(secret, BASE_FILTER_VERDICT_NAME)
-
-
-def remove_filters():
- global n_blocked, n_mangled
- server.stop()
server.start()
- if not firegex.nfproxy_set_code(service_id, ""):
- puts("Test Failed: Couldn't remove the filter β", color=colors.red)
+ time.sleep(0.5)
+ try:
+ if server.sendCheckData(secrets.token_bytes(432)):
+ puts(
+ "Successfully tested first proxy with no filters β", color=colors.green
+ )
+ else:
+ puts("Test Failed: Data was corrupted ", color=colors.red)
+ exit_test(1)
+ except Exception:
+ puts("Test Failed: Couldn't send data to the server ", color=colors.red)
exit_test(1)
+
+ BASE_FILTER_VERDICT_TEST = textwrap.dedent("""
+ from firegex.nfproxy.models import RawPacket
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def verdict_test(packet:RawPacket):
+ if b"%%TEST%%" in packet.data:
+ packet.l4_data = packet.l4_data.replace(b"%%TEST%%", b"%%MANGLE%%")
+ return %%ACTION%%
+ """)
+
+ BASE_FILTER_VERDICT_NAME = "verdict_test"
+
+ def get_vedict_test(to_match: str, action: str, mangle_to: str = "REDACTED"):
+ return (
+ BASE_FILTER_VERDICT_TEST.replace("%%TEST%%", to_match)
+ .replace("%%ACTION%%", action)
+ .replace("%%MANGLE%%", mangle_to)
+ )
+
+ # Check if filter is present in the service
n_blocked = 0
n_mangled = 0
-
-remove_filters()
-
-# Check if it's actually deleted
-checkFilter(secret, BASE_FILTER_VERDICT_NAME, should_work=False)
-
-# Check if DROP works
-if firegex.nfproxy_set_code(service_id, get_vedict_test(secret.decode(), "DROP")):
- puts(
- f"Sucessfully added filter for {str(secret)} in DROP mode β", color=colors.green
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-checkFilter(secret, BASE_FILTER_VERDICT_NAME)
-
-remove_filters()
-
-# Check if UNSTABLE_MANGLE works
-mangle_result = secrets.token_hex(4).encode() # Mangle to a smaller packet
-if firegex.nfproxy_set_code(
- service_id,
- get_vedict_test(secret.decode(), "UNSTABLE_MANGLE", mangle_result.decode()),
-):
- puts(
- f"Sucessfully added filter for {str(secret)} in UNSTABLE_MANGLE mode to a smaller packet size β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-checkFilter(secret, BASE_FILTER_VERDICT_NAME, mangle_with=mangle_result)
-
-remove_filters()
-
-# Check if UNSTABLE_MANGLE works
-mangle_result = secrets.token_hex(60).encode() # Mangle to a bigger packet
-if firegex.nfproxy_set_code(
- service_id,
- get_vedict_test(secret.decode(), "UNSTABLE_MANGLE", mangle_result.decode()),
-):
- puts(
- f"Sucessfully added filter for {str(secret)} in UNSTABLE_MANGLE mode to a bigger packet size β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-checkFilter(secret, BASE_FILTER_VERDICT_NAME, mangle_with=mangle_result)
-
-remove_filters()
-
-secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
-TCP_INPUT_STREAM_TEST = f"""
-from firegex.nfproxy.models import TCPInputStream
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(packet:TCPInputStream):
- if {repr(secret)} in packet.data:
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, TCP_INPUT_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for TCPInputStream β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-data_split = len(secret) // 2
-server.connect_client()
-server.send_packet(secret[:data_split])
-if server.recv_packet() == secret[:data_split]:
- puts("The half-packet was successfully sent and received β", color=colors.green)
-else:
- puts("Test Failed: The half-packet wasn't received β", color=colors.red)
- exit_test(1)
-server.send_packet(secret[data_split:])
-if not server.recv_packet():
- puts("The malicious request was successfully blocked β", color=colors.green)
-else:
- puts("Test Failed: The request wasn't blocked β", color=colors.red)
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
-TCP_OUTPUT_STREAM_TEST = f"""
-from firegex.nfproxy.models import TCPOutputStream
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(packet:TCPOutputStream):
- if {repr(secret)} in packet.data:
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, TCP_OUTPUT_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for TCPOutputStream β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-data_split = len(secret) // 2
-server.connect_client()
-server.send_packet(secret[:data_split])
-if server.recv_packet() == secret[:data_split]:
- puts("The half-packet was successfully sent and received β", color=colors.green)
-else:
- puts("Test Failed: The half-packet wasn't received β", color=colors.red)
- exit_test(1)
-server.send_packet(secret[data_split:])
-if not server.recv_packet():
- puts("The malicious request was successfully blocked β", color=colors.green)
-else:
- puts("Test Failed: The request wasn't blocked β", color=colors.red)
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
-
-REQUEST_HEADER_TEST = f"""POST / HTTP/1.1
-Host: localhost
-X-TeSt: {secret.decode()}
-Content-Length: 15
-
-A Friendly Body""".replace("\n", "\r\n")
-
-REQUEST_BODY_TEST = f"""POST / HTTP/1.1
-Host: localhost
-X-TeSt: NotTheSecret
-Content-Length: {len(secret.decode())}
-
-{secret.decode()}""".replace("\n", "\r\n")
-
-HTTP_REQUEST_STREAM_TEST = f"""
-from firegex.nfproxy.models import HttpRequest
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpRequest):
- if {repr(secret.decode())} in req.get_header("x-test"):
- return REJECT
- if req.body:
- if {repr(secret)} in req.body:
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for HttpRequest β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(REQUEST_HEADER_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious header was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious header wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-server.connect_client()
-server.send_packet(REQUEST_BODY_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious body was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious body wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-HTTP_FULL_REQUEST_STREAM_TEST = f"""
-from firegex.nfproxy.models import HttpFullRequest
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpFullRequest):
- if not req.body:
- return ACCEPT
- if not req.get_header("x-test"):
- return ACCEPT
- if {repr(secret.decode())} in req.get_header("x-test"):
- return REJECT
- if {repr(secret)} in req.body:
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_FULL_REQUEST_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for HttpFullRequest β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(REQUEST_HEADER_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious header was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious header wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-server.connect_client()
-server.send_packet(REQUEST_BODY_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious body was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious body wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-
-HTTP_REQUEST_HEADER_STREAM_TEST = f"""
-from firegex.nfproxy.models import HttpRequestHeader
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpRequestHeader):
- if {repr(secret.decode())} in req.get_header("x-test"):
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_HEADER_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for HttpRequestHeader β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(REQUEST_HEADER_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious header was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious header wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
-
-RESPONSE_HEADER_TEST = f"""HTTP/1.1 200 OK
-Host: localhost
-X-TeSt: {secret.decode()}
-Content-Length: 15
-
-A Friendly Body""".replace("\n", "\r\n")
-
-RESPONSE_BODY_TEST = f"""HTTP/1.1 200 OK
-Host: localhost
-X-TeSt: NotTheSecret
-Content-Length: {len(secret.decode())}
-
-{secret.decode()}""".replace("\n", "\r\n")
-
-HTTP_RESPONSE_STREAM_TEST = f"""
-from firegex.nfproxy.models import HttpResponse
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpResponse):
- if {repr(secret.decode())} in req.get_header("x-test"):
- return REJECT
- if req.body:
- if {repr(secret)} in req.body:
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_RESPONSE_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for HttpResponse β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(RESPONSE_HEADER_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious header was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious header wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-server.connect_client()
-server.send_packet(RESPONSE_BODY_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious body was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious body wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-
-HTTP_FULL_RESPONSE_STREAM_TEST = f"""
-from firegex.nfproxy.models import HttpFullResponse
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpFullResponse):
- if not req.body:
- return ACCEPT
- if not req.get_header("x-test"):
- return ACCEPT
- if {repr(secret.decode())} in req.get_header("x-test"):
- return REJECT
- if {repr(secret)} in req.body:
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_FULL_RESPONSE_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for HttpFullResponse β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(RESPONSE_HEADER_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious header was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious header wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-server.connect_client()
-server.send_packet(RESPONSE_BODY_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious body was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious body wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-
-HTTP_RESPONSE_HEADER_STREAM_TEST = f"""
-from firegex.nfproxy.models import HttpResponseHeader
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpResponseHeader):
- if {repr(secret.decode())} in req.get_header("x-test"):
- return REJECT
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_RESPONSE_HEADER_STREAM_TEST):
- puts(
- f"Sucessfully added filter for {str(secret)} for HttpResponseHeader β",
- color=colors.green,
- )
-else:
- puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(RESPONSE_HEADER_TEST.encode())
-if not server.recv_packet():
- puts(
- "The malicious HTTP request with the malicious header was successfully blocked β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP request with the malicious header wasn't blocked β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-# Simulating requests is more complex due to websocket extensions handshake
-
-WS_REQUEST_PARSING_TEST = b"GET /sock/?EIO=4&transport=websocket HTTP/1.1\r\nHost: localhost:8080\r\nConnection: Upgrade\r\nPragma: no-cache\r\nCache-Control: no-cache\r\nUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)\xac AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\r\nUpgrade: websocket\r\nOrigin: http://localhost:8080\r\nSec-WebSocket-Version: 13\r\nAccept-Encoding: gzip, deflate, br, zstd\r\nAccept-Language: it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,zh;q=0.5\r\nCookie: cookie-consent=true; _iub_cs-86405163=%7B%22timestamp%22%3A%222024-09-12T18%3A20%3A18.627Z%22%2C%22version%22%3A%221.65.1%22%2C%22purposes%22%3A%7B%221%22%3Atrue%2C%224%22%3Atrue%7D%2C%22id%22%3A86405163%2C%22cons%22%3A%7B%22rand%22%3A%222b09e6%22%7D%7D\r\nSec-WebSocket-Key: eE01O3/ZShPKsrykACLAaA==\r\nSec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n\r\n\xc1\x84#\x8a\xb2\xbb\x11\xbb\xb2\xbb"
-WS_RESPONSE_PARSING_TEST = b"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: eGnJqUSoSKE3wOfKD2M3G82RsS8=\r\nSec-WebSocket-Extensions: permessage-deflate\r\ndate: Sat, 15 Mar 2025 12:04:19 GMT\r\nserver: uvicorn\r\n\r\n\xc1_2\xa8V*\xceLQ\xb2Rr1\xb4\xc8\xf6r\x0c\xf3\xaf\xd25\xf7\x8e\xf4\xb3LsttrW\xd2Q*-H/JLI-V\xb2\x8a\x8e\xd5Q*\xc8\xccK\x0f\xc9\xccM\xcd/-Q\xb222\x00\x02\x88\x98g^IjQYb\x0eP\xd0\x14,\x98\x9bX\x11\x90X\x99\x93\x9f\x084\xda\xd0\x00\x0cj\x01\x00\xc1\x1b21\x80\xd9e\xe1n\x19\x9e\xe3RP\x9a[Z\x99\x93j\xea\x15\x00\xb4\xcbC\xa9\x16\x00"
-
-HTTP_REQUEST_WS_PARSING_TEST = """
-from firegex.nfproxy.models import HttpRequest, HttpResponse
-from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
-
-@pyfilter
-def data_type_test(req:HttpRequest):
- print(req)
-
-@pyfilter
-def data_type_test(req:HttpResponse):
- print(req)
-
-"""
-
-if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_WS_PARSING_TEST):
- puts(
- "Sucessfully added filter websocket parsing with HttpRequest and HttpResponse β",
- color=colors.green,
- )
-else:
- puts("Test Failed: Couldn't add the websocket parsing filter β", color=colors.red)
- exit_test(1)
-
-server.connect_client()
-server.send_packet(WS_REQUEST_PARSING_TEST, server_reply=WS_RESPONSE_PARSING_TEST)
-if server.recv_packet():
- puts(
- "The HTTP websocket upgrade request was successfully parsed β",
- color=colors.green,
- )
-else:
- puts(
- "Test Failed: The HTTP websocket upgrade request wasn't parsed (an error occurred) β",
- color=colors.red,
- )
- exit_test(1)
-server.close_client()
-
-remove_filters()
-
-# Rename service
-if firegex.nfproxy_rename_service(service_id, f"{args.service_name}2"):
- puts(f"Sucessfully renamed service to {args.service_name}2 β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't rename service β", color=colors.red)
- exit_test(1)
-
-# Check if service was renamed correctly
-service = firegex.nfproxy_get_service(service_id)
-if service["name"] == f"{args.service_name}2":
- puts("Checked that service was renamed correctly β", color=colors.green)
-else:
- puts("Test Failed: Service wasn't renamed correctly β", color=colors.red)
- exit_test(1)
-
-# Rename back service
-if firegex.nfproxy_rename_service(service_id, f"{args.service_name}"):
- puts(f"Sucessfully renamed service to {args.service_name} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't rename service β", color=colors.red)
- exit_test(1)
-
-# Change settings
-if firegex.nfproxy_settings_service(
- service_id, 1338, "::dead:beef" if args.ipv6 else "123.123.123.123", True
-):
- srv_updated = firegex.nfproxy_get_service(service_id)
- if (
- srv_updated["port"] == 1338
- and ("::dead:beef" if args.ipv6 else "123.123.123.123") in srv_updated["ip_int"]
- and srv_updated["fail_open"]
+ def checkFilter(match_bytes, filter_name, should_work=True, mangle_with=None):
+ if mangle_with:
+ if should_work:
+ global n_mangled
+ for r in firegex.nfproxy_get_service_pyfilters(service_id):
+ if r["name"] == filter_name:
+ # Test the filter
+ pre_packet = secrets.token_bytes(40)
+ post_packet = secrets.token_bytes(40)
+ server.connect_client()
+ server.send_packet(pre_packet + match_bytes + post_packet)
+ real_response = server.recv_packet()
+ expected_response = pre_packet + mangle_with + post_packet
+ if real_response == expected_response:
+ puts(
+ "The malicious request was successfully mangled β",
+ color=colors.green,
+ )
+ n_mangled += 1
+ time.sleep(1)
+ if (
+ firegex.nfproxy_get_pyfilter(service_id, filter_name)[
+ "edited_packets"
+ ]
+ == n_mangled
+ ):
+ puts(
+ "The packet was reported as mangled in the API β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The packet wasn't reported as mangled in the API β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.send_packet(pre_packet)
+ if server.recv_packet() == pre_packet:
+ puts(
+ "Is able to communicate after mangle β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: Couldn't communicate after mangle β",
+ color=colors.red,
+ )
+ exit_test(1)
+ else:
+ puts(
+ "Test Failed: The request wasn't mangled β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+ return
+ puts("Test Failed: The filter wasn't found β", color=colors.red)
+ else:
+ if server.sendCheckData(
+ secrets.token_bytes(40) + match_bytes + secrets.token_bytes(40)
+ ):
+ puts("The request wasn't mangled β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: The request was mangled when it shouldn't have",
+ color=colors.red,
+ )
+ exit_test(1)
+ else:
+ if should_work:
+ global n_blocked
+ for r in firegex.nfproxy_get_service_pyfilters(service_id):
+ if r["name"] == filter_name:
+ # Test the filter
+ if not server.sendCheckData(
+ secrets.token_bytes(40)
+ + match_bytes
+ + secrets.token_bytes(40)
+ ):
+ puts(
+ "The malicious request was successfully blocked β",
+ color=colors.green,
+ )
+ n_blocked += 1
+ time.sleep(1)
+ if (
+ firegex.nfproxy_get_pyfilter(service_id, filter_name)[
+ "blocked_packets"
+ ]
+ == n_blocked
+ ):
+ puts(
+ "The packet was reported as blocked in the API β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The packet wasn't reported as blocked in the API β",
+ color=colors.red,
+ )
+ exit_test(1)
+ else:
+ puts(
+ "Test Failed: The request wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ return
+ puts("Test Failed: The filter wasn't found β", color=colors.red)
+ exit_test(1)
+ else:
+ if server.sendCheckData(
+ secrets.token_bytes(40) + match_bytes + secrets.token_bytes(40)
+ ):
+ puts("The request wasn't blocked β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: The request was blocked when it shouldn't have",
+ color=colors.red,
+ )
+ exit_test(1)
+
+ # Add new filter
+ secret = bytes(secrets.token_hex(16).encode())
+
+ if firegex.nfproxy_set_code(service_id, get_vedict_test(secret.decode(), "REJECT")):
+ puts(
+ f"Sucessfully added filter for {str(secret)} in REJECT mode β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME)
+
+ # Pause the proxy
+ if firegex.nfproxy_stop_service(service_id):
+ puts(f"Sucessfully paused service with id {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't pause the service β", color=colors.red)
+ exit_test(1)
+
+ # Check if it's actually paused
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME, should_work=False)
+
+ # Start firewall
+ if firegex.nfproxy_start_service(service_id):
+ puts(f"Sucessfully started service with id {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't start the service β", color=colors.red)
+ exit_test(1)
+
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME)
+
+ # Disable filter
+ if firegex.nfproxy_disable_pyfilter(service_id, BASE_FILTER_VERDICT_NAME):
+ puts(
+ f"Sucessfully disabled filter {BASE_FILTER_VERDICT_NAME} β",
+ color=colors.green,
+ )
+ else:
+ puts("Test Failed: Coulnd't disable the filter β", color=colors.red)
+ exit_test(1)
+
+ # Check if it's actually disabled
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME, should_work=False)
+
+ # Enable filter
+ if firegex.nfproxy_enable_pyfilter(service_id, BASE_FILTER_VERDICT_NAME):
+ puts(
+ f"Sucessfully enabled filter {BASE_FILTER_VERDICT_NAME} β",
+ color=colors.green,
+ )
+ else:
+ puts("Test Failed: Coulnd't enable the regex β", color=colors.red)
+ exit_test(1)
+
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME)
+
+ def remove_filters():
+ global n_blocked, n_mangled
+ server.stop()
+ server.start()
+ time.sleep(0.5) # Wait for server to fully start
+ if not firegex.nfproxy_set_code(service_id, ""):
+ puts("Test Failed: Couldn't remove the filter β", color=colors.red)
+ exit_test(1)
+ n_blocked = 0
+ n_mangled = 0
+
+ remove_filters()
+
+ # Check if it's actually deleted
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME, should_work=False)
+
+ # Check if DROP works
+ if firegex.nfproxy_set_code(service_id, get_vedict_test(secret.decode(), "DROP")):
+ puts(
+ f"Sucessfully added filter for {str(secret)} in DROP mode β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME)
+
+ remove_filters()
+
+ # Check if UNSTABLE_MANGLE works
+ mangle_result = secrets.token_hex(4).encode() # Mangle to a smaller packet
+ if firegex.nfproxy_set_code(
+ service_id,
+ get_vedict_test(secret.decode(), "UNSTABLE_MANGLE", mangle_result.decode()),
):
- puts("Sucessfully changed service settings β", color=colors.green)
+ puts(
+ f"Sucessfully added filter for {str(secret)} in UNSTABLE_MANGLE mode to a smaller packet size β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME, mangle_with=mangle_result)
+
+ remove_filters()
+
+ # Check if UNSTABLE_MANGLE works
+ mangle_result = secrets.token_hex(60).encode() # Mangle to a bigger packet
+ if firegex.nfproxy_set_code(
+ service_id,
+ get_vedict_test(secret.decode(), "UNSTABLE_MANGLE", mangle_result.decode()),
+ ):
+ puts(
+ f"Sucessfully added filter for {str(secret)} in UNSTABLE_MANGLE mode to a bigger packet size β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ checkFilter(secret, BASE_FILTER_VERDICT_NAME, mangle_with=mangle_result)
+
+ remove_filters()
+
+ secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
+ TCP_INPUT_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import TCPInputStream
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(packet:TCPInputStream):
+ if {repr(secret)} in packet.data:
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, TCP_INPUT_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for TCPInputStream β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ data_split = len(secret) // 2
+ server.connect_client()
+ server.send_packet(secret[:data_split])
+ if server.recv_packet() == secret[:data_split]:
+ puts("The half-packet was successfully sent and received β", color=colors.green)
+ else:
+ puts("Test Failed: The half-packet wasn't received β", color=colors.red)
+ exit_test(1)
+ server.send_packet(secret[data_split:])
+ if not server.recv_packet():
+ puts("The malicious request was successfully blocked β", color=colors.green)
+ else:
+ puts("Test Failed: The request wasn't blocked β", color=colors.red)
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
+ TCP_OUTPUT_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import TCPOutputStream
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(packet:TCPOutputStream):
+ if {repr(secret)} in packet.data:
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, TCP_OUTPUT_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for TCPOutputStream β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ data_split = len(secret) // 2
+ server.connect_client()
+ server.send_packet(secret[:data_split])
+ if server.recv_packet() == secret[:data_split]:
+ puts("The half-packet was successfully sent and received β", color=colors.green)
+ else:
+ puts("Test Failed: The half-packet wasn't received β", color=colors.red)
+ exit_test(1)
+ server.send_packet(secret[data_split:])
+ if not server.recv_packet():
+ puts("The malicious request was successfully blocked β", color=colors.green)
+ else:
+ puts("Test Failed: The request wasn't blocked β", color=colors.red)
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
+
+ REQUEST_HEADER_TEST = textwrap.dedent(f"""POST / HTTP/1.1
+ Host: localhost
+ X-TeSt: {secret.decode()}
+ Content-Length: 15
+
+ A Friendly Body""").replace("\n", "\r\n")
+
+ REQUEST_BODY_TEST = textwrap.dedent(f"""POST / HTTP/1.1
+ Host: localhost
+ X-TeSt: NotTheSecret
+ Content-Length: {len(secret.decode())}
+
+ {secret.decode()}""").replace("\n", "\r\n")
+
+ HTTP_REQUEST_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import HttpRequest
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpRequest):
+ if {repr(secret.decode())} in req.get_header("x-test"):
+ return REJECT
+ if req.body:
+ if {repr(secret)} in req.body:
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for HttpRequest β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(REQUEST_HEADER_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious header was successfully blocked β",
+ color=colors.green,
+ )
else:
puts(
- "Test Failed: Service settings weren't updated correctly β", color=colors.red
+ "Test Failed: The HTTP request with the malicious header wasn't blocked β",
+ color=colors.red,
)
exit_test(1)
-else:
- puts("Test Failed: Coulnd't change service settings β", color=colors.red)
- exit_test(1)
+ server.close_client()
-exit_test(0)
+ server.connect_client()
+ server.send_packet(REQUEST_BODY_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious body was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious body wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ HTTP_FULL_REQUEST_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import HttpFullRequest
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpFullRequest):
+ if not req.body:
+ return ACCEPT
+ if not req.get_header("x-test"):
+ return ACCEPT
+ if {repr(secret.decode())} in req.get_header("x-test"):
+ return REJECT
+ if {repr(secret)} in req.body:
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_FULL_REQUEST_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for HttpFullRequest β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(REQUEST_HEADER_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious header was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious header wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ server.connect_client()
+ server.send_packet(REQUEST_BODY_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious body was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious body wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ HTTP_REQUEST_HEADER_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import HttpRequestHeader
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpRequestHeader):
+ if {repr(secret.decode())} in req.get_header("x-test"):
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_HEADER_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for HttpRequestHeader β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(REQUEST_HEADER_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious header was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious header wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ secret = b"8331ee1bf75893dd7fa3d34f29bac7fc8935aa3ef6c565fe8b395ef7f485"
+
+ RESPONSE_HEADER_TEST = textwrap.dedent(f"""HTTP/1.1 200 OK
+ Host: localhost
+ X-TeSt: {secret.decode()}
+ Content-Length: 15
+
+ A Friendly Body""").replace("\n", "\r\n")
+
+ RESPONSE_BODY_TEST = textwrap.dedent(f"""HTTP/1.1 200 OK
+ Host: localhost
+ X-TeSt: NotTheSecret
+ Content-Length: {len(secret.decode())}
+
+ {secret.decode()}""").replace("\n", "\r\n")
+
+ HTTP_RESPONSE_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import HttpResponse
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpResponse):
+ if {repr(secret.decode())} in req.get_header("x-test"):
+ return REJECT
+ if req.body:
+ if {repr(secret)} in req.body:
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_RESPONSE_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for HttpResponse β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(RESPONSE_HEADER_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious header was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious header wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ server.connect_client()
+ server.send_packet(RESPONSE_BODY_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious body was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious body wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ HTTP_FULL_RESPONSE_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import HttpFullResponse
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpFullResponse):
+ if not req.body:
+ return ACCEPT
+ if not req.get_header("x-test"):
+ return ACCEPT
+ if {repr(secret.decode())} in req.get_header("x-test"):
+ return REJECT
+ if {repr(secret)} in req.body:
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_FULL_RESPONSE_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for HttpFullResponse β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(RESPONSE_HEADER_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious header was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious header wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ server.connect_client()
+ server.send_packet(RESPONSE_BODY_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious body was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious body wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ HTTP_RESPONSE_HEADER_STREAM_TEST = textwrap.dedent(f"""
+ from firegex.nfproxy.models import HttpResponseHeader
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpResponseHeader):
+ if {repr(secret.decode())} in req.get_header("x-test"):
+ return REJECT
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_RESPONSE_HEADER_STREAM_TEST):
+ puts(
+ f"Sucessfully added filter for {str(secret)} for HttpResponseHeader β",
+ color=colors.green,
+ )
+ else:
+ puts(f"Test Failed: Couldn't add the filter {str(secret)} β", color=colors.red)
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(RESPONSE_HEADER_TEST.encode())
+ if not server.recv_packet():
+ puts(
+ "The malicious HTTP request with the malicious header was successfully blocked β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP request with the malicious header wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ # Simulating requests is more complex due to websocket extensions handshake
+
+ WS_REQUEST_PARSING_TEST = b"GET /sock/?EIO=4&transport=websocket HTTP/1.1\r\nHost: localhost:8080\r\nConnection: Upgrade\r\nPragma: no-cache\r\nCache-Control: no-cache\r\nUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)\xac AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\r\nUpgrade: websocket\r\nOrigin: http://localhost:8080\r\nSec-WebSocket-Version: 13\r\nAccept-Encoding: gzip, deflate, br, zstd\r\nAccept-Language: it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,zh;q=0.5\r\nCookie: cookie-consent=true; _iub_cs-86405163=%7B%22timestamp%22%3A%222024-09-12T18%3A20%3A18.627Z%22%2C%22version%22%3A%221.65.1%22%2C%22purposes%22%3A%7B%221%22%3Atrue%2C%224%22%3Atrue%7D%2C%22id%22%3A86405163%2C%22cons%22%3A%7B%22rand%22%3A%222b09e6%22%7D%7D\r\nSec-WebSocket-Key: eE01O3/ZShPKsrykACLAaA==\r\nSec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n\r\n\xc1\x84#\x8a\xb2\xbb\x11\xbb\xb2\xbb"
+ WS_RESPONSE_PARSING_TEST = b"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: eGnJqUSoSKE3wOfKD2M3G82RsS8=\r\nSec-WebSocket-Extensions: permessage-deflate\r\ndate: Sat, 15 Mar 2025 12:04:19 GMT\r\nserver: uvicorn\r\n\r\n\xc1_2\xa8V*\xceLQ\xb2Rr1\xb4\xc8\xf6r\x0c\xf3\xaf\xd25\xf7\x8e\xf4\xb3LsttrW\xd2Q*-H/JLI-V\xb2\x8a\x8e\xd5Q*\xc8\xccK\x0f\xc9\xccM\xcd/-Q\xb222\x00\x02\x88\x98g^IjQYb\x0eP\xd0\x14,\x98\x9bX\x11\x90X\x99\x93\x9f\x084\xda\xd0\x00\x0cj\x01\x00\xc1\x1b21\x80\xd9e\xe1n\x19\x9e\xe3RP\x9a[Z\x99\x93j\xea\x15\x00\xb4\xcbC\xa9\x16\x00"
+
+ HTTP_REQUEST_WS_PARSING_TEST = textwrap.dedent("""
+ from firegex.nfproxy.models import HttpRequest, HttpResponse
+ from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT
+
+ @pyfilter
+ def data_type_test(req:HttpRequest):
+ print(req)
+
+ @pyfilter
+ def data_type_test(req:HttpResponse):
+ print(req)
+
+ """)
+
+ if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_WS_PARSING_TEST):
+ puts(
+ "Sucessfully added filter websocket parsing with HttpRequest and HttpResponse β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: Couldn't add the websocket parsing filter β", color=colors.red
+ )
+ exit_test(1)
+
+ server.connect_client()
+ server.send_packet(WS_REQUEST_PARSING_TEST, server_reply=WS_RESPONSE_PARSING_TEST)
+ if server.recv_packet():
+ puts(
+ "The HTTP websocket upgrade request was successfully parsed β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The HTTP websocket upgrade request wasn't parsed (an error occurred) β",
+ color=colors.red,
+ )
+ exit_test(1)
+ server.close_client()
+
+ remove_filters()
+
+ # Rename service
+ if firegex.nfproxy_rename_service(service_id, f"{args.service_name}2"):
+ puts(
+ f"Sucessfully renamed service to {args.service_name}2 β", color=colors.green
+ )
+ else:
+ puts("Test Failed: Coulnd't rename service β", color=colors.red)
+ exit_test(1)
+
+ # Check if service was renamed correctly
+ service = firegex.nfproxy_get_service(service_id)
+ if service["name"] == f"{args.service_name}2":
+ puts("Checked that service was renamed correctly β", color=colors.green)
+ else:
+ puts("Test Failed: Service wasn't renamed correctly β", color=colors.red)
+ exit_test(1)
+
+ # Rename back service
+ if firegex.nfproxy_rename_service(service_id, f"{args.service_name}"):
+ puts(
+ f"Sucessfully renamed service to {args.service_name} β", color=colors.green
+ )
+ else:
+ puts("Test Failed: Coulnd't rename service β", color=colors.red)
+ exit_test(1)
+
+ # Change settings
+ if firegex.nfproxy_settings_service(
+ service_id, 1338, "::dead:beef" if args.ipv6 else "123.123.123.123", True
+ ):
+ srv_updated = firegex.nfproxy_get_service(service_id)
+ if (
+ srv_updated["port"] == 1338
+ and ("::dead:beef" if args.ipv6 else "123.123.123.123")
+ in srv_updated["ip_int"]
+ and srv_updated["fail_open"]
+ ):
+ puts("Sucessfully changed service settings β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Service settings weren't updated correctly β",
+ color=colors.red,
+ )
+ exit_test(1)
+ else:
+ puts("Test Failed: Coulnd't change service settings β", color=colors.red)
+ exit_test(1)
+
+ exit_test(0)
diff --git a/tests/nfregex_test.py b/tests/nfregex_test.py
index d3e2758..4fb0836 100644
--- a/tests/nfregex_test.py
+++ b/tests/nfregex_test.py
@@ -8,247 +8,366 @@ import secrets
import base64
import time
-parser = argparse.ArgumentParser()
-parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/")
-parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password')
-parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the test service', default="Test Service")
-parser.add_argument("--port", "-P", type=int , required=False, help='Port of the test service', default=1337)
-parser.add_argument("--ipv6", "-6" , action="store_true", help='Test Ipv6', default=False)
-parser.add_argument("--proto", "-m" , type=str, required=False, choices=["tcp","udp"], help='Select the protocol', default="tcp")
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--address",
+ "-a",
+ type=str,
+ required=False,
+ help="Address of firegex backend",
+ default="http://127.0.0.1:4444/",
+ )
+ parser.add_argument(
+ "--password", "-p", type=str, required=True, help="Firegex password"
+ )
+ parser.add_argument(
+ "--service_name",
+ "-n",
+ type=str,
+ required=False,
+ help="Name of the test service",
+ default="Test Service",
+ )
+ parser.add_argument(
+ "--port",
+ "-P",
+ type=int,
+ required=False,
+ help="Port of the test service",
+ default=1337,
+ )
+ parser.add_argument(
+ "--ipv6", "-6", action="store_true", help="Test Ipv6", default=False
+ )
+ parser.add_argument(
+ "--proto",
+ "-m",
+ type=str,
+ required=False,
+ choices=["tcp", "udp"],
+ help="Select the protocol",
+ default="tcp",
+ )
-args = parser.parse_args()
-sep()
-puts("Testing will start on ", color=colors.cyan, end="")
-puts(f"{args.address}", color=colors.yellow)
+ args = parser.parse_args()
+ sep()
+ puts("Testing will start on ", color=colors.cyan, end="")
+ puts(f"{args.address}", color=colors.yellow)
-firegex = FiregexAPI(args.address)
+ firegex = FiregexAPI(args.address)
-#Login
-if (firegex.login(args.password)):
- puts("Sucessfully logged in β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
- exit(1)
-
-#Create server
-server = (TcpServer if args.proto == "tcp" else UdpServer)(args.port,ipv6=args.ipv6)
-
-def exit_test(code):
- if service_id:
- server.stop()
- if(firegex.nfregex_delete_service(service_id)):
- puts("Sucessfully deleted service β", color=colors.green)
- else:
- puts("Test Failed: Coulnd't delete serivce β", color=colors.red)
- exit_test(1)
- exit(code)
-
-srvs = firegex.nfregex_get_services()
-for ele in srvs:
- if ele['name'] == args.service_name:
- firegex.nfregex_delete_service(ele['service_id'])
-
-service_id = firegex.nfregex_add_service(args.service_name, args.port, args.proto , "::1" if args.ipv6 else "127.0.0.1" )
-if service_id:
- puts(f"Sucessfully created service {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Failed to create service β", color=colors.red)
- exit(1)
-
-if(firegex.nfregex_start_service(service_id)):
- puts("Sucessfully started service β", color=colors.green)
-else:
- puts("Test Failed: Failed to start service β", color=colors.red)
- exit_test(1)
-
-server.start()
-time.sleep(0.5)
-try:
- if server.sendCheckData(secrets.token_bytes(432)):
- puts("Successfully tested first proxy with no regex β", color=colors.green)
+ # Login
+ if firegex.login(args.password):
+ puts("Sucessfully logged in β", color=colors.green)
else:
- puts("Test Failed: Data was corrupted ", color=colors.red)
+ puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
+ exit(1)
+
+ # Create server
+ server = (TcpServer if args.proto == "tcp" else UdpServer)(
+ args.port, ipv6=args.ipv6
+ )
+
+ def exit_test(code):
+ if service_id:
+ server.stop()
+ if firegex.nfregex_delete_service(service_id):
+ puts("Sucessfully deleted service β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't delete serivce β", color=colors.red)
+ exit_test(1)
+ exit(code)
+
+ srvs = firegex.nfregex_get_services()
+ for ele in srvs:
+ if ele["name"] == args.service_name:
+ firegex.nfregex_delete_service(ele["service_id"])
+
+ service_id = firegex.nfregex_add_service(
+ args.service_name, args.port, args.proto, "::1" if args.ipv6 else "127.0.0.1"
+ )
+ if service_id:
+ puts(f"Sucessfully created service {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Failed to create service β", color=colors.red)
+ exit(1)
+
+ if firegex.nfregex_start_service(service_id):
+ puts("Sucessfully started service β", color=colors.green)
+ else:
+ puts("Test Failed: Failed to start service β", color=colors.red)
exit_test(1)
-except Exception:
- puts("Test Failed: Couldn't send data to the server ", color=colors.red)
- exit_test(1)
-#Add new regex
-secret = bytes(secrets.token_hex(16).encode())
-if firegex.nfregex_add_regex(service_id,secret,"B",active=True,is_case_sensitive=True):
- puts(f"Sucessfully added regex {str(secret)} β", color=colors.green)
-else:
- puts(f"Test Failed: Couldn't add the regex {str(secret)} β", color=colors.red)
- exit_test(1)
+ server.start()
+ time.sleep(0.5)
+ try:
+ if server.sendCheckData(secrets.token_bytes(432)):
+ puts("Successfully tested first proxy with no regex β", color=colors.green)
+ else:
+ puts("Test Failed: Data was corrupted ", color=colors.red)
+ exit_test(1)
+ except Exception:
+ puts("Test Failed: Couldn't send data to the server ", color=colors.red)
+ exit_test(1)
+ # Add new regex
+ secret = bytes(secrets.token_hex(16).encode())
+ if firegex.nfregex_add_regex(
+ service_id, secret, "B", active=True, is_case_sensitive=True
+ ):
+ puts(f"Sucessfully added regex {str(secret)} β", color=colors.green)
+ else:
+ puts(f"Test Failed: Couldn't add the regex {str(secret)} β", color=colors.red)
+ exit_test(1)
-#Check if regex is present in the service
-n_blocked = 0
+ # Check if regex is present in the service
+ n_blocked = 0
-def getMetric(metric_name, regex):
- for metric in firegex.nfregex_get_metrics().split("\n"):
- if metric.startswith(metric_name + "{") and f'regex="{regex}"' in metric:
- return int(metric.split(" ")[-1])
+ def getMetric(metric_name, regex):
+ for metric in firegex.nfregex_get_metrics().split("\n"):
+ if metric.startswith(metric_name + "{") and f'regex="{regex}"' in metric:
+ return int(metric.split(" ")[-1])
-def checkRegex(regex, should_work=True, upper=False, deleted=False):
- if should_work:
+ def checkRegex(regex, should_work=True, upper=False, deleted=False):
+ if should_work:
+ global n_blocked
+ for r in firegex.nfregex_get_service_regexes(service_id):
+ if r["regex"] == secret:
+ # Test the regex
+ s = regex.upper() if upper else regex
+ if not server.sendCheckData(
+ secrets.token_bytes(40) + s + secrets.token_bytes(40)
+ ):
+ puts(
+ "The malicious request was successfully blocked β",
+ color=colors.green,
+ )
+ n_blocked += 1
+ time.sleep(1)
+ if firegex.nfregex_get_regex(r["id"])["n_packets"] == n_blocked:
+ puts(
+ "The packet was reported as blocked in the API β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The packet wasn't reported as blocked in the API β",
+ color=colors.red,
+ )
+ exit_test(1)
+ if (
+ getMetric("firegex_blocked_packets", secret.decode())
+ == n_blocked
+ ):
+ puts(
+ "The packet was reported as blocked in the metrics β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The packet wasn't reported as blocked in the metrics β",
+ color=colors.red,
+ )
+ exit_test(1)
+ if getMetric("firegex_active", secret.decode()) == 1:
+ puts(
+ "The regex was reported as active in the metrics β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The regex wasn't reported as active in the metrics β",
+ color=colors.red,
+ )
+ exit_test(1)
+ else:
+ puts(
+ "Test Failed: The request wasn't blocked β",
+ color=colors.red,
+ )
+ exit_test(1)
+ return
+ puts("Test Failed: The regex wasn't found β", color=colors.red)
+ exit_test(1)
+ else:
+ if server.sendCheckData(
+ secrets.token_bytes(40)
+ + base64.b64decode(regex)
+ + secrets.token_bytes(40)
+ ):
+ puts("The request wasn't blocked β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: The request was blocked when it shouldn't have",
+ color=colors.red,
+ )
+ exit_test(1)
+ if not deleted:
+ if getMetric("firegex_active", secret.decode()) == 0:
+ puts(
+ "The regex was reported as inactive in the metrics β",
+ color=colors.green,
+ )
+ else:
+ puts(
+ "Test Failed: The regex wasn't reported as inactive in the metrics β",
+ color=colors.red,
+ )
+ exit_test(1)
+
+ def clear_regexes():
global n_blocked
+ n_blocked = 0
for r in firegex.nfregex_get_service_regexes(service_id):
if r["regex"] == secret:
- #Test the regex
- s = regex.upper() if upper else regex
- if not server.sendCheckData(secrets.token_bytes(40) + s + secrets.token_bytes(40)):
- puts("The malicious request was successfully blocked β", color=colors.green)
- n_blocked += 1
- time.sleep(1)
- if firegex.nfregex_get_regex(r["id"])["n_packets"] == n_blocked:
- puts("The packet was reported as blocked in the API β", color=colors.green)
- else:
- puts("Test Failed: The packet wasn't reported as blocked in the API β", color=colors.red)
- exit_test(1)
- if getMetric("firegex_blocked_packets", secret.decode()) == n_blocked:
- puts("The packet was reported as blocked in the metrics β", color=colors.green)
- else:
- puts("Test Failed: The packet wasn't reported as blocked in the metrics β", color=colors.red)
- exit_test(1)
- if getMetric("firegex_active", secret.decode()) == 1:
- puts("The regex was reported as active in the metrics β", color=colors.green)
- else:
- puts("Test Failed: The regex wasn't reported as active in the metrics β", color=colors.red)
- exit_test(1)
+ if firegex.nfregex_delete_regex(r["id"]):
+ puts(
+ f"Sucessfully deleted regex with id {r['id']} β",
+ color=colors.green,
+ )
else:
- puts("Test Failed: The request wasn't blocked β", color=colors.red)
+ puts("Test Failed: Coulnd't delete the regex β", color=colors.red)
exit_test(1)
- return
- puts("Test Failed: The regex wasn't found β", color=colors.red)
- exit_test(1)
- else:
- if server.sendCheckData(secrets.token_bytes(40) + base64.b64decode(regex) + secrets.token_bytes(40)):
- puts("The request wasn't blocked β", color=colors.green)
+ break
+ if f'regex="{secret.decode()}"' not in firegex.nfregex_get_metrics():
+ puts("No regex metrics after deletion β", color=colors.green)
else:
- puts("Test Failed: The request was blocked when it shouldn't have", color=colors.red)
+ puts(
+ "Test Failed: Metrics found after deleting the regex β",
+ color=colors.red,
+ )
exit_test(1)
- if not deleted:
- if getMetric("firegex_active", secret.decode()) == 0:
- puts("The regex was reported as inactive in the metrics β", color=colors.green)
- else:
- puts("Test Failed: The regex wasn't reported as inactive in the metrics β", color=colors.red)
- exit_test(1)
-def clear_regexes():
- global n_blocked
- n_blocked = 0
+ checkRegex(secret)
+
+ # Pause the proxy
+ if firegex.nfregex_stop_service(service_id):
+ puts(f"Sucessfully paused service with id {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't pause the service β", color=colors.red)
+ exit_test(1)
+
+ # Check if it's actually paused
+ checkRegex(secret, should_work=False)
+
+ # Start firewall
+ if firegex.nfregex_start_service(service_id):
+ puts(f"Sucessfully started service with id {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't start the service β", color=colors.red)
+ exit_test(1)
+
+ checkRegex(secret)
+
+ # Disable regex
for r in firegex.nfregex_get_service_regexes(service_id):
if r["regex"] == secret:
- if(firegex.nfregex_delete_regex(r["id"])):
- puts(f"Sucessfully deleted regex with id {r['id']} β", color=colors.green)
- else:
- puts("Test Failed: Coulnd't delete the regex β", color=colors.red)
+ if firegex.nfregex_disable_regex(r["id"]):
+ puts(
+ f"Sucessfully disabled regex with id {r['id']} β",
+ color=colors.green,
+ )
+ else:
+ puts("Test Failed: Coulnd't disable the regex β", color=colors.red)
exit_test(1)
break
- if f'regex="{secret.decode()}"' not in firegex.nfregex_get_metrics():
- puts("No regex metrics after deletion β", color=colors.green)
+
+ # Check if it's actually disabled
+ checkRegex(secret, should_work=False)
+
+ # Enable regex
+ for r in firegex.nfregex_get_service_regexes(service_id):
+ if r["regex"] == secret:
+ if firegex.nfregex_enable_regex(r["id"]):
+ puts(
+ f"Sucessfully enabled regex with id {r['id']} β", color=colors.green
+ )
+ else:
+ puts("Test Failed: Coulnd't enable the regex β", color=colors.red)
+ exit_test(1)
+ break
+
+ checkRegex(secret)
+
+ # Delete regex
+ clear_regexes()
+
+ # Check if it's actually deleted
+ checkRegex(secret, should_work=False, deleted=True)
+
+ # Add case insensitive regex
+ if firegex.nfregex_add_regex(
+ service_id, secret, "B", active=True, is_case_sensitive=False
+ ):
+ puts(
+ f"Sucessfully added case insensitive regex {str(secret)} β",
+ color=colors.green,
+ )
else:
- puts("Test Failed: Metrics found after deleting the regex β", color=colors.red)
+ puts(
+ f"Test Failed: Coulnd't add the case insensitive regex {str(secret)} β",
+ color=colors.red,
+ )
exit_test(1)
-checkRegex(secret)
+ checkRegex(secret, upper=True)
+ checkRegex(secret)
-#Pause the proxy
-if(firegex.nfregex_stop_service(service_id)):
- puts(f"Sucessfully paused service with id {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't pause the service β", color=colors.red)
- exit_test(1)
+ clear_regexes()
-#Check if it's actually paused
-checkRegex(secret,should_work=False)
-
-#Start firewall
-if(firegex.nfregex_start_service(service_id)):
- puts(f"Sucessfully started service with id {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't start the service β", color=colors.red)
- exit_test(1)
-
-checkRegex(secret)
-
-#Disable regex
-for r in firegex.nfregex_get_service_regexes(service_id):
- if r["regex"] == secret:
- if(firegex.nfregex_disable_regex(r["id"])):
- puts(f"Sucessfully disabled regex with id {r['id']} β", color=colors.green)
- else:
- puts("Test Failed: Coulnd't disable the regex β", color=colors.red)
- exit_test(1)
- break
-
-#Check if it's actually disabled
-checkRegex(secret,should_work=False)
-
-#Enable regex
-for r in firegex.nfregex_get_service_regexes(service_id):
- if r["regex"] == secret:
- if(firegex.nfregex_enable_regex(r["id"])):
- puts(f"Sucessfully enabled regex with id {r['id']} β", color=colors.green)
- else:
- puts("Test Failed: Coulnd't enable the regex β", color=colors.red)
- exit_test(1)
- break
-
-checkRegex(secret)
-
-#Delete regex
-clear_regexes()
-
-#Check if it's actually deleted
-checkRegex(secret,should_work=False,deleted=True)
-
-#Add case insensitive regex
-if(firegex.nfregex_add_regex(service_id,secret,"B",active=True, is_case_sensitive=False)):
- puts(f"Sucessfully added case insensitive regex {str(secret)} β", color=colors.green)
-else:
- puts(f"Test Failed: Coulnd't add the case insensitive regex {str(secret)} β", color=colors.red)
- exit_test(1)
-
-checkRegex(secret, upper=True)
-checkRegex(secret)
-
-clear_regexes()
-
-#Rename service
-if(firegex.nfregex_rename_service(service_id,f"{args.service_name}2")):
- puts(f"Sucessfully renamed service to {args.service_name}2 β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't rename service β", color=colors.red)
- exit_test(1)
-
-#Check if service was renamed correctly
-service = firegex.nfregex_get_service(service_id)
-if service["name"] == f"{args.service_name}2":
- puts("Checked that service was renamed correctly β", color=colors.green)
-else:
- puts("Test Failed: Service wasn't renamed correctly β", color=colors.red)
- exit_test(1)
-
-#Rename back service
-if(firegex.nfregex_rename_service(service_id,f"{args.service_name}")):
- puts(f"Sucessfully renamed service to {args.service_name} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't rename service β", color=colors.red)
- exit_test(1)
-
-#Change settings
-opposite_proto = "udp" if args.proto == "tcp" else "tcp"
-if(firegex.nfregex_settings_service(service_id, 1338, opposite_proto, "::dead:beef" if args.ipv6 else "123.123.123.123", True)):
- srv_updated = firegex.nfregex_get_service(service_id)
- if srv_updated["port"] == 1338 and srv_updated["proto"] == opposite_proto and ("::dead:beef" if args.ipv6 else "123.123.123.123") in srv_updated["ip_int"] and srv_updated["fail_open"]:
- puts("Sucessfully changed service settings β", color=colors.green)
+ # Rename service
+ if firegex.nfregex_rename_service(service_id, f"{args.service_name}2"):
+ puts(
+ f"Sucessfully renamed service to {args.service_name}2 β", color=colors.green
+ )
else:
- puts("Test Failed: Service settings weren't updated correctly β", color=colors.red)
+ puts("Test Failed: Coulnd't rename service β", color=colors.red)
exit_test(1)
-else:
- puts("Test Failed: Coulnd't change service settings β", color=colors.red)
- exit_test(1)
-exit_test(0)
+ # Check if service was renamed correctly
+ service = firegex.nfregex_get_service(service_id)
+ if service["name"] == f"{args.service_name}2":
+ puts("Checked that service was renamed correctly β", color=colors.green)
+ else:
+ puts("Test Failed: Service wasn't renamed correctly β", color=colors.red)
+ exit_test(1)
+
+ # Rename back service
+ if firegex.nfregex_rename_service(service_id, f"{args.service_name}"):
+ puts(
+ f"Sucessfully renamed service to {args.service_name} β", color=colors.green
+ )
+ else:
+ puts("Test Failed: Coulnd't rename service β", color=colors.red)
+ exit_test(1)
+
+ # Change settings
+ opposite_proto = "udp" if args.proto == "tcp" else "tcp"
+ if firegex.nfregex_settings_service(
+ service_id,
+ 1338,
+ opposite_proto,
+ "::dead:beef" if args.ipv6 else "123.123.123.123",
+ True,
+ ):
+ srv_updated = firegex.nfregex_get_service(service_id)
+ if (
+ srv_updated["port"] == 1338
+ and srv_updated["proto"] == opposite_proto
+ and ("::dead:beef" if args.ipv6 else "123.123.123.123")
+ in srv_updated["ip_int"]
+ and srv_updated["fail_open"]
+ ):
+ puts("Sucessfully changed service settings β", color=colors.green)
+ else:
+ puts(
+ "Test Failed: Service settings weren't updated correctly β",
+ color=colors.red,
+ )
+ exit_test(1)
+ else:
+ puts("Test Failed: Coulnd't change service settings β", color=colors.red)
+ exit_test(1)
+
+ exit_test(0)
diff --git a/tests/ph_test.py b/tests/ph_test.py
index c54dae2..f7e6894 100644
--- a/tests/ph_test.py
+++ b/tests/ph_test.py
@@ -7,131 +7,180 @@ import argparse
import secrets
import time
-parser = argparse.ArgumentParser()
-parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/")
-parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password')
-parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the test service', default="Test Service")
-parser.add_argument("--port", "-P", type=int , required=False, help='Port of the test service', default=1337)
-parser.add_argument("--ipv6", "-6" , action="store_true", help='Test Ipv6', default=False)
-parser.add_argument("--proto", "-m" , type=str, required=False, choices=["tcp","udp"], help='Select the protocol', default="tcp")
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--address",
+ "-a",
+ type=str,
+ required=False,
+ help="Address of firegex backend",
+ default="http://127.0.0.1:4444/",
+ )
+ parser.add_argument(
+ "--password", "-p", type=str, required=True, help="Firegex password"
+ )
+ parser.add_argument(
+ "--service_name",
+ "-n",
+ type=str,
+ required=False,
+ help="Name of the test service",
+ default="Test Service",
+ )
+ parser.add_argument(
+ "--port",
+ "-P",
+ type=int,
+ required=False,
+ help="Port of the test service",
+ default=1337,
+ )
+ parser.add_argument(
+ "--ipv6", "-6", action="store_true", help="Test Ipv6", default=False
+ )
+ parser.add_argument(
+ "--proto",
+ "-m",
+ type=str,
+ required=False,
+ choices=["tcp", "udp"],
+ help="Select the protocol",
+ default="tcp",
+ )
-args = parser.parse_args()
-sep()
-puts("Testing will start on ", color=colors.cyan, end="")
-puts(f"{args.address}", color=colors.yellow)
+ args = parser.parse_args()
+ sep()
+ puts("Testing will start on ", color=colors.cyan, end="")
+ puts(f"{args.address}", color=colors.yellow)
-firegex = FiregexAPI(args.address)
+ firegex = FiregexAPI(args.address)
-#Login
-if (firegex.login(args.password)):
- puts("Sucessfully logged in β", color=colors.green)
-else:
- puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
- exit(1)
-
-#Create server
-server = (TcpServer if args.proto == "tcp" else UdpServer)(args.port+1,ipv6=args.ipv6,proxy_port=args.port)
-
-def exit_test(code):
- if service_id:
- server.stop()
- if(firegex.ph_delete_service(service_id)):
- puts("Sucessfully deleted service β", color=colors.green)
- else:
- puts("Test Failed: Coulnd't delete serivce β", color=colors.red)
- exit_test(1)
- exit(code)
-
-srvs = firegex.ph_get_services()
-for ele in srvs:
- if ele['name'] == args.service_name:
- firegex.ph_delete_service(ele['service_id'])
-
-#Create and start serivce
-service_id = firegex.ph_add_service(args.service_name, args.port, args.port+1, args.proto , "::1" if args.ipv6 else "127.0.0.1", "::1" if args.ipv6 else "127.0.0.1")
-if service_id:
- puts(f"Sucessfully created service {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Failed to create service β", color=colors.red)
- exit(1)
-
-if(firegex.ph_start_service(service_id)):
- puts("Sucessfully started service β", color=colors.green)
-else:
- puts("Test Failed: Failed to start service β", color=colors.red)
- exit_test(1)
-
-server.start()
-time.sleep(0.5)
-
-#Check if it started
-def checkData(should_work):
- res = None
- try:
- res = server.sendCheckData(secrets.token_bytes(432))
- except (ConnectionRefusedError, TimeoutError):
- res = None
- if res:
- if should_work:
- puts("Successfully received data β", color=colors.green)
- else:
- puts("Test Failed: Connection wasn't blocked β", color=colors.red)
- exit_test(1)
+ # Login
+ if firegex.login(args.password):
+ puts("Sucessfully logged in β", color=colors.green)
else:
- if should_work:
- puts("Test Failed: Data wans't received β", color=colors.red)
- exit_test(1)
+ puts("Test Failed: Unknown response or wrong passowrd β", color=colors.red)
+ exit(1)
+
+ # Create server
+ server = (TcpServer if args.proto == "tcp" else UdpServer)(
+ args.port + 1, ipv6=args.ipv6, proxy_port=args.port
+ )
+
+ def exit_test(code):
+ if service_id:
+ server.stop()
+ if firegex.ph_delete_service(service_id):
+ puts("Sucessfully deleted service β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't delete serivce β", color=colors.red)
+ exit_test(1)
+ exit(code)
+
+ srvs = firegex.ph_get_services()
+ for ele in srvs:
+ if ele["name"] == args.service_name:
+ firegex.ph_delete_service(ele["service_id"])
+
+ # Create and start serivce
+ service_id = firegex.ph_add_service(
+ args.service_name,
+ args.port,
+ args.port + 1,
+ args.proto,
+ "::1" if args.ipv6 else "127.0.0.1",
+ "::1" if args.ipv6 else "127.0.0.1",
+ )
+ if service_id:
+ puts(f"Sucessfully created service {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Failed to create service β", color=colors.red)
+ exit(1)
+
+ if firegex.ph_start_service(service_id):
+ puts("Sucessfully started service β", color=colors.green)
+ else:
+ puts("Test Failed: Failed to start service β", color=colors.red)
+ exit_test(1)
+
+ server.start()
+ time.sleep(0.5)
+
+ # Check if it started
+ def checkData(should_work):
+ res = None
+ try:
+ res = server.sendCheckData(secrets.token_bytes(432))
+ except (ConnectionRefusedError, TimeoutError):
+ res = None
+ if res:
+ if should_work:
+ puts("Successfully received data β", color=colors.green)
+ else:
+ puts("Test Failed: Connection wasn't blocked β", color=colors.red)
+ exit_test(1)
else:
- puts("Successfully blocked connection β", color=colors.green)
+ if should_work:
+ puts("Test Failed: Data wans't received β", color=colors.red)
+ exit_test(1)
+ else:
+ puts("Successfully blocked connection β", color=colors.green)
-checkData(True)
+ checkData(True)
-#Pause the proxy
-if(firegex.ph_stop_service(service_id)):
- puts(f"Sucessfully paused service with id {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't pause the service β", color=colors.red)
+ # Pause the proxy
+ if firegex.ph_stop_service(service_id):
+ puts(f"Sucessfully paused service with id {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't pause the service β", color=colors.red)
+ exit_test(1)
+
+ checkData(False)
+
+ # Start firewall
+ if firegex.ph_start_service(service_id):
+ puts(f"Sucessfully started service with id {service_id} β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't start the service β", color=colors.red)
+ exit_test(1)
+
+ checkData(True)
+
+ # Change port
+ if firegex.ph_change_destination(
+ service_id, "::1" if args.ipv6 else "127.0.0.1", args.port + 2
+ ):
+ puts("Sucessfully changed port β", color=colors.green)
+ else:
+ puts("Test Failed: Coulnd't change destination β", color=colors.red)
+ exit_test(1)
+
+ checkData(False)
+
+ server.stop()
+ server = (TcpServer if args.proto == "tcp" else UdpServer)(
+ args.port + 2, ipv6=args.ipv6, proxy_port=args.port
+ )
+ server.start()
+ time.sleep(0.5)
+
+ checkData(True)
+
+ # Rename service
+ if firegex.ph_rename_service(service_id, f"{args.service_name}2"):
+ puts(
+ f"Sucessfully renamed service to {args.service_name}2 β", color=colors.green
+ )
+ else:
+ puts("Test Failed: Coulnd't rename service β", color=colors.red)
+ exit_test(1)
+
+ # Check if service was renamed correctly
+ for services in firegex.ph_get_services():
+ if services["name"] == f"{args.service_name}2":
+ puts("Checked that service was renamed correctly β", color=colors.green)
+ exit_test(0)
+
+ puts("Test Failed: Service wasn't renamed correctly β", color=colors.red)
exit_test(1)
-
-checkData(False)
-
-#Start firewall
-if(firegex.ph_start_service(service_id)):
- puts(f"Sucessfully started service with id {service_id} β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't start the service β", color=colors.red)
- exit_test(1)
-
-checkData(True)
-
-#Change port
-if(firegex.ph_change_destination(service_id, "::1" if args.ipv6 else "127.0.0.1", args.port+2)):
- puts("Sucessfully changed port β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't change destination β", color=colors.red)
- exit_test(1)
-
-checkData(False)
-
-server.stop()
-server = (TcpServer if args.proto == "tcp" else UdpServer)(args.port+2,ipv6=args.ipv6,proxy_port=args.port)
-server.start()
-time.sleep(0.5)
-
-checkData(True)
-
-#Rename service
-if(firegex.ph_rename_service(service_id,f"{args.service_name}2")):
- puts(f"Sucessfully renamed service to {args.service_name}2 β", color=colors.green)
-else:
- puts("Test Failed: Coulnd't rename service β", color=colors.red)
- exit_test(1)
-
-#Check if service was renamed correctly
-for services in firegex.ph_get_services():
- if services["name"] == f"{args.service_name}2":
- puts("Checked that service was renamed correctly β", color=colors.green)
- exit_test(0)
-
-puts("Test Failed: Service wasn't renamed correctly β", color=colors.red)
-exit_test(1)
diff --git a/tests/utils/tcpserver.py b/tests/utils/tcpserver.py
index 2d04547..98be531 100644
--- a/tests/utils/tcpserver.py
+++ b/tests/utils/tcpserver.py
@@ -1,9 +1,46 @@
+import queue
from multiprocessing import Process, Queue
import socket
import traceback
+
+def _start_tcp_server(port, server_queue: Queue, ipv6, verbose):
+ sock = socket.socket(
+ socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_STREAM
+ )
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(("::1" if ipv6 else "127.0.0.1", port))
+ sock.listen(8)
+ while True:
+ connection, address = sock.accept()
+ while True:
+ try:
+ buf = connection.recv(4096)
+ if buf == b"":
+ break
+
+ reply = buf # Default to echo
+ try:
+ # See if there is a custom reply, but don't block
+ custom_reply = server_queue.get(block=False)
+ reply = custom_reply
+ except queue.Empty:
+ pass # No custom reply, just echo
+
+ if verbose:
+ print("SERVER: ", reply)
+ connection.sendall(reply)
+ except (ConnectionResetError, BrokenPipeError):
+ break # Client closed connection
+ except Exception:
+ if verbose:
+ traceback.print_exc()
+ break # Exit on other errors
+ connection.close()
+
+
class TcpServer:
- def __init__(self,port,ipv6,proxy_port=None, verbose=False):
+ def __init__(self, port, ipv6, proxy_port=None, verbose=False):
self.proxy_port = proxy_port
self.ipv6 = ipv6
self.port = port
@@ -12,43 +49,30 @@ class TcpServer:
self._regen_process()
def _regen_process(self):
- def _startServer(port, server_queue:Queue):
- sock = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('::1' if self.ipv6 else '127.0.0.1', port))
- sock.listen(8)
- while True:
- connection,address = sock.accept()
- while True:
- try:
- buf = connection.recv(4096)
- if buf == b'':
- break
- try:
- buf = server_queue.get(block=False)
- except Exception:
- pass
- if self.verbose:
- print("SERVER: ", buf)
- connection.sendall(buf)
- except Exception:
- if self.verbose:
- traceback.print_exc()
- connection.close()
- self.server = Process(target=_startServer,args=[self.port, self._server_data_queue])
+ self.server = Process(
+ target=_start_tcp_server,
+ args=[self.port, self._server_data_queue, self.ipv6, self.verbose],
+ )
def start(self):
self.server.start()
-
+
def stop(self):
self.server.terminate()
self.server.join()
self._regen_process()
def connect_client(self):
- self.client_sock = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_STREAM)
+ self.client_sock = socket.socket(
+ socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_STREAM
+ )
self.client_sock.settimeout(1)
- self.client_sock.connect(('::1' if self.ipv6 else '127.0.0.1', self.proxy_port if self.proxy_port else self.port))
+ self.client_sock.connect(
+ (
+ "::1" if self.ipv6 else "127.0.0.1",
+ self.proxy_port if self.proxy_port else self.port,
+ )
+ )
def close_client(self):
if self.client_sock:
@@ -60,7 +84,7 @@ class TcpServer:
if server_reply:
self._server_data_queue.put(server_reply)
self.client_sock.sendall(packet)
-
+
def recv_packet(self):
try:
return self.client_sock.recv(4096)
@@ -68,7 +92,7 @@ class TcpServer:
if self.verbose:
traceback.print_exc()
return False
-
+
def sendCheckData(self, data, get_data=False):
self.connect_client()
self.send_packet(data)
diff --git a/tests/utils/udpserver.py b/tests/utils/udpserver.py
index 2429fe9..7c46feb 100644
--- a/tests/utils/udpserver.py
+++ b/tests/utils/udpserver.py
@@ -1,35 +1,94 @@
-from multiprocessing import Process
+from multiprocessing import Process, Queue
import socket
+import queue
+import traceback
+
+
+def _start_udp_server(port, server_queue: Queue, ipv6, verbose):
+ sock = socket.socket(socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_DGRAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(("::1" if ipv6 else "127.0.0.1", port))
+ while True:
+ try:
+ bytesAddressPair = sock.recvfrom(4096)
+ message = bytesAddressPair[0]
+ address = bytesAddressPair[1]
+
+ reply = message # Default to echo
+ try:
+ # See if there is a custom reply, but don't block
+ custom_reply = server_queue.get(block=False)
+ reply = custom_reply
+ except queue.Empty:
+ pass # No custom reply, just echo
+
+ if verbose:
+ print(f"SERVER: sending {reply} to {address}")
+ sock.sendto(reply, address)
+ except Exception:
+ if verbose:
+ traceback.print_exc()
+
class UdpServer:
- def __init__(self,port,ipv6, proxy_port = None):
- def _startServer(port):
- sock = socket.socket(socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_DGRAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('::1' if ipv6 else '127.0.0.1', port))
- while True:
- bytesAddressPair = sock.recvfrom(432)
- message = bytesAddressPair[0]
- address = bytesAddressPair[1]
- sock.sendto(message, address)
-
- self.ipv6 = ipv6
+ def __init__(self, port, ipv6, proxy_port=None, verbose=False):
self.port = port
+ self.ipv6 = ipv6
self.proxy_port = proxy_port
- self.server = Process(target=_startServer,args=[port])
+ self.verbose = verbose
+ self._server_data_queue = Queue()
+ self._regen_process()
+
+ def _regen_process(self):
+ self.server = Process(
+ target=_start_udp_server,
+ args=[self.port, self._server_data_queue, self.ipv6, self.verbose],
+ )
def start(self):
self.server.start()
-
+
def stop(self):
self.server.terminate()
+ self.server.join()
+ self._regen_process()
- def sendCheckData(self,data):
- s = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_DGRAM)
- s.settimeout(2)
- s.sendto(data, ('::1' if self.ipv6 else '127.0.0.1', self.proxy_port if self.proxy_port else self.port))
+ def connect_client(self):
+ self.client_sock = socket.socket(
+ socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_DGRAM
+ )
+ self.client_sock.settimeout(1)
+ self.client_sock.connect(
+ (
+ "::1" if self.ipv6 else "127.0.0.1",
+ self.proxy_port if self.proxy_port else self.port,
+ )
+ )
+
+ def close_client(self):
+ if self.client_sock:
+ self.client_sock.close()
+
+ def send_packet(self, packet, server_reply=None):
+ if self.verbose:
+ print("CLIENT: ", packet)
+ if server_reply:
+ self._server_data_queue.put(server_reply)
+ self.client_sock.sendall(packet)
+
+ def recv_packet(self):
try:
- received_data = s.recvfrom(432)
- except Exception:
+ return self.client_sock.recv(4096)
+ except (TimeoutError, ConnectionResetError):
+ if self.verbose:
+ traceback.print_exc()
return False
- return received_data[0] == data
\ No newline at end of file
+
+ def sendCheckData(self, data, get_data=False):
+ self.connect_client()
+ self.send_packet(data)
+ received_data = self.recv_packet()
+ self.close_client()
+ if get_data:
+ return received_data
+ return received_data == data