Отправка сообщения в начале и конце обработки

This commit is contained in:
serega6531
2020-04-06 23:52:08 +03:00
parent 733d92cbf8
commit e1b11cfdee
8 changed files with 65 additions and 12 deletions

View File

@@ -22,6 +22,7 @@ import ru.serega6531.packmate.pcap.LivePcapWorker;
import ru.serega6531.packmate.pcap.PcapWorker;
import ru.serega6531.packmate.service.ServicesService;
import ru.serega6531.packmate.service.StreamService;
import ru.serega6531.packmate.service.SubscriptionService;
import java.net.UnknownHostException;
@@ -48,6 +49,7 @@ public class ApplicationConfiguration extends WebSecurityConfigurerAdapter imple
@Autowired
public PcapWorker pcapWorker(ServicesService servicesService,
StreamService streamService,
SubscriptionService subscriptionService,
@Value("${local-ip}") String localIpString,
@Value("${interface-name}") String interfaceName,
@Value("${pcap-file}") String filename,
@@ -55,7 +57,7 @@ public class ApplicationConfiguration extends WebSecurityConfigurerAdapter imple
if(captureMode == CaptureMode.LIVE) {
return new LivePcapWorker(servicesService, streamService, localIpString, interfaceName);
} else {
return new FilePcapWorker(servicesService, streamService, localIpString, filename);
return new FilePcapWorker(servicesService, streamService, subscriptionService, localIpString, filename);
}
}

View File

@@ -1,5 +1,9 @@
package ru.serega6531.packmate.model.enums;
public enum SubscriptionMessageType {
SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM, COUNTERS_UPDATE
SAVE_SERVICE, SAVE_PATTERN,
DELETE_SERVICE, DELETE_PATTERN,
NEW_STREAM,
COUNTERS_UPDATE,
PCAP_STARTED, PCAP_STOPPED
}

View File

@@ -7,8 +7,11 @@ import org.pcap4j.core.PcapNativeException;
import org.pcap4j.core.Pcaps;
import org.pcap4j.packet.Packet;
import ru.serega6531.packmate.model.enums.Protocol;
import ru.serega6531.packmate.model.enums.SubscriptionMessageType;
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
import ru.serega6531.packmate.service.ServicesService;
import ru.serega6531.packmate.service.StreamService;
import ru.serega6531.packmate.service.SubscriptionService;
import java.io.EOFException;
import java.io.File;
@@ -17,13 +20,16 @@ import java.net.UnknownHostException;
@Slf4j
public class FilePcapWorker extends AbstractPcapWorker {
private final SubscriptionService subscriptionService;
private final File file;
public FilePcapWorker(ServicesService servicesService,
StreamService streamService,
SubscriptionService subscriptionService,
String localIpString,
String filename) throws UnknownHostException {
super(servicesService, streamService, localIpString);
this.subscriptionService = subscriptionService;
file = new File(filename);
if(!file.exists()) {
@@ -67,5 +73,7 @@ public class FilePcapWorker extends AbstractPcapWorker {
closeAllStreams(Protocol.TCP);
closeAllStreams(Protocol.UDP);
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.PCAP_STOPPED, null));
}
}

View File

@@ -4,6 +4,8 @@ import lombok.Getter;
import org.pcap4j.core.PcapNativeException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.serega6531.packmate.model.enums.SubscriptionMessageType;
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
import ru.serega6531.packmate.pcap.PcapWorker;
@Service
@@ -12,16 +14,19 @@ public class PcapService {
@Getter
private boolean started = false;
private final SubscriptionService subscriptionService;
private final PcapWorker worker;
@Autowired
public PcapService(PcapWorker worker) {
public PcapService(SubscriptionService subscriptionService, PcapWorker worker) {
this.subscriptionService = subscriptionService;
this.worker = worker;
}
public synchronized void start() throws PcapNativeException {
if(!started) {
started = true;
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.PCAP_STARTED, null));
worker.start();
}
}

View File

@@ -2,6 +2,7 @@ package ru.serega6531.packmate.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -39,10 +40,15 @@ public class SubscriptionService {
log.info("User unsubscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
}
void broadcast(SubscriptionMessage message) {
/**
* Вызов потокобезопасный
*/
@SneakyThrows
public void broadcast(SubscriptionMessage message) {
final TextMessage messageJson = objectToTextMessage(message);
subscribers.forEach(s -> {
try {
s.sendMessage(objectToTextMessage(message));
s.sendMessage(messageJson);
} catch (IOException | SockJsTransportFailureException e) {
log.warn("WS", e);
}