Add tests for metrics endpoint
This commit is contained in:
@@ -81,7 +81,12 @@ else:
|
||||
#Check if regex is present in the service
|
||||
n_blocked = 0
|
||||
|
||||
def checkRegex(regex, should_work=True, upper=False):
|
||||
def getMetric(metric_name, regex):
|
||||
for metric in firegex.nf_get_metrics().split("\n"):
|
||||
if metric.startswith(metric_name + "{") and f'regex="{regex}"' in metric:
|
||||
return int(metric.split(" ")[-1])
|
||||
|
||||
def checkRegex(regex, should_work=True, upper=False, deleted=False):
|
||||
if should_work:
|
||||
global n_blocked
|
||||
for r in firegex.nf_get_service_regexes(service_id):
|
||||
@@ -93,9 +98,19 @@ def checkRegex(regex, should_work=True, upper=False):
|
||||
n_blocked += 1
|
||||
time.sleep(1)
|
||||
if firegex.nf_get_regex(r["id"])["n_packets"] == n_blocked:
|
||||
puts("The packed was reported as blocked ✔", color=colors.green)
|
||||
puts("The packet was reported as blocked in the API ✔", color=colors.green)
|
||||
else:
|
||||
puts("Test Failed: The packed wasn't reported as blocked ✗", color=colors.red)
|
||||
puts("Test Failed: The packet wasn't reported as blocked in the API ✗", color=colors.red)
|
||||
exit_test(1)
|
||||
if getMetric("firegex_blocked_packets", secret.decode()) == n_blocked:
|
||||
puts("The packet was reported as blocked in the metrics ✔", color=colors.green)
|
||||
else:
|
||||
puts("Test Failed: The packet wasn't reported as blocked in the metrics ✗", color=colors.red)
|
||||
exit_test(1)
|
||||
if getMetric("firegex_active", secret.decode()) == 1:
|
||||
puts("The regex was reported as active in the metrics ✔", color=colors.green)
|
||||
else:
|
||||
puts("Test Failed: The regex wasn't reported as active in the metrics ✗", color=colors.red)
|
||||
exit_test(1)
|
||||
else:
|
||||
puts("Test Failed: The request wasn't blocked ✗", color=colors.red)
|
||||
@@ -109,6 +124,12 @@ def checkRegex(regex, should_work=True, upper=False):
|
||||
else:
|
||||
puts("Test Failed: The request was blocked when it shouldn't have", color=colors.red)
|
||||
exit_test(1)
|
||||
if not deleted:
|
||||
if getMetric("firegex_active", secret.decode()) == 0:
|
||||
puts("The regex was reported as inactive in the metrics ✔", color=colors.green)
|
||||
else:
|
||||
puts("Test Failed: The regex wasn't reported as inactive in the metrics ✗", color=colors.red)
|
||||
exit_test(1)
|
||||
|
||||
def clear_regexes():
|
||||
global n_blocked
|
||||
@@ -121,6 +142,11 @@ def clear_regexes():
|
||||
puts("Test Failed: Coulnd't delete the regex ✗", color=colors.red)
|
||||
exit_test(1)
|
||||
break
|
||||
if f'regex="{secret.decode()}"' not in firegex.nf_get_metrics():
|
||||
puts(f"No regex metrics after deletion ✔", color=colors.green)
|
||||
else:
|
||||
puts("Test Failed: Metrics found after deleting the regex ✗", color=colors.red)
|
||||
exit_test(1)
|
||||
|
||||
checkRegex(regex)
|
||||
|
||||
@@ -172,7 +198,7 @@ checkRegex(regex)
|
||||
clear_regexes()
|
||||
|
||||
#Check if it's actually deleted
|
||||
checkRegex(regex,should_work=False)
|
||||
checkRegex(regex,should_work=False,deleted=True)
|
||||
|
||||
#Add case insensitive regex
|
||||
if(firegex.nf_add_regex(service_id,regex,"B",active=True, is_case_sensitive=False)):
|
||||
|
||||
@@ -132,6 +132,10 @@ class FiregexAPI:
|
||||
json={"name":name,"port":port, "proto": proto, "ip_int": ip_int})
|
||||
return req.json()["service_id"] if verify(req) else False
|
||||
|
||||
def nf_get_metrics(self):
|
||||
req = self.s.get(f"{self.address}api/nfregex/metrics")
|
||||
return req.text
|
||||
|
||||
#PortHijack
|
||||
def ph_get_services(self):
|
||||
req = self.s.get(f"{self.address}api/porthijack/services")
|
||||
|
||||
Reference in New Issue
Block a user