Рефакторинг

This commit is contained in:
serega6531
2020-02-03 03:11:10 +03:00
parent 17e3d3cb48
commit b387dc4fe3
3 changed files with 110 additions and 75 deletions

View File

@@ -19,6 +19,8 @@ import ru.serega6531.packmate.service.StreamService;
import javax.annotation.PreDestroy;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -34,7 +36,7 @@ public class PcapWorker implements PacketListener {
private PcapHandle pcap = null;
private final ExecutorService executorService;
private final String localIp;
private final InetAddress localIp;
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
@@ -48,11 +50,14 @@ public class PcapWorker implements PacketListener {
public PcapWorker(ServicesService servicesService,
StreamService streamService,
@Value("${interface-name}") String interfaceName,
@Value("${local-ip}") String localIp) throws PcapNativeException {
@Value("${local-ip}") String localIpString) throws PcapNativeException, UnknownHostException {
this.servicesService = servicesService;
this.streamService = streamService;
this.localIp = localIp;
this.localIp = InetAddress.getByName(localIpString);
if(!(this.localIp instanceof Inet4Address)) {
throw new IllegalArgumentException("Only ipv4 local ips are supported");
}
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("pcap-worker").build();
@@ -60,7 +65,7 @@ public class PcapWorker implements PacketListener {
device = Pcaps.getDevByName(interfaceName);
}
public void start() throws PcapNativeException {
void start() throws PcapNativeException {
log.info("Using interface " + device.getName());
pcap = device.openLive(65536, PcapNetworkInterface.PromiscuousMode.PROMISCUOUS, 100);
@@ -69,6 +74,7 @@ public class PcapWorker implements PacketListener {
log.info("Intercept started");
pcap.loop(-1, this); // использовать другой executor?
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
// выходим
} catch (Exception e) {
log.error("Error while capturing packet", e);
@@ -79,7 +85,7 @@ public class PcapWorker implements PacketListener {
@PreDestroy
@SneakyThrows
public void stop() {
private void stop() {
if (pcap != null && pcap.isOpen()) {
pcap.breakLoop();
pcap.close();
@@ -89,15 +95,13 @@ public class PcapWorker implements PacketListener {
}
public void gotPacket(Packet rawPacket) {
Inet4Address sourceIp = null;
Inet4Address destIp = null;
String sourceIpString = null;
String destIpString = null;
int sourcePort = -1;
int destPort = -1;
byte ttl = 0;
byte[] content = null;
Protocol protocol = null;
Inet4Address sourceIp;
Inet4Address destIp;
int sourcePort;
int destPort;
byte ttl;
byte[] content;
Protocol protocol;
boolean ack = false;
boolean fin = false;
boolean rst = false;
@@ -106,9 +110,9 @@ public class PcapWorker implements PacketListener {
final IpV4Packet.IpV4Header header = rawPacket.get(IpV4Packet.class).getHeader();
sourceIp = header.getSrcAddr();
destIp = header.getDstAddr();
sourceIpString = header.getSrcAddr().getHostAddress();
destIpString = header.getDstAddr().getHostAddress();
ttl = header.getTtl();
} else {
return;
}
if (rawPacket.contains(TcpPacket.class)) {
@@ -128,76 +132,94 @@ public class PcapWorker implements PacketListener {
destPort = header.getDstPort().valueAsInt();
content = packet.getPayload() != null ? packet.getPayload().getRawData() : new byte[0];
protocol = Protocol.UDP;
} else {
return;
}
if (sourceIpString != null && sourcePort != -1) {
final Optional<CtfService> serviceOptional =
servicesService.findService(localIp, sourceIpString, sourcePort, destIpString, destPort);
String sourceIpString = sourceIp.getHostAddress();
String destIpString = destIp.getHostAddress();
if (serviceOptional.isPresent()) {
String sourceIpAndPort = sourceIpString + ":" + sourcePort;
String destIpAndPort = destIpString + ":" + destPort;
final Optional<CtfService> serviceOptional =
servicesService.findService(sourceIp, sourcePort, destIp, destPort);
boolean incoming = destIpString.equals(localIp);
if (serviceOptional.isPresent()) {
String sourceIpAndPort = sourceIpString + ":" + sourcePort;
String destIpAndPort = destIpString + ":" + destPort;
UnfinishedStream stream = new UnfinishedStream(sourceIp, destIp, sourcePort, destPort, protocol);
ru.serega6531.packmate.model.Packet packet = ru.serega6531.packmate.model.Packet.builder()
.tempId(packetIdCounter++)
.ttl(ttl)
.timestamp(System.currentTimeMillis())
.incoming(incoming)
.content(content)
.build();
if (unfinishedStreams.containsKey(stream)) {
unfinishedStreams.get(stream).add(packet);
} else {
log.debug("Начат новый стрим");
List<ru.serega6531.packmate.model.Packet> packets = new ArrayList<>();
packets.add(packet);
unfinishedStreams.put(stream, packets);
}
UnfinishedStream stream = addNewPacket(sourceIp, destIp, sourcePort, destPort, ttl, content, protocol);
if (log.isDebugEnabled()) {
log.debug("{} {} {}:{} -> {}:{}, номер пакета {}",
protocol.name().toLowerCase(), serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort,
unfinishedStreams.get(stream).size());
}
if (protocol == Protocol.TCP) {
if (!fins.containsKey(stream)) {
fins.put(stream, new HashSet<>());
}
if (!acks.containsKey(stream)) {
acks.put(stream, new HashSet<>());
}
final Set<String> finsForStream = fins.get(stream);
final Set<String> acksForStream = acks.get(stream);
if (fin) {
finsForStream.add(sourceIpAndPort);
}
if (ack && finsForStream.contains(destIpAndPort)) { // проверяем destIp, потому что ищем ответ на его fin
acksForStream.add(sourceIpAndPort);
}
if (rst || (acksForStream.contains(sourceIpAndPort) && acksForStream.contains(destIpAndPort))) {
streamService.saveNewStream(stream, unfinishedStreams.get(stream));
unfinishedStreams.remove(stream);
fins.remove(stream);
acks.remove(stream);
}
}
} else {
if (protocol == Protocol.TCP) { // udp не имеет фазы закрытия, поэтому закрываем по таймауту
checkTcpTermination(ack, fin, rst, sourceIpAndPort, destIpAndPort, stream);
}
} else { // сервис не найден
if (log.isTraceEnabled()) {
log.trace("{} {}:{} -> {}:{}", protocol.name().toLowerCase(), sourceIpString, sourcePort, destIpString, destPort);
}
}
}
public int closeTimeoutStreams(Protocol protocol, long timeoutMillis) {
private UnfinishedStream addNewPacket(Inet4Address sourceIp, Inet4Address destIp,
int sourcePort, int destPort, byte ttl, byte[] content, Protocol protocol) {
boolean incoming = destIp.equals(localIp);
UnfinishedStream stream = new UnfinishedStream(sourceIp, destIp, sourcePort, destPort, protocol);
ru.serega6531.packmate.model.Packet packet = ru.serega6531.packmate.model.Packet.builder()
.tempId(packetIdCounter++)
.ttl(ttl)
.timestamp(System.currentTimeMillis())
.incoming(incoming)
.content(content)
.build();
if (unfinishedStreams.containsKey(stream)) {
unfinishedStreams.get(stream).add(packet);
} else {
log.debug("Начат новый стрим");
List<ru.serega6531.packmate.model.Packet> packets = new ArrayList<>();
packets.add(packet);
unfinishedStreams.put(stream, packets);
}
return stream;
}
private void checkTcpTermination(boolean ack, boolean fin, boolean rst, String sourceIpAndPort, String destIpAndPort, UnfinishedStream stream) {
if (!fins.containsKey(stream)) {
fins.put(stream, new HashSet<>());
}
if (!acks.containsKey(stream)) {
acks.put(stream, new HashSet<>());
}
final Set<String> finsForStream = fins.get(stream);
final Set<String> acksForStream = acks.get(stream);
if (fin) {
finsForStream.add(sourceIpAndPort);
}
if (ack && finsForStream.contains(destIpAndPort)) { // проверяем destIp, потому что ищем ответ на его fin
acksForStream.add(sourceIpAndPort);
}
// если соединение разорвано или закрыто с помощью fin-ack-fin-ack
if (rst || (acksForStream.contains(sourceIpAndPort) && acksForStream.contains(destIpAndPort))) {
streamService.saveNewStream(stream, unfinishedStreams.get(stream));
unfinishedStreams.remove(stream);
fins.remove(stream);
acks.remove(stream);
}
}
int closeTimeoutStreams(Protocol protocol, long timeoutMillis) {
int streamsClosed = 0;
final Iterator<Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>>> iterator = unfinishedStreams.entrySet().iterator();

View File

@@ -2,12 +2,16 @@ package ru.serega6531.packmate.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ru.serega6531.packmate.model.CtfService;
import ru.serega6531.packmate.model.enums.SubscriptionMessageType;
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
import ru.serega6531.packmate.repository.ServiceRepository;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -20,18 +24,23 @@ public class ServicesService {
private final ServiceRepository repository;
private final StreamSubscriptionService subscriptionService;
private final InetAddress localIp;
private final Map<Integer, CtfService> services = new HashMap<>();
@Autowired
public ServicesService(ServiceRepository repository, StreamSubscriptionService subscriptionService) {
public ServicesService(ServiceRepository repository,
StreamSubscriptionService subscriptionService,
@Value("${local-ip}") String localIpString) throws UnknownHostException {
this.repository = repository;
this.subscriptionService = subscriptionService;
this.localIp = InetAddress.getByName(localIpString);
repository.findAll().forEach(s -> services.put(s.getPort(), s));
log.info("Loaded {} services", services.size());
}
public Optional<CtfService> findService(String localIp, String firstIp, int firstPort, String secondIp, int secondPort) {
public Optional<CtfService> findService(Inet4Address firstIp, int firstPort, Inet4Address secondIp, int secondPort) {
if (firstIp.equals(localIp)) {
return findByPort(firstPort);
} else if (secondIp.equals(localIp)) {

View File

@@ -43,7 +43,7 @@ public class StreamService {
private final String localIp;
private final boolean ignoreEmptyPackets;
private final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
private final java.util.regex.Pattern userAgentPattern = java.util.regex.Pattern.compile("User-Agent: (.+)\\r\\n");
@Autowired
@@ -347,7 +347,11 @@ public class StreamService {
private String calculateUserAgentHash(String ua) {
char[] alphabet = "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray();
int l = alphabet.length;
final int hash = Math.abs(ua.hashCode()) % (l * l * l);
int hashCode = ua.hashCode();
if(hashCode == Integer.MIN_VALUE) {
hashCode = Integer.MAX_VALUE;
}
final int hash = Math.abs(hashCode) % (l * l * l);
return "" + alphabet[hash % l] + alphabet[(hash / l) % l] + alphabet[(hash / (l * l)) % l];
}