diff --git a/backend/app.py b/backend/app.py index 2a97aeb..23b9801 100644 --- a/backend/app.py +++ b/backend/app.py @@ -34,11 +34,6 @@ app = FastAPI(debug=DEBUG, redoc_url=None) def APP_STATUS(): return "init" if conf.get("password") is None else "run" def JWT_SECRET(): return conf.get("secret") -@app.on_event("shutdown") -async def shutdown_event(): - db.disconnect() - await firewall.close() - @app.on_event("startup") async def startup_event(): db.init() @@ -147,10 +142,8 @@ async def get_general_stats(auth: bool = Depends(is_loggined)): """)[0] class ServiceModel(BaseModel): - id:str status: str - public_port: int - internal_port: int + port: int name: str n_regex: int n_packets: int @@ -160,119 +153,80 @@ async def get_service_list(auth: bool = Depends(is_loggined)): """Get the list of existent firegex services""" return db.query(""" SELECT - s.service_id `id`, s.status status, - s.public_port public_port, - s.internal_port internal_port, + s.port port, s.name name, COUNT(r.regex_id) n_regex, COALESCE(SUM(r.blocked_packets),0) n_packets - FROM services s LEFT JOIN regexes r ON r.service_id = s.service_id - GROUP BY s.service_id; + FROM services s LEFT JOIN regexes r ON r.service_port = s.port + GROUP BY s.port; """) -@app.get('/api/service/{service_id}', response_model=ServiceModel) -async def get_service_by_id(service_id: str, auth: bool = Depends(is_loggined)): +@app.get('/api/service/{service_port}', response_model=ServiceModel) +async def get_service_by_id(service_port: int, auth: bool = Depends(is_loggined)): """Get info about a specific service using his id""" res = db.query(""" SELECT - s.service_id `id`, s.status status, - s.public_port public_port, - s.internal_port internal_port, + s.port port, s.name name, COUNT(r.regex_id) n_regex, COALESCE(SUM(r.blocked_packets),0) n_packets - FROM services s LEFT JOIN regexes r ON r.service_id = s.service_id WHERE s.service_id = ? - GROUP BY s.service_id; - """, service_id) + FROM services s LEFT JOIN regexes r ON r.service_port = s.port WHERE s.port = ? + GROUP BY s.port; + """, service_port) if len(res) == 0: raise HTTPException(status_code=400, detail="This service does not exists!") return res[0] class StatusMessageModel(BaseModel): status:str -@app.get('/api/service/{service_id}/pause', response_model=StatusMessageModel) -async def service_pause(service_id: str, auth: bool = Depends(is_loggined)): - """Request the pause of a specific service""" - await firewall.get(service_id).next(STATUS.PAUSE) +@app.get('/api/service/{service_port}/stop', response_model=StatusMessageModel) +async def service_stop(service_port: int, auth: bool = Depends(is_loggined)): + """Request the stop of a specific service""" + await firewall.get(service_port).next(STATUS.STOP) return {'status': 'ok'} -@app.get('/api/service/{service_id}/start', response_model=StatusMessageModel) -async def service_start(service_id: str, auth: bool = Depends(is_loggined)): +@app.get('/api/service/{service_port}/start', response_model=StatusMessageModel) +async def service_start(service_port: int, auth: bool = Depends(is_loggined)): """Request the start of a specific service""" - await firewall.get(service_id).next(STATUS.ACTIVE) + await firewall.get(service_port).next(STATUS.ACTIVE) return {'status': 'ok'} -@app.get('/api/service/{service_id}/delete', response_model=StatusMessageModel) -async def service_delete(service_id: str, auth: bool = Depends(is_loggined)): +@app.get('/api/service/{service_port}/delete', response_model=StatusMessageModel) +async def service_delete(service_port: int, auth: bool = Depends(is_loggined)): """Request the deletion of a specific service""" - db.query('DELETE FROM services WHERE service_id = ?;', service_id) - db.query('DELETE FROM regexes WHERE service_id = ?;', service_id) - await firewall.remove(service_id) - return {'status': 'ok'} - - -@app.get('/api/service/{service_id}/regen-port', response_model=StatusMessageModel) -async def regen_service_port(service_id: str, auth: bool = Depends(is_loggined)): - """Request the regeneration of a the internal proxy port of a specific service""" - db.query('UPDATE services SET internal_port = ? WHERE service_id = ?;', gen_internal_port(db), service_id) - await firewall.get(service_id).update_port() - return {'status': 'ok'} - -class ChangePortForm(BaseModel): - port: Union[int, None] - internalPort: Union[int, None] - -@app.post('/api/service/{service_id}/change-ports', response_model=StatusMessageModel) -async def change_service_ports(service_id: str, change_port:ChangePortForm, auth: bool = Depends(is_loggined)): - """Choose and change the ports of the service""" - if change_port.port is None and change_port.internalPort is None: - return {'status': 'Invalid Request!'} - try: - sql_inj = "" - query = [] - if not change_port.port is None: - sql_inj+=" public_port = ? " - query.append(change_port.port) - if not change_port.port is None and not change_port.internalPort is None: - sql_inj += "," - if not change_port.internalPort is None: - sql_inj+=" internal_port = ? " - query.append(change_port.internalPort) - query.append(service_id) - db.query(f'UPDATE services SET {sql_inj} WHERE service_id = ?;', *query) - except sqlite3.IntegrityError: - return {'status': 'Name or/and port of the service has been already assigned to another service'} - await firewall.get(service_id).update_port() + db.query('DELETE FROM services WHERE port = ?;', service_port) + db.query('DELETE FROM regexes WHERE service_port = ?;', service_port) + await firewall.remove(service_port) return {'status': 'ok'} class RegexModel(BaseModel): regex:str mode:str id:int - service_id:str + service_port:int is_blacklist: bool n_packets:int is_case_sensitive:bool active:bool -@app.get('/api/service/{service_id}/regexes', response_model=List[RegexModel]) -async def get_service_regexe_list(service_id: str, auth: bool = Depends(is_loggined)): +@app.get('/api/service/{service_port}/regexes', response_model=List[RegexModel]) +async def get_service_regexe_list(service_port: int, auth: bool = Depends(is_loggined)): """Get the list of the regexes of a service""" return db.query(""" SELECT - regex, mode, regex_id `id`, service_id, is_blacklist, + regex, mode, regex_id `id`, service_port, is_blacklist, blocked_packets n_packets, is_case_sensitive, active - FROM regexes WHERE service_id = ?; - """, service_id) + FROM regexes WHERE service_port = ?; + """, service_port) @app.get('/api/regex/{regex_id}', response_model=RegexModel) async def get_regex_by_id(regex_id: int, auth: bool = Depends(is_loggined)): """Get regex info using his id""" res = db.query(""" SELECT - regex, mode, regex_id `id`, service_id, is_blacklist, + regex, mode, regex_id `id`, service_port, is_blacklist, blocked_packets n_packets, is_case_sensitive, active FROM regexes WHERE `id` = ?; """, regex_id) @@ -285,7 +239,7 @@ async def regex_delete(regex_id: int, auth: bool = Depends(is_loggined)): res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id) if len(res) != 0: db.query('DELETE FROM regexes WHERE regex_id = ?;', regex_id) - await firewall.get(res[0]["service_id"]).update_filters() + await firewall.get(res[0]["service_port"]).update_filters() return {'status': 'ok'} @@ -295,7 +249,7 @@ async def regex_enable(regex_id: int, auth: bool = Depends(is_loggined)): res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id) if len(res) != 0: db.query('UPDATE regexes SET active=1 WHERE regex_id = ?;', regex_id) - await firewall.get(res[0]["service_id"]).update_filters() + await firewall.get(res[0]["service_port"]).update_filters() return {'status': 'ok'} @app.get('/api/regex/{regex_id}/disable', response_model=StatusMessageModel) @@ -304,11 +258,11 @@ async def regex_disable(regex_id: int, auth: bool = Depends(is_loggined)): res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id) if len(res) != 0: db.query('UPDATE regexes SET active=0 WHERE regex_id = ?;', regex_id) - await firewall.get(res[0]["service_id"]).update_filters() + await firewall.get(res[0]["service_port"]).update_filters() return {'status': 'ok'} class RegexAddForm(BaseModel): - service_id: str + service_port: int regex: str mode: str active: Union[bool,None] @@ -323,36 +277,29 @@ async def add_new_regex(form: RegexAddForm, auth: bool = Depends(is_loggined)): except Exception: return {"status":"Invalid regex"} try: - db.query("INSERT INTO regexes (service_id, regex, is_blacklist, mode, is_case_sensitive, active ) VALUES (?, ?, ?, ?, ?, ?);", - form.service_id, form.regex, form.is_blacklist, form.mode, form.is_case_sensitive, True if form.active is None else form.active ) + db.query("INSERT INTO regexes (service_port, regex, is_blacklist, mode, is_case_sensitive, active ) VALUES (?, ?, ?, ?, ?, ?);", + form.service_port, form.regex, form.is_blacklist, form.mode, form.is_case_sensitive, True if form.active is None else form.active ) except sqlite3.IntegrityError: return {'status': 'An identical regex already exists'} - await firewall.get(form.service_id).update_filters() + await firewall.get(form.service_port).update_filters() return {'status': 'ok'} class ServiceAddForm(BaseModel): name: str port: int - internalPort: Union[int, None] -class ServiceAddStatus(BaseModel): - status:str - id: Union[str,None] - -@app.post('/api/services/add', response_model=ServiceAddStatus) +@app.post('/api/services/add', response_model=StatusMessageModel) async def add_new_service(form: ServiceAddForm, auth: bool = Depends(is_loggined)): """Add a new service""" - serv_id = gen_service_id(db) try: - internal_port = form.internalPort if form.internalPort else gen_internal_port(db) - db.query("INSERT INTO services (name, service_id, internal_port, public_port, status) VALUES (?, ?, ?, ?, ?)", - form.name, serv_id, internal_port, form.port, 'stop') + db.query("INSERT INTO services (name, port, status) VALUES (?, ?, ?)", + form.name, form.port, STATUS.STOP) except sqlite3.IntegrityError: return {'status': 'Name or/and ports of the service has been already assigned to another service'} await firewall.reload() - return {'status': 'ok', "id": serv_id } + return {'status': 'ok'} async def frontend_debug_proxy(path): httpc = httpx.AsyncClient() diff --git a/backend/proxy/__init__.py b/backend/proxy.py similarity index 75% rename from backend/proxy/__init__.py rename to backend/proxy.py index a7de8cf..55f4d72 100755 --- a/backend/proxy/__init__.py +++ b/backend/proxy.py @@ -4,6 +4,7 @@ from threading import Lock, Thread from scapy.all import IP, TCP, UDP from subprocess import Popen, PIPE import os, pcre2, traceback, asyncio +from kthread import KThread QUEUE_BASE_NUM = 1000 @@ -176,40 +177,6 @@ class FiregexFilterManager: if filter.port == int(port): filter.delete() -def c_to_s(pkt, data): - print("SENDING", bytes(data[TCP].payload).decode()) - if "bug" in bytes(data[TCP].payload).decode(): - pkt.drop() - return - pkt.accept() - -def s_to_c(pkt, data): - print("RECIVING", bytes(data[TCP].payload).decode()) - pkt.accept() - -""" -try: - - manager.delete_all() - thr_list = [] - q_list = manager.add("test_service",ProtoTypes.TCP, 8080, c_to_s, s_to_c) - print(manager.get()) - for q in q_list: - thr_list.append(Thread(target=q.run)) - thr_list[-1].start() - - for t in thr_list: - t.join() -except KeyboardInterrupt: - for q in q_list: - q.unbind() - - manager.delete_by_service("test_service") - -#sudo iptables -I OUTPUT -p tcp --sport 8080 -j NFQUEUE --queue-num 10001 --queue-bypass -m comment --comment "&firegex&servid& Text" -#sudo iptables -I INPUT -p tcp --dport 8080 -j NFQUEUE --queue-num 10000 --queue-bypass -m comment --comment "&firegex&servid& Text" -""" - class Filter: def __init__(self, regex, is_case_sensitive=True, is_blacklist=True, c_to_s=False, s_to_c=False, blocked_packets=0, code=None): self.regex = regex @@ -231,16 +198,16 @@ class Filter: return True if self.compiled_regex.search(data) else False class Proxy: - def __init__(self, public_port, callback_blocked_update=None, filters=None): + def __init__(self, public_port = 0, callback_blocked_update=None, filters=None): self.manager = FiregexFilterManager() - self.update_config_lock = asyncio.Lock() - self.status_change = asyncio.Lock() self.port = public_port self.filters: Set[Filter] = set(filters) if filters else set([]) self.use_filters = True self.callback_blocked_update = callback_blocked_update - - async def start(self): + self.threads = [] + self.queue_list = [] + + def start(self): self.manager.delete_by_port(self.port) def c_to_s(pkt, data): @@ -273,44 +240,24 @@ class Proxy: pass pkt.accept() - self.manager.add(ProtoTypes.TCP, self.port, c_to_s, s_to_c) + self.queue_list = self.manager.add(ProtoTypes.TCP, self.port, c_to_s, s_to_c) + for ele in self.queue_list: + self.threads.append(KThread(target=ele.run)) + self.threads[-1].daemon = True + self.threads[-1].start() - async def stop(self): - async with self.status_change: - if self.isactive(): - self.process.kill() - return False - return True - - async def restart(self, in_pause=False): - status = await self.stop() - await self.start(in_pause=in_pause) - return status - - async def update_config(self, filters_codes): - async with self.update_config_lock: - if (self.isactive()): - self.process.stdin.write((" ".join(filters_codes)+"\n").encode()) - await self.process.stdin.drain() - - async def reload(self): - if self.isactive(): - async with self.filter_map_lock: - self.filter_map = self.compile_filters() - filters_codes = self.get_filter_codes() - await self.update_config(filters_codes) - - def get_filter_codes(self): - filters_codes = list(self.filter_map.keys()) - filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True) - return filters_codes - - def isactive(self): - return self.process and self.process.returncode is None - - async def pause(self): - if self.isactive(): - await self.update_config([]) - else: - await self.start(in_pause=True) + def stop(self): + self.manager.delete_by_port(self.port) + for ele in self.threads: + ele.kill() + if ele.is_alive(): + print("Not killed succesffully") #TODO + self.threads = [] + for ele in self.queue_list: + ele.unbind() + self.queue_list = [] + + def restart(self): + self.stop() + self.start() \ No newline at end of file diff --git a/backend/proxy/proxy.cpp b/backend/proxy/proxy.cpp deleted file mode 100644 index 4a6cac9..0000000 --- a/backend/proxy/proxy.cpp +++ /dev/null @@ -1,497 +0,0 @@ -/* - Copyright (c) 2007 Arash Partow (http://www.partow.net) - URL: http://www.partow.net/programming/tcpproxy/index.html - Modified and adapted by Pwnzer0tt1 -*/ -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -typedef jpcre2::select jp; -using namespace std; - -bool unhexlify(string const &hex, string &newString) { - try{ - int len = hex.length(); - for(int i=0; i< len; i+=2) - { - std::string byte = hex.substr(i,2); - char chr = (char) (int)strtol(byte.c_str(), NULL, 16); - newString.push_back(chr); - } - return true; - } - catch (...){ - return false; - } -} - -typedef pair regex_rule_pair; -typedef vector regex_rule_vector; -struct regex_rules{ - regex_rule_vector regex_s_c_w, regex_c_s_w, regex_s_c_b, regex_c_s_b; - - regex_rule_vector* getByCode(char code){ - switch(code){ - case 'C': // Client to server Blacklist - return ®ex_c_s_b; break; - case 'c': // Client to server Whitelist - return ®ex_c_s_w; break; - case 'S': // Server to client Blacklist - return ®ex_s_c_b; break; - case 's': // Server to client Whitelist - return ®ex_s_c_w; break; - } - throw invalid_argument( "Expected 'C' 'c' 'S' or 's'" ); - } - - void add(const char* arg){ - - //Integrity checks - size_t arg_len = strlen(arg); - if (arg_len < 2 || arg_len%2 != 0) return; - if (arg[0] != '0' && arg[0] != '1') return; - if (arg[1] != 'C' && arg[1] != 'c' && arg[1] != 'S' && arg[1] != 's') return; - string hex(arg+2), expr; - if (!unhexlify(hex, expr)) return; - //Push regex - jp::Regex regex(expr,arg[0] == '1'?"gS":"giS"); - if (regex){ - #ifdef DEBUG - cerr << "Added regex " << expr << " " << arg << endl; - #endif - getByCode(arg[1])->push_back(make_pair(string(arg), regex)); - } else { - cerr << "Regex " << arg << " was not compiled successfully" << endl; - } - } - -}; -shared_ptr regex_config; - -mutex update_mutex; -#ifdef MULTI_THREAD -mutex stdout_mutex; -#endif - -bool filter_data(unsigned char* data, const size_t& bytes_transferred, regex_rule_vector const &blacklist, regex_rule_vector const &whitelist){ - #ifdef DEBUG_PACKET - cerr << "---------------- Packet ----------------" << endl; - for(int i=0;i lck(stdout_mutex); - #endif - cout << "BLOCKED " << ele.first << endl; - return false; - } - } catch(...){ - cerr << "Error while matching regex: " << ele.first << endl; - } - } - for (regex_rule_pair ele:whitelist){ - try{ - if(!ele.second.match(str_data)){ - #ifdef MULTI_THREAD - std::unique_lock lck(stdout_mutex); - #endif - cout << "BLOCKED " << ele.first << endl; - return false; - } - } catch(...){ - cerr << "Error while matching regex: " << ele.first << endl; - } - } - #ifdef DEBUG - cerr << "Packet Accepted!" << endl; - #endif - return true; -} - -namespace tcp_proxy -{ - namespace ip = boost::asio::ip; - - class bridge : public boost::enable_shared_from_this - { - public: - - typedef ip::tcp::socket socket_type; - typedef boost::shared_ptr ptr_type; - - bridge(boost::asio::io_context& ios) - : downstream_socket_(ios), - upstream_socket_ (ios), - thread_safety(ios) - {} - - socket_type& downstream_socket() - { - // Client socket - return downstream_socket_; - } - - socket_type& upstream_socket() - { - // Remote server socket - return upstream_socket_; - } - - void start(const string& upstream_host, unsigned short upstream_port) - { - // Attempt connection to remote server (upstream side) - upstream_socket_.async_connect( - ip::tcp::endpoint( - boost::asio::ip::address::from_string(upstream_host), - upstream_port), - boost::asio::bind_executor(thread_safety, - boost::bind( - &bridge::handle_upstream_connect, - shared_from_this(), - boost::asio::placeholders::error))); - } - - void handle_upstream_connect(const boost::system::error_code& error) - { - if (!error) - { - // Setup async read from remote server (upstream) - - upstream_socket_.async_read_some( - boost::asio::buffer(upstream_data_,max_data_length), - boost::asio::bind_executor(thread_safety, - boost::bind(&bridge::handle_upstream_read, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - - // Setup async read from client (downstream) - downstream_socket_.async_read_some( - boost::asio::buffer(downstream_data_,max_data_length), - boost::asio::bind_executor(thread_safety, - boost::bind(&bridge::handle_downstream_read, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - } - else - close(); - } - - private: - - /* - Section A: Remote Server --> Proxy --> Client - Process data recieved from remote sever then send to client. - */ - - // Read from remote server complete, now send data to client - void handle_upstream_read(const boost::system::error_code& error, - const size_t& bytes_transferred) // Da Server a Client - { - if (!error) - { - shared_ptr regex_old_config = regex_config; - if (filter_data(upstream_data_, bytes_transferred, regex_old_config->regex_s_c_b, regex_old_config->regex_s_c_w)){ - async_write(downstream_socket_, - boost::asio::buffer(upstream_data_,bytes_transferred), - boost::asio::bind_executor(thread_safety, - boost::bind(&bridge::handle_downstream_write, - shared_from_this(), - boost::asio::placeholders::error))); - }else{ - close(); - } - } - else - close(); - } - - // Write to client complete, Async read from remote server - void handle_downstream_write(const boost::system::error_code& error) - { - if (!error) - { - - upstream_socket_.async_read_some( - boost::asio::buffer(upstream_data_,max_data_length), - boost::asio::bind_executor(thread_safety, - boost::bind(&bridge::handle_upstream_read, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - } - else - close(); - } - // *** End Of Section A *** - - - /* - Section B: Client --> Proxy --> Remove Server - Process data recieved from client then write to remove server. - */ - - // Read from client complete, now send data to remote server - void handle_downstream_read(const boost::system::error_code& error, - const size_t& bytes_transferred) // Da Client a Server - { - if (!error) - { - shared_ptr regex_old_config = regex_config; - if (filter_data(downstream_data_, bytes_transferred, regex_old_config->regex_c_s_b, regex_old_config->regex_c_s_w)){ - async_write(upstream_socket_, - boost::asio::buffer(downstream_data_,bytes_transferred), - boost::asio::bind_executor(thread_safety, - boost::bind(&bridge::handle_upstream_write, - shared_from_this(), - boost::asio::placeholders::error))); - }else{ - close(); - } - } - else - close(); - } - - // Write to remote server complete, Async read from client - void handle_upstream_write(const boost::system::error_code& error) - { - if (!error) - { - downstream_socket_.async_read_some( - boost::asio::buffer(downstream_data_,max_data_length), - boost::asio::bind_executor(thread_safety, - boost::bind(&bridge::handle_downstream_read, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - } - else - close(); - } - // *** End Of Section B *** - - void close() - { - boost::mutex::scoped_lock lock(mutex_); - - if (downstream_socket_.is_open()) - { - downstream_socket_.close(); - } - - if (upstream_socket_.is_open()) - { - upstream_socket_.close(); - } - } - - socket_type downstream_socket_; - socket_type upstream_socket_; - - enum { max_data_length = 8192 }; //8KB - unsigned char downstream_data_[max_data_length]; - unsigned char upstream_data_ [max_data_length]; - boost::asio::io_context::strand thread_safety; - boost::mutex mutex_; - public: - - class acceptor - { - public: - - acceptor(boost::asio::io_context& io_context, - const string& local_host, unsigned short local_port, - const string& upstream_host, unsigned short upstream_port) - : io_context_(io_context), - localhost_address(boost::asio::ip::address_v4::from_string(local_host)), - acceptor_(io_context_,ip::tcp::endpoint(localhost_address,local_port)), - upstream_port_(upstream_port), - upstream_host_(upstream_host) - {} - - bool accept_connections() - { - try - { - session_ = boost::shared_ptr(new bridge(io_context_)); - - acceptor_.async_accept(session_->downstream_socket(), - boost::asio::bind_executor(session_->thread_safety, - boost::bind(&acceptor::handle_accept, - this, - boost::asio::placeholders::error))); - } - catch(exception& e) - { - cerr << "acceptor exception: " << e.what() << endl; - return false; - } - - return true; - } - - private: - - void handle_accept(const boost::system::error_code& error) - { - if (!error) - { - session_->start(upstream_host_,upstream_port_); - - if (!accept_connections()) - { - cerr << "Failure during call to accept." << endl; - } - } - else - { - cerr << "Error: " << error.message() << endl; - } - } - - boost::asio::io_context& io_context_; - ip::address_v4 localhost_address; - ip::tcp::acceptor acceptor_; - ptr_type session_; - unsigned short upstream_port_; - string upstream_host_; - }; - - }; -} - -void update_config (boost::asio::streambuf &input_buffer){ - #ifdef DEBUG - cerr << "Updating configuration" << endl; - #endif - std::istream config_stream(&input_buffer); - std::unique_lock lck(update_mutex); - regex_rules *regex_new_config = new regex_rules(); - string data; - while(true){ - config_stream >> data; - if (config_stream.eof()) break; - regex_new_config->add(data.c_str()); - } - regex_config.reset(regex_new_config); -} - -class async_updater -{ -public: - async_updater(boost::asio::io_context& io_context) : input_(io_context, ::dup(STDIN_FILENO)), thread_safety(io_context) - { - - boost::asio::async_read_until(input_, input_buffer_, '\n', - boost::asio::bind_executor(thread_safety, - boost::bind(&async_updater::on_update, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - } - - void on_update(const boost::system::error_code& error, std::size_t length) - { - if (!error) - { - update_config(input_buffer_); - boost::asio::async_read_until(input_, input_buffer_, '\n', - boost::asio::bind_executor(thread_safety, - boost::bind(&async_updater::on_update, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - } - else - { - close(); - } - } - - void close() - { - input_.close(); - } - -private: - boost::asio::posix::stream_descriptor input_; - boost::asio::io_context::strand thread_safety; - boost::asio::streambuf input_buffer_; -}; - - -int main(int argc, char* argv[]) -{ - if (argc < 5) - { - cerr << "usage: tcpproxy_server " << endl; - return 1; - } - - const unsigned short local_port = static_cast(::atoi(argv[2])); - const unsigned short forward_port = static_cast(::atoi(argv[4])); - const string local_host = argv[1]; - const string forward_host = argv[3]; - - boost::asio::io_context ios; - - boost::asio::streambuf buf; - boost::asio::posix::stream_descriptor cin_in(ios, ::dup(STDIN_FILENO)); - boost::asio::read_until(cin_in, buf,'\n'); - update_config(buf); - - async_updater updater(ios); - - #ifdef DEBUG - cerr << "Starting Proxy" << endl; - #endif - try - { - tcp_proxy::bridge::acceptor acceptor(ios, - local_host, local_port, - forward_host, forward_port); - - acceptor.accept_connections(); - #ifdef MULTI_THREAD - boost::thread_group tg; - #ifdef THREAD_NUM - for (unsigned i = 0; i < THREAD_NUM; ++i) - #else - for (unsigned i = 0; i < thread::hardware_concurrency(); ++i) - #endif - tg.create_thread(boost::bind(&boost::asio::io_context::run, &ios)); - - tg.join_all(); - #else - ios.run(); - #endif - } - catch(exception& e) - { - cerr << "Error: " << e.what() << endl; - return 1; - } - #ifdef DEBUG - cerr << "Proxy stopped!" << endl; - #endif - - return 0; -} diff --git a/backend/requirements.txt b/backend/requirements.txt index 9ab6b99..00962b5 100755 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -4,3 +4,5 @@ uvicorn[standard] passlib[bcrypt] python-jose[cryptography] NetfilterQueue +kthread +scapy \ No newline at end of file diff --git a/backend/utils.py b/backend/utils.py index 35de202..a1093fc 100755 --- a/backend/utils.py +++ b/backend/utils.py @@ -53,28 +53,26 @@ class SQLite(): self.create_schema({ 'services': { 'status': 'VARCHAR(100) NOT NULL', - 'service_id': 'VARCHAR(100) PRIMARY KEY', - 'internal_port': 'INT NOT NULL CHECK(internal_port > 0 and internal_port < 65536)', - 'public_port': 'INT NOT NULL CHECK(internal_port > 0 and internal_port < 65536) UNIQUE', + 'port': 'INT NOT NULL CHECK(port > 0 and port < 65536) UNIQUE PRIMARY KEY', 'name': 'VARCHAR(100) NOT NULL' }, 'regexes': { 'regex': 'TEXT NOT NULL', 'mode': 'VARCHAR(1) NOT NULL', - 'service_id': 'VARCHAR(100) NOT NULL', + 'service_port': 'INT NOT NULL', 'is_blacklist': 'BOOLEAN NOT NULL CHECK (is_blacklist IN (0, 1))', 'blocked_packets': 'INTEGER UNSIGNED NOT NULL DEFAULT 0', 'regex_id': 'INTEGER PRIMARY KEY', 'is_case_sensitive' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1))', 'active' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1)) DEFAULT 1', - 'FOREIGN KEY (service_id)':'REFERENCES services (service_id)', + 'FOREIGN KEY (service_port)':'REFERENCES services (port)', }, 'keys_values': { 'key': 'VARCHAR(100) PRIMARY KEY', 'value': 'VARCHAR(100) NOT NULL', }, }) - self.query("CREATE UNIQUE INDEX IF NOT EXISTS unique_regex_service ON regexes (regex,service_id,is_blacklist,mode,is_case_sensitive);") + self.query("CREATE UNIQUE INDEX IF NOT EXISTS unique_regex_service ON regexes (regex,service_port,is_blacklist,mode,is_case_sensitive);") class KeyValueStorage: def __init__(self, db): @@ -94,42 +92,32 @@ class KeyValueStorage: self.db.query('UPDATE keys_values SET value=? WHERE key = ?;', str(value), key) class STATUS: - PAUSE = "pause" + STOP = "stop" ACTIVE = "active" class ServiceNotFoundException(Exception): pass class ServiceManager: - def __init__(self, id, db): - self.id = id + def __init__(self, port, db): + self.port = port self.db = db self.proxy = Proxy( - callback_blocked_update=self._stats_updater + callback_blocked_update=self._stats_updater, + public_port=port ) - self.status = STATUS.PAUSE + self.status = STATUS.STOP self.filters = {} - self._update_port_from_db() self._update_filters_from_db() self.lock = asyncio.Lock() self.starter = None - - def _update_port_from_db(self): - res = self.db.query(""" - SELECT - public_port, - internal_port - FROM services WHERE service_id = ?; - """, self.id) - if len(res) == 0: raise ServiceNotFoundException() - self.proxy.public_port = res[0]["public_port"] def _update_filters_from_db(self): res = self.db.query(""" SELECT regex, mode, regex_id `id`, is_blacklist, blocked_packets n_packets, is_case_sensitive - FROM regexes WHERE service_id = ? AND active=1; - """, self.id) + FROM regexes WHERE service_port = ? AND active=1; + """, self.port) #Filter check old_filters = set(self.filters.keys()) @@ -155,43 +143,34 @@ class ServiceManager: self.proxy.filters = list(self.filters.values()) def __update_status_db(self, status): - self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.id) + self.db.query("UPDATE services SET status = ? WHERE port = ?;", status, self.port) async def next(self,to): async with self.lock: - return await self._next(to) + return self._next(to) - async def _next(self, to): + def _next(self, to): if self.status != to: # ACTIVE -> PAUSE - if (self.status, to) in [(STATUS.ACTIVE, STATUS.PAUSE)]: - await self.proxy.pause() + if (self.status, to) in [(STATUS.ACTIVE, STATUS.STOP)]: + self.proxy.stop() self._set_status(to) # PAUSE -> ACTIVE - elif (self.status, to) in [(STATUS.PAUSE, STATUS.ACTIVE)]: - await self.proxy.reload() + elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE)]: + self.proxy.restart() self._set_status(to) def _stats_updater(self,filter:Filter): self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", filter.blocked, filter.code) - async def update_port(self): - async with self.lock: - self._update_port_from_db() - if self.status in [STATUS.ACTIVE]: - await self.proxy.reload() - def _set_status(self,status): self.status = status self.__update_status_db(status) - async def update_filters(self): async with self.lock: self._update_filters_from_db() - if self.status in [STATUS.PAUSE, STATUS.ACTIVE]: - await self.proxy.reload() class ProxyManager: def __init__(self, db:SQLite): @@ -203,48 +182,24 @@ class ProxyManager: for key in list(self.proxy_table.keys()): await self.remove(key) - async def remove(self,id): + async def remove(self,port): async with self.lock: - if id in self.proxy_table: - await self.proxy_table[id].next(STATUS.PAUSE) - del self.proxy_table[id] + if port in self.proxy_table: + await self.proxy_table[port].next(STATUS.STOP) + del self.proxy_table[port] async def reload(self): async with self.lock: - for srv in self.db.query('SELECT service_id, status FROM services;'): - srv_id, req_status = srv["service_id"], srv["status"] - if srv_id in self.proxy_table: + for srv in self.db.query('SELECT port, status FROM services;'): + srv_port, req_status = srv["port"], srv["status"] + if srv_port in self.proxy_table: continue - self.proxy_table[srv_id] = ServiceManager(srv_id,self.db) - await self.proxy_table[srv_id].next(req_status) + self.proxy_table[srv_port] = ServiceManager(srv_port,self.db) + await self.proxy_table[srv_port].next(req_status) - def get(self,id): - if id in self.proxy_table: - return self.proxy_table[id] + def get(self,port): + if port in self.proxy_table: + return self.proxy_table[port] else: - raise ServiceNotFoundException() - -def check_port_is_open(port): - try: - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('0.0.0.0',port)) - sock.close() - return True - except Exception: - return False - -def gen_service_id(db): - while True: - res = secrets.token_hex(8) - if len(db.query('SELECT 1 FROM services WHERE service_id = ?;', res)) == 0: - break - return res - -def gen_internal_port(db): - while True: - res = random.randint(30000, 45000) - if len(db.query('SELECT 1 FROM services WHERE internal_port = ?;', res)) == 0: - break - return res \ No newline at end of file + raise ServiceNotFoundException() \ No newline at end of file