diff --git a/tests/proxyapi.py b/tests/proxyapi.py new file mode 100644 index 0000000..590dfa3 --- /dev/null +++ b/tests/proxyapi.py @@ -0,0 +1,58 @@ +from requests import Session + +class ProxyAPI: + def __init__(self,address,password): + self.s = Session() + self.address = address + self.password = password + + def connect(self): + req = self.s.post(f"{self.address}api/login", json={"password":self.password}) + return req.json()["status"] == "ok" + + def create_service(self,service_name,service_port): + req = self.s.post(f"{self.address}api/services/add" , json={"name":service_name,"port":service_port}) + return req.json()["status"] == "ok" + + def get_service_details(self,service_name): + req = self.s.get(f"{self.address}api/services") + internal_port = service_id = None + try: + for service in req.json(): + if service["name"] == service_name: + service_id = service["id"] + internal_port = service["internal_port"] + break + except Exception: + pass + return service_id,internal_port + + def get_service_status(self,service_id): + req = self.s.get(f"{self.address}api/service/{service_id}") + return req.json()["status"] + + def get_service_regexes(self,service_id): + req = self.s.get(f"{self.address}api/service/{service_id}/regexes") + return req.json() + + def start(self,service_id): + req = self.s.get(f"{self.address}api/service/{service_id}/start") + return req.json()["status"] == "ok" + + def pause(self,service_id): + req = self.s.get(f"{self.address}api/service/{service_id}/pause") + return req.json()["status"] == "ok" + + def stop(self,service_id): + req = self.s.get(f"{self.address}api/service/{service_id}/stop") + return req.json()["status"] == "ok" + + def delete(self,service_id): + req = self.s.get(f"{self.address}api/service/{service_id}/delete") + return req.json()["status"] == "ok" + + def add_regex(self,service_id,regex,is_blacklist = True, is_case_sensitive = True, mode = "B"): + req = self.s.post(f"{self.address}api/regexes/add", + json={"is_blacklist":is_blacklist,"is_case_sensitive":is_case_sensitive,"service_id":service_id,"mode":mode,"regex":regex}) + return req.json()["status"] == "ok" + diff --git a/tests/test.py b/tests/test.py index cd26933..28968be 100755 --- a/tests/test.py +++ b/tests/test.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 import argparse, socket,secrets, base64 from time import sleep -from requests import Session from multiprocessing import Process +from proxyapi import ProxyAPI pref = "\033[" reset = f"{pref}0m" @@ -21,6 +21,7 @@ def puts(text, *args, color=colors.white, is_bold=False, **kwargs): print(f'{pref}{1 if is_bold else 0};{color}' + text + reset, *args, **kwargs) def sep(): puts("-----------------------------------", is_bold=True) + parser = argparse.ArgumentParser() parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") parser.add_argument("--service_port", "-P", type=int , required=False, help='Port of the test service', default=1337) @@ -31,35 +32,41 @@ sep() puts(f"Testing will start on ", color=colors.cyan, end="") puts(f"{args.address}", color=colors.yellow) -s = Session() + +proxy = ProxyAPI(args.address,args.password) +service_created = False + #Connect to Firegex -req = s.post(f"{args.address}api/login", json={"password":args.password}) -assert req.json()["status"] == "ok", f"Test Failed: Unknown response or wrong passowrd {req.text}" -puts(f"Sucessfully logged in ✔", color=colors.green) +if (proxy.connect()): puts(f"Sucessfully logged in ✔", color=colors.green) +else: puts(f"Test Failed: Unknown response or wrong passowrd ✗", color=colors.red); exit(1) + #Create new Service -req = s.post(f"{args.address}api/services/add" , json={"name":args.service_name,"port":args.service_port}) -assert req.json()["status"] == "ok", f"Test Failed: Couldn't create service {req.text} ✔" -puts(f"Sucessfully created service {args.service_name} with public port {args.service_port} ✔", color=colors.green) +if (proxy.create_service(args.service_name,args.service_port)): + puts(f"Sucessfully created service {args.service_name} with public port {args.service_port} ✔", color=colors.green) + service_created = True +else: puts(f"Test Failed: Couldn't create service ✗", color=colors.red); exit(1) + +#Delete the Service and exit +def exit_test(status_code=0): + if service_created: + if(proxy.delete(service_id)): + puts(f"Sucessfully delete service with id {service_id} ✔", color=colors.green) + else: + puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1) + sep() + server.terminate() + exit(status_code) #Find the Service -req = s.get(f"{args.address}api/services") -internal_port = service_id = None -try: - for service in req.json(): - if service["name"] == args.service_name: - service_id = service["id"] - internal_port = service["internal_port"] - puts(f"Sucessfully received the internal port {internal_port} ✔", color=colors.green) - break -except Exception: - puts(f"Test Failed: Coulnd't get the service internal port {req.text}", color=colors.red) - exit(1) +service_id, internal_port = proxy.get_service_details(args.service_name) +if (internal_port): puts(f"Sucessfully received the internal port {internal_port} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't get the service internal port ✗", color=colors.red); exit_test(1) #Start listener -def startServer(): +def startServer(port): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('127.0.0.1', internal_port)) + sock.bind(('127.0.0.1', port)) sock.listen(8) while True: connection,address = sock.accept() @@ -67,13 +74,12 @@ def startServer(): connection.send(buf) connection.close() -server = Process(target=startServer) +server = Process(target=startServer,args=[internal_port]) server.start() #Start firewall -req = s.get(f"{args.address}api/service/{service_id}/start") -assert req.json()["status"] == "ok", f"Test Failed: Couldn't start the service {req.text}" -puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) +if(proxy.start(service_id)): puts(f"Sucessfully started service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) #Hacky solution - wait a bit for the server to start sleep(1) @@ -87,39 +93,54 @@ def sendCheckData(data): s.close() return received_data == data -if sendCheckData(b'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'): +if sendCheckData(secrets.token_bytes(200)): puts(f"Successfully tested first proxy with no regex ✔", color=colors.green) else: - puts(f"Test Failed: Data was corrupted", color=colors.red) + puts(f"Test Failed: Data was corrupted ", color=colors.red) #Add new regex secret = bytes(secrets.token_hex(16).encode()) regex = base64.b64encode(secret).decode() -req = s.post(f"{args.address}api/regexes/add", - json={"is_blacklist":True,"is_case_sensitive":True,"service_id":service_id,"mode":"B","regex":regex}) -assert req.json()["status"] == "ok", f"Test Failed: Couldn't add regex {req.text}" -puts(f"Sucessfully added regex to service with id {service_id} ✔", color=colors.green) +if(proxy.add_regex(service_id,regex)): puts(f"Sucessfully added regex {secret} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1) -#Test the regex -if not sendCheckData(b'AAAAAAAAAAAAAAAA' + secret + b'AAAAAAAAAAAAAAAAA'): - puts(f"The malicious request was successfully blocked ✔", color=colors.green) -else: - puts(f"Test Failed: The request wasn't blocked", color=colors.red) +#Check if regex is present in the service +def checkRegex(regex): + for i,r in enumerate(proxy.get_service_regexes(service_id)): + if r["regex"] == regex: + #Test the regex + if not sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): + puts(f"The malicious request was successfully blocked ✔", color=colors.green) + else: + puts(f"Test Failed: The request wasn't blocked ✗", color=colors.red); exit_test(1) + return + puts(f"Test Failed: The regex wasn't found ✗", color=colors.red) + +checkRegex(regex) #Pause the proxy -req = s.get(f"{args.address}api/service/{service_id}/pause") -assert req.json()["status"] == "ok", f"Test Failed: Couldn't delete service {req.text}" -puts(f"Sucessfully stopped service with id {service_id} ✔", color=colors.green) +if(proxy.pause(service_id)): puts(f"Sucessfully paused service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't pause the service ✗", color=colors.red); exit_test(1) #Check if it's actually paused -if sendCheckData(b'AAAAAAAAAAAAAAAA' + secret + b'AAAAAAAAAAAAAAAAA'): +if sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)): puts(f"The request wasn't blocked ✔", color=colors.green) else: puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) -#Delete the Service -req = s.get(f"{args.address}api/service/{service_id}/delete") -assert req.json()["status"] == "ok", f"Test Failed: Couldn't delete service {req.text}" -puts(f"Sucessfully delete service with id {service_id} ✔", color=colors.green) +#Stop the proxy +if(proxy.stop(service_id)): puts(f"Sucessfully stopped service with id {service_id} ✔", color=colors.green) +else: puts(f"Test Failed: Coulnd't stop the service ✗", color=colors.red); exit_test(1) +sleep(100) -server.terminate() \ No newline at end of file +#Check if proxy is stopped +try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('0.0.0.0', args.service_port)) + puts(f"I could bind to the port, so {service_id} was stopped correctly ✔", color=colors.green) + sock.close() +except Exception as e: + print(e) + puts(f"Test Failed: Coulnd't bind to port {args.service_port} ✗", color=colors.red); exit_test(1) + +exit_test() \ No newline at end of file