Добавлено сохранение udp стримов
This commit is contained in:
@@ -5,8 +5,10 @@ import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class PackmateApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -23,6 +23,7 @@ import java.net.Inet4Address;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@Component
|
||||
@@ -37,6 +38,7 @@ public class PcapWorker {
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private final String localIp;
|
||||
private final int udpStreamTimeout;
|
||||
|
||||
private long packetIdCounter = 0; // оно однопоточное, так что пусть будет без atomic
|
||||
|
||||
@@ -50,11 +52,13 @@ public class PcapWorker {
|
||||
public PcapWorker(ServicesService servicesService,
|
||||
StreamService streamService,
|
||||
@Value("${interface-name}") String interfaceName,
|
||||
@Value("${local-ip}") String localIp) throws PcapNativeException {
|
||||
@Value("${local-ip}") String localIp,
|
||||
@Value("${udp-stream-timeout}") int udpStreamTimeout) throws PcapNativeException {
|
||||
this.servicesService = servicesService;
|
||||
this.streamService = streamService;
|
||||
|
||||
this.localIp = localIp;
|
||||
this.udpStreamTimeout = udpStreamTimeout;
|
||||
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("pcap-worker").build();
|
||||
@@ -195,4 +199,26 @@ public class PcapWorker {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public int closeUdpStreams() {
|
||||
int streamsClosed = 0;
|
||||
final Iterator<Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>>> iterator = unfinishedStreams.entrySet().iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
final Map.Entry<UnfinishedStream, List<ru.serega6531.packmate.model.Packet>> entry = iterator.next();
|
||||
final UnfinishedStream stream = entry.getKey();
|
||||
|
||||
if(stream.getProtocol() == Protocol.UDP) {
|
||||
final List<ru.serega6531.packmate.model.Packet> packets = entry.getValue();
|
||||
if(System.currentTimeMillis() - packets.get(packets.size() - 1).getTimestamp() > TimeUnit.SECONDS.toMillis(udpStreamTimeout)) {
|
||||
iterator.remove();
|
||||
streamService.saveNewStream(stream, packets);
|
||||
streamsClosed++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return streamsClosed;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
27
src/main/java/ru/serega6531/packmate/UdpStreamsSaver.java
Normal file
27
src/main/java/ru/serega6531/packmate/UdpStreamsSaver.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package ru.serega6531.packmate;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class UdpStreamsSaver {
|
||||
|
||||
private final PcapWorker pcapWorker;
|
||||
|
||||
@Autowired
|
||||
public UdpStreamsSaver(PcapWorker pcapWorker) {
|
||||
this.pcapWorker = pcapWorker;
|
||||
}
|
||||
|
||||
@Scheduled(fixedRateString = "PT${udp-stream-check-interval}S")
|
||||
public void saveStreams() {
|
||||
final int streamsClosed = pcapWorker.closeUdpStreams();
|
||||
if(streamsClosed > 0) {
|
||||
log.info("Закрыто {} udp стримов", streamsClosed);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user