diff --git a/src/main/java/ru/serega6531/packmate/PcapWorker.java b/src/main/java/ru/serega6531/packmate/PcapWorker.java index fc36f32..1adf3a5 100644 --- a/src/main/java/ru/serega6531/packmate/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/PcapWorker.java @@ -45,7 +45,8 @@ public class PcapWorker implements PacketListener { private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic - private final ListMultimap unfinishedStreams = ArrayListMultimap.create(); + private final ListMultimap unfinishedTcpStreams = ArrayListMultimap.create(); + private final ListMultimap unfinishedUdpStreams = ArrayListMultimap.create(); // в следующих мапах в значениях находится srcIp соответствующего пакета private final SetMultimap> fins = HashMultimap.create(); @@ -142,7 +143,7 @@ public class PcapWorker implements PacketListener { if (log.isDebugEnabled()) { log.debug("tcp {} {}:{} -> {}:{}, номер пакета {}", serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort, - unfinishedStreams.get(stream).size()); + unfinishedTcpStreams.get(stream).size()); } checkTcpTermination(ack, fin, rst, new ImmutablePair<>(sourceIp, sourcePort), new ImmutablePair<>(destIp, destPort), stream); @@ -179,7 +180,7 @@ public class PcapWorker implements PacketListener { if (log.isDebugEnabled()) { log.debug("udp {} {}:{} -> {}:{}, номер пакета {}", serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort, - unfinishedStreams.get(stream).size()); + unfinishedUdpStreams.get(stream).size()); } }); } else { // сервис не найден @@ -203,11 +204,14 @@ public class PcapWorker implements PacketListener { .content(content) .build(); - if (!unfinishedStreams.containsKey(stream)) { + final ListMultimap streams = + (protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams; + + if (!streams.containsKey(stream)) { log.debug("Начат новый стрим"); } - unfinishedStreams.put(stream, packet); + streams.put(stream, packet); return stream; } @@ -229,9 +233,9 @@ public class PcapWorker implements PacketListener { // если соединение разорвано с помощью rst или закрыто с помощью fin-ack-fin-ack if (rst || (acks.containsEntry(stream, sourceIpAndPort) && acks.containsEntry(stream, destIpAndPort))) { - streamService.saveNewStream(stream, unfinishedStreams.get(stream)); + streamService.saveNewStream(stream, unfinishedTcpStreams.get(stream)); - unfinishedStreams.removeAll(stream); + unfinishedTcpStreams.removeAll(stream); fins.removeAll(stream); acks.removeAll(stream); } @@ -243,13 +247,15 @@ public class PcapWorker implements PacketListener { int streamsClosed = 0; final long time = System.currentTimeMillis(); + final ListMultimap streams = + (protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams; + final Map> oldStreams = - Multimaps.asMap(unfinishedStreams).entrySet().stream() + Multimaps.asMap(streams).entrySet().stream() .filter(entry -> { final List packets = entry.getValue(); return time - packets.get(packets.size() - 1).getTimestamp() > timeoutMillis; }) - .filter(entry -> entry.getKey().getProtocol() == protocol) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); for (Map.Entry> entry : oldStreams.entrySet()) { @@ -265,7 +271,7 @@ public class PcapWorker implements PacketListener { acks.removeAll(stream); } - unfinishedStreams.removeAll(stream); + streams.removeAll(stream); } return streamsClosed; diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index b68e603..3c188bb 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -70,6 +70,15 @@ public class StreamService { } CtfService service = serviceOptional.get(); + if (ignoreEmptyPackets) { + packets.removeIf(packet -> packet.getContent().length == 0); + + if (packets.isEmpty()) { + log.debug("Стрим состоит только из пустых пакетов и не будет сохранен"); + return false; + } + } + Optional firstIncoming = packets.stream() .filter(Packet::isIncoming) .findFirst(); @@ -81,15 +90,6 @@ public class StreamService { stream.setEndTimestamp(packets.get(packets.size() - 1).getTimestamp()); stream.setService(service.getPort()); - if (ignoreEmptyPackets) { - packets.removeIf(packet -> packet.getContent().length == 0); - - if (packets.isEmpty()) { - log.debug("Стрим состоит только из пустых пакетов и не будет сохранен"); - return false; - } - } - new StreamOptimizer(service, packets).optimizeStream(); processUserAgent(packets, stream);