Moduled Firegex, Merging pt1 (not finished and not working yet)
This commit is contained in:
@@ -1,2 +0,0 @@
|
||||
from .firewall import FirewallManager
|
||||
from .sqlite import SQLite
|
||||
@@ -1,276 +0,0 @@
|
||||
from typing import Dict, List, Set
|
||||
from utils import ip_parse, ip_family, run_func
|
||||
from modules.sqlite import Service
|
||||
import re, os, asyncio
|
||||
import traceback, nftables
|
||||
|
||||
from modules.sqlite import Regex
|
||||
|
||||
QUEUE_BASE_NUM = 1000
|
||||
|
||||
class FiregexFilter():
|
||||
def __init__(self, proto:str, port:int, ip_int:str, queue=None, target:str=None, id=None):
|
||||
self.nftables = nftables.Nftables()
|
||||
self.id = int(id) if id else None
|
||||
self.queue = queue
|
||||
self.target = target
|
||||
self.proto = proto
|
||||
self.port = int(port)
|
||||
self.ip_int = str(ip_int)
|
||||
|
||||
def __eq__(self, o: object) -> bool:
|
||||
if isinstance(o, FiregexFilter):
|
||||
return self.port == o.port and self.proto == o.proto and ip_parse(self.ip_int) == ip_parse(o.ip_int)
|
||||
return False
|
||||
|
||||
class FiregexTables:
|
||||
|
||||
def __init__(self):
|
||||
self.table_name = "firegex"
|
||||
self.nft = nftables.Nftables()
|
||||
|
||||
def raw_cmd(self, *cmds):
|
||||
return self.nft.json_cmd({"nftables": list(cmds)})
|
||||
|
||||
def cmd(self, *cmds):
|
||||
code, out, err = self.raw_cmd(*cmds)
|
||||
|
||||
if code == 0: return out
|
||||
else: raise Exception(err)
|
||||
|
||||
def init(self):
|
||||
self.reset()
|
||||
code, out, err = self.raw_cmd({"create":{"table":{"name":self.table_name,"family":"inet"}}})
|
||||
if code == 0:
|
||||
self.cmd(
|
||||
{"create":{"chain":{
|
||||
"family":"inet",
|
||||
"table":self.table_name,
|
||||
"name":"input",
|
||||
"type":"filter",
|
||||
"hook":"prerouting",
|
||||
"prio":-150,
|
||||
"policy":"accept"
|
||||
}}},
|
||||
{"create":{"chain":{
|
||||
"family":"inet",
|
||||
"table":self.table_name,
|
||||
"name":"output",
|
||||
"type":"filter",
|
||||
"hook":"postrouting",
|
||||
"prio":-150,
|
||||
"policy":"accept"
|
||||
}}}
|
||||
)
|
||||
|
||||
|
||||
def reset(self):
|
||||
self.raw_cmd(
|
||||
{"flush":{"table":{"name":"firegex","family":"inet"}}},
|
||||
{"delete":{"table":{"name":"firegex","family":"inet"}}},
|
||||
)
|
||||
|
||||
def list(self):
|
||||
return self.cmd({"list": {"ruleset": None}})["nftables"]
|
||||
|
||||
def add_output(self, queue_range, proto, port, ip_int):
|
||||
init, end = queue_range
|
||||
if init > end: init, end = end, init
|
||||
ip_int = ip_parse(ip_int)
|
||||
ip_addr = str(ip_int).split("/")[0]
|
||||
ip_addr_cidr = int(str(ip_int).split("/")[1])
|
||||
self.cmd({ "insert":{ "rule": {
|
||||
"family": "inet",
|
||||
"table": self.table_name,
|
||||
"chain": "output",
|
||||
"expr": [
|
||||
{'match': {'left': {'payload': {'protocol': ip_family(ip_int), 'field': 'saddr'}}, 'op': '==', 'right': {"prefix": {"addr": ip_addr, "len": ip_addr_cidr}}}}, #ip_int
|
||||
{'match': {'left': {'meta': {'key': 'l4proto'}}, 'op': '==', 'right': str(proto)}},
|
||||
{'match': {"left": { "payload": {"protocol": str(proto), "field": "sport"}}, "op": "==", "right": int(port)}},
|
||||
{"queue": {"num": str(init) if init == end else f"{init}-{end}", "flags": ["bypass"]}}
|
||||
]
|
||||
}}})
|
||||
|
||||
def add_input(self, queue_range, proto = None, port = None, ip_int = None):
|
||||
init, end = queue_range
|
||||
if init > end: init, end = end, init
|
||||
ip_int = ip_parse(ip_int)
|
||||
ip_addr = str(ip_int).split("/")[0]
|
||||
ip_addr_cidr = int(str(ip_int).split("/")[1])
|
||||
self.cmd({"insert":{"rule":{
|
||||
"family": "inet",
|
||||
"table": self.table_name,
|
||||
"chain": "input",
|
||||
"expr": [
|
||||
{'match': {'left': {'payload': {'protocol': ip_family(ip_int), 'field': 'daddr'}}, 'op': '==', 'right': {"prefix": {"addr": ip_addr, "len": ip_addr_cidr}}}}, #ip_int
|
||||
{'match': {"left": { "payload": {"protocol": str(proto), "field": "dport"}}, "op": "==", "right": int(port)}},
|
||||
{"queue": {"num": str(init) if init == end else f"{init}-{end}", "flags": ["bypass"]}}
|
||||
]
|
||||
}}})
|
||||
|
||||
def get(self) -> List[FiregexFilter]:
|
||||
res = []
|
||||
for filter in [ele["rule"] for ele in self.list() if "rule" in ele and ele["rule"]["table"] == self.table_name]:
|
||||
queue_str = str(filter["expr"][2]["queue"]["num"]).split("-")
|
||||
queue = None
|
||||
if len(queue_str) == 1: queue = int(queue_str[0]), int(queue_str[0])
|
||||
else: queue = int(queue_str[0]), int(queue_str[1])
|
||||
ip_int = None
|
||||
if isinstance(filter["expr"][0]["match"]["right"],str):
|
||||
ip_int = str(ip_parse(filter["expr"][0]["match"]["right"]))
|
||||
else:
|
||||
ip_int = f'{filter["expr"][0]["match"]["right"]["prefix"]["addr"]}/{filter["expr"][0]["match"]["right"]["prefix"]["len"]}'
|
||||
res.append(FiregexFilter(
|
||||
target=filter["chain"],
|
||||
id=int(filter["handle"]),
|
||||
queue=queue,
|
||||
proto=filter["expr"][1]["match"]["left"]["payload"]["protocol"],
|
||||
port=filter["expr"][1]["match"]["right"],
|
||||
ip_int=ip_int
|
||||
))
|
||||
return res
|
||||
|
||||
async def add(self, filter:FiregexFilter):
|
||||
if filter in self.get(): return None
|
||||
return await FiregexInterceptor.start( filter=filter, n_queues=int(os.getenv("N_THREADS_NFQUEUE","1")))
|
||||
|
||||
def delete_by_srv(self, srv:Service):
|
||||
for filter in self.get():
|
||||
if filter.port == srv.port and filter.proto == srv.proto and ip_parse(filter.ip_int) == ip_parse(srv.ip_int):
|
||||
self.cmd({"delete":{"rule": {"handle": filter.id, "table": self.table_name, "chain": filter.target, "family": "inet"}}})
|
||||
|
||||
|
||||
class RegexFilter:
|
||||
def __init__(
|
||||
self, regex,
|
||||
is_case_sensitive=True,
|
||||
is_blacklist=True,
|
||||
input_mode=False,
|
||||
output_mode=False,
|
||||
blocked_packets=0,
|
||||
id=None,
|
||||
update_func = None
|
||||
):
|
||||
self.regex = regex
|
||||
self.is_case_sensitive = is_case_sensitive
|
||||
self.is_blacklist = is_blacklist
|
||||
if input_mode == output_mode: input_mode = output_mode = True # (False, False) == (True, True)
|
||||
self.input_mode = input_mode
|
||||
self.output_mode = output_mode
|
||||
self.blocked = blocked_packets
|
||||
self.id = id
|
||||
self.update_func = update_func
|
||||
self.compiled_regex = self.compile()
|
||||
|
||||
@classmethod
|
||||
def from_regex(cls, regex:Regex, update_func = None):
|
||||
return cls(
|
||||
id=regex.id, regex=regex.regex, is_case_sensitive=regex.is_case_sensitive,
|
||||
is_blacklist=regex.is_blacklist, blocked_packets=regex.blocked_packets,
|
||||
input_mode = regex.mode in ["C","B"], output_mode=regex.mode in ["S","B"],
|
||||
update_func = update_func
|
||||
)
|
||||
def compile(self):
|
||||
if isinstance(self.regex, str): self.regex = self.regex.encode()
|
||||
if not isinstance(self.regex, bytes): raise Exception("Invalid Regex Paramether")
|
||||
re.compile(self.regex) # raise re.error if it's invalid!
|
||||
case_sensitive = "1" if self.is_case_sensitive else "0"
|
||||
if self.input_mode:
|
||||
yield case_sensitive + "C" + self.regex.hex() if self.is_blacklist else case_sensitive + "c"+ self.regex.hex()
|
||||
if self.output_mode:
|
||||
yield case_sensitive + "S" + self.regex.hex() if self.is_blacklist else case_sensitive + "s"+ self.regex.hex()
|
||||
|
||||
async def update(self):
|
||||
if self.update_func:
|
||||
await run_func(self.update_func, self)
|
||||
|
||||
class FiregexInterceptor:
|
||||
|
||||
def __init__(self):
|
||||
self.filter:FiregexFilter
|
||||
self.filter_map_lock:asyncio.Lock
|
||||
self.filter_map: Dict[str, RegexFilter]
|
||||
self.regex_filters: Set[RegexFilter]
|
||||
self.update_config_lock:asyncio.Lock
|
||||
self.process:asyncio.subprocess.Process
|
||||
self.n_queues:int
|
||||
self.update_task: asyncio.Task
|
||||
|
||||
@classmethod
|
||||
async def start(cls, filter: FiregexFilter, n_queues:int = 1):
|
||||
self = cls()
|
||||
self.filter = filter
|
||||
self.n_queues = n_queues
|
||||
self.filter_map_lock = asyncio.Lock()
|
||||
self.update_config_lock = asyncio.Lock()
|
||||
input_range, output_range = await self._start_binary()
|
||||
self.update_task = asyncio.create_task(self.update_blocked())
|
||||
FiregexTables().add_input(queue_range=input_range, proto=self.filter.proto, port=self.filter.port, ip_int=self.filter.ip_int)
|
||||
FiregexTables().add_output(queue_range=output_range, proto=self.filter.proto, port=self.filter.port, ip_int=self.filter.ip_int)
|
||||
return self
|
||||
|
||||
async def _start_binary(self):
|
||||
proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"./cppqueue")
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
proxy_binary_path, str(self.n_queues),
|
||||
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
|
||||
)
|
||||
line_fut = self.process.stdout.readuntil()
|
||||
try:
|
||||
line_fut = await asyncio.wait_for(line_fut, timeout=3)
|
||||
except asyncio.TimeoutError:
|
||||
self.process.kill()
|
||||
raise Exception("Invalid binary output")
|
||||
line = line_fut.decode()
|
||||
if line.startswith("QUEUES "):
|
||||
params = line.split()
|
||||
return (int(params[2]), int(params[3])), (int(params[5]), int(params[6]))
|
||||
else:
|
||||
self.process.kill()
|
||||
raise Exception("Invalid binary output")
|
||||
|
||||
async def update_blocked(self):
|
||||
try:
|
||||
while True:
|
||||
line = (await self.process.stdout.readuntil()).decode()
|
||||
if line.startswith("BLOCKED"):
|
||||
regex_id = line.split()[1]
|
||||
async with self.filter_map_lock:
|
||||
if regex_id in self.filter_map:
|
||||
self.filter_map[regex_id].blocked+=1
|
||||
await self.filter_map[regex_id].update()
|
||||
except asyncio.CancelledError: pass
|
||||
except asyncio.IncompleteReadError: pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
||||
async def stop(self):
|
||||
self.update_task.cancel()
|
||||
if self.process and self.process.returncode is None:
|
||||
self.process.kill()
|
||||
|
||||
async def _update_config(self, filters_codes):
|
||||
async with self.update_config_lock:
|
||||
self.process.stdin.write((" ".join(filters_codes)+"\n").encode())
|
||||
await self.process.stdin.drain()
|
||||
|
||||
async def reload(self, filters:List[RegexFilter]):
|
||||
async with self.filter_map_lock:
|
||||
self.filter_map = self.compile_filters(filters)
|
||||
filters_codes = self.get_filter_codes()
|
||||
await self._update_config(filters_codes)
|
||||
|
||||
def get_filter_codes(self):
|
||||
filters_codes = list(self.filter_map.keys())
|
||||
filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True)
|
||||
return filters_codes
|
||||
|
||||
def compile_filters(self, filters:List[RegexFilter]):
|
||||
res = {}
|
||||
for filter_obj in filters:
|
||||
try:
|
||||
raw_filters = filter_obj.compile()
|
||||
for filter in raw_filters:
|
||||
res[filter] = filter_obj
|
||||
except Exception: pass
|
||||
return res
|
||||
0
backend/modules/nfregex/__init__.py
Normal file
0
backend/modules/nfregex/__init__.py
Normal file
151
backend/modules/nfregex/firegex.py
Normal file
151
backend/modules/nfregex/firegex.py
Normal file
@@ -0,0 +1,151 @@
|
||||
from typing import Dict, List, Set
|
||||
from utils.firegextables import FiregexFilter, FiregexTables
|
||||
from utils import ip_parse, ip_family, run_func
|
||||
from modules.nfregex.models import Service, Regex
|
||||
import re, os, asyncio
|
||||
import traceback
|
||||
|
||||
QUEUE_BASE_NUM = 1000
|
||||
|
||||
|
||||
class RegexFilter:
|
||||
def __init__(
|
||||
self, regex,
|
||||
is_case_sensitive=True,
|
||||
is_blacklist=True,
|
||||
input_mode=False,
|
||||
output_mode=False,
|
||||
blocked_packets=0,
|
||||
id=None,
|
||||
update_func = None
|
||||
):
|
||||
self.regex = regex
|
||||
self.is_case_sensitive = is_case_sensitive
|
||||
self.is_blacklist = is_blacklist
|
||||
if input_mode == output_mode: input_mode = output_mode = True # (False, False) == (True, True)
|
||||
self.input_mode = input_mode
|
||||
self.output_mode = output_mode
|
||||
self.blocked = blocked_packets
|
||||
self.id = id
|
||||
self.update_func = update_func
|
||||
self.compiled_regex = self.compile()
|
||||
|
||||
@classmethod
|
||||
def from_regex(cls, regex:Regex, update_func = None):
|
||||
return cls(
|
||||
id=regex.id, regex=regex.regex, is_case_sensitive=regex.is_case_sensitive,
|
||||
is_blacklist=regex.is_blacklist, blocked_packets=regex.blocked_packets,
|
||||
input_mode = regex.mode in ["C","B"], output_mode=regex.mode in ["S","B"],
|
||||
update_func = update_func
|
||||
)
|
||||
def compile(self):
|
||||
if isinstance(self.regex, str): self.regex = self.regex.encode()
|
||||
if not isinstance(self.regex, bytes): raise Exception("Invalid Regex Paramether")
|
||||
re.compile(self.regex) # raise re.error if it's invalid!
|
||||
case_sensitive = "1" if self.is_case_sensitive else "0"
|
||||
if self.input_mode:
|
||||
yield case_sensitive + "C" + self.regex.hex() if self.is_blacklist else case_sensitive + "c"+ self.regex.hex()
|
||||
if self.output_mode:
|
||||
yield case_sensitive + "S" + self.regex.hex() if self.is_blacklist else case_sensitive + "s"+ self.regex.hex()
|
||||
|
||||
async def update(self):
|
||||
if self.update_func:
|
||||
await run_func(self.update_func, self)
|
||||
|
||||
class FiregexInterceptor:
|
||||
|
||||
def __init__(self):
|
||||
self.filter:FiregexFilter
|
||||
self.filter_map_lock:asyncio.Lock
|
||||
self.filter_map: Dict[str, RegexFilter]
|
||||
self.regex_filters: Set[RegexFilter]
|
||||
self.update_config_lock:asyncio.Lock
|
||||
self.process:asyncio.subprocess.Process
|
||||
self.n_queues:int
|
||||
self.update_task: asyncio.Task
|
||||
|
||||
@classmethod
|
||||
async def start(cls, filter: FiregexFilter, n_queues:int = int(os.getenv("NTHREADS","1"))):
|
||||
self = cls()
|
||||
self.filter = filter
|
||||
self.n_queues = n_queues
|
||||
self.filter_map_lock = asyncio.Lock()
|
||||
self.update_config_lock = asyncio.Lock()
|
||||
input_range, output_range = await self._start_binary()
|
||||
self.update_task = asyncio.create_task(self.update_blocked())
|
||||
if not filter in FiregexTables().get():
|
||||
FiregexTables().add_input(queue_range=input_range, proto=self.filter.proto, port=self.filter.port, ip_int=self.filter.ip_int)
|
||||
FiregexTables().add_output(queue_range=output_range, proto=self.filter.proto, port=self.filter.port, ip_int=self.filter.ip_int)
|
||||
return self
|
||||
|
||||
async def _start_binary(self):
|
||||
proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"../cppqueue")
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
proxy_binary_path, str(self.n_queues),
|
||||
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
|
||||
)
|
||||
line_fut = self.process.stdout.readuntil()
|
||||
try:
|
||||
line_fut = await asyncio.wait_for(line_fut, timeout=3)
|
||||
except asyncio.TimeoutError:
|
||||
self.process.kill()
|
||||
raise Exception("Invalid binary output")
|
||||
line = line_fut.decode()
|
||||
if line.startswith("QUEUES "):
|
||||
params = line.split()
|
||||
return (int(params[2]), int(params[3])), (int(params[5]), int(params[6]))
|
||||
else:
|
||||
self.process.kill()
|
||||
raise Exception("Invalid binary output")
|
||||
|
||||
async def update_blocked(self):
|
||||
try:
|
||||
while True:
|
||||
line = (await self.process.stdout.readuntil()).decode()
|
||||
if line.startswith("BLOCKED"):
|
||||
regex_id = line.split()[1]
|
||||
async with self.filter_map_lock:
|
||||
if regex_id in self.filter_map:
|
||||
self.filter_map[regex_id].blocked+=1
|
||||
await self.filter_map[regex_id].update()
|
||||
except asyncio.CancelledError: pass
|
||||
except asyncio.IncompleteReadError: pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
||||
async def stop(self):
|
||||
self.update_task.cancel()
|
||||
if self.process and self.process.returncode is None:
|
||||
self.process.kill()
|
||||
|
||||
async def _update_config(self, filters_codes):
|
||||
async with self.update_config_lock:
|
||||
self.process.stdin.write((" ".join(filters_codes)+"\n").encode())
|
||||
await self.process.stdin.drain()
|
||||
|
||||
async def reload(self, filters:List[RegexFilter]):
|
||||
async with self.filter_map_lock:
|
||||
self.filter_map = self.compile_filters(filters)
|
||||
filters_codes = self.get_filter_codes()
|
||||
await self._update_config(filters_codes)
|
||||
|
||||
def get_filter_codes(self):
|
||||
filters_codes = list(self.filter_map.keys())
|
||||
filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True)
|
||||
return filters_codes
|
||||
|
||||
def compile_filters(self, filters:List[RegexFilter]):
|
||||
res = {}
|
||||
for filter_obj in filters:
|
||||
try:
|
||||
raw_filters = filter_obj.compile()
|
||||
for filter in raw_filters:
|
||||
res[filter] = filter_obj
|
||||
except Exception: pass
|
||||
return res
|
||||
|
||||
def delete_by_srv(srv:Service):
|
||||
nft = FiregexTables()
|
||||
for filter in nft.get():
|
||||
if filter.port == srv.port and filter.proto == srv.proto and ip_parse(filter.ip_int) == ip_parse(srv.ip_int):
|
||||
nft.cmd({"delete":{"rule": {"handle": filter.id, "table": nft.table_name, "chain": filter.target, "family": "inet"}}})
|
||||
@@ -1,7 +1,8 @@
|
||||
import asyncio
|
||||
from typing import Dict
|
||||
from modules.firegex import FiregexFilter, FiregexTables, RegexFilter
|
||||
from modules.sqlite import Regex, SQLite, Service
|
||||
from modules.nfregex.firegex import FiregexFilter, FiregexInterceptor, FiregexTables, RegexFilter, delete_by_srv
|
||||
from modules.nfregex.models import Regex, Service
|
||||
from utils.sqlite import SQLite
|
||||
|
||||
class STATUS:
|
||||
STOP = "stop"
|
||||
@@ -93,13 +94,13 @@ class ServiceManager:
|
||||
|
||||
async def start(self):
|
||||
if not self.interceptor:
|
||||
FiregexTables().delete_by_srv(self.srv)
|
||||
self.interceptor = await FiregexTables().add(FiregexFilter(self.srv.proto,self.srv.port, self.srv.ip_int))
|
||||
delete_by_srv(self.srv)
|
||||
self.interceptor = await FiregexInterceptor.start(FiregexFilter(self.srv.proto,self.srv.port, self.srv.ip_int))
|
||||
await self._update_filters_from_db()
|
||||
self._set_status(STATUS.ACTIVE)
|
||||
|
||||
async def stop(self):
|
||||
FiregexTables().delete_by_srv(self.srv)
|
||||
delete_by_srv(self.srv)
|
||||
if self.interceptor:
|
||||
await self.interceptor.stop()
|
||||
self.interceptor = None
|
||||
30
backend/modules/nfregex/models.py
Normal file
30
backend/modules/nfregex/models.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import base64
|
||||
|
||||
class Service:
|
||||
def __init__(self, id: str, status: str, port: int, name: str, proto: str, ip_int: str):
|
||||
self.id = id
|
||||
self.status = status
|
||||
self.port = port
|
||||
self.name = name
|
||||
self.proto = proto
|
||||
self.ip_int = ip_int
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, var: dict):
|
||||
return cls(id=var["service_id"], status=var["status"], port=var["port"], name=var["name"], proto=var["proto"], ip_int=var["ip_int"])
|
||||
|
||||
|
||||
class Regex:
|
||||
def __init__(self, id: int, regex: bytes, mode: str, service_id: str, is_blacklist: bool, blocked_packets: int, is_case_sensitive: bool, active: bool):
|
||||
self.regex = regex
|
||||
self.mode = mode
|
||||
self.service_id = service_id
|
||||
self.is_blacklist = is_blacklist
|
||||
self.blocked_packets = blocked_packets
|
||||
self.id = id
|
||||
self.is_case_sensitive = is_case_sensitive
|
||||
self.active = active
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, var: dict):
|
||||
return cls(id=var["regex_id"], regex=base64.b64decode(var["regex"]), mode=var["mode"], service_id=var["service_id"], is_blacklist=var["is_blacklist"], blocked_packets=var["blocked_packets"], is_case_sensitive=var["is_case_sensitive"], active=var["active"])
|
||||
BIN
backend/modules/proxy
Executable file
BIN
backend/modules/proxy
Executable file
Binary file not shown.
0
backend/modules/regexproxy/__init__.py
Normal file
0
backend/modules/regexproxy/__init__.py
Normal file
116
backend/modules/regexproxy/proxy.py
Executable file
116
backend/modules/regexproxy/proxy.py
Executable file
@@ -0,0 +1,116 @@
|
||||
import re, os, asyncio
|
||||
|
||||
class Filter:
|
||||
def __init__(self, regex, is_case_sensitive=True, is_blacklist=True, c_to_s=False, s_to_c=False, blocked_packets=0, code=None):
|
||||
self.regex = regex
|
||||
self.is_case_sensitive = is_case_sensitive
|
||||
self.is_blacklist = is_blacklist
|
||||
if c_to_s == s_to_c: c_to_s = s_to_c = True # (False, False) == (True, True)
|
||||
self.c_to_s = c_to_s
|
||||
self.s_to_c = s_to_c
|
||||
self.blocked = blocked_packets
|
||||
self.code = code
|
||||
|
||||
def compile(self):
|
||||
if isinstance(self.regex, str): self.regex = self.regex.encode()
|
||||
if not isinstance(self.regex, bytes): raise Exception("Invalid Regex Paramether")
|
||||
re.compile(self.regex) # raise re.error if is invalid!
|
||||
case_sensitive = "1" if self.is_case_sensitive else "0"
|
||||
if self.c_to_s:
|
||||
yield case_sensitive + "C" + self.regex.hex() if self.is_blacklist else case_sensitive + "c"+ self.regex.hex()
|
||||
if self.s_to_c:
|
||||
yield case_sensitive + "S" + self.regex.hex() if self.is_blacklist else case_sensitive + "s"+ self.regex.hex()
|
||||
|
||||
class Proxy:
|
||||
def __init__(self, internal_port=0, public_port=0, callback_blocked_update=None, filters=None, public_host="0.0.0.0", internal_host="127.0.0.1"):
|
||||
self.filter_map = {}
|
||||
self.filter_map_lock = asyncio.Lock()
|
||||
self.update_config_lock = asyncio.Lock()
|
||||
self.status_change = asyncio.Lock()
|
||||
self.public_host = public_host
|
||||
self.public_port = public_port
|
||||
self.internal_host = internal_host
|
||||
self.internal_port = internal_port
|
||||
self.filters = set(filters) if filters else set([])
|
||||
self.process = None
|
||||
self.callback_blocked_update = callback_blocked_update
|
||||
|
||||
async def start(self, in_pause=False):
|
||||
await self.status_change.acquire()
|
||||
if not self.isactive():
|
||||
try:
|
||||
self.filter_map = self.compile_filters()
|
||||
filters_codes = self.get_filter_codes() if not in_pause else []
|
||||
proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"../proxy")
|
||||
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
proxy_binary_path, str(self.public_host), str(self.public_port), str(self.internal_host), str(self.internal_port),
|
||||
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
|
||||
)
|
||||
await self.update_config(filters_codes)
|
||||
finally:
|
||||
self.status_change.release()
|
||||
try:
|
||||
while True:
|
||||
buff = await self.process.stdout.readuntil()
|
||||
stdout_line = buff.decode()
|
||||
if stdout_line.startswith("BLOCKED"):
|
||||
regex_id = stdout_line.split()[1]
|
||||
async with self.filter_map_lock:
|
||||
if regex_id in self.filter_map:
|
||||
self.filter_map[regex_id].blocked+=1
|
||||
if self.callback_blocked_update: self.callback_blocked_update(self.filter_map[regex_id])
|
||||
except Exception:
|
||||
return await self.process.wait()
|
||||
else:
|
||||
self.status_change.release()
|
||||
|
||||
|
||||
async def stop(self):
|
||||
async with self.status_change:
|
||||
if self.isactive():
|
||||
self.process.kill()
|
||||
return False
|
||||
return True
|
||||
|
||||
async def restart(self, in_pause=False):
|
||||
status = await self.stop()
|
||||
await self.start(in_pause=in_pause)
|
||||
return status
|
||||
|
||||
async def update_config(self, filters_codes):
|
||||
async with self.update_config_lock:
|
||||
if (self.isactive()):
|
||||
self.process.stdin.write((" ".join(filters_codes)+"\n").encode())
|
||||
await self.process.stdin.drain()
|
||||
|
||||
async def reload(self):
|
||||
if self.isactive():
|
||||
async with self.filter_map_lock:
|
||||
self.filter_map = self.compile_filters()
|
||||
filters_codes = self.get_filter_codes()
|
||||
await self.update_config(filters_codes)
|
||||
|
||||
def get_filter_codes(self):
|
||||
filters_codes = list(self.filter_map.keys())
|
||||
filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True)
|
||||
return filters_codes
|
||||
|
||||
def isactive(self):
|
||||
return self.process and self.process.returncode is None
|
||||
|
||||
async def pause(self):
|
||||
if self.isactive():
|
||||
await self.update_config([])
|
||||
else:
|
||||
await self.start(in_pause=True)
|
||||
|
||||
def compile_filters(self):
|
||||
res = {}
|
||||
for filter_obj in self.filters:
|
||||
try:
|
||||
raw_filters = filter_obj.compile()
|
||||
for filter in raw_filters:
|
||||
res[filter] = filter_obj
|
||||
except Exception: pass
|
||||
return res
|
||||
196
backend/modules/regexproxy/utils.py
Normal file
196
backend/modules/regexproxy/utils.py
Normal file
@@ -0,0 +1,196 @@
|
||||
import secrets
|
||||
from modules.regexproxy.proxy import Filter, Proxy
|
||||
import random, socket, asyncio
|
||||
from base64 import b64decode
|
||||
from utils.sqlite import SQLite
|
||||
|
||||
class STATUS:
|
||||
WAIT = "wait"
|
||||
STOP = "stop"
|
||||
PAUSE = "pause"
|
||||
ACTIVE = "active"
|
||||
|
||||
class ServiceNotFoundException(Exception): pass
|
||||
|
||||
class ServiceManager:
|
||||
def __init__(self, id, db):
|
||||
self.id = id
|
||||
self.db = db
|
||||
self.proxy = Proxy(
|
||||
internal_host="127.0.0.1",
|
||||
callback_blocked_update=self._stats_updater
|
||||
)
|
||||
self.status = STATUS.STOP
|
||||
self.wanted_status = STATUS.STOP
|
||||
self.filters = {}
|
||||
self._update_port_from_db()
|
||||
self._update_filters_from_db()
|
||||
self.lock = asyncio.Lock()
|
||||
self.starter = None
|
||||
|
||||
def _update_port_from_db(self):
|
||||
res = self.db.query("""
|
||||
SELECT
|
||||
public_port,
|
||||
internal_port
|
||||
FROM services WHERE service_id = ?;
|
||||
""", self.id)
|
||||
if len(res) == 0: raise ServiceNotFoundException()
|
||||
self.proxy.internal_port = res[0]["internal_port"]
|
||||
self.proxy.public_port = res[0]["public_port"]
|
||||
|
||||
def _update_filters_from_db(self):
|
||||
res = self.db.query("""
|
||||
SELECT
|
||||
regex, mode, regex_id `id`, is_blacklist,
|
||||
blocked_packets n_packets, is_case_sensitive
|
||||
FROM regexes WHERE service_id = ? AND active=1;
|
||||
""", self.id)
|
||||
|
||||
#Filter check
|
||||
old_filters = set(self.filters.keys())
|
||||
new_filters = set([f["id"] for f in res])
|
||||
|
||||
#remove old filters
|
||||
for f in old_filters:
|
||||
if not f in new_filters:
|
||||
del self.filters[f]
|
||||
|
||||
for f in new_filters:
|
||||
if not f in old_filters:
|
||||
filter_info = [ele for ele in res if ele["id"] == f][0]
|
||||
self.filters[f] = Filter(
|
||||
is_case_sensitive=filter_info["is_case_sensitive"],
|
||||
c_to_s=filter_info["mode"] in ["C","B"],
|
||||
s_to_c=filter_info["mode"] in ["S","B"],
|
||||
is_blacklist=filter_info["is_blacklist"],
|
||||
regex=b64decode(filter_info["regex"]),
|
||||
blocked_packets=filter_info["n_packets"],
|
||||
code=f
|
||||
)
|
||||
self.proxy.filters = list(self.filters.values())
|
||||
|
||||
def __update_status_db(self, status):
|
||||
self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.id)
|
||||
|
||||
async def next(self,to):
|
||||
async with self.lock:
|
||||
return await self._next(to)
|
||||
|
||||
async def _next(self, to):
|
||||
if self.status != to:
|
||||
# ACTIVE -> PAUSE or PAUSE -> ACTIVE
|
||||
if (self.status, to) in [(STATUS.ACTIVE, STATUS.PAUSE)]:
|
||||
await self.proxy.pause()
|
||||
self._set_status(to)
|
||||
|
||||
elif (self.status, to) in [(STATUS.PAUSE, STATUS.ACTIVE)]:
|
||||
await self.proxy.reload()
|
||||
self._set_status(to)
|
||||
|
||||
# ACTIVE -> STOP
|
||||
elif (self.status,to) in [(STATUS.ACTIVE, STATUS.STOP), (STATUS.WAIT, STATUS.STOP), (STATUS.PAUSE, STATUS.STOP)]: #Stop proxy
|
||||
if self.starter: self.starter.cancel()
|
||||
await self.proxy.stop()
|
||||
self._set_status(to)
|
||||
|
||||
# STOP -> ACTIVE or STOP -> PAUSE
|
||||
elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE), (STATUS.STOP, STATUS.PAUSE)]:
|
||||
self.wanted_status = to
|
||||
self._set_status(STATUS.WAIT)
|
||||
self.__proxy_starter(to)
|
||||
|
||||
|
||||
def _stats_updater(self,filter:Filter):
|
||||
self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", filter.blocked, filter.code)
|
||||
|
||||
async def update_port(self):
|
||||
async with self.lock:
|
||||
self._update_port_from_db()
|
||||
if self.status in [STATUS.PAUSE, STATUS.ACTIVE]:
|
||||
next_status = self.status if self.status != STATUS.WAIT else self.wanted_status
|
||||
await self._next(STATUS.STOP)
|
||||
await self._next(next_status)
|
||||
|
||||
def _set_status(self,status):
|
||||
self.status = status
|
||||
self.__update_status_db(status)
|
||||
|
||||
|
||||
async def update_filters(self):
|
||||
async with self.lock:
|
||||
self._update_filters_from_db()
|
||||
if self.status in [STATUS.PAUSE, STATUS.ACTIVE]:
|
||||
await self.proxy.reload()
|
||||
|
||||
def __proxy_starter(self,to):
|
||||
async def func():
|
||||
try:
|
||||
while True:
|
||||
if check_port_is_open(self.proxy.public_port):
|
||||
self._set_status(to)
|
||||
await self.proxy.start(in_pause=(to==STATUS.PAUSE))
|
||||
self._set_status(STATUS.STOP)
|
||||
return
|
||||
else:
|
||||
await asyncio.sleep(.5)
|
||||
except asyncio.CancelledError:
|
||||
self._set_status(STATUS.STOP)
|
||||
await self.proxy.stop()
|
||||
self.starter = asyncio.create_task(func())
|
||||
|
||||
class ProxyManager:
|
||||
def __init__(self, db:SQLite):
|
||||
self.db = db
|
||||
self.proxy_table:dict = {}
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
async def close(self):
|
||||
for key in list(self.proxy_table.keys()):
|
||||
await self.remove(key)
|
||||
|
||||
async def remove(self,id):
|
||||
async with self.lock:
|
||||
if id in self.proxy_table:
|
||||
await self.proxy_table[id].next(STATUS.STOP)
|
||||
del self.proxy_table[id]
|
||||
|
||||
async def reload(self):
|
||||
async with self.lock:
|
||||
for srv in self.db.query('SELECT service_id, status FROM services;'):
|
||||
srv_id, req_status = srv["service_id"], srv["status"]
|
||||
if srv_id in self.proxy_table:
|
||||
continue
|
||||
|
||||
self.proxy_table[srv_id] = ServiceManager(srv_id,self.db)
|
||||
await self.proxy_table[srv_id].next(req_status)
|
||||
|
||||
def get(self,id):
|
||||
if id in self.proxy_table:
|
||||
return self.proxy_table[id]
|
||||
else:
|
||||
raise ServiceNotFoundException()
|
||||
|
||||
def check_port_is_open(port):
|
||||
try:
|
||||
sock = socket.socket()
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind(('0.0.0.0',port))
|
||||
sock.close()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def gen_service_id(db):
|
||||
while True:
|
||||
res = secrets.token_hex(8)
|
||||
if len(db.query('SELECT 1 FROM services WHERE service_id = ?;', res)) == 0:
|
||||
break
|
||||
return res
|
||||
|
||||
def gen_internal_port(db):
|
||||
while True:
|
||||
res = random.randint(30000, 45000)
|
||||
if len(db.query('SELECT 1 FROM services WHERE internal_port = ?;', res)) == 0:
|
||||
break
|
||||
return res
|
||||
@@ -1,125 +0,0 @@
|
||||
from typing import Union
|
||||
import json, sqlite3, os
|
||||
from hashlib import md5
|
||||
import base64
|
||||
|
||||
class SQLite():
|
||||
def __init__(self, db_name: str, schema:dict = None) -> None:
|
||||
self.conn: Union[None, sqlite3.Connection] = None
|
||||
self.cur = None
|
||||
self.db_name = db_name
|
||||
self.__backup = None
|
||||
self.schema = {} if schema is None else schema
|
||||
self.DB_VER = md5(json.dumps(self.schema).encode()).hexdigest()
|
||||
|
||||
def connect(self) -> None:
|
||||
try:
|
||||
self.conn = sqlite3.connect(self.db_name, check_same_thread = False)
|
||||
except Exception:
|
||||
path_name = os.path.dirname(self.db_name)
|
||||
if not os.path.exists(path_name): os.makedirs(path_name)
|
||||
with open(self.db_name, 'x'): pass
|
||||
self.conn = sqlite3.connect(self.db_name, check_same_thread = False)
|
||||
def dict_factory(cursor, row):
|
||||
d = {}
|
||||
for idx, col in enumerate(cursor.description):
|
||||
d[col[0]] = row[idx]
|
||||
return d
|
||||
self.conn.row_factory = dict_factory
|
||||
|
||||
def backup(self):
|
||||
with open(self.db_name, "rb") as f:
|
||||
self.__backup = f.read()
|
||||
|
||||
def restore(self):
|
||||
were_active = True if self.conn else False
|
||||
self.disconnect()
|
||||
if self.__backup:
|
||||
with open(self.db_name, "wb") as f:
|
||||
f.write(self.__backup)
|
||||
self.__backup = None
|
||||
if were_active: self.connect()
|
||||
|
||||
def delete_backup(self):
|
||||
self.__backup = None
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if self.conn: self.conn.close()
|
||||
self.conn = None
|
||||
|
||||
def create_schema(self, tables = {}) -> None:
|
||||
if self.conn:
|
||||
cur = self.conn.cursor()
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS main.keys_values(key VARCHAR(100) PRIMARY KEY, value VARCHAR(100) NOT NULL);")
|
||||
for t in tables:
|
||||
if t == "QUERY": continue
|
||||
cur.execute('CREATE TABLE IF NOT EXISTS main.{}({});'.format(t, ''.join([(c + ' ' + tables[t][c] + ', ') for c in tables[t]])[:-2]))
|
||||
if "QUERY" in tables: [cur.execute(qry) for qry in tables["QUERY"]]
|
||||
cur.close()
|
||||
|
||||
def query(self, query, *values):
|
||||
cur = self.conn.cursor()
|
||||
try:
|
||||
cur.execute(query, values)
|
||||
return cur.fetchall()
|
||||
finally:
|
||||
cur.close()
|
||||
try: self.conn.commit()
|
||||
except Exception: pass
|
||||
|
||||
def delete(self):
|
||||
self.disconnect()
|
||||
os.remove(self.db_name)
|
||||
|
||||
def init(self):
|
||||
self.connect()
|
||||
try:
|
||||
if self.get('DB_VERSION') != self.DB_VER: raise Exception("DB_VERSION is not correct")
|
||||
except Exception:
|
||||
self.delete()
|
||||
self.connect()
|
||||
self.create_schema(self.schema)
|
||||
self.put('DB_VERSION', self.DB_VER)
|
||||
|
||||
def get(self, key):
|
||||
q = self.query('SELECT value FROM keys_values WHERE key = ?', key)
|
||||
if len(q) == 0:
|
||||
return None
|
||||
else:
|
||||
return q[0]["value"]
|
||||
|
||||
def put(self, key, value):
|
||||
if self.get(key) is None:
|
||||
self.query('INSERT INTO keys_values (key, value) VALUES (?, ?);', key, str(value))
|
||||
else:
|
||||
self.query('UPDATE keys_values SET value=? WHERE key = ?;', str(value), key)
|
||||
|
||||
|
||||
class Service:
|
||||
def __init__(self, id: str, status: str, port: int, name: str, proto: str, ip_int: str):
|
||||
self.id = id
|
||||
self.status = status
|
||||
self.port = port
|
||||
self.name = name
|
||||
self.proto = proto
|
||||
self.ip_int = ip_int
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, var: dict):
|
||||
return cls(id=var["service_id"], status=var["status"], port=var["port"], name=var["name"], proto=var["proto"], ip_int=var["ip_int"])
|
||||
|
||||
|
||||
class Regex:
|
||||
def __init__(self, id: int, regex: bytes, mode: str, service_id: str, is_blacklist: bool, blocked_packets: int, is_case_sensitive: bool, active: bool):
|
||||
self.regex = regex
|
||||
self.mode = mode
|
||||
self.service_id = service_id
|
||||
self.is_blacklist = is_blacklist
|
||||
self.blocked_packets = blocked_packets
|
||||
self.id = id
|
||||
self.is_case_sensitive = is_case_sensitive
|
||||
self.active = active
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, var: dict):
|
||||
return cls(id=var["regex_id"], regex=base64.b64decode(var["regex"]), mode=var["mode"], service_id=var["service_id"], is_blacklist=var["is_blacklist"], blocked_packets=var["blocked_packets"], is_case_sensitive=var["is_case_sensitive"], active=var["active"])
|
||||
Reference in New Issue
Block a user