nfqueue filtering moved to Hyperscan stream matching, proxyregex removed

This commit is contained in:
Domingo Dirutigliano
2025-02-02 19:54:42 +01:00
parent 3de629ebd5
commit 2d8f19679f
54 changed files with 1134 additions and 3092 deletions
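The core of the change: the nfqueue interceptor now delegates regex matching to Hyperscan in stream mode, so a pattern split across several TCP segments of the same connection is still caught, which per-packet matching cannot do. As a rough illustration of stream matching only — not the C++ filter binary this commit actually ships — a minimal sketch using the python-hyperscan binding (pattern, payloads and callback are made up):

```python
import hyperscan

# Compile one illustrative pattern into a stream-mode database.
db = hyperscan.Database(mode=hyperscan.HS_MODE_STREAM)
db.compile(expressions=[rb"FLAG\{[A-Za-z0-9]+\}"], ids=[0],
           flags=[hyperscan.HS_FLAG_CASELESS])

def on_match(pattern_id, start, end, flags, context):
    print(f"pattern {pattern_id} matched, ending at stream offset {end}")

# The match is reported even though it is split across two scan() calls,
# which is exactly what a per-packet re.search() would miss.
with db.stream(match_event_handler=on_match) as stream:
    stream.scan(b"GET /?q=FLA")
    stream.scan(b"G{abc123} HTTP/1.1\r\n")
```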

View File

@@ -1,6 +1,6 @@
import asyncio
from modules.firewall.nftables import FiregexTables
from modules.firewall.models import *
from modules.firewall.models import Rule, FirewallSettings
from utils.sqlite import SQLite
from modules.firewall.models import Action
@@ -131,5 +131,5 @@ class FirewallManager:
return self.db.get("allow_dhcp", "1") == "1"
@drop_invalid.setter
def allow_dhcp(self, value):
def allow_dhcp_set(self, value):
self.db.set("allow_dhcp", "1" if value else "0")

View File

@@ -1,7 +1,9 @@
from modules.nfregex.nftables import FiregexTables
from utils import ip_parse, run_func
from utils import run_func
from modules.nfregex.models import Service, Regex
import re, os, asyncio
import re
import os
import asyncio
import traceback
nft = FiregexTables()
@@ -20,7 +22,8 @@ class RegexFilter:
self.regex = regex
self.is_case_sensitive = is_case_sensitive
self.is_blacklist = is_blacklist
if input_mode == output_mode: input_mode = output_mode = True # (False, False) == (True, True)
if input_mode == output_mode:
input_mode = output_mode = True # (False, False) == (True, True)
self.input_mode = input_mode
self.output_mode = output_mode
self.blocked = blocked_packets
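The direction-flag normalization above is easy to misread: a filter with neither direction selected is treated as applying to both. A tiny standalone sketch of that rule (names illustrative):

```python
# Neither direction selected behaves the same as both selected.
def normalize_directions(input_mode: bool, output_mode: bool) -> tuple[bool, bool]:
    if input_mode == output_mode:   # covers (False, False) and (True, True)
        return True, True
    return input_mode, output_mode

assert normalize_directions(False, False) == (True, True)
assert normalize_directions(True, False) == (True, False)
```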
@@ -37,8 +40,10 @@ class RegexFilter:
update_func = update_func
)
def compile(self):
if isinstance(self.regex, str): self.regex = self.regex.encode()
if not isinstance(self.regex, bytes): raise Exception("Invalid Regex Parameter")
if isinstance(self.regex, str):
self.regex = self.regex.encode()
if not isinstance(self.regex, bytes):
raise Exception("Invalid Regex Parameter")
re.compile(self.regex) # raise re.error if it's invalid!
case_sensitive = "1" if self.is_case_sensitive else "0"
if self.input_mode:
@@ -67,9 +72,9 @@ class FiregexInterceptor:
self.srv = srv
self.filter_map_lock = asyncio.Lock()
self.update_config_lock = asyncio.Lock()
input_range, output_range = await self._start_binary()
queue_range = await self._start_binary()
self.update_task = asyncio.create_task(self.update_blocked())
nft.add(self.srv, input_range, output_range)
nft.add(self.srv, queue_range)
return self
async def _start_binary(self):
@@ -87,7 +92,7 @@ class FiregexInterceptor:
line = line_fut.decode()
if line.startswith("QUEUES "):
params = line.split()
return (int(params[2]), int(params[3])), (int(params[5]), int(params[6]))
return (int(params[1]), int(params[2]))
else:
self.process.kill()
raise Exception("Invalid binary output")
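With one queue range shared by both directions, the startup handshake from the filter binary collapses to a single line. A minimal sketch of the parse performed above, assuming the binary announces itself as `QUEUES <first> <last>` (the numbers are invented):

```python
# Hypothetical first line printed by the filter binary on stdout.
line = "QUEUES 1000 1003"

params = line.split()
if not line.startswith("QUEUES ") or len(params) != 3:
    raise Exception("Invalid binary output")

# Inclusive range of NFQUEUE ids the binary is consuming.
queue_range = (int(params[1]), int(params[2]))   # -> (1000, 1003)
```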
@@ -102,8 +107,10 @@ class FiregexInterceptor:
if regex_id in self.filter_map:
self.filter_map[regex_id].blocked+=1
await self.filter_map[regex_id].update()
except asyncio.CancelledError: pass
except asyncio.IncompleteReadError: pass
except asyncio.CancelledError:
pass
except asyncio.IncompleteReadError:
pass
except Exception:
traceback.print_exc()
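The same pipe also carries match notifications. Assuming the new binary keeps the `BLOCKED <id>` line format the removed proxy used (only the id lookup is visible in this hunk, and the key shown here is invented), the bookkeeping amounts to:

```python
class _Counter:                            # stand-in for RegexFilter in this sketch
    blocked = 0

filter_map = {"1Cdeadbeef": _Counter()}    # compiled filter code -> filter object

stdout_line = "BLOCKED 1Cdeadbeef"         # hypothetical notification from the binary
if stdout_line.startswith("BLOCKED"):
    regex_id = stdout_line.split()[1]
    if regex_id in filter_map:
        filter_map[regex_id].blocked += 1  # later persisted through the update callback
```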
@@ -135,6 +142,7 @@ class FiregexInterceptor:
raw_filters = filter_obj.compile()
for filter in raw_filters:
res[filter] = filter_obj
except Exception: pass
except Exception:
pass
return res

View File

@@ -30,14 +30,15 @@ class ServiceManager:
new_filters = set([f.id for f in regexes])
#remove old filters
for f in old_filters:
if not f in new_filters:
if f not in new_filters:
del self.filters[f]
#add new filters
for f in new_filters:
if not f in old_filters:
if f not in old_filters:
filter = [ele for ele in regexes if ele.id == f][0]
self.filters[f] = RegexFilter.from_regex(filter, self._stats_updater)
if self.interceptor: await self.interceptor.reload(self.filters.values())
if self.interceptor:
await self.interceptor.reload(self.filters.values())
def __update_status_db(self, status):
self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.srv.id)
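Filter reloads work by diffing the regex ids already installed against the ids read from the database, so untouched filters keep their blocked-packet counters. The reconciliation above boils down to a set difference (toy values):

```python
installed = {1, 2, 3}            # ids currently in self.filters
wanted    = {2, 3, 4}            # ids of active regexes from the DB

to_remove = installed - wanted   # {1}: dropped from the filter map
to_add    = wanted - installed   # {4}: wrapped in new RegexFilter objects

assert to_remove == {1} and to_add == {4}
```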
@@ -114,4 +115,5 @@ class FirewallManager:
else:
raise ServiceNotFoundException()
class ServiceNotFoundException(Exception): pass
class ServiceNotFoundException(Exception):
pass

View File

@@ -45,36 +45,35 @@ class FiregexTables(NFTableManager):
{"delete":{"chain":{"table":self.table_name,"family":"inet", "name":self.output_chain}}},
])
def add(self, srv:Service, queue_range_input, queue_range_output):
def add(self, srv:Service, queue_range):
for ele in self.get():
if ele.__eq__(srv): return
init, end = queue_range_output
init, end = queue_range
if init > end: init, end = end, init
self.cmd({ "insert":{ "rule": {
"family": "inet",
"table": self.table_name,
"chain": self.output_chain,
"expr": [
{'match': {'left': {'payload': {'protocol': ip_family(srv.ip_int), 'field': 'saddr'}}, 'op': '==', 'right': nftables_int_to_json(srv.ip_int)}},
{'match': {"left": { "payload": {"protocol": str(srv.proto), "field": "sport"}}, "op": "==", "right": int(srv.port)}},
{"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}}
self.cmd(
{ "insert":{ "rule": {
"family": "inet",
"table": self.table_name,
"chain": self.output_chain,
"expr": [
{'match': {'left': {'payload': {'protocol': ip_family(srv.ip_int), 'field': 'saddr'}}, 'op': '==', 'right': nftables_int_to_json(srv.ip_int)}},
{'match': {"left": { "payload": {"protocol": str(srv.proto), "field": "sport"}}, "op": "==", "right": int(srv.port)}},
{"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}}
]
}}})
init, end = queue_range_input
if init > end: init, end = end, init
self.cmd({"insert":{"rule":{
"family": "inet",
"table": self.table_name,
"chain": self.input_chain,
"expr": [
{'match': {'left': {'payload': {'protocol': ip_family(srv.ip_int), 'field': 'daddr'}}, 'op': '==', 'right': nftables_int_to_json(srv.ip_int)}},
{'match': {"left": { "payload": {"protocol": str(srv.proto), "field": "dport"}}, "op": "==", "right": int(srv.port)}},
{"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}}
]
}}})
}}},
{"insert":{"rule":{
"family": "inet",
"table": self.table_name,
"chain": self.input_chain,
"expr": [
{'match': {'left': {'payload': {'protocol': ip_family(srv.ip_int), 'field': 'daddr'}}, 'op': '==', 'right': nftables_int_to_json(srv.ip_int)}},
{'match': {"left": { "payload": {"protocol": str(srv.proto), "field": "dport"}}, "op": "==", "right": int(srv.port)}},
{"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}}
]
}}}
)
def get(self) -> list[FiregexFilter]:
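Both inserted rules end in an nftables queue expression, which verdicts matching packets to the NFQUEUE ids the filter binary is reading; the bypass flag tells the kernel to accept packets instead of dropping them when no program is attached to the queue, so a crashed filter does not take the service down. For an assumed range of 1000-1003 the expression built above evaluates to:

```python
# Queue expression as constructed in add(), for a hypothetical range.
init, end = 1000, 1003
queue_expr = {
    "queue": {
        # a single queue becomes a plain number, otherwise an inclusive range
        "num": str(init) if init == end else {"range": [init, end]},
        # bypass: accept traffic when no userspace listener is attached
        "flags": ["bypass"],
    }
}
# -> {'queue': {'num': {'range': [1000, 1003]}, 'flags': ['bypass']}}
```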

View File

@@ -5,7 +5,8 @@ from utils.sqlite import SQLite
nft = FiregexTables()
class ServiceNotFoundException(Exception): pass
class ServiceNotFoundException(Exception):
pass
class ServiceManager:
def __init__(self, srv: Service, db):
@@ -29,7 +30,8 @@ class ServiceManager:
async def refresh(self, srv:Service):
self.srv = srv
if self.active: await self.restart()
if self.active:
await self.restart()
def _set_status(self,active):
self.active = active

View File

@@ -50,7 +50,8 @@ class FiregexTables(NFTableManager):
def add(self, srv:Service):
for ele in self.get():
if ele.__eq__(srv): return
if ele.__eq__(srv):
return
self.cmd({ "insert":{ "rule": {
"family": "inet",

View File

@@ -1,116 +0,0 @@
import re, os, asyncio
class Filter:
def __init__(self, regex, is_case_sensitive=True, is_blacklist=True, c_to_s=False, s_to_c=False, blocked_packets=0, code=None):
self.regex = regex
self.is_case_sensitive = is_case_sensitive
self.is_blacklist = is_blacklist
if c_to_s == s_to_c: c_to_s = s_to_c = True # (False, False) == (True, True)
self.c_to_s = c_to_s
self.s_to_c = s_to_c
self.blocked = blocked_packets
self.code = code
def compile(self):
if isinstance(self.regex, str): self.regex = self.regex.encode()
if not isinstance(self.regex, bytes): raise Exception("Invalid Regex Parameter")
re.compile(self.regex) # raise re.error if it's invalid!
case_sensitive = "1" if self.is_case_sensitive else "0"
if self.c_to_s:
yield case_sensitive + "C" + self.regex.hex() if self.is_blacklist else case_sensitive + "c"+ self.regex.hex()
if self.s_to_c:
yield case_sensitive + "S" + self.regex.hex() if self.is_blacklist else case_sensitive + "s"+ self.regex.hex()
class Proxy:
def __init__(self, internal_port=0, public_port=0, callback_blocked_update=None, filters=None, public_host="0.0.0.0", internal_host="127.0.0.1"):
self.filter_map = {}
self.filter_map_lock = asyncio.Lock()
self.update_config_lock = asyncio.Lock()
self.status_change = asyncio.Lock()
self.public_host = public_host
self.public_port = public_port
self.internal_host = internal_host
self.internal_port = internal_port
self.filters = set(filters) if filters else set([])
self.process = None
self.callback_blocked_update = callback_blocked_update
async def start(self, in_pause=False):
await self.status_change.acquire()
if not self.isactive():
try:
self.filter_map = self.compile_filters()
filters_codes = self.get_filter_codes() if not in_pause else []
proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"../proxy")
self.process = await asyncio.create_subprocess_exec(
proxy_binary_path, str(self.public_host), str(self.public_port), str(self.internal_host), str(self.internal_port),
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
await self.update_config(filters_codes)
finally:
self.status_change.release()
try:
while True:
buff = await self.process.stdout.readuntil()
stdout_line = buff.decode()
if stdout_line.startswith("BLOCKED"):
regex_id = stdout_line.split()[1]
async with self.filter_map_lock:
if regex_id in self.filter_map:
self.filter_map[regex_id].blocked+=1
if self.callback_blocked_update: self.callback_blocked_update(self.filter_map[regex_id])
except Exception:
return await self.process.wait()
else:
self.status_change.release()
async def stop(self):
async with self.status_change:
if self.isactive():
self.process.kill()
return False
return True
async def restart(self, in_pause=False):
status = await self.stop()
await self.start(in_pause=in_pause)
return status
async def update_config(self, filters_codes):
async with self.update_config_lock:
if (self.isactive()):
self.process.stdin.write((" ".join(filters_codes)+"\n").encode())
await self.process.stdin.drain()
async def reload(self):
if self.isactive():
async with self.filter_map_lock:
self.filter_map = self.compile_filters()
filters_codes = self.get_filter_codes()
await self.update_config(filters_codes)
def get_filter_codes(self):
filters_codes = list(self.filter_map.keys())
filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True)
return filters_codes
def isactive(self):
return self.process and self.process.returncode is None
async def pause(self):
if self.isactive():
await self.update_config([])
else:
await self.start(in_pause=True)
def compile_filters(self):
res = {}
for filter_obj in self.filters:
try:
raw_filters = filter_obj.compile()
for filter in raw_filters:
res[filter] = filter_obj
except Exception: pass
return res
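For contrast with the new QUEUES/BLOCKED handshake, the removed proxy was configured entirely over stdin: each filter compiled to a code of the form `<case-sensitivity digit><direction letter><hex-encoded regex>` ("C"/"c" for client-to-server, "S"/"s" for server-to-client, uppercase meaning blacklist), and update_config wrote the active codes as one space-separated, newline-terminated line. A worked example with a made-up regex:

```python
regex = b"evil"
case_sensitive = "1"                 # "1" sensitive, "0" insensitive
direction = "C"                      # "C"/"S" blacklist, "c"/"s" whitelist
code = case_sensitive + direction + regex.hex()    # -> "1C6576696c"

# What update_config() pushed to the proxy's stdin for this single filter:
config_line = " ".join([code]) + "\n"              # -> "1C6576696c\n"
```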

View File

@@ -1,199 +0,0 @@
import secrets
from modules.regexproxy.proxy import Filter, Proxy
import random, socket, asyncio
from base64 import b64decode
from utils.sqlite import SQLite
from utils import socketio_emit
class STATUS:
WAIT = "wait"
STOP = "stop"
PAUSE = "pause"
ACTIVE = "active"
class ServiceNotFoundException(Exception): pass
class ServiceManager:
def __init__(self, id, db):
self.id = id
self.db = db
self.proxy = Proxy(
internal_host="127.0.0.1",
callback_blocked_update=self._stats_updater
)
self.status = STATUS.STOP
self.wanted_status = STATUS.STOP
self.filters = {}
self._update_port_from_db()
self._update_filters_from_db()
self.lock = asyncio.Lock()
self.starter = None
def _update_port_from_db(self):
res = self.db.query("""
SELECT
public_port,
internal_port
FROM services WHERE service_id = ?;
""", self.id)
if len(res) == 0: raise ServiceNotFoundException()
self.proxy.internal_port = res[0]["internal_port"]
self.proxy.public_port = res[0]["public_port"]
def _update_filters_from_db(self):
res = self.db.query("""
SELECT
regex, mode, regex_id `id`, is_blacklist,
blocked_packets n_packets, is_case_sensitive
FROM regexes WHERE service_id = ? AND active=1;
""", self.id)
#Filter check
old_filters = set(self.filters.keys())
new_filters = set([f["id"] for f in res])
#remove old filters
for f in old_filters:
if not f in new_filters:
del self.filters[f]
for f in new_filters:
if not f in old_filters:
filter_info = [ele for ele in res if ele["id"] == f][0]
self.filters[f] = Filter(
is_case_sensitive=filter_info["is_case_sensitive"],
c_to_s=filter_info["mode"] in ["C","B"],
s_to_c=filter_info["mode"] in ["S","B"],
is_blacklist=filter_info["is_blacklist"],
regex=b64decode(filter_info["regex"]),
blocked_packets=filter_info["n_packets"],
code=f
)
self.proxy.filters = list(self.filters.values())
def __update_status_db(self, status):
self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.id)
async def next(self,to):
async with self.lock:
return await self._next(to)
async def _next(self, to):
if self.status != to:
# ACTIVE -> PAUSE or PAUSE -> ACTIVE
if (self.status, to) in [(STATUS.ACTIVE, STATUS.PAUSE)]:
await self.proxy.pause()
self._set_status(to)
elif (self.status, to) in [(STATUS.PAUSE, STATUS.ACTIVE)]:
await self.proxy.reload()
self._set_status(to)
# ACTIVE -> STOP
elif (self.status,to) in [(STATUS.ACTIVE, STATUS.STOP), (STATUS.WAIT, STATUS.STOP), (STATUS.PAUSE, STATUS.STOP)]: #Stop proxy
if self.starter: self.starter.cancel()
await self.proxy.stop()
self._set_status(to)
# STOP -> ACTIVE or STOP -> PAUSE
elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE), (STATUS.STOP, STATUS.PAUSE)]:
self.wanted_status = to
self._set_status(STATUS.WAIT)
self.__proxy_starter(to)
def _stats_updater(self,filter:Filter):
self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", filter.blocked, filter.code)
async def update_port(self):
async with self.lock:
self._update_port_from_db()
if self.status in [STATUS.PAUSE, STATUS.ACTIVE]:
next_status = self.status if self.status != STATUS.WAIT else self.wanted_status
await self._next(STATUS.STOP)
await self._next(next_status)
def _set_status(self,status):
self.status = status
self.__update_status_db(status)
async def update_filters(self):
async with self.lock:
self._update_filters_from_db()
if self.status in [STATUS.PAUSE, STATUS.ACTIVE]:
await self.proxy.reload()
def __proxy_starter(self,to):
async def func():
try:
while True:
if check_port_is_open(self.proxy.public_port):
self._set_status(to)
await socketio_emit(["regexproxy"])
await self.proxy.start(in_pause=(to==STATUS.PAUSE))
self._set_status(STATUS.STOP)
await socketio_emit(["regexproxy"])
return
else:
await asyncio.sleep(.5)
except asyncio.CancelledError:
self._set_status(STATUS.STOP)
await self.proxy.stop()
self.starter = asyncio.create_task(func())
class ProxyManager:
def __init__(self, db:SQLite):
self.db = db
self.proxy_table: dict[str, ServiceManager] = {}
self.lock = asyncio.Lock()
async def close(self):
for key in list(self.proxy_table.keys()):
await self.remove(key)
async def remove(self,id):
async with self.lock:
if id in self.proxy_table:
await self.proxy_table[id].next(STATUS.STOP)
del self.proxy_table[id]
async def reload(self):
async with self.lock:
for srv in self.db.query('SELECT service_id, status FROM services;'):
srv_id, req_status = srv["service_id"], srv["status"]
if srv_id in self.proxy_table:
continue
self.proxy_table[srv_id] = ServiceManager(srv_id,self.db)
await self.proxy_table[srv_id].next(req_status)
def get(self,id) -> ServiceManager:
if id in self.proxy_table:
return self.proxy_table[id]
else:
raise ServiceNotFoundException()
def check_port_is_open(port):
try:
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0',port))
sock.close()
return True
except Exception:
return False
def gen_service_id(db):
while True:
res = secrets.token_hex(8)
if len(db.query('SELECT 1 FROM services WHERE service_id = ?;', res)) == 0:
break
return res
def gen_internal_port(db):
while True:
res = random.randint(30000, 45000)
if len(db.query('SELECT 1 FROM services WHERE internal_port = ?;', res)) == 0:
break
return res
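Taken together, the removed manager drove its proxy through a four-state machine (stop, wait, pause, active). Reconstructed from _next and __proxy_starter above, the transitions it handled were roughly:

```python
# Transitions implemented by the removed ServiceManager (all others were no-ops).
LEGACY_TRANSITIONS = {
    ("active", "pause"): "keep the proxy running but push an empty filter list",
    ("pause", "active"): "reload the compiled filters into the running proxy",
    ("active", "stop"):  "cancel the starter task and kill the proxy process",
    ("wait", "stop"):    "cancel the starter task and kill the proxy process",
    ("pause", "stop"):   "cancel the starter task and kill the proxy process",
    ("stop", "active"):  "enter 'wait' until the public port is free to bind, then start",
    ("stop", "pause"):   "enter 'wait', then start the proxy with no filters loaded",
}
```

The new nfregex ServiceManager shown earlier tracks only an active flag, since nfqueue interception does not need to bind the service's public port before starting.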