Исправлено сообщение о сохранении стримов по таймауту

This commit is contained in:
serega6531
2019-05-15 01:06:01 +03:00
parent add19bc27a
commit 19ceb97bcc
2 changed files with 16 additions and 5 deletions

View File

@@ -203,9 +203,16 @@ public class PcapWorker implements PacketListener {
if (stream.getProtocol() == protocol) { if (stream.getProtocol() == protocol) {
final List<ru.serega6531.packmate.model.Packet> packets = entry.getValue(); final List<ru.serega6531.packmate.model.Packet> packets = entry.getValue();
if (System.currentTimeMillis() - packets.get(packets.size() - 1).getTimestamp() > timeoutMillis) { if (System.currentTimeMillis() - packets.get(packets.size() - 1).getTimestamp() > timeoutMillis) {
if(streamService.saveNewStream(stream, packets)) {
streamsClosed++;
}
iterator.remove(); iterator.remove();
streamService.saveNewStream(stream, packets);
streamsClosed++; if(protocol == Protocol.TCP) {
fins.remove(stream);
acks.remove(stream);
}
} }
} }
} }

View File

@@ -42,8 +42,11 @@ public class StreamService {
this.ignoreEmptyPackets = ignoreEmptyPackets; this.ignoreEmptyPackets = ignoreEmptyPackets;
} }
/**
* @return был ли сохранен стрим
*/
@Transactional @Transactional
public void saveNewStream(UnfinishedStream unfinishedStream, List<Packet> packets) { public boolean saveNewStream(UnfinishedStream unfinishedStream, List<Packet> packets) {
final Optional<CtfService> serviceOptional = servicesService.findService( final Optional<CtfService> serviceOptional = servicesService.findService(
localIp, localIp,
unfinishedStream.getFirstIp().getHostAddress(), unfinishedStream.getFirstIp().getHostAddress(),
@@ -55,7 +58,7 @@ public class StreamService {
if (!serviceOptional.isPresent()) { if (!serviceOptional.isPresent()) {
log.warn("Не удалось сохранить стрим: сервиса на порту {} или {} не существует", log.warn("Не удалось сохранить стрим: сервиса на порту {} или {} не существует",
unfinishedStream.getFirstPort(), unfinishedStream.getSecondPort()); unfinishedStream.getFirstPort(), unfinishedStream.getSecondPort());
return; return false;
} }
Stream stream = new Stream(); Stream stream = new Stream();
@@ -69,7 +72,7 @@ public class StreamService {
if(packets.isEmpty()) { if(packets.isEmpty()) {
log.debug("Стрим состоит только из пустых пакетов и не будет сохранен"); log.debug("Стрим состоит только из пустых пакетов и не будет сохранен");
return; return false;
} }
} }
@@ -89,6 +92,7 @@ public class StreamService {
savedStream = save(savedStream); savedStream = save(savedStream);
subscriptionService.broadcastNewStream(savedStream); subscriptionService.broadcastNewStream(savedStream);
return true;
} }
public Stream save(Stream stream) { public Stream save(Stream stream) {