parsing extentions from server response

This commit is contained in:
Domingo Dirutigliano
2025-03-17 11:02:04 +01:00
parent 35c46a223e
commit c628ff57a5
2 changed files with 87 additions and 55 deletions

View File

@@ -106,6 +106,7 @@ class DataStreamCtx:
self.__data = glob["__firegex_pyfilter_ctx"]
self.filter_glob = glob
self.current_pkt = RawPacket._fetch_packet(self) if init_pkt else None
self.call_mem = {} #A memory space valid only for the current packet handler
@property
def filter_call_info(self) -> list[FilterHandler]:

View File

@@ -5,12 +5,12 @@ from firegex.nfproxy.internals.exceptions import StreamFullDrop, StreamFullRejec
from firegex.nfproxy.internals.models import FullStreamAction, ExceptionAction
from dataclasses import dataclass, field
from collections import deque
from typing import Type
from zstd import ZSTD_uncompress
import gzip
import io
import zlib
import brotli
import traceback
from websockets.frames import Frame
from websockets.extensions.permessage_deflate import PerMessageDeflate
from pyllhttp import PAUSED_H2_UPGRADE, PAUSED_UPGRADE
@@ -260,7 +260,9 @@ class InternalCallbackHandler():
while True:
try:
new_frame, self.buffers._ws_packet_stream = self._parse_websocket_frame(self.buffers._ws_packet_stream)
except Exception as e:
except Exception:
print("[WARNING] Websocket parsing failed, passing data to stream...", flush=True)
traceback.print_exc()
self._ws_raised_error = True
self.msg.stream += self.buffers._ws_packet_stream
self.buffers._ws_packet_stream = b""
@@ -287,9 +289,11 @@ class InternalCallbackHandler():
return ext_ws
def _parse_websocket_frame(self, data: bytes) -> tuple[Frame|None, bytes]:
# mask = is_input
if self._ws_extentions is None:
self._ws_extentions = self._parse_websocket_ext()
if self._is_input():
self._ws_extentions = [] # Fallback to no options
else:
self._ws_extentions = self._parse_websocket_ext() # Extentions used are choosen by the server response
read_buffering = bytearray()
def read_exact(n: int):
nonlocal read_buffering
@@ -450,70 +454,89 @@ class InternalBasicHttpMetaClass:
return self._message.lheaders.get(header.lower(), default)
@staticmethod
def _associated_parser_class() -> Type[InternalHttpRequest]|Type[InternalHttpResponse]:
def _before_fetch_callable_checks(internal_data: DataStreamCtx) -> bool:
raise NotImplementedError()
@staticmethod
def _before_fetch_callable_checks(internal_data: DataStreamCtx):
return True
def _parser_class() -> str:
raise NotImplementedError()
@classmethod
def _fetch_packet(cls, internal_data: DataStreamCtx):
if internal_data.current_pkt is None or internal_data.current_pkt.is_tcp is False:
raise NotReadyToRun()
ParserType = cls._associated_parser_class()
ParserType = InternalHttpRequest if internal_data.current_pkt.is_input else InternalHttpResponse
parser_key = f"{cls._parser_class()}_{'in' if internal_data.current_pkt.is_input else 'out'}"
parser = internal_data.data_handler_context.get(cls, None)
parser = internal_data.data_handler_context.get(parser_key, None)
if parser is None or parser.raised_error:
parser: InternalHttpRequest|InternalHttpResponse = ParserType()
internal_data.data_handler_context[cls] = parser
internal_data.data_handler_context[parser_key] = parser
if not internal_data.call_mem.get(cls._parser_class(), False): #Need to parse HTTP
internal_data.call_mem[cls._parser_class()] = True
#Setting websocket options if needed to the client parser
if internal_data.current_pkt.is_input:
ext_opt = internal_data.data_handler_context.get(f"{cls._parser_class()}_ws_options_client")
if ext_opt is not None and parser._ws_extentions != ext_opt:
parser._ws_extentions = ext_opt
# Memory size managment
if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size:
match internal_data.full_stream_action:
case FullStreamAction.FLUSH:
# Deleting parser and re-creating it
parser.messages.clear()
parser.msg.total_size -= len(parser.msg.stream)
parser.msg.stream = b""
parser.msg.total_size -= len(parser.msg.body)
parser.msg.body = b""
print("[WARNING] Flushing stream", flush=True)
if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size:
parser.reset_data()
case FullStreamAction.REJECT:
raise StreamFullReject()
case FullStreamAction.DROP:
raise StreamFullDrop()
case FullStreamAction.ACCEPT:
raise NotReadyToRun()
internal_data.call_mem["headers_were_set"] = parser.msg.headers_complete #This information is usefull for building the real object
try:
parser.parse_data(internal_data.current_pkt.data)
except Exception as e:
traceback.print_exc()
match internal_data.invalid_encoding_action:
case ExceptionAction.REJECT:
raise RejectConnection()
case ExceptionAction.DROP:
raise DropPacket()
case ExceptionAction.NOACTION:
raise e
case ExceptionAction.ACCEPT:
raise NotReadyToRun()
if parser.should_upgrade and not internal_data.current_pkt.is_input:
#Creating ws_option for the client
if not internal_data.data_handler_context.get(f"{cls._parser_class()}_ws_options_client"):
ext = parser._parse_websocket_ext()
internal_data.data_handler_context[f"{cls._parser_class()}_ws_options_client"] = ext
#Once the parsers has been triggered, we can return the object if needed
if not cls._before_fetch_callable_checks(internal_data):
raise NotReadyToRun()
# Memory size managment
if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size:
match internal_data.full_stream_action:
case FullStreamAction.FLUSH:
# Deleting parser and re-creating it
parser.messages.clear()
parser.msg.total_size -= len(parser.msg.stream)
parser.msg.stream = b""
parser.msg.total_size -= len(parser.msg.body)
parser.msg.body = b""
print("[WARNING] Flushing stream", flush=True)
if parser.total_size+len(internal_data.current_pkt.data) > internal_data.stream_max_size:
parser.reset_data()
case FullStreamAction.REJECT:
raise StreamFullReject()
case FullStreamAction.DROP:
raise StreamFullDrop()
case FullStreamAction.ACCEPT:
raise NotReadyToRun()
headers_were_set = parser.msg.headers_complete
try:
parser.parse_data(internal_data.current_pkt.data)
except Exception as e:
match internal_data.invalid_encoding_action:
case ExceptionAction.REJECT:
raise RejectConnection()
case ExceptionAction.DROP:
raise DropPacket()
case ExceptionAction.NOACTION:
raise e
case ExceptionAction.ACCEPT:
raise NotReadyToRun()
messages_tosend:list[InternalHTTPMessage] = []
for i in range(len(parser.messages)):
messages_tosend.append(parser.pop_message())
if len(messages_tosend) > 0:
headers_were_set = False # New messages completed so the current message headers were not set in this case
internal_data.call_mem["headers_were_set"] = False # New messages completed so the current message headers were not set in this case
if not headers_were_set and parser.msg.headers_complete:
if not internal_data.call_mem["headers_were_set"] and parser.msg.headers_complete:
messages_tosend.append(parser.msg) # Also the current message needs to be sent due to complete headers
if parser._packet_to_stream():
@@ -533,10 +556,6 @@ class HttpRequest(InternalBasicHttpMetaClass):
HTTP Request handler
This data handler will be called twice, first with the headers complete, and second with the body complete
"""
@staticmethod
def _associated_parser_class() -> Type[InternalHttpRequest]:
return InternalHttpRequest
@staticmethod
def _before_fetch_callable_checks(internal_data: DataStreamCtx):
@@ -546,6 +565,10 @@ class HttpRequest(InternalBasicHttpMetaClass):
def method(self) -> bytes:
"""Method of the request"""
return self._parser.msg.method
@staticmethod
def _parser_class() -> str:
return "full_http"
def __repr__(self):
return f"<HttpRequest method={self.method} url={self.url} headers={self.headers} body=[{0 if not self.body else len(self.body)} bytes] http_version={self.http_version} keep_alive={self.keep_alive} should_upgrade={self.should_upgrade} headers_complete={self.headers_complete} message_complete={self.message_complete} content_length={self.content_length} stream={self.stream} ws_stream={self.ws_stream}>"
@@ -556,10 +579,6 @@ class HttpResponse(InternalBasicHttpMetaClass):
This data handler will be called twice, first with the headers complete, and second with the body complete
"""
@staticmethod
def _associated_parser_class() -> Type[InternalHttpResponse]:
return InternalHttpResponse
@staticmethod
def _before_fetch_callable_checks(internal_data: DataStreamCtx):
return not internal_data.current_pkt.is_input
@@ -568,6 +587,10 @@ class HttpResponse(InternalBasicHttpMetaClass):
def status_code(self) -> int:
"""Status code of the response"""
return self._parser.msg.status
@staticmethod
def _parser_class() -> str:
return "full_http"
def __repr__(self):
return f"<HttpResponse status_code={self.status_code} url={self.url} headers={self.headers} body=[{0 if not self.body else len(self.body)} bytes] http_version={self.http_version} keep_alive={self.keep_alive} should_upgrade={self.should_upgrade} headers_complete={self.headers_complete} message_complete={self.message_complete} content_length={self.content_length} stream={self.stream} ws_stream={self.ws_stream}>"
@@ -580,6 +603,10 @@ class HttpRequestHeader(HttpRequest):
def _contructor_hook(self):
self._parser.save_body = False
@staticmethod
def _parser_class() -> str:
return "header_http"
class HttpResponseHeader(HttpResponse):
"""
@@ -588,4 +615,8 @@ class HttpResponseHeader(HttpResponse):
"""
def _contructor_hook(self):
self._parser.save_body = False
self._parser.save_body = False
@staticmethod
def _parser_class() -> str:
return "header_http"