Fixed Netfiltering

This commit is contained in:
nik012003
2022-07-07 21:56:34 +02:00
parent a97fea7005
commit 333d19a40d
5 changed files with 99 additions and 745 deletions

View File

@@ -34,11 +34,6 @@ app = FastAPI(debug=DEBUG, redoc_url=None)
def APP_STATUS(): return "init" if conf.get("password") is None else "run" def APP_STATUS(): return "init" if conf.get("password") is None else "run"
def JWT_SECRET(): return conf.get("secret") def JWT_SECRET(): return conf.get("secret")
@app.on_event("shutdown")
async def shutdown_event():
db.disconnect()
await firewall.close()
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
db.init() db.init()
@@ -147,10 +142,8 @@ async def get_general_stats(auth: bool = Depends(is_loggined)):
""")[0] """)[0]
class ServiceModel(BaseModel): class ServiceModel(BaseModel):
id:str
status: str status: str
public_port: int port: int
internal_port: int
name: str name: str
n_regex: int n_regex: int
n_packets: int n_packets: int
@@ -160,119 +153,80 @@ async def get_service_list(auth: bool = Depends(is_loggined)):
"""Get the list of existent firegex services""" """Get the list of existent firegex services"""
return db.query(""" return db.query("""
SELECT SELECT
s.service_id `id`,
s.status status, s.status status,
s.public_port public_port, s.port port,
s.internal_port internal_port,
s.name name, s.name name,
COUNT(r.regex_id) n_regex, COUNT(r.regex_id) n_regex,
COALESCE(SUM(r.blocked_packets),0) n_packets COALESCE(SUM(r.blocked_packets),0) n_packets
FROM services s LEFT JOIN regexes r ON r.service_id = s.service_id FROM services s LEFT JOIN regexes r ON r.service_port = s.port
GROUP BY s.service_id; GROUP BY s.port;
""") """)
@app.get('/api/service/{service_id}', response_model=ServiceModel) @app.get('/api/service/{service_port}', response_model=ServiceModel)
async def get_service_by_id(service_id: str, auth: bool = Depends(is_loggined)): async def get_service_by_id(service_port: int, auth: bool = Depends(is_loggined)):
"""Get info about a specific service using his id""" """Get info about a specific service using his id"""
res = db.query(""" res = db.query("""
SELECT SELECT
s.service_id `id`,
s.status status, s.status status,
s.public_port public_port, s.port port,
s.internal_port internal_port,
s.name name, s.name name,
COUNT(r.regex_id) n_regex, COUNT(r.regex_id) n_regex,
COALESCE(SUM(r.blocked_packets),0) n_packets COALESCE(SUM(r.blocked_packets),0) n_packets
FROM services s LEFT JOIN regexes r ON r.service_id = s.service_id WHERE s.service_id = ? FROM services s LEFT JOIN regexes r ON r.service_port = s.port WHERE s.port = ?
GROUP BY s.service_id; GROUP BY s.port;
""", service_id) """, service_port)
if len(res) == 0: raise HTTPException(status_code=400, detail="This service does not exists!") if len(res) == 0: raise HTTPException(status_code=400, detail="This service does not exists!")
return res[0] return res[0]
class StatusMessageModel(BaseModel): class StatusMessageModel(BaseModel):
status:str status:str
@app.get('/api/service/{service_id}/pause', response_model=StatusMessageModel) @app.get('/api/service/{service_port}/stop', response_model=StatusMessageModel)
async def service_pause(service_id: str, auth: bool = Depends(is_loggined)): async def service_stop(service_port: int, auth: bool = Depends(is_loggined)):
"""Request the pause of a specific service""" """Request the stop of a specific service"""
await firewall.get(service_id).next(STATUS.PAUSE) await firewall.get(service_port).next(STATUS.STOP)
return {'status': 'ok'} return {'status': 'ok'}
@app.get('/api/service/{service_id}/start', response_model=StatusMessageModel) @app.get('/api/service/{service_port}/start', response_model=StatusMessageModel)
async def service_start(service_id: str, auth: bool = Depends(is_loggined)): async def service_start(service_port: int, auth: bool = Depends(is_loggined)):
"""Request the start of a specific service""" """Request the start of a specific service"""
await firewall.get(service_id).next(STATUS.ACTIVE) await firewall.get(service_port).next(STATUS.ACTIVE)
return {'status': 'ok'} return {'status': 'ok'}
@app.get('/api/service/{service_id}/delete', response_model=StatusMessageModel) @app.get('/api/service/{service_port}/delete', response_model=StatusMessageModel)
async def service_delete(service_id: str, auth: bool = Depends(is_loggined)): async def service_delete(service_port: int, auth: bool = Depends(is_loggined)):
"""Request the deletion of a specific service""" """Request the deletion of a specific service"""
db.query('DELETE FROM services WHERE service_id = ?;', service_id) db.query('DELETE FROM services WHERE port = ?;', service_port)
db.query('DELETE FROM regexes WHERE service_id = ?;', service_id) db.query('DELETE FROM regexes WHERE service_port = ?;', service_port)
await firewall.remove(service_id) await firewall.remove(service_port)
return {'status': 'ok'}
@app.get('/api/service/{service_id}/regen-port', response_model=StatusMessageModel)
async def regen_service_port(service_id: str, auth: bool = Depends(is_loggined)):
"""Request the regeneration of a the internal proxy port of a specific service"""
db.query('UPDATE services SET internal_port = ? WHERE service_id = ?;', gen_internal_port(db), service_id)
await firewall.get(service_id).update_port()
return {'status': 'ok'}
class ChangePortForm(BaseModel):
port: Union[int, None]
internalPort: Union[int, None]
@app.post('/api/service/{service_id}/change-ports', response_model=StatusMessageModel)
async def change_service_ports(service_id: str, change_port:ChangePortForm, auth: bool = Depends(is_loggined)):
"""Choose and change the ports of the service"""
if change_port.port is None and change_port.internalPort is None:
return {'status': 'Invalid Request!'}
try:
sql_inj = ""
query = []
if not change_port.port is None:
sql_inj+=" public_port = ? "
query.append(change_port.port)
if not change_port.port is None and not change_port.internalPort is None:
sql_inj += ","
if not change_port.internalPort is None:
sql_inj+=" internal_port = ? "
query.append(change_port.internalPort)
query.append(service_id)
db.query(f'UPDATE services SET {sql_inj} WHERE service_id = ?;', *query)
except sqlite3.IntegrityError:
return {'status': 'Name or/and port of the service has been already assigned to another service'}
await firewall.get(service_id).update_port()
return {'status': 'ok'} return {'status': 'ok'}
class RegexModel(BaseModel): class RegexModel(BaseModel):
regex:str regex:str
mode:str mode:str
id:int id:int
service_id:str service_port:int
is_blacklist: bool is_blacklist: bool
n_packets:int n_packets:int
is_case_sensitive:bool is_case_sensitive:bool
active:bool active:bool
@app.get('/api/service/{service_id}/regexes', response_model=List[RegexModel]) @app.get('/api/service/{service_port}/regexes', response_model=List[RegexModel])
async def get_service_regexe_list(service_id: str, auth: bool = Depends(is_loggined)): async def get_service_regexe_list(service_port: int, auth: bool = Depends(is_loggined)):
"""Get the list of the regexes of a service""" """Get the list of the regexes of a service"""
return db.query(""" return db.query("""
SELECT SELECT
regex, mode, regex_id `id`, service_id, is_blacklist, regex, mode, regex_id `id`, service_port, is_blacklist,
blocked_packets n_packets, is_case_sensitive, active blocked_packets n_packets, is_case_sensitive, active
FROM regexes WHERE service_id = ?; FROM regexes WHERE service_port = ?;
""", service_id) """, service_port)
@app.get('/api/regex/{regex_id}', response_model=RegexModel) @app.get('/api/regex/{regex_id}', response_model=RegexModel)
async def get_regex_by_id(regex_id: int, auth: bool = Depends(is_loggined)): async def get_regex_by_id(regex_id: int, auth: bool = Depends(is_loggined)):
"""Get regex info using his id""" """Get regex info using his id"""
res = db.query(""" res = db.query("""
SELECT SELECT
regex, mode, regex_id `id`, service_id, is_blacklist, regex, mode, regex_id `id`, service_port, is_blacklist,
blocked_packets n_packets, is_case_sensitive, active blocked_packets n_packets, is_case_sensitive, active
FROM regexes WHERE `id` = ?; FROM regexes WHERE `id` = ?;
""", regex_id) """, regex_id)
@@ -285,7 +239,7 @@ async def regex_delete(regex_id: int, auth: bool = Depends(is_loggined)):
res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id) res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id)
if len(res) != 0: if len(res) != 0:
db.query('DELETE FROM regexes WHERE regex_id = ?;', regex_id) db.query('DELETE FROM regexes WHERE regex_id = ?;', regex_id)
await firewall.get(res[0]["service_id"]).update_filters() await firewall.get(res[0]["service_port"]).update_filters()
return {'status': 'ok'} return {'status': 'ok'}
@@ -295,7 +249,7 @@ async def regex_enable(regex_id: int, auth: bool = Depends(is_loggined)):
res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id) res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id)
if len(res) != 0: if len(res) != 0:
db.query('UPDATE regexes SET active=1 WHERE regex_id = ?;', regex_id) db.query('UPDATE regexes SET active=1 WHERE regex_id = ?;', regex_id)
await firewall.get(res[0]["service_id"]).update_filters() await firewall.get(res[0]["service_port"]).update_filters()
return {'status': 'ok'} return {'status': 'ok'}
@app.get('/api/regex/{regex_id}/disable', response_model=StatusMessageModel) @app.get('/api/regex/{regex_id}/disable', response_model=StatusMessageModel)
@@ -304,11 +258,11 @@ async def regex_disable(regex_id: int, auth: bool = Depends(is_loggined)):
res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id) res = db.query('SELECT * FROM regexes WHERE regex_id = ?;', regex_id)
if len(res) != 0: if len(res) != 0:
db.query('UPDATE regexes SET active=0 WHERE regex_id = ?;', regex_id) db.query('UPDATE regexes SET active=0 WHERE regex_id = ?;', regex_id)
await firewall.get(res[0]["service_id"]).update_filters() await firewall.get(res[0]["service_port"]).update_filters()
return {'status': 'ok'} return {'status': 'ok'}
class RegexAddForm(BaseModel): class RegexAddForm(BaseModel):
service_id: str service_port: int
regex: str regex: str
mode: str mode: str
active: Union[bool,None] active: Union[bool,None]
@@ -323,36 +277,29 @@ async def add_new_regex(form: RegexAddForm, auth: bool = Depends(is_loggined)):
except Exception: except Exception:
return {"status":"Invalid regex"} return {"status":"Invalid regex"}
try: try:
db.query("INSERT INTO regexes (service_id, regex, is_blacklist, mode, is_case_sensitive, active ) VALUES (?, ?, ?, ?, ?, ?);", db.query("INSERT INTO regexes (service_port, regex, is_blacklist, mode, is_case_sensitive, active ) VALUES (?, ?, ?, ?, ?, ?);",
form.service_id, form.regex, form.is_blacklist, form.mode, form.is_case_sensitive, True if form.active is None else form.active ) form.service_port, form.regex, form.is_blacklist, form.mode, form.is_case_sensitive, True if form.active is None else form.active )
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
return {'status': 'An identical regex already exists'} return {'status': 'An identical regex already exists'}
await firewall.get(form.service_id).update_filters() await firewall.get(form.service_port).update_filters()
return {'status': 'ok'} return {'status': 'ok'}
class ServiceAddForm(BaseModel): class ServiceAddForm(BaseModel):
name: str name: str
port: int port: int
internalPort: Union[int, None]
class ServiceAddStatus(BaseModel): @app.post('/api/services/add', response_model=StatusMessageModel)
status:str
id: Union[str,None]
@app.post('/api/services/add', response_model=ServiceAddStatus)
async def add_new_service(form: ServiceAddForm, auth: bool = Depends(is_loggined)): async def add_new_service(form: ServiceAddForm, auth: bool = Depends(is_loggined)):
"""Add a new service""" """Add a new service"""
serv_id = gen_service_id(db)
try: try:
internal_port = form.internalPort if form.internalPort else gen_internal_port(db) db.query("INSERT INTO services (name, port, status) VALUES (?, ?, ?)",
db.query("INSERT INTO services (name, service_id, internal_port, public_port, status) VALUES (?, ?, ?, ?, ?)", form.name, form.port, STATUS.STOP)
form.name, serv_id, internal_port, form.port, 'stop')
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
return {'status': 'Name or/and ports of the service has been already assigned to another service'} return {'status': 'Name or/and ports of the service has been already assigned to another service'}
await firewall.reload() await firewall.reload()
return {'status': 'ok', "id": serv_id } return {'status': 'ok'}
async def frontend_debug_proxy(path): async def frontend_debug_proxy(path):
httpc = httpx.AsyncClient() httpc = httpx.AsyncClient()

View File

@@ -4,6 +4,7 @@ from threading import Lock, Thread
from scapy.all import IP, TCP, UDP from scapy.all import IP, TCP, UDP
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
import os, pcre2, traceback, asyncio import os, pcre2, traceback, asyncio
from kthread import KThread
QUEUE_BASE_NUM = 1000 QUEUE_BASE_NUM = 1000
@@ -176,40 +177,6 @@ class FiregexFilterManager:
if filter.port == int(port): if filter.port == int(port):
filter.delete() filter.delete()
def c_to_s(pkt, data):
print("SENDING", bytes(data[TCP].payload).decode())
if "bug" in bytes(data[TCP].payload).decode():
pkt.drop()
return
pkt.accept()
def s_to_c(pkt, data):
print("RECIVING", bytes(data[TCP].payload).decode())
pkt.accept()
"""
try:
manager.delete_all()
thr_list = []
q_list = manager.add("test_service",ProtoTypes.TCP, 8080, c_to_s, s_to_c)
print(manager.get())
for q in q_list:
thr_list.append(Thread(target=q.run))
thr_list[-1].start()
for t in thr_list:
t.join()
except KeyboardInterrupt:
for q in q_list:
q.unbind()
manager.delete_by_service("test_service")
#sudo iptables -I OUTPUT -p tcp --sport 8080 -j NFQUEUE --queue-num 10001 --queue-bypass -m comment --comment "&firegex&servid& Text"
#sudo iptables -I INPUT -p tcp --dport 8080 -j NFQUEUE --queue-num 10000 --queue-bypass -m comment --comment "&firegex&servid& Text"
"""
class Filter: class Filter:
def __init__(self, regex, is_case_sensitive=True, is_blacklist=True, c_to_s=False, s_to_c=False, blocked_packets=0, code=None): def __init__(self, regex, is_case_sensitive=True, is_blacklist=True, c_to_s=False, s_to_c=False, blocked_packets=0, code=None):
self.regex = regex self.regex = regex
@@ -231,16 +198,16 @@ class Filter:
return True if self.compiled_regex.search(data) else False return True if self.compiled_regex.search(data) else False
class Proxy: class Proxy:
def __init__(self, public_port, callback_blocked_update=None, filters=None): def __init__(self, public_port = 0, callback_blocked_update=None, filters=None):
self.manager = FiregexFilterManager() self.manager = FiregexFilterManager()
self.update_config_lock = asyncio.Lock()
self.status_change = asyncio.Lock()
self.port = public_port self.port = public_port
self.filters: Set[Filter] = set(filters) if filters else set([]) self.filters: Set[Filter] = set(filters) if filters else set([])
self.use_filters = True self.use_filters = True
self.callback_blocked_update = callback_blocked_update self.callback_blocked_update = callback_blocked_update
self.threads = []
self.queue_list = []
async def start(self): def start(self):
self.manager.delete_by_port(self.port) self.manager.delete_by_port(self.port)
def c_to_s(pkt, data): def c_to_s(pkt, data):
@@ -273,44 +240,24 @@ class Proxy:
pass pass
pkt.accept() pkt.accept()
self.manager.add(ProtoTypes.TCP, self.port, c_to_s, s_to_c) self.queue_list = self.manager.add(ProtoTypes.TCP, self.port, c_to_s, s_to_c)
for ele in self.queue_list:
self.threads.append(KThread(target=ele.run))
self.threads[-1].daemon = True
self.threads[-1].start()
async def stop(self): def stop(self):
async with self.status_change: self.manager.delete_by_port(self.port)
if self.isactive(): for ele in self.threads:
self.process.kill() ele.kill()
return False if ele.is_alive():
return True print("Not killed succesffully") #TODO
self.threads = []
for ele in self.queue_list:
ele.unbind()
self.queue_list = []
async def restart(self, in_pause=False):
status = await self.stop()
await self.start(in_pause=in_pause)
return status
async def update_config(self, filters_codes):
async with self.update_config_lock:
if (self.isactive()):
self.process.stdin.write((" ".join(filters_codes)+"\n").encode())
await self.process.stdin.drain()
async def reload(self):
if self.isactive():
async with self.filter_map_lock:
self.filter_map = self.compile_filters()
filters_codes = self.get_filter_codes()
await self.update_config(filters_codes)
def get_filter_codes(self):
filters_codes = list(self.filter_map.keys())
filters_codes.sort(key=lambda a: self.filter_map[a].blocked, reverse=True)
return filters_codes
def isactive(self):
return self.process and self.process.returncode is None
async def pause(self):
if self.isactive():
await self.update_config([])
else:
await self.start(in_pause=True)
def restart(self):
self.stop()
self.start()

View File

@@ -1,497 +0,0 @@
/*
Copyright (c) 2007 Arash Partow (http://www.partow.net)
URL: http://www.partow.net/programming/tcpproxy/index.html
Modified and adapted by Pwnzer0tt1
*/
#include <cstdlib>
#include <cstddef>
#include <iostream>
#include <string>
#include <mutex>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/bind/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>
#include <jpcre2.hpp>
typedef jpcre2::select<char> jp;
using namespace std;
bool unhexlify(string const &hex, string &newString) {
try{
int len = hex.length();
for(int i=0; i< len; i+=2)
{
std::string byte = hex.substr(i,2);
char chr = (char) (int)strtol(byte.c_str(), NULL, 16);
newString.push_back(chr);
}
return true;
}
catch (...){
return false;
}
}
typedef pair<string,jp::Regex> regex_rule_pair;
typedef vector<regex_rule_pair> regex_rule_vector;
struct regex_rules{
regex_rule_vector regex_s_c_w, regex_c_s_w, regex_s_c_b, regex_c_s_b;
regex_rule_vector* getByCode(char code){
switch(code){
case 'C': // Client to server Blacklist
return &regex_c_s_b; break;
case 'c': // Client to server Whitelist
return &regex_c_s_w; break;
case 'S': // Server to client Blacklist
return &regex_s_c_b; break;
case 's': // Server to client Whitelist
return &regex_s_c_w; break;
}
throw invalid_argument( "Expected 'C' 'c' 'S' or 's'" );
}
void add(const char* arg){
//Integrity checks
size_t arg_len = strlen(arg);
if (arg_len < 2 || arg_len%2 != 0) return;
if (arg[0] != '0' && arg[0] != '1') return;
if (arg[1] != 'C' && arg[1] != 'c' && arg[1] != 'S' && arg[1] != 's') return;
string hex(arg+2), expr;
if (!unhexlify(hex, expr)) return;
//Push regex
jp::Regex regex(expr,arg[0] == '1'?"gS":"giS");
if (regex){
#ifdef DEBUG
cerr << "Added regex " << expr << " " << arg << endl;
#endif
getByCode(arg[1])->push_back(make_pair(string(arg), regex));
} else {
cerr << "Regex " << arg << " was not compiled successfully" << endl;
}
}
};
shared_ptr<regex_rules> regex_config;
mutex update_mutex;
#ifdef MULTI_THREAD
mutex stdout_mutex;
#endif
bool filter_data(unsigned char* data, const size_t& bytes_transferred, regex_rule_vector const &blacklist, regex_rule_vector const &whitelist){
#ifdef DEBUG_PACKET
cerr << "---------------- Packet ----------------" << endl;
for(int i=0;i<bytes_transferred;i++) cerr << data[i];
cerr << endl;
for(int i=0;i<bytes_transferred;i++) fprintf(stderr, "%x", data[i]);
cerr << endl;
cerr << "---------------- End Packet ----------------" << endl;
#endif
string str_data((char *) data, bytes_transferred);
for (regex_rule_pair ele:blacklist){
try{
if(ele.second.match(str_data)){
#ifdef MULTI_THREAD
std::unique_lock<std::mutex> lck(stdout_mutex);
#endif
cout << "BLOCKED " << ele.first << endl;
return false;
}
} catch(...){
cerr << "Error while matching regex: " << ele.first << endl;
}
}
for (regex_rule_pair ele:whitelist){
try{
if(!ele.second.match(str_data)){
#ifdef MULTI_THREAD
std::unique_lock<std::mutex> lck(stdout_mutex);
#endif
cout << "BLOCKED " << ele.first << endl;
return false;
}
} catch(...){
cerr << "Error while matching regex: " << ele.first << endl;
}
}
#ifdef DEBUG
cerr << "Packet Accepted!" << endl;
#endif
return true;
}
namespace tcp_proxy
{
namespace ip = boost::asio::ip;
class bridge : public boost::enable_shared_from_this<bridge>
{
public:
typedef ip::tcp::socket socket_type;
typedef boost::shared_ptr<bridge> ptr_type;
bridge(boost::asio::io_context& ios)
: downstream_socket_(ios),
upstream_socket_ (ios),
thread_safety(ios)
{}
socket_type& downstream_socket()
{
// Client socket
return downstream_socket_;
}
socket_type& upstream_socket()
{
// Remote server socket
return upstream_socket_;
}
void start(const string& upstream_host, unsigned short upstream_port)
{
// Attempt connection to remote server (upstream side)
upstream_socket_.async_connect(
ip::tcp::endpoint(
boost::asio::ip::address::from_string(upstream_host),
upstream_port),
boost::asio::bind_executor(thread_safety,
boost::bind(
&bridge::handle_upstream_connect,
shared_from_this(),
boost::asio::placeholders::error)));
}
void handle_upstream_connect(const boost::system::error_code& error)
{
if (!error)
{
// Setup async read from remote server (upstream)
upstream_socket_.async_read_some(
boost::asio::buffer(upstream_data_,max_data_length),
boost::asio::bind_executor(thread_safety,
boost::bind(&bridge::handle_upstream_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
// Setup async read from client (downstream)
downstream_socket_.async_read_some(
boost::asio::buffer(downstream_data_,max_data_length),
boost::asio::bind_executor(thread_safety,
boost::bind(&bridge::handle_downstream_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
else
close();
}
private:
/*
Section A: Remote Server --> Proxy --> Client
Process data recieved from remote sever then send to client.
*/
// Read from remote server complete, now send data to client
void handle_upstream_read(const boost::system::error_code& error,
const size_t& bytes_transferred) // Da Server a Client
{
if (!error)
{
shared_ptr<regex_rules> regex_old_config = regex_config;
if (filter_data(upstream_data_, bytes_transferred, regex_old_config->regex_s_c_b, regex_old_config->regex_s_c_w)){
async_write(downstream_socket_,
boost::asio::buffer(upstream_data_,bytes_transferred),
boost::asio::bind_executor(thread_safety,
boost::bind(&bridge::handle_downstream_write,
shared_from_this(),
boost::asio::placeholders::error)));
}else{
close();
}
}
else
close();
}
// Write to client complete, Async read from remote server
void handle_downstream_write(const boost::system::error_code& error)
{
if (!error)
{
upstream_socket_.async_read_some(
boost::asio::buffer(upstream_data_,max_data_length),
boost::asio::bind_executor(thread_safety,
boost::bind(&bridge::handle_upstream_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
else
close();
}
// *** End Of Section A ***
/*
Section B: Client --> Proxy --> Remove Server
Process data recieved from client then write to remove server.
*/
// Read from client complete, now send data to remote server
void handle_downstream_read(const boost::system::error_code& error,
const size_t& bytes_transferred) // Da Client a Server
{
if (!error)
{
shared_ptr<regex_rules> regex_old_config = regex_config;
if (filter_data(downstream_data_, bytes_transferred, regex_old_config->regex_c_s_b, regex_old_config->regex_c_s_w)){
async_write(upstream_socket_,
boost::asio::buffer(downstream_data_,bytes_transferred),
boost::asio::bind_executor(thread_safety,
boost::bind(&bridge::handle_upstream_write,
shared_from_this(),
boost::asio::placeholders::error)));
}else{
close();
}
}
else
close();
}
// Write to remote server complete, Async read from client
void handle_upstream_write(const boost::system::error_code& error)
{
if (!error)
{
downstream_socket_.async_read_some(
boost::asio::buffer(downstream_data_,max_data_length),
boost::asio::bind_executor(thread_safety,
boost::bind(&bridge::handle_downstream_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
else
close();
}
// *** End Of Section B ***
void close()
{
boost::mutex::scoped_lock lock(mutex_);
if (downstream_socket_.is_open())
{
downstream_socket_.close();
}
if (upstream_socket_.is_open())
{
upstream_socket_.close();
}
}
socket_type downstream_socket_;
socket_type upstream_socket_;
enum { max_data_length = 8192 }; //8KB
unsigned char downstream_data_[max_data_length];
unsigned char upstream_data_ [max_data_length];
boost::asio::io_context::strand thread_safety;
boost::mutex mutex_;
public:
class acceptor
{
public:
acceptor(boost::asio::io_context& io_context,
const string& local_host, unsigned short local_port,
const string& upstream_host, unsigned short upstream_port)
: io_context_(io_context),
localhost_address(boost::asio::ip::address_v4::from_string(local_host)),
acceptor_(io_context_,ip::tcp::endpoint(localhost_address,local_port)),
upstream_port_(upstream_port),
upstream_host_(upstream_host)
{}
bool accept_connections()
{
try
{
session_ = boost::shared_ptr<bridge>(new bridge(io_context_));
acceptor_.async_accept(session_->downstream_socket(),
boost::asio::bind_executor(session_->thread_safety,
boost::bind(&acceptor::handle_accept,
this,
boost::asio::placeholders::error)));
}
catch(exception& e)
{
cerr << "acceptor exception: " << e.what() << endl;
return false;
}
return true;
}
private:
void handle_accept(const boost::system::error_code& error)
{
if (!error)
{
session_->start(upstream_host_,upstream_port_);
if (!accept_connections())
{
cerr << "Failure during call to accept." << endl;
}
}
else
{
cerr << "Error: " << error.message() << endl;
}
}
boost::asio::io_context& io_context_;
ip::address_v4 localhost_address;
ip::tcp::acceptor acceptor_;
ptr_type session_;
unsigned short upstream_port_;
string upstream_host_;
};
};
}
void update_config (boost::asio::streambuf &input_buffer){
#ifdef DEBUG
cerr << "Updating configuration" << endl;
#endif
std::istream config_stream(&input_buffer);
std::unique_lock<std::mutex> lck(update_mutex);
regex_rules *regex_new_config = new regex_rules();
string data;
while(true){
config_stream >> data;
if (config_stream.eof()) break;
regex_new_config->add(data.c_str());
}
regex_config.reset(regex_new_config);
}
class async_updater
{
public:
async_updater(boost::asio::io_context& io_context) : input_(io_context, ::dup(STDIN_FILENO)), thread_safety(io_context)
{
boost::asio::async_read_until(input_, input_buffer_, '\n',
boost::asio::bind_executor(thread_safety,
boost::bind(&async_updater::on_update, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
void on_update(const boost::system::error_code& error, std::size_t length)
{
if (!error)
{
update_config(input_buffer_);
boost::asio::async_read_until(input_, input_buffer_, '\n',
boost::asio::bind_executor(thread_safety,
boost::bind(&async_updater::on_update, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
else
{
close();
}
}
void close()
{
input_.close();
}
private:
boost::asio::posix::stream_descriptor input_;
boost::asio::io_context::strand thread_safety;
boost::asio::streambuf input_buffer_;
};
int main(int argc, char* argv[])
{
if (argc < 5)
{
cerr << "usage: tcpproxy_server <local host ip> <local port> <forward host ip> <forward port>" << endl;
return 1;
}
const unsigned short local_port = static_cast<unsigned short>(::atoi(argv[2]));
const unsigned short forward_port = static_cast<unsigned short>(::atoi(argv[4]));
const string local_host = argv[1];
const string forward_host = argv[3];
boost::asio::io_context ios;
boost::asio::streambuf buf;
boost::asio::posix::stream_descriptor cin_in(ios, ::dup(STDIN_FILENO));
boost::asio::read_until(cin_in, buf,'\n');
update_config(buf);
async_updater updater(ios);
#ifdef DEBUG
cerr << "Starting Proxy" << endl;
#endif
try
{
tcp_proxy::bridge::acceptor acceptor(ios,
local_host, local_port,
forward_host, forward_port);
acceptor.accept_connections();
#ifdef MULTI_THREAD
boost::thread_group tg;
#ifdef THREAD_NUM
for (unsigned i = 0; i < THREAD_NUM; ++i)
#else
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
#endif
tg.create_thread(boost::bind(&boost::asio::io_context::run, &ios));
tg.join_all();
#else
ios.run();
#endif
}
catch(exception& e)
{
cerr << "Error: " << e.what() << endl;
return 1;
}
#ifdef DEBUG
cerr << "Proxy stopped!" << endl;
#endif
return 0;
}

View File

@@ -4,3 +4,5 @@ uvicorn[standard]
passlib[bcrypt] passlib[bcrypt]
python-jose[cryptography] python-jose[cryptography]
NetfilterQueue NetfilterQueue
kthread
scapy

View File

@@ -53,28 +53,26 @@ class SQLite():
self.create_schema({ self.create_schema({
'services': { 'services': {
'status': 'VARCHAR(100) NOT NULL', 'status': 'VARCHAR(100) NOT NULL',
'service_id': 'VARCHAR(100) PRIMARY KEY', 'port': 'INT NOT NULL CHECK(port > 0 and port < 65536) UNIQUE PRIMARY KEY',
'internal_port': 'INT NOT NULL CHECK(internal_port > 0 and internal_port < 65536)',
'public_port': 'INT NOT NULL CHECK(internal_port > 0 and internal_port < 65536) UNIQUE',
'name': 'VARCHAR(100) NOT NULL' 'name': 'VARCHAR(100) NOT NULL'
}, },
'regexes': { 'regexes': {
'regex': 'TEXT NOT NULL', 'regex': 'TEXT NOT NULL',
'mode': 'VARCHAR(1) NOT NULL', 'mode': 'VARCHAR(1) NOT NULL',
'service_id': 'VARCHAR(100) NOT NULL', 'service_port': 'INT NOT NULL',
'is_blacklist': 'BOOLEAN NOT NULL CHECK (is_blacklist IN (0, 1))', 'is_blacklist': 'BOOLEAN NOT NULL CHECK (is_blacklist IN (0, 1))',
'blocked_packets': 'INTEGER UNSIGNED NOT NULL DEFAULT 0', 'blocked_packets': 'INTEGER UNSIGNED NOT NULL DEFAULT 0',
'regex_id': 'INTEGER PRIMARY KEY', 'regex_id': 'INTEGER PRIMARY KEY',
'is_case_sensitive' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1))', 'is_case_sensitive' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1))',
'active' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1)) DEFAULT 1', 'active' : 'BOOLEAN NOT NULL CHECK (is_case_sensitive IN (0, 1)) DEFAULT 1',
'FOREIGN KEY (service_id)':'REFERENCES services (service_id)', 'FOREIGN KEY (service_port)':'REFERENCES services (port)',
}, },
'keys_values': { 'keys_values': {
'key': 'VARCHAR(100) PRIMARY KEY', 'key': 'VARCHAR(100) PRIMARY KEY',
'value': 'VARCHAR(100) NOT NULL', 'value': 'VARCHAR(100) NOT NULL',
}, },
}) })
self.query("CREATE UNIQUE INDEX IF NOT EXISTS unique_regex_service ON regexes (regex,service_id,is_blacklist,mode,is_case_sensitive);") self.query("CREATE UNIQUE INDEX IF NOT EXISTS unique_regex_service ON regexes (regex,service_port,is_blacklist,mode,is_case_sensitive);")
class KeyValueStorage: class KeyValueStorage:
def __init__(self, db): def __init__(self, db):
@@ -94,42 +92,32 @@ class KeyValueStorage:
self.db.query('UPDATE keys_values SET value=? WHERE key = ?;', str(value), key) self.db.query('UPDATE keys_values SET value=? WHERE key = ?;', str(value), key)
class STATUS: class STATUS:
PAUSE = "pause" STOP = "stop"
ACTIVE = "active" ACTIVE = "active"
class ServiceNotFoundException(Exception): pass class ServiceNotFoundException(Exception): pass
class ServiceManager: class ServiceManager:
def __init__(self, id, db): def __init__(self, port, db):
self.id = id self.port = port
self.db = db self.db = db
self.proxy = Proxy( self.proxy = Proxy(
callback_blocked_update=self._stats_updater callback_blocked_update=self._stats_updater,
public_port=port
) )
self.status = STATUS.PAUSE self.status = STATUS.STOP
self.filters = {} self.filters = {}
self._update_port_from_db()
self._update_filters_from_db() self._update_filters_from_db()
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.starter = None self.starter = None
def _update_port_from_db(self):
res = self.db.query("""
SELECT
public_port,
internal_port
FROM services WHERE service_id = ?;
""", self.id)
if len(res) == 0: raise ServiceNotFoundException()
self.proxy.public_port = res[0]["public_port"]
def _update_filters_from_db(self): def _update_filters_from_db(self):
res = self.db.query(""" res = self.db.query("""
SELECT SELECT
regex, mode, regex_id `id`, is_blacklist, regex, mode, regex_id `id`, is_blacklist,
blocked_packets n_packets, is_case_sensitive blocked_packets n_packets, is_case_sensitive
FROM regexes WHERE service_id = ? AND active=1; FROM regexes WHERE service_port = ? AND active=1;
""", self.id) """, self.port)
#Filter check #Filter check
old_filters = set(self.filters.keys()) old_filters = set(self.filters.keys())
@@ -155,43 +143,34 @@ class ServiceManager:
self.proxy.filters = list(self.filters.values()) self.proxy.filters = list(self.filters.values())
def __update_status_db(self, status): def __update_status_db(self, status):
self.db.query("UPDATE services SET status = ? WHERE service_id = ?;", status, self.id) self.db.query("UPDATE services SET status = ? WHERE port = ?;", status, self.port)
async def next(self,to): async def next(self,to):
async with self.lock: async with self.lock:
return await self._next(to) return self._next(to)
async def _next(self, to): def _next(self, to):
if self.status != to: if self.status != to:
# ACTIVE -> PAUSE # ACTIVE -> PAUSE
if (self.status, to) in [(STATUS.ACTIVE, STATUS.PAUSE)]: if (self.status, to) in [(STATUS.ACTIVE, STATUS.STOP)]:
await self.proxy.pause() self.proxy.stop()
self._set_status(to) self._set_status(to)
# PAUSE -> ACTIVE # PAUSE -> ACTIVE
elif (self.status, to) in [(STATUS.PAUSE, STATUS.ACTIVE)]: elif (self.status, to) in [(STATUS.STOP, STATUS.ACTIVE)]:
await self.proxy.reload() self.proxy.restart()
self._set_status(to) self._set_status(to)
def _stats_updater(self,filter:Filter): def _stats_updater(self,filter:Filter):
self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", filter.blocked, filter.code) self.db.query("UPDATE regexes SET blocked_packets = ? WHERE regex_id = ?;", filter.blocked, filter.code)
async def update_port(self):
async with self.lock:
self._update_port_from_db()
if self.status in [STATUS.ACTIVE]:
await self.proxy.reload()
def _set_status(self,status): def _set_status(self,status):
self.status = status self.status = status
self.__update_status_db(status) self.__update_status_db(status)
async def update_filters(self): async def update_filters(self):
async with self.lock: async with self.lock:
self._update_filters_from_db() self._update_filters_from_db()
if self.status in [STATUS.PAUSE, STATUS.ACTIVE]:
await self.proxy.reload()
class ProxyManager: class ProxyManager:
def __init__(self, db:SQLite): def __init__(self, db:SQLite):
@@ -203,48 +182,24 @@ class ProxyManager:
for key in list(self.proxy_table.keys()): for key in list(self.proxy_table.keys()):
await self.remove(key) await self.remove(key)
async def remove(self,id): async def remove(self,port):
async with self.lock: async with self.lock:
if id in self.proxy_table: if port in self.proxy_table:
await self.proxy_table[id].next(STATUS.PAUSE) await self.proxy_table[port].next(STATUS.STOP)
del self.proxy_table[id] del self.proxy_table[port]
async def reload(self): async def reload(self):
async with self.lock: async with self.lock:
for srv in self.db.query('SELECT service_id, status FROM services;'): for srv in self.db.query('SELECT port, status FROM services;'):
srv_id, req_status = srv["service_id"], srv["status"] srv_port, req_status = srv["port"], srv["status"]
if srv_id in self.proxy_table: if srv_port in self.proxy_table:
continue continue
self.proxy_table[srv_id] = ServiceManager(srv_id,self.db) self.proxy_table[srv_port] = ServiceManager(srv_port,self.db)
await self.proxy_table[srv_id].next(req_status) await self.proxy_table[srv_port].next(req_status)
def get(self,id): def get(self,port):
if id in self.proxy_table: if port in self.proxy_table:
return self.proxy_table[id] return self.proxy_table[port]
else: else:
raise ServiceNotFoundException() raise ServiceNotFoundException()
def check_port_is_open(port):
try:
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0',port))
sock.close()
return True
except Exception:
return False
def gen_service_id(db):
while True:
res = secrets.token_hex(8)
if len(db.query('SELECT 1 FROM services WHERE service_id = ?;', res)) == 0:
break
return res
def gen_internal_port(db):
while True:
res = random.randint(30000, 45000)
if len(db.query('SELECT 1 FROM services WHERE internal_port = ?;', res)) == 0:
break
return res