Merge branch 'timelines' into 'master'
Timelines See merge request packmate/Packmate!2
This commit is contained in:
@@ -5,15 +5,15 @@ import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
import ru.serega6531.packmate.service.StreamSubscriptionService;
|
||||
import ru.serega6531.packmate.service.SubscriptionService;
|
||||
|
||||
@Component
|
||||
public class WebSocketHandler extends TextWebSocketHandler {
|
||||
|
||||
private final StreamSubscriptionService subscriptionService;
|
||||
private final SubscriptionService subscriptionService;
|
||||
|
||||
@Autowired
|
||||
public WebSocketHandler(StreamSubscriptionService subscriptionService) {
|
||||
public WebSocketHandler(SubscriptionService subscriptionService) {
|
||||
this.subscriptionService = subscriptionService;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
package ru.serega6531.packmate.model.enums;
|
||||
|
||||
public enum SubscriptionMessageType {
|
||||
SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM
|
||||
SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM, COUNTERS_UPDATE
|
||||
}
|
||||
|
||||
18
src/main/java/ru/serega6531/packmate/model/pojo/Counter.java
Normal file
18
src/main/java/ru/serega6531/packmate/model/pojo/Counter.java
Normal file
@@ -0,0 +1,18 @@
|
||||
package ru.serega6531.packmate.model.pojo;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public class Counter {
|
||||
|
||||
private int value = 0;
|
||||
|
||||
public void increment() {
|
||||
value++;
|
||||
}
|
||||
|
||||
public void increment(int num) {
|
||||
value += num;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package ru.serega6531.packmate.model.pojo;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Getter
|
||||
public class CountersHolder {
|
||||
|
||||
private Map<Integer, Integer> servicesPackets;
|
||||
private Map<Integer, Integer> servicesStreams;
|
||||
|
||||
private int totalPackets;
|
||||
private int totalStreams;
|
||||
|
||||
public CountersHolder(Map<Integer, Integer> servicesPackets, Map<Integer, Integer> servicesStreams,
|
||||
int totalPackets, int totalStreams) {
|
||||
this.servicesPackets = servicesPackets;
|
||||
this.servicesStreams = servicesStreams;
|
||||
this.totalPackets = totalPackets;
|
||||
this.totalStreams = totalStreams;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package ru.serega6531.packmate.service;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import ru.serega6531.packmate.model.enums.SubscriptionMessageType;
|
||||
import ru.serega6531.packmate.model.pojo.Counter;
|
||||
import ru.serega6531.packmate.model.pojo.CountersHolder;
|
||||
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class CountingService {
|
||||
|
||||
private final SubscriptionService subscriptionService;
|
||||
|
||||
private Map<Integer, Counter> servicesPackets = new HashMap<>();
|
||||
private Map<Integer, Counter> servicesStreams = new HashMap<>();
|
||||
|
||||
private Counter totalPackets = new Counter();
|
||||
private Counter totalStreams = new Counter();
|
||||
|
||||
@Autowired
|
||||
public CountingService(SubscriptionService subscriptionService) {
|
||||
this.subscriptionService = subscriptionService;
|
||||
}
|
||||
|
||||
void countStream(int serviceId, int packets) {
|
||||
getCounter(servicesPackets, serviceId).increment(packets);
|
||||
getCounter(servicesStreams, serviceId).increment();
|
||||
|
||||
totalPackets.increment(packets);
|
||||
totalStreams.increment();
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 * * ? * *")
|
||||
public void sendCounters() {
|
||||
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.COUNTERS_UPDATE,
|
||||
new CountersHolder(
|
||||
toIntegerMap(servicesPackets), toIntegerMap(servicesStreams),
|
||||
totalPackets.getValue(), totalStreams.getValue())));
|
||||
|
||||
servicesPackets.clear();
|
||||
servicesStreams.clear();
|
||||
totalPackets = new Counter();
|
||||
totalStreams = new Counter();
|
||||
}
|
||||
|
||||
private Map<Integer, Integer> toIntegerMap(Map<Integer, Counter> map) {
|
||||
return map.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
ent -> ent.getValue().getValue()));
|
||||
}
|
||||
|
||||
private Counter getCounter(Map<Integer, Counter> counters, int serviceId) {
|
||||
return counters.computeIfAbsent(serviceId, c -> new Counter());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -22,13 +22,13 @@ import java.util.stream.Collectors;
|
||||
public class PatternService {
|
||||
|
||||
private final PatternRepository repository;
|
||||
private final StreamSubscriptionService subscriptionService;
|
||||
private final SubscriptionService subscriptionService;
|
||||
|
||||
private final Map<Integer, Pattern> patterns = new HashMap<>();
|
||||
|
||||
@Autowired
|
||||
public PatternService(PatternRepository repository,
|
||||
StreamSubscriptionService subscriptionService) {
|
||||
SubscriptionService subscriptionService) {
|
||||
this.repository = repository;
|
||||
this.subscriptionService = subscriptionService;
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ import java.util.Optional;
|
||||
public class ServicesService {
|
||||
|
||||
private final ServiceRepository repository;
|
||||
private final StreamSubscriptionService subscriptionService;
|
||||
private final SubscriptionService subscriptionService;
|
||||
|
||||
private final InetAddress localIp;
|
||||
|
||||
@@ -30,7 +30,7 @@ public class ServicesService {
|
||||
|
||||
@Autowired
|
||||
public ServicesService(ServiceRepository repository,
|
||||
StreamSubscriptionService subscriptionService,
|
||||
SubscriptionService subscriptionService,
|
||||
@Value("${local-ip}") String localIpString) throws UnknownHostException {
|
||||
this.repository = repository;
|
||||
this.subscriptionService = subscriptionService;
|
||||
|
||||
@@ -30,7 +30,8 @@ public class StreamService {
|
||||
private final StreamRepository repository;
|
||||
private final PatternService patternService;
|
||||
private final ServicesService servicesService;
|
||||
private final StreamSubscriptionService subscriptionService;
|
||||
private final CountingService countingService;
|
||||
private final SubscriptionService subscriptionService;
|
||||
|
||||
private final boolean ignoreEmptyPackets;
|
||||
|
||||
@@ -40,11 +41,13 @@ public class StreamService {
|
||||
public StreamService(StreamRepository repository,
|
||||
PatternService patternService,
|
||||
ServicesService servicesService,
|
||||
StreamSubscriptionService subscriptionService,
|
||||
CountingService countingService,
|
||||
SubscriptionService subscriptionService,
|
||||
@Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) {
|
||||
this.repository = repository;
|
||||
this.patternService = patternService;
|
||||
this.servicesService = servicesService;
|
||||
this.countingService = countingService;
|
||||
this.subscriptionService = subscriptionService;
|
||||
this.ignoreEmptyPackets = ignoreEmptyPackets;
|
||||
}
|
||||
@@ -88,6 +91,8 @@ public class StreamService {
|
||||
stream.setEndTimestamp(packets.get(packets.size() - 1).getTimestamp());
|
||||
stream.setService(service.getPort());
|
||||
|
||||
countingService.countStream(service.getPort(), packets.size());
|
||||
|
||||
new StreamOptimizer(service, packets).optimizeStream();
|
||||
processUserAgent(packets, stream);
|
||||
|
||||
|
||||
@@ -18,14 +18,14 @@ import java.util.Objects;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class StreamSubscriptionService {
|
||||
public class SubscriptionService {
|
||||
|
||||
private final List<WebSocketSession> subscribers = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
@Autowired
|
||||
public StreamSubscriptionService(ObjectMapper mapper) {
|
||||
public SubscriptionService(ObjectMapper mapper) {
|
||||
this.mapper = mapper;
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ public class StreamSubscriptionService {
|
||||
|
||||
public void removeSubscriber(WebSocketSession session) {
|
||||
subscribers.remove(session);
|
||||
log.info("User unsubscribed {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
|
||||
log.info("User unsubscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
|
||||
}
|
||||
|
||||
void broadcast(SubscriptionMessage message) {
|
||||
Reference in New Issue
Block a user