diff --git a/src/main/java/ru/serega6531/packmate/PcapWorker.java b/src/main/java/ru/serega6531/packmate/PcapWorker.java index 4f52619..48e432d 100644 --- a/src/main/java/ru/serega6531/packmate/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/PcapWorker.java @@ -190,16 +190,8 @@ public class PcapWorker implements PacketListener { private void checkTcpTermination(boolean ack, boolean fin, boolean rst, ImmutablePair sourceIpAndPort, ImmutablePair destIpAndPort, UnfinishedStream stream) { - if (!fins.containsKey(stream)) { - fins.put(stream, new HashSet<>()); - } - - if (!acks.containsKey(stream)) { - acks.put(stream, new HashSet<>()); - } - - final Set> finsForStream = fins.get(stream); - final Set> acksForStream = acks.get(stream); + final Set> finsForStream = fins.computeIfAbsent(stream, k -> new HashSet<>()); + final Set> acksForStream = acks.computeIfAbsent(stream, k -> new HashSet<>()); if (fin) { finsForStream.add(sourceIpAndPort); diff --git a/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java b/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java new file mode 100644 index 0000000..ef79376 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java @@ -0,0 +1,210 @@ +package ru.serega6531.packmate.service; + +import lombok.AllArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.ArrayUtils; +import ru.serega6531.packmate.model.CtfService; +import ru.serega6531.packmate.model.Packet; +import ru.serega6531.packmate.utils.Bytes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipException; + +@AllArgsConstructor +@Slf4j +public class StreamOptimizer { + + private final CtfService service; + private final List packets; + + private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; + + public void optimizeStream() { + if (service.isUngzipHttp()) { + unpackGzip(packets); + } + + if (service.isUrldecodeHttpRequests()) { + urldecodeRequests(packets); + } + + if (service.isMergeAdjacentPackets()) { + mergeAdjacentPackets(packets); + } + } + + private void mergeAdjacentPackets(List packets) { + int start = 0; + int packetsInRow = 0; + boolean incoming = true; + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + if (packet.isIncoming() != incoming) { + if (packetsInRow > 1) { + final List cut = packets.subList(start, i); + compress(packets, cut, incoming); + + i++; // продвигаем указатель на следующий после склеенного блок + } + start = i; + packetsInRow = 1; + } else { + packetsInRow++; + } + + incoming = packet.isIncoming(); + } + + if (packetsInRow > 1) { + final List cut = packets.subList(start, packets.size()); + compress(packets, cut, incoming); + } + } + + private void compress(List packets, List cut, boolean incoming) { + final long timestamp = cut.get(0).getTimestamp(); + final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped); + //noinspection OptionalGetWithoutIsPresent + final byte[] content = cut.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll) + .get(); + + packets.removeAll(cut); + packets.add(Packet.builder() + .incoming(incoming) + .timestamp(timestamp) + .ungzipped(ungzipped) + .content(content) + .build()); + } + + @SneakyThrows + private void urldecodeRequests(List packets) { + boolean httpStarted = false; + + for (Packet packet : packets) { + if (packet.isIncoming()) { + String content = new String(packet.getContent()); + if (content.startsWith("HTTP/")) { + httpStarted = true; + } + + if (httpStarted) { + content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString()); + packet.setContent(content.getBytes()); + } + } else { + httpStarted = false; + } + } + } + + /** + * Попытаться распаковать gzip из исходящих http пакетов + */ + private void unpackGzip(List packets) { + boolean gzipStarted = false; + int gzipStartPacket = 0; + int gzipEndPacket; + + for (int i = 0; i < packets.size(); i++) { + Packet packet = packets.get(i); + + if (packet.isIncoming() && gzipStarted) { // поток gzip закончился + gzipEndPacket = i - 1; + if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { + gzipStarted = false; + i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок + } + } else if (!packet.isIncoming()) { + String content = new String(packet.getContent()); + + int contentPos = content.indexOf("\r\n\r\n"); + boolean http = content.startsWith("HTTP/"); + + if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток + gzipEndPacket = i - 1; + if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { + gzipStarted = false; + i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок + } + } + + if (contentPos != -1) { // начало body + String headers = content.substring(0, contentPos); + boolean gziped = headers.contains("Content-Encoding: gzip\r\n"); + if (gziped) { + gzipStarted = true; + gzipStartPacket = i; + } + } + } + } + + if (gzipStarted) { // стрим закончился gzip пакетом + extractGzip(packets, gzipStartPacket, packets.size() - 1); + } + } + + /** + * @return получилось ли распаковать + */ + private boolean extractGzip(List packets, int gzipStartPacket, int gzipEndPacket) { + List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); + + Packet decompressed = decompressGzipPackets(cut); + if (decompressed != null) { + packets.removeAll(cut); + packets.add(gzipStartPacket, decompressed); + return true; + } + + return false; + } + + private Packet decompressGzipPackets(List packets) { + //noinspection OptionalGetWithoutIsPresent + final byte[] content = packets.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll) + .get(); + + final int gzipStart = Bytes.indexOf(content, GZIP_HEADER); + byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart); + byte[] gzipBytes = Arrays.copyOfRange(content, gzipStart, content.length); + + try { + final GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(gzipBytes)); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IOUtils.copy(gzipStream, out); + byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray()); + + log.debug("Разархивирован gzip: {} -> {} байт", gzipBytes.length, out.size()); + + return Packet.builder() + .incoming(false) + .timestamp(packets.get(0).getTimestamp()) + .ungzipped(true) + .content(newContent) + .build(); + } catch (ZipException e) { + log.warn("Не удалось разархивировать gzip, оставляем как есть", e); + } catch (IOException e) { + log.error("decompress gzip", e); + } + + return null; + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 1d1ec7a..2eb5af3 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -1,9 +1,6 @@ package ru.serega6531.packmate.service; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.ArrayUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.PageRequest; @@ -17,18 +14,13 @@ import ru.serega6531.packmate.model.pojo.Pagination; import ru.serega6531.packmate.model.pojo.SubscriptionMessage; import ru.serega6531.packmate.model.pojo.UnfinishedStream; import ru.serega6531.packmate.repository.StreamRepository; -import ru.serega6531.packmate.utils.Bytes; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URLDecoder; -import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.regex.Matcher; import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; -import java.util.zip.ZipException; @Service @Slf4j @@ -42,7 +34,6 @@ public class StreamService { private final boolean ignoreEmptyPackets; - private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; private final java.util.regex.Pattern userAgentPattern = java.util.regex.Pattern.compile("User-Agent: (.+)\\r\\n"); @Autowired @@ -51,7 +42,6 @@ public class StreamService { ServicesService servicesService, PacketService packetService, StreamSubscriptionService subscriptionService, - @Value("${local-ip}") String localIp, @Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) { this.repository = repository; this.patternService = patternService; @@ -100,7 +90,6 @@ public class StreamService { } } - optimizeStream(packets, service); processUserAgent(packets, stream); Stream savedStream = save(stream); @@ -114,20 +103,6 @@ public class StreamService { return true; } - private void optimizeStream(List packets, CtfService service) { - if (service.isUngzipHttp()) { - unpackGzip(packets); - } - - if (service.isUrldecodeHttpRequests()) { - urldecodeRequests(packets); - } - - if (service.isMergeAdjacentPackets()) { - mergeAdjacentPackets(packets); - } - } - private void processUserAgent(List packets, Stream stream) { String ua = null; for (Packet packet : packets) { @@ -144,6 +119,17 @@ public class StreamService { } } + private String calculateUserAgentHash(String ua) { + char[] alphabet = "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray(); + int l = alphabet.length; + int hashCode = ua.hashCode(); + if(hashCode == Integer.MIN_VALUE) { // abs(MIN_VALUE) вернет то же значение + hashCode = Integer.MAX_VALUE; + } + final int hash = Math.abs(hashCode) % (l * l * l); + return "" + alphabet[hash % l] + alphabet[(hash / l) % l] + alphabet[(hash / (l * l)) % l]; + } + private Set getFoundPatterns(List packets, Stream savedStream) { Set foundPatterns = new HashSet<>(); @@ -160,171 +146,6 @@ public class StreamService { return foundPatterns; } - private void mergeAdjacentPackets(List packets) { - int start = 0; - int packetsInRow = 0; - boolean incoming = true; - - for (int i = 0; i < packets.size(); i++) { - Packet packet = packets.get(i); - if (packet.isIncoming() != incoming) { - if (packetsInRow > 1) { - final List cut = packets.subList(start, i); - compress(packets, cut, incoming); - - i++; // продвигаем указатель на следующий после склеенного блок - } - start = i; - packetsInRow = 1; - } else { - packetsInRow++; - } - - incoming = packet.isIncoming(); - } - - if (packetsInRow > 1) { - final List cut = packets.subList(start, packets.size()); - compress(packets, cut, incoming); - } - } - - private void compress(List packets, List cut, boolean incoming) { - final long timestamp = cut.get(0).getTimestamp(); - final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped); - //noinspection OptionalGetWithoutIsPresent - final byte[] content = cut.stream() - .map(Packet::getContent) - .reduce(ArrayUtils::addAll) - .get(); - - packets.removeAll(cut); - packets.add(Packet.builder() - .incoming(incoming) - .timestamp(timestamp) - .ungzipped(ungzipped) - .content(content) - .build()); - } - - @SneakyThrows - private void urldecodeRequests(List packets) { - boolean httpStarted = false; - - for (Packet packet : packets) { - if (packet.isIncoming()) { - String content = new String(packet.getContent()); - if (content.startsWith("HTTP/")) { - httpStarted = true; - } - - if (httpStarted) { - content = URLDecoder.decode(content, StandardCharsets.UTF_8.toString()); - packet.setContent(content.getBytes()); - } - } else { - httpStarted = false; - } - } - } - - /** - * Попытаться распаковать gzip из исходящих http пакетов - */ - private void unpackGzip(List packets) { - boolean gzipStarted = false; - int gzipStartPacket = 0; - int gzipEndPacket; - - for (int i = 0; i < packets.size(); i++) { - Packet packet = packets.get(i); - - if (packet.isIncoming() && gzipStarted) { // поток gzip закончился - gzipEndPacket = i - 1; - if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { - gzipStarted = false; - i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок - } - } else if (!packet.isIncoming()) { - String content = new String(packet.getContent()); - - int contentPos = content.indexOf("\r\n\r\n"); - boolean http = content.startsWith("HTTP/"); - - if (http && gzipStarted) { // начался новый http пакет, заканчиваем старый gzip поток - gzipEndPacket = i - 1; - if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { - gzipStarted = false; - i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок - } - } - - if (contentPos != -1) { // начало body - String headers = content.substring(0, contentPos); - boolean gziped = headers.contains("Content-Encoding: gzip\r\n"); - if (gziped) { - gzipStarted = true; - gzipStartPacket = i; - } - } - } - } - - if (gzipStarted) { // стрим закончился gzip пакетом - extractGzip(packets, gzipStartPacket, packets.size() - 1); - } - } - - /** - * @return получилось ли распаковать - */ - private boolean extractGzip(List packets, int gzipStartPacket, int gzipEndPacket) { - List cut = packets.subList(gzipStartPacket, gzipEndPacket + 1); - - Packet decompressed = decompressGzipPackets(cut); - if (decompressed != null) { - packets.removeAll(cut); - packets.add(gzipStartPacket, decompressed); - return true; - } - - return false; - } - - private Packet decompressGzipPackets(List packets) { - //noinspection OptionalGetWithoutIsPresent - final byte[] content = packets.stream() - .map(Packet::getContent) - .reduce(ArrayUtils::addAll) - .get(); - - final int gzipStart = Bytes.indexOf(content, GZIP_HEADER); - byte[] httpHeader = Arrays.copyOfRange(content, 0, gzipStart); - byte[] gzipBytes = Arrays.copyOfRange(content, gzipStart, content.length); - - try { - final GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(gzipBytes)); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - IOUtils.copy(gzipStream, out); - byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray()); - - log.debug("Разархивирован gzip: {} -> {} байт", gzipBytes.length, out.size()); - - return Packet.builder() - .incoming(false) - .timestamp(packets.get(0).getTimestamp()) - .ungzipped(true) - .content(newContent) - .build(); - } catch (ZipException e) { - log.warn("Не удалось разархивировать gzip, оставляем как есть", e); - } catch (IOException e) { - log.error("decompress gzip", e); - } - - return null; - } - public Stream save(Stream stream) { Stream saved; if (stream.getId() == null) { @@ -341,17 +162,6 @@ public class StreamService { return repository.findById(id); } - private String calculateUserAgentHash(String ua) { - char[] alphabet = "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray(); - int l = alphabet.length; - int hashCode = ua.hashCode(); - if(hashCode == Integer.MIN_VALUE) { - hashCode = Integer.MAX_VALUE; - } - final int hash = Math.abs(hashCode) % (l * l * l); - return "" + alphabet[hash % l] + alphabet[(hash / l) % l] + alphabet[(hash / (l * l)) % l]; - } - @SuppressWarnings("UnusedReturnValue") @Transactional public void setFavorite(long id, boolean favorite) {