From 4abb7e2746e05fc23361822cd47200b437412362 Mon Sep 17 00:00:00 2001 From: DomySh Date: Mon, 13 Jun 2022 18:44:11 +0200 Subject: [PATCH] ProxyManager implementation --- Dockerfile | 3 +- backend/app.py | 268 +++++++------------ backend/proxy/__init__.py | 15 +- backend/requirements.txt | 3 +- backend/utils.py | 266 ++++++++++++++++++ frontend/src/components/ServiceRow/index.tsx | 2 +- 6 files changed, 381 insertions(+), 176 deletions(-) create mode 100755 backend/utils.py diff --git a/Dockerfile b/Dockerfile index 50974a5..7977114 100755 --- a/Dockerfile +++ b/Dockerfile @@ -32,7 +32,6 @@ COPY ./config/nginx.conf /tmp/nginx.conf COPY ./config/start_nginx.sh /tmp/start_nginx.sh RUN c++ -O3 -o proxy/proxy proxy/proxy.cpp -pthread -lboost_system -lboost_regex -RUN chmod ug+x /execute/proxy/proxy #Copy react app in the main container COPY --from=frontend /app/build/ ./frontend/ @@ -41,6 +40,8 @@ RUN usermod -a -G root nobody RUN chown -R nobody:root /execute && \ chmod -R 660 /execute && chmod -R u+X /execute +RUN chmod ug+x /execute/proxy/proxy + ENTRYPOINT ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/backend/app.py b/backend/app.py index b4885e6..d60965a 100644 --- a/backend/app.py +++ b/backend/app.py @@ -1,92 +1,17 @@ -import sqlite3, random, string, subprocess, sys, threading, os, bcrypt, secrets, time +import sqlite3, subprocess, sys, threading, bcrypt, secrets, time from flask import Flask, jsonify, request, abort, session from functools import wraps from flask_cors import CORS -from kthread import KThread - - -class SQLite(): - def __init__(self, db_name) -> None: - self.conn = None - self.cur = None - self.db_name = db_name - self.lock = threading.Lock() - - def connect(self) -> None: - if not os.path.exists("db"): os.mkdir("db") - try: - self.conn = sqlite3.connect("db/" + self.db_name + '.db', check_same_thread = False) - except Exception: - with open(self.db_name + '.db', 'x') as f: - pass - self.conn = sqlite3.connect("db/" + self.db_name + '.db', check_same_thread = False) - - def disconnect(self) -> None: - self.conn.close() - - def check_integrity(self, tables = {}) -> None: - cur = self.conn.cursor() - for t in tables: - cur.execute(''' - SELECT name FROM sqlite_master WHERE type='table' AND name='{}'; - '''.format(t)) - - if len(cur.fetchall()) == 0: - cur.execute('''CREATE TABLE main.{}({});'''.format(t, ''.join([(c + ' ' + tables[t][c] + ', ') for c in tables[t]])[:-2])) - cur.close() - - def query(self, query, values = ()): - cur = self.conn.cursor() - try: - with self.lock: - cur.execute(query, values) - return cur.fetchall() - finally: - cur.close() - self.conn.commit() - -def from_name_get_id(name): - serv_id = name.strip().replace(" ","-") - serv_id = "".join([c for c in serv_id if c in (string.ascii_uppercase + string.ascii_lowercase + string.digits + "-")]) - return serv_id.lower() - -def gen_internal_port(): - while True: - res = random.randint(30000, 45000) - if len(db.query('SELECT 1 FROM services WHERE internal_port = ?;', (res,))) == 0: - break - return res - - - +from jsonschema import validate +from utils import SQLite, KeyValueStorage, gen_internal_port, ProxyManager, from_name_get_id, STATUS # DB init db = SQLite('firegex') db.connect() - - -class KeyValueStorage: - def __init__(self): - pass - - def get(self, key): - q = db.query('SELECT value FROM keys_values WHERE key = ?', (key,)) - if len(q) == 0: - return None - else: - return q[0][0] - - def put(self, key, value): - if self.get(key) is None: - db.query('INSERT INTO keys_values (key, value) VALUES (?, ?);', (key,str(value))) - else: - db.query('UPDATE keys_values SET value=? WHERE key = ?;', (str(value), key)) - - +conf = KeyValueStorage(db) +firewall = ProxyManager(db) app = Flask(__name__) -conf = KeyValueStorage() -proxy_table = {} DEBUG = len(sys.argv) > 1 and sys.argv[1] == "DEBUG" @@ -104,36 +29,10 @@ def login_required(f): return decorated_function -def get_service_data(id): - q = db.query('SELECT * FROM services WHERE service_id=?;',(id,)) - if len(q) == 0: return None - srv = q[0] - return { - 'id': srv[1], - 'status': srv[0], - 'public_port': srv[3], - 'internal_port': srv[2] - } - -def service_manager(id): - data = get_service_data(id) - if data is None: return - - @app.before_first_request def before_first_request(): - - - services = [ - - ] - - for srv in services: - - - - + firewall.reload() app.config['SECRET_KEY'] = secrets.token_hex(32) if DEBUG: app.config["STATUS"] = "run" @@ -153,8 +52,19 @@ def get_status(): def login(): if not conf.get("password"): return abort(404) req = request.get_json(force = True) - if not "password" in req or not isinstance(req["password"],str): + + try: + validate( + instance=req, + schema={ + "type" : "object", + "properties" : { + "password" : {"type" : "string"} + }, + }) + except Exception: return abort(400) + if req["password"] == "": return {"status":"Cannot insert an empty password!"} time.sleep(.3) # No bruteforce :) @@ -172,8 +82,20 @@ def logout(): @login_required def change_password(): req = request.get_json(force = True) - if not "password" in req or not isinstance(req["password"],str): + + try: + validate( + instance=req, + schema={ + "type" : "object", + "properties" : { + "password" : {"type" : "string"}, + "expire": {"type" : "boolean"}, + }, + }) + except Exception: return abort(400) + if req["password"] == "": return {"status":"Cannot insert an empty password!"} if req["expire"]: @@ -188,6 +110,19 @@ def change_password(): def set_password(): if conf.get("password"): return abort(404) req = request.get_json(force = True) + + try: + validate( + instance=req, + schema={ + "type" : "object", + "properties" : { + "password" : {"type" : "string"} + }, + }) + except Exception: + return abort(400) + if not "password" in req or not isinstance(req["password"],str): return abort(400) if req["password"] == "": @@ -202,19 +137,10 @@ def set_password(): @app.route('/api/general-stats') @login_required def get_general_stats(): - n_services = db.query(''' - SELECT COUNT (*) FROM services; - ''')[0][0] - n_regexes = db.query(''' - SELECT COUNT (*) FROM regexes; - ''')[0][0] - n_packets = db.query(''' - SELECT SUM(blocked_packets) FROM regexes; - ''')[0][0] - + n_packets = db.query("SELECT SUM(blocked_packets) FROM regexes;")[0][0] return { - 'services': n_services, - 'regexes': n_regexes, + 'services': db.query("SELECT COUNT (*) FROM services;")[0][0], + 'regexes': db.query("SELECT COUNT (*) FROM regexes;")[0][0], 'closed': n_packets if n_packets else 0 } @@ -261,54 +187,36 @@ def get_service(serv): @app.route('/api/service//stop') @login_required def get_service_stop(serv): - db.query(''' - UPDATE services SET status = 'stop' WHERE service_id = ?; - ''', (serv,)) - - return { - 'status': 'ok' - } + firewall.change_status(serv,STATUS.STOP) + return {'status': 'ok'} @app.route('/api/service//pause') @login_required def get_service_pause(serv): - db.query(''' - UPDATE services SET status = 'pause' WHERE service_id = ?; - ''', (serv,)) - - return { - 'status': 'ok' - } + firewall.change_status(serv,STATUS.PAUSE) + return {'status': 'ok'} @app.route('/api/service//start') @login_required def get_service_start(serv): - db.query(''' - UPDATE services SET status = 'wait' WHERE service_id = ?; - ''', (serv,)) - - return { - 'status': 'ok' - } + firewall.change_status(serv,STATUS.ACTIVE) + return {'status': 'ok'} @app.route('/api/service//delete') @login_required def get_service_delete(serv): db.query('DELETE FROM services WHERE service_id = ?;', (serv,)) db.query('DELETE FROM regexes WHERE service_id = ?;', (serv,)) - - return { - 'status': 'ok' - } + firewall.fire_update(serv) + return {'status': 'ok'} @app.route('/api/service//regen-port') @login_required def get_regen_port(serv): - db.query('UPDATE services SET internal_port = ? WHERE service_id = ?;', (gen_internal_port(), serv)) - return { - 'status': 'ok' - } + db.query('UPDATE services SET internal_port = ? WHERE service_id = ?;', (gen_internal_port(db), serv)) + firewall.fire_update(serv) + return {'status': 'ok'} @app.route('/api/service//regexes') @@ -346,37 +254,64 @@ def get_regex_id(regex_id): @app.route('/api/regex//delete') @login_required def get_regex_delete(regex_id): - db.query('DELETE FROM regexes WHERE regex_id = ?;', (regex_id,)) - - return { - 'status': 'ok' - } + q = db.query('SELECT * FROM regexes WHERE regex_id = ?;', (regex_id,)) + + if len(q) != 0: + db.query('DELETE FROM regexes WHERE regex_id = ?;', (regex_id,)) + firewall.fire_update(q[0][2]) + + return {'status': 'ok'} @app.route('/api/regexes/add', methods = ['POST']) @login_required def post_regexes_add(): req = request.get_json(force = True) - - db.query(''' - INSERT INTO regexes (service_id, regex, is_blacklist, mode) VALUES (?, ?, ?, ?); - ''', (req['service_id'], req['regex'], req['is_blacklist'], req['mode'])) - - return { - 'status': 'ok' - } + try: + validate( + instance=req, + schema={ + "type" : "object", + "properties" : { + "service_id" : {"type" : "string"}, + "regex" : {"type" : "string"}, + "is_blacklist" : {"type" : "boolean"}, + "mode" : {"type" : "string"}, + }, + }) + except Exception: + return abort(400) + db.query("INSERT INTO regexes (service_id, regex, is_blacklist, mode) VALUES (?, ?, ?, ?);", + (req['service_id'], req['regex'], req['is_blacklist'], req['mode'])) + + firewall.fire_update(req['service_id']) + return {'status': 'ok'} @app.route('/api/services/add', methods = ['POST']) @login_required def post_services_add(): req = request.get_json(force = True) + + try: + validate( + instance=req, + schema={ + "type" : "object", + "properties" : { + "name" : {"type" : "string"}, + "port" : {"type" : "number"} + }, + }) + except Exception: + return abort(400) + serv_id = from_name_get_id(req['name']) try: - db.query(''' - INSERT INTO services (name, service_id, internal_port, public_port, status) VALUES (?, ?, ?, ?, ?) - ''', (req['name'], serv_id, gen_internal_port(), req['port'], 'stop')) + db.query("INSERT INTO services (name, service_id, internal_port, public_port, status) VALUES (?, ?, ?, ?, ?)", + (req['name'], serv_id, gen_internal_port(db), req['port'], 'stop')) + firewall.reload() except sqlite3.IntegrityError: return {'status': 'Name or/and port of the service has been already assigned to another service'} @@ -413,4 +348,3 @@ if __name__ == '__main__': app.run(host="0.0.0.0", port=8080 ,debug=True) else: subprocess.run(["uwsgi","--socket","./uwsgi.sock","--master","--module","app:app", "--enable-threads"]) - diff --git a/backend/proxy/__init__.py b/backend/proxy/__init__.py index 12079f0..9ebe03b 100755 --- a/backend/proxy/__init__.py +++ b/backend/proxy/__init__.py @@ -1,15 +1,16 @@ -import subprocess, re +import subprocess, re, os #c++ -o proxy proxy.cpp class Filter: - def __init__(self, regex, is_blacklist=True, c_to_s=False, s_to_c=False ): + def __init__(self, regex, is_blacklist=True, c_to_s=False, s_to_c=False, blocked_packets=0, code=None): self.regex = regex self.is_blacklist = is_blacklist if c_to_s == s_to_c: c_to_s = s_to_c = True # (False, False) == (True, True) self.c_to_s = c_to_s self.s_to_c = s_to_c - self.blocked = 0 + self.blocked = blocked_packets + self.code = code def compile(self): if isinstance(self.regex, str): self.regex = self.regex.encode() @@ -20,8 +21,6 @@ class Filter: if self.s_to_c: yield "S"+self.regex.hex() if self.is_blacklist else "s"+self.regex.hex() - - class Proxy: def __init__(self, internal_port, public_port, filters=None, public_host="0.0.0.0", internal_host="127.0.0.1"): self.public_host = public_host @@ -35,8 +34,9 @@ class Proxy: if self.process is None: filter_map = self.compile_filters() filters_codes = list(filter_map.keys()) + proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"./proxy") self.process = subprocess.Popen( - ["./proxy", str(self.public_host), str(self.public_port), str(self.internal_host), str(self.internal_port), *filters_codes], + [proxy_binary_path, str(self.public_host), str(self.public_port), str(self.internal_host), str(self.internal_port), *filters_codes], stdout=subprocess.PIPE, universal_newlines=True ) for stdout_line in iter(self.process.stdout.readline, ""): @@ -68,6 +68,9 @@ class Proxy: def reload(self): if self.process: self.restart() + def isactive(self): + return True if self.process else False + def compile_filters(self): res = {} for filter_obj in self.filters: diff --git a/backend/requirements.txt b/backend/requirements.txt index 5dcca7b..bfbb5ff 100755 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -2,4 +2,5 @@ Flask uwsgi flask-cors bcrypt -kthread \ No newline at end of file +kthread +jsonschema \ No newline at end of file diff --git a/backend/utils.py b/backend/utils.py new file mode 100755 index 0000000..f3366f7 --- /dev/null +++ b/backend/utils.py @@ -0,0 +1,266 @@ +from asyncore import file_dispatcher +from proxy import Filter, Proxy +import random, string, os, threading, sqlite3, time +from kthread import KThread +from base64 import b64decode + + +class SQLite(): + def __init__(self, db_name) -> None: + self.conn = None + self.cur = None + self.db_name = db_name + self.lock = threading.Lock() + + def connect(self) -> None: + if not os.path.exists("db"): os.mkdir("db") + try: + self.conn = sqlite3.connect("db/" + self.db_name + '.db', check_same_thread = False) + except Exception: + with open(self.db_name + '.db', 'x') as f: + pass + self.conn = sqlite3.connect("db/" + self.db_name + '.db', check_same_thread = False) + + def disconnect(self) -> None: + self.conn.close() + + def check_integrity(self, tables = {}) -> None: + cur = self.conn.cursor() + for t in tables: + cur.execute(''' + SELECT name FROM sqlite_master WHERE type='table' AND name='{}'; + '''.format(t)) + + if len(cur.fetchall()) == 0: + cur.execute('''CREATE TABLE main.{}({});'''.format(t, ''.join([(c + ' ' + tables[t][c] + ', ') for c in tables[t]])[:-2])) + cur.close() + + def query(self, query, values = ()): + cur = self.conn.cursor() + try: + with self.lock: + cur.execute(query, values) + return cur.fetchall() + finally: + cur.close() + try: self.conn.commit() + except Exception: pass + +class KeyValueStorage: + def __init__(self, db): + self.db = db + + def get(self, key): + q = self.db.query('SELECT value FROM keys_values WHERE key = ?', (key,)) + if len(q) == 0: + return None + else: + return q[0][0] + + def put(self, key, value): + if self.get(key) is None: + self.db.query('INSERT INTO keys_values (key, value) VALUES (?, ?);', (key,str(value))) + else: + self.db.query('UPDATE keys_values SET value=? WHERE key = ?;', (str(value), key)) + +class STATUS: + WAIT = "wait" + STOP = "stop" + PAUSE = "pause" + ACTIVE = "active" + +class ProxyManager: + def __init__(self, db:SQLite): + self.db = db + self.proxy_table = {} + self.lock = threading.Lock() + + def __clear_proxy_table(self): + for key in self.proxy_table.keys(): + if not self.proxy_table[key]["thread"].is_alive(): + del self.proxy_table[key] + + def reload(self): + with self.lock: + self.__clear_proxy_table() + for srv_id in self.db.query('SELECT service_id, status FROM services;'): + srv_id, n_status = srv_id + if srv_id in self.proxy_table: + continue + update_signal = threading.Event() + req_status = [n_status] + thread = KThread(target=self.service_manager, args=(srv_id, req_status, update_signal)) + self.proxy_table[srv_id] = { + "thread":thread, + "event":update_signal, + "next_status":req_status + } + thread.start() + + def get_service_data(self, id): + q = self.db.query('SELECT * FROM services WHERE service_id=?;',(id,)) + if len(q) == 0: return None + srv = q[0] + filters = [{ + 'id': row[5], + 'regex': row[0], + 'is_blacklist': True if row[3] == "1" else False, + 'mode': row[1], + 'n_packets': row[4], + } for row in self.db.query('SELECT * FROM regexes WHERE service_id = ?;', (id,))] + return { + 'id': srv[1], + 'status': srv[0], + 'public_port': srv[3], + 'internal_port': srv[2], + 'filters':filters + } + + def change_status(self, id, to): + with self.lock: + if id in self.proxy_table: + if self.proxy_table[id]["thread"].is_alive(): + self.proxy_table[id]["next_status"][0] = to + self.proxy_table[id]["event"].set() + else: + del self.proxy_table[id] + + def fire_update(self, id): + with self.lock: + if id in self.proxy_table: + if self.proxy_table[id]["thread"].is_alive(): + self.proxy_table[id]["event"].set() + else: + del self.proxy_table[id] + + def __update_status_db(self, id, status): + self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", (status, id)) + + def __proxy_starter(self, id, proxy:Proxy, next_status, saved_status): + def stats_updater(filter:Filter): + self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", (filter.blocked, filter.code)) + def func(): + while True: + if check_port_is_open(proxy.public_port): + self.__update_status_db(id, next_status) + if saved_status[0] == "wait": saved_status[0] = next_status + proxy_status = proxy.start(callback=stats_updater) + if proxy_status == 1: + self.__update_status_db(id, STATUS.STOP) + return + else: + time.sleep(.5) + + thread = KThread(target=func) + thread.start() + return thread + + def service_manager(self, id, next_status, signal:threading.Event): + + proxy = None + thr_starter:KThread = None + previous_status = "stop" + filters = {} + + while True: + data = self.get_service_data(id) + + #Close thread + if data is None: + if proxy and proxy.isactive(): + proxy.stop() + return + + restart_required = False + + #Port checks + if proxy and (proxy.internal_port != data['internal_port'] or proxy.public_port != data['public_port']): + restart_required = True + + + #Filter check + old_filters = set(filters.keys()) + new_filters = set([f["id"] for f in data["filters"]]) + + #remove old filters + for f in old_filters: + if not f in new_filters: + restart_required = True + del filters[f] + + for f in new_filters: + if not f in old_filters: + restart_required = True + filter_info = [ele for ele in data['filters'] if ele["id"] == f][0] + filters[f] = Filter( + c_to_s=filter_info["mode"] in ["C","B"], + s_to_c=filter_info["mode"] in ["S","B"], + is_blacklist=filter_info["is_blacklist"], + regex=b64decode(filter_info["regex"]), + blocked_packets=filter_info["n_packets"], + code=f + ) + + #proxy status managment + if previous_status != next_status[0] or restart_required: + if (previous_status, next_status[0]) in [(STATUS.ACTIVE, STATUS.PAUSE), (STATUS.STOP, STATUS.PAUSE), (STATUS.PAUSE, STATUS.PAUSE)]: + if proxy: proxy.stop() + proxy = Proxy( + internal_port=data['internal_port'], + public_port=data['public_port'], + filters=[] + ) + previous_status = next_status[0] = STATUS.PAUSE + self.__update_status_db(id, STATUS.WAIT) + thr_starter = self.__proxy_starter(id, proxy, STATUS.PAUSE, [previous_status]) + restart_required = False + + # ACTIVE -> STOP + elif (previous_status,next_status[0]) in [(STATUS.ACTIVE, STATUS.STOP), (STATUS.WAIT, STATUS.STOP), (STATUS.PAUSE, STATUS.STOP)]: #Stop proxy + if thr_starter and thr_starter.is_alive(): thr_starter.kill() + proxy.stop() + previous_status = next_status[0] = STATUS.STOP + self.__update_status_db(id, STATUS.STOP) + restart_required = False + + elif (previous_status, next_status[0]) in [(STATUS.PAUSE, STATUS.ACTIVE), (STATUS.STOP, STATUS.ACTIVE), (STATUS.ACTIVE, STATUS.ACTIVE)]: + if proxy: proxy.stop() + proxy = Proxy( + internal_port=data['internal_port'], + public_port=data['public_port'], + filters=list(filters.values()) + ) + previous_status = next_status[0] = STATUS.ACTIVE + self.__update_status_db(id, STATUS.WAIT) + thr_starter = self.__proxy_starter(id, proxy, STATUS.ACTIVE, [previous_status]) + restart_required = False + else: + self.__update_status_db(id, previous_status) + + signal.wait() + signal.clear() + + +def check_port_is_open(port): + import socket + try: + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('0.0.0.0',port)) + sock.close() + return True + except Exception: + return False + +def from_name_get_id(name): + serv_id = name.strip().replace(" ","-") + serv_id = "".join([c for c in serv_id if c in (string.ascii_uppercase + string.ascii_lowercase + string.digits + "-")]) + return serv_id.lower() + +def gen_internal_port(db): + while True: + res = random.randint(30000, 45000) + if len(db.query('SELECT 1 FROM services WHERE internal_port = ?;', (res,))) == 0: + break + return res + diff --git a/frontend/src/components/ServiceRow/index.tsx b/frontend/src/components/ServiceRow/index.tsx index 701da1e..97510a5 100755 --- a/frontend/src/components/ServiceRow/index.tsx +++ b/frontend/src/components/ServiceRow/index.tsx @@ -84,7 +84,7 @@ function ServiceRow({ service, onClick, additional_buttons }:{ service:Service, {additional_buttons} {["pause","wait"].includes(service.status)? setStopModal(true)} size="xl" radius="md" variant="filled" disabled={service.status === "stop"}> :