add: default policy
This commit is contained in:
@@ -19,7 +19,11 @@ class RuleModel(BaseModel):
|
||||
port_dst_to: PortType
|
||||
action: str
|
||||
mode:str
|
||||
|
||||
|
||||
class RuleForm(BaseModel):
|
||||
rules: list[RuleModel]
|
||||
policy: str
|
||||
|
||||
class RuleAddResponse(BaseModel):
|
||||
status:str|list[dict]
|
||||
|
||||
@@ -51,6 +55,8 @@ db = SQLite('db/firewall-rules.db', {
|
||||
]
|
||||
})
|
||||
|
||||
firewall = FirewallManager(db)
|
||||
|
||||
async def reset(params: ResetRequest):
|
||||
if not params.delete:
|
||||
db.backup()
|
||||
@@ -79,18 +85,18 @@ async def apply_changes():
|
||||
await refresh_frontend()
|
||||
return {'status': 'ok'}
|
||||
|
||||
firewall = FirewallManager(db)
|
||||
|
||||
@app.get('/stats', response_model=GeneralStatModel)
|
||||
async def get_general_stats():
|
||||
"""Get firegex general status about rules"""
|
||||
return db.query("SELECT (SELECT COUNT(*) FROM rules) rules")[0]
|
||||
|
||||
@app.get('/rules', response_model=list[RuleModel])
|
||||
@app.get('/rules', response_model=RuleForm)
|
||||
async def get_rule_list():
|
||||
"""Get the list of existent firegex rules"""
|
||||
return db.query("SELECT active, name, proto, ip_src, ip_dst, port_src_from, port_dst_from, port_src_to, port_dst_to, action, mode FROM rules ORDER BY rule_id;")
|
||||
|
||||
return {
|
||||
"policy": db.get("POLICY", "accept"),
|
||||
"rules": db.query("SELECT active, name, proto, ip_src, ip_dst, port_src_from, port_dst_from, port_src_to, port_dst_to, action, mode FROM rules ORDER BY rule_id;")
|
||||
}
|
||||
@app.get('/rule/{rule_id}/disable', response_model=StatusMessageModel)
|
||||
async def service_disable(rule_id: str):
|
||||
"""Request disabling a specific rule"""
|
||||
@@ -141,10 +147,12 @@ def parse_and_check_rule(rule:RuleModel):
|
||||
|
||||
|
||||
@app.post('/rules/set', response_model=RuleAddResponse)
|
||||
async def add_new_service(form: list[RuleModel]):
|
||||
async def add_new_service(form: RuleForm):
|
||||
"""Add a new service"""
|
||||
form = [parse_and_check_rule(ele) for ele in form]
|
||||
errors = [({"rule":i} | ele) for i, ele in enumerate(form) if isinstance(ele, dict)]
|
||||
if form.policy not in ["accept", "drop", "reject"]:
|
||||
return {"status": "Invalid policy"}
|
||||
rules = [parse_and_check_rule(ele) for ele in form.rules]
|
||||
errors = [({"rule":i} | ele) for i, ele in enumerate(rules) if isinstance(ele, dict)]
|
||||
if len(errors) > 0:
|
||||
return {'status': errors}
|
||||
try:
|
||||
@@ -164,8 +172,9 @@ async def add_new_service(form: list[RuleModel]):
|
||||
ele.port_src_from, ele.port_dst_from,
|
||||
ele.port_src_to, ele.port_dst_to,
|
||||
ele.action, ele.mode
|
||||
) for rid, ele in enumerate(form)]
|
||||
) for rid, ele in enumerate(rules)]
|
||||
)
|
||||
db.set("POLICY", form.policy)
|
||||
except sqlite3.IntegrityError:
|
||||
return {'status': 'Error saving the rules: maybe there are duplicated rules'}
|
||||
return await apply_changes()
|
||||
|
||||
Reference in New Issue
Block a user