Правильное время пакета при обработке pcap файла

This commit is contained in:
serega6531
2020-04-06 23:10:01 +03:00
parent 0391d55e91
commit 733d92cbf8
6 changed files with 60 additions and 25 deletions

View File

@@ -3,6 +3,7 @@ package ru.serega6531.packmate;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import ru.serega6531.packmate.model.enums.Protocol; import ru.serega6531.packmate.model.enums.Protocol;
@@ -12,6 +13,7 @@ import java.util.concurrent.TimeUnit;
@Component @Component
@Slf4j @Slf4j
@ConditionalOnProperty(name = "capture-mode", havingValue = "LIVE")
public class TimeoutStreamsSaver { public class TimeoutStreamsSaver {
private final PcapWorker pcapWorker; private final PcapWorker pcapWorker;

View File

@@ -24,10 +24,6 @@ public class PcapController {
return service.isStarted(); return service.isStarted();
} }
public boolean isRunning() {
return true; //TODO
}
@PostMapping("/start") @PostMapping("/start")
public void start() throws PcapNativeException { public void start() throws PcapNativeException {
service.start(); service.start();

View File

@@ -20,6 +20,7 @@ import ru.serega6531.packmate.service.StreamService;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@@ -33,7 +34,10 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
private final StreamService streamService; private final StreamService streamService;
protected PcapHandle pcap = null; protected PcapHandle pcap = null;
protected final ExecutorService listenerExecutorService; protected final ExecutorService loopExecutorService;
// во время работы должен быть != null
protected ExecutorService processorExecutorService;
private final InetAddress localIp; private final InetAddress localIp;
@@ -58,8 +62,8 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
} }
BasicThreadFactory factory = new BasicThreadFactory.Builder() BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("pcap-worker-listener").build(); .namingPattern("pcap-loop").build();
listenerExecutorService = Executors.newSingleThreadExecutor(factory); loopExecutorService = Executors.newSingleThreadExecutor(factory);
} }
public void gotPacket(Packet rawPacket) { public void gotPacket(Packet rawPacket) {
@@ -67,14 +71,16 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
return; return;
} }
final long time = pcap.getTimestamp().getTime();
if (rawPacket.contains(TcpPacket.class)) { if (rawPacket.contains(TcpPacket.class)) {
gotTcpPacket(rawPacket); gotTcpPacket(rawPacket, time);
} else if (rawPacket.contains(UdpPacket.class)) { } else if (rawPacket.contains(UdpPacket.class)) {
gotUdpPacket(rawPacket); gotUdpPacket(rawPacket, time);
} }
} }
private void gotTcpPacket(Packet rawPacket) { private void gotTcpPacket(Packet rawPacket, long time) {
final IpV4Packet.IpV4Header ipHeader = rawPacket.get(IpV4Packet.class).getHeader(); final IpV4Packet.IpV4Header ipHeader = rawPacket.get(IpV4Packet.class).getHeader();
Inet4Address sourceIp = ipHeader.getSrcAddr(); Inet4Address sourceIp = ipHeader.getSrcAddr();
Inet4Address destIp = ipHeader.getDstAddr(); Inet4Address destIp = ipHeader.getDstAddr();
@@ -96,9 +102,7 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
servicesService.findService(sourceIp, sourcePort, destIp, destPort); servicesService.findService(sourceIp, sourcePort, destIp, destPort);
if (serviceOptional.isPresent()) { if (serviceOptional.isPresent()) {
final long time = System.currentTimeMillis(); processorExecutorService.execute(() -> {
listenerExecutorService.execute(() -> {
UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.TCP); UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.TCP);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@@ -116,7 +120,7 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
} }
} }
private void gotUdpPacket(Packet rawPacket) { private void gotUdpPacket(Packet rawPacket, long time) {
final IpV4Packet.IpV4Header ipHeader = rawPacket.get(IpV4Packet.class).getHeader(); final IpV4Packet.IpV4Header ipHeader = rawPacket.get(IpV4Packet.class).getHeader();
Inet4Address sourceIp = ipHeader.getSrcAddr(); Inet4Address sourceIp = ipHeader.getSrcAddr();
Inet4Address destIp = ipHeader.getDstAddr(); Inet4Address destIp = ipHeader.getDstAddr();
@@ -135,9 +139,7 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
servicesService.findService(sourceIp, sourcePort, destIp, destPort); servicesService.findService(sourceIp, sourcePort, destIp, destPort);
if (serviceOptional.isPresent()) { if (serviceOptional.isPresent()) {
final long time = System.currentTimeMillis(); processorExecutorService.execute(() -> {
listenerExecutorService.execute(() -> {
UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.UDP); UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.UDP);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@@ -202,9 +204,25 @@ public abstract class AbstractPcapWorker implements PcapWorker, PacketListener {
} }
} }
@Override
@SneakyThrows
public void closeAllStreams(Protocol protocol) {
final var streams = (protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams;
Multimaps.asMap(streams).forEach((key, value) ->
streamService.saveNewStream(key, new ArrayList<>(value)));
streams.clear();
if (protocol == Protocol.TCP) {
fins.clear();
acks.clear();
}
}
@Override
@SneakyThrows @SneakyThrows
public int closeTimeoutStreams(Protocol protocol, long timeoutMillis) { public int closeTimeoutStreams(Protocol protocol, long timeoutMillis) {
return listenerExecutorService.submit(() -> { return processorExecutorService.submit(() -> {
int streamsClosed = 0; int streamsClosed = 0;
final long time = System.currentTimeMillis(); final long time = System.currentTimeMillis();

View File

@@ -2,9 +2,11 @@ package ru.serega6531.packmate.pcap;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.threads.InlineExecutorService;
import org.pcap4j.core.PcapNativeException; import org.pcap4j.core.PcapNativeException;
import org.pcap4j.core.Pcaps; import org.pcap4j.core.Pcaps;
import org.pcap4j.packet.Packet; import org.pcap4j.packet.Packet;
import ru.serega6531.packmate.model.enums.Protocol;
import ru.serega6531.packmate.service.ServicesService; import ru.serega6531.packmate.service.ServicesService;
import ru.serega6531.packmate.service.StreamService; import ru.serega6531.packmate.service.StreamService;
@@ -27,13 +29,19 @@ public class FilePcapWorker extends AbstractPcapWorker {
if(!file.exists()) { if(!file.exists()) {
throw new IllegalArgumentException("File " + file.getAbsolutePath() + " does not exist"); throw new IllegalArgumentException("File " + file.getAbsolutePath() + " does not exist");
} }
processorExecutorService = new InlineExecutorService();
} }
@SneakyThrows @SneakyThrows
@Override @Override
public void start() { public void start() {
pcap = Pcaps.openOffline(file.getAbsolutePath()); pcap = Pcaps.openOffline(file.getAbsolutePath());
loopExecutorService.execute(this::runScan);
}
@SneakyThrows
private void runScan() {
while (pcap.isOpen()) { while (pcap.isOpen()) {
try { try {
final Packet packet = pcap.getNextPacketEx(); final Packet packet = pcap.getNextPacketEx();
@@ -42,8 +50,9 @@ public class FilePcapWorker extends AbstractPcapWorker {
log.error("Pcap read", e); log.error("Pcap read", e);
Thread.sleep(100); Thread.sleep(100);
} catch (EOFException e) { } catch (EOFException e) {
log.info("All packets processed");
stop(); stop();
log.info("All packets processed");
break; break;
} }
} }
@@ -53,9 +62,10 @@ public class FilePcapWorker extends AbstractPcapWorker {
public void stop() { public void stop() {
if (pcap != null && pcap.isOpen()) { if (pcap != null && pcap.isOpen()) {
pcap.close(); pcap.close();
log.info("Pcap closed");
} }
//TODO закрывать все стримы closeAllStreams(Protocol.TCP);
log.info("Pcap closed"); closeAllStreams(Protocol.UDP);
} }
} }

View File

@@ -10,7 +10,6 @@ import ru.serega6531.packmate.service.ServicesService;
import ru.serega6531.packmate.service.StreamService; import ru.serega6531.packmate.service.StreamService;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@Slf4j @Slf4j
@@ -28,15 +27,16 @@ public class LivePcapWorker extends AbstractPcapWorker {
if(device == null) { if(device == null) {
throw new IllegalArgumentException("Device " + interfaceName + " does not exist"); throw new IllegalArgumentException("Device " + interfaceName + " does not exist");
} }
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("pcap-processor").build();
processorExecutorService = Executors.newSingleThreadExecutor(factory);
} }
public void start() throws PcapNativeException { public void start() throws PcapNativeException {
log.info("Using interface " + device.getName()); log.info("Using interface " + device.getName());
pcap = device.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 100); pcap = device.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 100);
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("pcap-worker-loop").build();
ExecutorService loopExecutorService = Executors.newSingleThreadExecutor(factory);
try { try {
log.info("Intercept started"); log.info("Intercept started");
pcap.loop(-1, this, loopExecutorService); pcap.loop(-1, this, loopExecutorService);

View File

@@ -7,6 +7,15 @@ public interface PcapWorker {
void start() throws PcapNativeException; void start() throws PcapNativeException;
void stop(); void stop();
/**
* Выполняется в вызывающем потоке
*/
void closeAllStreams(Protocol protocol);
/**
* Выполняется в потоке обработчика
*/
int closeTimeoutStreams(Protocol protocol, long timeoutMillis); int closeTimeoutStreams(Protocol protocol, long timeoutMillis);
} }