From 85681947cd0673a3a5f03c3e657a3ac8ed3066a6 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Wed, 15 Apr 2020 23:09:44 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../packmate/service/SubscriptionService.java | 6 +- .../optimization/HttpChunksProcessor.java | 114 ++++++++++-------- .../service/optimization/StreamOptimizer.java | 2 +- .../ru/serega6531/packmate/utils/Bytes.java | 26 ++++ 4 files changed, 96 insertions(+), 52 deletions(-) diff --git a/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java index da64dc9..d5b2b38 100644 --- a/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java +++ b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java @@ -32,9 +32,9 @@ public class SubscriptionService { public void addSubscriber(WebSocketSession session) { subscribers.add(session); - log.info("User subscribed: {} {}", - session.getClass().getSimpleName(), - Objects.requireNonNull(session.getRemoteAddress()).getHostName()); + log.info("User subscribed: {} ({})", + Objects.requireNonNull(session.getRemoteAddress()).getHostName(), + session.getClass().getSimpleName()); } public void removeSubscriber(WebSocketSession session) { diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java b/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java index cd22ca0..79fcbcb 100644 --- a/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java +++ b/src/main/java/ru/serega6531/packmate/service/optimization/HttpChunksProcessor.java @@ -8,6 +8,7 @@ import ru.serega6531.packmate.utils.Bytes; import ru.serega6531.packmate.utils.PacketUtils; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -56,7 +57,7 @@ public class HttpChunksProcessor { } private void checkCompleteChunk(List packets, int start) { - boolean end = packets.get(packets.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n"); + boolean end = Bytes.endsWith(packets.get(packets.size() - 1).getContent(), "\r\n0\r\n\r\n".getBytes()); if (end) { processChunk(packets, start); @@ -76,66 +77,81 @@ public class HttpChunksProcessor { ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(content, contentStart, content.length)); while (true) { - final String found = readChunkSize(buf); - if (found != null) { - final int chunkSize = Integer.parseInt(found, 16); + final int chunkSize = readChunkSize(buf); - if (chunkSize == 0) { // конец потока чанков - Packet result = Packet.builder() - .incoming(false) - .timestamp(packets.get(0).getTimestamp()) - .ungzipped(false) - .webSocketParsed(false) - .content(output.toByteArray()) - .build(); - - this.packets.removeAll(packets); - this.packets.add(start, result); - - resetChunk(); - position = start + 1; - - return; - } - - if (chunkSize > buf.remaining()) { - log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", - buf.position(), chunkSize, buf.capacity()); + switch (chunkSize) { + case -1 -> { + log.warn("Failed to merge chunks, next chunk size not found"); resetChunk(); return; } - - byte[] chunk = new byte[chunkSize]; - buf.get(chunk); - output.write(chunk); - - if (buf.remaining() < 2) { - log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n"); - resetChunk(); + case 0 -> { + buildWholePacket(packets, start, output); return; } - - int c1 = buf.get(); - int c2 = buf.get(); - if(c1 != '\r' || c2 != '\n') { - log.warn("Failed to merge chunks, chunk trailer is not equal to \\r\\n"); - resetChunk(); - return; + default -> { + if (!readChunk(buf, chunkSize, output)) return; + if (!readTrailer(buf)) return; } - } else { - log.warn("Failed to merge chunks, next chunk size not found"); - resetChunk(); - return; } } } + private boolean readChunk(ByteBuffer buf, int chunkSize, ByteArrayOutputStream output) throws IOException { + if (chunkSize > buf.remaining()) { + log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", + buf.position(), chunkSize, buf.capacity()); + resetChunk(); + return false; + } + + byte[] chunk = new byte[chunkSize]; + buf.get(chunk); + output.write(chunk); + return true; + } + + private boolean readTrailer(ByteBuffer buf) { + if (buf.remaining() < 2) { + log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n"); + resetChunk(); + return false; + } + + int c1 = buf.get(); + int c2 = buf.get(); + + if(c1 != '\r' || c2 != '\n') { + log.warn("Failed to merge chunks, chunk trailer is not equal to \\r\\n"); + resetChunk(); + return false; + } + + return true; + } + + private void buildWholePacket(List packets, int start, ByteArrayOutputStream output) { + Packet result = Packet.builder() + .incoming(false) + .timestamp(packets.get(0).getTimestamp()) + .ungzipped(false) + .webSocketParsed(false) + .content(output.toByteArray()) + .build(); + + this.packets.removeAll(packets); + this.packets.add(start, result); + + resetChunk(); + position = start + 1; + } + private void resetChunk() { chunkStarted = false; chunkPackets.clear(); } - private String readChunkSize(ByteBuffer buf) { + private int readChunkSize(ByteBuffer buf) { StringBuilder sb = new StringBuilder(); while (buf.remaining() > 2) { @@ -145,14 +161,16 @@ public class HttpChunksProcessor { sb.append((char) b); } else if (b == '\r') { if(buf.get() == '\n') { - return sb.toString(); + return Integer.parseInt(sb.toString(), 16); } else { - return null; // после \r не идет \n + return -1; // после \r не идет \n } + } else { + return -1; } } - return null; + return -1; } } diff --git a/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java b/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java index 5c856a1..c54e2a7 100644 --- a/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java +++ b/src/main/java/ru/serega6531/packmate/service/optimization/StreamOptimizer.java @@ -61,7 +61,7 @@ public class StreamOptimizer { } /** - * https://ru.wikipedia.org/wiki/Chunked_transfer_encoding + * Chunked transfer encoding */ private void processChunkedEncoding() { HttpChunksProcessor processor = new HttpChunksProcessor(packets); diff --git a/src/main/java/ru/serega6531/packmate/utils/Bytes.java b/src/main/java/ru/serega6531/packmate/utils/Bytes.java index 6012f70..c3509c6 100644 --- a/src/main/java/ru/serega6531/packmate/utils/Bytes.java +++ b/src/main/java/ru/serega6531/packmate/utils/Bytes.java @@ -5,10 +5,18 @@ import lombok.experimental.UtilityClass; @UtilityClass public class Bytes { + /** + * @param array где ищем + * @param target что ищем + */ public int indexOf(byte[] array, byte[] target) { return indexOf(array, target, 0, array.length); } + /** + * @param array где ищем + * @param target что ищем + */ public int indexOf(byte[] array, byte[] target, int start, int end) { if (target.length == 0) { return 0; @@ -26,4 +34,22 @@ public class Bytes { return -1; } + /** + * @param array где ищем + * @param target что ищем + */ + public boolean endsWith(byte[] array, byte[] target) { + if(array.length < target.length) { + return false; + } + + for (int i = 0; i < target.length; i++) { + if(array[array.length - target.length + i] != target[i]) { + return false; + } + } + + return true; + } + }