diff --git a/backend/binsrc/nfqueue_regex/Cargo.lock b/backend/binsrc/nfqueue_regex/Cargo.lock index e6a7a63..11e8d94 100644 --- a/backend/binsrc/nfqueue_regex/Cargo.lock +++ b/backend/binsrc/nfqueue_regex/Cargo.lock @@ -3,238 +3,22 @@ version = 3 [[package]] -name = "aho-corasick" -version = "1.0.2" +name = "atomic_refcell" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" -dependencies = [ - "memchr", -] - -[[package]] -name = "anyhow" -version = "1.0.71" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" - -[[package]] -name = "bindgen" -version = "0.65.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" -dependencies = [ - "bitflags", - "cexpr", - "clang-sys", - "lazy_static", - "lazycell", - "log", - "peeking_take_while", - "prettyplease", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", - "syn 2.0.18", - "which", -] - -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "cargo-emit" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695" - -[[package]] -name = "cexpr" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" -dependencies = [ - "nom", -] - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "clang-sys" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c688fc74432808e3eb684cae8830a86be1d66a2bd58e1f248ed0960a590baf6f" -dependencies = [ - "glob", - "libc", - "libloading", -] - -[[package]] -name = "convert_case" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" - -[[package]] -name = "derive_more" -version = "0.99.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" -dependencies = [ - "convert_case", - "proc-macro2", - "quote", - "rustc_version", - "syn 1.0.109", -] - -[[package]] -name = "either" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" - -[[package]] -name = "foreign-types" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" -dependencies = [ - "foreign-types-macros", - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-macros" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.18", -] - -[[package]] -name = "foreign-types-shared" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" - -[[package]] -name = "glob" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" - -[[package]] -name = "hyperscan" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e76d5de507383407feda461def7d9019b8252bd00ce841de6b35b2d1c70a6c4e" -dependencies = [ - "anyhow", - "bitflags", - "cfg-if", - "derive_more", - "foreign-types", - "hyperscan-sys", - "libc", - "malloc_buf", - "rustc_version", - "semver", - "thiserror", -] - -[[package]] -name = "hyperscan-sys" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "011ea0ffa36b04c35f09780c3d963377321c1dae759f2bfda696deaa165d283e" -dependencies = [ - "anyhow", - "cargo-emit", - "libc", - "pkg-config", -] - -[[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" [[package]] name = "libc" -version = "0.2.145" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc86cde3ff845662b8f4ef6cb50ea0e20c524eb3d29ae048287e06a1b3fa6a81" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] -name = "libloading" -version = "0.7.4" +name = "nfq" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" -dependencies = [ - "cfg-if", - "winapi", -] - -[[package]] -name = "libtins-rs" -version = "0.1.0" -source = "git+https://github.com/Pwnzer0tt1/libtins-rs.git#64807cb78366602542877a17e56cd9ee50180ed1" -dependencies = [ - "bindgen", -] - -[[package]] -name = "log" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" - -[[package]] -name = "malloc_buf" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f32c8c0575eee8637bf462087c00098fe16d6cb621f1abb6ebab4da414d57fd" -dependencies = [ - "libc", -] - -[[package]] -name = "memchr" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" - -[[package]] -name = "minimal-lexical" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" - -[[package]] -name = "nfqueue" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8684696e66ae4f74e6b2fe8868bf317bc4cd0e0e53aabddf7c40dad930111bca" +checksum = "b9c8f4c88952507d9df9400a6a2e48640fb460e21dcb2b4716eb3ff156d6db9e" dependencies = [ "libc", ] @@ -243,200 +27,6 @@ dependencies = [ name = "nfqueue_regex" version = "0.1.0" dependencies = [ - "hyperscan", - "libtins-rs", - "nfqueue", + "atomic_refcell", + "nfq", ] - -[[package]] -name = "nom" -version = "7.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" -dependencies = [ - "memchr", - "minimal-lexical", -] - -[[package]] -name = "once_cell" -version = "1.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" - -[[package]] -name = "peeking_take_while" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" - -[[package]] -name = "pkg-config" -version = "0.3.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" - -[[package]] -name = "prettyplease" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9825a04601d60621feed79c4e6b56d65db77cdca55cef43b46b0de1096d1c282" -dependencies = [ - "proc-macro2", - "syn 2.0.18", -] - -[[package]] -name = "proc-macro2" -version = "1.0.59" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "quote" -version = "1.0.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "regex" -version = "1.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" -dependencies = [ - "aho-corasick", - "memchr", - "regex-automata", - "regex-syntax", -] - -[[package]] -name = "regex-automata" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9aaecc05d5c4b5f7da074b9a0d1a0867e71fd36e7fc0482d8bcfe8e8fc56290" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab07dc67230e4a4718e70fd5c20055a4334b121f1f9db8fe63ef39ce9b8c846" - -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - -[[package]] -name = "rustc_version" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" -dependencies = [ - "semver", -] - -[[package]] -name = "semver" -version = "1.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" - -[[package]] -name = "shlex" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" - -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "syn" -version = "2.0.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "thiserror" -version = "1.0.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" -dependencies = [ - "thiserror-impl", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.18", -] - -[[package]] -name = "unicode-ident" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" - -[[package]] -name = "which" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269" -dependencies = [ - "either", - "libc", - "once_cell", -] - -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/backend/binsrc/nfqueue_regex/Cargo.toml b/backend/binsrc/nfqueue_regex/Cargo.toml index 8ce161f..b9b0e0a 100644 --- a/backend/binsrc/nfqueue_regex/Cargo.toml +++ b/backend/binsrc/nfqueue_regex/Cargo.toml @@ -6,6 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -hyperscan = "0.3.2" -nfqueue = "0.9.1" -libtins-rs = { git = "https://github.com/Pwnzer0tt1/libtins-rs.git" } +atomic_refcell = "0.1.13" +nfq = "0.2.5" +#hyperscan = "0.3.2" diff --git a/backend/binsrc/nfqueue_regex/src/main.rs b/backend/binsrc/nfqueue_regex/src/main.rs index d12f973..639dfd5 100644 --- a/backend/binsrc/nfqueue_regex/src/main.rs +++ b/backend/binsrc/nfqueue_regex/src/main.rs @@ -1,927 +1,150 @@ +use atomic_refcell::AtomicRefCell; +use nfq::{Queue, Verdict}; +use std::cell::{Cell, RefCell}; use std::env; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicPtr, AtomicU32}; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::Arc; +use std::thread::{self, sleep, sleep_ms, JoinHandle}; -use std::collections::HashMap; - -#[macro_use] -extern crate hyperscan; - -use hyperscan::prelude::*; - -#[derive(Hash, Eq, PartialEq, Debug)] -struct ConnectionFlux { - src_ip: String, - src_port: i16, - dst_ip: String, - dst_port: i16, +enum WorkerMessage { + Error(String), + Dropped(usize), } -impl ConnectionFlux{ - fn new(src_ip: &str, src_port: i16, dst_ip: &str, dst_port: i16) -> ConnectionFlux { - ConnectionFlux { src_ip: src_ip.to_string(), src_port, dst_ip: dst_ip.to_string(), dst_port} - } -} - - -fn main() { - let str_of_threads = env::var("NTHREADS").unwrap_or("1".to_string()); - let mut n_of_threads = str_of_threads.parse::().unwrap_or(1); - if n_of_threads <= 0 { - n_of_threads = 1; - } - - let _connections = HashMap::from([ - (ConnectionFlux::new("127.0.0.1", 1337, "127.0.0.1", 1337), 25), - ]); - - eprintln!("[info][main] Using {} threads", n_of_threads) - -} - -// Hyperscan example program 2: pcapscan - -use std::collections::HashMap; -use std::fs; -use std::io; -use std::iter; -use std::net::SocketAddrV4; -use std::path::{Path, PathBuf}; -use std::process::exit; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Duration, Instant}; - -use anyhow::{Context, Result}; -use byteorder::{BigEndian, ReadBytesExt}; -use pnet::packet::{ - ethernet::{EtherTypes, EthernetPacket}, - ip::IpNextHeaderProtocols, - ipv4::Ipv4Packet, - udp::UdpPacket, - Packet, PrimitiveValues, -}; -use structopt::StructOpt; - -use hyperscan::prelude::*; - -/** - * This function will read in the file with the specified name, with an - * expression per line, ignoring lines starting with '#' and build a Hyperscan - * database for it. - */ -fn init_db>(path: P) -> Result<(StreamingDatabase)> { - // do the actual file reading and string handling - let patterns: Patterns = fs::read_to_string(path)?.parse()?; - - println!("Compiling Hyperscan databases with {} patterns.", patterns.len()); - - Ok((build_database(&patterns)?)) -} - -fn build_database, T: Mode>(builder: &B) -> Result> { - let now = Instant::now(); - - let db = builder.build::()?; - - println!( - "compile `{}` mode database in {} ms", - T::NAME, - now.elapsed().as_millis() - ); - - Ok(db) -} - -// Key for identifying a stream in our pcap input data, using data from its IP -// headers. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -struct Session { - src: SocketAddrV4, - dst: SocketAddrV4, -} - -impl Session { - fn new(ipv4: &Ipv4Packet) -> Session { - let mut c = io::Cursor::new(ipv4.payload()); - let src_port = c.read_u16::().unwrap(); - let dst_port = c.read_u16::().unwrap(); - - Session { - src: SocketAddrV4::new(ipv4.get_source(), src_port), - dst: SocketAddrV4::new(ipv4.get_destination(), dst_port), +impl ToString for WorkerMessage { + fn to_string(&self) -> String { + match self { + WorkerMessage::Error(e) => format!("E{}", e), + WorkerMessage::Dropped(d) => format!("D{}", d), } } } - -const IP_FLAG_MF: u8 = 1; - -struct Benchmark { - /// Map used to construct stream_ids - sessions: HashMap>, - - /// Hyperscan compiled database (streaming mode) - streaming_db: StreamingDatabase, - - /// Hyperscan temporary scratch space (used in both modes) - scratch: Scratch, - - // Count of matches found during scanning - match_count: AtomicUsize, +struct Pool { + _workers: Vec, + pub start: u16, + pub end: u16, } -impl Benchmark { - fn new(streaming_db: StreamingDatabase) -> Result { - let mut s = streaming_db.alloc_scratch()?; - - block_db.realloc_scratch(&mut s)?; - - Ok(Benchmark { - sessions: HashMap::new(), - streaming_db: streaming_db, - scratch: s, - match_count: AtomicUsize::new(0), - }) - } - - fn decode_packet(packet: &pcap::Packet) -> Option<(Session, Vec)> { - let ether = EthernetPacket::new(&packet.data).unwrap(); - - if ether.get_ethertype() != EtherTypes::Ipv4 { - return None; - } - - let ipv4 = Ipv4Packet::new(ðer.payload()).unwrap(); - - if ipv4.get_version() != 4 { - return None; - } - - if (ipv4.get_flags() & IP_FLAG_MF) == IP_FLAG_MF || ipv4.get_fragment_offset() != 0 { - return None; - } - - match ipv4.get_next_level_protocol() { - IpNextHeaderProtocols::Tcp => { - let payload = ipv4.payload(); - let data_off = ((payload[12] >> 4) * 4) as usize; - - Some((Session::new(&ipv4), Vec::from(&payload[data_off..]))) - } - - IpNextHeaderProtocols::Udp => { - let udp = UdpPacket::new(&ipv4.payload()).unwrap(); - - Some((Session::new(&ipv4), Vec::from(udp.payload()))) - } - _ => None, - } - } - - fn read_streams>(&mut self, path: P) -> Result<(), pcap::Error> { - let mut capture = pcap::Capture::from_file(path)?; - - while let Ok(ref packet) = capture.next_packet() { - if let Some((key, payload)) = Self::decode_packet(&packet) { - if payload.len() > 0 { - let stream_id = match self.sessions.get(&key) { - Some(&id) => id, - None => { - let id = self.sessions.len(); - - assert!(self.sessions.insert(key, id).is_none()); - - id - } - }; - - self.stream_ids.push(stream_id); - self.packets.push(Box::new(payload)); - } +const QUEUE_BASE_NUM: u16 = 1000; +impl Pool { + fn new(threads: u16, tx: Sender, db: RefCell<&str>) -> Self { + // Find free queues + let mut start = QUEUE_BASE_NUM; + let mut queues: Vec<(Queue, u16)> = vec![]; + while queues.len() != threads.into() { + for queue_num in + (start..start.checked_add(threads + 1).expect("No more queues left")).rev() + { + let mut queue = Queue::open().unwrap(); + if queue.bind(queue_num).is_err() { + start = queue_num; + while let Some((mut q, num)) = queues.pop() { + let _ = q.unbind(num); + } + break; + }; + queues.push((queue, queue_num)); } } - println!( - "read {} packets in {} sessions", - self.packets.len(), - self.stream_ids.len(), - ); - - Ok(()) - } - - // Return the number of bytes scanned - fn bytes(&self) -> usize { - self.packets.iter().fold(0, |bytes, p| bytes + p.len()) - } - - // Return the number of matches found. - fn matches(&self) -> usize { - self.match_count.load(Ordering::Relaxed) - } - - // Clear the number of matches found. - fn clear_matches(&mut self) { - self.match_count.store(0, Ordering::Relaxed); - } - - // Open a Hyperscan stream for each stream in stream_ids - fn open_streams(&mut self) -> Result<()> { - self.streams = iter::repeat_with(|| self.streaming_db.open_stream()) - .take(self.sessions.len()) - .collect::>>()?; - - Ok(()) - } - - // Close all open Hyperscan streams (potentially generating any end-anchored matches) - fn close_streams(&mut self) -> Result<()> { - for stream in self.streams.drain(..) { - let match_count = &self.match_count; - stream - .close(&self.scratch, |_, _, _, _| { - match_count.fetch_add(1, Ordering::Relaxed); - - Matching::Continue - }) - .with_context(|| "close stream")?; + Pool { + _workers: queues + .into_iter() + .map(|(queue, queue_num)| Worker::new(queue, queue_num, tx.clone())) + .collect(), + start, + end: (start + threads), } - - Ok(()) } - fn reset_streams(&mut self) -> Result<()> { - for ref stream in &self.streams { - stream - .reset(&self.scratch, |_, _, _, _| { - self.match_count.fetch_add(1, Ordering::Relaxed); + // fn join(self) { + // for worker in self._workers { + // let _ = worker.join(); + // } + // } +} - Matching::Continue - }) - .with_context(|| "reset stream")?; +struct Worker { + _inner: JoinHandle<()>, +} + +impl Worker { + fn new(mut queue: Queue, _queue_num: u16, tx: Sender) -> Self { + Worker { + _inner: thread::spawn(move || loop { + let mut msg = queue.recv().unwrap_or_else(|_| { + let _ = tx.send(WorkerMessage::Error("Fuck".to_string())); + panic!(""); + }); + + msg.set_verdict(Verdict::Accept); + queue.verdict(msg).unwrap(); + }), } - - Ok(()) } - - // Scan each packet (in the ordering given in the PCAP file) - // through Hyperscan using the streaming interface. - fn scan_streams(&mut self) -> Result<()> { - for (i, ref packet) in self.packets.iter().enumerate() { - let ref stream = self.streams[self.stream_ids[i]]; - - stream - .scan(packet.as_ref().as_slice(), &self.scratch, |_, _, _, _| { - self.match_count.fetch_add(1, Ordering::Relaxed); - - Matching::Continue - }) - .with_context(|| "scan packet")?; +} +struct InputOuputPools { + pub output_queue: Pool, + pub input_queue: Pool, + rx: Receiver, +} +impl InputOuputPools { + fn new(threads: u16) -> InputOuputPools { + let (tx, rx) = mpsc::channel(); + InputOuputPools { + output_queue: Pool::new(threads / 2, tx.clone(), RefCell::new("ciao")), + input_queue: Pool::new(threads / 2, tx, RefCell::new("miao")), + rx, } - - Ok(()) } - // Scan each packet (in the ordering given in the PCAP file) - // through Hyperscan using the block-mode interface. - fn scan_block(&mut self) -> Result<()> { - for ref packet in &self.packets { - self.block_db - .scan(packet.as_ref().as_slice(), &self.scratch, |_, _, _, _| { - self.match_count.fetch_add(1, Ordering::Relaxed); - - Matching::Continue - }) - .with_context(|| "scan packet")?; + fn poll_events(&self) { + loop { + let event = self.rx.recv().expect("Channel has hung up"); + println!("{}", event.to_string()); } - - Ok(()) - } - - // Display some information about the compiled database and scanned data. - fn display_stats(&self) -> Result<()> { - let num_packets = self.packets.len(); - let num_streams = self.sessions.len(); - let num_bytes = self.bytes(); - - println!( - "{} packets in {} streams, totalling {} bytes.", - num_packets, num_streams, num_bytes - ); - println!( - "Average packet length: {} bytes.", - num_bytes / if num_packets > 0 { num_packets } else { 1 } - ); - println!( - "Average stream length: {} bytes.", - num_bytes / if num_streams > 0 { num_streams } else { 1 } - ); - println!(""); - println!( - "Streaming mode Hyperscan database size : {} bytes.", - self.streaming_db.size()? - ); - println!( - "Block mode Hyperscan database size : {} bytes.", - self.block_db.size()? - ); - println!( - "Streaming mode Hyperscan stream state size: {} bytes (per stream).", - self.streaming_db.stream_size()? - ); - - Ok(()) } } -#[derive(Debug, StructOpt)] -#[structopt(name = "simplegrep", about = "An example search a given input file for a pattern.")] -struct Opt { - /// repeat times - #[structopt(short = "n", default_value = "1")] - repeats: usize, +static mut DB: AtomicPtr> = AtomicPtr::new(std::ptr::null_mut() as *mut Arc); - /// pattern file - #[structopt(parse(from_os_str))] - pattern_file: PathBuf, +fn main() -> std::io::Result<()> { + let mut my_x: Arc = Arc::new(0); + let my_x_ptr: *mut Arc = std::ptr::addr_of_mut!(my_x); - /// pcap file - #[structopt(parse(from_os_str))] - pcap_file: PathBuf, -} + unsafe { DB.store(my_x_ptr, std::sync::atomic::Ordering::SeqCst) }; -// Main entry point. -fn main() -> Result<()> { - let Opt { - repeats, - pattern_file, - pcap_file, - } = Opt::from_args(); + thread::spawn(|| loop { + let x_ptr = unsafe { DB.load(std::sync::atomic::Ordering::SeqCst) }; + let x = unsafe { (*x_ptr).clone() }; + dbg!(x); + //sleep_ms(1000); + }); - // Read our pattern set in and build Hyperscan databases from it. - println!("Pattern file: {:?}", pattern_file); - - let (streaming_db, block_db) = match read_databases(pattern_file) { - Ok((streaming_db, block_db)) => (streaming_db, block_db), - Err(err) => { - eprintln!("ERROR: Unable to parse and compile patterns: {}\n", err); - exit(-1); - } - }; - - // Read our input PCAP file in - let mut bench = Benchmark::new(streaming_db, block_db)?; - - println!("PCAP input file: {:?}", pcap_file); - - if let Err(err) = bench.read_streams(pcap_file) { - eprintln!("Unable to read packets from PCAP file. Exiting. {}\n", err); - exit(-1); + for i in 0..1000000000 { + let mut my_x: Arc = Arc::new(i); + let my_x_ptr: *mut Arc = std::ptr::addr_of_mut!(my_x); + unsafe { DB.store(my_x_ptr, std::sync::atomic::Ordering::SeqCst) }; + //sleep_ms(100); } - if repeats != 1 { - println!("Repeating PCAP scan {} times.", repeats); + let mut threads = env::var("NPROCS").unwrap_or_default().parse().unwrap_or(2); + if threads % 2 != 0 { + threads += 1; } - bench.display_stats()?; - - // Streaming mode scans. - let mut streaming_scan = Duration::from_secs(0); - let mut streaming_open_close = Duration::from_secs(0); - - for i in 0..repeats { - if i == 0 { - // Open streams. - let now = Instant::now(); - bench.open_streams()?; - streaming_open_close = streaming_open_close + now.elapsed(); - } else { - // Reset streams. - let now = Instant::now(); - bench.reset_streams()?; - streaming_open_close = streaming_open_close + now.elapsed(); - } - - // Scan all our packets in streaming mode. - let now = Instant::now(); - bench.scan_streams()?; - streaming_scan = streaming_scan + now.elapsed(); - } - - // Close streams. - let now = Instant::now(); - bench.close_streams()?; - streaming_open_close = streaming_open_close + now.elapsed(); - - // Collect data from streaming mode scans. - let bytes = bench.bytes(); - let total_bytes = (bytes * 8 * repeats) as f64; - let tput_stream_scanning = total_bytes * 1000.0 / streaming_scan.as_millis() as f64; - let tput_stream_overhead = total_bytes * 1000.0 / (streaming_scan + streaming_open_close).as_millis() as f64; - let matches_stream = bench.matches(); - let match_rate_stream = (matches_stream as f64) / ((bytes * repeats) as f64 / 1024.0); - - // Scan all our packets in block mode. - bench.clear_matches(); - let now = Instant::now(); - for _ in 0..repeats { - bench.scan_block()?; - } - let scan_block = now.elapsed(); - - // Collect data from block mode scans. - let tput_block_scanning = total_bytes * 1000.0 / scan_block.as_millis() as f64; - let matches_block = bench.matches(); - let match_rate_block = (matches_block as f64) / ((bytes * repeats) as f64 / 1024.0); - - println!("\nStreaming mode:\n"); - println!(" Total matches: {}", matches_stream); - println!(" Match rate: {:.4} matches/kilobyte", match_rate_stream); - println!( - " Throughput (with stream overhead): {:.2} megabits/sec", - tput_stream_overhead / 1000000.0 + let in_out_pools = InputOuputPools::new(threads); + eprintln!( + "[info] [main] Input queues: {}:{}", + in_out_pools.input_queue.start, in_out_pools.input_queue.end ); - println!( - " Throughput (no stream overhead): {:.2} megabits/sec", - tput_stream_scanning / 1000000.0 + eprintln!( + "[info] [main] Output queues: {}:{}", + in_out_pools.output_queue.start, in_out_pools.output_queue.end ); - - println!("\nBlock mode:\n"); - println!(" Total matches: {}", matches_block); - println!(" Match rate: {:.4} matches/kilobyte", match_rate_block); - println!(" Throughput: {:.2} megabits/sec", tput_block_scanning / 1000000.0); - - if bytes < (2 * 1024 * 1024) { - println!( - "\nWARNING: Input PCAP file is less than 2MB in size.\n - This test may have been too short to calculate accurate results." - ); - } - + in_out_pools.poll_events(); Ok(()) } -/* - -shared_ptr regex_config; - -void config_updater (){ - string line; - while (true){ - getline(cin, line); - if (cin.eof()){ - cerr << "[fatal] [updater] cin.eof()" << endl; - exit(EXIT_FAILURE); - } - if (cin.bad()){ - cerr << "[fatal] [updater] cin.bad()" << endl; - exit(EXIT_FAILURE); - } - cerr << "[info] [updater] Updating configuration with line " << line << endl; - istringstream config_stream(line); - regex_rules *regex_new_config = new regex_rules(); - while(!config_stream.eof()){ - string data; - config_stream >> data; - if (data != "" && data != "\n"){ - regex_new_config->add(data.c_str()); - } - } - regex_config.reset(regex_new_config); - cerr << "[info] [updater] Config update done" << endl; - - } - -} - -template -bool filter_callback(const uint8_t *data, uint32_t len){ - shared_ptr current_config = regex_config; - return current_config->check((unsigned char *)data, len, is_input); -} - -int main(int argc, char *argv[]) -{ - regex_config.reset(new regex_rules()); - NFQueueSequence> input_queues(n_of_threads/2); - input_queues.start(); - NFQueueSequence> output_queues(n_of_threads/2); - output_queues.start(); - - cout << "QUEUES INPUT " << input_queues.init() << " " << input_queues.end() << " OUTPUT " << output_queues.init() << " " << output_queues.end() << endl; - cerr << "[info] [main] Input queues: " << input_queues.init() << ":" << input_queues.end() << " threads assigned: " << n_of_threads/2 << endl; - cerr << "[info] [main] Output queues: " << output_queues.init() << ":" << output_queues.end() << " threads assigned: " << n_of_threads/2 << endl; - - config_updater(); -} - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#ifndef NETFILTER_CLASSES_HPP -#define NETFILTER_CLASSES_HPP - -typedef bool NetFilterQueueCallback(const uint8_t*,uint32_t); - -Tins::PDU * find_transport_layer(Tins::PDU* pkt){ - while(pkt != NULL){ - if (pkt->pdu_type() == Tins::PDU::TCP || pkt->pdu_type() == Tins::PDU::UDP) { - return pkt; - } - pkt = pkt->inner_pdu(); - } - return pkt; -} - -template -class NetfilterQueue { - public: - size_t BUF_SIZE = 0xffff + (MNL_SOCKET_BUFFER_SIZE/2); - char *buf = NULL; - unsigned int portid; - u_int16_t queue_num; - struct mnl_socket* nl = NULL; - - NetfilterQueue(u_int16_t queue_num): queue_num(queue_num) { - - nl = mnl_socket_open(NETLINK_NETFILTER); - - if (nl == NULL) { throw std::runtime_error( "mnl_socket_open" );} - - if (mnl_socket_bind(nl, 0, MNL_SOCKET_AUTOPID) < 0) { - mnl_socket_close(nl); - throw std::runtime_error( "mnl_socket_bind" ); - } - portid = mnl_socket_get_portid(nl); - - buf = (char*) malloc(BUF_SIZE); - - if (!buf) { - mnl_socket_close(nl); - throw std::runtime_error( "allocate receive buffer" ); - } - - if (send_config_cmd(NFQNL_CFG_CMD_BIND) < 0) { - _clear(); - throw std::runtime_error( "mnl_socket_send" ); - } - //TEST if BIND was successful - if (send_config_cmd(NFQNL_CFG_CMD_NONE) < 0) { // SEND A NONE cmmand to generate an error meessage - _clear(); - throw std::runtime_error( "mnl_socket_send" ); - } - if (recv_packet() == -1) { //RECV the error message - _clear(); - throw std::runtime_error( "mnl_socket_recvfrom" ); - } - - struct nlmsghdr *nlh = (struct nlmsghdr *) buf; - - if (nlh->nlmsg_type != NLMSG_ERROR) { - _clear(); - throw std::runtime_error( "unexpected packet from kernel (expected NLMSG_ERROR packet)" ); - } - //nfqnl_msg_config_cmd - nlmsgerr* error_msg = (nlmsgerr *)mnl_nlmsg_get_payload(nlh); - - // error code taken from the linux kernel: - // https://elixir.bootlin.com/linux/v5.18.12/source/include/linux/errno.h#L27 - #define ENOTSUPP 524 /* Operation is not supported */ - - if (error_msg->error != -ENOTSUPP) { - _clear(); - throw std::invalid_argument( "queueid is already busy" ); - } - - //END TESTING BIND - nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, queue_num); - nfq_nlmsg_cfg_put_params(nlh, NFQNL_COPY_PACKET, 0xffff); - - - mnl_attr_put_u32(nlh, NFQA_CFG_FLAGS, htonl(NFQA_CFG_F_GSO)); - mnl_attr_put_u32(nlh, NFQA_CFG_MASK, htonl(NFQA_CFG_F_GSO)); - - if (mnl_socket_sendto(nl, nlh, nlh->nlmsg_len) < 0) { - _clear(); - throw std::runtime_error( "mnl_socket_send" ); - } - - } - - - - void run(){ - /* - * ENOBUFS is signalled to userspace when packets were lost - * on kernel side. In most cases, userspace isn't interested - * in this information, so turn it off. - */ - int ret = 1; - mnl_socket_setsockopt(nl, NETLINK_NO_ENOBUFS, &ret, sizeof(int)); - - for (;;) { - ret = recv_packet(); - if (ret == -1) { - throw std::runtime_error( "mnl_socket_recvfrom" ); - } - - ret = mnl_cb_run(buf, ret, 0, portid, queue_cb, nl); - if (ret < 0){ - throw std::runtime_error( "mnl_cb_run" ); - } - } - } - - ~NetfilterQueue() { - send_config_cmd(NFQNL_CFG_CMD_UNBIND); - _clear(); - } - private: - - ssize_t send_config_cmd(nfqnl_msg_config_cmds cmd){ - struct nlmsghdr *nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, queue_num); - nfq_nlmsg_cfg_put_cmd(nlh, AF_INET, cmd); - return mnl_socket_sendto(nl, nlh, nlh->nlmsg_len); - } - - ssize_t recv_packet(){ - return mnl_socket_recvfrom(nl, buf, BUF_SIZE); - } - - void _clear(){ - if (buf != NULL) { - free(buf); - buf = NULL; - } - mnl_socket_close(nl); - } - - static int queue_cb(const struct nlmsghdr *nlh, void *data) - { - struct mnl_socket* nl = (struct mnl_socket*)data; - //Extract attributes from the nlmsghdr - struct nlattr *attr[NFQA_MAX+1] = {}; - - if (nfq_nlmsg_parse(nlh, attr) < 0) { - perror("problems parsing"); - return MNL_CB_ERROR; - } - if (attr[NFQA_PACKET_HDR] == NULL) { - fputs("metaheader not set\n", stderr); - return MNL_CB_ERROR; - } - //Get Payload - uint16_t plen = mnl_attr_get_payload_len(attr[NFQA_PAYLOAD]); - void *payload = mnl_attr_get_payload(attr[NFQA_PAYLOAD]); - - //Return result to the kernel - struct nfqnl_msg_packet_hdr *ph = (nfqnl_msg_packet_hdr*) mnl_attr_get_payload(attr[NFQA_PACKET_HDR]); - struct nfgenmsg *nfg = (nfgenmsg *)mnl_nlmsg_get_payload(nlh); - char buf[MNL_SOCKET_BUFFER_SIZE]; - struct nlmsghdr *nlh_verdict; - struct nlattr *nest; - - nlh_verdict = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, ntohs(nfg->res_id)); - - /* - This define allow to avoid to allocate new heap memory for each packet. - The code under this comment is replicated for ipv6 and ip - Better solutions are welcome. :) - */ - #define PKT_HANDLE \ - Tins::PDU *transport_layer = find_transport_layer(&packet); \ - if(transport_layer->inner_pdu() == nullptr || transport_layer == nullptr){ \ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); \ - }else{ \ - int size = transport_layer->inner_pdu()->size(); \ - if(callback_func((const uint8_t*)payload+plen - size, size)){ \ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); \ - } else{ \ - if (transport_layer->pdu_type() == Tins::PDU::TCP){ \ - ((Tins::TCP *)transport_layer)->release_inner_pdu(); \ - ((Tins::TCP *)transport_layer)->set_flag(Tins::TCP::FIN,1); \ - ((Tins::TCP *)transport_layer)->set_flag(Tins::TCP::ACK,1); \ - ((Tins::TCP *)transport_layer)->set_flag(Tins::TCP::SYN,0); \ - nfq_nlmsg_verdict_put_pkt(nlh_verdict, packet.serialize().data(), packet.size()); \ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_ACCEPT ); \ - }else{ \ - nfq_nlmsg_verdict_put(nlh_verdict, ntohl(ph->packet_id), NF_DROP ); \ - } \ - } \ - } - - // Check IP protocol version - if ( (((uint8_t*)payload)[0] & 0xf0) == 0x40 ){ - Tins::IP packet = Tins::IP((uint8_t*)payload,plen); - PKT_HANDLE - }else{ - Tins::IPv6 packet = Tins::IPv6((uint8_t*)payload,plen); - PKT_HANDLE - } - - /* example to set the connmark. First, start NFQA_CT section: */ - nest = mnl_attr_nest_start(nlh_verdict, NFQA_CT); - - /* then, add the connmark attribute: */ - mnl_attr_put_u32(nlh_verdict, CTA_MARK, htonl(42)); - /* more conntrack attributes, e.g. CTA_LABELS could be set here */ - - /* end conntrack section */ - mnl_attr_nest_end(nlh_verdict, nest); - - if (mnl_socket_sendto(nl, nlh_verdict, nlh_verdict->nlmsg_len) < 0) { - throw std::runtime_error( "mnl_socket_send" ); - } - - return MNL_CB_OK; - } - -}; - -template -class NFQueueSequence{ - private: - std::vector *> nfq; - uint16_t _init; - uint16_t _end; - std::vector threads; - public: - static const int QUEUE_BASE_NUM = 1000; - - NFQueueSequence(uint16_t seq_len){ - if (seq_len <= 0) throw std::invalid_argument("seq_len <= 0"); - nfq = std::vector*>(seq_len); - _init = QUEUE_BASE_NUM; - while(nfq[0] == NULL){ - if (_init+seq_len-1 >= 65536){ - throw std::runtime_error("NFQueueSequence: too many queues!"); - } - for (int i=0;i(_init+i); - }catch(const std::invalid_argument e){ - for(int j = 0; j < i; j++) { - delete nfq[j]; - nfq[j] = nullptr; - } - _init += seq_len - i; - break; - } - } - } - _end = _init + seq_len - 1; - } - - void start(){ - if (threads.size() != 0) throw std::runtime_error("NFQueueSequence: already started!"); - for (int i=0;i::run, nfq[i])); - } - } - - void join(){ - for (int i=0;i -#include -#include -#include -#include "../utils.hpp" - - -#ifndef REGEX_FILTER_HPP -#define REGEX_FILTER_HPP - -typedef jpcre2::select jp; -typedef std::pair regex_rule_pair; -typedef std::vector regex_rule_vector; -struct regex_rules{ - regex_rule_vector output_whitelist, input_whitelist, output_blacklist, input_blacklist; - - regex_rule_vector* getByCode(char code){ - switch(code){ - case 'C': // Client to server Blacklist - return &input_blacklist; break; - case 'c': // Client to server Whitelist - return &input_whitelist; break; - case 'S': // Server to client Blacklist - return &output_blacklist; break; - case 's': // Server to client Whitelist - return &output_whitelist; break; - } - throw std::invalid_argument( "Expected 'C' 'c' 'S' or 's'" ); - } - - int add(const char* arg){ - //Integrity checks - size_t arg_len = strlen(arg); - if (arg_len < 2 || arg_len%2 != 0){ - std::cerr << "[warning] [regex_rules.add] invalid arg passed (" << arg << "), skipping..." << std::endl; - return -1; - } - if (arg[0] != '0' && arg[0] != '1'){ - std::cerr << "[warning] [regex_rules.add] invalid is_case_sensitive (" << arg[0] << ") in '" << arg << "', must be '1' or '0', skipping..." << std::endl; - return -1; - } - if (arg[1] != 'C' && arg[1] != 'c' && arg[1] != 'S' && arg[1] != 's'){ - std::cerr << "[warning] [regex_rules.add] invalid filter_type (" << arg[1] << ") in '" << arg << "', must be 'C', 'c', 'S' or 's', skipping..." << std::endl; - return -1; - } - std::string hex(arg+2), expr; - if (!unhexlify(hex, expr)){ - std::cerr << "[warning] [regex_rules.add] invalid hex regex value (" << hex << "), skipping..." << std::endl; - return -1; - } - //Push regex - jp::Regex regex(expr,arg[0] == '1'?"gS":"giS"); - if (regex){ - std::cerr << "[info] [regex_rules.add] adding new regex filter: '" << expr << "'" << std::endl; - getByCode(arg[1])->push_back(std::make_pair(std::string(arg), regex)); - } else { - std::cerr << "[warning] [regex_rules.add] compiling of '" << expr << "' regex failed, skipping..." << std::endl; - return -1; - } - return 0; - } - - bool check(unsigned char* data, const size_t& bytes_transferred, const bool in_input){ - std::string str_data((char *) data, bytes_transferred); - for (regex_rule_pair ele:(in_input?input_blacklist:output_blacklist)){ - try{ - if(ele.second.match(str_data)){ - std::stringstream msg; - msg << "BLOCKED " << ele.first << "\n"; - std::cout << msg.str() << std::flush; - return false; - } - } catch(...){ - std::cerr << "[info] [regex_rules.check] Error while matching blacklist regex: " << ele.first << std::endl; - } - } - for (regex_rule_pair ele:(in_input?input_whitelist:output_whitelist)){ - try{ - std::cerr << "[debug] [regex_rules.check] regex whitelist match " << ele.second.getPattern() << std::endl; - if(!ele.second.match(str_data)){ - std::stringstream msg; - msg << "BLOCKED " << ele.first << "\n"; - std::cout << msg.str() << std::flush; - return false; - } - } catch(...){ - std::cerr << "[info] [regex_rules.check] Error while matching whitelist regex: " << ele.first << std::endl; - } - } - return true; - } - -}; - -#endif // REGEX_FILTER_HPP - -#include -#include - -#ifndef UTILS_HPP -#define UTILS_HPP - -bool unhexlify(std::string const &hex, std::string &newString) { - try{ - int len = hex.length(); - for(int i=0; i< len; i+=2) - { - std::string byte = hex.substr(i,2); - char chr = (char) (int)strtol(byte.c_str(), NULL, 16); - newString.push_back(chr); - } - return true; - } - catch (...){ - return false; - } -} - -#endif -*/ \ No newline at end of file diff --git a/backend/binsrc/nfqueue_regex/src/regex_rules.rs b/backend/binsrc/nfqueue_regex/src/regex_rules.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/backend/binsrc/nfqueue_regex/src/regex_rules.rs @@ -0,0 +1 @@ +