diff --git a/.dockerignore b/.dockerignore index 897cfef..0758864 100644 --- a/.dockerignore +++ b/.dockerignore @@ -19,7 +19,7 @@ Dockerfile /frontend/build/ /frontend/build/** /frontend/node_modules/ -/backend/modules/cppqueue +/backend/modules/cppregex /backend/modules/proxy docker-compose.yml diff --git a/.gitignore b/.gitignore index 03e9b7f..cf7f774 100644 --- a/.gitignore +++ b/.gitignore @@ -21,8 +21,10 @@ /frontend/build/** /frontend/dist/ /frontend/dist/** -/backend/modules/cppqueue -/backend/binsrc/cppqueue +/backend/modules/cppregex +/backend/binsrc/cppregex +/backend/modules/cpproxy +/backend/binsrc/cpproxy /backend/modules/proxy /docker-compose.yml /firegex-compose.yml diff --git a/Dockerfile b/Dockerfile index d26b7a4..0bdf8ae 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,9 +15,9 @@ RUN bun run build #Building main conteiner FROM --platform=$TARGETARCH registry.fedoraproject.org/fedora:latest -RUN dnf -y update && dnf install -y python3-pip @development-tools gcc-c++ \ - libnetfilter_queue-devel libnfnetlink-devel libmnl-devel libcap-ng-utils \ - nftables vectorscan-devel libtins-devel python3-nftables libpcap-devel boost-devel +RUN dnf -y update && dnf install -y python3-devel python3-pip @development-tools gcc-c++ \ + libnetfilter_queue-devel libnfnetlink-devel libmnl-devel libcap-ng-utils nftables \ + vectorscan-devel libtins-devel python3-nftables libpcap-devel boost-devel RUN mkdir -p /execute/modules WORKDIR /execute @@ -26,8 +26,8 @@ ADD ./backend/requirements.txt /execute/requirements.txt RUN pip3 install --no-cache-dir --break-system-packages -r /execute/requirements.txt --no-warn-script-location COPY ./backend/binsrc /execute/binsrc -RUN g++ binsrc/nfqueue.cpp -o modules/cppqueue -std=c++23 -O3 -lnetfilter_queue -pthread -lnfnetlink $(pkg-config --cflags --libs libtins libhs libmnl) -#RUN g++ binsrc/nfproxy-tun.cpp -o modules/cpproxy -std=c++23 -O3 -lnetfilter_queue -pthread -lnfnetlink $(pkg-config --cflags --libs libtins libmnl) +RUN g++ binsrc/nfregex.cpp -o modules/cppregex -std=c++23 -O3 -lnetfilter_queue -pthread -lnfnetlink $(pkg-config --cflags --libs libtins libhs libmnl) +RUN g++ binsrc/nfproxy-tun.cpp -o modules/cpproxy -std=c++23 -O3 -lnetfilter_queue -pthread -lnfnetlink $(pkg-config --cflags --libs libtins libmnl python3) COPY ./backend/ /execute/ COPY --from=frontend /app/dist/ ./frontend/ diff --git a/backend/app.py b/backend/app.py index 5192f6a..b6646f2 100644 --- a/backend/app.py +++ b/backend/app.py @@ -42,14 +42,6 @@ app = FastAPI( title="Firegex API", version=API_VERSION, ) -utils.socketio = socketio.AsyncServer( - async_mode="asgi", - cors_allowed_origins=[], - transports=["websocket"] -) - -sio_app = socketio.ASGIApp(utils.socketio, socketio_path="/sock/socket.io", other_asgi_app=app) -app.mount("/sock", sio_app) if DEBUG: app.add_middleware( @@ -61,6 +53,15 @@ if DEBUG: ) +utils.socketio = socketio.AsyncServer( + async_mode="asgi", + cors_allowed_origins=[], + transports=["websocket"] +) + +sio_app = socketio.ASGIApp(utils.socketio, socketio_path="/sock/socket.io", other_asgi_app=app) +app.mount("/sock", sio_app) + def APP_STATUS(): return "init" if db.get("password") is None else "run" def JWT_SECRET(): return db.get("secret") @@ -197,10 +198,11 @@ if __name__ == '__main__': os.chdir(os.path.dirname(os.path.realpath(__file__))) uvicorn.run( "app:app", - host=None, #"::" if DEBUG else None, + host="::" if DEBUG else None, port=FIREGEX_PORT, - reload=False,#DEBUG, + reload=DEBUG, access_log=True, - workers=1, # Multiple workers will cause a crash due to the creation - # of multiple processes with separated memory + workers=1, # Firewall module can't be replicated in multiple workers + # Later the firewall module will be moved to a separate process + # The webserver will communicate using redis (redis is also needed for websockets) ) diff --git a/backend/binsrc/classes/netfilter.cpp b/backend/binsrc/classes/netfilter.cpp index cb58213..794a76e 100644 --- a/backend/binsrc/classes/netfilter.cpp +++ b/backend/binsrc/classes/netfilter.cpp @@ -1,244 +1,101 @@ -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include -#include - -using namespace std; +#include +#include "../utils.cpp" +#include "nfqueue.cpp" #ifndef NETFILTER_CLASS_CPP #define NETFILTER_CLASS_CPP -typedef int QueueCallbackFunction(const nlmsghdr *, const mnl_socket*, void *); +namespace Firegex { +namespace NfQueue { -struct nfqueue_execution_data_tmp{ - mnl_socket* nl = nullptr; - function queue_cb = nullptr; - void *data = nullptr; -}; +template +class ThreadNfQueue { +public: + ThreadNfQueue() = default; + virtual ~ThreadNfQueue() = default; -class NfQueueExecutor { - private: - size_t BUF_SIZE = 0xffff + (MNL_SOCKET_BUFFER_SIZE/2); - char *queue_msg_buffer = nullptr; - QueueCallbackFunction * _queue_callback_hook = nullptr; - public: + std::thread thr; + BlockingQueue*> queue; - unsigned int portid; - u_int16_t queue_num; - mnl_socket* nl = nullptr; - - NfQueueExecutor(u_int16_t queue_num, QueueCallbackFunction* queue_cb): queue_num(queue_num), _queue_callback_hook(queue_cb){ - nl = mnl_socket_open(NETLINK_NETFILTER); - - if (nl == nullptr) { throw runtime_error( "mnl_socket_open" );} - - if (mnl_socket_bind(nl, 0, MNL_SOCKET_AUTOPID) < 0) { - mnl_socket_close(nl); - throw runtime_error( "mnl_socket_bind" ); - } - portid = mnl_socket_get_portid(nl); - - queue_msg_buffer = (char*) malloc(BUF_SIZE); - - if (!queue_msg_buffer) { - mnl_socket_close(nl); - throw runtime_error( "allocate receive buffer" ); - } - - if (_send_config_cmd(NFQNL_CFG_CMD_BIND) < 0) { - _clear(); - throw runtime_error( "mnl_socket_send" ); - } - //TEST if BIND was successful - if (_send_config_cmd(NFQNL_CFG_CMD_NONE) < 0) { // SEND A NONE cmmand to generate an error meessage - _clear(); - throw runtime_error( "mnl_socket_send" ); - } - if (_recv_packet() == -1) { //RECV the error message - _clear(); - throw runtime_error( "mnl_socket_recvfrom" ); - } - - struct nlmsghdr *nlh = (struct nlmsghdr *) queue_msg_buffer; - - if (nlh->nlmsg_type != NLMSG_ERROR) { - _clear(); - throw runtime_error( "unexpected packet from kernel (expected NLMSG_ERROR packet)" ); - } - //nfqnl_msg_config_cmd - nlmsgerr* error_msg = (nlmsgerr *)mnl_nlmsg_get_payload(nlh); - - // error code taken from the linux kernel: - // https://elixir.bootlin.com/linux/v5.18.12/source/include/linux/errno.h#L27 - #define ENOTSUPP 524 /* Operation is not supported */ - - if (error_msg->error != -ENOTSUPP) { - _clear(); - throw invalid_argument( "queueid is already busy" ); - } - - //END TESTING BIND - nlh = nfq_nlmsg_put(queue_msg_buffer, NFQNL_MSG_CONFIG, queue_num); - nfq_nlmsg_cfg_put_params(nlh, NFQNL_COPY_PACKET, 0xffff); - - mnl_attr_put_u32(nlh, NFQA_CFG_FLAGS, htonl(NFQA_CFG_F_GSO)); - mnl_attr_put_u32(nlh, NFQA_CFG_MASK, htonl(NFQA_CFG_F_GSO)); - - if (mnl_socket_sendto(nl, nlh, nlh->nlmsg_len) < 0) { - _clear(); - throw runtime_error( "mnl_socket_send" ); - } - - } - - NfQueueExecutor(u_int16_t queue_num): NfQueueExecutor(queue_num, nullptr) {} - - // --- Functions to be implemented by the user - - virtual void before_loop() { - // Do nothing by default - } - - virtual void * callback_data_fetch(){ - return nullptr; - } - - // --- End of functions to be implemented by the user - - void run(){ - /* - * ENOBUFS is signalled to userspace when packets were lost - * on kernel side. In most cases, userspace isn't interested - * in this information, so turn it off. - */ - int ret = 1; - mnl_socket_setsockopt(nl, NETLINK_NO_ENOBUFS, &ret, sizeof(int)); - - before_loop(); - - for (;;) { - ret = _recv_packet(); - if (ret == -1) { - throw runtime_error( "mnl_socket_recvfrom" ); - } - nfqueue_execution_data_tmp data = { - nl: nl, - queue_cb: _queue_callback_hook, - data: callback_data_fetch() - }; - ret = mnl_cb_run(queue_msg_buffer, ret, 0, portid, _real_queue_cb, &data); - if (ret < 0){ - throw runtime_error( "mnl_cb_run" ); - } - } - } - - - ~NfQueueExecutor() { - _send_config_cmd(NFQNL_CFG_CMD_UNBIND); - _clear(); - } + virtual void before_loop() {} + virtual void handle_next_packet(PktRequest* pkt){} - private: - - static int _real_queue_cb(const nlmsghdr *nlh, void *data_ptr) { - nfqueue_execution_data_tmp* info = (nfqueue_execution_data_tmp*) data_ptr; - if (info->queue_cb == nullptr) return MNL_CB_OK; - return info->queue_cb(nlh, info->nl, info->data); + void loop() { + static_cast(this)->before_loop(); + for(;;) { + PktRequest* pkt; + queue.take(pkt); + static_cast(this)->handle_next_packet(pkt); + delete pkt; + } } - inline void _clear(){ - if (queue_msg_buffer != nullptr) { - free(queue_msg_buffer); - queue_msg_buffer = nullptr; - } - mnl_socket_close(nl); - nl = nullptr; - } - - inline ssize_t _send_config_cmd(nfqnl_msg_config_cmds cmd){ - struct nlmsghdr *nlh = nfq_nlmsg_put(queue_msg_buffer, NFQNL_MSG_CONFIG, queue_num); - nfq_nlmsg_cfg_put_cmd(nlh, AF_INET, cmd); - return mnl_socket_sendto(nl, nlh, nlh->nlmsg_len); - } - - inline ssize_t _recv_packet(){ - return mnl_socket_recvfrom(nl, queue_msg_buffer, BUF_SIZE); - } - + void run_thread_loop() { + thr = std::thread([this]() { this->loop(); }); + } }; +template , Worker>> +void __real_handler(PktRequest>* pkt) { + const size_t idx = hash_stream_id(pkt->sid) % pkt->ctx->size(); -template >> -class NFQueueSequence{ + auto* converted_pkt = reinterpret_cast*>(pkt); + converted_pkt->ctx = &((*pkt->ctx)[idx]); + + converted_pkt->ctx->queue.put(converted_pkt); +} - private: - vector nfq; - uint16_t _init; - uint16_t _end; - vector threads; - public: - static const int QUEUE_BASE_NUM = 1000; - NFQueueSequence(uint16_t seq_len){ - if (seq_len <= 0) throw invalid_argument("seq_len <= 0"); - nfq = vector(seq_len); - _init = QUEUE_BASE_NUM; - while(nfq[0] == nullptr){ - if (_init+seq_len-1 >= 65536){ - throw runtime_error("NFQueueSequence: too many queues!"); - } - for (int i=0;irun(); - })); - } - } +template , Worker>> +class MultiThreadQueue { + static_assert(std::is_base_of_v, Worker>, + "Worker must inherit from ThreadNfQueue"); - void join(){ - for (int i=0;i workers; + NfQueue, __real_handler> * nfq; + uint16_t queue_num_; + + +public: + const size_t n_threads; + static constexpr int QUEUE_BASE_NUM = 1000; - uint16_t init(){ - return _init; - } - uint16_t end(){ - return _end; - } - - ~NFQueueSequence(){ - for (int i=0;i, __real_handler>(qnum); + queue_num_ = qnum; + break; + } + catch(const std::invalid_argument&) { + if(qnum == std::numeric_limits::max()) + throw std::runtime_error("No available queue numbers"); + } + } + } + + ~MultiThreadQueue() { + delete nfq; + } + + void start() { + for(auto& worker : workers) { + worker.run_thread_loop(); + } + for (;;){ + nfq->handle_next_packet(&workers); } + } + + uint16_t queue_num() const { return queue_num_; } }; +}} // namespace Firegex::NfQueue #endif // NETFILTER_CLASS_CPP \ No newline at end of file diff --git a/backend/binsrc/classes/nfqueue.cpp b/backend/binsrc/classes/nfqueue.cpp new file mode 100644 index 0000000..a9af147 --- /dev/null +++ b/backend/binsrc/classes/nfqueue.cpp @@ -0,0 +1,370 @@ + +#ifndef NFQUEUE_CLASS_CPP +#define NFQUEUE_CLASS_CPP + +#include +#include +#include +#include +#include + +using namespace std; + +namespace Firegex{ +namespace NfQueue{ + +enum class FilterAction{ DROP, ACCEPT, MANGLE, NOACTION }; +enum class L4Proto { TCP, UDP, RAW }; +typedef Tins::TCPIP::StreamIdentifier stream_id; + + +template +class PktRequest { + private: + FilterAction action = FilterAction::NOACTION; + mnl_socket* nl = nullptr; + nfgenmsg * nfg = nullptr; + nfqnl_msg_packet_hdr *ph; + shared_ptr packet_buffer; // Will be deallocated here + size_t data_size = 0; + public: + const bool is_ipv6; + Tins::IP* ipv4 = nullptr; + Tins::IPv6* ipv6 = nullptr; + Tins::TCP* tcp = nullptr; + Tins::UDP* udp = nullptr; + const L4Proto l4_proto; + const bool is_input; + + const string packet; + const string data; + const stream_id sid; + + T* ctx; + + private: + + inline void fetch_data_size(Tins::PDU* pdu){ + auto inner = pdu->inner_pdu(); + if (inner == nullptr){ + data_size = 0; + }else{ + data_size = inner->size(); + } + } + + L4Proto fill_l4_info(){ + if (is_ipv6){ + tcp = ipv6->find_pdu(); + if (tcp == nullptr){ + udp = ipv6->find_pdu(); + if (udp == nullptr){ + fetch_data_size(ipv6); + return L4Proto::RAW; + }else{ + fetch_data_size(udp); + return L4Proto::UDP; + } + }else{ + fetch_data_size(tcp); + return L4Proto::TCP; + } + }else{ + tcp = ipv4->find_pdu(); + if (tcp == nullptr){ + udp = ipv4->find_pdu(); + if (udp == nullptr){ + fetch_data_size(ipv4); + return L4Proto::RAW; + }else{ + fetch_data_size(udp); + return L4Proto::UDP; + } + }else{ + fetch_data_size(tcp); + return L4Proto::TCP; + } + } + } + + public: + + PktRequest(shared_ptr buf, Tins::IP* ipv4, const char* payload, size_t plen, stream_id sid, T* ctx, mnl_socket* nl, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, bool is_input): + is_ipv6(false), ipv4(ipv4), packet(string(payload, plen)), sid(sid), ctx(ctx), nl(nl), nfg(nfg), ph(ph), + is_input(is_input), packet_buffer(buf), l4_proto(fill_l4_info()), data(string(payload+(plen-data_size), data_size)) {} + + PktRequest(shared_ptr buf, Tins::IPv6* ipv6, const char* payload, size_t plen, stream_id sid, T* ctx, mnl_socket* nl, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, bool is_input): + is_ipv6(true), ipv6(ipv6), packet(string(payload, plen)), sid(sid), ctx(ctx), nl(nl), nfg(nfg), ph(ph), + is_input(is_input), packet_buffer(buf), l4_proto(fill_l4_info()), data(string(payload+(plen-data_size), data_size)) {} + + void drop(){ + if (action == FilterAction::NOACTION){ + action = FilterAction::DROP; + perfrom_action(); + }else{ + throw invalid_argument("Cannot drop a packet that has already been dropped or accepted"); + } + } + + void accept(){ + if (action == FilterAction::NOACTION){ + action = FilterAction::ACCEPT; + perfrom_action(); + }else{ + throw invalid_argument("Cannot accept a packet that has already been dropped or accepted"); + } + } + + void mangle(){ + if (action == FilterAction::NOACTION){ + action = FilterAction::MANGLE; + perfrom_action(); + }else{ + throw invalid_argument("Cannot mangle a packet that has already been accepted or dropped"); + } + } + + FilterAction get_action(){ + return action; + } + + ~PktRequest(){ + if (ipv4 != nullptr){ + delete ipv4; + } + if (ipv6 != nullptr){ + delete ipv6; + } + } + + private: + void perfrom_action(){ + char buf[MNL_SOCKET_BUFFER_SIZE]; + struct nlmsghdr *nlh_verdict; + nlh_verdict = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, ntohs(nfg->res_id)); + switch (action) + { + case FilterAction::ACCEPT: + nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); + break; + case FilterAction::DROP: + nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_DROP ); + break; + case FilterAction::MANGLE:{ + if (is_ipv6){ + nfq_nlmsg_verdict_put_pkt(nlh_verdict, ipv6->serialize().data(), ipv6->size()); + }else{ + nfq_nlmsg_verdict_put_pkt(nlh_verdict, ipv4->serialize().data(), ipv4->size()); + } + nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); + break; + } + default: + throw invalid_argument("Invalid action"); + } + if (mnl_socket_sendto(nl, nlh_verdict, nlh_verdict->nlmsg_len) < 0) { + throw runtime_error( "mnl_socket_send" ); + } + } + +}; + +struct internal_nfqueue_execution_data_tmp{ + mnl_socket* nl = nullptr; + void *data = nullptr; + shared_ptr packet_buffer; +}; + +const size_t NFQUEUE_BUFFER_SIZE = 0xffff + (MNL_SOCKET_BUFFER_SIZE/2); +/* NfQueue wrapper class to handle nfqueue packets + this class is made to be possible enqueue multiple packets to multiple threads + --> handle function is responsable to delete the PktRequest object */ +template *)> +class NfQueue { + private: + mnl_socket* nl = nullptr; + unsigned int portid; + public: + + const uint16_t queue_num; + + NfQueue(u_int16_t queue_num): queue_num(queue_num) { + nl = mnl_socket_open(NETLINK_NETFILTER); + + if (nl == nullptr) { throw runtime_error( "mnl_socket_open" );} + + if (mnl_socket_bind(nl, 0, MNL_SOCKET_AUTOPID) < 0) { + mnl_socket_close(nl); + throw runtime_error( "mnl_socket_bind" ); + } + portid = mnl_socket_get_portid(nl); + + char queue_msg_buffer[NFQUEUE_BUFFER_SIZE]; + + if (_send_config_cmd(NFQNL_CFG_CMD_BIND, queue_msg_buffer) < 0) { + _clear(); + throw runtime_error( "mnl_socket_send" ); + } + //TEST if BIND was successful + if (_send_config_cmd(NFQNL_CFG_CMD_NONE, queue_msg_buffer) < 0) { // SEND A NONE command to generate an error meessage + _clear(); + throw runtime_error( "mnl_socket_send" ); + } + if (_recv_packet(queue_msg_buffer) == -1) { //RECV the error message + _clear(); + throw runtime_error( "mnl_socket_recvfrom" ); + } + + struct nlmsghdr *nlh = (struct nlmsghdr *) queue_msg_buffer; + + if (nlh->nlmsg_type != NLMSG_ERROR) { + _clear(); + throw runtime_error( "unexpected packet from kernel (expected NLMSG_ERROR packet)" ); + } + //nfqnl_msg_config_cmd + nlmsgerr* error_msg = (nlmsgerr *)mnl_nlmsg_get_payload(nlh); + + // error code taken from the linux kernel: + // https://elixir.bootlin.com/linux/v5.18.12/source/include/linux/errno.h#L27 + #define ENOTSUPP 524 /* Operation is not supported */ + + if (error_msg->error != -ENOTSUPP) { + _clear(); + throw invalid_argument( "queueid is already busy" ); + } + + //END TESTING BIND + nlh = nfq_nlmsg_put(queue_msg_buffer, NFQNL_MSG_CONFIG, queue_num); + nfq_nlmsg_cfg_put_params(nlh, NFQNL_COPY_PACKET, 0xffff); + + mnl_attr_put_u32(nlh, NFQA_CFG_FLAGS, htonl(NFQA_CFG_F_GSO)); + mnl_attr_put_u32(nlh, NFQA_CFG_MASK, htonl(NFQA_CFG_F_GSO)); + + if (mnl_socket_sendto(nl, nlh, nlh->nlmsg_len) < 0) { + _clear(); + throw runtime_error( "mnl_socket_send" ); + } + + /* + * ENOBUFS is signalled to userspace when packets were lost + * on kernel side. In most cases, userspace isn't interested + * in this information, so turn it off. + */ + int tmp = 1; + mnl_socket_setsockopt(nl, NETLINK_NO_ENOBUFS, &tmp, sizeof(int)); + + } + + void handle_next_packet(D* data){ + auto queue_msg_buffer = make_shared(NFQUEUE_BUFFER_SIZE); + int ret = _recv_packet(queue_msg_buffer.get(), NFQUEUE_BUFFER_SIZE); + if (ret == -1) { + throw runtime_error( "mnl_socket_recvfrom" ); + } + internal_nfqueue_execution_data_tmp raw_ptr = { + nl: nl, + data: data, + packet_buffer: queue_msg_buffer + }; + + ret = mnl_cb_run(queue_msg_buffer.get(), ret, 0, portid, _real_queue_cb, &raw_ptr); + if (ret <= 0){ + cerr << "[error] [NfQueue.handle_next_packet] mnl_cb_run error with: " << ret << endl; + throw runtime_error( "mnl_cb_run error!" ); + } + } + + ~NfQueue() { + char queue_msg_buffer[NFQUEUE_BUFFER_SIZE]; + _send_config_cmd(NFQNL_CFG_CMD_UNBIND, queue_msg_buffer); + _clear(); + } + + private: + + template>> + static void inline _send_verdict(shared_ptr raw_buf, T* packet, char *payload, uint16_t plen, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, internal_nfqueue_execution_data_tmp* ctx, bool is_input){ + handle_func(new PktRequest( + raw_buf, packet, payload, plen, + stream_id::make_identifier(*packet), + (D*)ctx->data, ctx->nl, nfg, ph, is_input + )); + } + + static int _real_queue_cb(const nlmsghdr *nlh, void *data_ptr) { + + internal_nfqueue_execution_data_tmp* info = (internal_nfqueue_execution_data_tmp*) data_ptr; + + //Extract attributes from the nlmsghdr + nlattr *attr[NFQA_MAX+1] = {}; + + if (nfq_nlmsg_parse(nlh, attr) < 0) { + cerr << "[error] [NfQueue._real_queue_cb] problems parsing" << endl; + return MNL_CB_ERROR; + } + if (attr[NFQA_PACKET_HDR] == nullptr) { + cerr << "[error] [NfQueue._real_queue_cb] packet header not set" << endl; + return MNL_CB_ERROR; + } + if (attr[NFQA_MARK] == nullptr) { + cerr << "[error] [NfQueue._real_queue_cb] mark not set" << endl; + return MNL_CB_ERROR; + } + + //Get Payload + uint16_t plen = mnl_attr_get_payload_len(attr[NFQA_PAYLOAD]); + char *payload = (char *)mnl_attr_get_payload(attr[NFQA_PAYLOAD]); + + //Return result to the kernel + struct nfqnl_msg_packet_hdr *ph = (nfqnl_msg_packet_hdr*) mnl_attr_get_payload(attr[NFQA_PACKET_HDR]); + struct nfgenmsg *nfg = (nfgenmsg *)mnl_nlmsg_get_payload(nlh); + + bool is_input = ntohl(mnl_attr_get_u32(attr[NFQA_MARK])) & 0x1; // == 0x1337 that is odd + // Check IP protocol version + if ( (payload[0] & 0xf0) == 0x40 ){ + _send_verdict(info->packet_buffer, new Tins::IP((uint8_t*)payload, plen), payload, plen, nfg, ph, info, is_input); + }else{ + _send_verdict(info->packet_buffer, new Tins::IPv6((uint8_t*)payload, plen), payload, plen, nfg, ph, info, is_input); + } + return MNL_CB_OK; + } + + inline void _clear(){ + if (nl != nullptr) { + mnl_socket_close(nl); + nl = nullptr; + } + } + + inline ssize_t _send_config_cmd(nfqnl_msg_config_cmds cmd, char* buf){ + struct nlmsghdr *nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, queue_num); + nfq_nlmsg_cfg_put_cmd(nlh, AF_INET, cmd); + return mnl_socket_sendto(nl, nlh, nlh->nlmsg_len); + } + + inline ssize_t _recv_packet(char* buf, size_t buf_size = NFQUEUE_BUFFER_SIZE){ + return mnl_socket_recvfrom(nl, buf, buf_size); + } + +}; + + + +uint32_t hash_stream_id(const stream_id &sid) { + uint32_t addr_hash = 0; + const uint32_t* min_addr = reinterpret_cast(sid.min_address.data()); + const uint32_t* max_addr = reinterpret_cast(sid.max_address.data()); + addr_hash ^= min_addr[0] ^ min_addr[1] ^ min_addr[2] ^ min_addr[3]; + addr_hash ^= max_addr[0] ^ max_addr[1] ^ max_addr[2] ^ max_addr[3]; + + uint32_t ports = (static_cast(sid.min_address_port) << 16) | sid.max_address_port; + + uint32_t hash = addr_hash ^ ports; + + hash *= 0x9e3779b9; + + return hash; +} + +}} +#endif // NFQUEUE_CLASS_CPP \ No newline at end of file diff --git a/backend/binsrc/nfproxy-tun.cpp b/backend/binsrc/nfproxy-tun.cpp index b44135a..520292b 100644 --- a/backend/binsrc/nfproxy-tun.cpp +++ b/backend/binsrc/nfproxy-tun.cpp @@ -1,26 +1,40 @@ +#define PY_SSIZE_T_CLEAN +#include + #include "proxytun/settings.cpp" #include "proxytun/proxytun.cpp" #include "classes/netfilter.cpp" #include #include +#include +#include using namespace std; +using namespace Firegex::PyProxy; +using Firegex::NfQueue::MultiThreadQueue; + +ssize_t read_check(int __fd, void *__buf, size_t __nbytes){ + ssize_t bytes = read(__fd, __buf, __nbytes); + if (bytes == 0){ + cerr << "[fatal] [updater] read() returned EOF" << endl; + throw invalid_argument("read() returned EOF"); + } + if (bytes < 0){ + cerr << "[fatal] [updater] read() returned an error" << bytes << endl; + throw invalid_argument("read() returned an error"); + } + return bytes; +} void config_updater (){ while (true){ - //TODO read config getline(cin, line); - if (cin.eof()){ - cerr << "[fatal] [updater] cin.eof()" << endl; - exit(EXIT_FAILURE); - } - if (cin.bad()){ - cerr << "[fatal] [updater] cin.bad()" << endl; - exit(EXIT_FAILURE); - } + uint32_t code_size; + read_check(STDIN_FILENO, &code_size, 4); + vector code(code_size); + read_check(STDIN_FILENO, code.data(), code_size); cerr << "[info] [updater] Updating configuration" << endl; - try{ - //TODO add data config.reset(new PyCodeConfig("")); + config.reset(new PyCodeConfig(code)); cerr << "[info] [updater] Config update done" << endl; osyncstream(cout) << "ACK OK" << endl; }catch(const std::exception& e){ @@ -31,18 +45,28 @@ void config_updater (){ } int main(int argc, char *argv[]){ + + Py_Initialize(); + atexit(Py_Finalize); + + if (freopen(nullptr, "rb", stdin) == nullptr){ // We need to read from stdin binary data + cerr << "[fatal] [main] Failed to reopen stdin in binary mode" << endl; + return 1; + } int n_of_threads = 1; char * n_threads_str = getenv("NTHREADS"); if (n_threads_str != nullptr) n_of_threads = ::atoi(n_threads_str); if(n_of_threads <= 0) n_of_threads = 1; - config.reset(new PyCodeConfig("")); - - NFQueueSequence queues(n_of_threads); - queues.start(); + config.reset(new PyCodeConfig()); + MultiThreadQueue queue(n_of_threads); - osyncstream(cout) << "QUEUES " << queues.init() << " " << queues.end() << endl; - cerr << "[info] [main] Queues: " << queues.init() << ":" << queues.end() << " threads assigned: " << n_of_threads << endl; + osyncstream(cout) << "QUEUE " << queue.queue_num() << endl; + cerr << "[info] [main] Queue: " << queue.queue_num() << " threads assigned: " << n_of_threads << endl; + thread qthr([&](){ + queue.start(); + }); config_updater(); + qthr.join(); } diff --git a/backend/binsrc/nfqueue.cpp b/backend/binsrc/nfregex.cpp similarity index 79% rename from backend/binsrc/nfqueue.cpp rename to backend/binsrc/nfregex.cpp index 3d22fe5..45493b8 100644 --- a/backend/binsrc/nfqueue.cpp +++ b/backend/binsrc/nfregex.cpp @@ -5,6 +5,8 @@ #include using namespace std; +using namespace Firegex::Regex; +using Firegex::NfQueue::MultiThreadQueue; void config_updater (){ string line; @@ -54,12 +56,17 @@ int main(int argc, char *argv[]){ } regex_config.reset(new RegexRules(stream_mode)); + - NFQueueSequence queues(n_of_threads); - queues.start(); + MultiThreadQueue queue_manager(n_of_threads); - osyncstream(cout) << "QUEUES " << queues.init() << " " << queues.end() << endl; - cerr << "[info] [main] Queues: " << queues.init() << ":" << queues.end() << " threads assigned: " << n_of_threads << " stream mode: " << stream_mode << endl; + osyncstream(cout) << "QUEUE " << queue_manager.queue_num() << endl; + cerr << "[info] [main] Queue: " << queue_manager.queue_num() << " threads assigned: " << n_of_threads << " stream mode: " << stream_mode << endl; + thread qthr([&](){ + queue_manager.start(); + }); config_updater(); + qthr.join(); + } diff --git a/backend/binsrc/proxytun/proxytun.cpp b/backend/binsrc/proxytun/proxytun.cpp index 50aad35..910a86b 100644 --- a/backend/binsrc/proxytun/proxytun.cpp +++ b/backend/binsrc/proxytun/proxytun.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include "../classes/netfilter.cpp" @@ -24,24 +23,32 @@ using Tins::TCPIP::Stream; using Tins::TCPIP::StreamFollower; using namespace std; -class PyProxyQueue: public NfQueueExecutor { +namespace Firegex { +namespace PyProxy { + +class PyProxyQueue: public NfQueue::ThreadNfQueue { public: stream_ctx sctx; + StreamFollower follower; + + struct { + bool matching_has_been_called = false; + bool already_closed = false; + bool result; + NfQueue::PktRequest* pkt; + } match_ctx; void before_loop() override { - sctx.follower.new_stream_callback(bind(on_new_stream, placeholders::_1, &sctx)); - sctx.follower.stream_termination_callback(bind(on_stream_close, placeholders::_1, &sctx)); + follower.new_stream_callback(bind(on_new_stream, placeholders::_1, this)); + follower.stream_termination_callback(bind(on_stream_close, placeholders::_1, this)); } - void * callback_data_fetch() override{ - return &sctx; - } - - static bool filter_action(packet_info& info){ + bool filter_action(NfQueue::PktRequest* pkt){ shared_ptr conf = config; - auto stream_search = info.sctx->streams_ctx.find(info.sid); - pyfilter_ctx stream_match; - if (stream_search == info.sctx->streams_ctx.end()){ + + auto stream_search = sctx.streams_ctx.find(pkt->sid); + pyfilter_ctx* stream_match; + if (stream_search == sctx.streams_ctx.end()){ // TODO: New pyfilter_ctx }else{ stream_match = stream_search->second; @@ -59,86 +66,76 @@ class PyProxyQueue: public NfQueueExecutor { } //If the stream has already been matched, drop all data, and try to close the connection - static void keep_fin_packet(stream_ctx* sctx){ - sctx->match_info.matching_has_been_called = true; - sctx->match_info.already_closed = true; + static void keep_fin_packet(PyProxyQueue* pkt){ + pkt->match_ctx.matching_has_been_called = true; + pkt->match_ctx.already_closed = true; } - static void on_data_recv(Stream& stream, stream_ctx* sctx, string data) { - sctx->match_info.matching_has_been_called = true; - sctx->match_info.already_closed = false; - bool result = filter_action(*sctx->match_info.pkt_info); + static void on_data_recv(Stream& stream, PyProxyQueue* pkt, string data) { + pkt->match_ctx.matching_has_been_called = true; + pkt->match_ctx.already_closed = false; + bool result = pkt->filter_action(pkt->match_ctx.pkt); if (!result){ - sctx->clean_stream_by_id(sctx->match_info.pkt_info->sid); - stream.client_data_callback(bind(keep_fin_packet, sctx)); - stream.server_data_callback(bind(keep_fin_packet, sctx)); + pkt->sctx.clean_stream_by_id(pkt->match_ctx.pkt->sid); + stream.client_data_callback(bind(keep_fin_packet, pkt)); + stream.server_data_callback(bind(keep_fin_packet, pkt)); } - sctx->match_info.result = result; + pkt->match_ctx.result = result; } //Input data filtering - static void on_client_data(Stream& stream, stream_ctx* sctx) { - sctx->match_info.pkt_info->is_input = true; - on_data_recv(stream, sctx, string(stream.client_payload().begin(), stream.client_payload().end())); + static void on_client_data(Stream& stream, PyProxyQueue* pkt) { + on_data_recv(stream, pkt, string(stream.client_payload().begin(), stream.client_payload().end())); } //Server data filtering - static void on_server_data(Stream& stream, stream_ctx* sctx) { - sctx->match_info.pkt_info->is_input = false; - on_data_recv(stream, sctx, string(stream.server_payload().begin(), stream.server_payload().end())); + static void on_server_data(Stream& stream, PyProxyQueue* pkt) { + on_data_recv(stream, pkt, string(stream.server_payload().begin(), stream.server_payload().end())); } // A stream was terminated. The second argument is the reason why it was terminated - static void on_stream_close(Stream& stream, stream_ctx* sctx) { + static void on_stream_close(Stream& stream, PyProxyQueue* pkt) { stream_id stream_id = stream_id::make_identifier(stream); - sctx->clean_stream_by_id(stream_id); + pkt->sctx.clean_stream_by_id(stream_id); } - static void on_new_stream(Stream& stream, stream_ctx* sctx) { + static void on_new_stream(Stream& stream, PyProxyQueue* pkt) { stream.auto_cleanup_payloads(true); if (stream.is_partial_stream()) { //TODO take a decision about this... stream.enable_recovery_mode(10 * 1024); } - stream.client_data_callback(bind(on_client_data, placeholders::_1, sctx)); - stream.server_data_callback(bind(on_server_data, placeholders::_1, sctx)); - stream.stream_closed_callback(bind(on_stream_close, placeholders::_1, sctx)); + stream.client_data_callback(bind(on_client_data, placeholders::_1, pkt)); + stream.server_data_callback(bind(on_server_data, placeholders::_1, pkt)); + stream.stream_closed_callback(bind(on_stream_close, placeholders::_1, pkt)); } - template - static void build_verdict(T packet, uint8_t *payload, uint16_t plen, nlmsghdr *nlh_verdict, nfqnl_msg_packet_hdr *ph, stream_ctx* sctx, bool is_ipv6){ - Tins::TCP* tcp = packet.template find_pdu(); - if (!tcp){ + + void handle_next_packet(NfQueue::PktRequest* pkt) override{ + if (pkt->l4_proto != NfQueue::L4Proto::TCP){ throw invalid_argument("Only TCP and UDP are supported"); } - Tins::PDU* application_layer = tcp->inner_pdu(); + Tins::PDU* application_layer = pkt->tcp->inner_pdu(); u_int16_t payload_size = 0; if (application_layer != nullptr){ payload_size = application_layer->size(); } - packet_info pktinfo{ - payload: string(payload+plen - payload_size, payload+plen), - sid: stream_id::make_identifier(packet), - is_ipv6: is_ipv6, - sctx: sctx, - packet_pdu: &packet, - tcp: tcp, - }; - sctx->match_info.matching_has_been_called = false; - sctx->match_info.pkt_info = &pktinfo; - sctx->follower.process_packet(packet); + match_ctx.matching_has_been_called = false; + match_ctx.pkt = pkt; + if (pkt->is_ipv6){ + follower.process_packet(*pkt->ipv6); + }else{ + follower.process_packet(*pkt->ipv4); + } // Do an action only is an ordered packet has been received - if (sctx->match_info.matching_has_been_called){ - bool empty_payload = pktinfo.payload.empty(); + if (match_ctx.matching_has_been_called){ + bool empty_payload = payload_size == 0; //In this 2 cases we have to remove all data about the stream - if (!sctx->match_info.result || sctx->match_info.already_closed){ - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.build_verdict] Stream matched, removing all data about it" << endl; - #endif - sctx->clean_stream_by_id(pktinfo.sid); + if (!match_ctx.result || match_ctx.already_closed){ + sctx.clean_stream_by_id(pkt->sid); //If the packet has data, we have to remove it if (!empty_payload){ - Tins::PDU* data_layer = tcp->release_inner_pdu(); + Tins::PDU* data_layer = pkt->tcp->release_inner_pdu(); if (data_layer != nullptr){ delete data_layer; } @@ -146,59 +143,17 @@ class PyProxyQueue: public NfQueueExecutor { //For the first matched data or only for data packets, we set FIN bit //This only for client packets, because this will trigger server to close the connection //Packets will be filtered anyway also if client don't send packets - if ((!sctx->match_info.result || !empty_payload) && is_input){ - tcp->set_flag(Tins::TCP::FIN,1); - tcp->set_flag(Tins::TCP::ACK,1); - tcp->set_flag(Tins::TCP::SYN,0); + if ((!match_ctx.result || !empty_payload) && pkt->is_input){ + pkt->tcp->set_flag(Tins::TCP::FIN,1); + pkt->tcp->set_flag(Tins::TCP::ACK,1); + pkt->tcp->set_flag(Tins::TCP::SYN,0); } //Send the edited packet to the kernel - nfq_nlmsg_verdict_put_pkt(nlh_verdict, packet.serialize().data(), packet.size()); + return pkt->mangle(); } } - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); - + return pkt->accept(); } - - static int queue_cb(const nlmsghdr *nlh, const mnl_socket* nl, void *data_ptr) { - - stream_ctx* sctx = (stream_ctx*)data_ptr; - - //Extract attributes from the nlmsghdr - nlattr *attr[NFQA_MAX+1] = {}; - - if (nfq_nlmsg_parse(nlh, attr) < 0) { - perror("problems parsing"); - return MNL_CB_ERROR; - } - if (attr[NFQA_PACKET_HDR] == nullptr) { - fputs("metaheader not set\n", stderr); - return MNL_CB_ERROR; - } - //Get Payload - uint16_t plen = mnl_attr_get_payload_len(attr[NFQA_PAYLOAD]); - uint8_t *payload = (uint8_t *)mnl_attr_get_payload(attr[NFQA_PAYLOAD]); - - //Return result to the kernel - struct nfqnl_msg_packet_hdr *ph = (nfqnl_msg_packet_hdr*) mnl_attr_get_payload(attr[NFQA_PACKET_HDR]); - struct nfgenmsg *nfg = (nfgenmsg *)mnl_nlmsg_get_payload(nlh); - char buf[MNL_SOCKET_BUFFER_SIZE]; - struct nlmsghdr *nlh_verdict; - - nlh_verdict = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, ntohs(nfg->res_id)); - // Check IP protocol version - if ( (payload[0] & 0xf0) == 0x40 ){ - build_verdict(Tins::IP(payload, plen), payload, plen, nlh_verdict, ph, sctx, false); - }else{ - build_verdict(Tins::IPv6(payload, plen), payload, plen, nlh_verdict, ph, sctx, true); - } - - if (mnl_socket_sendto(nl, nlh_verdict, nlh_verdict->nlmsg_len) < 0) { - throw runtime_error( "mnl_socket_send" ); - } - return MNL_CB_OK; - } - - PyProxyQueue(int queue) : NfQueueExecutor(queue, &queue_cb) {} ~PyProxyQueue() { sctx.clean(); @@ -206,4 +161,5 @@ class PyProxyQueue: public NfQueueExecutor { }; +}} #endif // PROXY_TUNNEL_CLASS_CPP \ No newline at end of file diff --git a/backend/binsrc/proxytun/settings.cpp b/backend/binsrc/proxytun/settings.cpp index fc43c51..f4adae4 100644 --- a/backend/binsrc/proxytun/settings.cpp +++ b/backend/binsrc/proxytun/settings.cpp @@ -1,21 +1,17 @@ #ifndef PROXY_TUNNEL_SETTINGS_CPP #define PROXY_TUNNEL_SETTINGS_CPP -#include -#include -#include -#include "../utils.hpp" #include -#include #include using namespace std; class PyCodeConfig{ public: - const string code; + const vector code; public: - PyCodeConfig(string pycode): code(pycode){} + PyCodeConfig(vector pycode): code(pycode){} + PyCodeConfig(): code(vector()){} ~PyCodeConfig(){} }; diff --git a/backend/binsrc/proxytun/stream_ctx.cpp b/backend/binsrc/proxytun/stream_ctx.cpp index b2ade14..a1f2db0 100644 --- a/backend/binsrc/proxytun/stream_ctx.cpp +++ b/backend/binsrc/proxytun/stream_ctx.cpp @@ -3,11 +3,8 @@ #define STREAM_CTX_CPP #include -#include #include -using Tins::TCPIP::Stream; -using Tins::TCPIP::StreamFollower; using namespace std; typedef Tins::TCPIP::StreamIdentifier stream_id; @@ -17,44 +14,25 @@ struct pyfilter_ctx { string pycode; }; -typedef map matching_map; - -struct packet_info; - -struct tcp_stream_tmp { - bool matching_has_been_called = false; - bool already_closed = false; - bool result; - packet_info *pkt_info; -}; +typedef map matching_map; struct stream_ctx { matching_map streams_ctx; - StreamFollower follower; - tcp_stream_tmp match_info; + void clean_stream_by_id(stream_id sid){ auto stream_search = streams_ctx.find(sid); if (stream_search != streams_ctx.end()){ auto stream_match = stream_search->second; //DEALLOC PY GLOB TODO + delete stream_match; } } void clean(){ for (auto ele: streams_ctx){ //TODO dealloc ele.second.pyglob + delete ele.second; } } }; -struct packet_info { - string payload; - stream_id sid; - bool is_input; - bool is_ipv6; - stream_ctx* sctx; - Tins::PDU* packet_pdu; - Tins::TCP* tcp; -}; - - #endif // STREAM_CTX_CPP \ No newline at end of file diff --git a/backend/binsrc/regex/regex_rules.cpp b/backend/binsrc/regex/regex_rules.cpp index 8715932..c59ad45 100644 --- a/backend/binsrc/regex/regex_rules.cpp +++ b/backend/binsrc/regex/regex_rules.cpp @@ -4,13 +4,16 @@ #include #include #include -#include "../utils.hpp" +#include "../utils.cpp" #include #include #include using namespace std; +namespace Firegex { +namespace Regex { + enum FilterDirection{ CTOS, STOC }; struct decoded_regex { @@ -181,10 +184,6 @@ void inline scratch_setup(regex_ruleset &conf, hs_scratch_t* & scratch){ } } -struct matched_data{ - unsigned int matched = 0; - bool has_matched = false; -}; - +}} #endif // REGEX_FILTER_CPP diff --git a/backend/binsrc/regex/regexfilter.cpp b/backend/binsrc/regex/regexfilter.cpp index 2690fa3..63af811 100644 --- a/backend/binsrc/regex/regexfilter.cpp +++ b/backend/binsrc/regex/regexfilter.cpp @@ -15,53 +15,59 @@ #include #include #include +#include #include #include "../classes/netfilter.cpp" #include "stream_ctx.cpp" #include "regex_rules.cpp" -using Tins::TCPIP::Stream; -using Tins::TCPIP::StreamFollower; using namespace std; -class RegexQueue: public NfQueueExecutor { - public: + +namespace Firegex { +namespace Regex { + +using Tins::TCPIP::Stream; +using Tins::TCPIP::StreamFollower; + + + +class RegexNfQueue : public NfQueue::ThreadNfQueue { +public: stream_ctx sctx; - - void before_loop() override { - sctx.follower.new_stream_callback(bind(on_new_stream, placeholders::_1, &sctx)); - sctx.follower.stream_termination_callback(bind(on_stream_close, placeholders::_1, &sctx)); - } - - void * callback_data_fetch() override{ - return &sctx; - } - - static bool filter_action(packet_info& info){ - shared_ptr conf = regex_config; - auto current_version = conf->ver(); - if (current_version != info.sctx->latest_config_ver){ - #ifdef DEBUG - cerr << "[DEBUG] [filter_callback] Configuration has changed (" << current_version << "!=" << info.sctx->latest_config_ver << "), cleaning scratch spaces" << endl; - #endif - info.sctx->clean(); - info.sctx->latest_config_ver = current_version; - } - scratch_setup(conf->input_ruleset, info.sctx->in_scratch); - scratch_setup(conf->output_ruleset, info.sctx->out_scratch); + u_int16_t latest_config_ver = 0; + StreamFollower follower; + struct { + bool matching_has_been_called = false; + bool already_closed = false; + bool result; + NfQueue::PktRequest* pkt; + } match_ctx; - hs_database_t* regex_matcher = info.is_input ? conf->input_ruleset.hs_db : conf->output_ruleset.hs_db; + + bool filter_action(NfQueue::PktRequest* pkt){ + shared_ptr conf = regex_config; + + auto current_version = conf->ver(); + if (current_version != latest_config_ver){ + sctx.clean(); + latest_config_ver = current_version; + } + scratch_setup(conf->input_ruleset, sctx.in_scratch); + scratch_setup(conf->output_ruleset, sctx.out_scratch); + + hs_database_t* regex_matcher = pkt->is_input ? conf->input_ruleset.hs_db : conf->output_ruleset.hs_db; if (regex_matcher == nullptr){ return true; } - #ifdef DEBUG - cerr << "[DEBUG] [filter_callback] Matching packet with " << (info.is_input ? "input" : "output") << " ruleset" << endl; - #endif - - matched_data match_res; + struct matched_data{ + unsigned int matched = 0; + bool has_matched = false; + } match_res; + hs_error_t err; - hs_scratch_t* scratch_space = info.is_input ? info.sctx->in_scratch: info.sctx->out_scratch; + hs_scratch_t* scratch_space = pkt->is_input ? sctx.in_scratch: sctx.out_scratch; auto match_func = [](unsigned int id, auto from, auto to, auto flags, auto ctx){ auto res = (matched_data*)ctx; res->has_matched = true; @@ -70,49 +76,32 @@ class RegexQueue: public NfQueueExecutor { }; hs_stream_t* stream_match; if (conf->stream_mode()){ - matching_map* match_map = info.is_input ? &info.sctx->in_hs_streams : &info.sctx->out_hs_streams; - #ifdef DEBUG - cerr << "[DEBUG] [filter_callback] Dumping match_map " << match_map << endl; - for (auto ele: *match_map){ - cerr << "[DEBUG] [filter_callback] " << ele.first << " -> " << ele.second << endl; - } - cerr << "[DEBUG] [filter_callback] End of match_map" << endl; - #endif - auto stream_search = match_map->find(info.sid); + matching_map* match_map = pkt->is_input ? &sctx.in_hs_streams : &sctx.out_hs_streams; + auto stream_search = match_map->find(pkt->sid); if (stream_search == match_map->end()){ - - #ifdef DEBUG - cerr << "[DEBUG] [filter_callback] Creating new stream matcher for " << info.sid << endl; - #endif if (hs_open_stream(regex_matcher, 0, &stream_match) != HS_SUCCESS) { cerr << "[error] [filter_callback] Error opening the stream matcher (hs)" << endl; throw invalid_argument("Cannot open stream match on hyperscan"); } - if (info.is_tcp){ - match_map->insert_or_assign(info.sid, stream_match); + if (pkt->l4_proto == NfQueue::L4Proto::TCP){ + match_map->insert_or_assign(pkt->sid, stream_match); } }else{ stream_match = stream_search->second; } - #ifdef DEBUG - cerr << "[DEBUG] [filter_callback] Matching as a stream" << endl; - #endif err = hs_scan_stream( - stream_match,info.payload.c_str(), info.payload.length(), + stream_match,pkt->data.c_str(), pkt->data.size(), 0, scratch_space, match_func, &match_res ); }else{ - #ifdef DEBUG - cerr << "[DEBUG] [filter_callback] Matching as a block" << endl; - #endif err = hs_scan( - regex_matcher,info.payload.c_str(), info.payload.length(), + regex_matcher,pkt->data.c_str(), pkt->data.size(), 0, scratch_space, match_func, &match_res ); } if ( - !info.is_tcp && conf->stream_mode() && + pkt->l4_proto != NfQueue::L4Proto::TCP && conf->stream_mode() && hs_close_stream(stream_match, scratch_space, nullptr, nullptr) != HS_SUCCESS ){ cerr << "[error] [filter_callback] Error closing the stream matcher (hs)" << endl; @@ -123,117 +112,34 @@ class RegexQueue: public NfQueueExecutor { throw invalid_argument("Error while matching the stream with hyperscan"); } if (match_res.has_matched){ - auto rules_vector = info.is_input ? conf->input_ruleset.regexes : conf->output_ruleset.regexes; + auto& rules_vector = pkt->is_input ? conf->input_ruleset.regexes : conf->output_ruleset.regexes; osyncstream(cout) << "BLOCKED " << rules_vector[match_res.matched] << endl; return false; } return true; } - //If the stream has already been matched, drop all data, and try to close the connection - static void keep_fin_packet(stream_ctx* sctx){ - sctx->match_info.matching_has_been_called = true; - sctx->match_info.already_closed = true; - } - - static void on_data_recv(Stream& stream, stream_ctx* sctx, string data) { - sctx->match_info.matching_has_been_called = true; - sctx->match_info.already_closed = false; - bool result = filter_action(*sctx->match_info.pkt_info); - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.on_data_recv] result: " << result << endl; - #endif - if (!result){ - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.on_data_recv] Stream matched, removing all data about it" << endl; - #endif - sctx->clean_stream_by_id(sctx->match_info.pkt_info->sid); - stream.client_data_callback(bind(keep_fin_packet, sctx)); - stream.server_data_callback(bind(keep_fin_packet, sctx)); - } - sctx->match_info.result = result; - } - - //Input data filtering - static void on_client_data(Stream& stream, stream_ctx* sctx) { - on_data_recv(stream, sctx, string(stream.client_payload().begin(), stream.client_payload().end())); - } - - //Server data filtering - static void on_server_data(Stream& stream, stream_ctx* sctx) { - on_data_recv(stream, sctx, string(stream.server_payload().begin(), stream.server_payload().end())); - } - - // A stream was terminated. The second argument is the reason why it was terminated - static void on_stream_close(Stream& stream, stream_ctx* sctx) { - stream_id stream_id = stream_id::make_identifier(stream); - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.on_stream_close] Stream terminated, deleting all data" << endl; - #endif - sctx->clean_stream_by_id(stream_id); - } - - static void on_new_stream(Stream& stream, stream_ctx* sctx) { - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.on_new_stream] New stream detected" << endl; - #endif - stream.auto_cleanup_payloads(true); - if (stream.is_partial_stream()) { - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.on_new_stream] Partial stream detected" << endl; - #endif - stream.enable_recovery_mode(10 * 1024); - } - stream.client_data_callback(bind(on_client_data, placeholders::_1, sctx)); - stream.server_data_callback(bind(on_server_data, placeholders::_1, sctx)); - stream.stream_closed_callback(bind(on_stream_close, placeholders::_1, sctx)); - } + void handle_next_packet(NfQueue::PktRequest* pkt) override{ + bool empty_payload = pkt->data.size() == 0; + if (pkt->tcp){ + match_ctx.matching_has_been_called = false; + match_ctx.pkt = pkt; - template - static void build_verdict(T packet, uint8_t *payload, uint16_t plen, nlmsghdr *nlh_verdict, nfqnl_msg_packet_hdr *ph, stream_ctx* sctx, bool is_input, bool is_ipv6){ - Tins::TCP* tcp = packet.template find_pdu(); - - if (tcp){ - Tins::PDU* application_layer = tcp->inner_pdu(); - u_int16_t payload_size = 0; - if (application_layer != nullptr){ - payload_size = application_layer->size(); - } - packet_info pktinfo{ - payload: string(payload+plen - payload_size, payload+plen), - sid: stream_id::make_identifier(packet), - is_input: is_input, - is_tcp: true, - is_ipv6: is_ipv6, - sctx: sctx, - packet_pdu: &packet, - layer4_pdu: tcp, - }; - sctx->match_info.matching_has_been_called = false; - sctx->match_info.pkt_info = &pktinfo; - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.build_verdict] TCP Packet received " << packet.src_addr() << ":" << tcp->sport() << " -> " << packet.dst_addr() << ":" << tcp->dport() << " thr: " << this_thread::get_id() << ", sending to libtins StreamFollower" << endl; - #endif - sctx->follower.process_packet(packet); - #ifdef DEBUG - if (sctx->tcp_match_util.matching_has_been_called){ - cerr << "[DEBUG] [NetfilterQueue.build_verdict] StreamFollower has called matching functions" << endl; + if (pkt->ipv4){ + follower.process_packet(*pkt->ipv4); }else{ - cerr << "[DEBUG] [NetfilterQueue.build_verdict] StreamFollower has NOT called matching functions" << endl; + follower.process_packet(*pkt->ipv6); } - #endif + // Do an action only is an ordered packet has been received - if (sctx->match_info.matching_has_been_called){ - bool empty_payload = pktinfo.payload.empty(); + if (match_ctx.matching_has_been_called){ + //In this 2 cases we have to remove all data about the stream - if (!sctx->match_info.result || sctx->match_info.already_closed){ - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.build_verdict] Stream matched, removing all data about it" << endl; - #endif - sctx->clean_stream_by_id(pktinfo.sid); + if (!match_ctx.result || match_ctx.already_closed){ + sctx.clean_stream_by_id(pkt->sid); //If the packet has data, we have to remove it if (!empty_payload){ - Tins::PDU* data_layer = tcp->release_inner_pdu(); + Tins::PDU* data_layer = pkt->tcp->release_inner_pdu(); if (data_layer != nullptr){ delete data_layer; } @@ -241,106 +147,83 @@ class RegexQueue: public NfQueueExecutor { //For the first matched data or only for data packets, we set FIN bit //This only for client packets, because this will trigger server to close the connection //Packets will be filtered anyway also if client don't send packets - if ((!sctx->match_info.result || !empty_payload) && is_input){ - tcp->set_flag(Tins::TCP::FIN,1); - tcp->set_flag(Tins::TCP::ACK,1); - tcp->set_flag(Tins::TCP::SYN,0); + if ((!match_ctx.result || !empty_payload) && pkt->is_input){ + pkt->tcp->set_flag(Tins::TCP::FIN,1); + pkt->tcp->set_flag(Tins::TCP::ACK,1); + pkt->tcp->set_flag(Tins::TCP::SYN,0); } //Send the edited packet to the kernel - nfq_nlmsg_verdict_put_pkt(nlh_verdict, packet.serialize().data(), packet.size()); + return pkt->mangle(); } } - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); + return pkt->accept(); }else{ - Tins::UDP* udp = packet.template find_pdu(); - if (!udp){ + if (!pkt->udp){ throw invalid_argument("Only TCP and UDP are supported"); } - Tins::PDU* application_layer = udp->inner_pdu(); - u_int16_t payload_size = 0; - if (application_layer != nullptr){ - payload_size = application_layer->size(); - } - if((udp->inner_pdu() == nullptr)){ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); - } - packet_info pktinfo{ - payload: string(payload+plen - payload_size, payload+plen), - sid: stream_id::make_identifier(packet), - is_input: is_input, - is_tcp: false, - is_ipv6: is_ipv6, - sctx: sctx, - packet_pdu: &packet, - layer4_pdu: udp, - }; - if (filter_action(pktinfo)){ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); + if(empty_payload){ + return pkt->accept(); + }else if (filter_action(pkt)){ + return pkt->accept(); }else{ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_DROP ); + return pkt->drop(); } } } - - static int queue_cb(const nlmsghdr *nlh, const mnl_socket* nl, void *data_ptr) { - - stream_ctx* sctx = (stream_ctx*)data_ptr; - - //Extract attributes from the nlmsghdr - nlattr *attr[NFQA_MAX+1] = {}; - - if (nfq_nlmsg_parse(nlh, attr) < 0) { - perror("problems parsing"); - return MNL_CB_ERROR; - } - if (attr[NFQA_PACKET_HDR] == nullptr) { - fputs("metaheader not set\n", stderr); - return MNL_CB_ERROR; - } - if (attr[NFQA_MARK] == nullptr) { - fputs("mark not set\n", stderr); - return MNL_CB_ERROR; - } - //Get Payload - uint16_t plen = mnl_attr_get_payload_len(attr[NFQA_PAYLOAD]); - uint8_t *payload = (uint8_t *)mnl_attr_get_payload(attr[NFQA_PAYLOAD]); - - //Return result to the kernel - struct nfqnl_msg_packet_hdr *ph = (nfqnl_msg_packet_hdr*) mnl_attr_get_payload(attr[NFQA_PACKET_HDR]); - struct nfgenmsg *nfg = (nfgenmsg *)mnl_nlmsg_get_payload(nlh); - char buf[MNL_SOCKET_BUFFER_SIZE]; - struct nlmsghdr *nlh_verdict; - - nlh_verdict = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, ntohs(nfg->res_id)); - - bool is_input = ntohl(mnl_attr_get_u32(attr[NFQA_MARK])) & 0x1; // == 0x1337 that is odd - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.queue_cb] Packet received" << endl; - cerr << "[DEBUG] [NetfilterQueue.queue_cb] Packet ID: " << ntohl(ph->packet_id) << endl; - cerr << "[DEBUG] [NetfilterQueue.queue_cb] Payload size: " << plen << endl; - cerr << "[DEBUG] [NetfilterQueue.queue_cb] Is input: " << is_input << endl; - #endif - - // Check IP protocol version - if ( (payload[0] & 0xf0) == 0x40 ){ - build_verdict(Tins::IP(payload, plen), payload, plen, nlh_verdict, ph, sctx, is_input, false); - }else{ - build_verdict(Tins::IPv6(payload, plen), payload, plen, nlh_verdict, ph, sctx, is_input, true); - } - - if (mnl_socket_sendto(nl, nlh_verdict, nlh_verdict->nlmsg_len) < 0) { - throw runtime_error( "mnl_socket_send" ); - } - - return MNL_CB_OK; + //If the stream has already been matched, drop all data, and try to close the connection + static void keep_fin_packet(RegexNfQueue* nfq){ + nfq->match_ctx.matching_has_been_called = true; + nfq->match_ctx.already_closed = true; } - RegexQueue(int queue) : NfQueueExecutor(queue, &queue_cb) {} + static void on_data_recv(Stream& stream, RegexNfQueue* nfq, string data) { + nfq->match_ctx.matching_has_been_called = true; + nfq->match_ctx.already_closed = false; + bool result = nfq->filter_action(nfq->match_ctx.pkt); + if (!result){ + nfq->sctx.clean_stream_by_id(nfq->match_ctx.pkt->sid); + stream.client_data_callback(bind(keep_fin_packet, nfq)); + stream.server_data_callback(bind(keep_fin_packet, nfq)); + } + nfq->match_ctx.result = result; + } - ~RegexQueue() { + //Input data filtering + static void on_client_data(Stream& stream, RegexNfQueue* nfq) { + on_data_recv(stream, nfq, string(stream.client_payload().begin(), stream.client_payload().end())); + } + + //Server data filtering + static void on_server_data(Stream& stream, RegexNfQueue* nfq) { + on_data_recv(stream, nfq, string(stream.server_payload().begin(), stream.server_payload().end())); + } + + // A stream was terminated. The second argument is the reason why it was terminated + static void on_stream_close(Stream& stream, RegexNfQueue* nfq) { + stream_id stream_id = stream_id::make_identifier(stream); + nfq->sctx.clean_stream_by_id(stream_id); + } + + static void on_new_stream(Stream& stream, RegexNfQueue* nfq) { + stream.auto_cleanup_payloads(true); + if (stream.is_partial_stream()) { + stream.enable_recovery_mode(10 * 1024); + } + stream.client_data_callback(bind(on_client_data, placeholders::_1, nfq)); + stream.server_data_callback(bind(on_server_data, placeholders::_1, nfq)); + stream.stream_closed_callback(bind(on_stream_close, placeholders::_1, nfq)); + } + + void before_loop() override{ + follower.new_stream_callback(bind(on_new_stream, placeholders::_1, this)); + follower.stream_termination_callback(bind(on_stream_close, placeholders::_1, this)); + } + + ~RegexNfQueue(){ sctx.clean(); } }; +}} #endif // REGEX_FILTER_CLASS_CPP \ No newline at end of file diff --git a/backend/binsrc/regex/stream_ctx.cpp b/backend/binsrc/regex/stream_ctx.cpp index 8b12e45..dc1c3fe 100644 --- a/backend/binsrc/regex/stream_ctx.cpp +++ b/backend/binsrc/regex/stream_ctx.cpp @@ -4,30 +4,19 @@ #include #include -#include #include +#include +#include +#include "regexfilter.cpp" -using Tins::TCPIP::Stream; -using Tins::TCPIP::StreamFollower; using namespace std; +namespace Firegex { +namespace Regex { + typedef Tins::TCPIP::StreamIdentifier stream_id; typedef map matching_map; -/* Considering to use unorder_map using this hash of stream_id - -namespace std { - template<> - struct hash { - size_t operator()(const stream_id& sid) const - { - return std::hash()(sid.max_address[0] + sid.max_address[1] + sid.max_address[2] + sid.max_address[3] + sid.max_address_port + sid.min_address[0] + sid.min_address[1] + sid.min_address[2] + sid.min_address[3] + sid.min_address_port); - } - }; -} - -*/ - #ifdef DEBUG ostream& operator<<(ostream& os, const Tins::TCPIP::StreamIdentifier::address_type &sid){ bool first_print = false; @@ -46,24 +35,11 @@ ostream& operator<<(ostream& os, const stream_id &sid){ } #endif - -struct packet_info; - -struct tcp_stream_tmp { - bool matching_has_been_called = false; - bool already_closed = false; - bool result; - packet_info *pkt_info; -}; - struct stream_ctx { matching_map in_hs_streams; matching_map out_hs_streams; hs_scratch_t* in_scratch = nullptr; hs_scratch_t* out_scratch = nullptr; - u_int16_t latest_config_ver = 0; - StreamFollower follower; - tcp_stream_tmp match_info; void clean_scratches(){ if (out_scratch != nullptr){ @@ -77,9 +53,6 @@ struct stream_ctx { } void clean_stream_by_id(stream_id sid){ - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.clean_stream_by_id] Cleaning stream context of " << sid << endl; - #endif auto stream_search = in_hs_streams.find(sid); hs_stream_t* stream_match; if (stream_search != in_hs_streams.end()){ @@ -103,11 +76,6 @@ struct stream_ctx { } void clean(){ - - #ifdef DEBUG - cerr << "[DEBUG] [NetfilterQueue.clean] Cleaning stream context" << endl; - #endif - if (in_scratch){ for(auto ele: in_hs_streams){ if (hs_close_stream(ele.second, in_scratch, nullptr, nullptr) != HS_SUCCESS) { @@ -131,16 +99,5 @@ struct stream_ctx { } }; -struct packet_info { - string payload; - stream_id sid; - bool is_input; - bool is_tcp; - bool is_ipv6; - stream_ctx* sctx; - Tins::PDU* packet_pdu; - Tins::PDU* layer4_pdu; -}; - - +}} #endif // STREAM_CTX_CPP \ No newline at end of file diff --git a/backend/binsrc/utils.cpp b/backend/binsrc/utils.cpp new file mode 100644 index 0000000..9539dba --- /dev/null +++ b/backend/binsrc/utils.cpp @@ -0,0 +1,64 @@ +#include +#include +#include +#include + +#ifndef UTILS_CPP +#define UTILS_CPP + +bool unhexlify(std::string const &hex, std::string &newString) { + try{ + int len = hex.length(); + for(int i=0; i< len; i+=2) + { + std::string byte = hex.substr(i,2); + char chr = (char) (int)strtol(byte.c_str(), nullptr, 16); + newString.push_back(chr); + } + return true; + } + catch (...){ + return false; + } +} + +template //same of kernel nfqueue max +class BlockingQueue +{ +private: + std::mutex mut; + std::queue private_std_queue; + std::condition_variable condNotEmpty; + std::condition_variable condNotFull; + size_t count; // Guard with Mutex +public: + void put(T new_value) + { + + std::unique_lock lk(mut); + //Condition takes a unique_lock and waits given the false condition + condNotFull.wait(lk,[this]{ + if (count == MAX) { + return false; + }else{ + return true; + } + + }); + private_std_queue.push(new_value); + count++; + condNotEmpty.notify_one(); + } + void take(T& value) + { + std::unique_lock lk(mut); + //Condition takes a unique_lock and waits given the false condition + condNotEmpty.wait(lk,[this]{return !private_std_queue.empty();}); + value=private_std_queue.front(); + private_std_queue.pop(); + count--; + condNotFull.notify_one(); + } +}; + +#endif // UTILS_CPP \ No newline at end of file diff --git a/backend/binsrc/utils.hpp b/backend/binsrc/utils.hpp deleted file mode 100644 index b61ef22..0000000 --- a/backend/binsrc/utils.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#include -#include - -#ifndef UTILS_HPP -#define UTILS_HPP - -bool unhexlify(std::string const &hex, std::string &newString) { - try{ - int len = hex.length(); - for(int i=0; i< len; i+=2) - { - std::string byte = hex.substr(i,2); - char chr = (char) (int)strtol(byte.c_str(), nullptr, 16); - newString.push_back(chr); - } - return true; - } - catch (...){ - return false; - } -} - -#endif \ No newline at end of file diff --git a/backend/modules/nfproxy/firegex.py b/backend/modules/nfproxy/firegex.py index 0ccf94f..37055c3 100644 --- a/backend/modules/nfproxy/firegex.py +++ b/backend/modules/nfproxy/firegex.py @@ -57,9 +57,9 @@ class FiregexInterceptor: self.process.kill() raise Exception("Invalid binary output") line = line_fut.decode() - if line.startswith("QUEUES "): + if line.startswith("QUEUE "): params = line.split() - return (int(params[1]), int(params[2])) + return (int(params[1]), int(params[1])) else: self.process.kill() raise Exception("Invalid binary output") diff --git a/backend/modules/nfproxy/nftables.py b/backend/modules/nfproxy/nftables.py index 588ac1e..eafa129 100644 --- a/backend/modules/nfproxy/nftables.py +++ b/backend/modules/nfproxy/nftables.py @@ -62,6 +62,7 @@ class FiregexTables(NFTableManager): "expr": [ {'match': {'left': {'payload': {'protocol': ip_family(srv.ip_int), 'field': 'saddr'}}, 'op': '==', 'right': nftables_int_to_json(srv.ip_int)}}, {'match': {"left": { "payload": {"protocol": str(srv.proto), "field": "sport"}}, "op": "==", "right": int(srv.port)}}, + {"mangle": {"key": {"meta": {"key": "mark"}},"value": 0x1338}}, {"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}} ] }}}, @@ -72,6 +73,7 @@ class FiregexTables(NFTableManager): "expr": [ {'match': {'left': {'payload': {'protocol': ip_family(srv.ip_int), 'field': 'daddr'}}, 'op': '==', 'right': nftables_int_to_json(srv.ip_int)}}, {'match': {"left": { "payload": {"protocol": str(srv.proto), "field": "dport"}}, "op": "==", "right": int(srv.port)}}, + {"mangle": {"key": {"meta": {"key": "mark"}},"value": 0x1337}}, {"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}} ] }}} diff --git a/backend/modules/nfregex/firegex.py b/backend/modules/nfregex/firegex.py index 71afcf8..f74dca7 100644 --- a/backend/modules/nfregex/firegex.py +++ b/backend/modules/nfregex/firegex.py @@ -84,7 +84,7 @@ class FiregexInterceptor: return self async def _start_binary(self): - proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"../cppqueue") + proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"../cppregex") self.process = await asyncio.create_subprocess_exec( proxy_binary_path, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, @@ -97,9 +97,9 @@ class FiregexInterceptor: self.process.kill() raise Exception("Invalid binary output") line = line_fut.decode() - if line.startswith("QUEUES "): + if line.startswith("QUEUE "): params = line.split() - return (int(params[1]), int(params[2])) + return (int(params[1]), int(params[1])) else: self.process.kill() raise Exception("Invalid binary output") diff --git a/backend/modules/nfregex/nftables.py b/backend/modules/nfregex/nftables.py index ce0088a..34ed844 100644 --- a/backend/modules/nfregex/nftables.py +++ b/backend/modules/nfregex/nftables.py @@ -48,10 +48,12 @@ class FiregexTables(NFTableManager): def add(self, srv:Service, queue_range): for ele in self.get(): - if ele.__eq__(srv): return + if ele.__eq__(srv): + return init, end = queue_range - if init > end: init, end = end, init + if init > end: + init, end = end, init self.cmd( { "insert":{ "rule": { "family": "inet", diff --git a/tests/api_test.py b/tests/api_test.py index ccc3097..c4ece76 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -15,7 +15,7 @@ puts(f"{args.address}", color=colors.yellow) firegex = FiregexAPI(args.address) #Connect to Firegex -if firegex.status()["status"] =="init": +if firegex.status()["status"] == "init": if (firegex.set_password(args.password)): puts(f"Sucessfully set password to {args.password} ✔", color=colors.green) else: diff --git a/tests/benchmark.py b/tests/benchmark.py index 62da986..06044c1 100644 --- a/tests/benchmark.py +++ b/tests/benchmark.py @@ -117,4 +117,4 @@ else: puts("Test Failed: Couldn't delete service ✗", color=colors.red) exit(1) -server.terminate() \ No newline at end of file +server.terminate()