diff --git a/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java b/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java index d8bde27..45ab970 100644 --- a/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java +++ b/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java @@ -37,11 +37,10 @@ public class WebSocketsParser { private static final String WEBSOCKET_CONNECTION_HEADER = "connection: upgrade\r\n"; private final List packets; - private List frames; @Getter private boolean parsed = false; - private int httpEnd = -1; + private List parsedPackets; public WebSocketsParser(List packets) { this.packets = packets; @@ -58,6 +57,7 @@ public class WebSocketsParser { return; } + int httpEnd = -1; for (int i = clientHandshakePackets.size(); i < packets.size(); i++) { if (packets.get(i).getContentString().endsWith("\r\n\r\n")) { httpEnd = i + 1; @@ -92,56 +92,88 @@ public class WebSocketsParser { return; } - final List wsPackets = this.packets.subList( + final List wsPackets = packets.subList( httpEnd, - this.packets.size()); + packets.size()); - final byte[] wsContent = wsPackets.stream() - .map(Packet::getContent) - .reduce(ArrayUtils::addAll) - .orElse(null); - - if (wsContent == null) { + if(wsPackets.isEmpty()) { return; } - final ByteBuffer frame = ByteBuffer.wrap(wsContent); - - try { - frames = draft.translateFrame(frame); - } catch (InvalidDataException e) { - log.warn("WebSocket data", e); - return; - } + final List handshakes = packets.subList(0, httpEnd); + parse(wsPackets, handshakes, draft); parsed = true; } + private void parse(final List wsPackets, final List handshakes, Draft_6455 draft) { + List> sides = sliceToSides(wsPackets); + parsedPackets = new ArrayList<>(handshakes); + + for (List side : sides) { + final Packet lastPacket = side.get(0); + + final byte[] wsContent = side.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll) + .get(); + + final ByteBuffer buffer = ByteBuffer.wrap(wsContent); + List frames; + + try { + frames = draft.translateFrame(buffer); + } catch (InvalidDataException e) { + log.warn("WebSocket data", e); + return; + } + + for (Framedata frame : frames) { + if(frame instanceof DataFrame) { + parsedPackets.add(Packet.builder() + .content(frame.getPayloadData().array()) + .incoming(lastPacket.isIncoming()) + .timestamp(lastPacket.getTimestamp()) + .ttl(lastPacket.getTtl()) + .ungzipped(lastPacket.isUngzipped()) + .build() + ); + } + } + } + } + public List getParsedPackets() { if (!parsed) { throw new IllegalStateException("WS is not parsed"); } - final List handshakes = packets.subList(0, httpEnd); - List newPackets = new ArrayList<>(handshakes.size() + frames.size()); - newPackets.addAll(handshakes); + return parsedPackets; + } - final Packet lastPacket = packets.get(packets.size() - 1); + private List> sliceToSides(List packets) { + List> result = new ArrayList<>(); + List side = new ArrayList<>(); + boolean incoming = true; - for (Framedata frame : frames) { - if(frame instanceof DataFrame) { - newPackets.add(Packet.builder() - .content(frame.getPayloadData().array()) - .incoming(true) //TODO - .timestamp(lastPacket.getTimestamp()) - .ttl(lastPacket.getTtl()) - .ungzipped(lastPacket.isUngzipped()) - .build() - ); + for (Packet packet : packets) { + if(packet.isIncoming() != incoming) { + incoming = packet.isIncoming(); + + if(!side.isEmpty()) { + result.add(side); + side = new ArrayList<>(); + } } + + side.add(packet); } - return newPackets; + if(!side.isEmpty()) { + result.add(side); + } + + return result; } private String getHandshake(final List packets) {