Работа над разделением пакетов на стримы

This commit is contained in:
serega6531
2019-04-29 01:59:42 +03:00
parent 4ada121c9b
commit bbe4eaf56e
3 changed files with 83 additions and 12 deletions

View File

@@ -21,9 +21,8 @@ import ru.serega6531.packmate.service.ServicesService;
import ru.serega6531.packmate.service.StreamService;
import javax.annotation.PreDestroy;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.net.Inet4Address;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@@ -43,7 +42,13 @@ public class PcapWorker {
private final String localIp;
private final Set<UnfinishedStream> unfinishedStreams = new HashSet<>();
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
private final Map<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> unfinishedStreams = new HashMap<>();
// в следующих мапах в листах srcIp соответствующего пакета
private final Map<UnfinishedStream, List<String>> fins = new HashMap<>();
private final Map<UnfinishedStream, List<String>> acks = new HashMap<>();
@Autowired
public PcapWorker(ServicesService servicesService,
@@ -100,16 +105,23 @@ public class PcapWorker {
}
private void processPacket(Packet rawPacket) {
String sourceIp = null;
String destIp = null;
Inet4Address sourceIp = null;
Inet4Address destIp = null;
String sourceIpString = null;
String destIpString = null;
int sourcePort = -1;
int destPort = -1;
byte[] content = null;
Protocol protocol = null;
boolean ack = false;
boolean fin = false;
if(rawPacket.contains(IpV4Packet.class)){
final IpV4Packet.IpV4Header header = rawPacket.get(IpV4Packet.class).getHeader();
sourceIp = header.getSrcAddr().getHostAddress();
destIp = header.getDstAddr().getHostAddress();
sourceIp = header.getSrcAddr();
destIp = header.getDstAddr();
sourceIpString = header.getSrcAddr().getHostAddress();
destIpString = header.getDstAddr().getHostAddress();
}
if(rawPacket.contains(TcpPacket.class)) {
@@ -117,26 +129,66 @@ public class PcapWorker {
final TcpPacket.TcpHeader header = packet.getHeader();
sourcePort = header.getSrcPort().valueAsInt();
destPort = header.getDstPort().valueAsInt();
ack = header.getAck();
fin = header.getFin();
content = packet.getRawData();
protocol = Protocol.TCP;
} else if(rawPacket.contains(UdpPacket.class)) {
final UdpPacket packet = rawPacket.get(UdpPacket.class);
final UdpPacket.UdpHeader header = packet.getHeader();
sourcePort = header.getSrcPort().valueAsInt();
destPort = header.getDstPort().valueAsInt();
content = packet.getRawData();
protocol = Protocol.UDP;
}
if(sourceIp != null && sourcePort != -1) {
if(sourceIpString != null && sourcePort != -1) {
Optional<CtfService> serviceOptional = Optional.empty();
if(sourceIp.equals(localIp)) {
if(sourceIpString.equals(localIp)) {
serviceOptional = servicesService.findByPort(sourcePort);
} else if(destIp.equals(localIp)) {
} else if(destIpString.equals(localIp)) {
serviceOptional = servicesService.findByPort(destPort);
}
if(serviceOptional.isPresent()) {
log.info("{} {}:{} -> {}:{}", serviceOptional, sourceIp, sourcePort, destIp, destPort);
UnfinishedStream stream = new UnfinishedStream(sourceIp, destIp, sourcePort, destPort, protocol);
ru.serega6531.packmate.model.Packet packet = ru.serega6531.packmate.model.Packet.builder()
.tempId(packetIdCounter++)
.timestamp(System.currentTimeMillis())
.content(content)
.build();
if(unfinishedStreams.containsKey(stream)) {
unfinishedStreams.get(stream).add(packet);
} else {
List<ru.serega6531.packmate.model.Packet> packets = new ArrayList<>();
packets.add(packet);
unfinishedStreams.put(stream, packets);
}
if(protocol == Protocol.TCP) {
if(!fins.containsKey(stream)) {
fins.put(stream, new ArrayList<>());
}
if(!acks.containsKey(stream)) {
acks.put(stream, new ArrayList<>());
}
if(ack && fins.get(stream).contains(destIpString)) { // проверяем destIp, потому что ищем ответ на его fin
//TODO
}
if(fin && acks.get(stream).contains(destIpString)) {
//TODO
}
}
log.info("{} {}:{} -> {}:{}, stream size {}, {} {}",
serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort,
unfinishedStreams.get(stream).size(), ack ? "ask" : "", fin ? "fin" : "");
}
}
}