Merge branch 'master' into enable-patterns

# Conflicts:
#	src/main/java/ru/serega6531/packmate/model/enums/SubscriptionMessageType.java
#	src/main/java/ru/serega6531/packmate/service/PatternService.java
This commit is contained in:
serega6531
2020-04-07 00:41:00 +03:00
27 changed files with 492 additions and 87 deletions

View File

@@ -0,0 +1,63 @@
package ru.serega6531.packmate.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import ru.serega6531.packmate.model.enums.SubscriptionMessageType;
import ru.serega6531.packmate.model.pojo.Counter;
import ru.serega6531.packmate.model.pojo.CountersHolder;
import ru.serega6531.packmate.model.pojo.SubscriptionMessage;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class CountingService {
private final SubscriptionService subscriptionService;
private Map<Integer, Counter> servicesPackets = new HashMap<>();
private Map<Integer, Counter> servicesStreams = new HashMap<>();
private Counter totalPackets = new Counter();
private Counter totalStreams = new Counter();
@Autowired
public CountingService(SubscriptionService subscriptionService) {
this.subscriptionService = subscriptionService;
}
void countStream(int serviceId, int packets) {
getCounter(servicesPackets, serviceId).increment(packets);
getCounter(servicesStreams, serviceId).increment();
totalPackets.increment(packets);
totalStreams.increment();
}
@Scheduled(cron = "0 * * ? * *")
public void sendCounters() {
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.COUNTERS_UPDATE,
new CountersHolder(
toIntegerMap(servicesPackets), toIntegerMap(servicesStreams),
totalPackets.getValue(), totalStreams.getValue())));
servicesPackets.clear();
servicesStreams.clear();
totalPackets = new Counter();
totalStreams = new Counter();
}
private Map<Integer, Integer> toIntegerMap(Map<Integer, Counter> map) {
return map.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
ent -> ent.getValue().getValue()));
}
private Counter getCounter(Map<Integer, Counter> counters, int serviceId) {
return counters.computeIfAbsent(serviceId, c -> new Counter());
}
}

View File

@@ -20,13 +20,13 @@ import java.util.stream.Collectors;
public class PatternService {
private final PatternRepository repository;
private final StreamSubscriptionService subscriptionService;
private final SubscriptionService subscriptionService;
private final Map<Integer, Pattern> patterns = new HashMap<>();
@Autowired
public PatternService(PatternRepository repository,
StreamSubscriptionService subscriptionService) {
SubscriptionService subscriptionService) {
this.repository = repository;
this.subscriptionService = subscriptionService;
@@ -56,7 +56,7 @@ public class PatternService {
pattern.setEnabled(enabled);
repository.save(pattern);
if(enabled) {
if (enabled) {
log.info("Включен паттерн {} со значением {}", pattern.getName(), pattern.getValue());
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.ENABLE_PATTERN, id));
} else {
@@ -77,7 +77,7 @@ public class PatternService {
final Pattern saved = repository.save(pattern);
patterns.put(saved.getId(), saved);
log.info("Добавлен новый паттерн {} со значением {}", pattern.getName(), pattern.getValue());
log.info("Added new pattern {} with value {}", pattern.getName(), pattern.getValue());
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.SAVE_PATTERN, saved));
return saved;
}

View File

@@ -22,7 +22,7 @@ import java.util.Optional;
public class ServicesService {
private final ServiceRepository repository;
private final StreamSubscriptionService subscriptionService;
private final SubscriptionService subscriptionService;
private final InetAddress localIp;
@@ -30,7 +30,7 @@ public class ServicesService {
@Autowired
public ServicesService(ServiceRepository repository,
StreamSubscriptionService subscriptionService,
SubscriptionService subscriptionService,
@Value("${local-ip}") String localIpString) throws UnknownHostException {
this.repository = repository;
this.subscriptionService = subscriptionService;
@@ -59,14 +59,14 @@ public class ServicesService {
}
public void deleteByPort(int port) {
log.info("Удален сервис на порту {}", port);
log.info("Removed service at port {}", port);
services.remove(port);
repository.deleteById(port);
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.DELETE_SERVICE, port));
}
public CtfService save(CtfService service) {
log.info("Добавлен или изменен сервис {} на порту {}", service.getName(), service.getPort());
log.info("Added or edited service {} at port {}", service.getName(), service.getPort());
final CtfService saved = repository.save(service);
services.put(saved.getPort(), saved);
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.SAVE_SERVICE, saved));

View File

@@ -24,18 +24,22 @@ import java.util.zip.ZipException;
public class StreamOptimizer {
private final CtfService service;
private final List<Packet> packets;
private List<Packet> packets;
private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
/**
* Вызвать для выполнения оптимизаций на переданном списке пакетов.
*/
public void optimizeStream() {
public List<Packet> optimizeStream() {
if (service.isUngzipHttp()) {
unpackGzip();
}
if (service.isInflateWebSockets()) {
inflateWebSocket();
}
if (service.isUrldecodeHttpRequests()) {
urldecodeRequests();
}
@@ -43,6 +47,8 @@ public class StreamOptimizer {
if (service.isMergeAdjacentPackets()) {
mergeAdjacentPackets();
}
return packets;
}
/**
@@ -83,6 +89,7 @@ public class StreamOptimizer {
final List<Packet> cut = packets.subList(start, end);
final long timestamp = cut.get(0).getTimestamp();
final boolean ungzipped = cut.stream().anyMatch(Packet::isUngzipped);
final boolean webSocketInflated = cut.stream().anyMatch(Packet::isWebSocketInflated);
boolean incoming = cut.get(0).isIncoming();
//noinspection OptionalGetWithoutIsPresent
final byte[] content = cut.stream()
@@ -95,6 +102,7 @@ public class StreamOptimizer {
.incoming(incoming)
.timestamp(timestamp)
.ungzipped(ungzipped)
.webSocketInflated(webSocketInflated)
.content(content)
.build());
}
@@ -108,7 +116,7 @@ public class StreamOptimizer {
for (Packet packet : packets) {
if (packet.isIncoming()) {
String content = new String(packet.getContent());
String content = packet.getContentString();
if (content.contains("HTTP/")) {
httpStarted = true;
}
@@ -145,7 +153,7 @@ public class StreamOptimizer {
i = gzipStartPacket + 1; // продвигаем указатель на следующий после склеенного блок
}
} else if (!packet.isIncoming()) {
String content = new String(packet.getContent());
String content = packet.getContentString();
int contentPos = content.indexOf("\r\n\r\n");
boolean http = content.startsWith("HTTP/");
@@ -209,16 +217,17 @@ public class StreamOptimizer {
IOUtils.copy(gzipStream, out);
byte[] newContent = ArrayUtils.addAll(httpHeader, out.toByteArray());
log.debug("Разархивирован gzip: {} -> {} байт", gzipBytes.length, out.size());
log.debug("GZIP decompressed: {} -> {} bytes", gzipBytes.length, out.size());
return Packet.builder()
.incoming(false)
.timestamp(cut.get(0).getTimestamp())
.ungzipped(true)
.webSocketInflated(false)
.content(newContent)
.build();
} catch (ZipException e) {
log.warn("Не удалось разархивировать gzip, оставляем как есть", e);
log.warn("Failed to decompress gzip, leaving as it is", e);
} catch (IOException e) {
log.error("decompress gzip", e);
}
@@ -226,4 +235,17 @@ public class StreamOptimizer {
return null;
}
private void inflateWebSocket() {
if (!packets.get(0).getContentString().contains("HTTP/")) {
return;
}
final WebSocketsParser parser = new WebSocketsParser(packets);
if(!parser.isParsed()) {
return;
}
packets = parser.getParsedPackets();
}
}

View File

@@ -30,7 +30,8 @@ public class StreamService {
private final StreamRepository repository;
private final PatternService patternService;
private final ServicesService servicesService;
private final StreamSubscriptionService subscriptionService;
private final CountingService countingService;
private final SubscriptionService subscriptionService;
private final boolean ignoreEmptyPackets;
@@ -40,11 +41,13 @@ public class StreamService {
public StreamService(StreamRepository repository,
PatternService patternService,
ServicesService servicesService,
StreamSubscriptionService subscriptionService,
CountingService countingService,
SubscriptionService subscriptionService,
@Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) {
this.repository = repository;
this.patternService = patternService;
this.servicesService = servicesService;
this.countingService = countingService;
this.subscriptionService = subscriptionService;
this.ignoreEmptyPackets = ignoreEmptyPackets;
}
@@ -62,7 +65,7 @@ public class StreamService {
);
if (serviceOptional.isEmpty()) {
log.warn("Не удалось сохранить стрим: сервиса на порту {} или {} не существует",
log.warn("Failed to save the stream: service at port {} or {} does not exist",
unfinishedStream.getFirstPort(), unfinishedStream.getSecondPort());
return false;
}
@@ -72,7 +75,7 @@ public class StreamService {
packets.removeIf(packet -> packet.getContent().length == 0);
if (packets.isEmpty()) {
log.debug("Стрим состоит только из пустых пакетов и не будет сохранен");
log.debug("Stream consists only of empty packets and will not be saved");
return false;
}
}
@@ -88,7 +91,9 @@ public class StreamService {
stream.setEndTimestamp(packets.get(packets.size() - 1).getTimestamp());
stream.setService(service.getPort());
new StreamOptimizer(service, packets).optimizeStream();
countingService.countStream(service.getPort(), packets.size());
packets = new StreamOptimizer(service, packets).optimizeStream();
processUserAgent(packets, stream);
Stream savedStream = save(stream);
@@ -105,7 +110,7 @@ public class StreamService {
private void processUserAgent(List<Packet> packets, Stream stream) {
String ua = null;
for (Packet packet : packets) {
String content = new String(packet.getContent());
String content = packet.getContentString();
final Matcher matcher = userAgentPattern.matcher(content);
if (matcher.find()) {
ua = matcher.group(1);
@@ -149,7 +154,7 @@ public class StreamService {
Stream saved;
if (stream.getId() == null) {
saved = repository.save(stream);
log.debug("Создан стрим с id {}", saved.getId());
log.debug("Saved stream with id {}", saved.getId());
} else {
saved = repository.save(stream);
}

View File

@@ -18,25 +18,25 @@ import java.util.Objects;
@Service
@Slf4j
public class StreamSubscriptionService {
public class SubscriptionService {
private final List<WebSocketSession> subscribers = Collections.synchronizedList(new ArrayList<>());
private final ObjectMapper mapper;
@Autowired
public StreamSubscriptionService(ObjectMapper mapper) {
public SubscriptionService(ObjectMapper mapper) {
this.mapper = mapper;
}
public void addSubscriber(WebSocketSession session) {
subscribers.add(session);
log.info("Подписан пользователь {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
log.info("User subscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
}
public void removeSubscriber(WebSocketSession session) {
subscribers.remove(session);
log.info("Отписан пользователь {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
log.info("User unsubscribed: {}", Objects.requireNonNull(session.getRemoteAddress()).getHostName());
}
void broadcast(SubscriptionMessage message) {

View File

@@ -0,0 +1,254 @@
package ru.serega6531.packmate.service;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.exceptions.InvalidHandshakeException;
import org.java_websocket.extensions.permessage_deflate.PerMessageDeflateExtension;
import org.java_websocket.framing.DataFrame;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.HandshakeImpl1Client;
import org.java_websocket.handshake.HandshakeImpl1Server;
import ru.serega6531.packmate.model.Packet;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
@Slf4j
public class WebSocketsParser {
private static final java.util.regex.Pattern WEBSOCKET_KEY_PATTERN =
java.util.regex.Pattern.compile("Sec-WebSocket-Key: (.+)\\r\\n");
private static final java.util.regex.Pattern WEBSOCKET_EXTENSIONS_PATTERN =
java.util.regex.Pattern.compile("Sec-WebSocket-Extensions?: (.+)\\r\\n");
private static final java.util.regex.Pattern WEBSOCKET_VERSION_PATTERN =
java.util.regex.Pattern.compile("Sec-WebSocket-Version: (\\d+)\\r\\n");
private static final java.util.regex.Pattern WEBSOCKET_ACCEPT_PATTERN =
java.util.regex.Pattern.compile("Sec-WebSocket-Accept: (.+)\\r\\n");
private static final String WEBSOCKET_EXTENSION_HEADER = "Sec-WebSocket-Extension: permessage-deflate";
private static final String WEBSOCKET_EXTENSIONS_HEADER = "Sec-WebSocket-Extensions: permessage-deflate";
private static final String WEBSOCKET_UPGRADE_HEADER = "upgrade: websocket\r\n";
private static final String WEBSOCKET_CONNECTION_HEADER = "connection: upgrade\r\n";
private final List<Packet> packets;
@Getter
private boolean parsed = false;
private List<Packet> parsedPackets;
public WebSocketsParser(List<Packet> packets) {
this.packets = packets;
detectWebSockets();
}
private void detectWebSockets() {
final List<Packet> clientHandshakePackets = packets.stream()
.takeWhile(Packet::isIncoming)
.collect(Collectors.toList());
final String clientHandshake = getHandshake(clientHandshakePackets);
if (clientHandshake == null) {
return;
}
int httpEnd = -1;
for (int i = clientHandshakePackets.size(); i < packets.size(); i++) {
if (packets.get(i).getContentString().endsWith("\r\n\r\n")) {
httpEnd = i + 1;
break;
}
}
if (httpEnd == -1) {
return;
}
final List<Packet> serverHandshakePackets = packets.subList(clientHandshakePackets.size(), httpEnd);
final String serverHandshake = getHandshake(serverHandshakePackets);
if (serverHandshake == null) {
return;
}
HandshakeImpl1Server serverHandshakeImpl = fillServerHandshake(serverHandshake);
HandshakeImpl1Client clientHandshakeImpl = fillClientHandshake(clientHandshake);
if (serverHandshakeImpl == null || clientHandshakeImpl == null) {
return;
}
Draft_6455 draft = new Draft_6455(new PerMessageDeflateExtension());
try {
draft.acceptHandshakeAsServer(clientHandshakeImpl);
draft.acceptHandshakeAsClient(clientHandshakeImpl, serverHandshakeImpl);
} catch (InvalidHandshakeException e) {
log.warn("WebSocket handshake", e);
return;
}
final List<Packet> wsPackets = packets.subList(
httpEnd,
packets.size());
if(wsPackets.isEmpty()) {
return;
}
final List<Packet> handshakes = packets.subList(0, httpEnd);
parse(wsPackets, handshakes, draft);
parsed = true;
}
private void parse(final List<Packet> wsPackets, final List<Packet> handshakes, Draft_6455 draft) {
List<List<Packet>> sides = sliceToSides(wsPackets);
parsedPackets = new ArrayList<>(handshakes);
for (List<Packet> side : sides) {
final Packet lastPacket = side.get(0);
final byte[] wsContent = side.stream()
.map(Packet::getContent)
.reduce(ArrayUtils::addAll)
.get();
final ByteBuffer buffer = ByteBuffer.wrap(wsContent);
List<Framedata> frames;
try {
frames = draft.translateFrame(buffer);
} catch (InvalidDataException e) {
log.warn("WebSocket data", e);
return;
}
for (Framedata frame : frames) {
if(frame instanceof DataFrame) {
parsedPackets.add(Packet.builder()
.content(frame.getPayloadData().array())
.incoming(lastPacket.isIncoming())
.timestamp(lastPacket.getTimestamp())
.ttl(lastPacket.getTtl())
.ungzipped(lastPacket.isUngzipped())
.webSocketInflated(true)
.build()
);
}
}
}
}
public List<Packet> getParsedPackets() {
if (!parsed) {
throw new IllegalStateException("WS is not parsed");
}
return parsedPackets;
}
private List<List<Packet>> sliceToSides(List<Packet> packets) {
List<List<Packet>> result = new ArrayList<>();
List<Packet> side = new ArrayList<>();
boolean incoming = true;
for (Packet packet : packets) {
if(packet.isIncoming() != incoming) {
incoming = packet.isIncoming();
if(!side.isEmpty()) {
result.add(side);
side = new ArrayList<>();
}
}
side.add(packet);
}
if(!side.isEmpty()) {
result.add(side);
}
return result;
}
private String getHandshake(final List<Packet> packets) {
final String handshake = packets.stream()
.map(Packet::getContent)
.reduce(ArrayUtils::addAll)
.map(String::new)
.orElse(null);
if (handshake == null ||
!handshake.toLowerCase().contains(WEBSOCKET_CONNECTION_HEADER) ||
!handshake.toLowerCase().contains(WEBSOCKET_UPGRADE_HEADER)) {
return null;
}
if (!handshake.contains(WEBSOCKET_EXTENSION_HEADER) &&
!handshake.contains(WEBSOCKET_EXTENSIONS_HEADER)) {
return null;
}
return handshake;
}
private HandshakeImpl1Client fillClientHandshake(String clientHandshake) {
Matcher matcher = WEBSOCKET_VERSION_PATTERN.matcher(clientHandshake);
if (!matcher.find()) {
return null;
}
String version = matcher.group(1);
matcher = WEBSOCKET_KEY_PATTERN.matcher(clientHandshake);
if (!matcher.find()) {
return null;
}
String key = matcher.group(1);
matcher = WEBSOCKET_EXTENSIONS_PATTERN.matcher(clientHandshake);
if (!matcher.find()) {
return null;
}
String extensions = matcher.group(1);
HandshakeImpl1Client clientHandshakeImpl = new HandshakeImpl1Client();
clientHandshakeImpl.put("Upgrade", "websocket");
clientHandshakeImpl.put("Connection", "Upgrade");
clientHandshakeImpl.put("Sec-WebSocket-Version", version);
clientHandshakeImpl.put("Sec-WebSocket-Key", key);
clientHandshakeImpl.put("Sec-WebSocket-Extensions", extensions);
return clientHandshakeImpl;
}
private HandshakeImpl1Server fillServerHandshake(String serverHandshake) {
Matcher matcher = WEBSOCKET_ACCEPT_PATTERN.matcher(serverHandshake);
if (!matcher.find()) {
return null;
}
String accept = matcher.group(1);
matcher = WEBSOCKET_EXTENSIONS_PATTERN.matcher(serverHandshake);
if (!matcher.find()) {
return null;
}
String extensions = matcher.group(1);
HandshakeImpl1Server serverHandshakeImpl = new HandshakeImpl1Server();
serverHandshakeImpl.put("Upgrade", "websocket");
serverHandshakeImpl.put("Connection", "Upgrade");
serverHandshakeImpl.put("Sec-WebSocket-Accept", accept);
serverHandshakeImpl.put("Sec-WebSocket-Extensions", extensions);
return serverHandshakeImpl;
}
}