From 92be4e306b5eeb6387c4d2c6d59550efc9ebecef Mon Sep 17 00:00:00 2001 From: nik012003 Date: Tue, 2 Aug 2022 11:35:06 +0200 Subject: [PATCH] Netfilter Regex tests complete --- tests/api_test.py | 18 ++--- tests/nf_test.py | 149 ++++++++++++++++++++++++++++++-------- tests/run_tests.sh | 11 +++ tests/utils/firegexapi.py | 8 +- tests/utils/tcpserver.py | 11 +-- tests/utils/udpserver.py | 34 +++++++++ 6 files changed, 182 insertions(+), 49 deletions(-) create mode 100755 tests/run_tests.sh create mode 100644 tests/utils/udpserver.py diff --git a/tests/api_test.py b/tests/api_test.py index 906b108..f21f28d 100755 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -5,30 +5,28 @@ import argparse, secrets parser = argparse.ArgumentParser() parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") -parser.add_argument("--password", "-p", type=str, required=False, help='Firegex password') +parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password') args = parser.parse_args() sep() puts(f"Testing will start on ", color=colors.cyan, end="") puts(f"{args.address}", color=colors.yellow) firegex = FiregexAPI(args.address) -password = "" + #Connect to Firegex -if args.password: - password = args.password +if firegex.status()["status"] =="init": + if (firegex.set_password(args.password)): puts(f"Sucessfully set password to {args.password} ✔", color=colors.green) + else: puts(f"Test Failed: Unknown response or password already put ✗", color=colors.red); exit(1) +else: if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) -else: - password = secrets.token_hex(10) - if (firegex.set_password(args.password)): puts(f"Sucessfully set password to {password} ✔", color=colors.green) - else: puts(f"Test Failed: Unknown response or password already put ✗", color=colors.red); exit(1) if(firegex.status()["loggined"]): puts(f"Correctly received status ✔", color=colors.green) else: puts(f"Test Failed: Unknown response or not logged in✗", color=colors.red); exit(1) #Prepare second instance firegex2 = FiregexAPI(args.address) -if (firegex2.login(password)): puts(f"Sucessfully logged in on second instance ✔", color=colors.green) +if (firegex2.login(args.password)): puts(f"Sucessfully logged in on second instance ✔", color=colors.green) else: puts(f"Test Failed: Unknown response or wrong passowrd on second instance ✗", color=colors.red); exit(1) if(firegex2.status()["loggined"]): puts(f"Correctly received status on second instance✔", color=colors.green) @@ -51,7 +49,7 @@ if (firegex2.login(new_password)): puts(f"Sucessfully logged in on second instan else: puts(f"Test Failed: Unknown response or wrong passowrd on second instance ✗", color=colors.red); exit(1) #Change it back -if (firegex.change_password(password,expire=False)): puts(f"Sucessfully restored the password ✔", color=colors.green) +if (firegex.change_password(args.password,expire=False)): puts(f"Sucessfully restored the password ✔", color=colors.green) else: puts(f"Test Failed: Coundl't change the password ✗", color=colors.red); exit(1) #Check if we are still logged in diff --git a/tests/nf_test.py b/tests/nf_test.py index 0efd249..4eda823 100755 --- a/tests/nf_test.py +++ b/tests/nf_test.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 from utils.colors import * from utils.firegexapi import * -from utils.tcpserver import * +from utils.tcpserver import TcpServer +from utils.udpserver import UdpServer import argparse, secrets, base64,time parser = argparse.ArgumentParser() @@ -9,6 +10,8 @@ parser.add_argument("--address", "-a", type=str , required=False, help='Address parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password') parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the test service', default="Test Service") parser.add_argument("--port", "-P", type=int , required=False, help='Port of the test service', default=1337) +parser.add_argument("--ipv6", "-6" , action="store_true", help='Test Ipv6', default=False) +parser.add_argument("--proto", "-m" , type=str, required=False, choices=["tcp","udp"], help='Select the protocol', default="tcp") args = parser.parse_args() sep() @@ -21,16 +24,17 @@ firegex = FiregexAPI(args.address) if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) -#TCP tests -server = TcpServer(args.port) +#Create server +server = TcpServer(args.port,ipv6=args.ipv6) if args.proto == "tcp" else UdpServer(args.port,ipv6=args.ipv6) def exit_test(code): if service_id: server.stop() - firegex.nf_delete_service(service_id) + if(firegex.nf_delete_service(service_id)): puts(f"Sucessfully deleted service ✔", color=colors.green) + else: puts(f"Test Failed: Coulnd't deleted serivce ✗", color=colors.red); exit_test(1) exit(code) -service_id = firegex.nf_add_service(args.service_name, args.port, "tcp", "127.0.0.1/24") +service_id = firegex.nf_add_service(args.service_name, args.port, args.proto , "::1" if args.ipv6 else "127.0.0.1" ) if service_id: puts(f"Sucessfully created service {service_id} ✔", color=colors.green) else: puts(f"Test Failed: Failed to create service ✗", color=colors.red); exit(1) @@ -38,8 +42,8 @@ if(firegex.nf_start_service(service_id)): puts(f"Sucessfully started service ✔ else: puts(f"Test Failed: Failed to start service ✗", color=colors.red); exit_test(1) server.start() - -if server.sendCheckData(secrets.token_bytes(200)): +time.sleep(0.5) +if server.sendCheckData(secrets.token_bytes(432)): puts(f"Successfully tested first proxy with no regex ✔", color=colors.green) else: puts(f"Test Failed: Data was corrupted ", color=colors.red); exit_test(1) @@ -48,28 +52,35 @@ else: secret = bytes(secrets.token_hex(16).encode()) regex = base64.b64encode(secret).decode() if(firegex.nf_add_regex(service_id,regex,"B",active=True,is_blacklist=True,is_case_sensitive=True)): - puts(f"Sucessfully added regex {regex} ✔", color=colors.green) -else: puts(f"Test Failed: Coulnd't add the regex {secret} ✗", color=colors.red); exit_test(1) + puts(f"Sucessfully added regex {str(secret)} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't add the regex {str(secret)} ✗", color=colors.red); exit_test(1) #Check if regex is present in the service n_blocked = 0 -def checkRegex(regex): - global n_blocked - for r in firegex.nf_get_service_regexes(service_id): - if r["regex"] == regex: - #Test the regex - if not server.sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): - puts(f"The malicious request was successfully blocked ✔", color=colors.green) - n_blocked += 1 - if firegex.nf_get_regex(r["id"])["n_packets"] == n_blocked: - puts(f"The packed was reported as blocked ✔", color=colors.green) +def checkRegex(regex, should_work=True, upper=False): + if should_work: + global n_blocked + for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + #Test the regex + s = base64.b64decode(regex).upper() if upper else base64.b64decode(regex) + if not server.sendCheckData(secrets.token_bytes(200) + s + secrets.token_bytes(200)): + puts(f"The malicious request was successfully blocked ✔", color=colors.green) + n_blocked += 1 + if firegex.nf_get_regex(r["id"])["n_packets"] == n_blocked: + puts(f"The packed was reported as blocked ✔", color=colors.green) + else: + puts(f"Test Failed: The packed wasn't reported as blocked ✗", color=colors.red); exit_test(1) else: - puts(f"Test Failed: The packed wasn't reported as blocked ✗", color=colors.red); exit_test(1) - else: - puts(f"Test Failed: The request wasn't blocked ✗", color=colors.red);exit_test(1) - return - puts(f"Test Failed: The regex wasn't found ✗", color=colors.red); exit_test(1) + puts(f"Test Failed: The request wasn't blocked ✗", color=colors.red);exit_test(1) + return + puts(f"Test Failed: The regex wasn't found ✗", color=colors.red); exit_test(1) + else: + if server.sendCheckData(secrets.token_bytes(200) + base64.b64decode(regex) + secrets.token_bytes(200)): + puts(f"The request wasn't blocked ✔", color=colors.green) + else: + puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) checkRegex(regex) @@ -78,10 +89,7 @@ if(firegex.nf_stop_service(service_id)): puts(f"Sucessfully paused service with else: puts(f"Test Failed: Coulnd't pause the service ✗", color=colors.red); exit_test(1) #Check if it's actually paused -if server.sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): - puts(f"The request wasn't blocked ✔", color=colors.green) -else: - puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) +checkRegex(regex,should_work=False) #Start firewall if(firegex.nf_start_service(service_id)): puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) @@ -89,6 +97,87 @@ else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); ex checkRegex(regex) -#TODO: test whitelist, enable/disable regex, and UDP +#Disable regex +for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + if(firegex.nf_disable_regex(r["id"])): + puts(f"Sucessfully disabled regex with id {r['id']} ✔", color=colors.green) + else: + puts(f"Test Failed: Coulnd't disable the regex ✗", color=colors.red); exit_test(1) + break -exit_test(0) \ No newline at end of file +#Check if it's actually disabled +checkRegex(regex,should_work=False) + +#Enable regex +for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + if(firegex.nf_enable_regex(r["id"])): + puts(f"Sucessfully enabled regex with id {r['id']} ✔", color=colors.green) + else: + puts(f"Test Failed: Coulnd't enable the regex ✗", color=colors.red); exit_test(1) + break + +checkRegex(regex) + +#Delete regex +n_blocked = 0 +for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + if(firegex.nf_delete_regex(r["id"])): + puts(f"Sucessfully deleted regex with id {r['id']} ✔", color=colors.green) + else: + puts(f"Test Failed: Coulnd't delete the regex ✗", color=colors.red); exit_test(1) + break + +#Check if it's actually deleted +checkRegex(regex,should_work=False) + +#Add case insensitive regex +if(firegex.nf_add_regex(service_id,regex,"B",active=True,is_blacklist=True,is_case_sensitive=False)): + puts(f"Sucessfully added case insensitive regex {str(secret)} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't add the case insensitive regex {str(secret)} ✗", color=colors.red); exit_test(1) + +checkRegex(regex,upper=True) +checkRegex(regex) + +#Delete regex +n_blocked = 0 +for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + if(firegex.nf_delete_regex(r["id"])): + puts(f"Sucessfully deleted regex with id {r['id']} ✔", color=colors.green) + else: + puts(f"Test Failed: Coulnd't delete the regex ✗", color=colors.red); exit_test(1) + break + +#Add whitelist regex +if(firegex.nf_add_regex(service_id,regex,"B",active=True,is_blacklist=False,is_case_sensitive=True)): + puts(f"Sucessfully added case whitelist regex {str(secret)} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't add the case whiteblist regex {str(secret)} ✗", color=colors.red); exit_test(1) + +checkRegex(regex,should_work=False) +checkRegex(regex,upper=True) #Dirty way to test the whitelist :p + +#Delete regex +n_blocked = 0 +for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + if(firegex.nf_delete_regex(r["id"])): + puts(f"Sucessfully deleted regex with id {r['id']} ✔", color=colors.green) + else: + puts(f"Test Failed: Coulnd't delete the regex ✗", color=colors.red); exit_test(1) + break + +#Rename service +if(firegex.nf_rename_service(service_id,f"{args.service_name}2")): puts(f"Sucessfully renamed service to {args.service_name}2 ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't rename service ✗", color=colors.red); exit_test(1) + +#Check if service was renamed correctly +for services in firegex.nf_get_services(): + if services["name"] == f"{args.service_name}2": + puts(f"Checked that service was renamed correctly ✔", color=colors.green) + exit_test(0) + +puts(f"Test Failed: Service wasn't renamed correctly ✗", color=colors.red); exit_test(1) +exit_test(1) \ No newline at end of file diff --git a/tests/run_tests.sh b/tests/run_tests.sh new file mode 100755 index 0000000..4dc9707 --- /dev/null +++ b/tests/run_tests.sh @@ -0,0 +1,11 @@ +#!/bin/sh +echo "Running standard API test" +python3 api_test.py -p testpassword +echo "Running Netfilter Regex TCP ipv4" +python3 nf_test.py -p testpassword -m tcp +echo "Running Netfilter Regex TCP ipv6" +python3 nf_test.py -p testpassword -m tcp -6 +echo "Running Netfilter Regex UDP ipv4" +python3 nf_test.py -p testpassword -m udp +echo "Running Netfilter Regex UDP ipv6" +python3 nf_test.py -p testpassword -m udp -6 \ No newline at end of file diff --git a/tests/utils/firegexapi.py b/tests/utils/firegexapi.py index fe4cf67..d54389e 100644 --- a/tests/utils/firegexapi.py +++ b/tests/utils/firegexapi.py @@ -96,8 +96,8 @@ class FiregexAPI: req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/delete") return verify(req) - def nf_rename_service(self,newname): - req = self.s.post(f"{self.address}api/nfregex/services/add" , json={"name":service_name,"port":service_port, "ip_int": ip_int, "proto": proto}) + def nf_rename_service(self,service_id,newname): + req = self.s.post(f"{self.address}api/nfregex/service/{service_id}/rename" , json={"name":newname}) return verify(req) def nf_get_service_regexes(self,service_id): @@ -191,8 +191,8 @@ class FiregexAPI: json={"service_id": service_id, "regex": regex, "mode": mode, "active": active, "is_blacklist": is_blacklist, "is_case_sensitive": is_case_sensitive}) return verify(req) - def px_rename_service(self,newname): - req = self.s.post(f"{self.address}api/regexproxy/services/add" , json={"name":service_name,"port":service_port, "ip_int": ip_int, "proto": proto}) + def px_rename_service(self,service_id,newname): + req = self.s.post(f"{self.address}api/regexproxy/service/{service_id}/rename" , json={"name":newname}) return verify(req) def px_add_service(self, name: str, port: int, internalPort: [int,None]): diff --git a/tests/utils/tcpserver.py b/tests/utils/tcpserver.py index 738b7d4..6396d4e 100644 --- a/tests/utils/tcpserver.py +++ b/tests/utils/tcpserver.py @@ -2,17 +2,18 @@ from multiprocessing import Process import socket class TcpServer: - def __init__(self,port): + def __init__(self,port,ipv6): def _startServer(port): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('127.0.0.1', port)) + sock.bind(('::1' if ipv6 else '127.0.0.1', port)) sock.listen(8) while True: connection,address = sock.accept() buf = connection.recv(4096) connection.send(buf) connection.close() + self.ipv6 = ipv6 self.port = port self.server = Process(target=_startServer,args=[port]) @@ -23,8 +24,8 @@ class TcpServer: self.server.terminate() def sendCheckData(self,data): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', self.port)) + s = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_STREAM) + s.connect(('::1' if self.ipv6 else '127.0.0.1', self.port)) s.sendall(data) received_data = s.recv(4096) s.close() diff --git a/tests/utils/udpserver.py b/tests/utils/udpserver.py new file mode 100644 index 0000000..39b3e9b --- /dev/null +++ b/tests/utils/udpserver.py @@ -0,0 +1,34 @@ +from multiprocessing import Process +import socket + +class UdpServer: + def __init__(self,port,ipv6): + def _startServer(port): + sock = socket.socket(socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('::1' if ipv6 else '127.0.0.1', port)) + while True: + bytesAddressPair = sock.recvfrom(432) + message = bytesAddressPair[0] + address = bytesAddressPair[1] + sock.sendto(message, address) + + self.ipv6 = ipv6 + self.port = port + self.server = Process(target=_startServer,args=[port]) + + def start(self): + self.server.start() + + def stop(self): + self.server.terminate() + + def sendCheckData(self,data): + s = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(2) + s.sendto(data, ('::1' if self.ipv6 else '127.0.0.1', self.port)) + try: + received_data = s.recvfrom(432) + except Exception: + return False + return received_data[0] == data \ No newline at end of file