From fa889401127543ee626af1413ec015b96a8685b5 Mon Sep 17 00:00:00 2001 From: nik012003 Date: Sat, 18 Jun 2022 13:05:59 +0200 Subject: [PATCH] Fixed real-time regex --- backend/proxy/__init__.py | 62 ++++++++++++++----------- backend/proxy/proxy.cpp | 4 ++ backend/utils.py | 96 ++++++++++++++++++++++----------------- 3 files changed, 94 insertions(+), 68 deletions(-) diff --git a/backend/proxy/__init__.py b/backend/proxy/__init__.py index a0aceb9..eac517a 100755 --- a/backend/proxy/__init__.py +++ b/backend/proxy/__init__.py @@ -1,6 +1,8 @@ from signal import SIGUSR1 +from secrets import token_urlsafe import subprocess, re, os + #c++ -o proxy proxy.cpp class Filter: @@ -25,33 +27,36 @@ class Filter: yield case_sensitive + "S" + self.regex.hex() if self.is_blacklist else case_sensitive + "s"+ self.regex.hex() class Proxy: - def __init__(self, internal_port, public_port, filters=None, public_host="0.0.0.0", internal_host="127.0.0.1"): + def __init__(self, internal_port, public_port, callback_blocked_update=None, filters=None, public_host="0.0.0.0", internal_host="127.0.0.1"): self.public_host = public_host self.public_port = public_port self.internal_host = internal_host self.internal_port = internal_port self.filters = set(filters) if filters else set([]) self.process = None + self.callback_blocked_update = callback_blocked_update + self.config_file_path = None + while self.config_file_path is None: + config_file_path = os.path.join("/tmp/" + token_urlsafe(16)) + if not os.path.exists(config_file_path): + self.config_file_path = config_file_path - def start(self, callback=None): + def start(self, in_pause=False): if self.process is None: filter_map = self.compile_filters() - filters_codes = list(filter_map.keys()) + filters_codes = list(filter_map.keys()) if not in_pause else [] proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"./proxy") - config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"./config_file") - with open(config_file_path,'w') as config_file: - for line in filters_codes: - config_file.write(line + '\n') + self.__write_config(filters_codes) self.process = subprocess.Popen( - [proxy_binary_path, str(self.public_host), str(self.public_port), str(self.internal_host), str(self.internal_port), config_file_path], + [proxy_binary_path, str(self.public_host), str(self.public_port), str(self.internal_host), str(self.internal_port), self.config_file_path], stdout=subprocess.PIPE, universal_newlines=True ) for stdout_line in iter(self.process.stdout.readline, ""): if stdout_line.startswith("BLOCKED"): regex_id = stdout_line.split()[1] filter_map[regex_id].blocked+=1 - if callback: callback(filter_map[regex_id]) + if self.callback_blocked_update: self.callback_blocked_update(filter_map[regex_id]) self.process.stdout.close() return self.process.wait() @@ -68,24 +73,37 @@ class Proxy: self.process = None return True - def restart(self): + def restart(self, in_pause=False): status = self.stop() - self.start() + self.start(in_pause=in_pause) return status + def __write_config(self, filters_codes): + with open(self.config_file_path,'w') as config_file: + for line in filters_codes: + config_file.write(line + '\n') + def reload(self): - if self.process: + if self.isactive(): filter_map = self.compile_filters() filters_codes = list(filter_map.keys()) - config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"./config_file") - with open(config_file_path,'w') as config_file: - for line in filters_codes: - config_file.write(line + '\n') - self.process.send_signal(SIGUSR1) + self.__write_config(filters_codes) + self.trigger_reload_config() def isactive(self): return True if self.process else False + def trigger_reload_config(self): + self.process.send_signal(SIGUSR1) + + + def pause(self): + if self.isactive(): + self.__write_config([]) + self.trigger_reload_config() + else: + self.start(in_pause=True) + def compile_filters(self): res = {} for filter_obj in self.filters: @@ -95,13 +113,3 @@ class Proxy: res[filter] = filter_obj except Exception: pass return res - - def add_filter(self, filter): - self.filters.add(filter) - self.reload() - - def remove_filter(self, filter): - try: - del self.filters[self.filters.remove(filter)] - except ValueError: return - self.reload() diff --git a/backend/proxy/proxy.cpp b/backend/proxy/proxy.cpp index 7f6c27a..605f3a2 100644 --- a/backend/proxy/proxy.cpp +++ b/backend/proxy/proxy.cpp @@ -318,12 +318,16 @@ void push_regex(char* arg, bool case_sensitive, vector if (case_sensitive){ boost::regex regex(reinterpret_cast(expr), reinterpret_cast(expr) + expr_len); + #ifdef DEBUG cout << "Added case sensitive regex " << expr << endl; + #endif v.push_back(make_pair(string(arg), regex)); } else { boost::regex regex(reinterpret_cast(expr), reinterpret_cast(expr) + expr_len, boost::regex::icase); + #ifdef DEBUG cout << "Added case insensitive regex " << expr << endl; + #endif v.push_back(make_pair(string(arg), regex)); } } diff --git a/backend/utils.py b/backend/utils.py index 476c6b5..f998c75 100755 --- a/backend/utils.py +++ b/backend/utils.py @@ -1,4 +1,5 @@ from asyncore import file_dispatcher +from imp import reload from proxy import Filter, Proxy import random, string, os, threading, sqlite3, time, atexit, socket from kthread import KThread @@ -148,14 +149,12 @@ class ProxyManager: self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", (status, id)) def __proxy_starter(self, id, proxy:Proxy, next_status, saved_status): - def stats_updater(filter:Filter): - self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", (filter.blocked, filter.code)) def func(): while True: if check_port_is_open(proxy.public_port): self.__update_status_db(id, next_status) if saved_status[0] == "wait": saved_status[0] = next_status - proxy_status = proxy.start(callback=stats_updater) + proxy_status = proxy.start(in_pause=(next_status==STATUS.PAUSE)) if proxy_status == 1: self.__update_status_db(id, STATUS.STOP) return @@ -170,10 +169,13 @@ class ProxyManager: proxy = None thr_starter:KThread = None - previous_status = "stop" + previous_status = STATUS.STOP filters = {} while True: + restart_required = False + reload_required = False + data = self.get_service_data(id) #Close thread @@ -181,14 +183,7 @@ class ProxyManager: if proxy and proxy.isactive(): proxy.stop() return - - restart_required = False - - #Port checks - if proxy and (proxy.internal_port != data['internal_port'] or proxy.public_port != data['public_port']): - restart_required = True - - + #Filter check old_filters = set(filters.keys()) new_filters = set([f["id"] for f in data["filters"]]) @@ -196,12 +191,12 @@ class ProxyManager: #remove old filters for f in old_filters: if not f in new_filters: - restart_required = False + reload_required = True del filters[f] for f in new_filters: if not f in old_filters: - restart_required = False + reload_required = True filter_info = [ele for ele in data['filters'] if ele["id"] == f][0] filters[f] = Filter( is_case_sensitive=filter_info["is_case_sensitive"], @@ -212,21 +207,43 @@ class ProxyManager: blocked_packets=filter_info["n_packets"], code=f ) + + + def stats_updater(filter:Filter): + print("Callback received",filter.blocked, filter.code) + self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", (filter.blocked, filter.code)) + + if not proxy: + proxy = Proxy( + internal_port=data['internal_port'], + public_port=data['public_port'], + filters=list(filters.values()), + internal_host=LOCALHOST_IP, + callback_blocked_update=stats_updater + ) + + #Port checks + if proxy.internal_port != data['internal_port'] or proxy.public_port != data['public_port']: + proxy.internal_port = data['internal_port'] + proxy.public_port = data['public_port'] + restart_required = True + + #Update filters + if reload_required: + proxy.filters = list(filters.values()) #proxy status managment - if previous_status != next_status[0] or restart_required: - if (previous_status, next_status[0]) in [(STATUS.ACTIVE, STATUS.PAUSE), (STATUS.STOP, STATUS.PAUSE), (STATUS.PAUSE, STATUS.PAUSE)]: - if proxy: proxy.stop() - proxy = Proxy( - internal_port=data['internal_port'], - public_port=data['public_port'], - filters=[], - internal_host=LOCALHOST_IP, - ) - previous_status = next_status[0] = STATUS.PAUSE - self.__update_status_db(id, STATUS.WAIT) - thr_starter = self.__proxy_starter(id, proxy, STATUS.PAUSE, [previous_status]) - restart_required = False + if previous_status != next_status[0]: + # ACTIVE -> PAUSE or PAUSE -> ACTIVE + if (previous_status, next_status[0]) in [(STATUS.ACTIVE, STATUS.PAUSE), (STATUS.PAUSE, STATUS.ACTIVE)]: + if restart_required: + proxy.restart(in_pause=next_status[0]) + else: + if next_status[0] == STATUS.ACTIVE: proxy.reload() + else: proxy.pause() + previous_status = next_status[0] + self.__update_status_db(id, next_status[0]) + reload_required = restart_required = False # ACTIVE -> STOP elif (previous_status,next_status[0]) in [(STATUS.ACTIVE, STATUS.STOP), (STATUS.WAIT, STATUS.STOP), (STATUS.PAUSE, STATUS.STOP)]: #Stop proxy @@ -234,26 +251,23 @@ class ProxyManager: proxy.stop() previous_status = next_status[0] = STATUS.STOP self.__update_status_db(id, STATUS.STOP) - restart_required = False - - elif (previous_status, next_status[0]) in [(STATUS.PAUSE, STATUS.ACTIVE), (STATUS.STOP, STATUS.ACTIVE), (STATUS.ACTIVE, STATUS.ACTIVE)]: - if proxy: proxy.stop() - proxy = Proxy( - internal_port=data['internal_port'], - public_port=data['public_port'], - filters=list(filters.values()), - internal_host=LOCALHOST_IP, - ) - previous_status = next_status[0] = STATUS.ACTIVE + reload_required = restart_required = False + + # STOP -> ACTIVE or STOP -> PAUSE + elif (previous_status, next_status[0]) in [(STATUS.STOP, STATUS.ACTIVE), (STATUS.STOP, STATUS.PAUSE)]: + previous_status = next_status[0] self.__update_status_db(id, STATUS.WAIT) - thr_starter = self.__proxy_starter(id, proxy, STATUS.ACTIVE, [previous_status]) - restart_required = False + thr_starter = self.__proxy_starter(id, proxy, next_status[0], [previous_status]) + reload_required = restart_required = False else: self.__update_status_db(id, previous_status) + if restart_required: proxy.restart() + elif reload_required: proxy.reload() + signal.wait() signal.clear() - + def check_port_is_open(port): try: