Refactored test

This commit is contained in:
nik012003
2022-06-28 13:24:55 +02:00
parent 2d0e7d5b42
commit d781801bac
2 changed files with 125 additions and 46 deletions

58
tests/proxyapi.py Normal file
View File

@@ -0,0 +1,58 @@
from requests import Session
class ProxyAPI:
def __init__(self,address,password):
self.s = Session()
self.address = address
self.password = password
def connect(self):
req = self.s.post(f"{self.address}api/login", json={"password":self.password})
return req.json()["status"] == "ok"
def create_service(self,service_name,service_port):
req = self.s.post(f"{self.address}api/services/add" , json={"name":service_name,"port":service_port})
return req.json()["status"] == "ok"
def get_service_details(self,service_name):
req = self.s.get(f"{self.address}api/services")
internal_port = service_id = None
try:
for service in req.json():
if service["name"] == service_name:
service_id = service["id"]
internal_port = service["internal_port"]
break
except Exception:
pass
return service_id,internal_port
def get_service_status(self,service_id):
req = self.s.get(f"{self.address}api/service/{service_id}")
return req.json()["status"]
def get_service_regexes(self,service_id):
req = self.s.get(f"{self.address}api/service/{service_id}/regexes")
return req.json()
def start(self,service_id):
req = self.s.get(f"{self.address}api/service/{service_id}/start")
return req.json()["status"] == "ok"
def pause(self,service_id):
req = self.s.get(f"{self.address}api/service/{service_id}/pause")
return req.json()["status"] == "ok"
def stop(self,service_id):
req = self.s.get(f"{self.address}api/service/{service_id}/stop")
return req.json()["status"] == "ok"
def delete(self,service_id):
req = self.s.get(f"{self.address}api/service/{service_id}/delete")
return req.json()["status"] == "ok"
def add_regex(self,service_id,regex,is_blacklist = True, is_case_sensitive = True, mode = "B"):
req = self.s.post(f"{self.address}api/regexes/add",
json={"is_blacklist":is_blacklist,"is_case_sensitive":is_case_sensitive,"service_id":service_id,"mode":mode,"regex":regex})
return req.json()["status"] == "ok"

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse, socket,secrets, base64 import argparse, socket,secrets, base64
from time import sleep from time import sleep
from requests import Session
from multiprocessing import Process from multiprocessing import Process
from proxyapi import ProxyAPI
pref = "\033[" pref = "\033["
reset = f"{pref}0m" reset = f"{pref}0m"
@@ -21,6 +21,7 @@ def puts(text, *args, color=colors.white, is_bold=False, **kwargs):
print(f'{pref}{1 if is_bold else 0};{color}' + text + reset, *args, **kwargs) print(f'{pref}{1 if is_bold else 0};{color}' + text + reset, *args, **kwargs)
def sep(): puts("-----------------------------------", is_bold=True) def sep(): puts("-----------------------------------", is_bold=True)
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/") parser.add_argument("--address", "-a", type=str , required=False, help='Address of firegex backend', default="http://127.0.0.1:4444/")
parser.add_argument("--service_port", "-P", type=int , required=False, help='Port of the test service', default=1337) parser.add_argument("--service_port", "-P", type=int , required=False, help='Port of the test service', default=1337)
@@ -31,35 +32,41 @@ sep()
puts(f"Testing will start on ", color=colors.cyan, end="") puts(f"Testing will start on ", color=colors.cyan, end="")
puts(f"{args.address}", color=colors.yellow) puts(f"{args.address}", color=colors.yellow)
s = Session()
proxy = ProxyAPI(args.address,args.password)
service_created = False
#Connect to Firegex #Connect to Firegex
req = s.post(f"{args.address}api/login", json={"password":args.password}) if (proxy.connect()): puts(f"Sucessfully logged in ✔", color=colors.green)
assert req.json()["status"] == "ok", f"Test Failed: Unknown response or wrong passowrd {req.text}" else: puts(f"Test Failed: Unknown response or wrong passowrd ", color=colors.red); exit(1)
puts(f"Sucessfully logged in ✔", color=colors.green)
#Create new Service #Create new Service
req = s.post(f"{args.address}api/services/add" , json={"name":args.service_name,"port":args.service_port}) if (proxy.create_service(args.service_name,args.service_port)):
assert req.json()["status"] == "ok", f"Test Failed: Couldn't create service {req.text}"
puts(f"Sucessfully created service {args.service_name} with public port {args.service_port}", color=colors.green) puts(f"Sucessfully created service {args.service_name} with public port {args.service_port}", color=colors.green)
service_created = True
else: puts(f"Test Failed: Couldn't create service ✗", color=colors.red); exit(1)
#Delete the Service and exit
def exit_test(status_code=0):
if service_created:
if(proxy.delete(service_id)):
puts(f"Sucessfully delete service with id {service_id}", color=colors.green)
else:
puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1)
sep()
server.terminate()
exit(status_code)
#Find the Service #Find the Service
req = s.get(f"{args.address}api/services") service_id, internal_port = proxy.get_service_details(args.service_name)
internal_port = service_id = None if (internal_port): puts(f"Sucessfully received the internal port {internal_port}", color=colors.green)
try: else: puts(f"Test Failed: Coulnd't get the service internal port ✗", color=colors.red); exit_test(1)
for service in req.json():
if service["name"] == args.service_name:
service_id = service["id"]
internal_port = service["internal_port"]
puts(f"Sucessfully received the internal port {internal_port}", color=colors.green)
break
except Exception:
puts(f"Test Failed: Coulnd't get the service internal port {req.text}", color=colors.red)
exit(1)
#Start listener #Start listener
def startServer(): def startServer(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', internal_port)) sock.bind(('127.0.0.1', port))
sock.listen(8) sock.listen(8)
while True: while True:
connection,address = sock.accept() connection,address = sock.accept()
@@ -67,13 +74,12 @@ def startServer():
connection.send(buf) connection.send(buf)
connection.close() connection.close()
server = Process(target=startServer) server = Process(target=startServer,args=[internal_port])
server.start() server.start()
#Start firewall #Start firewall
req = s.get(f"{args.address}api/service/{service_id}/start") if(proxy.start(service_id)): puts(f"Sucessfully started service with id {service_id}", color=colors.green)
assert req.json()["status"] == "ok", f"Test Failed: Couldn't start the service {req.text}" else: puts(f"Test Failed: Coulnd't start the service ", color=colors.red); exit_test(1)
puts(f"Sucessfully started service with id {service_id}", color=colors.green)
#Hacky solution - wait a bit for the server to start #Hacky solution - wait a bit for the server to start
sleep(1) sleep(1)
@@ -87,7 +93,7 @@ def sendCheckData(data):
s.close() s.close()
return received_data == data return received_data == data
if sendCheckData(b'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'): if sendCheckData(secrets.token_bytes(200)):
puts(f"Successfully tested first proxy with no regex ✔", color=colors.green) puts(f"Successfully tested first proxy with no regex ✔", color=colors.green)
else: else:
puts(f"Test Failed: Data was corrupted ", color=colors.red) puts(f"Test Failed: Data was corrupted ", color=colors.red)
@@ -95,31 +101,46 @@ else:
#Add new regex #Add new regex
secret = bytes(secrets.token_hex(16).encode()) secret = bytes(secrets.token_hex(16).encode())
regex = base64.b64encode(secret).decode() regex = base64.b64encode(secret).decode()
req = s.post(f"{args.address}api/regexes/add", if(proxy.add_regex(service_id,regex)): puts(f"Sucessfully added regex {secret}", color=colors.green)
json={"is_blacklist":True,"is_case_sensitive":True,"service_id":service_id,"mode":"B","regex":regex}) else: puts(f"Test Failed: Coulnd't start the service ✗", color=colors.red); exit_test(1)
assert req.json()["status"] == "ok", f"Test Failed: Couldn't add regex {req.text}"
puts(f"Sucessfully added regex to service with id {service_id}", color=colors.green)
#Check if regex is present in the service
def checkRegex(regex):
for i,r in enumerate(proxy.get_service_regexes(service_id)):
if r["regex"] == regex:
#Test the regex #Test the regex
if not sendCheckData(b'AAAAAAAAAAAAAAAA' + secret + b'AAAAAAAAAAAAAAAAA'): if not sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)):
puts(f"The malicious request was successfully blocked ✔", color=colors.green) puts(f"The malicious request was successfully blocked ✔", color=colors.green)
else: else:
puts(f"Test Failed: The request wasn't blocked", color=colors.red) puts(f"Test Failed: The request wasn't blocked", color=colors.red); exit_test(1)
return
puts(f"Test Failed: The regex wasn't found ✗", color=colors.red)
checkRegex(regex)
#Pause the proxy #Pause the proxy
req = s.get(f"{args.address}api/service/{service_id}/pause") if(proxy.pause(service_id)): puts(f"Sucessfully paused service with id {service_id}", color=colors.green)
assert req.json()["status"] == "ok", f"Test Failed: Couldn't delete service {req.text}" else: puts(f"Test Failed: Coulnd't pause the service ", color=colors.red); exit_test(1)
puts(f"Sucessfully stopped service with id {service_id}", color=colors.green)
#Check if it's actually paused #Check if it's actually paused
if sendCheckData(b'AAAAAAAAAAAAAAAA' + secret + b'AAAAAAAAAAAAAAAAA'): if sendCheckData(secrets.token_bytes(200) + secret + secrets.token_bytes(200)):
puts(f"The request wasn't blocked ✔", color=colors.green) puts(f"The request wasn't blocked ✔", color=colors.green)
else: else:
puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red) puts(f"Test Failed: The request was blocked when it shouldn't have", color=colors.red)
#Delete the Service #Stop the proxy
req = s.get(f"{args.address}api/service/{service_id}/delete") if(proxy.stop(service_id)): puts(f"Sucessfully stopped service with id {service_id}", color=colors.green)
assert req.json()["status"] == "ok", f"Test Failed: Couldn't delete service {req.text}" else: puts(f"Test Failed: Coulnd't stop the service ", color=colors.red); exit_test(1)
puts(f"Sucessfully delete service with id {service_id}", color=colors.green) sleep(100)
server.terminate() #Check if proxy is stopped
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', args.service_port))
puts(f"I could bind to the port, so {service_id} was stopped correctly ✔", color=colors.green)
sock.close()
except Exception as e:
print(e)
puts(f"Test Failed: Coulnd't bind to port {args.service_port}", color=colors.red); exit_test(1)
exit_test()