From bbe4eaf56e0797c6f8c44edf4a015d7b6d33acca Mon Sep 17 00:00:00 2001 From: serega6531 Date: Mon, 29 Apr 2019 01:59:42 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=20=D0=BD?= =?UTF-8?q?=D0=B0=D0=B4=20=D1=80=D0=B0=D0=B7=D0=B4=D0=B5=D0=BB=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=D0=BC=20=D0=BF=D0=B0=D0=BA=D0=B5=D1=82=D0=BE=D0=B2?= =?UTF-8?q?=20=D0=BD=D0=B0=20=D1=81=D1=82=D1=80=D0=B8=D0=BC=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ru/serega6531/packmate/PcapWorker.java | 76 ++++++++++++++++--- .../ru/serega6531/packmate/model/Packet.java | 8 ++ .../packmate/model/UnfinishedStream.java | 11 +++ 3 files changed, 83 insertions(+), 12 deletions(-) diff --git a/src/main/java/ru/serega6531/packmate/PcapWorker.java b/src/main/java/ru/serega6531/packmate/PcapWorker.java index cbcb01d..fa651bf 100644 --- a/src/main/java/ru/serega6531/packmate/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/PcapWorker.java @@ -21,9 +21,8 @@ import ru.serega6531.packmate.service.ServicesService; import ru.serega6531.packmate.service.StreamService; import javax.annotation.PreDestroy; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.net.Inet4Address; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; @@ -43,7 +42,13 @@ public class PcapWorker { private final String localIp; - private final Set unfinishedStreams = new HashSet<>(); + private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic + + private final Map> unfinishedStreams = new HashMap<>(); + + // в следующих мапах в листах srcIp соответствующего пакета + private final Map> fins = new HashMap<>(); + private final Map> acks = new HashMap<>(); @Autowired public PcapWorker(ServicesService servicesService, @@ -100,16 +105,23 @@ public class PcapWorker { } private void processPacket(Packet rawPacket) { - String sourceIp = null; - String destIp = null; + Inet4Address sourceIp = null; + Inet4Address destIp = null; + String sourceIpString = null; + String destIpString = null; int sourcePort = -1; int destPort = -1; byte[] content = null; + Protocol protocol = null; + boolean ack = false; + boolean fin = false; if(rawPacket.contains(IpV4Packet.class)){ final IpV4Packet.IpV4Header header = rawPacket.get(IpV4Packet.class).getHeader(); - sourceIp = header.getSrcAddr().getHostAddress(); - destIp = header.getDstAddr().getHostAddress(); + sourceIp = header.getSrcAddr(); + destIp = header.getDstAddr(); + sourceIpString = header.getSrcAddr().getHostAddress(); + destIpString = header.getDstAddr().getHostAddress(); } if(rawPacket.contains(TcpPacket.class)) { @@ -117,26 +129,66 @@ public class PcapWorker { final TcpPacket.TcpHeader header = packet.getHeader(); sourcePort = header.getSrcPort().valueAsInt(); destPort = header.getDstPort().valueAsInt(); + ack = header.getAck(); + fin = header.getFin(); content = packet.getRawData(); + protocol = Protocol.TCP; } else if(rawPacket.contains(UdpPacket.class)) { final UdpPacket packet = rawPacket.get(UdpPacket.class); final UdpPacket.UdpHeader header = packet.getHeader(); sourcePort = header.getSrcPort().valueAsInt(); destPort = header.getDstPort().valueAsInt(); content = packet.getRawData(); + protocol = Protocol.UDP; } - if(sourceIp != null && sourcePort != -1) { + if(sourceIpString != null && sourcePort != -1) { Optional serviceOptional = Optional.empty(); - if(sourceIp.equals(localIp)) { + if(sourceIpString.equals(localIp)) { serviceOptional = servicesService.findByPort(sourcePort); - } else if(destIp.equals(localIp)) { + } else if(destIpString.equals(localIp)) { serviceOptional = servicesService.findByPort(destPort); } if(serviceOptional.isPresent()) { - log.info("{} {}:{} -> {}:{}", serviceOptional, sourceIp, sourcePort, destIp, destPort); + UnfinishedStream stream = new UnfinishedStream(sourceIp, destIp, sourcePort, destPort, protocol); + + ru.serega6531.packmate.model.Packet packet = ru.serega6531.packmate.model.Packet.builder() + .tempId(packetIdCounter++) + .timestamp(System.currentTimeMillis()) + .content(content) + .build(); + + if(unfinishedStreams.containsKey(stream)) { + unfinishedStreams.get(stream).add(packet); + } else { + List packets = new ArrayList<>(); + packets.add(packet); + unfinishedStreams.put(stream, packets); + } + + if(protocol == Protocol.TCP) { + if(!fins.containsKey(stream)) { + fins.put(stream, new ArrayList<>()); + } + + if(!acks.containsKey(stream)) { + acks.put(stream, new ArrayList<>()); + } + + if(ack && fins.get(stream).contains(destIpString)) { // проверяем destIp, потому что ищем ответ на его fin + //TODO + } + + if(fin && acks.get(stream).contains(destIpString)) { + //TODO + } + } + + log.info("{} {}:{} -> {}:{}, stream size {}, {} {}", + serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort, + unfinishedStreams.get(stream).size(), ack ? "ask" : "", fin ? "fin" : ""); } } } diff --git a/src/main/java/ru/serega6531/packmate/model/Packet.java b/src/main/java/ru/serega6531/packmate/model/Packet.java index 093385d..9ed1b7c 100644 --- a/src/main/java/ru/serega6531/packmate/model/Packet.java +++ b/src/main/java/ru/serega6531/packmate/model/Packet.java @@ -1,5 +1,6 @@ package ru.serega6531.packmate.model; +import lombok.Builder; import lombok.Data; import org.hibernate.annotations.GenericGenerator; @@ -16,16 +17,23 @@ import javax.persistence.*; @org.hibernate.annotations.Parameter(name = "increment_size", value = "1") } ) +@Builder public class Packet { @Id @GeneratedValue(generator = "packet_generator") private Long id; + @Transient + private Long tempId; + @ManyToOne @JoinColumn(name = "stream_id", nullable = false) private Stream stream; private long timestamp; + @Lob + private byte[] content; + } diff --git a/src/main/java/ru/serega6531/packmate/model/UnfinishedStream.java b/src/main/java/ru/serega6531/packmate/model/UnfinishedStream.java index cf264fd..96421ad 100644 --- a/src/main/java/ru/serega6531/packmate/model/UnfinishedStream.java +++ b/src/main/java/ru/serega6531/packmate/model/UnfinishedStream.java @@ -31,4 +31,15 @@ public class UnfinishedStream { return (ipEq1 || ipEq2) && (portEq1 || portEq2) && protocol == o.protocol; } + + @Override + public int hashCode() { + final int PRIME = 59; + + int result = firstIp.hashCode() * secondIp.hashCode(); + result = result * PRIME + (firstPort * secondPort); + result = result * PRIME + protocol.hashCode(); + + return result; + } }