Merge branch 'chunked-transfer' into 'master'

Chunked transfer encoding support

Closes #23

See merge request packmate/Packmate!8
This commit is contained in:
Sergey
2020-04-14 17:06:56 +00:00
12 changed files with 394 additions and 153 deletions

View File

@@ -16,6 +16,8 @@ public class CtfService {
private String name; private String name;
private boolean processChunkedEncoding;
private boolean ungzipHttp; private boolean ungzipHttp;
private boolean urldecodeHttpRequests; private boolean urldecodeHttpRequests;

View File

@@ -57,7 +57,7 @@ public class FilePcapWorker extends AbstractPcapWorker {
final Packet packet = pcap.getNextPacketEx(); final Packet packet = pcap.getNextPacketEx();
gotPacket(packet); gotPacket(packet);
} catch (PcapNativeException e) { } catch (PcapNativeException e) {
log.error("Pcap read", e); log.error("Pcap read error: {}", e.getMessage());
Thread.sleep(100); Thread.sleep(100);
} catch (EOFException e) { } catch (EOFException e) {
stop(); stop();

View File

@@ -15,6 +15,7 @@ import ru.serega6531.packmate.model.pojo.Pagination;
import ru.serega6531.packmate.model.pojo.SubscriptionMessage; import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
import ru.serega6531.packmate.model.pojo.UnfinishedStream; import ru.serega6531.packmate.model.pojo.UnfinishedStream;
import ru.serega6531.packmate.repository.StreamRepository; import ru.serega6531.packmate.repository.StreamRepository;
import ru.serega6531.packmate.service.optimization.StreamOptimizer;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;

View File

@@ -0,0 +1,142 @@
package ru.serega6531.packmate.service.optimization;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import ru.serega6531.packmate.model.Packet;
import ru.serega6531.packmate.utils.Bytes;
import ru.serega6531.packmate.utils.PacketUtils;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Slf4j
@AllArgsConstructor
public class HttpChunksProcessor {
private List<Packet> packets;
public void processChunkedEncoding() {
boolean chunkStarted = false;
int start = -1;
List<Packet> chunk = new ArrayList<>();
for (int i = 0; i < packets.size(); i++) {
Packet packet = packets.get(i);
if (!packet.isIncoming()) {
String content = packet.getContentString();
boolean http = content.startsWith("HTTP/");
int contentPos = content.indexOf("\r\n\r\n");
if (http && contentPos != -1) { // начало body
String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n
boolean chunked = headers.contains("Transfer-Encoding: chunked\r\n");
if (chunked) {
chunkStarted = true;
start = i;
chunk.add(packet);
if (checkCompleteChunk(chunk, start)) {
chunkStarted = false;
chunk.clear();
i = start + 1;
}
} else {
chunkStarted = false;
chunk.clear();
}
} else if (chunkStarted) {
chunk.add(packet);
if (checkCompleteChunk(chunk, start)) {
chunkStarted = false;
chunk.clear();
i = start + 1;
}
}
}
}
}
/**
* @return true если чанк завершен
*/
private boolean checkCompleteChunk(List<Packet> chunk, int start) {
boolean end = chunk.get(chunk.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n");
if (end) {
//noinspection OptionalGetWithoutIsPresent
final byte[] content = PacketUtils.mergePackets(chunk).get();
ByteArrayOutputStream output = new ByteArrayOutputStream(content.length);
final int contentStart = Bytes.indexOf(content, "\r\n\r\n".getBytes()) + 4;
output.write(content, 0, contentStart);
final byte[] body = Arrays.copyOfRange(content, contentStart, content.length);
int currentPos = 0;
while (true) {
final String found = readChunkSize(body, currentPos);
if (found != null) {
final int chunkSize = Integer.parseInt(found, 16);
if (chunkSize == 0) { // конец потока чанков
Packet result = Packet.builder()
.incoming(false)
.timestamp(chunk.get(0).getTimestamp())
.ungzipped(false)
.webSocketParsed(false)
.content(output.toByteArray())
.build();
packets.removeAll(chunk);
packets.add(start, result);
return true;
}
currentPos += found.length() + 2;
if (currentPos + chunkSize >= body.length) {
log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", currentPos, chunkSize, body.length);
return true; // обнулить список, но не заменять пакеты
}
output.write(body, currentPos, chunkSize);
currentPos += chunkSize;
if (currentPos + 2 >= body.length || body[currentPos] != '\r' || body[currentPos + 1] != '\n') {
log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n");
return true; // обнулить список, но не заменять пакеты
}
currentPos += 2;
} else {
log.warn("Failed to merge chunks, next chunk size not found");
return true; // обнулить список, но не заменять пакеты
}
}
}
return false;
}
private String readChunkSize(byte[] content, int start) {
StringBuilder sb = new StringBuilder();
for (int i = start; i < content.length - 1; i++) {
byte b = content[i];
if ((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) {
sb.append((char) b);
} else if (b == '\r' && content[i + 1] == '\n') {
return sb.toString();
}
}
return null;
}
}

View File

@@ -1,139 +1,28 @@
package ru.serega6531.packmate.service; package ru.serega6531.packmate.service.optimization;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import ru.serega6531.packmate.model.CtfService;
import ru.serega6531.packmate.model.Packet; import ru.serega6531.packmate.model.Packet;
import ru.serega6531.packmate.utils.Bytes; import ru.serega6531.packmate.utils.Bytes;
import ru.serega6531.packmate.utils.PacketUtils;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException; import java.util.zip.ZipException;
@AllArgsConstructor
@Slf4j @Slf4j
public class StreamOptimizer { @AllArgsConstructor
public class HttpGzipProcessor {
private final CtfService service;
private List<Packet> packets;
private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
/** private List<Packet> packets;
* Вызвать для выполнения оптимизаций на переданном списке пакетов.
*/
public List<Packet> optimizeStream() {
if (service.isUngzipHttp()) {
unpackGzip();
}
if (service.isParseWebSockets()) {
parseWebSockets();
}
if (service.isUrldecodeHttpRequests()) {
urldecodeRequests();
}
if (service.isMergeAdjacentPackets()) {
mergeAdjacentPackets();
}
return packets;
}
/**
* Сжать соседние пакеты в одном направлении в один.
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
*/
private void mergeAdjacentPackets() {
int start = 0;
int packetsInRow = 0;
boolean incoming = true;
for (int i = 0; i < packets.size(); i++) {
Packet packet = packets.get(i);
if (packet.isIncoming() != incoming) {
if (packetsInRow > 1) {
compress(start, i);
i = start + 1; // продвигаем указатель на следующий после склеенного блок
}
start = i;
packetsInRow = 1;
} else {
packetsInRow++;
}
incoming = packet.isIncoming();
}
if (packetsInRow > 1) {
compress(start, packets.size());
}
}
/**
* Сжать кусок со start по end в один пакет
*/
private void compress(int start, int end) {
final List<Packet> cut = packets.subList(start, end);
final long timestamp = cut.get(0).getTimestamp();
final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped);
final boolean webSocketParsed = cut.stream().anyMatch(Packet::isWebSocketParsed);
boolean incoming = cut.get(0).isIncoming();
//noinspection OptionalGetWithoutIsPresent
final byte[] content = cut.stream()
.map(Packet::getContent)
.reduce(ArrayUtils::addAll)
.get();
packets.removeAll(cut);
packets.add(start, Packet.builder()
.incoming(incoming)
.timestamp(timestamp)
.ungzipped(ungzipped)
.webSocketParsed(webSocketParsed)
.content(content)
.build());
}
/**
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
*/
@SneakyThrows
private void urldecodeRequests() {
boolean httpStarted = false;
for (Packet packet : packets) {
if (packet.isIncoming()) {
String content = packet.getContentString();
if (content.contains("HTTP/")) {
httpStarted = true;
}
if (httpStarted) {
try {
content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString());
packet.setContent(content.getBytes());
} catch (IllegalArgumentException e) {
log.warn("urldecode", e);
}
}
} else {
httpStarted = false;
}
}
}
/** /**
* Попытаться распаковать GZIP из исходящих http пакетов. <br> * Попытаться распаковать GZIP из исходящих http пакетов. <br>
@@ -142,7 +31,7 @@ public class StreamOptimizer {
* Поток заканчивается при обнаружении нового HTTP заголовка, * Поток заканчивается при обнаружении нового HTTP заголовка,
* при смене стороны передачи или при окончании всего стрима * при смене стороны передачи или при окончании всего стрима
*/ */
private void unpackGzip() { public void unpackGzip() {
boolean gzipStarted = false; boolean gzipStarted = false;
int gzipStartPacket = 0; int gzipStartPacket = 0;
int gzipEndPacket; int gzipEndPacket;
@@ -206,10 +95,7 @@ public class StreamOptimizer {
private Packet decompressGzipPackets(List<Packet> cut) { private Packet decompressGzipPackets(List<Packet> cut) {
//noinspection OptionalGetWithoutIsPresent //noinspection OptionalGetWithoutIsPresent
final byte[] content = cut.stream() final byte[] content = PacketUtils.mergePackets(cut).get();
.map(Packet::getContent)
.reduce(ArrayUtils::addAll)
.get();
final int gzipStart = Bytes.indexOf(content, GZIP_HEADER); final int gzipStart = Bytes.indexOf(content, GZIP_HEADER);
byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart); byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart);
@@ -231,7 +117,7 @@ public class StreamOptimizer {
.content(newContent) .content(newContent)
.build(); .build();
} catch (ZipException e) { } catch (ZipException e) {
log.warn("Failed to decompress gzip, leaving as it is", e); log.warn("Failed to decompress gzip, leaving as it is: {}", e.getMessage());
} catch (IOException e) { } catch (IOException e) {
log.error("decompress gzip", e); log.error("decompress gzip", e);
} }
@@ -239,17 +125,4 @@ public class StreamOptimizer {
return null; return null;
} }
private void parseWebSockets() {
if (!packets.get(0).getContentString().contains("HTTP/")) {
return;
}
final WebSocketsParser parser = new WebSocketsParser(packets);
if(!parser.isParsed()) {
return;
}
packets = parser.getParsedPackets();
}
} }

View File

@@ -0,0 +1,46 @@
package ru.serega6531.packmate.service.optimization;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import ru.serega6531.packmate.model.Packet;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
@AllArgsConstructor
@Slf4j
public class HttpUrldecodeProcessor {
private List<Packet> packets;
/**
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
*/
@SneakyThrows
public void urldecodeRequests() {
boolean httpStarted = false;
for (Packet packet : packets) {
if (packet.isIncoming()) {
String content = packet.getContentString();
if (content.contains("HTTP/")) {
httpStarted = true;
}
if (httpStarted) {
try {
content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString());
packet.setContent(content.getBytes());
} catch (IllegalArgumentException e) {
log.warn("urldecode", e);
}
}
} else {
httpStarted = false;
}
}
}
}

View File

@@ -0,0 +1,67 @@
package ru.serega6531.packmate.service.optimization;
import lombok.AllArgsConstructor;
import ru.serega6531.packmate.model.Packet;
import ru.serega6531.packmate.utils.PacketUtils;
import java.util.List;
@AllArgsConstructor
public class PacketsMerger {
private List<Packet> packets;
/**
* Сжать соседние пакеты в одном направлении в один.
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
*/
public void mergeAdjacentPackets() {
int start = 0;
int packetsInRow = 0;
boolean incoming = true;
for (int i = 0; i < packets.size(); i++) {
Packet packet = packets.get(i);
if (packet.isIncoming() != incoming) {
if (packetsInRow > 1) {
compress(start, i);
i = start + 1; // продвигаем указатель на следующий после склеенного блок
}
start = i;
packetsInRow = 1;
} else {
packetsInRow++;
}
incoming = packet.isIncoming();
}
if (packetsInRow > 1) {
compress(start, packets.size());
}
}
/**
* Сжать кусок со start по end в один пакет
*/
private void compress(int start, int end) {
final List<Packet> cut = packets.subList(start, end);
final long timestamp = cut.get(0).getTimestamp();
final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped);
final boolean webSocketParsed = cut.stream().anyMatch(Packet::isWebSocketParsed);
boolean incoming = cut.get(0).isIncoming();
//noinspection OptionalGetWithoutIsPresent
final byte[] content = PacketUtils.mergePackets(cut).get();
packets.removeAll(cut);
packets.add(start, Packet.builder()
.incoming(incoming)
.timestamp(timestamp)
.ungzipped(ungzipped)
.webSocketParsed(webSocketParsed)
.content(content)
.build());
}
}

View File

@@ -0,0 +1,96 @@
package ru.serega6531.packmate.service.optimization;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import ru.serega6531.packmate.model.CtfService;
import ru.serega6531.packmate.model.Packet;
import java.util.List;
@AllArgsConstructor
@Slf4j
public class StreamOptimizer {
private final CtfService service;
private List<Packet> packets;
/**
* Вызвать для выполнения оптимизаций на переданном списке пакетов.
*/
public List<Packet> optimizeStream() {
if (service.isProcessChunkedEncoding()) {
processChunkedEncoding();
}
if (service.isUngzipHttp()) {
unpackGzip();
}
if (service.isParseWebSockets()) {
parseWebSockets();
}
if (service.isUrldecodeHttpRequests()) {
urldecodeRequests();
}
if (service.isMergeAdjacentPackets()) {
mergeAdjacentPackets();
}
return packets;
}
/**
* Сжать соседние пакеты в одном направлении в один.
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
*/
private void mergeAdjacentPackets() {
final PacketsMerger merger = new PacketsMerger(packets);
merger.mergeAdjacentPackets();
}
/**
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
*/
@SneakyThrows
private void urldecodeRequests() {
final HttpUrldecodeProcessor processor = new HttpUrldecodeProcessor(packets);
processor.urldecodeRequests();
}
/**
* https://ru.wikipedia.org/wiki/Chunked_transfer_encoding
*/
private void processChunkedEncoding() {
HttpChunksProcessor processor = new HttpChunksProcessor(packets);
processor.processChunkedEncoding();
}
/**
* Попытаться распаковать GZIP из исходящих http пакетов. <br>
* GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip
* (при этом заголовок HTTP может быть в другом пакете)<br>
* Поток заканчивается при обнаружении нового HTTP заголовка,
* при смене стороны передачи или при окончании всего стрима
*/
private void unpackGzip() {
final HttpGzipProcessor processor = new HttpGzipProcessor(packets);
processor.unpackGzip();
}
private void parseWebSockets() {
if (!packets.get(0).getContentString().contains("HTTP/")) {
return;
}
final WebSocketsParser parser = new WebSocketsParser(packets);
if (!parser.isParsed()) {
return;
}
packets = parser.getParsedPackets();
}
}

View File

@@ -1,8 +1,7 @@
package ru.serega6531.packmate.service; package ru.serega6531.packmate.service.optimization;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.java_websocket.drafts.Draft_6455; import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.exceptions.InvalidDataException; import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.exceptions.InvalidHandshakeException; import org.java_websocket.exceptions.InvalidHandshakeException;
@@ -12,6 +11,7 @@ import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.HandshakeImpl1Client; import org.java_websocket.handshake.HandshakeImpl1Client;
import org.java_websocket.handshake.HandshakeImpl1Server; import org.java_websocket.handshake.HandshakeImpl1Server;
import ru.serega6531.packmate.model.Packet; import ru.serega6531.packmate.model.Packet;
import ru.serega6531.packmate.utils.PacketUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@@ -110,10 +110,8 @@ public class WebSocketsParser {
for (List<Packet> side : sides) { for (List<Packet> side : sides) {
final Packet lastPacket = side.get(0); final Packet lastPacket = side.get(0);
final byte[] wsContent = side.stream() //noinspection OptionalGetWithoutIsPresent
.map(Packet::getContent) final byte[] wsContent = PacketUtils.mergePackets(side).get();
.reduce(ArrayUtils::addAll)
.get();
final ByteBuffer buffer = ByteBuffer.wrap(wsContent); final ByteBuffer buffer = ByteBuffer.wrap(wsContent);
List<Framedata> frames; List<Framedata> frames;
@@ -177,9 +175,7 @@ public class WebSocketsParser {
} }
private String getHandshake(final List<Packet> packets) { private String getHandshake(final List<Packet> packets) {
final String handshake = packets.stream() final String handshake = PacketUtils.mergePackets(packets)
.map(Packet::getContent)
.reduce(ArrayUtils::addAll)
.map(String::new) .map(String::new)
.orElse(null); .orElse(null);

View File

@@ -0,0 +1,19 @@
package ru.serega6531.packmate.utils;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.ArrayUtils;
import ru.serega6531.packmate.model.Packet;
import java.util.List;
import java.util.Optional;
@UtilityClass
public class PacketUtils {
public static Optional<byte[]> mergePackets(List<Packet> cut) {
return cut.stream()
.map(Packet::getContent)
.reduce(ArrayUtils::addAll);
}
}

View File

@@ -4,7 +4,9 @@ import org.apache.commons.lang3.ArrayUtils;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import ru.serega6531.packmate.model.CtfService; import ru.serega6531.packmate.model.CtfService;
import ru.serega6531.packmate.model.Packet; import ru.serega6531.packmate.model.Packet;
import ru.serega6531.packmate.service.StreamOptimizer; import ru.serega6531.packmate.service.optimization.HttpGzipProcessor;
import ru.serega6531.packmate.service.optimization.HttpUrldecodeProcessor;
import ru.serega6531.packmate.service.optimization.PacketsMerger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64; import java.util.Base64;
@@ -21,14 +23,11 @@ class StreamOptimizerTest {
final byte[] gzipped = Base64.getDecoder().decode(encoded); final byte[] gzipped = Base64.getDecoder().decode(encoded);
final byte[] content = ArrayUtils.addAll("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Length: 26\r\n\r\n".getBytes(), gzipped); final byte[] content = ArrayUtils.addAll("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Length: 26\r\n\r\n".getBytes(), gzipped);
CtfService service = new CtfService();
service.setUngzipHttp(true);
Packet p = createPacket(content, false); Packet p = createPacket(content, false);
List<Packet> list = new ArrayList<>(); List<Packet> list = new ArrayList<>();
list.add(p); list.add(p);
list = new StreamOptimizer(service, list).optimizeStream(); new HttpGzipProcessor(list).unpackGzip();
final String processed = list.get(0).getContentString(); final String processed = list.get(0).getContentString();
assertTrue(processed.contains("aaabbb")); assertTrue(processed.contains("aaabbb"));
} }
@@ -42,7 +41,7 @@ class StreamOptimizerTest {
List<Packet> list = new ArrayList<>(); List<Packet> list = new ArrayList<>();
list.add(p); list.add(p);
list = new StreamOptimizer(service, list).optimizeStream(); new HttpUrldecodeProcessor(list).urldecodeRequests();
final String processed = list.get(0).getContentString(); final String processed = list.get(0).getContentString();
assertTrue(processed.contains("а б")); assertTrue(processed.contains("а б"));
} }
@@ -67,7 +66,7 @@ class StreamOptimizerTest {
list.add(p5); list.add(p5);
list.add(p6); list.add(p6);
list = new StreamOptimizer(service, list).optimizeStream(); new PacketsMerger(list).mergeAdjacentPackets();
assertEquals(4, list.size()); assertEquals(4, list.size());
assertEquals(2, list.get(1).getContent().length); assertEquals(2, list.get(1).getContent().length);