Рефакторинг
This commit is contained in:
@@ -1,383 +0,0 @@
|
|||||||
package ru.serega6531.packmate.service;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.SneakyThrows;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.lang3.ArrayUtils;
|
|
||||||
import ru.serega6531.packmate.model.CtfService;
|
|
||||||
import ru.serega6531.packmate.model.Packet;
|
|
||||||
import ru.serega6531.packmate.utils.Bytes;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URLDecoder;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.zip.GZIPInputStream;
|
|
||||||
import java.util.zip.ZipException;
|
|
||||||
|
|
||||||
@AllArgsConstructor
|
|
||||||
@Slf4j
|
|
||||||
public class StreamOptimizer {
|
|
||||||
|
|
||||||
private final CtfService service;
|
|
||||||
private List<Packet> packets;
|
|
||||||
|
|
||||||
private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Вызвать для выполнения оптимизаций на переданном списке пакетов.
|
|
||||||
*/
|
|
||||||
public List<Packet> optimizeStream() {
|
|
||||||
if (service.isProcessChunkedEncoding()) {
|
|
||||||
processChunkedEncoding();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (service.isUngzipHttp()) {
|
|
||||||
unpackGzip();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (service.isParseWebSockets()) {
|
|
||||||
parseWebSockets();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (service.isUrldecodeHttpRequests()) {
|
|
||||||
urldecodeRequests();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (service.isMergeAdjacentPackets()) {
|
|
||||||
mergeAdjacentPackets();
|
|
||||||
}
|
|
||||||
|
|
||||||
return packets;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Сжать соседние пакеты в одном направлении в один.
|
|
||||||
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
|
|
||||||
*/
|
|
||||||
private void mergeAdjacentPackets() {
|
|
||||||
int start = 0;
|
|
||||||
int packetsInRow = 0;
|
|
||||||
boolean incoming = true;
|
|
||||||
|
|
||||||
for (int i = 0; i < packets.size(); i++) {
|
|
||||||
Packet packet = packets.get(i);
|
|
||||||
if (packet.isIncoming() != incoming) {
|
|
||||||
if (packetsInRow > 1) {
|
|
||||||
compress(start, i);
|
|
||||||
|
|
||||||
i = start + 1; // продвигаем указатель на следующий после склеенного блок
|
|
||||||
}
|
|
||||||
start = i;
|
|
||||||
packetsInRow = 1;
|
|
||||||
} else {
|
|
||||||
packetsInRow++;
|
|
||||||
}
|
|
||||||
|
|
||||||
incoming = packet.isIncoming();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (packetsInRow > 1) {
|
|
||||||
compress(start, packets.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Сжать кусок со start по end в один пакет
|
|
||||||
*/
|
|
||||||
private void compress(int start, int end) {
|
|
||||||
final List<Packet> cut = packets.subList(start, end);
|
|
||||||
final long timestamp = cut.get(0).getTimestamp();
|
|
||||||
final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped);
|
|
||||||
final boolean webSocketParsed = cut.stream().anyMatch(Packet::isWebSocketParsed);
|
|
||||||
boolean incoming = cut.get(0).isIncoming();
|
|
||||||
//noinspection OptionalGetWithoutIsPresent
|
|
||||||
final byte[] content = mergePackets(cut).get();
|
|
||||||
|
|
||||||
packets.removeAll(cut);
|
|
||||||
packets.add(start, Packet.builder()
|
|
||||||
.incoming(incoming)
|
|
||||||
.timestamp(timestamp)
|
|
||||||
.ungzipped(ungzipped)
|
|
||||||
.webSocketParsed(webSocketParsed)
|
|
||||||
.content(content)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
|
|
||||||
*/
|
|
||||||
@SneakyThrows
|
|
||||||
private void urldecodeRequests() {
|
|
||||||
boolean httpStarted = false;
|
|
||||||
|
|
||||||
for (Packet packet : packets) {
|
|
||||||
if (packet.isIncoming()) {
|
|
||||||
String content = packet.getContentString();
|
|
||||||
if (content.contains("HTTP/")) {
|
|
||||||
httpStarted = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (httpStarted) {
|
|
||||||
try {
|
|
||||||
content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString());
|
|
||||||
packet.setContent(content.getBytes());
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
log.warn("urldecode", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
httpStarted = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processChunkedEncoding() {
|
|
||||||
boolean chunkStarted = false;
|
|
||||||
int start = -1;
|
|
||||||
List<Packet> chunk = new ArrayList<>();
|
|
||||||
|
|
||||||
for (int i = 0; i < packets.size(); i++) {
|
|
||||||
Packet packet = packets.get(i);
|
|
||||||
if (!packet.isIncoming()) {
|
|
||||||
String content = packet.getContentString();
|
|
||||||
|
|
||||||
boolean http = content.startsWith("HTTP/");
|
|
||||||
int contentPos = content.indexOf("\r\n\r\n");
|
|
||||||
|
|
||||||
if (http && contentPos != -1) { // начало body
|
|
||||||
String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n
|
|
||||||
boolean chunked = headers.contains("Transfer-Encoding: chunked\r\n");
|
|
||||||
if (chunked) {
|
|
||||||
chunkStarted = true;
|
|
||||||
start = i;
|
|
||||||
chunk.add(packet);
|
|
||||||
|
|
||||||
if (checkCompleteChunk(chunk, start)) {
|
|
||||||
chunkStarted = false;
|
|
||||||
chunk.clear();
|
|
||||||
i = start + 1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
chunkStarted = false;
|
|
||||||
chunk.clear();
|
|
||||||
}
|
|
||||||
} else if (chunkStarted) {
|
|
||||||
chunk.add(packet);
|
|
||||||
if (checkCompleteChunk(chunk, start)) {
|
|
||||||
chunkStarted = false;
|
|
||||||
chunk.clear();
|
|
||||||
i = start + 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true если чанк завершен
|
|
||||||
*/
|
|
||||||
private boolean checkCompleteChunk(List<Packet> chunk, int start) {
|
|
||||||
boolean end = chunk.get(chunk.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n");
|
|
||||||
|
|
||||||
if (end) {
|
|
||||||
//noinspection OptionalGetWithoutIsPresent
|
|
||||||
final byte[] content = mergePackets(chunk).get();
|
|
||||||
|
|
||||||
ByteArrayOutputStream output = new ByteArrayOutputStream(content.length);
|
|
||||||
|
|
||||||
final int contentStart = Bytes.indexOf(content, "\r\n\r\n".getBytes()) + 4;
|
|
||||||
output.write(content, 0, contentStart);
|
|
||||||
|
|
||||||
final byte[] body = Arrays.copyOfRange(content, contentStart, content.length);
|
|
||||||
|
|
||||||
int currentPos = 0;
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
final String found = readChunkSize(body, currentPos);
|
|
||||||
if (found != null) {
|
|
||||||
final int chunkSize = Integer.parseInt(found, 16);
|
|
||||||
|
|
||||||
if (chunkSize == 0) { // конец потока чанков
|
|
||||||
Packet result = Packet.builder()
|
|
||||||
.incoming(false)
|
|
||||||
.timestamp(chunk.get(0).getTimestamp())
|
|
||||||
.ungzipped(false)
|
|
||||||
.webSocketParsed(false)
|
|
||||||
.content(output.toByteArray())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
packets.removeAll(chunk);
|
|
||||||
packets.add(start, result);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
currentPos += found.length() + 2;
|
|
||||||
|
|
||||||
if (currentPos + chunkSize >= body.length) {
|
|
||||||
log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", currentPos, chunkSize, body.length);
|
|
||||||
return true; // обнулить список, но не заменять пакеты
|
|
||||||
}
|
|
||||||
|
|
||||||
output.write(body, currentPos, chunkSize);
|
|
||||||
currentPos += chunkSize;
|
|
||||||
|
|
||||||
if (currentPos + 2 >= body.length || body[currentPos] != '\r' || body[currentPos + 1] != '\n') {
|
|
||||||
log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n");
|
|
||||||
return true; // обнулить список, но не заменять пакеты
|
|
||||||
}
|
|
||||||
|
|
||||||
currentPos += 2;
|
|
||||||
} else {
|
|
||||||
log.warn("Failed to merge chunks, next chunk size not found");
|
|
||||||
return true; // обнулить список, но не заменять пакеты
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String readChunkSize(byte[] content, int start) {
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
for (int i = start; i < content.length - 1; i++) {
|
|
||||||
byte b = content[i];
|
|
||||||
|
|
||||||
if ((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) {
|
|
||||||
sb.append((char) b);
|
|
||||||
} else if (b == '\r' && content[i + 1] == '\n') {
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Попытаться распаковать GZIP из исходящих http пакетов. <br>
|
|
||||||
* GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip
|
|
||||||
* (при этом заголовок HTTP может быть в другом пакете)<br>
|
|
||||||
* Поток заканчивается при обнаружении нового HTTP заголовка,
|
|
||||||
* при смене стороны передачи или при окончании всего стрима
|
|
||||||
*/
|
|
||||||
private void unpackGzip() {
|
|
||||||
boolean gzipStarted = false;
|
|
||||||
int gzipStartPacket = 0;
|
|
||||||
int gzipEndPacket;
|
|
||||||
|
|
||||||
for (int i = 0; i < packets.size(); i++) {
|
|
||||||
Packet packet = packets.get(i);
|
|
||||||
|
|
||||||
if (packet.isIncoming() && gzipStarted) { // поток gzip закончился
|
|
||||||
gzipEndPacket = i - 1;
|
|
||||||
if (extractGzip(gzipStartPacket, gzipEndPacket)) {
|
|
||||||
gzipStarted = false;
|
|
||||||
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
|
|
||||||
}
|
|
||||||
} else if (!packet.isIncoming()) {
|
|
||||||
String content = packet.getContentString();
|
|
||||||
|
|
||||||
int contentPos = content.indexOf("\r\n\r\n");
|
|
||||||
boolean http = content.startsWith("HTTP/");
|
|
||||||
|
|
||||||
if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток
|
|
||||||
gzipEndPacket = i - 1;
|
|
||||||
if (extractGzip(gzipStartPacket, gzipEndPacket)) {
|
|
||||||
gzipStarted = false;
|
|
||||||
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (contentPos != -1) { // начало body
|
|
||||||
String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n
|
|
||||||
boolean gziped = headers.contains("Content-Encoding: gzip\r\n");
|
|
||||||
if (gziped) {
|
|
||||||
gzipStarted = true;
|
|
||||||
gzipStartPacket = i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (gzipStarted) { // стрим закончился gzip пакетом
|
|
||||||
extractGzip(gzipStartPacket, packets.size() - 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Попытаться распаковать кусок пакетов с gzip body и вставить результат на их место
|
|
||||||
*
|
|
||||||
* @return получилось ли распаковать
|
|
||||||
*/
|
|
||||||
private boolean extractGzip(int gzipStartPacket, int gzipEndPacket) {
|
|
||||||
List<Packet> cut = packets.subList(gzipStartPacket, gzipEndPacket + 1);
|
|
||||||
|
|
||||||
Packet decompressed = decompressGzipPackets(cut);
|
|
||||||
if (decompressed != null) {
|
|
||||||
packets.removeAll(cut);
|
|
||||||
packets.add(gzipStartPacket, decompressed);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Packet decompressGzipPackets(List<Packet> cut) {
|
|
||||||
//noinspection OptionalGetWithoutIsPresent
|
|
||||||
final byte[] content = mergePackets(cut).get();
|
|
||||||
|
|
||||||
final int gzipStart = Bytes.indexOf(content, GZIP_HEADER);
|
|
||||||
byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart);
|
|
||||||
byte[] gzipBytes = Arrays.copyOfRange(content, gzipStart, content.length);
|
|
||||||
|
|
||||||
try {
|
|
||||||
final GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(gzipBytes));
|
|
||||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
||||||
IOUtils.copy(gzipStream, out);
|
|
||||||
byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray());
|
|
||||||
|
|
||||||
log.debug("GZIP decompressed: {} -> {} bytes", gzipBytes.length, out.size());
|
|
||||||
|
|
||||||
return Packet.builder()
|
|
||||||
.incoming(false)
|
|
||||||
.timestamp(cut.get(0).getTimestamp())
|
|
||||||
.ungzipped(true)
|
|
||||||
.webSocketParsed(false)
|
|
||||||
.content(newContent)
|
|
||||||
.build();
|
|
||||||
} catch (ZipException e) {
|
|
||||||
log.warn("Failed to decompress gzip, leaving as it is: {}", e.getMessage());
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("decompress gzip", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void parseWebSockets() {
|
|
||||||
if (!packets.get(0).getContentString().contains("HTTP/")) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final WebSocketsParser parser = new WebSocketsParser(packets);
|
|
||||||
if (!parser.isParsed()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
packets = parser.getParsedPackets();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Optional<byte[]> mergePackets(List<Packet> cut) {
|
|
||||||
return cut.stream()
|
|
||||||
.map(Packet::getContent)
|
|
||||||
.reduce(ArrayUtils::addAll);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -15,6 +15,7 @@ import ru.serega6531.packmate.model.pojo.Pagination;
|
|||||||
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
|
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
|
||||||
import ru.serega6531.packmate.model.pojo.UnfinishedStream;
|
import ru.serega6531.packmate.model.pojo.UnfinishedStream;
|
||||||
import ru.serega6531.packmate.repository.StreamRepository;
|
import ru.serega6531.packmate.repository.StreamRepository;
|
||||||
|
import ru.serega6531.packmate.service.optimization.StreamOptimizer;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|||||||
@@ -0,0 +1,142 @@
|
|||||||
|
package ru.serega6531.packmate.service.optimization;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
import ru.serega6531.packmate.utils.Bytes;
|
||||||
|
import ru.serega6531.packmate.utils.PacketUtils;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class HttpChunksProcessor {
|
||||||
|
|
||||||
|
private List<Packet> packets;
|
||||||
|
|
||||||
|
public void processChunkedEncoding() {
|
||||||
|
boolean chunkStarted = false;
|
||||||
|
int start = -1;
|
||||||
|
List<Packet> chunk = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < packets.size(); i++) {
|
||||||
|
Packet packet = packets.get(i);
|
||||||
|
if (!packet.isIncoming()) {
|
||||||
|
String content = packet.getContentString();
|
||||||
|
|
||||||
|
boolean http = content.startsWith("HTTP/");
|
||||||
|
int contentPos = content.indexOf("\r\n\r\n");
|
||||||
|
|
||||||
|
if (http && contentPos != -1) { // начало body
|
||||||
|
String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n
|
||||||
|
boolean chunked = headers.contains("Transfer-Encoding: chunked\r\n");
|
||||||
|
if (chunked) {
|
||||||
|
chunkStarted = true;
|
||||||
|
start = i;
|
||||||
|
chunk.add(packet);
|
||||||
|
|
||||||
|
if (checkCompleteChunk(chunk, start)) {
|
||||||
|
chunkStarted = false;
|
||||||
|
chunk.clear();
|
||||||
|
i = start + 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
chunkStarted = false;
|
||||||
|
chunk.clear();
|
||||||
|
}
|
||||||
|
} else if (chunkStarted) {
|
||||||
|
chunk.add(packet);
|
||||||
|
if (checkCompleteChunk(chunk, start)) {
|
||||||
|
chunkStarted = false;
|
||||||
|
chunk.clear();
|
||||||
|
i = start + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true если чанк завершен
|
||||||
|
*/
|
||||||
|
private boolean checkCompleteChunk(List<Packet> chunk, int start) {
|
||||||
|
boolean end = chunk.get(chunk.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n");
|
||||||
|
|
||||||
|
if (end) {
|
||||||
|
//noinspection OptionalGetWithoutIsPresent
|
||||||
|
final byte[] content = PacketUtils.mergePackets(chunk).get();
|
||||||
|
|
||||||
|
ByteArrayOutputStream output = new ByteArrayOutputStream(content.length);
|
||||||
|
|
||||||
|
final int contentStart = Bytes.indexOf(content, "\r\n\r\n".getBytes()) + 4;
|
||||||
|
output.write(content, 0, contentStart);
|
||||||
|
|
||||||
|
final byte[] body = Arrays.copyOfRange(content, contentStart, content.length);
|
||||||
|
|
||||||
|
int currentPos = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
final String found = readChunkSize(body, currentPos);
|
||||||
|
if (found != null) {
|
||||||
|
final int chunkSize = Integer.parseInt(found, 16);
|
||||||
|
|
||||||
|
if (chunkSize == 0) { // конец потока чанков
|
||||||
|
Packet result = Packet.builder()
|
||||||
|
.incoming(false)
|
||||||
|
.timestamp(chunk.get(0).getTimestamp())
|
||||||
|
.ungzipped(false)
|
||||||
|
.webSocketParsed(false)
|
||||||
|
.content(output.toByteArray())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
packets.removeAll(chunk);
|
||||||
|
packets.add(start, result);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
currentPos += found.length() + 2;
|
||||||
|
|
||||||
|
if (currentPos + chunkSize >= body.length) {
|
||||||
|
log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}", currentPos, chunkSize, body.length);
|
||||||
|
return true; // обнулить список, но не заменять пакеты
|
||||||
|
}
|
||||||
|
|
||||||
|
output.write(body, currentPos, chunkSize);
|
||||||
|
currentPos += chunkSize;
|
||||||
|
|
||||||
|
if (currentPos + 2 >= body.length || body[currentPos] != '\r' || body[currentPos + 1] != '\n') {
|
||||||
|
log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n");
|
||||||
|
return true; // обнулить список, но не заменять пакеты
|
||||||
|
}
|
||||||
|
|
||||||
|
currentPos += 2;
|
||||||
|
} else {
|
||||||
|
log.warn("Failed to merge chunks, next chunk size not found");
|
||||||
|
return true; // обнулить список, но не заменять пакеты
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String readChunkSize(byte[] content, int start) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (int i = start; i < content.length - 1; i++) {
|
||||||
|
byte b = content[i];
|
||||||
|
|
||||||
|
if ((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) {
|
||||||
|
sb.append((char) b);
|
||||||
|
} else if (b == '\r' && content[i + 1] == '\n') {
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,128 @@
|
|||||||
|
package ru.serega6531.packmate.service.optimization;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
import ru.serega6531.packmate.utils.Bytes;
|
||||||
|
import ru.serega6531.packmate.utils.PacketUtils;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
import java.util.zip.ZipException;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class HttpGzipProcessor {
|
||||||
|
|
||||||
|
private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
|
||||||
|
|
||||||
|
private List<Packet> packets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Попытаться распаковать GZIP из исходящих http пакетов. <br>
|
||||||
|
* GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip
|
||||||
|
* (при этом заголовок HTTP может быть в другом пакете)<br>
|
||||||
|
* Поток заканчивается при обнаружении нового HTTP заголовка,
|
||||||
|
* при смене стороны передачи или при окончании всего стрима
|
||||||
|
*/
|
||||||
|
public void unpackGzip() {
|
||||||
|
boolean gzipStarted = false;
|
||||||
|
int gzipStartPacket = 0;
|
||||||
|
int gzipEndPacket;
|
||||||
|
|
||||||
|
for (int i = 0; i < packets.size(); i++) {
|
||||||
|
Packet packet = packets.get(i);
|
||||||
|
|
||||||
|
if (packet.isIncoming() && gzipStarted) { // поток gzip закончился
|
||||||
|
gzipEndPacket = i - 1;
|
||||||
|
if (extractGzip(gzipStartPacket, gzipEndPacket)) {
|
||||||
|
gzipStarted = false;
|
||||||
|
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
|
||||||
|
}
|
||||||
|
} else if (!packet.isIncoming()) {
|
||||||
|
String content = packet.getContentString();
|
||||||
|
|
||||||
|
int contentPos = content.indexOf("\r\n\r\n");
|
||||||
|
boolean http = content.startsWith("HTTP/");
|
||||||
|
|
||||||
|
if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток
|
||||||
|
gzipEndPacket = i - 1;
|
||||||
|
if (extractGzip(gzipStartPacket, gzipEndPacket)) {
|
||||||
|
gzipStarted = false;
|
||||||
|
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (contentPos != -1) { // начало body
|
||||||
|
String headers = content.substring(0, contentPos + 2); // захватываем первые \r\n
|
||||||
|
boolean gziped = headers.contains("Content-Encoding: gzip\r\n");
|
||||||
|
if (gziped) {
|
||||||
|
gzipStarted = true;
|
||||||
|
gzipStartPacket = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (gzipStarted) { // стрим закончился gzip пакетом
|
||||||
|
extractGzip(gzipStartPacket, packets.size() - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Попытаться распаковать кусок пакетов с gzip body и вставить результат на их место
|
||||||
|
*
|
||||||
|
* @return получилось ли распаковать
|
||||||
|
*/
|
||||||
|
private boolean extractGzip(int gzipStartPacket, int gzipEndPacket) {
|
||||||
|
List<Packet> cut = packets.subList(gzipStartPacket, gzipEndPacket + 1);
|
||||||
|
|
||||||
|
Packet decompressed = decompressGzipPackets(cut);
|
||||||
|
if (decompressed != null) {
|
||||||
|
packets.removeAll(cut);
|
||||||
|
packets.add(gzipStartPacket, decompressed);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Packet decompressGzipPackets(List<Packet> cut) {
|
||||||
|
//noinspection OptionalGetWithoutIsPresent
|
||||||
|
final byte[] content = PacketUtils.mergePackets(cut).get();
|
||||||
|
|
||||||
|
final int gzipStart = Bytes.indexOf(content, GZIP_HEADER);
|
||||||
|
byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart);
|
||||||
|
byte[] gzipBytes = Arrays.copyOfRange(content, gzipStart, content.length);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(gzipBytes));
|
||||||
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||||
|
IOUtils.copy(gzipStream, out);
|
||||||
|
byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray());
|
||||||
|
|
||||||
|
log.debug("GZIP decompressed: {} -> {} bytes", gzipBytes.length, out.size());
|
||||||
|
|
||||||
|
return Packet.builder()
|
||||||
|
.incoming(false)
|
||||||
|
.timestamp(cut.get(0).getTimestamp())
|
||||||
|
.ungzipped(true)
|
||||||
|
.webSocketParsed(false)
|
||||||
|
.content(newContent)
|
||||||
|
.build();
|
||||||
|
} catch (ZipException e) {
|
||||||
|
log.warn("Failed to decompress gzip, leaving as it is: {}", e.getMessage());
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("decompress gzip", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,46 @@
|
|||||||
|
package ru.serega6531.packmate.service.optimization;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
|
||||||
|
import java.net.URLDecoder;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class HttpUrldecodeProcessor {
|
||||||
|
|
||||||
|
private List<Packet> packets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
|
||||||
|
*/
|
||||||
|
@SneakyThrows
|
||||||
|
public void urldecodeRequests() {
|
||||||
|
boolean httpStarted = false;
|
||||||
|
|
||||||
|
for (Packet packet : packets) {
|
||||||
|
if (packet.isIncoming()) {
|
||||||
|
String content = packet.getContentString();
|
||||||
|
if (content.contains("HTTP/")) {
|
||||||
|
httpStarted = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (httpStarted) {
|
||||||
|
try {
|
||||||
|
content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString());
|
||||||
|
packet.setContent(content.getBytes());
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
log.warn("urldecode", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
httpStarted = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,67 @@
|
|||||||
|
package ru.serega6531.packmate.service.optimization;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
import ru.serega6531.packmate.utils.PacketUtils;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class PacketsMerger {
|
||||||
|
|
||||||
|
private List<Packet> packets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Сжать соседние пакеты в одном направлении в один.
|
||||||
|
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
|
||||||
|
*/
|
||||||
|
public void mergeAdjacentPackets() {
|
||||||
|
int start = 0;
|
||||||
|
int packetsInRow = 0;
|
||||||
|
boolean incoming = true;
|
||||||
|
|
||||||
|
for (int i = 0; i < packets.size(); i++) {
|
||||||
|
Packet packet = packets.get(i);
|
||||||
|
if (packet.isIncoming() != incoming) {
|
||||||
|
if (packetsInRow > 1) {
|
||||||
|
compress(start, i);
|
||||||
|
|
||||||
|
i = start + 1; // продвигаем указатель на следующий после склеенного блок
|
||||||
|
}
|
||||||
|
start = i;
|
||||||
|
packetsInRow = 1;
|
||||||
|
} else {
|
||||||
|
packetsInRow++;
|
||||||
|
}
|
||||||
|
|
||||||
|
incoming = packet.isIncoming();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (packetsInRow > 1) {
|
||||||
|
compress(start, packets.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Сжать кусок со start по end в один пакет
|
||||||
|
*/
|
||||||
|
private void compress(int start, int end) {
|
||||||
|
final List<Packet> cut = packets.subList(start, end);
|
||||||
|
final long timestamp = cut.get(0).getTimestamp();
|
||||||
|
final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped);
|
||||||
|
final boolean webSocketParsed = cut.stream().anyMatch(Packet::isWebSocketParsed);
|
||||||
|
boolean incoming = cut.get(0).isIncoming();
|
||||||
|
//noinspection OptionalGetWithoutIsPresent
|
||||||
|
final byte[] content = PacketUtils.mergePackets(cut).get();
|
||||||
|
|
||||||
|
packets.removeAll(cut);
|
||||||
|
packets.add(start, Packet.builder()
|
||||||
|
.incoming(incoming)
|
||||||
|
.timestamp(timestamp)
|
||||||
|
.ungzipped(ungzipped)
|
||||||
|
.webSocketParsed(webSocketParsed)
|
||||||
|
.content(content)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
package ru.serega6531.packmate.service.optimization;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import ru.serega6531.packmate.model.CtfService;
|
||||||
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class StreamOptimizer {
|
||||||
|
|
||||||
|
private final CtfService service;
|
||||||
|
private List<Packet> packets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Вызвать для выполнения оптимизаций на переданном списке пакетов.
|
||||||
|
*/
|
||||||
|
public List<Packet> optimizeStream() {
|
||||||
|
if (service.isProcessChunkedEncoding()) {
|
||||||
|
processChunkedEncoding();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (service.isUngzipHttp()) {
|
||||||
|
unpackGzip();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (service.isParseWebSockets()) {
|
||||||
|
parseWebSockets();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (service.isUrldecodeHttpRequests()) {
|
||||||
|
urldecodeRequests();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (service.isMergeAdjacentPackets()) {
|
||||||
|
mergeAdjacentPackets();
|
||||||
|
}
|
||||||
|
|
||||||
|
return packets;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Сжать соседние пакеты в одном направлении в один.
|
||||||
|
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
|
||||||
|
*/
|
||||||
|
private void mergeAdjacentPackets() {
|
||||||
|
final PacketsMerger merger = new PacketsMerger(packets);
|
||||||
|
merger.mergeAdjacentPackets();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
|
||||||
|
*/
|
||||||
|
@SneakyThrows
|
||||||
|
private void urldecodeRequests() {
|
||||||
|
final HttpUrldecodeProcessor processor = new HttpUrldecodeProcessor(packets);
|
||||||
|
processor.urldecodeRequests();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* https://ru.wikipedia.org/wiki/Chunked_transfer_encoding
|
||||||
|
*/
|
||||||
|
private void processChunkedEncoding() {
|
||||||
|
HttpChunksProcessor processor = new HttpChunksProcessor(packets);
|
||||||
|
processor.processChunkedEncoding();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Попытаться распаковать GZIP из исходящих http пакетов. <br>
|
||||||
|
* GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip
|
||||||
|
* (при этом заголовок HTTP может быть в другом пакете)<br>
|
||||||
|
* Поток заканчивается при обнаружении нового HTTP заголовка,
|
||||||
|
* при смене стороны передачи или при окончании всего стрима
|
||||||
|
*/
|
||||||
|
private void unpackGzip() {
|
||||||
|
final HttpGzipProcessor processor = new HttpGzipProcessor(packets);
|
||||||
|
processor.unpackGzip();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void parseWebSockets() {
|
||||||
|
if (!packets.get(0).getContentString().contains("HTTP/")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final WebSocketsParser parser = new WebSocketsParser(packets);
|
||||||
|
if (!parser.isParsed()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
packets = parser.getParsedPackets();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,8 +1,7 @@
|
|||||||
package ru.serega6531.packmate.service;
|
package ru.serega6531.packmate.service.optimization;
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.ArrayUtils;
|
|
||||||
import org.java_websocket.drafts.Draft_6455;
|
import org.java_websocket.drafts.Draft_6455;
|
||||||
import org.java_websocket.exceptions.InvalidDataException;
|
import org.java_websocket.exceptions.InvalidDataException;
|
||||||
import org.java_websocket.exceptions.InvalidHandshakeException;
|
import org.java_websocket.exceptions.InvalidHandshakeException;
|
||||||
@@ -12,6 +11,7 @@ import org.java_websocket.framing.Framedata;
|
|||||||
import org.java_websocket.handshake.HandshakeImpl1Client;
|
import org.java_websocket.handshake.HandshakeImpl1Client;
|
||||||
import org.java_websocket.handshake.HandshakeImpl1Server;
|
import org.java_websocket.handshake.HandshakeImpl1Server;
|
||||||
import ru.serega6531.packmate.model.Packet;
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
import ru.serega6531.packmate.utils.PacketUtils;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -110,10 +110,8 @@ public class WebSocketsParser {
|
|||||||
for (List<Packet> side : sides) {
|
for (List<Packet> side : sides) {
|
||||||
final Packet lastPacket = side.get(0);
|
final Packet lastPacket = side.get(0);
|
||||||
|
|
||||||
final byte[] wsContent = side.stream()
|
//noinspection OptionalGetWithoutIsPresent
|
||||||
.map(Packet::getContent)
|
final byte[] wsContent = PacketUtils.mergePackets(side).get();
|
||||||
.reduce(ArrayUtils::addAll)
|
|
||||||
.get();
|
|
||||||
|
|
||||||
final ByteBuffer buffer = ByteBuffer.wrap(wsContent);
|
final ByteBuffer buffer = ByteBuffer.wrap(wsContent);
|
||||||
List<Framedata> frames;
|
List<Framedata> frames;
|
||||||
@@ -177,9 +175,7 @@ public class WebSocketsParser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private String getHandshake(final List<Packet> packets) {
|
private String getHandshake(final List<Packet> packets) {
|
||||||
final String handshake = packets.stream()
|
final String handshake = PacketUtils.mergePackets(packets)
|
||||||
.map(Packet::getContent)
|
|
||||||
.reduce(ArrayUtils::addAll)
|
|
||||||
.map(String::new)
|
.map(String::new)
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
|
|
||||||
19
src/main/java/ru/serega6531/packmate/utils/PacketUtils.java
Normal file
19
src/main/java/ru/serega6531/packmate/utils/PacketUtils.java
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package ru.serega6531.packmate.utils;
|
||||||
|
|
||||||
|
import lombok.experimental.UtilityClass;
|
||||||
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
|
import ru.serega6531.packmate.model.Packet;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@UtilityClass
|
||||||
|
public class PacketUtils {
|
||||||
|
|
||||||
|
public static Optional<byte[]> mergePackets(List<Packet> cut) {
|
||||||
|
return cut.stream()
|
||||||
|
.map(Packet::getContent)
|
||||||
|
.reduce(ArrayUtils::addAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -4,7 +4,9 @@ import org.apache.commons.lang3.ArrayUtils;
|
|||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import ru.serega6531.packmate.model.CtfService;
|
import ru.serega6531.packmate.model.CtfService;
|
||||||
import ru.serega6531.packmate.model.Packet;
|
import ru.serega6531.packmate.model.Packet;
|
||||||
import ru.serega6531.packmate.service.StreamOptimizer;
|
import ru.serega6531.packmate.service.optimization.HttpGzipProcessor;
|
||||||
|
import ru.serega6531.packmate.service.optimization.HttpUrldecodeProcessor;
|
||||||
|
import ru.serega6531.packmate.service.optimization.PacketsMerger;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
@@ -21,14 +23,11 @@ class StreamOptimizerTest {
|
|||||||
final byte[] gzipped = Base64.getDecoder().decode(encoded);
|
final byte[] gzipped = Base64.getDecoder().decode(encoded);
|
||||||
final byte[] content = ArrayUtils.addAll("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Length: 26\r\n\r\n".getBytes(), gzipped);
|
final byte[] content = ArrayUtils.addAll("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Length: 26\r\n\r\n".getBytes(), gzipped);
|
||||||
|
|
||||||
CtfService service = new CtfService();
|
|
||||||
service.setUngzipHttp(true);
|
|
||||||
|
|
||||||
Packet p = createPacket(content, false);
|
Packet p = createPacket(content, false);
|
||||||
List<Packet> list = new ArrayList<>();
|
List<Packet> list = new ArrayList<>();
|
||||||
list.add(p);
|
list.add(p);
|
||||||
|
|
||||||
list = new StreamOptimizer(service, list).optimizeStream();
|
new HttpGzipProcessor(list).unpackGzip();
|
||||||
final String processed = list.get(0).getContentString();
|
final String processed = list.get(0).getContentString();
|
||||||
assertTrue(processed.contains("aaabbb"));
|
assertTrue(processed.contains("aaabbb"));
|
||||||
}
|
}
|
||||||
@@ -42,7 +41,7 @@ class StreamOptimizerTest {
|
|||||||
List<Packet> list = new ArrayList<>();
|
List<Packet> list = new ArrayList<>();
|
||||||
list.add(p);
|
list.add(p);
|
||||||
|
|
||||||
list = new StreamOptimizer(service, list).optimizeStream();
|
new HttpUrldecodeProcessor(list).urldecodeRequests();
|
||||||
final String processed = list.get(0).getContentString();
|
final String processed = list.get(0).getContentString();
|
||||||
assertTrue(processed.contains("а б"));
|
assertTrue(processed.contains("а б"));
|
||||||
}
|
}
|
||||||
@@ -67,7 +66,7 @@ class StreamOptimizerTest {
|
|||||||
list.add(p5);
|
list.add(p5);
|
||||||
list.add(p6);
|
list.add(p6);
|
||||||
|
|
||||||
list = new StreamOptimizer(service, list).optimizeStream();
|
new PacketsMerger(list).mergeAdjacentPackets();
|
||||||
|
|
||||||
assertEquals(4, list.size());
|
assertEquals(4, list.size());
|
||||||
assertEquals(2, list.get(1).getContent().length);
|
assertEquals(2, list.get(1).getContent().length);
|
||||||
|
|||||||
Reference in New Issue
Block a user