From 4e5d6e3eb8c21e68fde95b1cedc172f0dee4c021 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Tue, 14 Apr 2020 14:55:14 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../packmate/service/StreamOptimizer.java | 383 ------------------ .../packmate/service/StreamService.java | 1 + .../optimization/HttpChunksProcessor.java | 142 +++++++ .../optimization/HttpGzipProcessor.java | 128 ++++++ .../optimization/HttpUrldecodeProcessor.java | 46 +++ .../service/optimization/PacketsMerger.java | 67 +++ .../service/optimization/StreamOptimizer.java | 96 +++++ .../{ => optimization}/WebSocketsParser.java | 14 +- .../packmate/utils/PacketUtils.java | 19 + .../packmate/StreamOptimizerTest.java | 13 +- 10 files changed, 510 insertions(+), 399 deletions(-) delete mode 100644 src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java create mode 100644 src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java create mode 100644 src/main/java/ru/serega6531/packmate/service/optimization/HttpGzipProcessor.java create mode 100644 src/main/java/ru/serega6531/packmate/service/optimization/HttpUrldecodeProcessor.java create mode 100644 src/main/java/ru/serega6531/packmate/service/optimization/PacketsMerger.java create mode 100644 src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java rename src/main/java/ru/serega6531/packmate/service/{ => optimization}/WebSocketsParser.java (95%) create mode 100644 src/main/java/ru/serega6531/packmate/utils/PacketUtils.java diff --git a/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java b/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java deleted file mode 100644 index fe4ff4e..0000000 --- a/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java +++ /dev/null @@ -1,383 +0,0 @@ -package ru.serega6531.packmate.service; - -import lombok.AllArgsConstructor; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.ArrayUtils; -import ru.serega6531.packmate.model.CtfService; -import ru.serega6531.packmate.model.Packet; -import ru.serega6531.packmate.utils.Bytes; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URLDecoder; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.zip.GZIPInputStream; -import java.util.zip.ZipException; - -@AllArgsConstructor -@Slf4j -public class StreamOptimizer { - - private final CtfService service; - private List packets; - - private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; - - /** - * Вызвать для выполнения оптимизаций на переданном списке пакетов. - */ - public List optimizeStream() { - if (service.isProcessChunkedEncoding()) { - processChunkedEncoding(); - } - - if (service.isUngzipHttp()) { - unpackGzip(); - } - - if (service.isParseWebSockets()) { - parseWebSockets(); - } - - if (service.isUrldecodeHttpRequests()) { - urldecodeRequests(); - } - - if (service.isMergeAdjacentPackets()) { - mergeAdjacentPackets(); - } - - return packets; - } - - /** - * Сжать соседние пакеты в одном направлении в один. - * Выполняется после других оптимизаций чтобы правильно определять границы пакетов. - */ - private void mergeAdjacentPackets() { - int start = 0; - int packetsInRow = 0; - boolean incoming = true; - - for (int i = 0; i < packets.size(); i++) { - Packet packet = packets.get(i); - if (packet.isIncoming() != incoming) { - if (packetsInRow > 1) { - compress(start, i); - - i = start + 1; // продвигаем указатель на следующий после склеенного блок - } - start = i; - packetsInRow = 1; - } else { - packetsInRow++; - } - - incoming = packet.isIncoming(); - } - - if (packetsInRow > 1) { - compress(start, packets.size()); - } - } - - /** - * Сжать кусок со start по end в один пакет - */ - private void compress(int start, int end) { - final List cut = packets.subList(start, end); - final long timestamp = cut.get(0).getTimestamp(); - final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped); - final boolean webSocketParsed = cut.stream().anyMatch(Packet::isWebSocketParsed); - boolean incoming = cut.get(0).isIncoming(); - //noinspection OptionalGetWithoutIsPresent - final byte[] content = mergePackets(cut).get(); - - packets.removeAll(cut); - packets.add(start, Packet.builder() - .incoming(incoming) - .timestamp(timestamp) - .ungzipped(ungzipped) - .webSocketParsed(webSocketParsed) - .content(content) - .build()); - } - - /** - * Декодирование urlencode с http пакета до смены стороны или окончания стрима - */ - @SneakyThrows - private void urldecodeRequests() { - boolean httpStarted = false; - - for (Packet packet : packets) { - if (packet.isIncoming()) { - String content = packet.getContentString(); - if (content.contains("HTTP/")) { - httpStarted = true; - } - - if (httpStarted) { - try { - content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString()); - packet.setContent(content.getBytes()); - } catch (IllegalArgumentException e) { - log.warn("urldecode", e); - } - } - } else { - httpStarted = false; - } - } - } - - private void processChunkedEncoding() { - boolean chunkStarted = false; - int start = -1; - List chunk = new ArrayList<>(); - - for (int i = 0; i < packets.size(); i++) { - Packet packet = packets.get(i); - if (!packet.isIncoming()) { - String content = packet.getContentString(); - - boolean http = content.startsWith("HTTP/"); - int contentPos = content.indexOf("\r\n\r\n"); - - if (http && contentPos != -1) { // начало body - String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n - boolean chunked = headers.contains("Transfer-Encoding: chunked\r\n"); - if (chunked) { - chunkStarted = true; - start = i; - chunk.add(packet); - - if (checkCompleteChunk(chunk, start)) { - chunkStarted = false; - chunk.clear(); - i = start + 1; - } - } else { - chunkStarted = false; - chunk.clear(); - } - } else if (chunkStarted) { - chunk.add(packet); - if (checkCompleteChunk(chunk, start)) { - chunkStarted = false; - chunk.clear(); - i = start + 1; - } - } - } - } - } - - /** - * @return true если чанк завершен - */ - private boolean checkCompleteChunk(List chunk, int start) { - boolean end = chunk.get(chunk.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n"); - - if (end) { - //noinspection OptionalGetWithoutIsPresent - final byte[] content = mergePackets(chunk).get(); - - ByteArrayOutputStream output = new ByteArrayOutputStream(content.length); - - final int contentStart = Bytes.indexOf(content, "\r\n\r\n".getBytes()) + 4; - output.write(content, 0, contentStart); - - final byte[] body = Arrays.copyOfRange(content, contentStart, content.length); - - int currentPos = 0; - - while (true) { - final String found = readChunkSize(body, currentPos); - if (found != null) { - final int chunkSize = Integer.parseInt(found, 16); - - if (chunkSize == 0) { // конец потока чанков - Packet result = Packet.builder() - .incoming(false) - .timestamp(chunk.get(0).getTimestamp()) - .ungzipped(false) - .webSocketParsed(false) - .content(output.toByteArray()) - .build(); - - packets.removeAll(chunk); - packets.add(start, result); - - return true; - } - - currentPos += found.length() + 2; - - if (currentPos + chunkSize >= body.length) { - log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", currentPos, chunkSize, body.length); - return true; // обнулить список, но не заменять пакеты - } - - output.write(body, currentPos, chunkSize); - currentPos += chunkSize; - - if (currentPos + 2 >= body.length || body[currentPos] != '\r' || body[currentPos + 1] != '\n') { - log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n"); - return true; // обнулить список, но не заменять пакеты - } - - currentPos += 2; - } else { - log.warn("Failed to merge chunks, next chunk size not found"); - return true; // обнулить список, но не заменять пакеты - } - } - } - - return false; - } - - private String readChunkSize(byte[] content, int start) { - StringBuilder sb = new StringBuilder(); - for (int i = start; i < content.length - 1; i++) { - byte b = content[i]; - - if ((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) { - sb.append((char) b); - } else if (b == '\r' && content[i + 1] == '\n') { - return sb.toString(); - } - } - - return null; - } - - /** - * Попытаться распаковать GZIP из исходящих http пакетов.
- * GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip - * (при этом заголовок HTTP может быть в другом пакете)
- * Поток заканчивается при обнаружении нового HTTP заголовка, - * при смене стороны передачи или при окончании всего стрима - */ - private void unpackGzip() { - boolean gzipStarted = false; - int gzipStartPacket = 0; - int gzipEndPacket; - - for (int i = 0; i < packets.size(); i++) { - Packet packet = packets.get(i); - - if (packet.isIncoming() && gzipStarted) { // поток gzip закончился - gzipEndPacket = i - 1; - if (extractGzip(gzipStartPacket, gzipEndPacket)) { - gzipStarted = false; - i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок - } - } else if (!packet.isIncoming()) { - String content = packet.getContentString(); - - int contentPos = content.indexOf("\r\n\r\n"); - boolean http = content.startsWith("HTTP/"); - - if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток - gzipEndPacket = i - 1; - if (extractGzip(gzipStartPacket, gzipEndPacket)) { - gzipStarted = false; - i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок - } - } - - if (contentPos != -1) { // начало body - String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n - boolean gziped = headers.contains("Content-Encoding: gzip\r\n"); - if (gziped) { - gzipStarted = true; - gzipStartPacket = i; - } - } - } - } - - if (gzipStarted) { // стрим закончился gzip пакетом - extractGzip(gzipStartPacket, packets.size() - 1); - } - } - - /** - * Попытаться распаковать кусок пакетов с gzip body и вставить результат на их место - * - * @return получилось ли распаковать - */ - private boolean extractGzip(int gzipStartPacket, int gzipEndPacket) { - List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); - - Packet decompressed = decompressGzipPackets(cut); - if (decompressed != null) { - packets.removeAll(cut); - packets.add(gzipStartPacket, decompressed); - return true; - } - - return false; - } - - private Packet decompressGzipPackets(List cut) { - //noinspection OptionalGetWithoutIsPresent - final byte[] content = mergePackets(cut).get(); - - final int gzipStart = Bytes.indexOf(content, GZIP_HEADER); - byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart); - byte[] gzipBytes = Arrays.copyOfRange(content, gzipStart, content.length); - - try { - final GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(gzipBytes)); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - IOUtils.copy(gzipStream, out); - byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray()); - - log.debug("GZIP decompressed: {} -> {} bytes", gzipBytes.length, out.size()); - - return Packet.builder() - .incoming(false) - .timestamp(cut.get(0).getTimestamp()) - .ungzipped(true) - .webSocketParsed(false) - .content(newContent) - .build(); - } catch (ZipException e) { - log.warn("Failed to decompress gzip, leaving as it is: {}", e.getMessage()); - } catch (IOException e) { - log.error("decompress gzip", e); - } - - return null; - } - - private void parseWebSockets() { - if (!packets.get(0).getContentString().contains("HTTP/")) { - return; - } - - final WebSocketsParser parser = new WebSocketsParser(packets); - if (!parser.isParsed()) { - return; - } - - packets = parser.getParsedPackets(); - } - - private Optional mergePackets(List cut) { - return cut.stream() - .map(Packet::getContent) - .reduce(ArrayUtils::addAll); - } - -} diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 8d0fc25..ed9fb36 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -15,6 +15,7 @@ import ru.serega6531.packmate.model.pojo.Pagination; import ru.serega6531.packmate.model.pojo.SubscriptionMessage; import ru.serega6531.packmate.model.pojo.UnfinishedStream; import ru.serega6531.packmate.repository.StreamRepository; +import ru.serega6531.packmate.service.optimization.StreamOptimizer; import java.util.HashSet; import java.util.List; diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java b/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java new file mode 100644 index 0000000..aaa8adc --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java @@ -0,0 +1,142 @@ +package ru.serega6531.packmate.service.optimization; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import ru.serega6531.packmate.model.Packet; +import ru.serega6531.packmate.utils.Bytes; +import ru.serega6531.packmate.utils.PacketUtils; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Slf4j +@AllArgsConstructor +public class HttpChunksProcessor { + + private List packets; + + public void processChunkedEncoding() { + boolean chunkStarted = false; + int start = -1; + List chunk = new ArrayList<>(); + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + if (!packet.isIncoming()) { + String content = packet.getContentString(); + + boolean http = content.startsWith("HTTP/"); + int contentPos = content.indexOf("\r\n\r\n"); + + if (http && contentPos != -1) { // начало body + String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n + boolean chunked = headers.contains("Transfer-Encoding: chunked\r\n"); + if (chunked) { + chunkStarted = true; + start = i; + chunk.add(packet); + + if (checkCompleteChunk(chunk, start)) { + chunkStarted = false; + chunk.clear(); + i = start + 1; + } + } else { + chunkStarted = false; + chunk.clear(); + } + } else if (chunkStarted) { + chunk.add(packet); + if (checkCompleteChunk(chunk, start)) { + chunkStarted = false; + chunk.clear(); + i = start + 1; + } + } + } + } + } + + /** + * @return true если чанк завершен + */ + private boolean checkCompleteChunk(List chunk, int start) { + boolean end = chunk.get(chunk.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n"); + + if (end) { + //noinspection OptionalGetWithoutIsPresent + final byte[] content = PacketUtils.mergePackets(chunk).get(); + + ByteArrayOutputStream output = new ByteArrayOutputStream(content.length); + + final int contentStart = Bytes.indexOf(content, "\r\n\r\n".getBytes()) + 4; + output.write(content, 0, contentStart); + + final byte[] body = Arrays.copyOfRange(content, contentStart, content.length); + + int currentPos = 0; + + while (true) { + final String found = readChunkSize(body, currentPos); + if (found != null) { + final int chunkSize = Integer.parseInt(found, 16); + + if (chunkSize == 0) { // конец потока чанков + Packet result = Packet.builder() + .incoming(false) + .timestamp(chunk.get(0).getTimestamp()) + .ungzipped(false) + .webSocketParsed(false) + .content(output.toByteArray()) + .build(); + + packets.removeAll(chunk); + packets.add(start, result); + + return true; + } + + currentPos += found.length() + 2; + + if (currentPos + chunkSize >= body.length) { + log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", currentPos, chunkSize, body.length); + return true; // обнулить список, но не заменять пакеты + } + + output.write(body, currentPos, chunkSize); + currentPos += chunkSize; + + if (currentPos + 2 >= body.length || body[currentPos] != '\r' || body[currentPos + 1] != '\n') { + log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n"); + return true; // обнулить список, но не заменять пакеты + } + + currentPos += 2; + } else { + log.warn("Failed to merge chunks, next chunk size not found"); + return true; // обнулить список, но не заменять пакеты + } + } + } + + return false; + } + + private String readChunkSize(byte[] content, int start) { + StringBuilder sb = new StringBuilder(); + for (int i = start; i < content.length - 1; i++) { + byte b = content[i]; + + if ((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) { + sb.append((char) b); + } else if (b == '\r' && content[i + 1] == '\n') { + return sb.toString(); + } + } + + return null; + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/HttpGzipProcessor.java b/src/main/java/ru/serega6531/packmate/service/optimization/HttpGzipProcessor.java new file mode 100644 index 0000000..3cd8323 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/optimization/HttpGzipProcessor.java @@ -0,0 +1,128 @@ +package ru.serega6531.packmate.service.optimization; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.ArrayUtils; +import ru.serega6531.packmate.model.Packet; +import ru.serega6531.packmate.utils.Bytes; +import ru.serega6531.packmate.utils.PacketUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipException; + +@Slf4j +@AllArgsConstructor +public class HttpGzipProcessor { + + private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; + + private List packets; + + /** + * Попытаться распаковать GZIP из исходящих http пакетов.
+ * GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip + * (при этом заголовок HTTP может быть в другом пакете)
+ * Поток заканчивается при обнаружении нового HTTP заголовка, + * при смене стороны передачи или при окончании всего стрима + */ + public void unpackGzip() { + boolean gzipStarted = false; + int gzipStartPacket = 0; + int gzipEndPacket; + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + + if (packet.isIncoming() && gzipStarted) { // поток gzip закончился + gzipEndPacket = i - 1; + if (extractGzip(gzipStartPacket, gzipEndPacket)) { + gzipStarted = false; + i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок + } + } else if (!packet.isIncoming()) { + String content = packet.getContentString(); + + int contentPos = content.indexOf("\r\n\r\n"); + boolean http = content.startsWith("HTTP/"); + + if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток + gzipEndPacket = i - 1; + if (extractGzip(gzipStartPacket, gzipEndPacket)) { + gzipStarted = false; + i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок + } + } + + if (contentPos != -1) { // начало body + String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n + boolean gziped = headers.contains("Content-Encoding: gzip\r\n"); + if (gziped) { + gzipStarted = true; + gzipStartPacket = i; + } + } + } + } + + if (gzipStarted) { // стрим закончился gzip пакетом + extractGzip(gzipStartPacket, packets.size() - 1); + } + } + + /** + * Попытаться распаковать кусок пакетов с gzip body и вставить результат на их место + * + * @return получилось ли распаковать + */ + private boolean extractGzip(int gzipStartPacket, int gzipEndPacket) { + List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); + + Packet decompressed = decompressGzipPackets(cut); + if (decompressed != null) { + packets.removeAll(cut); + packets.add(gzipStartPacket, decompressed); + return true; + } + + return false; + } + + private Packet decompressGzipPackets(List cut) { + //noinspection OptionalGetWithoutIsPresent + final byte[] content = PacketUtils.mergePackets(cut).get(); + + final int gzipStart = Bytes.indexOf(content, GZIP_HEADER); + byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart); + byte[] gzipBytes = Arrays.copyOfRange(content, gzipStart, content.length); + + try { + final GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(gzipBytes)); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IOUtils.copy(gzipStream, out); + byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray()); + + log.debug("GZIP decompressed: {} -> {} bytes", gzipBytes.length, out.size()); + + return Packet.builder() + .incoming(false) + .timestamp(cut.get(0).getTimestamp()) + .ungzipped(true) + .webSocketParsed(false) + .content(newContent) + .build(); + } catch (ZipException e) { + log.warn("Failed to decompress gzip, leaving as it is: {}", e.getMessage()); + } catch (IOException e) { + log.error("decompress gzip", e); + } + + return null; + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/HttpUrldecodeProcessor.java b/src/main/java/ru/serega6531/packmate/service/optimization/HttpUrldecodeProcessor.java new file mode 100644 index 0000000..ce4f371 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/optimization/HttpUrldecodeProcessor.java @@ -0,0 +1,46 @@ +package ru.serega6531.packmate.service.optimization; + +import lombok.AllArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import ru.serega6531.packmate.model.Packet; + +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.List; + +@AllArgsConstructor +@Slf4j +public class HttpUrldecodeProcessor { + + private List packets; + + /** + * Декодирование urlencode с http пакета до смены стороны или окончания стрима + */ + @SneakyThrows + public void urldecodeRequests() { + boolean httpStarted = false; + + for (Packet packet : packets) { + if (packet.isIncoming()) { + String content = packet.getContentString(); + if (content.contains("HTTP/")) { + httpStarted = true; + } + + if (httpStarted) { + try { + content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString()); + packet.setContent(content.getBytes()); + } catch (IllegalArgumentException e) { + log.warn("urldecode", e); + } + } + } else { + httpStarted = false; + } + } + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/PacketsMerger.java b/src/main/java/ru/serega6531/packmate/service/optimization/PacketsMerger.java new file mode 100644 index 0000000..89ff3c9 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/optimization/PacketsMerger.java @@ -0,0 +1,67 @@ +package ru.serega6531.packmate.service.optimization; + +import lombok.AllArgsConstructor; +import ru.serega6531.packmate.model.Packet; +import ru.serega6531.packmate.utils.PacketUtils; + +import java.util.List; + +@AllArgsConstructor +public class PacketsMerger { + + private List packets; + + /** + * Сжать соседние пакеты в одном направлении в один. + * Выполняется после других оптимизаций чтобы правильно определять границы пакетов. + */ + public void mergeAdjacentPackets() { + int start = 0; + int packetsInRow = 0; + boolean incoming = true; + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + if (packet.isIncoming() != incoming) { + if (packetsInRow > 1) { + compress(start, i); + + i = start + 1; // продвигаем указатель на следующий после склеенного блок + } + start = i; + packetsInRow = 1; + } else { + packetsInRow++; + } + + incoming = packet.isIncoming(); + } + + if (packetsInRow > 1) { + compress(start, packets.size()); + } + } + + /** + * Сжать кусок со start по end в один пакет + */ + private void compress(int start, int end) { + final List cut = packets.subList(start, end); + final long timestamp = cut.get(0).getTimestamp(); + final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped); + final boolean webSocketParsed = cut.stream().anyMatch(Packet::isWebSocketParsed); + boolean incoming = cut.get(0).isIncoming(); + //noinspection OptionalGetWithoutIsPresent + final byte[] content = PacketUtils.mergePackets(cut).get(); + + packets.removeAll(cut); + packets.add(start, Packet.builder() + .incoming(incoming) + .timestamp(timestamp) + .ungzipped(ungzipped) + .webSocketParsed(webSocketParsed) + .content(content) + .build()); + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java b/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java new file mode 100644 index 0000000..5c856a1 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java @@ -0,0 +1,96 @@ +package ru.serega6531.packmate.service.optimization; + +import lombok.AllArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import ru.serega6531.packmate.model.CtfService; +import ru.serega6531.packmate.model.Packet; + +import java.util.List; + +@AllArgsConstructor +@Slf4j +public class StreamOptimizer { + + private final CtfService service; + private List packets; + + /** + * Вызвать для выполнения оптимизаций на переданном списке пакетов. + */ + public List optimizeStream() { + if (service.isProcessChunkedEncoding()) { + processChunkedEncoding(); + } + + if (service.isUngzipHttp()) { + unpackGzip(); + } + + if (service.isParseWebSockets()) { + parseWebSockets(); + } + + if (service.isUrldecodeHttpRequests()) { + urldecodeRequests(); + } + + if (service.isMergeAdjacentPackets()) { + mergeAdjacentPackets(); + } + + return packets; + } + + /** + * Сжать соседние пакеты в одном направлении в один. + * Выполняется после других оптимизаций чтобы правильно определять границы пакетов. + */ + private void mergeAdjacentPackets() { + final PacketsMerger merger = new PacketsMerger(packets); + merger.mergeAdjacentPackets(); + } + + /** + * Декодирование urlencode с http пакета до смены стороны или окончания стрима + */ + @SneakyThrows + private void urldecodeRequests() { + final HttpUrldecodeProcessor processor = new HttpUrldecodeProcessor(packets); + processor.urldecodeRequests(); + } + + /** + * https://ru.wikipedia.org/wiki/Chunked_transfer_encoding + */ + private void processChunkedEncoding() { + HttpChunksProcessor processor = new HttpChunksProcessor(packets); + processor.processChunkedEncoding(); + } + + /** + * Попытаться распаковать GZIP из исходящих http пакетов.
+ * GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip + * (при этом заголовок HTTP может быть в другом пакете)
+ * Поток заканчивается при обнаружении нового HTTP заголовка, + * при смене стороны передачи или при окончании всего стрима + */ + private void unpackGzip() { + final HttpGzipProcessor processor = new HttpGzipProcessor(packets); + processor.unpackGzip(); + } + + private void parseWebSockets() { + if (!packets.get(0).getContentString().contains("HTTP/")) { + return; + } + + final WebSocketsParser parser = new WebSocketsParser(packets); + if (!parser.isParsed()) { + return; + } + + packets = parser.getParsedPackets(); + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java b/src/main/java/ru/serega6531/packmate/service/optimization/WebSocketsParser.java similarity index 95% rename from src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java rename to src/main/java/ru/serega6531/packmate/service/optimization/WebSocketsParser.java index 5d59845..de38248 100644 --- a/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java +++ b/src/main/java/ru/serega6531/packmate/service/optimization/WebSocketsParser.java @@ -1,8 +1,7 @@ -package ru.serega6531.packmate.service; +package ru.serega6531.packmate.service.optimization; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ArrayUtils; import org.java_websocket.drafts.Draft_6455; import org.java_websocket.exceptions.InvalidDataException; import org.java_websocket.exceptions.InvalidHandshakeException; @@ -12,6 +11,7 @@ import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.HandshakeImpl1Client; import org.java_websocket.handshake.HandshakeImpl1Server; import ru.serega6531.packmate.model.Packet; +import ru.serega6531.packmate.utils.PacketUtils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -110,10 +110,8 @@ public class WebSocketsParser { for (List side : sides) { final Packet lastPacket = side.get(0); - final byte[] wsContent = side.stream() - .map(Packet::getContent) - .reduce(ArrayUtils::addAll) - .get(); + //noinspection OptionalGetWithoutIsPresent + final byte[] wsContent = PacketUtils.mergePackets(side).get(); final ByteBuffer buffer = ByteBuffer.wrap(wsContent); List frames; @@ -177,9 +175,7 @@ public class WebSocketsParser { } private String getHandshake(final List packets) { - final String handshake = packets.stream() - .map(Packet::getContent) - .reduce(ArrayUtils::addAll) + final String handshake = PacketUtils.mergePackets(packets) .map(String::new) .orElse(null); diff --git a/src/main/java/ru/serega6531/packmate/utils/PacketUtils.java b/src/main/java/ru/serega6531/packmate/utils/PacketUtils.java new file mode 100644 index 0000000..b5af91d --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/utils/PacketUtils.java @@ -0,0 +1,19 @@ +package ru.serega6531.packmate.utils; + +import lombok.experimental.UtilityClass; +import org.apache.commons.lang3.ArrayUtils; +import ru.serega6531.packmate.model.Packet; + +import java.util.List; +import java.util.Optional; + +@UtilityClass +public class PacketUtils { + + public static Optional mergePackets(List cut) { + return cut.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll); + } + +} diff --git a/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java b/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java index 67bde4a..edf6b3f 100644 --- a/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java +++ b/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java @@ -4,7 +4,9 @@ import org.apache.commons.lang3.ArrayUtils; import org.junit.jupiter.api.Test; import ru.serega6531.packmate.model.CtfService; import ru.serega6531.packmate.model.Packet; -import ru.serega6531.packmate.service.StreamOptimizer; +import ru.serega6531.packmate.service.optimization.HttpGzipProcessor; +import ru.serega6531.packmate.service.optimization.HttpUrldecodeProcessor; +import ru.serega6531.packmate.service.optimization.PacketsMerger; import java.util.ArrayList; import java.util.Base64; @@ -21,14 +23,11 @@ class StreamOptimizerTest { final byte[] gzipped = Base64.getDecoder().decode(encoded); final byte[] content = ArrayUtils.addAll("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Length: 26\r\n\r\n".getBytes(), gzipped); - CtfService service = new CtfService(); - service.setUngzipHttp(true); - Packet p = createPacket(content, false); List list = new ArrayList<>(); list.add(p); - list = new StreamOptimizer(service, list).optimizeStream(); + new HttpGzipProcessor(list).unpackGzip(); final String processed = list.get(0).getContentString(); assertTrue(processed.contains("aaabbb")); } @@ -42,7 +41,7 @@ class StreamOptimizerTest { List list = new ArrayList<>(); list.add(p); - list = new StreamOptimizer(service, list).optimizeStream(); + new HttpUrldecodeProcessor(list).urldecodeRequests(); final String processed = list.get(0).getContentString(); assertTrue(processed.contains("а б")); } @@ -67,7 +66,7 @@ class StreamOptimizerTest { list.add(p5); list.add(p6); - list = new StreamOptimizer(service, list).optimizeStream(); + new PacketsMerger(list).mergeAdjacentPackets(); assertEquals(4, list.size()); assertEquals(2, list.get(1).getContent().length);