diff --git a/src/main/java/ru/serega6531/packmate/WebSocketHandler.java b/src/main/java/ru/serega6531/packmate/WebSocketHandler.java index f2c93e9..47d362a 100644 --- a/src/main/java/ru/serega6531/packmate/WebSocketHandler.java +++ b/src/main/java/ru/serega6531/packmate/WebSocketHandler.java @@ -5,15 +5,15 @@ import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; -import ru.serega6531.packmate.service.StreamSubscriptionService; +import ru.serega6531.packmate.service.SubscriptionService; @Component public class WebSocketHandler extends TextWebSocketHandler { - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; @Autowired - public WebSocketHandler(StreamSubscriptionService subscriptionService) { + public WebSocketHandler(SubscriptionService subscriptionService) { this.subscriptionService = subscriptionService; } diff --git a/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java b/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java index af46d09..1d44fda 100644 --- a/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java +++ b/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java @@ -1,5 +1,5 @@ package ru.serega6531.packmate.model.enums; public enum SubscriptionMessageType { - SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM + SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM, COUNTERS_UPDATE } diff --git a/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java b/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java new file mode 100644 index 0000000..4b601da --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java @@ -0,0 +1,18 @@ +package ru.serega6531.packmate.model.pojo; + +import lombok.Getter; + +@Getter +public class Counter { + + private int value = 0; + + public void increment() { + value++; + } + + public void increment(int num) { + value += num; + } + +} diff --git a/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java b/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java new file mode 100644 index 0000000..b39bb8e --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java @@ -0,0 +1,23 @@ +package ru.serega6531.packmate.model.pojo; + +import lombok.Getter; + +import java.util.Map; + +@Getter +public class CountersHolder { + + private Map servicesPackets; + private Map servicesStreams; + + private int totalPackets; + private int totalStreams; + + public CountersHolder(Map servicesPackets, Map servicesStreams, + int totalPackets, int totalStreams) { + this.servicesPackets = servicesPackets; + this.servicesStreams = servicesStreams; + this.totalPackets = totalPackets; + this.totalStreams = totalStreams; + } +} diff --git a/src/main/java/ru/serega6531/packmate/service/CountingService.java b/src/main/java/ru/serega6531/packmate/service/CountingService.java new file mode 100644 index 0000000..a23f8f1 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/CountingService.java @@ -0,0 +1,63 @@ +package ru.serega6531.packmate.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import ru.serega6531.packmate.model.enums.SubscriptionMessageType; +import ru.serega6531.packmate.model.pojo.Counter; +import ru.serega6531.packmate.model.pojo.CountersHolder; +import ru.serega6531.packmate.model.pojo.SubscriptionMessage; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +@Service +public class CountingService { + + private final SubscriptionService subscriptionService; + + private Map servicesPackets = new HashMap<>(); + private Map servicesStreams = new HashMap<>(); + + private Counter totalPackets = new Counter(); + private Counter totalStreams = new Counter(); + + @Autowired + public CountingService(SubscriptionService subscriptionService) { + this.subscriptionService = subscriptionService; + } + + void countStream(int serviceId, int packets) { + getCounter(servicesPackets, serviceId).increment(packets); + getCounter(servicesStreams, serviceId).increment(); + + totalPackets.increment(packets); + totalStreams.increment(); + } + + @Scheduled(cron = "0 * * ? * *") + public void sendCounters() { + subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.COUNTERS_UPDATE, + new CountersHolder( + toIntegerMap(servicesPackets), toIntegerMap(servicesStreams), + totalPackets.getValue(), totalStreams.getValue()))); + + servicesPackets.clear(); + servicesStreams.clear(); + totalPackets = new Counter(); + totalStreams = new Counter(); + } + + private Map toIntegerMap(Map map) { + return map.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + ent -> ent.getValue().getValue())); + } + + private Counter getCounter(Map counters, int serviceId) { + return counters.computeIfAbsent(serviceId, c -> new Counter()); + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/PatternService.java b/src/main/java/ru/serega6531/packmate/service/PatternService.java index 442e430..46974b9 100644 --- a/src/main/java/ru/serega6531/packmate/service/PatternService.java +++ b/src/main/java/ru/serega6531/packmate/service/PatternService.java @@ -22,13 +22,13 @@ import java.util.stream.Collectors; public class PatternService { private final PatternRepository repository; - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; private final Map patterns = new HashMap<>(); @Autowired public PatternService(PatternRepository repository, - StreamSubscriptionService subscriptionService) { + SubscriptionService subscriptionService) { this.repository = repository; this.subscriptionService = subscriptionService; diff --git a/src/main/java/ru/serega6531/packmate/service/ServicesService.java b/src/main/java/ru/serega6531/packmate/service/ServicesService.java index 34e91b2..66f8142 100644 --- a/src/main/java/ru/serega6531/packmate/service/ServicesService.java +++ b/src/main/java/ru/serega6531/packmate/service/ServicesService.java @@ -22,7 +22,7 @@ import java.util.Optional; public class ServicesService { private final ServiceRepository repository; - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; private final InetAddress localIp; @@ -30,7 +30,7 @@ public class ServicesService { @Autowired public ServicesService(ServiceRepository repository, - StreamSubscriptionService subscriptionService, + SubscriptionService subscriptionService, @Value("${local-ip}") String localIpString) throws UnknownHostException { this.repository = repository; this.subscriptionService = subscriptionService; diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 9b1c2dd..19afa74 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -30,7 +30,8 @@ public class StreamService { private final StreamRepository repository; private final PatternService patternService; private final ServicesService servicesService; - private final StreamSubscriptionService subscriptionService; + private final CountingService countingService; + private final SubscriptionService subscriptionService; private final boolean ignoreEmptyPackets; @@ -40,11 +41,13 @@ public class StreamService { public StreamService(StreamRepository repository, PatternService patternService, ServicesService servicesService, - StreamSubscriptionService subscriptionService, + CountingService countingService, + SubscriptionService subscriptionService, @Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) { this.repository = repository; this.patternService = patternService; this.servicesService = servicesService; + this.countingService = countingService; this.subscriptionService = subscriptionService; this.ignoreEmptyPackets = ignoreEmptyPackets; } @@ -88,6 +91,8 @@ public class StreamService { stream.setEndTimestamp(packets.get(packets.size() - 1).getTimestamp()); stream.setService(service.getPort()); + countingService.countStream(service.getPort(), packets.size()); + new StreamOptimizer(service, packets).optimizeStream(); processUserAgent(packets, stream); diff --git a/src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java similarity index 88% rename from src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java rename to src/main/java/ru/serega6531/packmate/service/SubscriptionService.java index 218d810..207a978 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java +++ b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java @@ -18,14 +18,14 @@ import java.util.Objects; @Service @Slf4j -public class StreamSubscriptionService { +public class SubscriptionService { private final List subscribers = Collections.synchronizedList(new ArrayList<>()); private final ObjectMapper mapper; @Autowired - public StreamSubscriptionService(ObjectMapper mapper) { + public SubscriptionService(ObjectMapper mapper) { this.mapper = mapper; } @@ -36,7 +36,7 @@ public class StreamSubscriptionService { public void removeSubscriber(WebSocketSession session) { subscribers.remove(session); - log.info("User unsubscribed {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); + log.info("User unsubscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); } void broadcast(SubscriptionMessage message) {