From 15e3521e0f52ffff7c19f8cbe0ed7441fa7e9e03 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Mon, 3 Feb 2020 03:44:30 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ru/serega6531/packmate/PcapWorker.java | 24 +++++++++---------- .../packmate/service/StreamService.java | 13 ++++------ .../ru/serega6531/packmate/utils/Bytes.java | 3 +++ 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/ru/serega6531/packmate/PcapWorker.java b/src/main/java/ru/serega6531/packmate/PcapWorker.java index ef28e96..4f52619 100644 --- a/src/main/java/ru/serega6531/packmate/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/PcapWorker.java @@ -3,6 +3,7 @@ package ru.serega6531.packmate; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.pcap4j.core.*; import org.pcap4j.packet.IpV4Packet; import org.pcap4j.packet.Packet; @@ -42,9 +43,9 @@ public class PcapWorker implements PacketListener { private final Map> unfinishedStreams = new HashMap<>(); - // в следующих мапах в сетах srcIp соответствующего пакета - private final Map> fins = new HashMap<>(); - private final Map> acks = new HashMap<>(); + // в следующих мапах в Set находится srcIp соответствующего пакета + private final Map>> fins = new HashMap<>(); + private final Map>> acks = new HashMap<>(); @Autowired public PcapWorker(ServicesService servicesService, @@ -55,7 +56,7 @@ public class PcapWorker implements PacketListener { this.streamService = streamService; this.localIp = InetAddress.getByName(localIpString); - if(!(this.localIp instanceof Inet4Address)) { + if (!(this.localIp instanceof Inet4Address)) { throw new IllegalArgumentException("Only ipv4 local ips are supported"); } @@ -143,9 +144,6 @@ public class PcapWorker implements PacketListener { servicesService.findService(sourceIp, sourcePort, destIp, destPort); if (serviceOptional.isPresent()) { - String sourceIpAndPort = sourceIpString + ":" + sourcePort; - String destIpAndPort = destIpString + ":" + destPort; - UnfinishedStream stream = addNewPacket(sourceIp, destIp, sourcePort, destPort, ttl, content, protocol); if (log.isDebugEnabled()) { @@ -155,7 +153,7 @@ public class PcapWorker implements PacketListener { } if (protocol == Protocol.TCP) { // udp не имеет фазы закрытия, поэтому закрываем по таймауту - checkTcpTermination(ack, fin, rst, sourceIpAndPort, destIpAndPort, stream); + checkTcpTermination(ack, fin, rst, new ImmutablePair<>(sourceIp, sourcePort), new ImmutablePair<>(destIp, destPort), stream); } } else { // сервис не найден if (log.isTraceEnabled()) { @@ -189,7 +187,9 @@ public class PcapWorker implements PacketListener { return stream; } - private void checkTcpTermination(boolean ack, boolean fin, boolean rst, String sourceIpAndPort, String destIpAndPort, UnfinishedStream stream) { + private void checkTcpTermination(boolean ack, boolean fin, boolean rst, + ImmutablePair sourceIpAndPort, ImmutablePair destIpAndPort, + UnfinishedStream stream) { if (!fins.containsKey(stream)) { fins.put(stream, new HashSet<>()); } @@ -198,8 +198,8 @@ public class PcapWorker implements PacketListener { acks.put(stream, new HashSet<>()); } - final Set finsForStream = fins.get(stream); - final Set acksForStream = acks.get(stream); + final Set> finsForStream = fins.get(stream); + final Set> acksForStream = acks.get(stream); if (fin) { finsForStream.add(sourceIpAndPort); @@ -209,7 +209,7 @@ public class PcapWorker implements PacketListener { acksForStream.add(sourceIpAndPort); } - // если соединение разорвано или закрыто с помощью fin-ack-fin-ack + // если соединение разорвано с помощью rst или закрыто с помощью fin-ack-fin-ack if (rst || (acksForStream.contains(sourceIpAndPort) && acksForStream.contains(destIpAndPort))) { streamService.saveNewStream(stream, unfinishedStreams.get(stream)); diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 0ea562f..1d1ec7a 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -40,7 +40,6 @@ public class StreamService { private final PacketService packetService; private final StreamSubscriptionService subscriptionService; - private final String localIp; private final boolean ignoreEmptyPackets; private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; @@ -59,7 +58,6 @@ public class StreamService { this.servicesService = servicesService; this.packetService = packetService; this.subscriptionService = subscriptionService; - this.localIp = localIp; this.ignoreEmptyPackets = ignoreEmptyPackets; } @@ -69,10 +67,9 @@ public class StreamService { @Transactional public boolean saveNewStream(UnfinishedStream unfinishedStream, List packets) { final Optional serviceOptional = servicesService.findService( - localIp, - unfinishedStream.getFirstIp().getHostAddress(), + unfinishedStream.getFirstIp(), unfinishedStream.getFirstPort(), - unfinishedStream.getSecondIp().getHostAddress(), + unfinishedStream.getSecondIp(), unfinishedStream.getSecondPort() ); @@ -175,7 +172,7 @@ public class StreamService { final List cut = packets.subList(start, i); compress(packets, cut, incoming); - i++; + i++; // продвигаем указатель на следующий после склеенного блок } start = i; packetsInRow = 1; @@ -246,7 +243,7 @@ public class StreamService { gzipEndPacket = i - 1; if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { gzipStarted = false; - i = gzipStartPacket + 1; + i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок } } else if (!packet.isIncoming()) { String content = new String(packet.getContent()); @@ -258,7 +255,7 @@ public class StreamService { gzipEndPacket = i - 1; if(extractGzip(packets, gzipStartPacket, gzipEndPacket)) { gzipStarted = false; - i = gzipStartPacket + 1; + i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок } } diff --git a/src/main/java/ru/serega6531/packmate/utils/Bytes.java b/src/main/java/ru/serega6531/packmate/utils/Bytes.java index 1d8b647..47e88f1 100644 --- a/src/main/java/ru/serega6531/packmate/utils/Bytes.java +++ b/src/main/java/ru/serega6531/packmate/utils/Bytes.java @@ -1,5 +1,8 @@ package ru.serega6531.packmate.utils; +import lombok.experimental.UtilityClass; + +@UtilityClass public class Bytes { public static int indexOf(byte[] array, byte[] target) {