From 35c46a223ecbb3e3fce11bf5aa12348afd498caa Mon Sep 17 00:00:00 2001 From: Domingo Dirutigliano Date: Sun, 16 Mar 2025 13:37:37 +0100 Subject: [PATCH] fixes to encodings, implemeneted ws frame parsing with decompression support --- fgex-lib/fgex-pip/build/lib/fgex/__init__.py | 1 + fgex-lib/fgex-pip/build/lib/fgex/__main__.py | 6 + .../firegex/nfproxy/internals/__init__.py | 8 +- fgex-lib/firegex/nfproxy/models/http.py | 159 ++++++++++++++++-- fgex-lib/requirements.txt | 1 + .../src/components/NFProxy/NFProxyDocs.tsx | 26 ++- tests/nfproxy_test.py | 66 +++++++- 7 files changed, 240 insertions(+), 27 deletions(-) create mode 100644 fgex-lib/fgex-pip/build/lib/fgex/__init__.py create mode 100644 fgex-lib/fgex-pip/build/lib/fgex/__main__.py diff --git a/fgex-lib/fgex-pip/build/lib/fgex/__init__.py b/fgex-lib/fgex-pip/build/lib/fgex/__init__.py new file mode 100644 index 0000000..1bf13c9 --- /dev/null +++ b/fgex-lib/fgex-pip/build/lib/fgex/__init__.py @@ -0,0 +1 @@ +from firegex import * \ No newline at end of file diff --git a/fgex-lib/fgex-pip/build/lib/fgex/__main__.py b/fgex-lib/fgex-pip/build/lib/fgex/__main__.py new file mode 100644 index 0000000..810291c --- /dev/null +++ b/fgex-lib/fgex-pip/build/lib/fgex/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from firegex.cli import run + +if __name__ == "__main__": + run() diff --git a/fgex-lib/firegex/nfproxy/internals/__init__.py b/fgex-lib/firegex/nfproxy/internals/__init__.py index 8070cc6..fe2de9e 100644 --- a/fgex-lib/firegex/nfproxy/internals/__init__.py +++ b/fgex-lib/firegex/nfproxy/internals/__init__.py @@ -121,8 +121,7 @@ def handle_packet(glob: dict) -> None: new_params = params.copy() for ele in params[i]: new_params[i] = ele - for ele in try_to_call(new_params): - yield ele + yield from try_to_call(new_params) is_base_call = False break if is_base_call: @@ -166,4 +165,9 @@ def compile(glob:dict) -> None: internal_data.invalid_encoding_action = glob["FGEX_INVALID_ENCODING_ACTION"] PacketHandlerResult(glob).reset_result() + + def fake_exit(*_a, **_k): + print("WARNING: This function should not be called", flush=True) + + glob["exit"] = fake_exit diff --git a/fgex-lib/firegex/nfproxy/models/http.py b/fgex-lib/firegex/nfproxy/models/http.py index 082c93d..5fa54a7 100644 --- a/fgex-lib/firegex/nfproxy/models/http.py +++ b/fgex-lib/firegex/nfproxy/models/http.py @@ -11,6 +11,9 @@ import gzip import io import zlib import brotli +from websockets.frames import Frame +from websockets.extensions.permessage_deflate import PerMessageDeflate +from pyllhttp import PAUSED_H2_UPGRADE, PAUSED_UPGRADE @dataclass class InternalHTTPMessage: @@ -33,16 +36,21 @@ class InternalHTTPMessage: method: str = field(default=str) content_length: int = field(default=0) stream: bytes = field(default_factory=bytes) + ws_stream: list[Frame] = field(default_factory=list) # Decoded websocket stream + upgrading_to_h2: bool = field(default=False) + upgrading_to_ws: bool = field(default=False) @dataclass class InternalHttpBuffer: """Internal class to handle HTTP messages""" _url_buffer: bytes = field(default_factory=bytes) - _header_fields: dict[bytes, bytes] = field(default_factory=dict) + _raw_header_fields: dict[str, str|list[str]] = field(default_factory=dict) + _header_fields: dict[str, str] = field(default_factory=dict) _body_buffer: bytes = field(default_factory=bytes) _status_buffer: bytes = field(default_factory=bytes) _current_header_field: bytes = field(default_factory=bytes) _current_header_value: bytes = field(default_factory=bytes) + _ws_packet_stream: bytes = field(default_factory=bytes) class InternalCallbackHandler(): @@ -52,6 +60,8 @@ class InternalCallbackHandler(): raised_error = False has_begun = False messages: deque[InternalHTTPMessage] = deque() + _ws_extentions = None + _ws_raised_error = False def reset_data(self): self.msg = InternalHTTPMessage() @@ -92,14 +102,31 @@ class InternalCallbackHandler(): def on_header_value_complete(self): if self.buffers._current_header_field: - self.buffers._header_fields[self.buffers._current_header_field.decode(errors="ignore")] = self.buffers._current_header_value.decode(errors="ignore") + k, v = self.buffers._current_header_field.decode(errors="ignore"), self.buffers._current_header_value.decode(errors="ignore") + old_value = self.buffers._raw_header_fields.get(k, None) + + # raw headers are stored as thay were, considering to check changes between headers encoding + if isinstance(old_value, list): + old_value.append(v) + elif isinstance(old_value, str): + self.buffers._raw_header_fields[k] = [old_value, v] + else: + self.buffers._raw_header_fields[k] = v + + # Decoding headers normally + kl = k.lower() + if kl in self.buffers._header_fields: + self.buffers._header_fields[kl] += f", {v}" # Should be considered as a single list separated by commas as said in the RFC + else: + self.buffers._header_fields[kl] = v + self.buffers._current_header_field = b"" self.buffers._current_header_value = b"" def on_headers_complete(self): - self.msg.headers = self.buffers._header_fields - self.msg.lheaders = {k.lower(): v for k, v in self.buffers._header_fields.items()} - self.buffers._header_fields = {} + self.msg.headers = self.buffers._raw_header_fields + self.msg.lheaders = self.buffers._header_fields + self.buffers._raw_header_fields = {} self.buffers._current_header_field = b"" self.buffers._current_header_value = b"" self.msg.headers_complete = True @@ -119,6 +146,7 @@ class InternalCallbackHandler(): def on_message_complete(self): self.msg.body = self.buffers._body_buffer + self.msg.should_upgrade = self.should_upgrade self.buffers._body_buffer = b"" encodings = [ele.strip() for ele in self.content_encoding.lower().split(",")] decode_success = True @@ -142,7 +170,7 @@ class InternalCallbackHandler(): print(f"Error decompressing brotli: {e}: skipping", flush=True) decode_success = False break - elif enc == "gzip": + elif enc == "gzip" or enc == "x-gzip": #https://datatracker.ietf.org/doc/html/rfc2616#section-3.5 try: if "gzip" in self.content_encoding.lower(): with gzip.GzipFile(fileobj=io.BytesIO(decoding_body)) as f: @@ -158,6 +186,8 @@ class InternalCallbackHandler(): print(f"Error decompressing zstd: {e}: skipping", flush=True) decode_success = False break + elif enc == "identity": + pass # No need to do anything https://datatracker.ietf.org/doc/html/rfc2616#section-3.5 (it's possible to be found also if it should't be used) else: decode_success = False break @@ -214,20 +244,90 @@ class InternalCallbackHandler(): def content_length_parsed(self) -> int: return self.content_length + def _is_input(self) -> bool: + raise NotImplementedError() + def _packet_to_stream(self): return self.should_upgrade and self.save_body + def _stream_parser(self, data: bytes): + if self.msg.upgrading_to_ws: + if self._ws_raised_error: + self.msg.stream += data + self.msg.total_size += len(data) + return + self.buffers._ws_packet_stream += data + while True: + try: + new_frame, self.buffers._ws_packet_stream = self._parse_websocket_frame(self.buffers._ws_packet_stream) + except Exception as e: + self._ws_raised_error = True + self.msg.stream += self.buffers._ws_packet_stream + self.buffers._ws_packet_stream = b"" + self.msg.total_size += len(data) + return + if new_frame is None: + break + self.msg.ws_stream.append(new_frame) + self.msg.total_size += len(new_frame.data) + if self.msg.upgrading_to_h2: + self.msg.total_size += len(data) + self.msg.stream += data + + def _parse_websocket_ext(self): + ext_ws = [] + req_ext = [] + for ele in self.msg.lheaders.get("sec-websocket-extensions", "").split(","): + for xt in ele.split(";"): + req_ext.append(xt.strip().lower()) + + for ele in req_ext: + if ele == "permessage-deflate": + ext_ws.append(PerMessageDeflate(False, False, 15, 15)) + return ext_ws + + def _parse_websocket_frame(self, data: bytes) -> tuple[Frame|None, bytes]: + # mask = is_input + if self._ws_extentions is None: + self._ws_extentions = self._parse_websocket_ext() + read_buffering = bytearray() + def read_exact(n: int): + nonlocal read_buffering + buffer = bytearray(read_buffering) + while len(buffer) < n: + data = yield + if data is None: + raise RuntimeError("Should not send None to this generator") + buffer.extend(data) + new_data = bytes(buffer[:n]) + read_buffering = buffer[n:] + return new_data + + parsing = Frame.parse(read_exact, extensions=self._ws_extentions, mask=self._is_input()) + parsing.send(None) + try: + parsing.send(bytearray(data)) + except StopIteration as e: + return e.value, read_buffering + + return None, read_buffering + def parse_data(self, data: bytes): if self._packet_to_stream(): # This is a websocket upgrade! - self.msg.message_complete = True # The message is complete but becomed a stream, so need to be called every time a new packet is received - self.msg.total_size += len(data) - self.msg.stream += data #buffering stream + self._stream_parser(data) else: try: - self.execute(data) + reason, consumed = self.execute(data) + if reason == PAUSED_UPGRADE: + self.msg.upgrading_to_ws = True + self.msg.message_complete = True + self._stream_parser(data[consumed:]) + elif reason == PAUSED_H2_UPGRADE: + self.msg.upgrading_to_h2 = True + self.msg.message_complete = True + self._stream_parser(data[consumed:]) except Exception as e: self.raised_error = True - print(f"Error parsing HTTP packet: {e} with data {data}", flush=True) raise e def pop_message(self): @@ -241,18 +341,23 @@ class InternalHttpRequest(InternalCallbackHandler, pyllhttp.Request): def __init__(self): super(InternalCallbackHandler, self).__init__() super(pyllhttp.Request, self).__init__() + + def _is_input(self): + return True class InternalHttpResponse(InternalCallbackHandler, pyllhttp.Response): def __init__(self): super(InternalCallbackHandler, self).__init__() super(pyllhttp.Response, self).__init__() + + def _is_input(self): + return False class InternalBasicHttpMetaClass: """Internal class to handle HTTP requests and responses""" def __init__(self, parser: InternalHttpRequest|InternalHttpResponse, msg: InternalHTTPMessage): self._parser = parser - self.stream = b"" self.raised_error = False self._message: InternalHTTPMessage|None = msg self._contructor_hook() @@ -313,12 +418,32 @@ class InternalBasicHttpMetaClass: @property def should_upgrade(self) -> bool: """If the message should upgrade""" - return self._message.should_upgrade + return self._parser.should_upgrade @property def content_length(self) -> int|None: """Content length of the message""" return self._message.content_length + + @property + def upgrading_to_h2(self) -> bool: + """If the message is upgrading to HTTP/2""" + return self._message.upgrading_to_h2 + + @property + def upgrading_to_ws(self) -> bool: + """If the message is upgrading to Websocket""" + return self._message.upgrading_to_ws + + @property + def ws_stream(self) -> list[Frame]: + """Websocket stream""" + return self._message.ws_stream + + @property + def stream(self) -> bytes: + """Stream of the message""" + return self._message.stream def get_header(self, header: str, default=None) -> str: """Get a header from the message without caring about the case""" @@ -391,8 +516,8 @@ class InternalBasicHttpMetaClass: if not headers_were_set and parser.msg.headers_complete: messages_tosend.append(parser.msg) # Also the current message needs to be sent due to complete headers - if headers_were_set and parser.msg.message_complete and parser.msg.should_upgrade and parser.save_body: - messages_tosend.append(parser.msg) # Also the current message needs to beacase a websocket stream is going on + if parser._packet_to_stream(): + messages_tosend.append(parser.msg) # Also the current message needs to beacase a stream is going on messages_to_call = len(messages_tosend) @@ -423,7 +548,7 @@ class HttpRequest(InternalBasicHttpMetaClass): return self._parser.msg.method def __repr__(self): - return f"" + return f"" class HttpResponse(InternalBasicHttpMetaClass): """ @@ -445,7 +570,7 @@ class HttpResponse(InternalBasicHttpMetaClass): return self._parser.msg.status def __repr__(self): - return f"" + return f"" class HttpRequestHeader(HttpRequest): """ diff --git a/fgex-lib/requirements.txt b/fgex-lib/requirements.txt index 82f6118..7836131 100644 --- a/fgex-lib/requirements.txt +++ b/fgex-lib/requirements.txt @@ -5,4 +5,5 @@ zstd # waiting for pull request to be merged brotli # waiting for pull request to be merged watchfiles fgex +websockets pyllhttp diff --git a/frontend/src/components/NFProxy/NFProxyDocs.tsx b/frontend/src/components/NFProxy/NFProxyDocs.tsx index e5332db..838a215 100644 --- a/frontend/src/components/NFProxy/NFProxyDocs.tsx +++ b/frontend/src/components/NFProxy/NFProxyDocs.tsx @@ -232,10 +232,10 @@ export const NFProxyDocs = () => { url: The url of the request (read only) - headers: The headers of the request (read only). The keys and values are exactly the same as the original request (case sensitive). + headers: The headers of the request (read only). The keys and values are exactly the same as the original request (case sensitive). (values can be list in case the same header field is repeated) - get_header(key:str, default = None): A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value. + get_header(key:str, default = None): A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value. (if the same header field is repeated, its value is concatenated with a comma, this function will never return a list) user_agent: The user agent of the request (read only) @@ -261,6 +261,15 @@ export const NFProxyDocs = () => { should_upgrade: It's true if the connection should be upgraded, false if it's not. (read only) + + upgrading_to_h2: It's true if the connection is upgrading to h2, false if it's not. (read only) + + + ws_stream: It's a list of websockets.frames.Frame decoded (permessage-deflate is supported). (read only) [docs] + + + upgrading_to_ws: It's true if the connection is upgrading to ws, false if it's not. (read only) + method: The method of the request (read only) @@ -294,10 +303,10 @@ export const NFProxyDocs = () => { url: The url of the response (read only) - headers: The headers of the response (read only). The keys and values are exactly the same as the original response (case sensitive). + headers: The headers of the response (read only). The keys and values are exactly the same as the original response (case sensitive). (values can be list in case the same header field is repeated) - get_header(key:str, default = None): A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value. + get_header(key:str, default = None): A function that returns the value of a header: it matches the key without case sensitivity. If the header is not found, it returns the default value. (if the same header field is repeated, its value is concatenated with a comma, this function will never return a list) user_agent: The user agent of the response (read only) @@ -323,6 +332,15 @@ export const NFProxyDocs = () => { should_upgrade: It's true if the connection should be upgraded, false if it's not. (read only) + + upgrading_to_h2: It's true if the connection is upgrading to h2, false if it's not. (read only) + + + ws_stream: It's a list of websockets.frames.Frame decoded (permessage-deflate is supported). (read only) [docs] + + + upgrading_to_ws: It's true if the connection is upgrading to ws, false if it's not. (read only) + status_code: The status code of the response (read only) (int) diff --git a/tests/nfproxy_test.py b/tests/nfproxy_test.py index 40fda97..c36350c 100644 --- a/tests/nfproxy_test.py +++ b/tests/nfproxy_test.py @@ -45,10 +45,10 @@ else: def exit_test(code): if service_id: server.stop() - #if firegex.nfproxy_delete_service(service_id): - # puts("Sucessfully deleted service ✔", color=colors.green) - #else: - # puts("Test Failed: Coulnd't delete serivce ✗", color=colors.red) + if firegex.nfproxy_delete_service(service_id): + puts("Sucessfully deleted service ✔", color=colors.green) + else: + puts("Test Failed: Coulnd't delete serivce ✗", color=colors.red) exit(code) if(firegex.nfproxy_start_service(service_id)): @@ -497,6 +497,64 @@ server.close_client() remove_filters() +WS_REQUEST_PARSING_TEST = b'GET /sock/?EIO=4&transport=websocket HTTP/1.1\r\nHost: localhost:8080\r\nConnection: Upgrade\r\nPragma: no-cache\r\nCache-Control: no-cache\r\nUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)\xac AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36\r\nUpgrade: websocket\r\nOrigin: http://localhost:8080\r\nSec-WebSocket-Version: 13\r\nAccept-Encoding: gzip, deflate, br, zstd\r\nAccept-Language: it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,zh;q=0.5\r\nCookie: cookie-consent=true; _iub_cs-86405163=%7B%22timestamp%22%3A%222024-09-12T18%3A20%3A18.627Z%22%2C%22version%22%3A%221.65.1%22%2C%22purposes%22%3A%7B%221%22%3Atrue%2C%224%22%3Atrue%7D%2C%22id%22%3A86405163%2C%22cons%22%3A%7B%22rand%22%3A%222b09e6%22%7D%7D\r\nSec-WebSocket-Key: eE01O3/ZShPKsrykACLAaA==\r\nSec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n\r\n\xc1\x84#\x8a\xb2\xbb\x11\xbb\xb2\xbb' + +HTTP_REQUEST_WS_PARSING_TEST = """ +from firegex.nfproxy.models import HttpRequest +from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT + +@pyfilter +def data_type_test(req:HttpRequest): + print(req) + +""" + +if firegex.nfproxy_set_code(service_id, HTTP_REQUEST_WS_PARSING_TEST): + puts("Sucessfully added filter websocket parsing with HttpRequest ✔", color=colors.green) +else: + puts("Test Failed: Couldn't add the websocket parsing filter ✗", color=colors.red) + exit_test(1) + +server.connect_client() +server.send_packet(WS_REQUEST_PARSING_TEST) +if server.recv_packet(): + puts("The HTTP websocket upgrade request was successfully parsed ✔", color=colors.green) +else: + puts("Test Failed: The HTTP websocket upgrade request wasn't parsed (an error occurred) ✗", color=colors.red) + exit_test(1) +server.close_client() + +remove_filters() + +WS_RESPONSE_PARSING_TEST = b'HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: eGnJqUSoSKE3wOfKD2M3G82RsS8=\r\nSec-WebSocket-Extensions: permessage-deflate\r\ndate: Sat, 15 Mar 2025 12:04:19 GMT\r\nserver: uvicorn\r\n\r\n\xc1_2\xa8V*\xceLQ\xb2Rr1\xb4\xc8\xf6r\x0c\xf3\xaf\xd25\xf7\x8e\xf4\xb3LsttrW\xd2Q*-H/JLI-V\xb2\x8a\x8e\xd5Q*\xc8\xccK\x0f\xc9\xccM\xcd/-Q\xb222\x00\x02\x88\x98g^IjQYb\x0eP\xd0\x14,\x98\x9bX\x11\x90X\x99\x93\x9f\x084\xda\xd0\x00\x0cj\x01\x00\xc1\x1b21\x80\xd9e\xe1n\x19\x9e\xe3RP\x9a[Z\x99\x93j\xea\x15\x00\xb4\xcbC\xa9\x16\x00' + +HTTP_RESPONSE_WS_PARSING_TEST = """ +from firegex.nfproxy.models import HttpResponse +from firegex.nfproxy import pyfilter, ACCEPT, UNSTABLE_MANGLE, DROP, REJECT + +@pyfilter +def data_type_test(req:HttpResponse): + print(req) + +""" + +if firegex.nfproxy_set_code(service_id, HTTP_RESPONSE_WS_PARSING_TEST): + puts("Sucessfully added filter websocket parsing with HttpResponse ✔", color=colors.green) +else: + puts("Test Failed: Couldn't add the websocket parsing filter ✗", color=colors.red) + exit_test(1) + +server.connect_client() +server.send_packet(WS_RESPONSE_PARSING_TEST) +if server.recv_packet(): + puts("The HTTP websocket upgrade response was successfully parsed ✔", color=colors.green) +else: + puts("Test Failed: The HTTP websocket upgrade response wasn't parsed (an error occurred) ✗", color=colors.red) + exit_test(1) +server.close_client() + +remove_filters() + #Rename service if firegex.nfproxy_rename_service(service_id,f"{args.service_name}2"): puts(f"Sucessfully renamed service to {args.service_name}2 ✔", color=colors.green)