Improve stability and functionalities

This commit is contained in:
DomySh
2022-06-30 15:58:03 +02:00
parent 4174654c3c
commit 02124c817b
16 changed files with 210 additions and 123 deletions

View File

@@ -36,8 +36,9 @@ def JWT_SECRET(): return conf.get("secret")
@app.on_event("shutdown")
async def shutdown_event():
await firewall.close()
db.disconnect()
await firewall.close()
@app.on_event("startup")
async def startup_event():
@@ -197,7 +198,7 @@ async def get_service_regexes(service_id: str, auth: bool = Depends(is_loggined)
return db.query("""
SELECT
regex, mode, regex_id `id`, service_id, is_blacklist,
blocked_packets n_packets, is_case_sensitive
blocked_packets n_packets, is_case_sensitive, active
FROM regexes WHERE service_id = ?;
""", service_id)
@@ -206,7 +207,7 @@ async def get_regex_id(regex_id: int, auth: bool = Depends(is_loggined)):
res = db.query("""
SELECT
regex, mode, regex_id `id`, service_id, is_blacklist,
blocked_packets n_packets, is_case_sensitive
blocked_packets n_packets, is_case_sensitive, active
FROM regexes WHERE `id` = ?;
""", regex_id)
if len(res) == 0: raise HTTPException(status_code=400, detail="This regex does not exists!")
@@ -221,6 +222,22 @@ async def get_regex_delete(regex_id: int, auth: bool = Depends(is_loggined)):
return {'status': 'ok'}
@app.get('/api/regex/{regex_id}/enable')
async def get_regex_delete(regex_id: int, auth: bool = Depends(is_loggined)):
res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id)
if len(res) != 0:
db.query('UPDATE regexes SET active=1 WHERE regex_id = ?;', regex_id)
await firewall.get(res[0]["service_id"]).update_filters()
return {'status': 'ok'}
@app.get('/api/regex/{regex_id}/disable')
async def get_regex_delete(regex_id: int, auth: bool = Depends(is_loggined)):
res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id)
if len(res) != 0:
db.query('UPDATE regexes SET active=0 WHERE regex_id = ?;', regex_id)
await firewall.get(res[0]["service_id"]).update_filters()
return {'status': 'ok'}
class RegexAddForm(BaseModel):
service_id: str
regex: str
@@ -261,7 +278,7 @@ async def post_services_add(form: ServiceAddForm, auth: bool = Depends(is_loggin
async def frontend_debug_proxy(path):
httpc = httpx.AsyncClient()
req = httpc.build_request("GET",urllib.parse.urljoin(f"http://0.0.0.0:{os.getenv('F_PORT','3000')}", path))
req = httpc.build_request("GET",urllib.parse.urljoin(f"http://127.0.0.1:{os.getenv('F_PORT','3000')}", path))
resp = await httpc.send(req, stream=True)
return StreamingResponse(resp.aiter_bytes(),status_code=resp.status_code)
@@ -287,7 +304,7 @@ if DEBUG:
@app.websocket("/ws")
async def websocket_debug_proxy(ws: WebSocket):
await ws.accept()
async with websockets.connect(f"ws://0.0.0.0:{os.getenv('F_PORT','3000')}/ws") as ws_b_client:
async with websockets.connect(f"ws://127.0.0.1:{os.getenv('F_PORT','3000')}/ws") as ws_b_client:
fwd_task = asyncio.create_task(forward_websocket(ws, ws_b_client))
rev_task = asyncio.create_task(reverse_websocket(ws, ws_b_client))
await asyncio.gather(fwd_task, rev_task)
@@ -298,7 +315,7 @@ async def catch_all(full_path:str):
try:
return await frontend_debug_proxy(full_path)
except Exception:
return {"details":"Frontend not started at "+f"http://0.0.0.0:{os.getenv('F_PORT','3000')}"}
return {"details":"Frontend not started at "+f"http://127.0.0.1:{os.getenv('F_PORT','3000')}"}
else: return await react_deploy(full_path)

View File

@@ -40,7 +40,7 @@ class Proxy:
if not self.isactive():
try:
self.filter_map = self.compile_filters()
filters_codes = list(self.filter_map.keys()) if not in_pause else []
filters_codes = self.get_filter_codes() if not in_pause else []
proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"./proxy")
self.process = await asyncio.create_subprocess_exec(
@@ -57,8 +57,9 @@ class Proxy:
if stdout_line.startswith("BLOCKED"):
regex_id = stdout_line.split()[1]
async with self.filter_map_lock:
self.filter_map[regex_id].blocked+=1
if self.callback_blocked_update: self.callback_blocked_update(self.filter_map[regex_id])
if regex_id in self.filter_map:
self.filter_map[regex_id].blocked+=1
if self.callback_blocked_update: self.callback_blocked_update(self.filter_map[regex_id])
except Exception:
return await self.process.wait()
else:
@@ -87,8 +88,13 @@ class Proxy:
if self.isactive():
async with self.filter_map_lock:
self.filter_map = self.compile_filters()
filters_codes = list(self.filter_map.keys())
filters_codes = self.get_filter_codes()
await self.update_config(filters_codes)
def get_filter_codes(self):
filters_codes = list(self.filter_map.keys())
filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True)
return filters_codes
def isactive(self):
return self.process and self.process.returncode is None

View File

@@ -66,6 +66,7 @@ class SQLite():
'blocked_packets': 'INTEGER UNSIGNED NOT NULL DEFAULT 0',
'regex_id': 'INTEGER PRIMARY KEY',
'is_case_sensitive' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1))',
'active' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1)) DEFAULT 1',
'FOREIGN KEY (service_id)':'REFERENCES services (service_id)',
},
'keys_values': {
@@ -109,6 +110,7 @@ class ServiceManager:
callback_blocked_update=self._stats_updater
)
self.status = STATUS.STOP
self.wanted_status = STATUS.STOP
self.filters = {}
self._update_port_from_db()
self._update_filters_from_db()
@@ -131,7 +133,7 @@ class ServiceManager:
SELECT
regex, mode, regex_id `id`, is_blacklist,
blocked_packets n_packets, is_case_sensitive
FROM regexes WHERE service_id = ?;
FROM regexes WHERE service_id = ? AND active=1;
""", self.id)
#Filter check
@@ -161,27 +163,31 @@ class ServiceManager:
self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.id)
async def next(self,to):
async with self.lock:
if self.status != to:
# ACTIVE -> PAUSE or PAUSE -> ACTIVE
if (self.status, to) in [(STATUS.ACTIVE, STATUS.PAUSE)]:
await self.proxy.pause()
self._set_status(to)
async with self.lock:
return await self._next(to)
async def _next(self, to):
if self.status != to:
# ACTIVE -> PAUSE or PAUSE -> ACTIVE
if (self.status, to) in [(STATUS.ACTIVE, STATUS.PAUSE)]:
await self.proxy.pause()
self._set_status(to)
elif (self.status, to) in [(STATUS.PAUSE, STATUS.ACTIVE)]:
await self.proxy.reload()
self._set_status(to)
elif (self.status, to) in [(STATUS.PAUSE, STATUS.ACTIVE)]:
await self.proxy.reload()
self._set_status(to)
# ACTIVE -> STOP
elif (self.status,to) in [(STATUS.ACTIVE, STATUS.STOP), (STATUS.WAIT, STATUS.STOP), (STATUS.PAUSE, STATUS.STOP)]: #Stop proxy
if self.starter: self.starter.cancel()
await self.proxy.stop()
self._set_status(to)
# ACTIVE -> STOP
elif (self.status,to) in [(STATUS.ACTIVE, STATUS.STOP), (STATUS.WAIT, STATUS.STOP), (STATUS.PAUSE, STATUS.STOP)]: #Stop proxy
if self.starter: self.starter.cancel()
await self.proxy.stop()
self._set_status(to)
# STOP -> ACTIVE or STOP -> PAUSE
elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE), (STATUS.STOP, STATUS.PAUSE)]:
self._set_status(STATUS.WAIT)
self.__proxy_starter(to)
# STOP -> ACTIVE or STOP -> PAUSE
elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE), (STATUS.STOP, STATUS.PAUSE)]:
self.wanted_status = to
self._set_status(STATUS.WAIT)
self.__proxy_starter(to)
def _stats_updater(self,filter:Filter):
@@ -191,7 +197,9 @@ class ServiceManager:
async with self.lock:
self._update_port_from_db()
if self.status in [STATUS.PAUSE, STATUS.ACTIVE]:
await self.proxy.restart(in_pause=(self.status == STATUS.PAUSE))
next_status = self.status if self.status != STATUS.WAIT else self.wanted_status
await self._next(STATUS.STOP)
await self._next(next_status)
def _set_status(self,status):
self.status = status
@@ -233,7 +241,7 @@ class ProxyManager:
async def remove(self,id):
async with self.lock:
if id in self.proxy_table:
await self.proxy_table[id].proxy.stop()
await self.proxy_table[id].next(STATUS.STOP)
del self.proxy_table[id]
async def reload(self):