From b3cc0de450c7efb558e5bf51074ea160c80eb7ba Mon Sep 17 00:00:00 2001 From: serega6531 Date: Tue, 17 Mar 2020 01:21:40 +0300 Subject: [PATCH 1/2] =?UTF-8?q?=D0=97=D0=B0=D0=B3=D0=BE=D1=82=D0=BE=D0=B2?= =?UTF-8?q?=D0=BA=D0=B0=20=D0=BF=D0=BE=D0=B4=20=D0=BE=D1=82=D0=BF=D1=80?= =?UTF-8?q?=D0=B0=D0=B2=D0=BA=D1=83=20=D1=81=D1=87=D0=B5=D1=82=D1=87=D0=B8?= =?UTF-8?q?=D0=BA=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../serega6531/packmate/WebSocketHandler.java | 6 ++-- .../packmate/model/pojo/Counter.java | 18 ++++++++++ .../packmate/service/CountingService.java | 36 +++++++++++++++++++ .../packmate/service/PatternService.java | 4 +-- .../packmate/service/ServicesService.java | 4 +-- .../packmate/service/StreamService.java | 9 +++-- ...nService.java => SubscriptionService.java} | 4 +-- 7 files changed, 70 insertions(+), 11 deletions(-) create mode 100644 src/main/java/ru/serega6531/packmate/model/pojo/Counter.java create mode 100644 src/main/java/ru/serega6531/packmate/service/CountingService.java rename src/main/java/ru/serega6531/packmate/service/{StreamSubscriptionService.java => SubscriptionService.java} (94%) diff --git a/src/main/java/ru/serega6531/packmate/WebSocketHandler.java b/src/main/java/ru/serega6531/packmate/WebSocketHandler.java index f2c93e9..47d362a 100644 --- a/src/main/java/ru/serega6531/packmate/WebSocketHandler.java +++ b/src/main/java/ru/serega6531/packmate/WebSocketHandler.java @@ -5,15 +5,15 @@ import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; -import ru.serega6531.packmate.service.StreamSubscriptionService; +import ru.serega6531.packmate.service.SubscriptionService; @Component public class WebSocketHandler extends TextWebSocketHandler { - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; @Autowired - public WebSocketHandler(StreamSubscriptionService subscriptionService) { + public WebSocketHandler(SubscriptionService subscriptionService) { this.subscriptionService = subscriptionService; } diff --git a/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java b/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java new file mode 100644 index 0000000..4b601da --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java @@ -0,0 +1,18 @@ +package ru.serega6531.packmate.model.pojo; + +import lombok.Getter; + +@Getter +public class Counter { + + private int value = 0; + + public void increment() { + value++; + } + + public void increment(int num) { + value += num; + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/CountingService.java b/src/main/java/ru/serega6531/packmate/service/CountingService.java new file mode 100644 index 0000000..f899e92 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/CountingService.java @@ -0,0 +1,36 @@ +package ru.serega6531.packmate.service; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import ru.serega6531.packmate.model.pojo.Counter; + +import java.util.HashMap; +import java.util.Map; + +@Service +public class CountingService { + + private Map servicesPackets = new HashMap<>(); + private Map servicesStreams = new HashMap<>(); + + private Counter totalPackets = new Counter(); + private Counter totalStreams = new Counter(); + + void countStream(int serviceId, int packets) { + getCounter(servicesPackets, serviceId).increment(packets); + getCounter(servicesStreams, serviceId).increment(); + + totalPackets.increment(packets); + totalStreams.increment(); + } + + @Scheduled(cron = "0 * * ? * *") + public void sendCounters() { + //TODO + } + + private Counter getCounter(Map counters, int serviceId) { + return counters.computeIfAbsent(serviceId, c -> new Counter()); + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/PatternService.java b/src/main/java/ru/serega6531/packmate/service/PatternService.java index 442e430..46974b9 100644 --- a/src/main/java/ru/serega6531/packmate/service/PatternService.java +++ b/src/main/java/ru/serega6531/packmate/service/PatternService.java @@ -22,13 +22,13 @@ import java.util.stream.Collectors; public class PatternService { private final PatternRepository repository; - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; private final Map patterns = new HashMap<>(); @Autowired public PatternService(PatternRepository repository, - StreamSubscriptionService subscriptionService) { + SubscriptionService subscriptionService) { this.repository = repository; this.subscriptionService = subscriptionService; diff --git a/src/main/java/ru/serega6531/packmate/service/ServicesService.java b/src/main/java/ru/serega6531/packmate/service/ServicesService.java index 34e91b2..66f8142 100644 --- a/src/main/java/ru/serega6531/packmate/service/ServicesService.java +++ b/src/main/java/ru/serega6531/packmate/service/ServicesService.java @@ -22,7 +22,7 @@ import java.util.Optional; public class ServicesService { private final ServiceRepository repository; - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; private final InetAddress localIp; @@ -30,7 +30,7 @@ public class ServicesService { @Autowired public ServicesService(ServiceRepository repository, - StreamSubscriptionService subscriptionService, + SubscriptionService subscriptionService, @Value("${local-ip}") String localIpString) throws UnknownHostException { this.repository = repository; this.subscriptionService = subscriptionService; diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 9b1c2dd..19afa74 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -30,7 +30,8 @@ public class StreamService { private final StreamRepository repository; private final PatternService patternService; private final ServicesService servicesService; - private final StreamSubscriptionService subscriptionService; + private final CountingService countingService; + private final SubscriptionService subscriptionService; private final boolean ignoreEmptyPackets; @@ -40,11 +41,13 @@ public class StreamService { public StreamService(StreamRepository repository, PatternService patternService, ServicesService servicesService, - StreamSubscriptionService subscriptionService, + CountingService countingService, + SubscriptionService subscriptionService, @Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) { this.repository = repository; this.patternService = patternService; this.servicesService = servicesService; + this.countingService = countingService; this.subscriptionService = subscriptionService; this.ignoreEmptyPackets = ignoreEmptyPackets; } @@ -88,6 +91,8 @@ public class StreamService { stream.setEndTimestamp(packets.get(packets.size() - 1).getTimestamp()); stream.setService(service.getPort()); + countingService.countStream(service.getPort(), packets.size()); + new StreamOptimizer(service, packets).optimizeStream(); processUserAgent(packets, stream); diff --git a/src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java similarity index 94% rename from src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java rename to src/main/java/ru/serega6531/packmate/service/SubscriptionService.java index 218d810..f140f78 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java +++ b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java @@ -18,14 +18,14 @@ import java.util.Objects; @Service @Slf4j -public class StreamSubscriptionService { +public class SubscriptionService { private final List subscribers = Collections.synchronizedList(new ArrayList<>()); private final ObjectMapper mapper; @Autowired - public StreamSubscriptionService(ObjectMapper mapper) { + public SubscriptionService(ObjectMapper mapper) { this.mapper = mapper; } From 2611c7685e55fdc6129b24c8373e85dfa97f8fa4 Mon Sep 17 00:00:00 2001 From: serega6531 Date: Tue, 17 Mar 2020 15:40:17 +0300 Subject: [PATCH 2/2] =?UTF-8?q?=D0=9E=D1=82=D0=BF=D1=80=D0=B0=D0=B2=D0=BA?= =?UTF-8?q?=D0=B0=20=D1=81=D1=87=D0=B5=D1=82=D1=87=D0=B8=D0=BA=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/enums/SubscriptionMessageType.java | 2 +- .../packmate/model/pojo/CountersHolder.java | 23 +++++++++++++++ .../packmate/service/CountingService.java | 29 ++++++++++++++++++- .../packmate/service/SubscriptionService.java | 2 +- 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java diff --git a/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java b/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java index af46d09..1d44fda 100644 --- a/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java +++ b/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java @@ -1,5 +1,5 @@ package ru.serega6531.packmate.model.enums; public enum SubscriptionMessageType { - SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM + SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, DELETE_PATTERN, NEW_STREAM, COUNTERS_UPDATE } diff --git a/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java b/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java new file mode 100644 index 0000000..b39bb8e --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java @@ -0,0 +1,23 @@ +package ru.serega6531.packmate.model.pojo; + +import lombok.Getter; + +import java.util.Map; + +@Getter +public class CountersHolder { + + private Map servicesPackets; + private Map servicesStreams; + + private int totalPackets; + private int totalStreams; + + public CountersHolder(Map servicesPackets, Map servicesStreams, + int totalPackets, int totalStreams) { + this.servicesPackets = servicesPackets; + this.servicesStreams = servicesStreams; + this.totalPackets = totalPackets; + this.totalStreams = totalStreams; + } +} diff --git a/src/main/java/ru/serega6531/packmate/service/CountingService.java b/src/main/java/ru/serega6531/packmate/service/CountingService.java index f899e92..a23f8f1 100644 --- a/src/main/java/ru/serega6531/packmate/service/CountingService.java +++ b/src/main/java/ru/serega6531/packmate/service/CountingService.java @@ -1,21 +1,33 @@ package ru.serega6531.packmate.service; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import ru.serega6531.packmate.model.enums.SubscriptionMessageType; import ru.serega6531.packmate.model.pojo.Counter; +import ru.serega6531.packmate.model.pojo.CountersHolder; +import ru.serega6531.packmate.model.pojo.SubscriptionMessage; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; @Service public class CountingService { + private final SubscriptionService subscriptionService; + private Map servicesPackets = new HashMap<>(); private Map servicesStreams = new HashMap<>(); private Counter totalPackets = new Counter(); private Counter totalStreams = new Counter(); + @Autowired + public CountingService(SubscriptionService subscriptionService) { + this.subscriptionService = subscriptionService; + } + void countStream(int serviceId, int packets) { getCounter(servicesPackets, serviceId).increment(packets); getCounter(servicesStreams, serviceId).increment(); @@ -26,7 +38,22 @@ public class CountingService { @Scheduled(cron = "0 * * ? * *") public void sendCounters() { - //TODO + subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.COUNTERS_UPDATE, + new CountersHolder( + toIntegerMap(servicesPackets), toIntegerMap(servicesStreams), + totalPackets.getValue(), totalStreams.getValue()))); + + servicesPackets.clear(); + servicesStreams.clear(); + totalPackets = new Counter(); + totalStreams = new Counter(); + } + + private Map toIntegerMap(Map map) { + return map.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + ent -> ent.getValue().getValue())); } private Counter getCounter(Map counters, int serviceId) { diff --git a/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java index f140f78..207a978 100644 --- a/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java +++ b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java @@ -36,7 +36,7 @@ public class SubscriptionService { public void removeSubscriber(WebSocketSession session) { subscribers.remove(session); - log.info("User unsubscribed {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); + log.info("User unsubscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); } void broadcast(SubscriptionMessage message) {