From 50b4871dfb6e4f564dbab97f1d0f95b3470f7246 Mon Sep 17 00:00:00 2001 From: nik012003 Date: Fri, 12 Aug 2022 16:30:43 +0200 Subject: [PATCH] Added tests for Porthijack --- tests/nf_test.py | 4 +- tests/ph_test.py | 99 +++++++++++++++++++++++++++++++++++++++ tests/run_tests.sh | 10 +++- tests/utils/firegexapi.py | 97 ++++++++++++++++++++++++++------------ tests/utils/udpserver.py | 5 +- 5 files changed, 179 insertions(+), 36 deletions(-) create mode 100755 tests/ph_test.py diff --git a/tests/nf_test.py b/tests/nf_test.py index 4eda823..e35f340 100755 --- a/tests/nf_test.py +++ b/tests/nf_test.py @@ -25,13 +25,13 @@ if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colo else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) #Create server -server = TcpServer(args.port,ipv6=args.ipv6) if args.proto == "tcp" else UdpServer(args.port,ipv6=args.ipv6) +server = (TcpServer if args.proto == "tcp" else UdpServer)(args.port,ipv6=args.ipv6) def exit_test(code): if service_id: server.stop() if(firegex.nf_delete_service(service_id)): puts(f"Sucessfully deleted service ✔", color=colors.green) - else: puts(f"Test Failed: Coulnd't deleted serivce ✗", color=colors.red); exit_test(1) + else: puts(f"Test Failed: Coulnd't delete serivce ✗", color=colors.red); exit_test(1) exit(code) service_id = firegex.nf_add_service(args.service_name, args.port, args.proto , "::1" if args.ipv6 else "127.0.0.1" ) diff --git a/tests/ph_test.py b/tests/ph_test.py new file mode 100755 index 0000000..82c7e16 --- /dev/null +++ b/tests/ph_test.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +from utils.colors import * +from utils.firegexapi import * +from utils.tcpserver import TcpServer +from utils.udpserver import UdpServer +import argparse, secrets, base64,time + +parser = argparse.ArgumentParser() +parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") +parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password') +parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the test service', default="Test Service") +parser.add_argument("--port", "-P", type=int , required=False, help='Port of the test service', default=1337) +parser.add_argument("--ipv6", "-6" , action="store_true", help='Test Ipv6', default=False) +parser.add_argument("--proto", "-m" , type=str, required=False, choices=["tcp","udp"], help='Select the protocol', default="tcp") + +args = parser.parse_args() +sep() +puts(f"Testing will start on ", color=colors.cyan, end="") +puts(f"{args.address}", color=colors.yellow) + +firegex = FiregexAPI(args.address) + +#Login +if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) + +#Create server +server = (TcpServer if args.proto == "tcp" else UdpServer)(args.port+1,ipv6=args.ipv6,proxy_port=args.port) + +def exit_test(code): + if service_id: + server.stop() + if(firegex.ph_delete_service(service_id)): puts(f"Sucessfully deleted service ✔", color=colors.green) + else: puts(f"Test Failed: Coulnd't delete serivce ✗", color=colors.red); exit_test(1) + exit(code) + +#Create and start serivce +service_id = firegex.ph_add_service(args.service_name, args.port, args.port+1, args.proto , "::1" if args.ipv6 else "127.0.0.1", "::1" if args.ipv6 else "127.0.0.1") +if service_id: puts(f"Sucessfully created service {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Failed to create service ✗", color=colors.red); exit(1) + +if(firegex.ph_start_service(service_id)): puts(f"Sucessfully started service ✔", color=colors.green) +else: puts(f"Test Failed: Failed to start service ✗", color=colors.red); exit_test(1) + +server.start() +time.sleep(0.5) + +#Check if it started +def checkData(should_work): + res = None + try: res = server.sendCheckData(secrets.token_bytes(432)) + except ConnectionRefusedError: res = False + if res: + if should_work: puts(f"Successfully received data ✔", color=colors.green) + else: puts("Test Failed: Connection wasn't blocked ✗", color=colors.red); exit_test(1) + else: + if should_work: puts(f"Test Failed: Data wans't received ✗", color=colors.red); exit_test(1) + else: puts(f"Successfully blcoked connection ✔", color=colors.green) + +checkData(True) + +#Pause the proxy +if(firegex.ph_stop_service(service_id)): puts(f"Sucessfully paused service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't pause the service ✗", color=colors.red); exit_test(1) + +checkData(False) + +#Start firewall +if(firegex.ph_start_service(service_id)): puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) + +checkData(True) + +#Change port +if(firegex.ph_change_destination(service_id, "::1" if args.ipv6 else "127.0.0.1", args.port+2)): + puts(f"Sucessfully changed port ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't change destination ✗", color=colors.red); exit_test(1) + +checkData(False) + +server.stop() +server = (TcpServer if args.proto == "tcp" else UdpServer)(args.port+2,ipv6=args.ipv6,proxy_port=args.port) +server.start() +time.sleep(0.5) + +checkData(True) + +#Rename service +if(firegex.ph_rename_service(service_id,f"{args.service_name}2")): puts(f"Sucessfully renamed service to {args.service_name}2 ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't rename service ✗", color=colors.red); exit_test(1) + +#Check if service was renamed correctly +for services in firegex.ph_get_services(): + if services["name"] == f"{args.service_name}2": + puts(f"Checked that service was renamed correctly ✔", color=colors.green) + exit_test(0) + +puts(f"Test Failed: Service wasn't renamed correctly ✗", color=colors.red); exit_test(1) +exit_test(1) \ No newline at end of file diff --git a/tests/run_tests.sh b/tests/run_tests.sh index 2f67aec..4d448f9 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -11,4 +11,12 @@ python3 nf_test.py -p testpassword -m udp echo "Running Netfilter Regex UDP ipv6" python3 nf_test.py -p testpassword -m udp -6 echo "Running Proxy Regex" -python3 px_test.py -p testpassword \ No newline at end of file +python3 px_test.py -p testpassword +echo "Running Port Hijack TCP ipv4" +python3 ph_test.py -p testpassword -m tcp +echo "Running Port Hijack TCP ipv6" +python3 ph_test.py -p testpassword -m tcp -6 +echo "Running Port Hijack UDP ipv4" +python3 ph_test.py -p testpassword -m udp +echo "Running Port Hijack UDP ipv6" +python3 ph_test.py -p testpassword -m udp -6 \ No newline at end of file diff --git a/tests/utils/firegexapi.py b/tests/utils/firegexapi.py index 928cdbb..51da773 100644 --- a/tests/utils/firegexapi.py +++ b/tests/utils/firegexapi.py @@ -1,3 +1,4 @@ +import string from requests import Session def verify(req): @@ -36,7 +37,7 @@ class FiregexAPI: def status(self): return self.s.get(f"{self.address}api/status").json() - def login(self,password): + def login(self,password: str): req = self.s.post(f"{self.address}api/login", data=f"username=login&password={password}") try : self.s.set_token(req.json()["access_token"]) @@ -48,7 +49,7 @@ class FiregexAPI: self.s.unset_token() return True - def set_password(self,password): + def set_password(self,password: str): req = self.s.post(f"{self.address}api/set-password", json={"password":password}) if verify(req): self.s.set_token(req.json()["access_token"]) @@ -56,7 +57,7 @@ class FiregexAPI: else: return False - def change_password(self,password,expire): + def change_password(self, password: str, expire: bool): req = self.s.post(f"{self.address}api/change-password", json={"password":password, "expire":expire}) if verify(req): self.s.set_token(req.json()["access_token"]) @@ -68,11 +69,11 @@ class FiregexAPI: req = self.s.get(f"{self.address}api/interfaces") return req.json() - def reset(self, delete): + def reset(self, delete: bool): req = self.s.post(f"{self.address}api/reset", json={"delete":delete}) #Netfilter regex - def nf_get_stats(): + def nf_get_stats(self): req = self.s.get(f"{self.address}api/nfregex/stats") return req.json() @@ -80,43 +81,43 @@ class FiregexAPI: req = self.s.get(f"{self.address}api/nfregex/services") return req.json() - def nf_get_service(self,service_id): + def nf_get_service(self,service_id: str): req = self.s.get(f"{self.address}api/nfregex/service/{service_id}") return req.json() - def nf_stop_service(self,service_id): + def nf_stop_service(self,service_id: str): req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/stop") return verify(req) - def nf_start_service(self,service_id): + def nf_start_service(self,service_id: str): req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/start") return verify(req) - def nf_delete_service(self,service_id): + def nf_delete_service(self,service_id: str): req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/delete") return verify(req) - def nf_rename_service(self,service_id,newname): + def nf_rename_service(self,service_id: str, newname: str): req = self.s.post(f"{self.address}api/nfregex/service/{service_id}/rename" , json={"name":newname}) return verify(req) - def nf_get_service_regexes(self,service_id): + def nf_get_service_regexes(self,service_id: str): req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/regexes") return req.json() - def nf_get_regex(self,regex_id): + def nf_get_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}") return req.json() - def nf_delete_regex(self,regex_id): + def nf_delete_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}/delete") return verify(req) - def nf_enable_regex(self,regex_id): + def nf_enable_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}/enable") return verify(req) - def nf_disable_regex(self,regex_id): + def nf_disable_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}/disable") return verify(req) @@ -131,7 +132,7 @@ class FiregexAPI: return req.json()["service_id"] if verify(req) else False #Proxy regex - def px_get_stats(): + def px_get_stats(self): req = self.s.get(f"{self.address}api/regexproxy/stats") return req.json() @@ -139,54 +140,54 @@ class FiregexAPI: req = self.s.get(f"{self.address}api/regexproxy/services") return req.json() - def px_get_service(self,service_id): + def px_get_service(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}") return req.json() - def px_stop_service(self,service_id): + def px_stop_service(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/stop") return verify(req) - def px_pause_service(self,service_id): + def px_pause_service(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/pause") return verify(req) - def px_start_service(self,service_id): + def px_start_service(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/start") return verify(req) - def px_delete_service(self,service_id): + def px_delete_service(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/delete") return verify(req) - def px_regen_service_port(self,service_id): + def px_regen_service_port(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/regen-port") return verify(req) - def px_change_service_port(self,service_id, port=None, internalPort=None): + def px_change_service_port(self,service_id: str, port:int =None, internalPort:int =None): payload = {} if port: payload["port"] = port if internalPort: payload["internalPort"] = internalPort req = self.s.post(f"{self.address}api/regexproxy/service/{service_id}/change-ports", json=payload) return req.json() if verify(req) else False - def px_get_service_regexes(self,service_id): + def px_get_service_regexes(self,service_id: str): req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/regexes") return req.json() - def px_get_regex(self,regex_id): + def px_get_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}") return req.json() - def px_delete_regex(self,regex_id): + def px_delete_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}/delete") return verify(req) - def px_enable_regex(self,regex_id): + def px_enable_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}/enable") return verify(req) - def px_disable_regex(self,regex_id): + def px_disable_regex(self,regex_id: str): req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}/disable") return verify(req) @@ -195,15 +196,49 @@ class FiregexAPI: json={"service_id": service_id, "regex": regex, "mode": mode, "active": active, "is_blacklist": is_blacklist, "is_case_sensitive": is_case_sensitive}) return verify(req) - def px_rename_service(self,service_id,newname): + def px_rename_service(self,service_id: str, newname: str): req = self.s.post(f"{self.address}api/regexproxy/service/{service_id}/rename" , json={"name":newname}) return verify(req) - def px_add_service(self, name: str, port: int, internalPort = None): + def px_add_service(self, name: str, port: int, internalPort:int = None): payload = {} payload["name"] = name payload["port"] = port if internalPort: payload["internalPort"] = internalPort req = self.s.post(f"{self.address}api/regexproxy/services/add" , json=payload) - return req.json()["id"] if verify(req) else False \ No newline at end of file + return req.json()["id"] if verify(req) else False + + #PortHijack + def ph_get_services(self): + req = self.s.get(f"{self.address}api/porthijack/services") + return req.json() + + def ph_get_service(self,service_id: str): + req = self.s.get(f"{self.address}api/porthijack/service/{service_id}") + return req.json() + + def ph_stop_service(self,service_id: str): + req = self.s.get(f"{self.address}api/porthijack/service/{service_id}/stop") + return verify(req) + + def ph_start_service(self,service_id: str): + req = self.s.get(f"{self.address}api/porthijack/service/{service_id}/start") + return verify(req) + + def ph_delete_service(self,service_id: str): + req = self.s.get(f"{self.address}api/porthijack/service/{service_id}/delete") + return verify(req) + + def ph_rename_service(self,service_id: str,newname: str): + req = self.s.post(f"{self.address}api/porthijack/service/{service_id}/rename" , json={"name":newname}) + return verify(req) + + def ph_change_destination(self,service_id: str, ip_dst:string , proxy_port: int): + req = self.s.post(f"{self.address}api/porthijack/service/{service_id}/change-destination", json={"ip_dst": ip_dst, "proxy_port": proxy_port}) + return verify(req) + + def ph_add_service(self, name: str, public_port: int, proxy_port: int, proto: str, ip_src: str, ip_dst: str): + req = self.s.post(f"{self.address}api/porthijack/services/add" , + json={"name":name, "public_port": public_port, "proxy_port":proxy_port, "proto": proto, "ip_src": ip_src, "ip_dst": ip_dst}) + return req.json()["service_id"] if verify(req) else False diff --git a/tests/utils/udpserver.py b/tests/utils/udpserver.py index 39b3e9b..2429fe9 100644 --- a/tests/utils/udpserver.py +++ b/tests/utils/udpserver.py @@ -2,7 +2,7 @@ from multiprocessing import Process import socket class UdpServer: - def __init__(self,port,ipv6): + def __init__(self,port,ipv6, proxy_port = None): def _startServer(port): sock = socket.socket(socket.AF_INET6 if ipv6 else socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -15,6 +15,7 @@ class UdpServer: self.ipv6 = ipv6 self.port = port + self.proxy_port = proxy_port self.server = Process(target=_startServer,args=[port]) def start(self): @@ -26,7 +27,7 @@ class UdpServer: def sendCheckData(self,data): s = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(2) - s.sendto(data, ('::1' if self.ipv6 else '127.0.0.1', self.port)) + s.sendto(data, ('::1' if self.ipv6 else '127.0.0.1', self.proxy_port if self.proxy_port else self.port)) try: received_data = s.recvfrom(432) except Exception: