From b387dc4fe3acea71fca5f0bc3e85ec25713d26a9 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Mon, 3 Feb 2020 03:11:10 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ru/serega6531/packmate/PcapWorker.java | 164 ++++++++++-------- .../packmate/service/ServicesService.java | 13 +- .../packmate/service/StreamService.java | 8 +- 3 files changed, 110 insertions(+), 75 deletions(-) diff --git a/src/main/java/ru/serega6531/packmate/PcapWorker.java b/src/main/java/ru/serega6531/packmate/PcapWorker.java index 5268087..ef28e96 100644 --- a/src/main/java/ru/serega6531/packmate/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/PcapWorker.java @@ -19,6 +19,8 @@ import ru.serega6531.packmate.service.StreamService; import javax.annotation.PreDestroy; import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,7 +36,7 @@ public class PcapWorker implements PacketListener { private PcapHandle pcap = null; private final ExecutorService executorService; - private final String localIp; + private final InetAddress localIp; private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic @@ -48,11 +50,14 @@ public class PcapWorker implements PacketListener { public PcapWorker(ServicesService servicesService, StreamService streamService, @Value("${interface-name}") String interfaceName, - @Value("${local-ip}") String localIp) throws PcapNativeException { + @Value("${local-ip}") String localIpString) throws PcapNativeException, UnknownHostException { this.servicesService = servicesService; this.streamService = streamService; - this.localIp = localIp; + this.localIp = InetAddress.getByName(localIpString); + if(!(this.localIp instanceof Inet4Address)) { + throw new IllegalArgumentException("Only ipv4 local ips are supported"); + } BasicThreadFactory factory = new BasicThreadFactory.Builder() .namingPattern("pcap-worker").build(); @@ -60,7 +65,7 @@ public class PcapWorker implements PacketListener { device = Pcaps.getDevByName(interfaceName); } - public void start() throws PcapNativeException { + void start() throws PcapNativeException { log.info("Using interface " + device.getName()); pcap = device.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 100); @@ -69,6 +74,7 @@ public class PcapWorker implements PacketListener { log.info("Intercept started"); pcap.loop(-1, this); // использовать другой executor? } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); // выходим } catch (Exception e) { log.error("Error while capturing packet", e); @@ -79,7 +85,7 @@ public class PcapWorker implements PacketListener { @PreDestroy @SneakyThrows - public void stop() { + private void stop() { if (pcap != null && pcap.isOpen()) { pcap.breakLoop(); pcap.close(); @@ -89,15 +95,13 @@ public class PcapWorker implements PacketListener { } public void gotPacket(Packet rawPacket) { - Inet4Address sourceIp = null; - Inet4Address destIp = null; - String sourceIpString = null; - String destIpString = null; - int sourcePort = -1; - int destPort = -1; - byte ttl = 0; - byte[] content = null; - Protocol protocol = null; + Inet4Address sourceIp; + Inet4Address destIp; + int sourcePort; + int destPort; + byte ttl; + byte[] content; + Protocol protocol; boolean ack = false; boolean fin = false; boolean rst = false; @@ -106,9 +110,9 @@ public class PcapWorker implements PacketListener { final IpV4Packet.IpV4Header header = rawPacket.get(IpV4Packet.class).getHeader(); sourceIp = header.getSrcAddr(); destIp = header.getDstAddr(); - sourceIpString = header.getSrcAddr().getHostAddress(); - destIpString = header.getDstAddr().getHostAddress(); ttl = header.getTtl(); + } else { + return; } if (rawPacket.contains(TcpPacket.class)) { @@ -128,76 +132,94 @@ public class PcapWorker implements PacketListener { destPort = header.getDstPort().valueAsInt(); content = packet.getPayload() != null ? packet.getPayload().getRawData() : new byte[0]; protocol = Protocol.UDP; + } else { + return; } - if (sourceIpString != null && sourcePort != -1) { - final Optional serviceOptional = - servicesService.findService(localIp, sourceIpString, sourcePort, destIpString, destPort); + String sourceIpString = sourceIp.getHostAddress(); + String destIpString = destIp.getHostAddress(); - if (serviceOptional.isPresent()) { - String sourceIpAndPort = sourceIpString + ":" + sourcePort; - String destIpAndPort = destIpString + ":" + destPort; + final Optional serviceOptional = + servicesService.findService(sourceIp, sourcePort, destIp, destPort); - boolean incoming = destIpString.equals(localIp); + if (serviceOptional.isPresent()) { + String sourceIpAndPort = sourceIpString + ":" + sourcePort; + String destIpAndPort = destIpString + ":" + destPort; - UnfinishedStream stream = new UnfinishedStream(sourceIp, destIp, sourcePort, destPort, protocol); - - ru.serega6531.packmate.model.Packet packet = ru.serega6531.packmate.model.Packet.builder() - .tempId(packetIdCounter++) - .ttl(ttl) - .timestamp(System.currentTimeMillis()) - .incoming(incoming) - .content(content) - .build(); - - if (unfinishedStreams.containsKey(stream)) { - unfinishedStreams.get(stream).add(packet); - } else { - log.debug("Начат новый стрим"); - List packets = new ArrayList<>(); - packets.add(packet); - unfinishedStreams.put(stream, packets); - } + UnfinishedStream stream = addNewPacket(sourceIp, destIp, sourcePort, destPort, ttl, content, protocol); + if (log.isDebugEnabled()) { log.debug("{} {} {}:{} -> {}:{}, номер пакета {}", protocol.name().toLowerCase(), serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort, unfinishedStreams.get(stream).size()); + } - if (protocol == Protocol.TCP) { - if (!fins.containsKey(stream)) { - fins.put(stream, new HashSet<>()); - } - - if (!acks.containsKey(stream)) { - acks.put(stream, new HashSet<>()); - } - - final Set finsForStream = fins.get(stream); - final Set acksForStream = acks.get(stream); - - if (fin) { - finsForStream.add(sourceIpAndPort); - } - - if (ack && finsForStream.contains(destIpAndPort)) { // проверяем destIp, потому что ищем ответ на его fin - acksForStream.add(sourceIpAndPort); - } - - if (rst || (acksForStream.contains(sourceIpAndPort) && acksForStream.contains(destIpAndPort))) { - streamService.saveNewStream(stream, unfinishedStreams.get(stream)); - - unfinishedStreams.remove(stream); - fins.remove(stream); - acks.remove(stream); - } - } - } else { + if (protocol == Protocol.TCP) { // udp не имеет фазы закрытия, поэтому закрываем по таймауту + checkTcpTermination(ack, fin, rst, sourceIpAndPort, destIpAndPort, stream); + } + } else { // сервис не найден + if (log.isTraceEnabled()) { log.trace("{} {}:{} -> {}:{}", protocol.name().toLowerCase(), sourceIpString, sourcePort, destIpString, destPort); } } } - public int closeTimeoutStreams(Protocol protocol, long timeoutMillis) { + private UnfinishedStream addNewPacket(Inet4Address sourceIp, Inet4Address destIp, + int sourcePort, int destPort, byte ttl, byte[] content, Protocol protocol) { + boolean incoming = destIp.equals(localIp); + + UnfinishedStream stream = new UnfinishedStream(sourceIp, destIp, sourcePort, destPort, protocol); + + ru.serega6531.packmate.model.Packet packet = ru.serega6531.packmate.model.Packet.builder() + .tempId(packetIdCounter++) + .ttl(ttl) + .timestamp(System.currentTimeMillis()) + .incoming(incoming) + .content(content) + .build(); + + if (unfinishedStreams.containsKey(stream)) { + unfinishedStreams.get(stream).add(packet); + } else { + log.debug("Начат новый стрим"); + List packets = new ArrayList<>(); + packets.add(packet); + unfinishedStreams.put(stream, packets); + } + return stream; + } + + private void checkTcpTermination(boolean ack, boolean fin, boolean rst, String sourceIpAndPort, String destIpAndPort, UnfinishedStream stream) { + if (!fins.containsKey(stream)) { + fins.put(stream, new HashSet<>()); + } + + if (!acks.containsKey(stream)) { + acks.put(stream, new HashSet<>()); + } + + final Set finsForStream = fins.get(stream); + final Set acksForStream = acks.get(stream); + + if (fin) { + finsForStream.add(sourceIpAndPort); + } + + if (ack && finsForStream.contains(destIpAndPort)) { // проверяем destIp, потому что ищем ответ на его fin + acksForStream.add(sourceIpAndPort); + } + + // если соединение разорвано или закрыто с помощью fin-ack-fin-ack + if (rst || (acksForStream.contains(sourceIpAndPort) && acksForStream.contains(destIpAndPort))) { + streamService.saveNewStream(stream, unfinishedStreams.get(stream)); + + unfinishedStreams.remove(stream); + fins.remove(stream); + acks.remove(stream); + } + } + + int closeTimeoutStreams(Protocol protocol, long timeoutMillis) { int streamsClosed = 0; final Iterator>> iterator = unfinishedStreams.entrySet().iterator(); diff --git a/src/main/java/ru/serega6531/packmate/service/ServicesService.java b/src/main/java/ru/serega6531/packmate/service/ServicesService.java index 2c7a024..d8134fe 100644 --- a/src/main/java/ru/serega6531/packmate/service/ServicesService.java +++ b/src/main/java/ru/serega6531/packmate/service/ServicesService.java @@ -2,12 +2,16 @@ package ru.serega6531.packmate.service; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import ru.serega6531.packmate.model.CtfService; import ru.serega6531.packmate.model.enums.SubscriptionMessageType; import ru.serega6531.packmate.model.pojo.SubscriptionMessage; import ru.serega6531.packmate.repository.ServiceRepository; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -20,18 +24,23 @@ public class ServicesService { private final ServiceRepository repository; private final StreamSubscriptionService subscriptionService; + private final InetAddress localIp; + private final Map services = new HashMap<>(); @Autowired - public ServicesService(ServiceRepository repository, StreamSubscriptionService subscriptionService) { + public ServicesService(ServiceRepository repository, + StreamSubscriptionService subscriptionService, + @Value("${local-ip}") String localIpString) throws UnknownHostException { this.repository = repository; this.subscriptionService = subscriptionService; + this.localIp = InetAddress.getByName(localIpString); repository.findAll().forEach(s -> services.put(s.getPort(), s)); log.info("Loaded {} services", services.size()); } - public Optional findService(String localIp, String firstIp, int firstPort, String secondIp, int secondPort) { + public Optional findService(Inet4Address firstIp, int firstPort, Inet4Address secondIp, int secondPort) { if (firstIp.equals(localIp)) { return findByPort(firstPort); } else if (secondIp.equals(localIp)) { diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 761369f..0ea562f 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -43,7 +43,7 @@ public class StreamService { private final String localIp; private final boolean ignoreEmptyPackets; - private final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; + private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; private final java.util.regex.Pattern userAgentPattern = java.util.regex.Pattern.compile("User-Agent: (.+)\\r\\n"); @Autowired @@ -347,7 +347,11 @@ public class StreamService { private String calculateUserAgentHash(String ua) { char[] alphabet = "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray(); int l = alphabet.length; - final int hash = Math.abs(ua.hashCode()) % (l * l * l); + int hashCode = ua.hashCode(); + if(hashCode == Integer.MIN_VALUE) { + hashCode = Integer.MAX_VALUE; + } + final int hash = Math.abs(hashCode) % (l * l * l); return "" + alphabet[hash % l] + alphabet[(hash / l) % l] + alphabet[(hash / (l * l)) % l]; }