Добавлено сохранение tcp пакетов по таймауту
This commit is contained in:
@@ -24,7 +24,6 @@ import java.net.Inet4Address;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@@ -39,7 +38,6 @@ public class PcapWorker {
|
|||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
private final String localIp;
|
private final String localIp;
|
||||||
private final int udpStreamTimeout;
|
|
||||||
|
|
||||||
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
|
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
|
||||||
|
|
||||||
@@ -53,13 +51,11 @@ public class PcapWorker {
|
|||||||
public PcapWorker(ServicesService servicesService,
|
public PcapWorker(ServicesService servicesService,
|
||||||
StreamService streamService,
|
StreamService streamService,
|
||||||
@Value("${interface-name}") String interfaceName,
|
@Value("${interface-name}") String interfaceName,
|
||||||
@Value("${local-ip}") String localIp,
|
@Value("${local-ip}") String localIp) throws PcapNativeException {
|
||||||
@Value("${udp-stream-timeout}") int udpStreamTimeout) throws PcapNativeException {
|
|
||||||
this.servicesService = servicesService;
|
this.servicesService = servicesService;
|
||||||
this.streamService = streamService;
|
this.streamService = streamService;
|
||||||
|
|
||||||
this.localIp = localIp;
|
this.localIp = localIp;
|
||||||
this.udpStreamTimeout = udpStreamTimeout;
|
|
||||||
|
|
||||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||||
.namingPattern("pcap-worker").build();
|
.namingPattern("pcap-worker").build();
|
||||||
@@ -205,7 +201,7 @@ public class PcapWorker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int closeUdpStreams() {
|
public int closeTimeoutStreams(Protocol protocol, long timeoutMillis) {
|
||||||
int streamsClosed = 0;
|
int streamsClosed = 0;
|
||||||
final Iterator<Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>>> iterator = unfinishedStreams.entrySet().iterator();
|
final Iterator<Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>>> iterator = unfinishedStreams.entrySet().iterator();
|
||||||
|
|
||||||
@@ -213,9 +209,9 @@ public class PcapWorker {
|
|||||||
final Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> entry = iterator.next();
|
final Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> entry = iterator.next();
|
||||||
final UnfinishedStream stream = entry.getKey();
|
final UnfinishedStream stream = entry.getKey();
|
||||||
|
|
||||||
if (stream.getProtocol() == Protocol.UDP) {
|
if (stream.getProtocol() == protocol) {
|
||||||
final List<ru.serega6531.packmate.model.Packet> packets = entry.getValue();
|
final List<ru.serega6531.packmate.model.Packet> packets = entry.getValue();
|
||||||
if (System.currentTimeMillis() - packets.get(packets.size() - 1).getTimestamp() > TimeUnit.SECONDS.toMillis(udpStreamTimeout)) {
|
if (System.currentTimeMillis() - packets.get(packets.size() - 1).getTimestamp() > timeoutMillis) {
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
streamService.saveNewStream(stream, packets);
|
streamService.saveNewStream(stream, packets);
|
||||||
streamsClosed++;
|
streamsClosed++;
|
||||||
|
|||||||
@@ -0,0 +1,42 @@
|
|||||||
|
package ru.serega6531.packmate;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import ru.serega6531.packmate.model.Protocol;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class TimeoutStreamsSaver {
|
||||||
|
|
||||||
|
private final PcapWorker pcapWorker;
|
||||||
|
private final long udpStreamTimeoutMillis;
|
||||||
|
private final long tcpStreamTimeoutMillis;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public TimeoutStreamsSaver(PcapWorker pcapWorker,
|
||||||
|
@Value("${udp-stream-timeout}") int udpStreamTimeout,
|
||||||
|
@Value("${tcp-stream-timeout}") int tcpStreamTimeout) {
|
||||||
|
this.pcapWorker = pcapWorker;
|
||||||
|
this.udpStreamTimeoutMillis = TimeUnit.SECONDS.toMillis(udpStreamTimeout);
|
||||||
|
this.tcpStreamTimeoutMillis = TimeUnit.SECONDS.toMillis(tcpStreamTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Scheduled(fixedRateString = "PT${timeout-stream-check-interval}S", initialDelayString = "PT${timeout-stream-check-interval}S")
|
||||||
|
public void saveStreams() {
|
||||||
|
int streamsClosed = pcapWorker.closeTimeoutStreams(Protocol.UDP, udpStreamTimeoutMillis);
|
||||||
|
if(streamsClosed > 0) {
|
||||||
|
log.info("Закрыто {} udp стримов", streamsClosed);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamsClosed = pcapWorker.closeTimeoutStreams(Protocol.TCP, tcpStreamTimeoutMillis);
|
||||||
|
if(streamsClosed > 0) {
|
||||||
|
log.info("Закрыто {} tcp стримов", streamsClosed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,27 +0,0 @@
|
|||||||
package ru.serega6531.packmate;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
public class UdpStreamsSaver {
|
|
||||||
|
|
||||||
private final PcapWorker pcapWorker;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
public UdpStreamsSaver(PcapWorker pcapWorker) {
|
|
||||||
this.pcapWorker = pcapWorker;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Scheduled(fixedRateString = "PT${udp-stream-check-interval}S", initialDelayString = "PT${udp-stream-check-interval}S")
|
|
||||||
public void saveStreams() {
|
|
||||||
final int streamsClosed = pcapWorker.closeUdpStreams();
|
|
||||||
if(streamsClosed > 0) {
|
|
||||||
log.info("Закрыто {} udp стримов", streamsClosed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -18,5 +18,6 @@ local-ip: "192.168.0.125"
|
|||||||
account-login: BinaryBears
|
account-login: BinaryBears
|
||||||
account-password: 123456
|
account-password: 123456
|
||||||
udp-stream-timeout: 20 # секунд
|
udp-stream-timeout: 20 # секунд
|
||||||
udp-stream-check-interval: 10
|
tcp-stream-timeout: 120 # секунд
|
||||||
|
timeout-stream-check-interval: 10 # секунд
|
||||||
ignore-empty-packets: true
|
ignore-empty-packets: true
|
||||||
Reference in New Issue
Block a user