From cb8b6e5b04ac23632f981132b72caa9407737374 Mon Sep 17 00:00:00 2001 From: nik012003 Date: Mon, 1 Aug 2022 23:59:32 +0200 Subject: [PATCH] Initial test refactoring --- tests/README.md | 65 ------------ tests/api_test.py | 62 ++++++++++++ tests/benchmark.csv | 11 --- tests/benchmark.py | 74 +++++--------- tests/firegexapi.py | 84 ---------------- tests/nf_test.py | 94 ++++++++++++++++++ tests/test.py | 167 ------------------------------- tests/utils/colors.py | 17 ++++ tests/utils/firegexapi.py | 201 ++++++++++++++++++++++++++++++++++++++ tests/utils/tcpserver.py | 31 ++++++ 10 files changed, 427 insertions(+), 379 deletions(-) delete mode 100644 tests/README.md create mode 100755 tests/api_test.py delete mode 100755 tests/benchmark.csv delete mode 100644 tests/firegexapi.py create mode 100755 tests/nf_test.py delete mode 100755 tests/test.py create mode 100644 tests/utils/colors.py create mode 100644 tests/utils/firegexapi.py create mode 100644 tests/utils/tcpserver.py diff --git a/tests/README.md b/tests/README.md deleted file mode 100644 index e850ce0..0000000 --- a/tests/README.md +++ /dev/null @@ -1,65 +0,0 @@ -# Firegex tests - -## [GO BACK](../README.md) - -Tests are a quick and dirty way to check if your modification to the backend code dind't break anything. - -# Running a Test - $ ./test.py - usage: test.py [-h] [--address ADDRESS] [--service_port SERVICE_PORT] [--service_name SERVICE_NAME] --password PASSWORD - -If you are running firegex locally, just run ```test.py -p FIREGEX_PASSWORD```. Otherwise, select a remote address with ```-a http://ADDRESS:PORT/``` . - -Output of the tests: - - Testing will start on http://127.0.0.1:5000/ - Sucessfully logged in ✔ - Sucessfully created service Test Service with public port 1337 ✔ - Sucessfully received the internal port 38222 ✔ - Sucessfully started service with id test-service ✔ - Successfully tested first proxy with no regex ✔ - Sucessfully added regex to service with id test-service ✔ - The malicious request was successfully blocked ✔ - Sucessfully stopped service with id test-service ✔ - The request wasn't blocked ✔ - Sucessfully delete service with id test-service ✔ - -The testing methodology will soon be updated with more edge-cases. - -# Running a Benchmark - ./benchmark.py - options: - --address ADDRESS, -a ADDRESS - Address of firegex backend - --service_port SERVICE_PORT, -P SERVICE_PORT - Port of the Benchmark service - --service_name SERVICE_NAME, -n SERVICE_NAME - Name of the Benchmark service - --password PASSWORD, -p PASSWORD - Firegex password - --num_of_regexes NUM_OF_REGEXES, -r NUM_OF_REGEXES - Number of regexes to benchmark with - --duration DURATION, -d DURATION - Duration of the Benchmark in seconds - --output_file OUTPUT_FILE, -o OUTPUT_FILE - Output results csv file - --num_of_streams NUM_OF_STREAMS, -s NUM_OF_STREAMS - Output results csv file - --new_istance, -i Create a new service - -Benchmarks let you evaluate the performance of the proxy. You can run one by typing in a shell ```test.py -p FIREGEX_PASSWORD -r NUM_OF_REGEX -d BENCHMARK_DURATION -i```. - -It uses iperf3 to benchmark the throughput in MB/s of the server, both with proxy, without proxy, and for each new added regex. It will automatically add a new random regex untill it has reached NUM_OF_REGEX specified in the arguments. - -You will find a new benchmark.csv file containg the results. - -# Firegex Performance Results - -The test was performed on: -- AMD Ryzen 7 3700X (16 thread) @ 3.600GHz -- RAM Speed: 3200 MT/s (Dual Channel) -- Kernel: 5.18.5-arch1-1 - -Command: `python3 benchmark.py -r 100 -d 1 -s 50` - -![Firegex Benchmark](/docs/FiregexBenchmark.png) diff --git a/tests/api_test.py b/tests/api_test.py new file mode 100755 index 0000000..906b108 --- /dev/null +++ b/tests/api_test.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +from utils.colors import * +from utils.firegexapi import * +import argparse, secrets + +parser = argparse.ArgumentParser() +parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") +parser.add_argument("--password", "-p", type=str, required=False, help='Firegex password') +args = parser.parse_args() +sep() +puts(f"Testing will start on ", color=colors.cyan, end="") +puts(f"{args.address}", color=colors.yellow) + +firegex = FiregexAPI(args.address) +password = "" +#Connect to Firegex +if args.password: + password = args.password + if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) + else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) +else: + password = secrets.token_hex(10) + if (firegex.set_password(args.password)): puts(f"Sucessfully set password to {password} ✔", color=colors.green) + else: puts(f"Test Failed: Unknown response or password already put ✗", color=colors.red); exit(1) + +if(firegex.status()["loggined"]): puts(f"Correctly received status ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or not logged in✗", color=colors.red); exit(1) + +#Prepare second instance +firegex2 = FiregexAPI(args.address) +if (firegex2.login(password)): puts(f"Sucessfully logged in on second instance ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or wrong passowrd on second instance ✗", color=colors.red); exit(1) + +if(firegex2.status()["loggined"]): puts(f"Correctly received status on second instance✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or not logged in on second instance✗", color=colors.red); exit(1) + +#Change password +new_password = secrets.token_hex(10) +if (firegex.change_password(new_password,expire=True)): puts(f"Sucessfully changed password to {new_password} ✔", color=colors.green) +else: puts(f"Test Failed: Coundl't change the password ✗", color=colors.red); exit(1) + +#Check if we are still logged in +if(firegex.status()["loggined"]): puts(f"Correctly received status after password change ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or not logged after password change ✗", color=colors.red); exit(1) + +#Check if second session expired and relog + +if(not firegex2.status()["loggined"]): puts(f"Second instance was expired currectly ✔", color=colors.green) +else: puts(f"Test Failed: Still logged in on second instance, expire expected ✗", color=colors.red); exit(1) +if (firegex2.login(new_password)): puts(f"Sucessfully logged in on second instance ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or wrong passowrd on second instance ✗", color=colors.red); exit(1) + +#Change it back +if (firegex.change_password(password,expire=False)): puts(f"Sucessfully restored the password ✔", color=colors.green) +else: puts(f"Test Failed: Coundl't change the password ✗", color=colors.red); exit(1) + +#Check if we are still logged in +if(firegex2.status()["loggined"]): puts(f"Correctly received status after password change ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or not logged after password change ✗", color=colors.red); exit(1) + +puts("List of available interfaces:", color=colors.yellow) +for interface in firegex.get_interfaces(): puts("name: {}, address: {}".format(interface["name"], interface["addr"]), color=colors.yellow) \ No newline at end of file diff --git a/tests/benchmark.csv b/tests/benchmark.csv deleted file mode 100755 index afc345c..0000000 --- a/tests/benchmark.csv +++ /dev/null @@ -1,11 +0,0 @@ -0,567.771 -1,103.886 -2,55.872 -3,37.958 -4,28.79 -5,22.956 -6,19.34 -7,16.586 -8,14.661 -9,13.158 -10,11.862 diff --git a/tests/benchmark.py b/tests/benchmark.py index decb866..cb60c8c 100755 --- a/tests/benchmark.py +++ b/tests/benchmark.py @@ -1,36 +1,22 @@ #!/usr/bin/env python3 -import argparse, socket,secrets, base64, iperf3, csv +from utils.colors import * +from utils.firegexapi import * +from utils.tcpserver import * +from multiprocessing import Process from time import sleep -from firegexapi import FiregexAPI -from multiprocessing import Process +import iperf3, csv, argparse, base64, secrets -pref = "\033[" -reset = f"{pref}0m" -class colors: - black = "30m" - red = "31m" - green = "32m" - yellow = "33m" - blue = "34m" - magenta = "35m" - cyan = "36m" - white = "37m" - -def puts(text, *args, color=colors.white, is_bold=False, **kwargs): - print(f'{pref}{1 if is_bold else 0};{color}' + text + reset, *args, **kwargs) - -def sep(): puts("-----------------------------------", is_bold=True) +#TODO: make it work with Proxy and not only netfilter parser = argparse.ArgumentParser() parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") -parser.add_argument("--service_port", "-P", type=int , required=False, help='Port of the Benchmark service', default=1337) +parser.add_argument("--port", "-P", type=int , required=False, help='Port of the Benchmark service', default=1337) parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the Benchmark service', default="Benchmark Service") parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password') parser.add_argument("--num_of_regexes", "-r", type=int, required=True, help='Number of regexes to benchmark with') parser.add_argument("--duration", "-d", type=int, required=False, help='Duration of the Benchmark in seconds', default=5) parser.add_argument("--output_file", "-o", type=str, required=False, help='Output results csv file', default="benchmark.csv") parser.add_argument("--num_of_streams", "-s", type=int, required=False, help='Output results csv file', default=1) -parser.add_argument("--new_istance", "-i", action="store_true", help='Create a new service', default=False) args = parser.parse_args() sep() @@ -43,28 +29,16 @@ firegex = FiregexAPI(args.address) if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) else: puts(f"Benchmark Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) - -if args.new_istance: - #Create new Service - if (firegex.create_service(args.service_name,args.service_port)): - puts(f"Sucessfully created service {args.service_name} with public port {args.service_port} ✔", color=colors.green) - service_created = True - else: puts(f"Benchmark Failed: Couldn't create service ✗", color=colors.red); exit(1) - -#Find the Service -service = firegex.get_service_details(args.service_name) -if (service): - internal_port= service["internal_port"] - service_id = service["id"] - puts(f"Sucessfully received the internal port {internal_port} ✔", color=colors.green) -else: puts(f"Benchmark Failed: Coulnd't get the service internal port ✗", color=colors.red); exit_test(1) - +#Create new Service +service_id = firegex.nf_add_service(args.service_name, args.port, "tcp", "127.0.0.1/24") +if service_id: puts(f"Sucessfully created service {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Failed to create service ✗", color=colors.red); exit(1) #Start iperf3 def startServer(): server = iperf3.Server() server.bind_address = '127.0.0.1' - server.port = internal_port + server.port = args.port server.verbose = False while True: server.run() @@ -85,27 +59,24 @@ sleep(1) #Get baseline reading puts(f"Baseline without proxy: ", color=colors.blue, end='') -print(f"{getReading(internal_port)} MB/s") +print(f"{getReading(args.port)} MB/s") #Start firewall -if(firegex.start(service_id)): puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) +if(firegex.nf_start_service(service_id)): puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) else: puts(f"Benchmark Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) -#Hacky solution - wait a bit for the server to start -sleep(1) - #Get no regexs reading results = [] puts(f"Performance with no regexes: ", color=colors.yellow , end='') -results.append(getReading(args.service_port)) +results.append(getReading(args.port)) print(f"{results[0]} MB/s") #Add all the regexs for i in range(1,args.num_of_regexes+1): regex = base64.b64encode(bytes(secrets.token_hex(16).encode())).decode() - if(not firegex.add_regex(service_id,regex)): puts(f"Benchmark Failed: Coulnd't add the regex ✗", color=colors.red); exit_test(1) + if(not firegex.nf_add_regex(service_id,regex,"B",active=True,is_blacklist=True,is_case_sensitive=False)): puts(f"Benchmark Failed: Coulnd't add the regex ✗", color=colors.red); exit_test(1) puts(f"Performance with {i} regex(s): ", color=colors.red, end='') - results.append(getReading(args.service_port)) + results.append(getReading(args.port)) print(f"{results[i]} MB/s") with open(args.output_file,'w') as f: @@ -115,11 +86,10 @@ with open(args.output_file,'w') as f: puts(f"Sucessfully written results to {args.output_file} ✔", color=colors.magenta) -if args.new_istance: - #Delete the Service - if(firegex.delete(service_id)): - puts(f"Sucessfully delete service with id {service_id} ✔", color=colors.green) - else: - puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1) +#Delete the Service +if(firegex.nf_delete_service(service_id)): + puts(f"Sucessfully delete service with id {service_id} ✔", color=colors.green) +else: + puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1) server.terminate() diff --git a/tests/firegexapi.py b/tests/firegexapi.py deleted file mode 100644 index 1c94672..0000000 --- a/tests/firegexapi.py +++ /dev/null @@ -1,84 +0,0 @@ -from requests import Session - -class BearerSession(): - def __init__(self): - self.s = Session() - self.headers = {} - - def post(self, endpoint, json={}, data=""): - headers = self.headers - if data: - headers["Content-Type"] = "application/x-www-form-urlencoded" - return self.s.post(endpoint, json=json, data=data, headers=headers) - - def get(self, endpoint, json={}): - return self.s.get(endpoint, json=json, headers=self.headers) - - def set_token(self,token): - self.headers = {"Authorization": f"Bearer {token}"} - - def unset_token(self): - self.headers = {} - -class FiregexAPI: - def __init__(self,address): - self.s = BearerSession() - self.address = address - - def login(self,password): - req = self.s.post(f"{self.address}api/login", data=f"username=login&password={password}") - try : - self.s.set_token(req.json()["access_token"]) - return True - except Exception: - return False - - def logout(self): - self.s.unset_token() - return True - - def change_password(self,password,expire): - req = self.s.post(f"{self.address}api/change-password", json={"password":password, "expire":expire}) - try: - self.s.set_token(req.json()["access_token"]) - return True - except Exception: - return False - - def create_service(self,service_name,service_port, proto, ip_int): - req = self.s.post(f"{self.address}api/services/add" , json={"name":service_name,"port":service_port, "ip_int": ip_int, "proto": proto}) - return req.json()["service_id"] if req.json()["status"] == "ok" else None - - def get_service(self,service_id): - req = self.s.get(f"{self.address}api/service/{service_id}") - return req.json() - - def get_service_regexes(self,service_id): - req = self.s.get(f"{self.address}api/service/{service_id}/regexes") - return req.json() - - def get_regex(self,regex_id): - req = self.s.get(f"{self.address}api/regex/{regex_id}") - return req.json() - - def add_regex(self,service_id,regex,is_blacklist = True, is_case_sensitive = True, mode = "B"): - req = self.s.post(f"{self.address}api/regexes/add", - json={"is_blacklist":is_blacklist,"is_case_sensitive":is_case_sensitive,"service_id":service_id,"mode":mode,"regex":regex}) - return req.json()["status"] == "ok" - - def delete_regex(self,regex_id): - req = self.s.get(f"{self.address}api/regex/{regex_id}/delete") - return req.json()["status"] == "ok" - - def start(self,service_id): - req = self.s.get(f"{self.address}api/service/{service_id}/start") - return req.json()["status"] == "ok" - - def stop(self,service_id): - req = self.s.get(f"{self.address}api/service/{service_id}/stop") - return req.json()["status"] == "ok" - - def delete(self,service_id): - req = self.s.get(f"{self.address}api/service/{service_id}/delete") - return req.json()["status"] == "ok" - diff --git a/tests/nf_test.py b/tests/nf_test.py new file mode 100755 index 0000000..0efd249 --- /dev/null +++ b/tests/nf_test.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +from utils.colors import * +from utils.firegexapi import * +from utils.tcpserver import * +import argparse, secrets, base64,time + +parser = argparse.ArgumentParser() +parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") +parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password') +parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the test service', default="Test Service") +parser.add_argument("--port", "-P", type=int , required=False, help='Port of the test service', default=1337) + +args = parser.parse_args() +sep() +puts(f"Testing will start on ", color=colors.cyan, end="") +puts(f"{args.address}", color=colors.yellow) + +firegex = FiregexAPI(args.address) + +#Login +if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) + +#TCP tests +server = TcpServer(args.port) + +def exit_test(code): + if service_id: + server.stop() + firegex.nf_delete_service(service_id) + exit(code) + +service_id = firegex.nf_add_service(args.service_name, args.port, "tcp", "127.0.0.1/24") +if service_id: puts(f"Sucessfully created service {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Failed to create service ✗", color=colors.red); exit(1) + +if(firegex.nf_start_service(service_id)): puts(f"Sucessfully started service ✔", color=colors.green) +else: puts(f"Test Failed: Failed to start service ✗", color=colors.red); exit_test(1) + +server.start() + +if server.sendCheckData(secrets.token_bytes(200)): + puts(f"Successfully tested first proxy with no regex ✔", color=colors.green) +else: + puts(f"Test Failed: Data was corrupted ", color=colors.red); exit_test(1) + +#Add new regex +secret = bytes(secrets.token_hex(16).encode()) +regex = base64.b64encode(secret).decode() +if(firegex.nf_add_regex(service_id,regex,"B",active=True,is_blacklist=True,is_case_sensitive=True)): + puts(f"Sucessfully added regex {regex} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't add the regex {secret} ✗", color=colors.red); exit_test(1) + +#Check if regex is present in the service +n_blocked = 0 + +def checkRegex(regex): + global n_blocked + for r in firegex.nf_get_service_regexes(service_id): + if r["regex"] == regex: + #Test the regex + if not server.sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): + puts(f"The malicious request was successfully blocked ✔", color=colors.green) + n_blocked += 1 + if firegex.nf_get_regex(r["id"])["n_packets"] == n_blocked: + puts(f"The packed was reported as blocked ✔", color=colors.green) + else: + puts(f"Test Failed: The packed wasn't reported as blocked ✗", color=colors.red); exit_test(1) + else: + puts(f"Test Failed: The request wasn't blocked ✗", color=colors.red);exit_test(1) + return + puts(f"Test Failed: The regex wasn't found ✗", color=colors.red); exit_test(1) + +checkRegex(regex) + +#Pause the proxy +if(firegex.nf_stop_service(service_id)): puts(f"Sucessfully paused service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't pause the service ✗", color=colors.red); exit_test(1) + +#Check if it's actually paused +if server.sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): + puts(f"The request wasn't blocked ✔", color=colors.green) +else: + puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) + +#Start firewall +if(firegex.nf_start_service(service_id)): puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) + +checkRegex(regex) + +#TODO: test whitelist, enable/disable regex, and UDP + +exit_test(0) \ No newline at end of file diff --git a/tests/test.py b/tests/test.py deleted file mode 100755 index 231a1a5..0000000 --- a/tests/test.py +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/env python3 -import argparse, socket,secrets, base64 -from time import sleep -from multiprocessing import Process -from firegexapi import FiregexAPI - -pref = "\033[" -reset = f"{pref}0m" - -class colors: - black = "30m" - red = "31m" - green = "32m" - yellow = "33m" - blue = "34m" - magenta = "35m" - cyan = "36m" - white = "37m" - -def puts(text, *args, color=colors.white, is_bold=False, **kwargs): - print(f'{pref}{1 if is_bold else 0};{color}' + text + reset, *args, **kwargs) - -def sep(): puts("-----------------------------------", is_bold=True) - -parser = argparse.ArgumentParser() -parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") -parser.add_argument("--service_port", "-P", type=int , required=False, help='Port of the test service', default=1337) -parser.add_argument("--service_name", "-n", type=str , required=False, help='Name of the test service', default="Test Service") -parser.add_argument("--password", "-p", type=str, required=True, help='Firegex password') -args = parser.parse_args() -sep() -puts(f"Testing will start on ", color=colors.cyan, end="") -puts(f"{args.address}", color=colors.yellow) - - -firegex = FiregexAPI(args.address) -service_created = False - -#Connect to Firegex -if (firegex.login(args.password)): puts(f"Sucessfully logged in ✔", color=colors.green) -else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) - -#Change password -new_password = secrets.token_hex(10) -if (firegex.change_password(new_password,True)): puts(f"Sucessfully changed password to {new_password} ✔", color=colors.green) -else: puts(f"Test Failed: Coundl't change the password ✗", color=colors.red); exit(1) -#Changed it back -if (firegex.change_password(args.password,True)): puts(f"Sucessfully restored the password ✔", color=colors.green) -else: puts(f"Test Failed: Coundl't change the password ✗", color=colors.red); exit(1) - - -#Create new Service -service = firegex.create_service(args.service_name,args.service_port,"tcp","127.0.0.1/24") -if service: - puts(f"Sucessfully created service {args.service_name} with public port {args.service_port} ✔", color=colors.green) - service_created = True -else: puts(f"Test Failed: Couldn't create service ✗", color=colors.red); exit(1) - -#Delete the Service and exit -def exit_test(status_code=0): - if service_created: - if(firegex.delete(service)): - puts(f"Sucessfully delete service with id {service} ✔", color=colors.green) - else: - puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1) - sep() - server.terminate() - exit(status_code) - -#Start listener -def startServer(port): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('127.0.0.1', port)) - sock.listen(8) - while True: - connection,address = sock.accept() - buf = connection.recv(4096) - connection.send(buf) - connection.close() - -server = Process(target=startServer,args=[args.service_port]) -server.start() - -#Start firewall -if(firegex.start(service)): puts(f"Sucessfully started service with id {service} ✔", color=colors.green) -else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) - -#Hacky solution - wait a bit for the server to start -sleep(1) - -#Send data via the proxy, and validate it -def sendCheckData(data): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', args.service_port)) - s.sendall(data) - received_data = s.recv(4096) - s.close() - return received_data == data - -if sendCheckData(secrets.token_bytes(200)): - puts(f"Successfully tested first proxy with no regex ✔", color=colors.green) -else: - puts(f"Test Failed: Data was corrupted ", color=colors.red) - -#Add new regex -secret = bytes(secrets.token_hex(16).encode()) -regex = base64.b64encode(secret).decode() -if(firegex.add_regex(service,regex)): puts(f"Sucessfully added regex {secret} ✔", color=colors.green) -else: puts(f"Test Failed: Coulnd't add the regex {secret} ✗", color=colors.red); exit_test(1) - -#Check if regex is present in the service -n_blocked = 0 - -def checkRegex(regex): - global n_blocked - for r in firegex.get_service_regexes(service): - if r["regex"] == regex: - #Test the regex - if not sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): - puts(f"The malicious request was successfully blocked ✔", color=colors.green) - n_blocked += 1 - sleep(10) - if firegex.get_regex(r["id"])["n_packets"] == n_blocked: - puts(f"The packed was reported as blocked ✔", color=colors.green) - else: - puts(f"Test Failed: The packed wasn't reported as blocked ✗", color=colors.red); exit_test(1) - else: - puts(f"Test Failed: The request wasn't blocked ✗", color=colors.red);exit_test(1) - return - puts(f"Test Failed: The regex wasn't found ✗", color=colors.red); exit_test(1) - -checkRegex(regex) - -#Pause the proxy -if(firegex.stop(service)): puts(f"Sucessfully paused service with id {service} ✔", color=colors.green) -else: puts(f"Test Failed: Coulnd't pause the service ✗", color=colors.red); exit_test(1) - -#Check if it's actually paused -if sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): - puts(f"The request wasn't blocked ✔", color=colors.green) -else: - puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) - -#Start firewall -if(firegex.start(service)): puts(f"Sucessfully started service with id {service} ✔", color=colors.green) -else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) - -checkRegex(regex) - -#Delete the regex -def removeRegex(regex): - for r in firegex.get_service_regexes(service): - if r["regex"] == regex: - if(firegex.delete_regex(r["id"])): puts(f"Sucessfully deleted regex ✔", color=colors.green) - else: puts(f"Test Failed: Coulnd't deleted the regex ✗", color=colors.red); exit_test(1) - return - puts(f"Test Failed: The regex wasn't found ✗", color=colors.red) - -removeRegex(regex) - -if sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): - puts(f"The request wasn't blocked ✔", color=colors.green) -else: - puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) - -exit_test() diff --git a/tests/utils/colors.py b/tests/utils/colors.py new file mode 100644 index 0000000..cce3da3 --- /dev/null +++ b/tests/utils/colors.py @@ -0,0 +1,17 @@ +pref = "\033[" +reset = f"{pref}0m" + +class colors: + black = "30m" + red = "31m" + green = "32m" + yellow = "33m" + blue = "34m" + magenta = "35m" + cyan = "36m" + white = "37m" + +def puts(text, *args, color=colors.white, is_bold=False, **kwargs): + print(f'{pref}{1 if is_bold else 0};{color}' + text + reset, *args, **kwargs) + +def sep(): puts("-----------------------------------", is_bold=True) \ No newline at end of file diff --git a/tests/utils/firegexapi.py b/tests/utils/firegexapi.py new file mode 100644 index 0000000..fe4cf67 --- /dev/null +++ b/tests/utils/firegexapi.py @@ -0,0 +1,201 @@ +from requests import Session + +def verify(req): + try: + assert(req.json()["status"] == "ok") + except Exception: + return False + return True + +class BearerSession(): + def __init__(self): + self.s = Session() + self.headers = {} + + def post(self, endpoint, json={}, data=""): + headers = self.headers + if data: + headers["Content-Type"] = "application/x-www-form-urlencoded" + return self.s.post(endpoint, json=json, data=data, headers=headers) + + def get(self, endpoint, json={}): + return self.s.get(endpoint, json=json, headers=self.headers) + + def set_token(self,token): + self.headers = {"Authorization": f"Bearer {token}"} + + def unset_token(self): + self.headers = {} + +class FiregexAPI: + def __init__(self,address): + self.s = BearerSession() + self.address = address + + #General API + def status(self): + return self.s.get(f"{self.address}api/status").json() + + def login(self,password): + req = self.s.post(f"{self.address}api/login", data=f"username=login&password={password}") + try : + self.s.set_token(req.json()["access_token"]) + return True + except Exception: + return False + + def logout(self): + self.s.unset_token() + return True + + def set_password(self,password): + req = self.s.post(f"{self.address}api/set-password", json={"password":password}) + if verify(req): + self.s.set_token(req.json()["access_token"]) + return True + else: + return False + + def change_password(self,password,expire): + req = self.s.post(f"{self.address}api/change-password", json={"password":password, "expire":expire}) + if verify(req): + self.s.set_token(req.json()["access_token"]) + return True + else: + return False + + def get_interfaces(self): + req = self.s.get(f"{self.address}api/interfaces") + return req.json() + + def reset(self, delete): + req = self.s.post(f"{self.address}api/reset", json={"delete":delete}) + + #Netfilter regex + def nf_get_stats(): + req = self.s.get(f"{self.address}api/nfregex/stats") + return req.json() + + def nf_get_services(self): + req = self.s.get(f"{self.address}api/nfregex/services") + return req.json() + + def nf_get_service(self,service_id): + req = self.s.get(f"{self.address}api/nfregex/service/{service_id}") + return req.json() + + def nf_stop_service(self,service_id): + req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/stop") + return verify(req) + + def nf_start_service(self,service_id): + req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/start") + return verify(req) + + def nf_delete_service(self,service_id): + req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/delete") + return verify(req) + + def nf_rename_service(self,newname): + req = self.s.post(f"{self.address}api/nfregex/services/add" , json={"name":service_name,"port":service_port, "ip_int": ip_int, "proto": proto}) + return verify(req) + + def nf_get_service_regexes(self,service_id): + req = self.s.get(f"{self.address}api/nfregex/service/{service_id}/regexes") + return req.json() + + def nf_get_regex(self,regex_id): + req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}") + return req.json() + + def nf_delete_regex(self,regex_id): + req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}/delete") + return verify(req) + + def nf_enable_regex(self,regex_id): + req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}/enable") + return verify(req) + + def nf_disable_regex(self,regex_id): + req = self.s.get(f"{self.address}api/nfregex/regex/{regex_id}/disable") + return verify(req) + + def nf_add_regex(self, service_id: str, regex: str, mode: str, active: bool, is_blacklist: bool, is_case_sensitive: bool): + req = self.s.post(f"{self.address}api/nfregex/regexes/add", + json={"service_id": service_id, "regex": regex, "mode": mode, "active": active, "is_blacklist": is_blacklist, "is_case_sensitive": is_case_sensitive}) + return verify(req) + + def nf_add_service(self, name: str, port: int, proto: str, ip_int: str): + req = self.s.post(f"{self.address}api/nfregex/services/add" , + json={"name":name,"port":port, "proto": proto, "ip_int": ip_int}) + return req.json()["service_id"] if verify(req) else False + + #Proxy regex + def px_get_stats(): + req = self.s.get(f"{self.address}api/regexproxy/stats") + return req.json() + + def px_get_services(self): + req = self.s.get(f"{self.address}api/regexproxy/services") + return req.json() + + def px_get_service(self,service_id): + req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}") + return req.json() + + def px_stop_service(self,service_id): + req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/stop") + return verify(req) + + def px_pause_service(self,service_id): + req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/pause") + return verify(req) + + def px_start_service(self,service_id): + req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/start") + return verify(req) + + def px_delete_service(self,service_id): + req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/delete") + return verify(req) + + def px_change_service_port(self,service_id, port, internalPort): + payload = {} + if port: payload["port"] = port + if internalPort: payload["internalPort"] = internalPort + req = self.s.post(f"{self.address}api/regexproxy/service/{service_id}/start", json=payload) + return req.json() if verify(req) else False + + def px_get_service_regexes(self,service_id): + req = self.s.get(f"{self.address}api/regexproxy/service/{service_id}/regexes") + return req.json() + + def px_get_regex(self,regex_id): + req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}") + return req.json() + + def px_delete_regex(self,regex_id): + req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}/delete") + return verify(req) + + def px_enable_regex(self,regex_id): + req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}/enable") + return verify(req) + + def px_disable_regex(self,regex_id): + req = self.s.get(f"{self.address}api/regexproxy/regex/{regex_id}/disable") + return verify(req) + + def px_add_regex(self, service_id: str, regex: str, mode: str, active: bool, is_blacklist: bool, is_case_sensitive: bool): + req = self.s.post(f"{self.address}api/regexproxy/regexes/add", + json={"service_id": service_id, "regex": regex, "mode": mode, "active": active, "is_blacklist": is_blacklist, "is_case_sensitive": is_case_sensitive}) + return verify(req) + + def px_rename_service(self,newname): + req = self.s.post(f"{self.address}api/regexproxy/services/add" , json={"name":service_name,"port":service_port, "ip_int": ip_int, "proto": proto}) + return verify(req) + + def px_add_service(self, name: str, port: int, internalPort: [int,None]): + req = self.s.post(f"{self.address}api/regexproxy/services/add" , + json={"name":name,"port":port, "internalPort": internalPort}) + return req.json()["service_id"] if verify(req) else False \ No newline at end of file diff --git a/tests/utils/tcpserver.py b/tests/utils/tcpserver.py new file mode 100644 index 0000000..738b7d4 --- /dev/null +++ b/tests/utils/tcpserver.py @@ -0,0 +1,31 @@ +from multiprocessing import Process +import socket + +class TcpServer: + def __init__(self,port): + def _startServer(port): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('127.0.0.1', port)) + sock.listen(8) + while True: + connection,address = sock.accept() + buf = connection.recv(4096) + connection.send(buf) + connection.close() + self.port = port + self.server = Process(target=_startServer,args=[port]) + + def start(self): + self.server.start() + + def stop(self): + self.server.terminate() + + def sendCheckData(self,data): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('localhost', self.port)) + s.sendall(data) + received_data = s.recv(4096) + s.close() + return received_data == data \ No newline at end of file