From c628ff57a582a75146d77815d8b8380a1c6adadc Mon Sep 17 00:00:00 2001 From: Domingo Dirutigliano Date: Mon, 17 Mar 2025 11:02:04 +0100 Subject: [PATCH] parsing extentions from server response --- fgex-lib/firegex/nfproxy/internals/data.py | 1 + fgex-lib/firegex/nfproxy/models/http.py | 141 +++++++++++++-------- 2 files changed, 87 insertions(+), 55 deletions(-) diff --git a/fgex-lib/firegex/nfproxy/internals/data.py b/fgex-lib/firegex/nfproxy/internals/data.py index 8bcc884..edb5746 100644 --- a/fgex-lib/firegex/nfproxy/internals/data.py +++ b/fgex-lib/firegex/nfproxy/internals/data.py @@ -106,6 +106,7 @@ class DataStreamCtx: self.__data = glob["__firegex_pyfilter_ctx"] self.filter_glob = glob self.current_pkt = RawPacket._fetch_packet(self) if init_pkt else None + self.call_mem = {} #A memory space valid only for the current packet handler @property def filter_call_info(self) -> list[FilterHandler]: diff --git a/fgex-lib/firegex/nfproxy/models/http.py b/fgex-lib/firegex/nfproxy/models/http.py index 5fa54a7..6b9ec4f 100644 --- a/fgex-lib/firegex/nfproxy/models/http.py +++ b/fgex-lib/firegex/nfproxy/models/http.py @@ -5,12 +5,12 @@ from firegex.nfproxy.internals.exceptions import StreamFullDrop, StreamFullRejec from firegex.nfproxy.internals.models import FullStreamAction, ExceptionAction from dataclasses import dataclass, field from collections import deque -from typing import Type from zstd import ZSTD_uncompress import gzip import io import zlib import brotli +import traceback from websockets.frames import Frame from websockets.extensions.permessage_deflate import PerMessageDeflate from pyllhttp import PAUSED_H2_UPGRADE, PAUSED_UPGRADE @@ -260,7 +260,9 @@ class InternalCallbackHandler(): while True: try: new_frame, self.buffers._ws_packet_stream = self._parse_websocket_frame(self.buffers._ws_packet_stream) - except Exception as e: + except Exception: + print("[WARNING] Websocket parsing failed, passing data to stream...", flush=True) + traceback.print_exc() self._ws_raised_error = True self.msg.stream += self.buffers._ws_packet_stream self.buffers._ws_packet_stream = b"" @@ -287,9 +289,11 @@ class InternalCallbackHandler(): return ext_ws def _parse_websocket_frame(self, data: bytes) -> tuple[Frame|None, bytes]: - # mask = is_input if self._ws_extentions is None: - self._ws_extentions = self._parse_websocket_ext() + if self._is_input(): + self._ws_extentions = [] # Fallback to no options + else: + self._ws_extentions = self._parse_websocket_ext() # Extentions used are choosen by the server response read_buffering = bytearray() def read_exact(n: int): nonlocal read_buffering @@ -450,70 +454,89 @@ class InternalBasicHttpMetaClass: return self._message.lheaders.get(header.lower(), default) @staticmethod - def _associated_parser_class() -> Type[InternalHttpRequest]|Type[InternalHttpResponse]: + def _before_fetch_callable_checks(internal_data: DataStreamCtx) -> bool: raise NotImplementedError() @staticmethod - def _before_fetch_callable_checks(internal_data: DataStreamCtx): - return True + def _parser_class() -> str: + raise NotImplementedError() @classmethod def _fetch_packet(cls, internal_data: DataStreamCtx): if internal_data.current_pkt is None or internal_data.current_pkt.is_tcp is False: raise NotReadyToRun() - ParserType = cls._associated_parser_class() + ParserType = InternalHttpRequest if internal_data.current_pkt.is_input else InternalHttpResponse + parser_key = f"{cls._parser_class()}_{'in' if internal_data.current_pkt.is_input else 'out'}" - parser = internal_data.data_handler_context.get(cls, None) + parser = internal_data.data_handler_context.get(parser_key, None) if parser is None or parser.raised_error: parser: InternalHttpRequest|InternalHttpResponse = ParserType() - internal_data.data_handler_context[cls] = parser + internal_data.data_handler_context[parser_key] = parser + if not internal_data.call_mem.get(cls._parser_class(), False): #Need to parse HTTP + internal_data.call_mem[cls._parser_class()] = True + + #Setting websocket options if needed to the client parser + if internal_data.current_pkt.is_input: + ext_opt = internal_data.data_handler_context.get(f"{cls._parser_class()}_ws_options_client") + if ext_opt is not None and parser._ws_extentions != ext_opt: + parser._ws_extentions = ext_opt + + # Memory size managment + if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size: + match internal_data.full_stream_action: + case FullStreamAction.FLUSH: + # Deleting parser and re-creating it + parser.messages.clear() + parser.msg.total_size -= len(parser.msg.stream) + parser.msg.stream = b"" + parser.msg.total_size -= len(parser.msg.body) + parser.msg.body = b"" + print("[WARNING] Flushing stream", flush=True) + if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size: + parser.reset_data() + case FullStreamAction.REJECT: + raise StreamFullReject() + case FullStreamAction.DROP: + raise StreamFullDrop() + case FullStreamAction.ACCEPT: + raise NotReadyToRun() + + internal_data.call_mem["headers_were_set"] = parser.msg.headers_complete #This information is usefull for building the real object + + try: + parser.parse_data(internal_data.current_pkt.data) + except Exception as e: + traceback.print_exc() + match internal_data.invalid_encoding_action: + case ExceptionAction.REJECT: + raise RejectConnection() + case ExceptionAction.DROP: + raise DropPacket() + case ExceptionAction.NOACTION: + raise e + case ExceptionAction.ACCEPT: + raise NotReadyToRun() + + if parser.should_upgrade and not internal_data.current_pkt.is_input: + #Creating ws_option for the client + if not internal_data.data_handler_context.get(f"{cls._parser_class()}_ws_options_client"): + ext = parser._parse_websocket_ext() + internal_data.data_handler_context[f"{cls._parser_class()}_ws_options_client"] = ext + + #Once the parsers has been triggered, we can return the object if needed if not cls._before_fetch_callable_checks(internal_data): raise NotReadyToRun() - # Memory size managment - if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size: - match internal_data.full_stream_action: - case FullStreamAction.FLUSH: - # Deleting parser and re-creating it - parser.messages.clear() - parser.msg.total_size -= len(parser.msg.stream) - parser.msg.stream = b"" - parser.msg.total_size -= len(parser.msg.body) - parser.msg.body = b"" - print("[WARNING] Flushing stream", flush=True) - if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size: - parser.reset_data() - case FullStreamAction.REJECT: - raise StreamFullReject() - case FullStreamAction.DROP: - raise StreamFullDrop() - case FullStreamAction.ACCEPT: - raise NotReadyToRun() - - headers_were_set = parser.msg.headers_complete - try: - parser.parse_data(internal_data.current_pkt.data) - except Exception as e: - match internal_data.invalid_encoding_action: - case ExceptionAction.REJECT: - raise RejectConnection() - case ExceptionAction.DROP: - raise DropPacket() - case ExceptionAction.NOACTION: - raise e - case ExceptionAction.ACCEPT: - raise NotReadyToRun() - messages_tosend:list[InternalHTTPMessage] = [] for i in range(len(parser.messages)): messages_tosend.append(parser.pop_message()) if len(messages_tosend) > 0: - headers_were_set = False # New messages completed so the current message headers were not set in this case + internal_data.call_mem["headers_were_set"] = False # New messages completed so the current message headers were not set in this case - if not headers_were_set and parser.msg.headers_complete: + if not internal_data.call_mem["headers_were_set"] and parser.msg.headers_complete: messages_tosend.append(parser.msg) # Also the current message needs to be sent due to complete headers if parser._packet_to_stream(): @@ -533,10 +556,6 @@ class HttpRequest(InternalBasicHttpMetaClass): HTTP Request handler This data handler will be called twice, first with the headers complete, and second with the body complete """ - - @staticmethod - def _associated_parser_class() -> Type[InternalHttpRequest]: - return InternalHttpRequest @staticmethod def _before_fetch_callable_checks(internal_data: DataStreamCtx): @@ -546,6 +565,10 @@ class HttpRequest(InternalBasicHttpMetaClass): def method(self) -> bytes: """Method of the request""" return self._parser.msg.method + + @staticmethod + def _parser_class() -> str: + return "full_http" def __repr__(self): return f"" @@ -556,10 +579,6 @@ class HttpResponse(InternalBasicHttpMetaClass): This data handler will be called twice, first with the headers complete, and second with the body complete """ - @staticmethod - def _associated_parser_class() -> Type[InternalHttpResponse]: - return InternalHttpResponse - @staticmethod def _before_fetch_callable_checks(internal_data: DataStreamCtx): return not internal_data.current_pkt.is_input @@ -568,6 +587,10 @@ class HttpResponse(InternalBasicHttpMetaClass): def status_code(self) -> int: """Status code of the response""" return self._parser.msg.status + + @staticmethod + def _parser_class() -> str: + return "full_http" def __repr__(self): return f"" @@ -580,6 +603,10 @@ class HttpRequestHeader(HttpRequest): def _contructor_hook(self): self._parser.save_body = False + + @staticmethod + def _parser_class() -> str: + return "header_http" class HttpResponseHeader(HttpResponse): """ @@ -588,4 +615,8 @@ class HttpResponseHeader(HttpResponse): """ def _contructor_hook(self): - self._parser.save_body = False \ No newline at end of file + self._parser.save_body = False + + @staticmethod + def _parser_class() -> str: + return "header_http"