Рефакторинг

This commit is contained in:
serega6531
2020-02-04 01:01:43 +03:00
parent c59910a0ea
commit bc847f2045
2 changed files with 19 additions and 21 deletions

View File

@@ -1,5 +1,6 @@
package ru.serega6531.packmate;
import com.google.common.collect.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -41,11 +42,11 @@ public class PcapWorker implements PacketListener {
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
private final Map<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> unfinishedStreams = new HashMap<>();
private final ListMultimap<UnfinishedStream, ru.serega6531.packmate.model.Packet> unfinishedStreams = ArrayListMultimap.create();
// в следующих мапах в Set находится srcIp соответствующего пакета
private final Map<UnfinishedStream, Set<ImmutablePair<Inet4Address, Integer>>> fins = new HashMap<>();
private final Map<UnfinishedStream, Set<ImmutablePair<Inet4Address, Integer>>> acks = new HashMap<>();
private final SetMultimap<UnfinishedStream, ImmutablePair<Inet4Address, Integer>> fins = HashMultimap.create();
private final SetMultimap<UnfinishedStream, ImmutablePair<Inet4Address, Integer>> acks = HashMultimap.create();
@Autowired
public PcapWorker(ServicesService servicesService,
@@ -176,44 +177,40 @@ public class PcapWorker implements PacketListener {
.content(content)
.build();
if (unfinishedStreams.containsKey(stream)) {
unfinishedStreams.get(stream).add(packet);
} else {
if (!unfinishedStreams.containsKey(stream)) {
log.debug("Начат новый стрим");
List<ru.serega6531.packmate.model.Packet> packets = new ArrayList<>();
packets.add(packet);
unfinishedStreams.put(stream, packets);
}
unfinishedStreams.put(stream, packet);
return stream;
}
private void checkTcpTermination(boolean ack, boolean fin, boolean rst,
ImmutablePair<Inet4Address, Integer> sourceIpAndPort, ImmutablePair<Inet4Address, Integer> destIpAndPort,
UnfinishedStream stream) {
final Set<ImmutablePair<Inet4Address, Integer>> finsForStream = fins.computeIfAbsent(stream, k -> new HashSet<>());
final Set<ImmutablePair<Inet4Address, Integer>> acksForStream = acks.computeIfAbsent(stream, k -> new HashSet<>());
if (fin) {
finsForStream.add(sourceIpAndPort);
fins.put(stream, sourceIpAndPort);
}
if (ack && finsForStream.contains(destIpAndPort)) { // проверяем destIp, потому что ищем ответ на его fin
acksForStream.add(sourceIpAndPort);
if (ack && fins.containsEntry(stream, destIpAndPort)) { // проверяем destIp, потому что ищем ответ на его fin
acks.put(stream, sourceIpAndPort);
}
// если соединение разорвано с помощью rst или закрыто с помощью fin-ack-fin-ack
if (rst || (acksForStream.contains(sourceIpAndPort) && acksForStream.contains(destIpAndPort))) {
if (rst || (acks.containsEntry(stream, sourceIpAndPort) && acks.containsEntry(stream, destIpAndPort))) {
streamService.saveNewStream(stream, unfinishedStreams.get(stream));
unfinishedStreams.remove(stream);
fins.remove(stream);
acks.remove(stream);
unfinishedStreams.removeAll(stream);
fins.removeAll(stream);
acks.removeAll(stream);
}
}
int closeTimeoutStreams(Protocol protocol, long timeoutMillis) {
int streamsClosed = 0;
final Iterator<Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>>> iterator = unfinishedStreams.entrySet().iterator();
final Iterator<Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>>> iterator =
Multimaps.asMap(unfinishedStreams).entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> entry = iterator.next();
@@ -229,8 +226,8 @@ public class PcapWorker implements PacketListener {
iterator.remove();
if (protocol == Protocol.TCP) {
fins.remove(stream);
acks.remove(stream);
fins.removeAll(stream);
acks.removeAll(stream);
}
}
}