From 90894d67cb9659f99cd9430a03c0a12f8c99ff55 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Mon, 25 Nov 2019 02:32:10 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=BE=20=D1=81=D0=BA=D0=BB=D0=B5=D0=B8=D0=B2=D0=B0=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=20=D1=81=D0=BE=D1=81=D0=B5=D0=B4=D0=BD=D0=B8=D1=85?= =?UTF-8?q?=20=D0=BF=D0=B0=D0=BA=D0=B5=D1=82=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../packmate/service/StreamService.java | 165 +++++++++++------- src/main/resources/application.yml | 3 +- 2 files changed, 108 insertions(+), 60 deletions(-) diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 4b44449..e00e3b8 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -39,6 +39,7 @@ public class StreamService { private final String localIp; private final boolean unpackGzippedHttp; private final boolean ignoreEmptyPackets; + private final boolean mergeAdjacentPackets; private final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; private final java.util.regex.Pattern userAgentPattern = java.util.regex.Pattern.compile("User-Agent: (.+)\\n"); @@ -51,7 +52,8 @@ public class StreamService { StreamSubscriptionService subscriptionService, @Value("${local-ip}") String localIp, @Value("${unpack-gzipped-http}") boolean unpackGzippedHttp, - @Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) { + @Value("${ignore-empty-packets}") boolean ignoreEmptyPackets, + @Value("${merge-adjacent-packets}") boolean mergeAdjacentPackets) { this.repository = repository; this.patternService = patternService; this.servicesService = servicesService; @@ -60,6 +62,7 @@ public class StreamService { this.localIp = localIp; this.unpackGzippedHttp = unpackGzippedHttp; this.ignoreEmptyPackets = ignoreEmptyPackets; + this.mergeAdjacentPackets = mergeAdjacentPackets; } /** @@ -102,65 +105,11 @@ public class StreamService { } if (unpackGzippedHttp) { - boolean gzipStarted = false; - int gzipStartPacket = 0; - int gzipEndPacket; + unpackGzip(packets); + } - for (int i = 0; i < packets.size(); i++) { - Packet packet = packets.get(i); - - if (packet.isIncoming() && gzipStarted) { - gzipEndPacket = i - 1; - - List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); - - Packet decompressed = decompressGzipPackets(cut); - if (decompressed != null) { - packets.removeAll(cut); - packets.add(gzipStartPacket, decompressed); - gzipStarted = false; - i = gzipStartPacket + 1; - } - } else if (!packet.isIncoming()) { - String content = new String(packet.getContent()); - - int contentPos = content.indexOf("\r\n\r\n"); - boolean http = content.startsWith("HTTP/"); - - if (http && gzipStarted) { - gzipEndPacket = i - 1; - List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); - - Packet decompressed = decompressGzipPackets(cut); - if (decompressed != null) { - packets.removeAll(cut); - packets.add(gzipStartPacket, decompressed); - gzipStarted = false; - i = gzipStartPacket + 1; - } - } - - if (contentPos != -1) { // начало body - String headers = content.substring(0, contentPos); - boolean gziped = headers.contains("Content-Encoding: gzip\r\n"); - if (gziped) { - gzipStarted = true; - gzipStartPacket = i; - } - } - } - } - - if (gzipStarted) { - gzipEndPacket = packets.size() - 1; - List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); - - Packet decompressed = decompressGzipPackets(cut); - if (decompressed != null) { - packets.removeAll(cut); - packets.add(gzipStartPacket, decompressed); - } - } + if (mergeAdjacentPackets) { + mergeAdjacentPackets(packets); } String ua = null; @@ -199,6 +148,104 @@ public class StreamService { return true; } + private void mergeAdjacentPackets(List packets) { + int start = 0; + int packetsInRow = 0; + boolean incoming = false; + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + if (packet.isIncoming() != incoming || i == packets.size() - 1) { + if (packetsInRow > 1) { + final List cut = packets.subList(start, i); + packets.removeAll(cut); + //noinspection OptionalGetWithoutIsPresent + final byte[] content = cut.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll) + .get(); + + packets.add(start, Packet.builder() + .incoming(incoming) + .timestamp(packets.get(0).getTimestamp()) + .ungzipped(cut.stream().anyMatch(Packet::isUngzipped)) + .content(content) + .build()); + } + + start++; + i = start; + packetsInRow = 1; + } else { + packetsInRow++; + } + + incoming = packet.isIncoming(); + } + } + + private void unpackGzip(List packets) { + boolean gzipStarted = false; + int gzipStartPacket = 0; + int gzipEndPacket; + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + + if (packet.isIncoming() && gzipStarted) { + gzipEndPacket = i - 1; + + List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); + + Packet decompressed = decompressGzipPackets(cut); + if (decompressed != null) { + packets.removeAll(cut); + packets.add(gzipStartPacket, decompressed); + gzipStarted = false; + i = gzipStartPacket + 1; + } + } else if (!packet.isIncoming()) { + String content = new String(packet.getContent()); + + int contentPos = content.indexOf("\r\n\r\n"); + boolean http = content.startsWith("HTTP/"); + + if (http && gzipStarted) { + gzipEndPacket = i - 1; + List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); + + Packet decompressed = decompressGzipPackets(cut); + if (decompressed != null) { + packets.removeAll(cut); + packets.add(gzipStartPacket, decompressed); + gzipStarted = false; + i = gzipStartPacket + 1; + } + } + + if (contentPos != -1) { // начало body + String headers = content.substring(0, contentPos); + boolean gziped = headers.contains("Content-Encoding: gzip\r\n"); + if (gziped) { + gzipStarted = true; + gzipStartPacket = i; + } + } + } + } + + if (gzipStarted) { + gzipEndPacket = packets.size() - 1; + List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); + + Packet decompressed = decompressGzipPackets(cut); + if (decompressed != null) { + packets.removeAll(cut); + packets.add(gzipStartPacket, decompressed); + } + } + } + private Packet decompressGzipPackets(List packets) { //noinspection OptionalGetWithoutIsPresent final byte[] content = packets.stream() diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5ae077f..2348c1b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -24,4 +24,5 @@ udp-stream-timeout: 20 # секунд tcp-stream-timeout: 40 # секунд timeout-stream-check-interval: 10 # секунд ignore-empty-packets: true -unpack-gzipped-http: true \ No newline at end of file +unpack-gzipped-http: true +merge-adjacent-packets: true \ No newline at end of file