Fixes and improvements in thread managment
This commit is contained in:
@@ -141,6 +141,7 @@ if __name__ == '__main__':
|
||||
host="0.0.0.0",
|
||||
port=FIREGEX_PORT,
|
||||
reload=DEBUG,
|
||||
access_log=DEBUG,
|
||||
workers=1
|
||||
access_log=True,
|
||||
workers=1 # Multiple workers will cause a crash due to the creation
|
||||
# of multiple processes with separated memory
|
||||
)
|
||||
|
||||
@@ -45,9 +45,11 @@ bool filter_callback(const uint8_t *data, uint32_t len){
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int n_of_threads = 1;
|
||||
if (argc >= 2) n_of_threads = atoi(argv[1]);
|
||||
char * n_threads_str = getenv("NTHREADS");
|
||||
if (n_threads_str != NULL) n_of_threads = ::atoi(n_threads_str);
|
||||
if(n_of_threads <= 0) n_of_threads = 1;
|
||||
if (n_of_threads % 2 != 0 ) n_of_threads++;
|
||||
cerr << "[info] [main] Using " << n_of_threads << " threads" << endl;
|
||||
regex_config.reset(new regex_rules());
|
||||
NFQueueSequence<filter_callback<true>> input_queues(n_of_threads/2);
|
||||
input_queues.start();
|
||||
@@ -55,6 +57,8 @@ int main(int argc, char *argv[])
|
||||
output_queues.start();
|
||||
|
||||
cout << "QUEUES INPUT " << input_queues.init() << " " << input_queues.end() << " OUTPUT " << output_queues.init() << " " << output_queues.end() << endl;
|
||||
cerr << "[info] [main] Input queues: " << input_queues.init() << ":" << input_queues.end() << " threads assigned: " << n_of_threads/2 << endl;
|
||||
cerr << "[info] [main] Output queues: " << output_queues.init() << ":" << output_queues.end() << " threads assigned: " << n_of_threads/2 << endl;
|
||||
|
||||
config_updater();
|
||||
}
|
||||
|
||||
@@ -450,6 +450,10 @@ int main(int argc, char* argv[])
|
||||
const unsigned short forward_port = static_cast<unsigned short>(::atoi(argv[4]));
|
||||
const string local_host = argv[1];
|
||||
const string forward_host = argv[3];
|
||||
|
||||
int threads = 1;
|
||||
char * n_threads_str = getenv("NTHREADS");
|
||||
if (n_threads_str != NULL) threads = ::atoi(n_threads_str);
|
||||
|
||||
boost::asio::io_context ios;
|
||||
|
||||
@@ -470,19 +474,16 @@ int main(int argc, char* argv[])
|
||||
forward_host, forward_port);
|
||||
|
||||
acceptor.accept_connections();
|
||||
#ifdef MULTI_THREAD
|
||||
boost::thread_group tg;
|
||||
#ifdef THREAD_NUM
|
||||
for (unsigned i = 0; i < THREAD_NUM; ++i)
|
||||
#else
|
||||
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
|
||||
#endif
|
||||
tg.create_thread(boost::bind(&boost::asio::io_context::run, &ios));
|
||||
|
||||
if (threads > 1){
|
||||
boost::thread_group tg;
|
||||
for (unsigned i = 0; i < threads; ++i)
|
||||
tg.create_thread(boost::bind(&boost::asio::io_context::run, &ios));
|
||||
|
||||
tg.join_all();
|
||||
#else
|
||||
ios.run();
|
||||
#endif
|
||||
}else{
|
||||
ios.run();
|
||||
}
|
||||
}
|
||||
catch(exception& e)
|
||||
{
|
||||
|
||||
@@ -5,9 +5,6 @@ from modules.nfregex.models import Service, Regex
|
||||
import re, os, asyncio
|
||||
import traceback
|
||||
|
||||
QUEUE_BASE_NUM = 1000
|
||||
|
||||
|
||||
class RegexFilter:
|
||||
def __init__(
|
||||
self, regex,
|
||||
@@ -61,14 +58,12 @@ class FiregexInterceptor:
|
||||
self.regex_filters: Set[RegexFilter]
|
||||
self.update_config_lock:asyncio.Lock
|
||||
self.process:asyncio.subprocess.Process
|
||||
self.n_queues:int
|
||||
self.update_task: asyncio.Task
|
||||
|
||||
@classmethod
|
||||
async def start(cls, filter: FiregexFilter, n_queues:int = int(os.getenv("NTHREADS","1"))):
|
||||
async def start(cls, filter: FiregexFilter):
|
||||
self = cls()
|
||||
self.filter = filter
|
||||
self.n_queues = n_queues
|
||||
self.filter_map_lock = asyncio.Lock()
|
||||
self.update_config_lock = asyncio.Lock()
|
||||
input_range, output_range = await self._start_binary()
|
||||
@@ -81,7 +76,7 @@ class FiregexInterceptor:
|
||||
async def _start_binary(self):
|
||||
proxy_binary_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),"../cppqueue")
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
proxy_binary_path, str(self.n_queues),
|
||||
proxy_binary_path,
|
||||
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
|
||||
)
|
||||
line_fut = self.process.stdout.readuntil()
|
||||
|
||||
@@ -80,7 +80,7 @@ class FiregexTables:
|
||||
"expr": [
|
||||
{'match': {'left': {'payload': {'protocol': ip_family(ip_int), 'field': 'saddr'}}, 'op': '==', 'right': {"prefix": {"addr": ip_addr, "len": ip_addr_cidr}}}},
|
||||
{'match': {"left": { "payload": {"protocol": str(proto), "field": "sport"}}, "op": "==", "right": int(port)}},
|
||||
{"queue": {"num": str(init) if init == end else f"{init}-{end}", "flags": ["bypass"]}}
|
||||
{"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}}
|
||||
]
|
||||
}}})
|
||||
|
||||
@@ -97,17 +97,17 @@ class FiregexTables:
|
||||
"expr": [
|
||||
{'match': {'left': {'payload': {'protocol': ip_family(ip_int), 'field': 'daddr'}}, 'op': '==', 'right': {"prefix": {"addr": ip_addr, "len": ip_addr_cidr}}}},
|
||||
{'match': {"left": { "payload": {"protocol": str(proto), "field": "dport"}}, "op": "==", "right": int(port)}},
|
||||
{"queue": {"num": str(init) if init == end else f"{init}-{end}", "flags": ["bypass"]}}
|
||||
{"queue": {"num": str(init) if init == end else {"range":[init, end] }, "flags": ["bypass"]}}
|
||||
]
|
||||
}}})
|
||||
|
||||
def get(self) -> List[FiregexFilter]:
|
||||
res = []
|
||||
for filter in [ele["rule"] for ele in self.list() if "rule" in ele and ele["rule"]["table"] == self.table_name]:
|
||||
queue_str = str(filter["expr"][2]["queue"]["num"]).split("-")
|
||||
queue_str = filter["expr"][2]["queue"]["num"]
|
||||
queue = None
|
||||
if len(queue_str) == 1: queue = int(queue_str[0]), int(queue_str[0])
|
||||
else: queue = int(queue_str[0]), int(queue_str[1])
|
||||
if isinstance(queue_str,dict): queue = int(queue_str["range"][0]), int(queue_str["range"][1])
|
||||
else: queue = int(queue_str), int(queue_str)
|
||||
ip_int = None
|
||||
if isinstance(filter["expr"][0]["match"]["right"],str):
|
||||
ip_int = str(ip_parse(filter["expr"][0]["match"]["right"]))
|
||||
|
||||
Reference in New Issue
Block a user