Отдельные мапы для разных протоколов

This commit is contained in:
serega6531
2020-03-11 02:33:16 +03:00
parent 716c60fdee
commit 1f76d2c9dd
2 changed files with 25 additions and 19 deletions

View File

@@ -45,7 +45,8 @@ public class PcapWorker implements PacketListener {
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
private final ListMultimap<UnfinishedStream, ru.serega6531.packmate.model.Packet> unfinishedStreams = ArrayListMultimap.create();
private final ListMultimap<UnfinishedStream, ru.serega6531.packmate.model.Packet> unfinishedTcpStreams = ArrayListMultimap.create();
private final ListMultimap<UnfinishedStream, ru.serega6531.packmate.model.Packet> unfinishedUdpStreams = ArrayListMultimap.create();
// в следующих мапах в значениях находится srcIp соответствующего пакета
private final SetMultimap<UnfinishedStream, ImmutablePair<Inet4Address, Integer>> fins = HashMultimap.create();
@@ -142,7 +143,7 @@ public class PcapWorker implements PacketListener {
if (log.isDebugEnabled()) {
log.debug("tcp {} {}:{} -> {}:{}, номер пакета {}",
serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort,
unfinishedStreams.get(stream).size());
unfinishedTcpStreams.get(stream).size());
}
checkTcpTermination(ack, fin, rst, new ImmutablePair<>(sourceIp, sourcePort), new ImmutablePair<>(destIp, destPort), stream);
@@ -179,7 +180,7 @@ public class PcapWorker implements PacketListener {
if (log.isDebugEnabled()) {
log.debug("udp {} {}:{} -> {}:{}, номер пакета {}",
serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort,
unfinishedStreams.get(stream).size());
unfinishedUdpStreams.get(stream).size());
}
});
} else { // сервис не найден
@@ -203,11 +204,14 @@ public class PcapWorker implements PacketListener {
.content(content)
.build();
if (!unfinishedStreams.containsKey(stream)) {
final ListMultimap<UnfinishedStream, ru.serega6531.packmate.model.Packet> streams =
(protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams;
if (!streams.containsKey(stream)) {
log.debug("Начат новый стрим");
}
unfinishedStreams.put(stream, packet);
streams.put(stream, packet);
return stream;
}
@@ -229,9 +233,9 @@ public class PcapWorker implements PacketListener {
// если соединение разорвано с помощью rst или закрыто с помощью fin-ack-fin-ack
if (rst || (acks.containsEntry(stream, sourceIpAndPort) && acks.containsEntry(stream, destIpAndPort))) {
streamService.saveNewStream(stream, unfinishedStreams.get(stream));
streamService.saveNewStream(stream, unfinishedTcpStreams.get(stream));
unfinishedStreams.removeAll(stream);
unfinishedTcpStreams.removeAll(stream);
fins.removeAll(stream);
acks.removeAll(stream);
}
@@ -243,13 +247,15 @@ public class PcapWorker implements PacketListener {
int streamsClosed = 0;
final long time = System.currentTimeMillis();
final ListMultimap<UnfinishedStream, ru.serega6531.packmate.model.Packet> streams =
(protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams;
final Map<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> oldStreams =
Multimaps.asMap(unfinishedStreams).entrySet().stream()
Multimaps.asMap(streams).entrySet().stream()
.filter(entry -> {
final List<ru.serega6531.packmate.model.Packet> packets = entry.getValue();
return time - packets.get(packets.size() - 1).getTimestamp() > timeoutMillis;
})
.filter(entry -> entry.getKey().getProtocol() == protocol)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> entry : oldStreams.entrySet()) {
@@ -265,7 +271,7 @@ public class PcapWorker implements PacketListener {
acks.removeAll(stream);
}
unfinishedStreams.removeAll(stream);
streams.removeAll(stream);
}
return streamsClosed;