From 5a1e1bc2251598b20dc38d9af2b7cdb48d0a4b1e Mon Sep 17 00:00:00 2001 From: Domingo Dirutigliano Date: Mon, 17 Feb 2025 13:07:06 +0100 Subject: [PATCH] less copy and less lock --- backend/binsrc/classes/netfilter.cpp | 2 +- backend/binsrc/classes/nfqueue.cpp | 122 ++++++++++++------------- backend/binsrc/nfregex.cpp | 9 ++ backend/binsrc/proxytun/stream_ctx.cpp | 1 + backend/binsrc/regex/regexfilter.cpp | 6 +- backend/binsrc/utils.cpp | 33 +++++++ 6 files changed, 106 insertions(+), 67 deletions(-) diff --git a/backend/binsrc/classes/netfilter.cpp b/backend/binsrc/classes/netfilter.cpp index 794a76e..5705c5a 100644 --- a/backend/binsrc/classes/netfilter.cpp +++ b/backend/binsrc/classes/netfilter.cpp @@ -24,8 +24,8 @@ public: void loop() { static_cast(this)->before_loop(); + PktRequest* pkt; for(;;) { - PktRequest* pkt; queue.take(pkt); static_cast(this)->handle_next_packet(pkt); delete pkt; diff --git a/backend/binsrc/classes/nfqueue.cpp b/backend/binsrc/classes/nfqueue.cpp index a9af147..f794ed7 100644 --- a/backend/binsrc/classes/nfqueue.cpp +++ b/backend/binsrc/classes/nfqueue.cpp @@ -23,22 +23,21 @@ class PktRequest { private: FilterAction action = FilterAction::NOACTION; mnl_socket* nl = nullptr; - nfgenmsg * nfg = nullptr; - nfqnl_msg_packet_hdr *ph; - shared_ptr packet_buffer; // Will be deallocated here - size_t data_size = 0; + uint16_t res_id; + uint32_t packet_id; public: - const bool is_ipv6; + bool is_ipv6; Tins::IP* ipv4 = nullptr; Tins::IPv6* ipv6 = nullptr; Tins::TCP* tcp = nullptr; Tins::UDP* udp = nullptr; - const L4Proto l4_proto; - const bool is_input; - - const string packet; - const string data; - const stream_id sid; + L4Proto l4_proto; + bool is_input; + + string packet; + char* data; + size_t data_size; + stream_id sid; T* ctx; @@ -89,14 +88,22 @@ class PktRequest { public: - PktRequest(shared_ptr buf, Tins::IP* ipv4, const char* payload, size_t plen, stream_id sid, T* ctx, mnl_socket* nl, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, bool is_input): - is_ipv6(false), ipv4(ipv4), packet(string(payload, plen)), sid(sid), ctx(ctx), nl(nl), nfg(nfg), ph(ph), - is_input(is_input), packet_buffer(buf), l4_proto(fill_l4_info()), data(string(payload+(plen-data_size), data_size)) {} - - PktRequest(shared_ptr buf, Tins::IPv6* ipv6, const char* payload, size_t plen, stream_id sid, T* ctx, mnl_socket* nl, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, bool is_input): - is_ipv6(true), ipv6(ipv6), packet(string(payload, plen)), sid(sid), ctx(ctx), nl(nl), nfg(nfg), ph(ph), - is_input(is_input), packet_buffer(buf), l4_proto(fill_l4_info()), data(string(payload+(plen-data_size), data_size)) {} - + PktRequest(const char* payload, size_t plen, T* ctx, mnl_socket* nl, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, bool is_input): + ctx(ctx), nl(nl), res_id(nfg->res_id), + packet_id(ph->packet_id), is_input(is_input), + packet(string(payload, plen)), + is_ipv6((payload[0] & 0xf0) == 0x60){ + if (is_ipv6){ + ipv6 = new Tins::IPv6((uint8_t*)packet.c_str(), plen); + sid = stream_id::make_identifier(*ipv6); + }else{ + ipv4 = new Tins::IP((uint8_t*)packet.c_str(), plen); + sid = stream_id::make_identifier(*ipv4); + } + l4_proto = fill_l4_info(); + data = packet.data()+(plen-data_size); + } + void drop(){ if (action == FilterAction::NOACTION){ action = FilterAction::DROP; @@ -129,26 +136,21 @@ class PktRequest { } ~PktRequest(){ - if (ipv4 != nullptr){ - delete ipv4; - } - if (ipv6 != nullptr){ - delete ipv6; - } + delete ipv4; + delete ipv6; } private: void perfrom_action(){ char buf[MNL_SOCKET_BUFFER_SIZE]; - struct nlmsghdr *nlh_verdict; - nlh_verdict = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, ntohs(nfg->res_id)); + struct nlmsghdr *nlh_verdict = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, ntohs(res_id)); switch (action) { case FilterAction::ACCEPT: - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); + nfq_nlmsg_verdict_put(nlh_verdict, ntohl(packet_id), NF_ACCEPT ); break; case FilterAction::DROP: - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_DROP ); + nfq_nlmsg_verdict_put(nlh_verdict, ntohl(packet_id), NF_DROP ); break; case FilterAction::MANGLE:{ if (is_ipv6){ @@ -156,7 +158,7 @@ class PktRequest { }else{ nfq_nlmsg_verdict_put_pkt(nlh_verdict, ipv4->serialize().data(), ipv4->size()); } - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); + nfq_nlmsg_verdict_put(nlh_verdict, ntohl(packet_id), NF_ACCEPT ); break; } default: @@ -172,7 +174,6 @@ class PktRequest { struct internal_nfqueue_execution_data_tmp{ mnl_socket* nl = nullptr; void *data = nullptr; - shared_ptr packet_buffer; }; const size_t NFQUEUE_BUFFER_SIZE = 0xffff + (MNL_SOCKET_BUFFER_SIZE/2); @@ -185,10 +186,11 @@ class NfQueue { mnl_socket* nl = nullptr; unsigned int portid; public: - + char* queue_msg_buffer = nullptr; const uint16_t queue_num; NfQueue(u_int16_t queue_num): queue_num(queue_num) { + queue_msg_buffer = new char[NFQUEUE_BUFFER_SIZE]; nl = mnl_socket_open(NETLINK_NETFILTER); if (nl == nullptr) { throw runtime_error( "mnl_socket_open" );} @@ -199,18 +201,16 @@ class NfQueue { } portid = mnl_socket_get_portid(nl); - char queue_msg_buffer[NFQUEUE_BUFFER_SIZE]; - - if (_send_config_cmd(NFQNL_CFG_CMD_BIND, queue_msg_buffer) < 0) { + if (_send_config_cmd(NFQNL_CFG_CMD_BIND) < 0) { _clear(); throw runtime_error( "mnl_socket_send" ); } //TEST if BIND was successful - if (_send_config_cmd(NFQNL_CFG_CMD_NONE, queue_msg_buffer) < 0) { // SEND A NONE command to generate an error meessage + if (_send_config_cmd(NFQNL_CFG_CMD_NONE) < 0) { // SEND A NONE command to generate an error meessage _clear(); throw runtime_error( "mnl_socket_send" ); } - if (_recv_packet(queue_msg_buffer) == -1) { //RECV the error message + if (_recv_packet() == -1) { //RECV the error message _clear(); throw runtime_error( "mnl_socket_recvfrom" ); } @@ -237,9 +237,18 @@ class NfQueue { nlh = nfq_nlmsg_put(queue_msg_buffer, NFQNL_MSG_CONFIG, queue_num); nfq_nlmsg_cfg_put_params(nlh, NFQNL_COPY_PACKET, 0xffff); + #ifdef NFQUEUE_FAIL_OPEN + + mnl_attr_put_u32(nlh, NFQA_CFG_FLAGS, htonl(NFQA_CFG_F_GSO|NFQA_CFG_F_FAIL_OPEN)); + mnl_attr_put_u32(nlh, NFQA_CFG_MASK, htonl(NFQA_CFG_F_GSO|NFQA_CFG_F_FAIL_OPEN)); + + #else + mnl_attr_put_u32(nlh, NFQA_CFG_FLAGS, htonl(NFQA_CFG_F_GSO)); mnl_attr_put_u32(nlh, NFQA_CFG_MASK, htonl(NFQA_CFG_F_GSO)); + #endif + if (mnl_socket_sendto(nl, nlh, nlh->nlmsg_len) < 0) { _clear(); throw runtime_error( "mnl_socket_send" ); @@ -256,18 +265,16 @@ class NfQueue { } void handle_next_packet(D* data){ - auto queue_msg_buffer = make_shared(NFQUEUE_BUFFER_SIZE); - int ret = _recv_packet(queue_msg_buffer.get(), NFQUEUE_BUFFER_SIZE); + int ret = _recv_packet(); if (ret == -1) { throw runtime_error( "mnl_socket_recvfrom" ); } internal_nfqueue_execution_data_tmp raw_ptr = { nl: nl, - data: data, - packet_buffer: queue_msg_buffer + data: data }; - ret = mnl_cb_run(queue_msg_buffer.get(), ret, 0, portid, _real_queue_cb, &raw_ptr); + ret = mnl_cb_run(queue_msg_buffer, ret, 0, portid, _real_queue_cb, &raw_ptr); if (ret <= 0){ cerr << "[error] [NfQueue.handle_next_packet] mnl_cb_run error with: " << ret << endl; throw runtime_error( "mnl_cb_run error!" ); @@ -275,22 +282,12 @@ class NfQueue { } ~NfQueue() { - char queue_msg_buffer[NFQUEUE_BUFFER_SIZE]; - _send_config_cmd(NFQNL_CFG_CMD_UNBIND, queue_msg_buffer); + _send_config_cmd(NFQNL_CFG_CMD_UNBIND); _clear(); } private: - template>> - static void inline _send_verdict(shared_ptr raw_buf, T* packet, char *payload, uint16_t plen, nfgenmsg *nfg, nfqnl_msg_packet_hdr *ph, internal_nfqueue_execution_data_tmp* ctx, bool is_input){ - handle_func(new PktRequest( - raw_buf, packet, payload, plen, - stream_id::make_identifier(*packet), - (D*)ctx->data, ctx->nl, nfg, ph, is_input - )); - } - static int _real_queue_cb(const nlmsghdr *nlh, void *data_ptr) { internal_nfqueue_execution_data_tmp* info = (internal_nfqueue_execution_data_tmp*) data_ptr; @@ -320,12 +317,10 @@ class NfQueue { struct nfgenmsg *nfg = (nfgenmsg *)mnl_nlmsg_get_payload(nlh); bool is_input = ntohl(mnl_attr_get_u32(attr[NFQA_MARK])) & 0x1; // == 0x1337 that is odd - // Check IP protocol version - if ( (payload[0] & 0xf0) == 0x40 ){ - _send_verdict(info->packet_buffer, new Tins::IP((uint8_t*)payload, plen), payload, plen, nfg, ph, info, is_input); - }else{ - _send_verdict(info->packet_buffer, new Tins::IPv6((uint8_t*)payload, plen), payload, plen, nfg, ph, info, is_input); - } + handle_func(new PktRequest( + payload, plen, (D*)info->data, info->nl, nfg, ph, is_input + )); + return MNL_CB_OK; } @@ -334,16 +329,17 @@ class NfQueue { mnl_socket_close(nl); nl = nullptr; } + delete[] queue_msg_buffer; } - inline ssize_t _send_config_cmd(nfqnl_msg_config_cmds cmd, char* buf){ - struct nlmsghdr *nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, queue_num); + inline ssize_t _send_config_cmd(nfqnl_msg_config_cmds cmd){ + struct nlmsghdr *nlh = nfq_nlmsg_put(queue_msg_buffer, NFQNL_MSG_CONFIG, queue_num); nfq_nlmsg_cfg_put_cmd(nlh, AF_INET, cmd); return mnl_socket_sendto(nl, nlh, nlh->nlmsg_len); } - inline ssize_t _recv_packet(char* buf, size_t buf_size = NFQUEUE_BUFFER_SIZE){ - return mnl_socket_recvfrom(nl, buf, buf_size); + inline ssize_t _recv_packet(){ + return mnl_socket_recvfrom(nl, queue_msg_buffer, NFQUEUE_BUFFER_SIZE); } }; diff --git a/backend/binsrc/nfregex.cpp b/backend/binsrc/nfregex.cpp index 45493b8..1b95ef8 100644 --- a/backend/binsrc/nfregex.cpp +++ b/backend/binsrc/nfregex.cpp @@ -8,6 +8,15 @@ using namespace std; using namespace Firegex::Regex; using Firegex::NfQueue::MultiThreadQueue; +/* + +Compile options: +NFQUEUE_FAIL_OPEN - enable fail-open option of nfqueueß +--- +USE_PIPES_FOR_BLOKING_QUEUE - use pipes instead of conditional variable, queue and mutex for blocking queue +*/ + + void config_updater (){ string line; while (true){ diff --git a/backend/binsrc/proxytun/stream_ctx.cpp b/backend/binsrc/proxytun/stream_ctx.cpp index a1f2db0..3057ac8 100644 --- a/backend/binsrc/proxytun/stream_ctx.cpp +++ b/backend/binsrc/proxytun/stream_ctx.cpp @@ -4,6 +4,7 @@ #include #include +#include using namespace std; diff --git a/backend/binsrc/regex/regexfilter.cpp b/backend/binsrc/regex/regexfilter.cpp index 63af811..0ea15d2 100644 --- a/backend/binsrc/regex/regexfilter.cpp +++ b/backend/binsrc/regex/regexfilter.cpp @@ -91,12 +91,12 @@ public: stream_match = stream_search->second; } err = hs_scan_stream( - stream_match,pkt->data.c_str(), pkt->data.size(), + stream_match,pkt->data, pkt->data_size, 0, scratch_space, match_func, &match_res ); }else{ err = hs_scan( - regex_matcher,pkt->data.c_str(), pkt->data.size(), + regex_matcher,pkt->data, pkt->data_size, 0, scratch_space, match_func, &match_res ); } @@ -120,7 +120,7 @@ public: } void handle_next_packet(NfQueue::PktRequest* pkt) override{ - bool empty_payload = pkt->data.size() == 0; + bool empty_payload = pkt->data_size == 0; if (pkt->tcp){ match_ctx.matching_has_been_called = false; match_ctx.pkt = pkt; diff --git a/backend/binsrc/utils.cpp b/backend/binsrc/utils.cpp index 9539dba..a4d889a 100644 --- a/backend/binsrc/utils.cpp +++ b/backend/binsrc/utils.cpp @@ -22,6 +22,36 @@ bool unhexlify(std::string const &hex, std::string &newString) { } } +#ifdef USE_PIPES_FOR_BLOKING_QUEUE + +template +class BlockingQueue +{ +private: + int pipefd[2]; +public: + BlockingQueue(){ + if (pipe(pipefd) == -1) { + throw std::runtime_error("pipe"); + } + } + + void put(T new_value) + { + if (write(pipefd[1], &new_value, sizeof(T)) == -1) { + throw std::runtime_error("write"); + } + } + void take(T& value) + { + if (read(pipefd[0], &value, sizeof(T)) == -1) { + throw std::runtime_error("read"); + } + } +}; + +#else + template //same of kernel nfqueue max class BlockingQueue { @@ -32,6 +62,7 @@ private: std::condition_variable condNotFull; size_t count; // Guard with Mutex public: + void put(T new_value) { @@ -61,4 +92,6 @@ public: } }; +#endif + #endif // UTILS_CPP \ No newline at end of file