136 lines
3.8 KiB
Python
136 lines
3.8 KiB
Python
import asyncio
|
|
from modules.firewall.nftables import FiregexTables
|
|
from modules.firewall.models import *
|
|
from utils.sqlite import SQLite
|
|
from modules.firewall.models import Action
|
|
|
|
nft = FiregexTables()
|
|
|
|
class FirewallManager:
|
|
def __init__(self, db:SQLite):
|
|
self.db = db
|
|
self.lock = asyncio.Lock()
|
|
|
|
async def close(self):
|
|
async with self.lock:
|
|
nft.reset()
|
|
|
|
async def init(self):
|
|
nft.init()
|
|
await self.reload()
|
|
|
|
async def reload(self):
|
|
async with self.lock:
|
|
if self.enabled:
|
|
nft.set(
|
|
map(Rule.from_dict, self.db.query('SELECT * FROM rules WHERE active = 1 ORDER BY rule_id;')),
|
|
policy=self.policy,
|
|
opt=self.settings
|
|
)
|
|
else:
|
|
nft.reset()
|
|
|
|
@property
|
|
def settings(self):
|
|
return FirewallSettings(
|
|
keep_rules=self.keep_rules,
|
|
allow_loopback=self.allow_loopback,
|
|
allow_established=self.allow_established,
|
|
allow_icmp=self.allow_icmp,
|
|
multicast_dns=self.multicast_dns,
|
|
allow_upnp=self.allow_upnp,
|
|
drop_invalid=self.drop_invalid,
|
|
allow_dhcp=self.allow_dhcp
|
|
)
|
|
|
|
@settings.setter
|
|
def settings(self, value:FirewallSettings):
|
|
self.keep_rules = value.keep_rules
|
|
self.allow_loopback=value.allow_loopback
|
|
self.allow_established=value.allow_established
|
|
self.allow_icmp=value.allow_icmp
|
|
self.multicast_dns=value.multicast_dns
|
|
self.allow_upnp=value.allow_upnp
|
|
self.drop_invalid=value.drop_invalid
|
|
self.allow_dhcp=value.allow_dhcp
|
|
|
|
@property
|
|
def policy(self):
|
|
return self.db.get("POLICY", Action.ACCEPT)
|
|
|
|
@policy.setter
|
|
def policy(self, value):
|
|
self.db.set("POLICY", value)
|
|
|
|
@property
|
|
def enabled(self):
|
|
return self.db.get("ENABLED", "0") == "1"
|
|
|
|
@enabled.setter
|
|
def enabled(self, value):
|
|
self.db.set("ENABLED", "1" if value else "0")
|
|
|
|
@property
|
|
def keep_rules(self):
|
|
return self.db.get("keep_rules", "0") == "1"
|
|
|
|
@keep_rules.setter
|
|
def keep_rules(self, value):
|
|
self.db.set("keep_rules", "1" if value else "0")
|
|
|
|
@property
|
|
def allow_loopback(self):
|
|
return self.db.get("allow_loopback", "1") == "1"
|
|
|
|
@allow_loopback.setter
|
|
def allow_loopback(self, value):
|
|
self.db.set("allow_loopback", "1" if value else "0")
|
|
|
|
@property
|
|
def allow_icmp(self):
|
|
return self.db.get("allow_icmp", "1") == "1"
|
|
|
|
@allow_icmp.setter
|
|
def allow_icmp(self, value):
|
|
self.db.set("allow_icmp", "1" if value else "0")
|
|
|
|
@property
|
|
def allow_established(self):
|
|
return self.db.get("allow_established", "1") == "1"
|
|
|
|
@allow_established.setter
|
|
def allow_established(self, value):
|
|
self.db.set("allow_established", "1" if value else "0")
|
|
|
|
@property
|
|
def multicast_dns(self):
|
|
return self.db.get("multicast_dns", "1") == "1"
|
|
|
|
@multicast_dns.setter
|
|
def multicast_dns(self, value):
|
|
self.db.set("multicast_dns", "1" if value else "0")
|
|
|
|
@property
|
|
def allow_upnp(self):
|
|
return self.db.get("allow_upnp", "1") == "1"
|
|
|
|
@allow_upnp.setter
|
|
def allow_upnp(self, value):
|
|
self.db.set("allow_upnp", "1" if value else "0")
|
|
|
|
@property
|
|
def drop_invalid(self):
|
|
return self.db.get("drop_invalid", "1") == "1"
|
|
|
|
@drop_invalid.setter
|
|
def drop_invalid(self, value):
|
|
self.db.set("drop_invalid", "1" if value else "0")
|
|
|
|
@property
|
|
def allow_dhcp(self):
|
|
return self.db.get("allow_dhcp", "1") == "1"
|
|
|
|
@drop_invalid.setter
|
|
def allow_dhcp(self, value):
|
|
self.db.set("allow_dhcp", "1" if value else "0")
|