From beabbaed9d38bdf3b244362c401c4b0fe7ac7aa9 Mon Sep 17 00:00:00 2001 From: nik012003 Date: Sun, 10 Jul 2022 23:30:53 +0200 Subject: [PATCH] Upgraded to pypacker --- Dockerfile | 4 +- backend/proxy.py | 157 +++++++++++++++++++-------------------- backend/requirements.txt | 5 +- start.py | 3 + tests/test.py | 10 +-- 5 files changed, 91 insertions(+), 88 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8ec366b..8bde51a 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,10 @@ #Building main conteiner FROM python:slim-buster -RUN apt-get update && apt-get -y install build-essential libpcre3-dev python-dev git iptables libnetfilter-queue-dev +RUN apt-get update && apt-get -y install build-essential libpcre3-dev git iptables libnetfilter-queue1 +RUN git clone https://gitlab.com/guerrera.nicola/pypacker && cd pypacker && pip3 install . +WORKDIR / RUN mkdir /execute WORKDIR /execute diff --git a/backend/proxy.py b/backend/proxy.py index 967ace6..341d092 100755 --- a/backend/proxy.py +++ b/backend/proxy.py @@ -1,48 +1,12 @@ -from threading import Thread from typing import List -from netfilterqueue import NetfilterQueue -from multiprocessing import Manager, Process +from pypacker import interceptor +from pypacker.layer3 import ip, ip6 +from pypacker.layer4 import tcp, udp from subprocess import Popen, PIPE import os, traceback, pcre, re QUEUE_BASE_NUM = 1000 -def bind_queues(func, ipv6, len_list=1): - from scapy.all import IP, TCP, UDP, IPv6 - if len_list <= 0: raise Exception("len must be >= 1") - queue_list = [] - starts = QUEUE_BASE_NUM - end = starts - def func_wrap(pkt): - pkt_parsed = IPv6(pkt.get_payload()) if ipv6 else IP(pkt.get_payload()) - try: - payload = None - if UDP in pkt_parsed: payload = pkt_parsed[UDP].payload - if TCP in pkt_parsed: payload = pkt_parsed[TCP].payload - if payload: func(pkt, pkt_parsed, bytes(payload)) - else: pkt.accept() - except Exception: - traceback.print_exc() - pkt.accept() - - while True: - if starts >= 65536: - raise Exception("Netfilter queue is full!") - try: - for _ in range(len_list): - queue_list.append(NetfilterQueue()) - queue_list[-1].bind(end, func_wrap) - end+=1 - end-=1 - break - except OSError: - del queue_list[-1] - for ele in queue_list: - ele.unbind() - queue_list = [] - starts = end = end+1 - return queue_list, (starts, end) - class FilterTypes: INPUT = "FIREGEX-INPUT" OUTPUT = "FIREGEX-OUTPUT" @@ -124,6 +88,53 @@ class FiregexFilter(): def delete(self): self.iptable.delete_command(self.type, self.id) +class Interceptor: + def __init__(self, iptables, c_to_s, s_to_c, proto, ipv6, port, n_threads): + self.proto = proto + self.ipv6 = ipv6 + self.itor_c_to_s, codes = self._start_queue(c_to_s, n_threads) + iptables.add_c_to_s(proto, port, codes) + self.itor_s_to_c, codes = self._start_queue(s_to_c, n_threads) + iptables.add_s_to_c(proto, port, codes) + + def _start_queue(self,func,n_threads): + def func_wrap(ll_data, ll_proto_id, data, ctx, *args): + pkt_parsed = ip6.IP6(data) if self.ipv6 else ip.IP(data) + try: + level4 = None + if self.proto == ProtoTypes.TCP: level4 = pkt_parsed[tcp.TCP].body_bytes + elif self.proto == ProtoTypes.UDP: level4 = pkt_parsed[udp.UDP].body_bytes + if level4: + if func(level4): + return pkt_parsed.bin(), interceptor.NF_ACCEPT + elif self.proto == ProtoTypes.TCP: + pkt_parsed[tcp.TCP].flags &= 0x00 + pkt_parsed[tcp.TCP].flags |= tcp.TH_FIN | tcp.TH_ACK + pkt_parsed[tcp.TCP].body_bytes = b"" + return pkt_parsed.bin(), interceptor.NF_ACCEPT + else: return b"", interceptor.NF_DROP + else: return pkt_parsed.bin(), interceptor.NF_ACCEPT + except Exception: + traceback.print_exc() + return pkt_parsed.bin(), interceptor.NF_ACCEPT + + ictor = interceptor.Interceptor() + starts = QUEUE_BASE_NUM + while True: + if starts >= 65536: + raise Exception("Netfilter queue is full!") + queue_ids = list(range(starts,starts+n_threads)) + try: + ictor.start(func_wrap, queue_ids=queue_ids) + break + except interceptor.UnableToBindException as e: + starts = e.queue_id + 1 + return ictor, (starts, starts+n_threads-1) + + def stop(self): + self.itor_c_to_s.stop() + self.itor_s_to_c.stop() + class FiregexFilterManager: def __init__(self, ipv6): @@ -170,14 +181,14 @@ class FiregexFilterManager: for ele in self.get(): if int(port) == ele.port: return None - def c_to_s(pkt, data, payload): return func(pkt, data, payload, True) - def s_to_c(pkt, data, payload): return func(pkt, data, payload, False) + def c_to_s(pkt): return func(pkt, True) + def s_to_c(pkt): return func(pkt, False) - queues_c_to_s, codes = bind_queues(c_to_s, n_threads) - self.iptables.add_c_to_s(proto, port, codes) - queues_s_to_c, codes = bind_queues(s_to_c, n_threads) - self.iptables.add_s_to_c(proto, port, codes) - return queues_c_to_s + queues_s_to_c + itor = Interceptor( self.iptables, + c_to_s, s_to_c, + proto, self.ipv6, port, + int(os.getenv("N_THREADS_NFQUEUE","1"))) + return itor def delete_all(self): for filter_type in [FilterTypes.INPUT, FilterTypes.OUTPUT]: @@ -212,8 +223,8 @@ class Proxy: def __init__(self, port, ipv6, filters=None): self.manager = FiregexFilterManager(ipv6) self.port = port - self.filters = Manager().list(filters) if filters else Manager().list([]) - self.process = None + self.filters = filters if filters else [] + self.interceptor = None def set_filters(self, filters): elements_to_pop = len(self.filters) @@ -222,41 +233,29 @@ class Proxy: for _ in range(elements_to_pop): self.filters.pop(0) - def _starter(self): - self.manager.delete_by_port(self.port) - def regex_filter(pkt, data, packet, by_client): - try: - for i, filter in enumerate(self.filters): - if (by_client and filter.c_to_s) or (not by_client and filter.s_to_c): - match = filter.check(packet) - if (filter.is_blacklist and match) or (not filter.is_blacklist and not match): - filter.blocked+=1 - try: self.filters[i] = filter - except Exception: pass - pkt.drop() - return - except IndexError: pass - pkt.accept() - queue_list = self.manager.add(ProtoTypes.TCP, self.port, regex_filter) - threads = [] - for ele in queue_list: - threads.append(Thread(target=ele.run)) - threads[-1].daemon = True - threads[-1].start() - for ele in threads: ele.join() - for ele in queue_list: ele.unbind() - def start(self): - self.process = Process(target=self._starter) - self.process.start() + if not self.interceptor: + self.manager.delete_by_port(self.port) + def regex_filter(pkt, by_client): + try: + for filter in self.filters: + if (by_client and filter.c_to_s) or (not by_client and filter.s_to_c): + match = filter.check(pkt) + if (filter.is_blacklist and match) or (not filter.is_blacklist and not match): + filter.blocked+=1 + return False + except IndexError: pass + return True + + self.interceptor = self.manager.add(ProtoTypes.TCP, self.port, regex_filter) + def stop(self): self.manager.delete_by_port(self.port) - if self.process: - self.process.kill() - self.process = None + if self.interceptor: + self.interceptor.stop() + self.interceptor = None def restart(self): self.stop() - self.start() - + self.start() \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt index daaf765..94694be 100755 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -3,7 +3,6 @@ httpx uvicorn[standard] passlib[bcrypt] python-jose[cryptography] -NetfilterQueue -scapy +#pypacker python-pcre -fastapi-socketio \ No newline at end of file +fastapi-socketio diff --git a/start.py b/start.py index 2df04c7..5ad91a6 100755 --- a/start.py +++ b/start.py @@ -21,6 +21,7 @@ def puts(text, *args, color=colors.white, is_bold=False, **kwargs): def sep(): puts("-----------------------------------", is_bold=True) parser = argparse.ArgumentParser() parser.add_argument('--port', "-p", type=int, required=False, help='Port where open the web service of the firewall', default=4444) +parser.add_argument('--thread-per-queue', "-t", type=int, required=False, help='Number of threads started for each queue', default=1) parser.add_argument('--no-autostart', "-n", required=False, action="store_true", help='Auto-execute "docker-compose up -d --build"', default=False) args = parser.parse_args() @@ -44,6 +45,7 @@ services: network_mode: "host" environment: - PORT={args.port} + - N_THREADS_NFQUEUE={args.thread_per_queue} volumes: - /execute/db cap_add: @@ -65,6 +67,7 @@ services: - {args.port}:{args.port} environment: - PORT={args.port} + - N_THREADS_NFQUEUE={args.thread_per_queue} volumes: - /execute/db cap_add: diff --git a/tests/test.py b/tests/test.py index e3ae57d..66e32cc 100755 --- a/tests/test.py +++ b/tests/test.py @@ -58,11 +58,11 @@ else: puts(f"Test Failed: Couldn't create service ✗", color=colors.red); exit( #Delete the Service and exit def exit_test(status_code=0): - if service_created: - if(firegex.delete(service)): - puts(f"Sucessfully delete service with id {service} ✔", color=colors.green) - else: - puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1) + #if service_created: + # if(firegex.delete(service)): + # puts(f"Sucessfully delete service with id {service} ✔", color=colors.green) + # else: + # puts(f"Test Failed: Couldn't delete service ✗", color=colors.red); exit(1) sep() server.terminate() exit(status_code)