From 733d92cbf82ebd659e38539b1fa76cf87dd7d558 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Mon, 6 Apr 2020 23:10:01 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D1=80=D0=B0=D0=B2=D0=B8=D0=BB=D1=8C?= =?UTF-8?q?=D0=BD=D0=BE=D0=B5=20=D0=B2=D1=80=D0=B5=D0=BC=D1=8F=20=D0=BF?= =?UTF-8?q?=D0=B0=D0=BA=D0=B5=D1=82=D0=B0=20=D0=BF=D1=80=D0=B8=20=D0=BE?= =?UTF-8?q?=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=BA=D0=B5=20pcap=20?= =?UTF-8?q?=D1=84=D0=B0=D0=B9=D0=BB=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../packmate/TimeoutStreamsSaver.java | 2 + .../packmate/controller/PcapController.java | 4 -- .../packmate/pcap/AbstractPcapWorker.java | 46 +++++++++++++------ .../packmate/pcap/FilePcapWorker.java | 16 +++++-- .../packmate/pcap/LivePcapWorker.java | 8 ++-- .../serega6531/packmate/pcap/PcapWorker.java | 9 ++++ 6 files changed, 60 insertions(+), 25 deletions(-) diff --git a/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java b/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java index b4bd9bf..d251fb7 100644 --- a/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java +++ b/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java @@ -3,6 +3,7 @@ package ru.serega6531.packmate; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import ru.serega6531.packmate.model.enums.Protocol; @@ -12,6 +13,7 @@ import java.util.concurrent.TimeUnit; @Component @Slf4j +@ConditionalOnProperty(name = "capture-mode", havingValue = "LIVE") public class TimeoutStreamsSaver { private final PcapWorker pcapWorker; diff --git a/src/main/java/ru/serega6531/packmate/controller/PcapController.java b/src/main/java/ru/serega6531/packmate/controller/PcapController.java index 7199f68..c314975 100644 --- a/src/main/java/ru/serega6531/packmate/controller/PcapController.java +++ b/src/main/java/ru/serega6531/packmate/controller/PcapController.java @@ -24,10 +24,6 @@ public class PcapController { return service.isStarted(); } - public boolean isRunning() { - return true; //TODO - } - @PostMapping("/start") public void start() throws PcapNativeException { service.start(); diff --git a/src/main/java/ru/serega6531/packmate/pcap/AbstractPcapWorker.java b/src/main/java/ru/serega6531/packmate/pcap/AbstractPcapWorker.java index be37015..e211eff 100644 --- a/src/main/java/ru/serega6531/packmate/pcap/AbstractPcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/pcap/AbstractPcapWorker.java @@ -20,6 +20,7 @@ import ru.serega6531.packmate.service.StreamService; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -33,7 +34,10 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { private final StreamService streamService; protected PcapHandle pcap = null; - protected final ExecutorService listenerExecutorService; + protected final ExecutorService loopExecutorService; + + // во время работы должен быть != null + protected ExecutorService processorExecutorService; private final InetAddress localIp; @@ -58,8 +62,8 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { } BasicThreadFactory factory = new BasicThreadFactory.Builder() - .namingPattern("pcap-worker-listener").build(); - listenerExecutorService = Executors.newSingleThreadExecutor(factory); + .namingPattern("pcap-loop").build(); + loopExecutorService = Executors.newSingleThreadExecutor(factory); } public void gotPacket(Packet rawPacket) { @@ -67,14 +71,16 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { return; } + final long time = pcap.getTimestamp().getTime(); + if (rawPacket.contains(TcpPacket.class)) { - gotTcpPacket(rawPacket); + gotTcpPacket(rawPacket, time); } else if (rawPacket.contains(UdpPacket.class)) { - gotUdpPacket(rawPacket); + gotUdpPacket(rawPacket, time); } } - private void gotTcpPacket(Packet rawPacket) { + private void gotTcpPacket(Packet rawPacket, long time) { final IpV4Packet.IpV4Header ipHeader = rawPacket.get(IpV4Packet.class).getHeader(); Inet4Address sourceIp = ipHeader.getSrcAddr(); Inet4Address destIp = ipHeader.getDstAddr(); @@ -96,9 +102,7 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { servicesService.findService(sourceIp, sourcePort, destIp, destPort); if (serviceOptional.isPresent()) { - final long time = System.currentTimeMillis(); - - listenerExecutorService.execute(() -> { + processorExecutorService.execute(() -> { UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.TCP); if (log.isDebugEnabled()) { @@ -116,7 +120,7 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { } } - private void gotUdpPacket(Packet rawPacket) { + private void gotUdpPacket(Packet rawPacket, long time) { final IpV4Packet.IpV4Header ipHeader = rawPacket.get(IpV4Packet.class).getHeader(); Inet4Address sourceIp = ipHeader.getSrcAddr(); Inet4Address destIp = ipHeader.getDstAddr(); @@ -135,9 +139,7 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { servicesService.findService(sourceIp, sourcePort, destIp, destPort); if (serviceOptional.isPresent()) { - final long time = System.currentTimeMillis(); - - listenerExecutorService.execute(() -> { + processorExecutorService.execute(() -> { UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.UDP); if (log.isDebugEnabled()) { @@ -202,9 +204,25 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener { } } + @Override + @SneakyThrows + public void closeAllStreams(Protocol protocol) { + final var streams = (protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams; + + Multimaps.asMap(streams).forEach((key, value) -> + streamService.saveNewStream(key, new ArrayList<>(value))); + + streams.clear(); + if (protocol == Protocol.TCP) { + fins.clear(); + acks.clear(); + } + } + + @Override @SneakyThrows public int closeTimeoutStreams(Protocol protocol, long timeoutMillis) { - return listenerExecutorService.submit(() -> { + return processorExecutorService.submit(() -> { int streamsClosed = 0; final long time = System.currentTimeMillis(); diff --git a/src/main/java/ru/serega6531/packmate/pcap/FilePcapWorker.java b/src/main/java/ru/serega6531/packmate/pcap/FilePcapWorker.java index 6a541f1..f74fea2 100644 --- a/src/main/java/ru/serega6531/packmate/pcap/FilePcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/pcap/FilePcapWorker.java @@ -2,9 +2,11 @@ package ru.serega6531.packmate.pcap; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.tomcat.util.threads.InlineExecutorService; import org.pcap4j.core.PcapNativeException; import org.pcap4j.core.Pcaps; import org.pcap4j.packet.Packet; +import ru.serega6531.packmate.model.enums.Protocol; import ru.serega6531.packmate.service.ServicesService; import ru.serega6531.packmate.service.StreamService; @@ -27,13 +29,19 @@ public class FilePcapWorker extends AbstractPcapWorker { if(!file.exists()) { throw new IllegalArgumentException("File " + file.getAbsolutePath() + " does not exist"); } + + processorExecutorService = new InlineExecutorService(); } @SneakyThrows @Override public void start() { pcap = Pcaps.openOffline(file.getAbsolutePath()); + loopExecutorService.execute(this::runScan); + } + @SneakyThrows + private void runScan() { while (pcap.isOpen()) { try { final Packet packet = pcap.getNextPacketEx(); @@ -42,8 +50,9 @@ public class FilePcapWorker extends AbstractPcapWorker { log.error("Pcap read", e); Thread.sleep(100); } catch (EOFException e) { - log.info("All packets processed"); stop(); + + log.info("All packets processed"); break; } } @@ -53,9 +62,10 @@ public class FilePcapWorker extends AbstractPcapWorker { public void stop() { if (pcap != null && pcap.isOpen()) { pcap.close(); + log.info("Pcap closed"); } - //TODO закрывать все стримы - log.info("Pcap closed"); + closeAllStreams(Protocol.TCP); + closeAllStreams(Protocol.UDP); } } diff --git a/src/main/java/ru/serega6531/packmate/pcap/LivePcapWorker.java b/src/main/java/ru/serega6531/packmate/pcap/LivePcapWorker.java index 0c69a6d..16a38e9 100644 --- a/src/main/java/ru/serega6531/packmate/pcap/LivePcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/pcap/LivePcapWorker.java @@ -10,7 +10,6 @@ import ru.serega6531.packmate.service.ServicesService; import ru.serega6531.packmate.service.StreamService; import java.net.UnknownHostException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @@ -28,15 +27,16 @@ public class LivePcapWorker extends AbstractPcapWorker { if(device == null) { throw new IllegalArgumentException("Device " + interfaceName + " does not exist"); } + + BasicThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("pcap-processor").build(); + processorExecutorService = Executors.newSingleThreadExecutor(factory); } public void start() throws PcapNativeException { log.info("Using interface " + device.getName()); pcap = device.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 100); - BasicThreadFactory factory = new BasicThreadFactory.Builder() - .namingPattern("pcap-worker-loop").build(); - ExecutorService loopExecutorService = Executors.newSingleThreadExecutor(factory); try { log.info("Intercept started"); pcap.loop(-1, this, loopExecutorService); diff --git a/src/main/java/ru/serega6531/packmate/pcap/PcapWorker.java b/src/main/java/ru/serega6531/packmate/pcap/PcapWorker.java index 62ee4c6..b79b891 100644 --- a/src/main/java/ru/serega6531/packmate/pcap/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/pcap/PcapWorker.java @@ -7,6 +7,15 @@ public interface PcapWorker { void start() throws PcapNativeException; void stop(); + + /** + * Выполняется в вызывающем потоке + */ + void closeAllStreams(Protocol protocol); + + /** + * Выполняется в потоке обработчика + */ int closeTimeoutStreams(Protocol protocol, long timeoutMillis); }