diff --git a/.gitignore b/.gitignore index 7d75f0c..194f2b8 100755 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ /backend/db/firegex.db /backend/db/firegex.db-journal -/backend/proxy/proxy +/backend/proxy/proxy.so docker-compose.yml # production /frontend/build diff --git a/backend/proxy/__init__.py b/backend/proxy/__init__.py index fa58ecb..74d5d30 100755 --- a/backend/proxy/__init__.py +++ b/backend/proxy/__init__.py @@ -1,8 +1,8 @@ -from signal import SIGUSR1 +from signal import SIGUSR1, SIGUSR2 from secrets import token_urlsafe +from kthread import KThread import re, os -from ctypes import CDLL, c_char_p, c_int, c_ushort -from threading import Thread +from ctypes import CDLL, POINTER, c_char_p, c_int, c_ushort, CFUNCTYPE, c_void_p, byref #c++ -o proxy proxy.cpp @@ -42,46 +42,44 @@ class Proxy: if not os.path.exists(config_file_path): self.config_file_path = config_file_path self.lib = CDLL(os.path.join(os.path.dirname(os.path.abspath(__file__)),"./proxy.so")) - self.lib.proxy_start.restype = c_int - #char* local_host_p, unsigned short local_port, char* forward_host_p, unsigned short forward_port, char* config_file_p - self.lib.proxy_start.argtypes = [c_char_p, c_ushort, c_char_p, c_ushort, c_char_p] + self.lib.start_proxy.restype = c_int + #char* local_host_p, unsigned short local_port, char* forward_host_p, unsigned short forward_port, char* config_file_p, void (*incrementCallback_p)(const char *) + self.lib.start_proxy.argtypes = [c_char_p, c_ushort, c_char_p, c_ushort, c_char_p, POINTER(c_int), c_void_p] def start(self, in_pause=False): if self.process is None: - filter_map = self.compile_filters() - filters_codes = list(filter_map.keys()) if not in_pause else [] + self.filter_map = self.compile_filters() + filters_codes = list(self.filter_map.keys()) if not in_pause else [] self.__write_config(filters_codes) - self.process = Thread( - target=self.lib.proxy_start, + @CFUNCTYPE(None, c_char_p) + def callback_wrap(regex): + filter = self.filter_map[regex.decode()] + filter.blocked+=1 + print(self.filter_map) + if self.callback_blocked_update: + self.callback_blocked_update(self.filter_map[regex.decode()]) + + status_code = c_int(1) + self.process = KThread( + target=self.lib.start_proxy, args=(self.public_host.encode(), self.public_port, self.internal_host.encode(), self.internal_port, - self.config_file_path.encode() + self.config_file_path.encode(), + byref(status_code), callback_wrap ), - - ) - - #for stdout_line in iter(self.process.stdout.readline, ""): - # if stdout_line.startswith("BLOCKED"): - # regex_id = stdout_line.split()[1] - # filter_map[regex_id].blocked+=1 - # if self.callback_blocked_update: self.callback_blocked_update(filter_map[regex_id]) - #self.process.stdout.close() + ) self.process.start() - return self.process.wait() + self.process.join() + self.__delete_config() + return status_code.value def stop(self): if self.process: - self.process.terminate() - try: - self.process.wait(timeout=3) - return True - except Exception: - self.process.kill() - return False - finally: - self.process = None + if self.process.is_alive(): + os.kill(self.process.native_id,SIGUSR2) + self.process = None return True def restart(self, in_pause=False): @@ -93,7 +91,10 @@ class Proxy: with open(self.config_file_path,'w') as config_file: for line in filters_codes: config_file.write(line + '\n') - + + def __delete_config(self): + os.remove(self.config_file_path) + def reload(self): if self.isactive(): filter_map = self.compile_filters() @@ -102,7 +103,9 @@ class Proxy: self.trigger_reload_config() def isactive(self): - return True if self.process else False + if self.process and not self.process.is_alive(): + self.process = None + return not self.process is None def trigger_reload_config(self): os.kill(self.process.native_id, SIGUSR1) diff --git a/backend/proxy/proxy.cpp b/backend/proxy/proxy.cpp index 2fb7512..f3197af 100644 --- a/backend/proxy/proxy.cpp +++ b/backend/proxy/proxy.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -11,13 +12,14 @@ #include #include #include +#include #include // is* -//#define DEBUG - using namespace std; +#define DEBUG + int to_int(int c) { if (not isxdigit(c)) return -1; // error: non-hexadecimal digit found if (isdigit(c)) return c - '0'; @@ -37,8 +39,94 @@ unhexlify(InputIterator first, InputIterator last, OutputIterator ascii) { return 0; } -vector> regex_s_c_w, regex_c_s_w, regex_s_c_b, regex_c_s_b; -string config_file; +namespace tcp_proxy{ + class acceptor; +}; + + +struct regex_filter_table{ + vector> + regex_s_c_w, regex_c_s_w, regex_s_c_b, regex_c_s_b; +}; +struct regex_proxy{ + string config_file; + tcp_proxy::acceptor *acceptor; + int* status_code; + regex_filter_table filter_tab; + void (*incrementCallback)(const char *) = nullptr; +}; +map proxy_map; + +auto firewall(){ + return proxy_map[gettid()]; +} + +void push_regex(char* arg, bool case_sensitive, vector> &v){ + size_t expr_len = (strlen(arg)-2)/2; + char expr[expr_len]; + unhexlify(arg+2, arg+strlen(arg)-1, expr); + if (case_sensitive){ + boost::regex regex(reinterpret_cast(expr), + reinterpret_cast(expr) + expr_len); + #ifdef DEBUG + cout << "Added case sensitive regex " << expr << endl; + #endif + v.push_back(make_pair(string(arg), regex)); + } else { + boost::regex regex(reinterpret_cast(expr), + reinterpret_cast(expr) + expr_len, boost::regex::icase); + #ifdef DEBUG + cout << "Added case insensitive regex " << expr << endl; + #endif + v.push_back(make_pair(string(arg), regex)); + } +} + +void update_regex(){ + fstream fd; + fd.open(firewall()->config_file,ios::in); + if (!fd.is_open()){ + std::cerr << "Error: config file couln't be opened" << std::endl; + *(firewall()->status_code) = 2; + return; + } + + firewall()->filter_tab.regex_s_c_w.clear(); + firewall()->filter_tab.regex_c_s_w.clear(); + firewall()->filter_tab.regex_s_c_b.clear(); + firewall()->filter_tab.regex_c_s_b.clear(); + + string line; + while(getline(fd, line)){ + char tp[line.length() +1]; + strcpy(tp, line.c_str()); + + if (strlen(tp) >= 2){ + bool case_sensitive = true; + if(tp[0] == '0'){ + case_sensitive = false; + } + switch(tp[1]){ + case 'C': { // Client to server Blacklist + push_regex(tp, case_sensitive, firewall()->filter_tab.regex_c_s_b); + break; + } + case 'c': { // Client to server Whitelist + push_regex(tp, case_sensitive, firewall()->filter_tab.regex_c_s_w); + break; + } + case 'S': { // Server to client Blacklist + push_regex(tp, case_sensitive, firewall()->filter_tab.regex_s_c_b); + break; + } + case 's': { // Server to client Whitelist + push_regex(tp, case_sensitive, firewall()->filter_tab.regex_s_c_w); + break; + } + } + } + } +} bool filter_data(unsigned char* data, const size_t& bytes_transferred, vector> const &blacklist, vector> const &whitelist){ #ifdef DEBUG @@ -52,7 +140,7 @@ bool filter_data(unsigned char* data, const size_t& bytes_transferred, vector(data), reinterpret_cast(data) + bytes_transferred, what, ele.second)){ - cout << "BLOCKED " << ele.first << endl; + if (firewall()->incrementCallback != nullptr) firewall()->incrementCallback(ele.first.c_str()); return false; } } @@ -60,7 +148,7 @@ bool filter_data(unsigned char* data, const size_t& bytes_transferred, vector(data), reinterpret_cast(data) + bytes_transferred, what, ele.second)){ - cout << "BLOCKED " << ele.first << endl; + if (firewall()->incrementCallback != nullptr) firewall()->incrementCallback(ele.first.c_str()); return false; } } @@ -74,13 +162,10 @@ namespace tcp_proxy { namespace ip = boost::asio::ip; + typedef ip::tcp::socket socket_type; class bridge : public boost::enable_shared_from_this { public: - - typedef ip::tcp::socket socket_type; - typedef boost::shared_ptr ptr_type; - bridge(boost::asio::io_service& ios) : downstream_socket_(ios), upstream_socket_ (ios) @@ -88,13 +173,11 @@ namespace tcp_proxy socket_type& downstream_socket() { - // Client socket return downstream_socket_; } socket_type& upstream_socket() { - // Remote server socket return upstream_socket_; } @@ -134,8 +217,6 @@ namespace tcp_proxy close(); } - private: - /* Section A: Remote Server --> Proxy --> Client Process data recieved from remote sever then send to client. @@ -147,7 +228,7 @@ namespace tcp_proxy { if (!error) { - if (filter_data(upstream_data_, bytes_transferred, regex_s_c_b, regex_s_c_w)){ + if (filter_data(upstream_data_, bytes_transferred, firewall()->filter_tab.regex_s_c_b, firewall()->filter_tab.regex_s_c_w)){ async_write(downstream_socket_, boost::asio::buffer(upstream_data_,bytes_transferred), boost::bind(&bridge::handle_downstream_write, @@ -190,7 +271,7 @@ namespace tcp_proxy { if (!error) { - if (filter_data(downstream_data_, bytes_transferred, regex_c_s_b, regex_c_s_w)){ + if (filter_data(downstream_data_, bytes_transferred, firewall()->filter_tab.regex_c_s_b, firewall()->filter_tab.regex_c_s_w)){ async_write(upstream_socket_, boost::asio::buffer(downstream_data_,bytes_transferred), boost::bind(&bridge::handle_upstream_write, @@ -242,181 +323,155 @@ namespace tcp_proxy enum { max_data_length = 8192 }; //8KB unsigned char downstream_data_[max_data_length]; unsigned char upstream_data_ [max_data_length]; - boost::mutex mutex_; - public: - - class acceptor - { - public: - - acceptor(boost::asio::io_service& io_service, - const std::string& local_host, unsigned short local_port, - const std::string& upstream_host, unsigned short upstream_port) - : io_service_(io_service), - localhost_address(boost::asio::ip::address_v4::from_string(local_host)), - acceptor_(io_service_,ip::tcp::endpoint(localhost_address,local_port)), - upstream_port_(upstream_port), - upstream_host_(upstream_host) - {} - - bool accept_connections() - { - try - { - session_ = boost::shared_ptr(new bridge(io_service_)); - - acceptor_.async_accept(session_->downstream_socket(), - boost::bind(&acceptor::handle_accept, - this, - boost::asio::placeholders::error)); - } - catch(std::exception& e) - { - std::cerr << "acceptor exception: " << e.what() << std::endl; - return false; - } - - return true; - } - - private: - - void handle_accept(const boost::system::error_code& error) - { - if (!error) - { - session_->start(upstream_host_,upstream_port_); - - if (!accept_connections()) - { - std::cerr << "Failure during call to accept." << std::endl; - } - } - else - { - std::cerr << "Error: " << error.message() << std::endl; - } - } - - boost::asio::io_service& io_service_; - ip::address_v4 localhost_address; - ip::tcp::acceptor acceptor_; - ptr_type session_; - unsigned short upstream_port_; - std::string upstream_host_; }; - }; -} + class acceptor + { + public: -void push_regex(char* arg, bool case_sensitive, vector> &v){ - size_t expr_len = (strlen(arg)-2)/2; - char expr[expr_len]; - unhexlify(arg+2, arg+strlen(arg)-1, expr); - if (case_sensitive){ - boost::regex regex(reinterpret_cast(expr), - reinterpret_cast(expr) + expr_len); - #ifdef DEBUG - cout << "Added case sensitive regex " << expr << endl; - #endif - v.push_back(make_pair(string(arg), regex)); - } else { - boost::regex regex(reinterpret_cast(expr), - reinterpret_cast(expr) + expr_len, boost::regex::icase); - #ifdef DEBUG - cout << "Added case insensitive regex " << expr << endl; - #endif - v.push_back(make_pair(string(arg), regex)); - } -} + acceptor(boost::asio::io_service& io_service, + const std::string& local_host, unsigned short local_port, + const std::string& upstream_host, unsigned short upstream_port) + : io_service_(io_service), + localhost_address(boost::asio::ip::address_v4::from_string(local_host)), + acceptor_(io_service_,ip::tcp::endpoint(localhost_address,local_port)), + upstream_port_(upstream_port), + upstream_host_(upstream_host) + { + boost::asio::socket_base::reuse_address option(true); + acceptor_.set_option(option); + } -void update_regex(){ - fstream fd; - fd.open(config_file,ios::in); - if (!fd.is_open()){ - std::cerr << "Error: config file couln't be opened" << std::endl; - exit(1); - } + bool accept_connections() + { + try + { + session_ = boost::shared_ptr(new bridge(io_service_)); - regex_s_c_w.clear(); - regex_c_s_w.clear(); - regex_s_c_b.clear(); - regex_c_s_b.clear(); - - string line; - while(getline(fd, line)){ - char tp[line.length() +1]; - strcpy(tp, line.c_str()); - - if (strlen(tp) >= 2){ - bool case_sensitive = true; - if(tp[0] == '0'){ - case_sensitive = false; + acceptor_.async_accept(session_->downstream_socket(), + boost::bind(&acceptor::handle_accept, + this, + boost::asio::placeholders::error)); + } - switch(tp[1]){ - case 'C': { // Client to server Blacklist - push_regex(tp, case_sensitive, regex_c_s_b); - break; - } - case 'c': { // Client to server Whitelist - push_regex(tp, case_sensitive, regex_c_s_w); - break; - } - case 'S': { // Server to client Blacklist - push_regex(tp, case_sensitive, regex_s_c_b); - break; - } - case 's': { // Server to client Whitelist - push_regex(tp, case_sensitive, regex_s_c_w); - break; + catch(std::exception& e) + { + std::cerr << "acceptor exception: " << e.what() << std::endl; + return false; + } + + return true; + } + + void close(){ + cout << "acceptor 1: " << endl; + acceptor_.close(); + cout << "acceptor 2: " << endl; + acceptor_.cancel(); + cout << "acceptor 3: " << endl; + session_->close(); + cout << "acceptor 4: " << endl; + } + + private: + + void handle_accept(const boost::system::error_code& error) + { + if (!error) + { + session_->start(upstream_host_,upstream_port_); + + if (!accept_connections()) + { + std::cerr << "Failure during call to accept." << std::endl; } } + else + { + std::cerr << "Error: " << error.message() << std::endl; + } } - } + + boost::asio::io_service& io_service_; + ip::address_v4 localhost_address; + ip::tcp::acceptor acceptor_; + boost::shared_ptr session_; + unsigned short upstream_port_; + std::string upstream_host_; + }; + + } -void signal_handler(int signal_num) -{ +static void signal_handler(int signal_num){ if (signal_num == SIGUSR1){ #ifdef DEBUG - cout << "Updating configurtation" << endl; + cout << "Updating configurtation: " << gettid() << endl; #endif update_regex(); + }else if(signal_num == SIGUSR2){ + #ifdef DEBUG + cout << "Killed: " << gettid() << endl; + #endif + *(firewall()->status_code) = 0; + #ifdef DEBUG + cout << "Exiting PT1: " << gettid() << endl; + #endif + proxy_map.erase(gettid()); + #ifdef DEBUG + cout << "Exiting PT2: " << gettid() << endl; + #endif + //firewall()->acceptor->close(); + #ifdef DEBUG + cout << "Exiting PT3: " << gettid() << endl; + #endif } } -extern "C" int start_proxy(char* local_host_p, unsigned short local_port, char* forward_host_p, unsigned short forward_port, char* config_file_p, int pid) -{ - const std::string local_host = local_host_p; - const std::string forward_host = forward_host_p; - - config_file = config_file_p; - +extern "C" void start_proxy( + char* local_host,unsigned short local_port, + char* forward_host, unsigned short forward_port, + char* config_file, int* status_code, void (*incrementCallback)(const char *) +){ + #ifdef DEBUG + cout << "Starting: " << gettid() << endl; + #endif + regex_proxy tmp; + proxy_map[gettid()] = &tmp; + tmp.config_file = config_file; + tmp.status_code = status_code; + tmp.incrementCallback = incrementCallback; update_regex(); - signal(SIGUSR1, signal_handler); - boost::asio::io_service ios; - + signal(SIGUSR1, signal_handler); + signal(SIGUSR2, signal_handler); try { - tcp_proxy::bridge::acceptor acceptor(ios, - local_host, local_port, - forward_host, forward_port); - + tcp_proxy::acceptor acceptor(ios, + local_host, local_port, + forward_host, forward_port); + firewall()->acceptor = &acceptor; acceptor.accept_connections(); - ios.run(); } catch(std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; - return 1; + *(firewall()->status_code) = 1; + proxy_map.erase(gettid()); + return; } - return 0; + #ifdef DEBUG + cout << "Exiting: " << gettid() << endl; + #endif + *(firewall()->status_code) = 0; + proxy_map.erase(gettid()); + + } /* diff --git a/backend/proxy/proxy.so b/backend/proxy/proxy.so index 7c33ca3..5faa518 100755 Binary files a/backend/proxy/proxy.so and b/backend/proxy/proxy.so differ diff --git a/backend/utils.py b/backend/utils.py index f998c75..def1ecd 100755 --- a/backend/utils.py +++ b/backend/utils.py @@ -155,7 +155,7 @@ class ProxyManager: self.__update_status_db(id, next_status) if saved_status[0] == "wait": saved_status[0] = next_status proxy_status = proxy.start(in_pause=(next_status==STATUS.PAUSE)) - if proxy_status == 1: + if proxy_status != 0: self.__update_status_db(id, STATUS.STOP) return else: @@ -210,7 +210,6 @@ class ProxyManager: def stats_updater(filter:Filter): - print("Callback received",filter.blocked, filter.code) self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", (filter.blocked, filter.code)) if not proxy: