diff --git a/Dockerfile_app b/Dockerfile_app deleted file mode 100644 index 7bda5bc..0000000 --- a/Dockerfile_app +++ /dev/null @@ -1,20 +0,0 @@ -FROM openjdk:13-jdk-alpine - -RUN apk add libpcap npm - -COPY ./ /app/ - -WORKDIR /app/frontend/ - -RUN npm install && npm run build - -RUN mkdir -p ../src/main/resources/static/ \ - && cp -rf ./dist/* ../src/main/resources/static/ - -WORKDIR /app/ - -RUN ./gradlew --no-daemon build - -RUN cp build/libs/packmate-*.jar app.jar - -EXPOSE 65000:65000 diff --git a/README.md b/README.md index bc2433d..e03a52f 100644 --- a/README.md +++ b/README.md @@ -19,15 +19,16 @@ * Конкатенирует смежные пакеты * Автоматически проводит urldecode * Разархивирует GZIP в HTTP на лету +* Разархивирует сжатые WebSockets ![Скриншот главного окна](screenshots/Screenshot.png) ## Клонирование Поскольку этот репозиторий содержит фронтенд как git submodule, его необходимо клонировать так: ```bash -git clone --recurse-submodules https://gitlab.com/binarybears_ctf/Packmate.git +git clone --recurse-submodules https://gitlab.com/packmate/Packmate.git # Или, на старых версиях git -git clone --recursive https://gitlab.com/binarybears_ctf/Packmate.git +git clone --recursive https://gitlab.com/packmate/Packmate.git ``` Если репозиторий уже был склонирован без подмодулей, необходимо выполнить: diff --git a/README_EN.md b/README_EN.md index 8838e07..5b22879 100644 --- a/README_EN.md +++ b/README_EN.md @@ -14,20 +14,21 @@ Advanced network traffic flow analyzer for A/D CTFs. * Binary substring * Can make certain streams favorite and show only favorite streams * Supports several simultaneous services, can show streams for a specific service or pattern -* Allows to navigate streams using shortcuts +* Allows navigating streams using shortcuts * Has the option to copy packet content in the required format * Can concatenate adjacent packets * Can urldecode text automatically -* Can automatically decode GZIPed HTTP +* Can automatically decompress GZIPed HTTP +* Can automatically deflate WebSockets with permessages-deflate extension ![Main window](screenshots/Screenshot.png) ## Cloning -As this repository contains frontend part as git submodule, it has to be cloned like this: +As this repository contains frontend part as a git submodule, it has to be cloned like this: ```bash -git clone --recurse-submodules https://gitlab.com/binarybears_ctf/Packmate.git +git clone --recurse-submodules https://gitlab.com/packmate/Packmate.git # Or if you have older git -git clone --recursive https://gitlab.com/binarybears_ctf/Packmate.git +git clone --recursive https://gitlab.com/packmate/Packmate.git ``` If the repository was already cloned without submodule, just run: @@ -39,23 +40,23 @@ git submodule update --init --recursive ## Preparation This program uses Docker and docker-compose. -`packmate-db` will listen port 65001 at localhost. -Database files do not mount as volume, so upon container recreation all data will be lost. +`packmate-db` will listen to port 65001 at localhost. +Database files do not mount as volume, so upon container recreation, all data will be lost. ### Settings -This program retreives settings from environment variables, -so it would be convenient to create env file; +This program retrieves settings from environment variables, +so it would be convenient to create an env file; It must be called `.env` and located at the root of the project. Contents of the file: ```bash # Interface to capture on PACKMATE_INTERFACE=wlan0 -# Local ip on said interface to tell incoming packets from outgoing +# Local IP on said interface to tell incoming packets from outgoing PACKMATE_LOCAL_IP=192.168.1.124 -# Username for web interface +# Username for the web interface PACKMATE_WEB_LOGIN=SomeUser -# Password for web interface +# Password for the web interface PACKMATE_WEB_PASSWORD=SomeSecurePassword ``` @@ -67,8 +68,8 @@ sudo docker-compose up --build -d If everything went fine, Packmate will be available on port `65000` from any host -### Accessing web interface -When you open web interface for the first time, you will be asked for login and password +### Accessing the web interface +When you open a web interface for the first time, you will be asked for a login and password you specified in the env file. After entering the credentials, open the settings by clicking on the cogs in the top right corner and enter login and password again. @@ -76,18 +77,18 @@ in the top right corner and enter login and password again. ![Settings](screenshots/Screenshot_Settings.png) All settings are saved in the local storage and will be -lost only upon changing server ip or port. +lost only upon changing server IP or port. ## Usage -First of all you should create game services. +First of all, you should create game services. To do that click `+` in the navbar, -then fill in service name, port and optimization to perform. +then fill in the service name, port, and optimization to perform. -System will start automatically capture streams and show them in a sidebar. -Click at stream to view a list of packets; +The system will start automatically capture streams and show them in a sidebar. +Click at a stream to view a list of packets; you can click a button in the sidebar to switch between binary and text views. -For a simple monitoring of flags there is a system of patterns. +For simple monitoring of flags, there is a system of patterns. To create a pattern open `Patterns` dropdown menu, press `+`, then specify the type of pattern, the pattern itself, highlight color and other things. diff --git a/build.gradle b/build.gradle index 17babb3..586f0c6 100644 --- a/build.gradle +++ b/build.gradle @@ -19,6 +19,9 @@ configurations { repositories { mavenCentral() + + // удалить после выхода стабильной версии Java-WebSocket + maven { url "https://oss.sonatype.org/content/repositories/snapshots" } } dependencies { @@ -32,6 +35,7 @@ dependencies { compile 'org.pcap4j:pcap4j-core:1.8.2' compile 'org.pcap4j:pcap4j-packetfactory-static:1.8.2' compile group: 'com.google.guava', name: 'guava', version: '28.2-jre' + compile group: 'org.java-websocket', name: 'Java-WebSocket', version: '1.5.0-SNAPSHOT' compileOnly 'org.projectlombok:lombok' runtimeOnly 'org.springframework.boot:spring-boot-devtools' runtimeOnly 'org.postgresql:postgresql' diff --git a/docker-compose.yml b/docker-compose.yml index 4b279b5..c969e43 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: container_name: packmate-app build: context: . - dockerfile: Dockerfile_app + dockerfile: docker/Dockerfile_app network_mode: "host" image: packmate-app:v1 command: [ @@ -31,7 +31,7 @@ services: container_name: packmate-db build: context: . - dockerfile: Dockerfile_db + dockerfile: docker/Dockerfile_db args: POSTGRES_USER: ${PACKMATE_DB_USER:-packmate} POSTGRES_PASSWORD: ${PACKMATE_DB_PASSWORD:-K604YnL3G1hp2RDkCZNjGpxbyNpNHTRb} diff --git a/docker/Dockerfile_app b/docker/Dockerfile_app new file mode 100644 index 0000000..18bc01d --- /dev/null +++ b/docker/Dockerfile_app @@ -0,0 +1,20 @@ +FROM openjdk:15-ea-jdk-alpine + +RUN apk --no-cache add libpcap npm + +COPY ./ /app/ + +WORKDIR /app/frontend/ + +RUN npm install && npm run build && npm cache clean --force \ + && mkdir -p ../src/main/resources/static/ \ + && mv ./dist/* ../src/main/resources/static/ \ + && rm -rf node_modules + +WORKDIR /app/ + +RUN ./gradlew --no-daemon --no-build-cache build \ + && cp build/libs/packmate-*.jar app.jar \ + && ./gradlew --no-daemon clean + +EXPOSE 65000:65000 diff --git a/Dockerfile_db b/docker/Dockerfile_db similarity index 62% rename from Dockerfile_db rename to docker/Dockerfile_db index e45f946..00b4865 100644 --- a/Dockerfile_db +++ b/docker/Dockerfile_db @@ -8,7 +8,7 @@ ENV POSTGRES_USER ${POSTGRES_USER} ENV POSTGRES_PASSWORD ${POSTGRES_PASSWORD} ENV POSTGRES_DB ${POSTGRES_DB} -COPY postgresql.conf /tmp/postgresql.conf -COPY update_db_config.sh /docker-entrypoint-initdb.d/_update_db_config.sh +COPY docker/postgresql.conf /tmp/postgresql.conf +COPY docker/update_db_config.sh /docker-entrypoint-initdb.d/_update_db_config.sh EXPOSE 65001:65001 diff --git a/postgresql.conf b/docker/postgresql.conf similarity index 100% rename from postgresql.conf rename to docker/postgresql.conf diff --git a/update_db_config.sh b/docker/update_db_config.sh similarity index 100% rename from update_db_config.sh rename to docker/update_db_config.sh diff --git a/frontend b/frontend index 6883d55..4e6a685 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit 6883d5566bf61d837d06f5e8eed1834c1ddc4218 +Subproject commit 4e6a685e698ad764a69a0b5ce182b60bf130f3b5 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a2bf131..a4b4429 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/java/ru/serega6531/packmate/PcapWorker.java b/src/main/java/ru/serega6531/packmate/PcapWorker.java index 5a981ca..e19744d 100644 --- a/src/main/java/ru/serega6531/packmate/PcapWorker.java +++ b/src/main/java/ru/serega6531/packmate/PcapWorker.java @@ -140,7 +140,7 @@ public class PcapWorker implements PacketListener { UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.TCP); if (log.isDebugEnabled()) { - log.debug("tcp {} {}:{} -> {}:{}, номер пакета {}", + log.debug("tcp {} {}:{} -> {}:{}, packet number {}", serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort, unfinishedTcpStreams.get(stream).size()); } @@ -179,7 +179,7 @@ public class PcapWorker implements PacketListener { UnfinishedStream stream = addNewPacket(sourceIp, destIp, time, sourcePort, destPort, ttl, content, Protocol.UDP); if (log.isDebugEnabled()) { - log.debug("udp {} {}:{} -> {}:{}, номер пакета {}", + log.debug("udp {} {}:{} -> {}:{}, packet number {}", serviceOptional.get(), sourceIpString, sourcePort, destIpString, destPort, unfinishedUdpStreams.get(stream).size()); } @@ -207,7 +207,7 @@ public class PcapWorker implements PacketListener { final var streams = (protocol == Protocol.TCP) ? this.unfinishedTcpStreams : this.unfinishedUdpStreams; if (!streams.containsKey(stream)) { - log.debug("Начат новый стрим"); + log.debug("New stream started"); } streams.put(stream, packet); diff --git a/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java b/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java index b924b68..68acaab 100644 --- a/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java +++ b/src/main/java/ru/serega6531/packmate/TimeoutStreamsSaver.java @@ -30,12 +30,12 @@ public class TimeoutStreamsSaver { public void saveStreams() { int streamsClosed = pcapWorker.closeTimeoutStreams(Protocol.UDP, udpStreamTimeoutMillis); if (streamsClosed > 0) { - log.info("Закрыто {} udp стримов", streamsClosed); + log.info("{} udp streams closed", streamsClosed); } streamsClosed = pcapWorker.closeTimeoutStreams(Protocol.TCP, tcpStreamTimeoutMillis); if (streamsClosed > 0) { - log.info("Закрыто {} tcp стримов", streamsClosed); + log.info("{} tcp streams closed", streamsClosed); } } diff --git a/src/main/java/ru/serega6531/packmate/WebSocketHandler.java b/src/main/java/ru/serega6531/packmate/WebSocketHandler.java index f2c93e9..47d362a 100644 --- a/src/main/java/ru/serega6531/packmate/WebSocketHandler.java +++ b/src/main/java/ru/serega6531/packmate/WebSocketHandler.java @@ -5,15 +5,15 @@ import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; -import ru.serega6531.packmate.service.StreamSubscriptionService; +import ru.serega6531.packmate.service.SubscriptionService; @Component public class WebSocketHandler extends TextWebSocketHandler { - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; @Autowired - public WebSocketHandler(StreamSubscriptionService subscriptionService) { + public WebSocketHandler(SubscriptionService subscriptionService) { this.subscriptionService = subscriptionService; } diff --git a/src/main/java/ru/serega6531/packmate/model/CtfService.java b/src/main/java/ru/serega6531/packmate/model/CtfService.java index 01ddf6e..58bef42 100644 --- a/src/main/java/ru/serega6531/packmate/model/CtfService.java +++ b/src/main/java/ru/serega6531/packmate/model/CtfService.java @@ -22,4 +22,6 @@ public class CtfService { private boolean mergeAdjacentPackets; + private boolean inflateWebSockets; + } \ No newline at end of file diff --git a/src/main/java/ru/serega6531/packmate/model/Packet.java b/src/main/java/ru/serega6531/packmate/model/Packet.java index cc60bd2..b0a6f7b 100644 --- a/src/main/java/ru/serega6531/packmate/model/Packet.java +++ b/src/main/java/ru/serega6531/packmate/model/Packet.java @@ -52,6 +52,18 @@ public class Packet { private boolean ungzipped; + private boolean webSocketInflated; + private byte[] content; + @Transient + @JsonIgnore + public String getContentString() { + return new String(content); + } + + public String toString() { + return "Packet(id=" + id + ", content=" + getContentString() + ")"; + } + } diff --git a/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java b/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java index 89ce895..b80cb1c 100644 --- a/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java +++ b/src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java @@ -1,5 +1,5 @@ package ru.serega6531.packmate.model.enums; public enum SubscriptionMessageType { - SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, ENABLE_PATTERN, DISABLE_PATTERN, NEW_STREAM + SAVE_SERVICE, SAVE_PATTERN, DELETE_SERVICE, ENABLE_PATTERN, DISABLE_PATTERN, NEW_STREAM, COUNTERS_UPDATE } diff --git a/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java b/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java new file mode 100644 index 0000000..4b601da --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/model/pojo/Counter.java @@ -0,0 +1,18 @@ +package ru.serega6531.packmate.model.pojo; + +import lombok.Getter; + +@Getter +public class Counter { + + private int value = 0; + + public void increment() { + value++; + } + + public void increment(int num) { + value += num; + } + +} diff --git a/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java b/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java new file mode 100644 index 0000000..b39bb8e --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/model/pojo/CountersHolder.java @@ -0,0 +1,23 @@ +package ru.serega6531.packmate.model.pojo; + +import lombok.Getter; + +import java.util.Map; + +@Getter +public class CountersHolder { + + private Map servicesPackets; + private Map servicesStreams; + + private int totalPackets; + private int totalStreams; + + public CountersHolder(Map servicesPackets, Map servicesStreams, + int totalPackets, int totalStreams) { + this.servicesPackets = servicesPackets; + this.servicesStreams = servicesStreams; + this.totalPackets = totalPackets; + this.totalStreams = totalStreams; + } +} diff --git a/src/main/java/ru/serega6531/packmate/service/CountingService.java b/src/main/java/ru/serega6531/packmate/service/CountingService.java new file mode 100644 index 0000000..a23f8f1 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/CountingService.java @@ -0,0 +1,63 @@ +package ru.serega6531.packmate.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import ru.serega6531.packmate.model.enums.SubscriptionMessageType; +import ru.serega6531.packmate.model.pojo.Counter; +import ru.serega6531.packmate.model.pojo.CountersHolder; +import ru.serega6531.packmate.model.pojo.SubscriptionMessage; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +@Service +public class CountingService { + + private final SubscriptionService subscriptionService; + + private Map servicesPackets = new HashMap<>(); + private Map servicesStreams = new HashMap<>(); + + private Counter totalPackets = new Counter(); + private Counter totalStreams = new Counter(); + + @Autowired + public CountingService(SubscriptionService subscriptionService) { + this.subscriptionService = subscriptionService; + } + + void countStream(int serviceId, int packets) { + getCounter(servicesPackets, serviceId).increment(packets); + getCounter(servicesStreams, serviceId).increment(); + + totalPackets.increment(packets); + totalStreams.increment(); + } + + @Scheduled(cron = "0 * * ? * *") + public void sendCounters() { + subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.COUNTERS_UPDATE, + new CountersHolder( + toIntegerMap(servicesPackets), toIntegerMap(servicesStreams), + totalPackets.getValue(), totalStreams.getValue()))); + + servicesPackets.clear(); + servicesStreams.clear(); + totalPackets = new Counter(); + totalStreams = new Counter(); + } + + private Map toIntegerMap(Map map) { + return map.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + ent -> ent.getValue().getValue())); + } + + private Counter getCounter(Map counters, int serviceId) { + return counters.computeIfAbsent(serviceId, c -> new Counter()); + } + +} diff --git a/src/main/java/ru/serega6531/packmate/service/PatternService.java b/src/main/java/ru/serega6531/packmate/service/PatternService.java index aa2bec4..6063521 100644 --- a/src/main/java/ru/serega6531/packmate/service/PatternService.java +++ b/src/main/java/ru/serega6531/packmate/service/PatternService.java @@ -20,13 +20,13 @@ import java.util.stream.Collectors; public class PatternService { private final PatternRepository repository; - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; private final Map patterns = new HashMap<>(); @Autowired public PatternService(PatternRepository repository, - StreamSubscriptionService subscriptionService) { + SubscriptionService subscriptionService) { this.repository = repository; this.subscriptionService = subscriptionService; @@ -56,7 +56,7 @@ public class PatternService { pattern.setEnabled(enabled); repository.save(pattern); - if(enabled) { + if (enabled) { log.info("Включен паттерн {} со значением {}", pattern.getName(), pattern.getValue()); subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.ENABLE_PATTERN, id)); } else { @@ -77,7 +77,7 @@ public class PatternService { final Pattern saved = repository.save(pattern); patterns.put(saved.getId(), saved); - log.info("Добавлен новый паттерн {} со значением {}", pattern.getName(), pattern.getValue()); + log.info("Added new pattern {} with value {}", pattern.getName(), pattern.getValue()); subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.SAVE_PATTERN, saved)); return saved; } diff --git a/src/main/java/ru/serega6531/packmate/service/ServicesService.java b/src/main/java/ru/serega6531/packmate/service/ServicesService.java index ee93035..66f8142 100644 --- a/src/main/java/ru/serega6531/packmate/service/ServicesService.java +++ b/src/main/java/ru/serega6531/packmate/service/ServicesService.java @@ -22,7 +22,7 @@ import java.util.Optional; public class ServicesService { private final ServiceRepository repository; - private final StreamSubscriptionService subscriptionService; + private final SubscriptionService subscriptionService; private final InetAddress localIp; @@ -30,7 +30,7 @@ public class ServicesService { @Autowired public ServicesService(ServiceRepository repository, - StreamSubscriptionService subscriptionService, + SubscriptionService subscriptionService, @Value("${local-ip}") String localIpString) throws UnknownHostException { this.repository = repository; this.subscriptionService = subscriptionService; @@ -59,14 +59,14 @@ public class ServicesService { } public void deleteByPort(int port) { - log.info("Удален сервис на порту {}", port); + log.info("Removed service at port {}", port); services.remove(port); repository.deleteById(port); subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.DELETE_SERVICE, port)); } public CtfService save(CtfService service) { - log.info("Добавлен или изменен сервис {} на порту {}", service.getName(), service.getPort()); + log.info("Added or edited service {} at port {}", service.getName(), service.getPort()); final CtfService saved = repository.save(service); services.put(saved.getPort(), saved); subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.SAVE_SERVICE, saved)); diff --git a/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java b/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java index 3374555..7050fc8 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamOptimizer.java @@ -24,18 +24,22 @@ import java.util.zip.ZipException; public class StreamOptimizer { private final CtfService service; - private final List packets; + private List packets; private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08}; /** * Вызвать для выполнения оптимизаций на переданном списке пакетов. */ - public void optimizeStream() { + public List optimizeStream() { if (service.isUngzipHttp()) { unpackGzip(); } + if (service.isInflateWebSockets()) { + inflateWebSocket(); + } + if (service.isUrldecodeHttpRequests()) { urldecodeRequests(); } @@ -43,6 +47,8 @@ public class StreamOptimizer { if (service.isMergeAdjacentPackets()) { mergeAdjacentPackets(); } + + return packets; } /** @@ -83,6 +89,7 @@ public class StreamOptimizer { final List cut = packets.subList(start, end); final long timestamp = cut.get(0).getTimestamp(); final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped); + final boolean webSocketInflated = cut.stream().anyMatch(Packet::isWebSocketInflated); boolean incoming = cut.get(0).isIncoming(); //noinspection OptionalGetWithoutIsPresent final byte[] content = cut.stream() @@ -95,6 +102,7 @@ public class StreamOptimizer { .incoming(incoming) .timestamp(timestamp) .ungzipped(ungzipped) + .webSocketInflated(webSocketInflated) .content(content) .build()); } @@ -108,7 +116,7 @@ public class StreamOptimizer { for (Packet packet : packets) { if (packet.isIncoming()) { - String content = new String(packet.getContent()); + String content = packet.getContentString(); if (content.contains("HTTP/")) { httpStarted = true; } @@ -145,7 +153,7 @@ public class StreamOptimizer { i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок } } else if (!packet.isIncoming()) { - String content = new String(packet.getContent()); + String content = packet.getContentString(); int contentPos = content.indexOf("\r\n\r\n"); boolean http = content.startsWith("HTTP/"); @@ -209,16 +217,17 @@ public class StreamOptimizer { IOUtils.copy(gzipStream, out); byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray()); - log.debug("Разархивирован gzip: {} -> {} байт", gzipBytes.length, out.size()); + log.debug("GZIP decompressed: {} -> {} bytes", gzipBytes.length, out.size()); return Packet.builder() .incoming(false) .timestamp(cut.get(0).getTimestamp()) .ungzipped(true) + .webSocketInflated(false) .content(newContent) .build(); } catch (ZipException e) { - log.warn("Не удалось разархивировать gzip, оставляем как есть", e); + log.warn("Failed to decompress gzip, leaving as it is", e); } catch (IOException e) { log.error("decompress gzip", e); } @@ -226,4 +235,17 @@ public class StreamOptimizer { return null; } + private void inflateWebSocket() { + if (!packets.get(0).getContentString().contains("HTTP/")) { + return; + } + + final WebSocketsParser parser = new WebSocketsParser(packets); + if(!parser.isParsed()) { + return; + } + + packets = parser.getParsedPackets(); + } + } diff --git a/src/main/java/ru/serega6531/packmate/service/StreamService.java b/src/main/java/ru/serega6531/packmate/service/StreamService.java index 96b2e41..8d0fc25 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamService.java +++ b/src/main/java/ru/serega6531/packmate/service/StreamService.java @@ -30,7 +30,8 @@ public class StreamService { private final StreamRepository repository; private final PatternService patternService; private final ServicesService servicesService; - private final StreamSubscriptionService subscriptionService; + private final CountingService countingService; + private final SubscriptionService subscriptionService; private final boolean ignoreEmptyPackets; @@ -40,11 +41,13 @@ public class StreamService { public StreamService(StreamRepository repository, PatternService patternService, ServicesService servicesService, - StreamSubscriptionService subscriptionService, + CountingService countingService, + SubscriptionService subscriptionService, @Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) { this.repository = repository; this.patternService = patternService; this.servicesService = servicesService; + this.countingService = countingService; this.subscriptionService = subscriptionService; this.ignoreEmptyPackets = ignoreEmptyPackets; } @@ -62,7 +65,7 @@ public class StreamService { ); if (serviceOptional.isEmpty()) { - log.warn("Не удалось сохранить стрим: сервиса на порту {} или {} не существует", + log.warn("Failed to save the stream: service at port {} or {} does not exist", unfinishedStream.getFirstPort(), unfinishedStream.getSecondPort()); return false; } @@ -72,7 +75,7 @@ public class StreamService { packets.removeIf(packet -> packet.getContent().length == 0); if (packets.isEmpty()) { - log.debug("Стрим состоит только из пустых пакетов и не будет сохранен"); + log.debug("Stream consists only of empty packets and will not be saved"); return false; } } @@ -88,7 +91,9 @@ public class StreamService { stream.setEndTimestamp(packets.get(packets.size() - 1).getTimestamp()); stream.setService(service.getPort()); - new StreamOptimizer(service, packets).optimizeStream(); + countingService.countStream(service.getPort(), packets.size()); + + packets = new StreamOptimizer(service, packets).optimizeStream(); processUserAgent(packets, stream); Stream savedStream = save(stream); @@ -105,7 +110,7 @@ public class StreamService { private void processUserAgent(List packets, Stream stream) { String ua = null; for (Packet packet : packets) { - String content = new String(packet.getContent()); + String content = packet.getContentString(); final Matcher matcher = userAgentPattern.matcher(content); if (matcher.find()) { ua = matcher.group(1); @@ -149,7 +154,7 @@ public class StreamService { Stream saved; if (stream.getId() == null) { saved = repository.save(stream); - log.debug("Создан стрим с id {}", saved.getId()); + log.debug("Saved stream with id {}", saved.getId()); } else { saved = repository.save(stream); } diff --git a/src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java similarity index 81% rename from src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java rename to src/main/java/ru/serega6531/packmate/service/SubscriptionService.java index 94db5b5..207a978 100644 --- a/src/main/java/ru/serega6531/packmate/service/StreamSubscriptionService.java +++ b/src/main/java/ru/serega6531/packmate/service/SubscriptionService.java @@ -18,25 +18,25 @@ import java.util.Objects; @Service @Slf4j -public class StreamSubscriptionService { +public class SubscriptionService { private final List subscribers = Collections.synchronizedList(new ArrayList<>()); private final ObjectMapper mapper; @Autowired - public StreamSubscriptionService(ObjectMapper mapper) { + public SubscriptionService(ObjectMapper mapper) { this.mapper = mapper; } public void addSubscriber(WebSocketSession session) { subscribers.add(session); - log.info("Подписан пользователь {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); + log.info("User subscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); } public void removeSubscriber(WebSocketSession session) { subscribers.remove(session); - log.info("Отписан пользователь {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); + log.info("User unsubscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName()); } void broadcast(SubscriptionMessage message) { diff --git a/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java b/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java new file mode 100644 index 0000000..0486001 --- /dev/null +++ b/src/main/java/ru/serega6531/packmate/service/WebSocketsParser.java @@ -0,0 +1,254 @@ +package ru.serega6531.packmate.service; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.java_websocket.drafts.Draft_6455; +import org.java_websocket.exceptions.InvalidDataException; +import org.java_websocket.exceptions.InvalidHandshakeException; +import org.java_websocket.extensions.permessage_deflate.PerMessageDeflateExtension; +import org.java_websocket.framing.DataFrame; +import org.java_websocket.framing.Framedata; +import org.java_websocket.handshake.HandshakeImpl1Client; +import org.java_websocket.handshake.HandshakeImpl1Server; +import ru.serega6531.packmate.model.Packet; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +@Slf4j +public class WebSocketsParser { + + private static final java.util.regex.Pattern WEBSOCKET_KEY_PATTERN = + java.util.regex.Pattern.compile("Sec-WebSocket-Key: (.+)\\r\\n"); + private static final java.util.regex.Pattern WEBSOCKET_EXTENSIONS_PATTERN = + java.util.regex.Pattern.compile("Sec-WebSocket-Extensions?: (.+)\\r\\n"); + private static final java.util.regex.Pattern WEBSOCKET_VERSION_PATTERN = + java.util.regex.Pattern.compile("Sec-WebSocket-Version: (\\d+)\\r\\n"); + private static final java.util.regex.Pattern WEBSOCKET_ACCEPT_PATTERN = + java.util.regex.Pattern.compile("Sec-WebSocket-Accept: (.+)\\r\\n"); + + private static final String WEBSOCKET_EXTENSION_HEADER = "Sec-WebSocket-Extension: permessage-deflate"; + private static final String WEBSOCKET_EXTENSIONS_HEADER = "Sec-WebSocket-Extensions: permessage-deflate"; + private static final String WEBSOCKET_UPGRADE_HEADER = "upgrade: websocket\r\n"; + private static final String WEBSOCKET_CONNECTION_HEADER = "connection: upgrade\r\n"; + + private final List packets; + + @Getter + private boolean parsed = false; + private List parsedPackets; + + public WebSocketsParser(List packets) { + this.packets = packets; + detectWebSockets(); + } + + private void detectWebSockets() { + final List clientHandshakePackets = packets.stream() + .takeWhile(Packet::isIncoming) + .collect(Collectors.toList()); + + final String clientHandshake = getHandshake(clientHandshakePackets); + if (clientHandshake == null) { + return; + } + + int httpEnd = -1; + for (int i = clientHandshakePackets.size(); i < packets.size(); i++) { + if (packets.get(i).getContentString().endsWith("\r\n\r\n")) { + httpEnd = i + 1; + break; + } + } + + if (httpEnd == -1) { + return; + } + + final List serverHandshakePackets = packets.subList(clientHandshakePackets.size(), httpEnd); + final String serverHandshake = getHandshake(serverHandshakePackets); + if (serverHandshake == null) { + return; + } + + HandshakeImpl1Server serverHandshakeImpl = fillServerHandshake(serverHandshake); + HandshakeImpl1Client clientHandshakeImpl = fillClientHandshake(clientHandshake); + + if (serverHandshakeImpl == null || clientHandshakeImpl == null) { + return; + } + + Draft_6455 draft = new Draft_6455(new PerMessageDeflateExtension()); + + try { + draft.acceptHandshakeAsServer(clientHandshakeImpl); + draft.acceptHandshakeAsClient(clientHandshakeImpl, serverHandshakeImpl); + } catch (InvalidHandshakeException e) { + log.warn("WebSocket handshake", e); + return; + } + + final List wsPackets = packets.subList( + httpEnd, + packets.size()); + + if(wsPackets.isEmpty()) { + return; + } + + final List handshakes = packets.subList(0, httpEnd); + + parse(wsPackets, handshakes, draft); + parsed = true; + } + + private void parse(final List wsPackets, final List handshakes, Draft_6455 draft) { + List> sides = sliceToSides(wsPackets); + parsedPackets = new ArrayList<>(handshakes); + + for (List side : sides) { + final Packet lastPacket = side.get(0); + + final byte[] wsContent = side.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll) + .get(); + + final ByteBuffer buffer = ByteBuffer.wrap(wsContent); + List frames; + + try { + frames = draft.translateFrame(buffer); + } catch (InvalidDataException e) { + log.warn("WebSocket data", e); + return; + } + + for (Framedata frame : frames) { + if(frame instanceof DataFrame) { + parsedPackets.add(Packet.builder() + .content(frame.getPayloadData().array()) + .incoming(lastPacket.isIncoming()) + .timestamp(lastPacket.getTimestamp()) + .ttl(lastPacket.getTtl()) + .ungzipped(lastPacket.isUngzipped()) + .webSocketInflated(true) + .build() + ); + } + } + } + } + + public List getParsedPackets() { + if (!parsed) { + throw new IllegalStateException("WS is not parsed"); + } + + return parsedPackets; + } + + private List> sliceToSides(List packets) { + List> result = new ArrayList<>(); + List side = new ArrayList<>(); + boolean incoming = true; + + for (Packet packet : packets) { + if(packet.isIncoming() != incoming) { + incoming = packet.isIncoming(); + + if(!side.isEmpty()) { + result.add(side); + side = new ArrayList<>(); + } + } + + side.add(packet); + } + + if(!side.isEmpty()) { + result.add(side); + } + + return result; + } + + private String getHandshake(final List packets) { + final String handshake = packets.stream() + .map(Packet::getContent) + .reduce(ArrayUtils::addAll) + .map(String::new) + .orElse(null); + + if (handshake == null || + !handshake.toLowerCase().contains(WEBSOCKET_CONNECTION_HEADER) || + !handshake.toLowerCase().contains(WEBSOCKET_UPGRADE_HEADER)) { + return null; + } + + if (!handshake.contains(WEBSOCKET_EXTENSION_HEADER) && + !handshake.contains(WEBSOCKET_EXTENSIONS_HEADER)) { + return null; + } + + return handshake; + } + + private HandshakeImpl1Client fillClientHandshake(String clientHandshake) { + Matcher matcher = WEBSOCKET_VERSION_PATTERN.matcher(clientHandshake); + if (!matcher.find()) { + return null; + } + String version = matcher.group(1); + + matcher = WEBSOCKET_KEY_PATTERN.matcher(clientHandshake); + if (!matcher.find()) { + return null; + } + String key = matcher.group(1); + + matcher = WEBSOCKET_EXTENSIONS_PATTERN.matcher(clientHandshake); + if (!matcher.find()) { + return null; + } + String extensions = matcher.group(1); + + HandshakeImpl1Client clientHandshakeImpl = new HandshakeImpl1Client(); + + clientHandshakeImpl.put("Upgrade", "websocket"); + clientHandshakeImpl.put("Connection", "Upgrade"); + clientHandshakeImpl.put("Sec-WebSocket-Version", version); + clientHandshakeImpl.put("Sec-WebSocket-Key", key); + clientHandshakeImpl.put("Sec-WebSocket-Extensions", extensions); + + return clientHandshakeImpl; + } + + private HandshakeImpl1Server fillServerHandshake(String serverHandshake) { + Matcher matcher = WEBSOCKET_ACCEPT_PATTERN.matcher(serverHandshake); + if (!matcher.find()) { + return null; + } + String accept = matcher.group(1); + + matcher = WEBSOCKET_EXTENSIONS_PATTERN.matcher(serverHandshake); + if (!matcher.find()) { + return null; + } + String extensions = matcher.group(1); + + HandshakeImpl1Server serverHandshakeImpl = new HandshakeImpl1Server(); + + serverHandshakeImpl.put("Upgrade", "websocket"); + serverHandshakeImpl.put("Connection", "Upgrade"); + serverHandshakeImpl.put("Sec-WebSocket-Accept", accept); + serverHandshakeImpl.put("Sec-WebSocket-Extensions", extensions); + + return serverHandshakeImpl; + } + +} diff --git a/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java b/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java index ecdbf5c..67bde4a 100644 --- a/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java +++ b/src/test/java/ru/serega6531/packmate/StreamOptimizerTest.java @@ -28,8 +28,8 @@ class StreamOptimizerTest { List list = new ArrayList<>(); list.add(p); - new StreamOptimizer(service, list).optimizeStream(); - final String processed = new String(list.get(0).getContent()); + list = new StreamOptimizer(service, list).optimizeStream(); + final String processed = list.get(0).getContentString(); assertTrue(processed.contains("aaabbb")); } @@ -42,8 +42,8 @@ class StreamOptimizerTest { List list = new ArrayList<>(); list.add(p); - new StreamOptimizer(service, list).optimizeStream(); - final String processed = new String(list.get(0).getContent()); + list = new StreamOptimizer(service, list).optimizeStream(); + final String processed = list.get(0).getContentString(); assertTrue(processed.contains("а б")); } @@ -67,7 +67,7 @@ class StreamOptimizerTest { list.add(p5); list.add(p6); - new StreamOptimizer(service, list).optimizeStream(); + list = new StreamOptimizer(service, list).optimizeStream(); assertEquals(4, list.size()); assertEquals(2, list.get(1).getContent().length);