Рефакторинг

This commit is contained in:
serega6531
2020-02-04 00:07:15 +03:00
parent fe8db447ca
commit c59910a0ea
2 changed files with 38 additions and 19 deletions

View File

@@ -21,28 +21,35 @@ import java.util.zip.ZipException;
@AllArgsConstructor @AllArgsConstructor
@Slf4j @Slf4j
public class StreamOptimizer { class StreamOptimizer {
private final CtfService service; private final CtfService service;
private final List<Packet> packets; private final List<Packet> packets;
private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
public void optimizeStream() { /**
* Вызвать для выполнения оптимизаций на переданном списке пакетов.
*/
void optimizeStream() {
if (service.isUngzipHttp()) { if (service.isUngzipHttp()) {
unpackGzip(packets); unpackGzip();
} }
if (service.isUrldecodeHttpRequests()) { if (service.isUrldecodeHttpRequests()) {
urldecodeRequests(packets); urldecodeRequests();
} }
if (service.isMergeAdjacentPackets()) { if (service.isMergeAdjacentPackets()) {
mergeAdjacentPackets(packets); mergeAdjacentPackets();
} }
} }
private void mergeAdjacentPackets(List<Packet> packets) { /**
* Сжать соседние пакеты в одном направлении в один.
* Выполняется после других оптимизаций чтобы правильно определять границы пакетов.
*/
private void mergeAdjacentPackets() {
int start = 0; int start = 0;
int packetsInRow = 0; int packetsInRow = 0;
boolean incoming = true; boolean incoming = true;
@@ -52,7 +59,7 @@ public class StreamOptimizer {
if (packet.isIncoming() != incoming) { if (packet.isIncoming() != incoming) {
if (packetsInRow > 1) { if (packetsInRow > 1) {
final List<Packet> cut = packets.subList(start, i); final List<Packet> cut = packets.subList(start, i);
compress(packets, cut, incoming); compress(cut, incoming);
i++; // продвигаем указатель на следующий после склеенного блок i++; // продвигаем указатель на следующий после склеенного блок
} }
@@ -67,11 +74,14 @@ public class StreamOptimizer {
if (packetsInRow > 1) { if (packetsInRow > 1) {
final List<Packet> cut = packets.subList(start, packets.size()); final List<Packet> cut = packets.subList(start, packets.size());
compress(packets, cut, incoming); compress(cut, incoming);
} }
} }
private void compress(List<Packet> packets, List<Packet> cut, boolean incoming) { /**
* Сжать кусок cut в один пакет
*/
private void compress(List<Packet> cut, boolean incoming) {
final long timestamp = cut.get(0).getTimestamp(); final long timestamp = cut.get(0).getTimestamp();
final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped); final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped);
//noinspection OptionalGetWithoutIsPresent //noinspection OptionalGetWithoutIsPresent
@@ -89,8 +99,11 @@ public class StreamOptimizer {
.build()); .build());
} }
/**
* Декодирование urlencode с http пакета до смены стороны или окончания стрима
*/
@SneakyThrows @SneakyThrows
private void urldecodeRequests(List<Packet> packets) { private void urldecodeRequests() {
boolean httpStarted = false; boolean httpStarted = false;
for (Packet packet : packets) { for (Packet packet : packets) {
@@ -111,9 +124,13 @@ public class StreamOptimizer {
} }
/** /**
* Попытаться распаковать gzip из исходящих http пакетов * Попытаться распаковать GZIP из исходящих http пакетов. <br>
* GZIP поток начинается на найденном HTTP пакете с заголовком Content-Encoding: gzip
* (при этом заголовок HTTP может быть в другом пакете)<br>
* Поток заканчивается при обнаружении нового HTTP заголовка,
* при смене стороны передачи или при окончании всего стрима
*/ */
private void unpackGzip(List<Packet> packets) { private void unpackGzip() {
boolean gzipStarted = false; boolean gzipStarted = false;
int gzipStartPacket = 0; int gzipStartPacket = 0;
int gzipEndPacket; int gzipEndPacket;
@@ -123,7 +140,7 @@ public class StreamOptimizer {
if (packet.isIncoming() && gzipStarted) { // поток gzip закончился if (packet.isIncoming() && gzipStarted) { // поток gzip закончился
gzipEndPacket = i - 1; gzipEndPacket = i - 1;
if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { if(extractGzip(gzipStartPacket, gzipEndPacket)) {
gzipStarted = false; gzipStarted = false;
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
} }
@@ -135,7 +152,7 @@ public class StreamOptimizer {
if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток
gzipEndPacket = i - 1; gzipEndPacket = i - 1;
if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { if(extractGzip(gzipStartPacket, gzipEndPacket)) {
gzipStarted = false; gzipStarted = false;
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
} }
@@ -153,14 +170,15 @@ public class StreamOptimizer {
} }
if (gzipStarted) { // стрим закончился gzip пакетом if (gzipStarted) { // стрим закончился gzip пакетом
extractGzip(packets, gzipStartPacket, packets.size() - 1); extractGzip(gzipStartPacket, packets.size() - 1);
} }
} }
/** /**
* Попытаться распаковать кусок пакетов с gzip body и вставить результат на их место
* @return получилось ли распаковать * @return получилось ли распаковать
*/ */
private boolean extractGzip(List<Packet> packets, int gzipStartPacket, int gzipEndPacket) { private boolean extractGzip(int gzipStartPacket, int gzipEndPacket) {
List<Packet> cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); List<Packet> cut = packets.subList(gzipStartPacket, gzipEndPacket + 1);
Packet decompressed = decompressGzipPackets(cut); Packet decompressed = decompressGzipPackets(cut);
@@ -173,9 +191,9 @@ public class StreamOptimizer {
return false; return false;
} }
private Packet decompressGzipPackets(List<Packet> packets) { private Packet decompressGzipPackets(List<Packet> cut) {
//noinspection OptionalGetWithoutIsPresent //noinspection OptionalGetWithoutIsPresent
final byte[] content = packets.stream() final byte[] content = cut.stream()
.map(Packet::getContent) .map(Packet::getContent)
.reduce(ArrayUtils::addAll) .reduce(ArrayUtils::addAll)
.get(); .get();
@@ -194,7 +212,7 @@ public class StreamOptimizer {
return Packet.builder() return Packet.builder()
.incoming(false) .incoming(false)
.timestamp(packets.get(0).getTimestamp()) .timestamp(cut.get(0).getTimestamp())
.ungzipped(true) .ungzipped(true)
.content(newContent) .content(newContent)
.build(); .build();

View File

@@ -90,6 +90,7 @@ public class StreamService {
} }
} }
new StreamOptimizer(service, packets).optimizeStream();
processUserAgent(packets, stream); processUserAgent(packets, stream);
Stream savedStream = save(stream); Stream savedStream = save(stream);