Добавлено склеивание соседних пакетов

This commit is contained in:
serega6531
2019-11-25 02:32:10 +03:00
parent 43d6973948
commit 90894d67cb
2 changed files with 108 additions and 60 deletions

View File

@@ -39,6 +39,7 @@ public class StreamService {
private final String localIp;
private final boolean unpackGzippedHttp;
private final boolean ignoreEmptyPackets;
private final boolean mergeAdjacentPackets;
private final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 0x08};
private final java.util.regex.Pattern userAgentPattern = java.util.regex.Pattern.compile("User-Agent: (.+)\\n");
@@ -51,7 +52,8 @@ public class StreamService {
StreamSubscriptionService subscriptionService,
@Value("${local-ip}") String localIp,
@Value("${unpack-gzipped-http}") boolean unpackGzippedHttp,
@Value("${ignore-empty-packets}") boolean ignoreEmptyPackets) {
@Value("${ignore-empty-packets}") boolean ignoreEmptyPackets,
@Value("${merge-adjacent-packets}") boolean mergeAdjacentPackets) {
this.repository = repository;
this.patternService = patternService;
this.servicesService = servicesService;
@@ -60,6 +62,7 @@ public class StreamService {
this.localIp = localIp;
this.unpackGzippedHttp = unpackGzippedHttp;
this.ignoreEmptyPackets = ignoreEmptyPackets;
this.mergeAdjacentPackets = mergeAdjacentPackets;
}
/**
@@ -102,6 +105,86 @@ public class StreamService {
}
if (unpackGzippedHttp) {
unpackGzip(packets);
}
if (mergeAdjacentPackets) {
mergeAdjacentPackets(packets);
}
String ua = null;
for (Packet packet : packets) {
String content = new String(packet.getContent());
final Matcher matcher = userAgentPattern.matcher(content);
if (matcher.find()) {
ua = matcher.group(1);
break;
}
}
if (ua != null) {
stream.setUserAgentHash(calculateUserAgentHash(ua));
}
Stream savedStream = save(stream);
Set<Pattern> foundPatterns = new HashSet<>();
for (ru.serega6531.packmate.model.Packet packet : packets) {
packet.setStream(savedStream);
final Set<FoundPattern> matches = patternService.findMatches(packet.getContent(), packet.isIncoming());
packet.setMatches(matches);
foundPatterns.addAll(matches.stream()
.map(FoundPattern::getPatternId)
.map(patternService::find)
.collect(Collectors.toList()));
}
savedStream.setFoundPatterns(foundPatterns);
savedStream.setPackets(packetService.saveAll(packets));
savedStream = save(savedStream);
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.NEW_STREAM, savedStream));
return true;
}
private void mergeAdjacentPackets(List<Packet> packets) {
int start = 0;
int packetsInRow = 0;
boolean incoming = false;
for (int i = 0; i < packets.size(); i++) {
Packet packet = packets.get(i);
if (packet.isIncoming() != incoming || i == packets.size() - 1) {
if (packetsInRow > 1) {
final List<Packet> cut = packets.subList(start, i);
packets.removeAll(cut);
//noinspection OptionalGetWithoutIsPresent
final byte[] content = cut.stream()
.map(Packet::getContent)
.reduce(ArrayUtils::addAll)
.get();
packets.add(start, Packet.builder()
.incoming(incoming)
.timestamp(packets.get(0).getTimestamp())
.ungzipped(cut.stream().anyMatch(Packet::isUngzipped))
.content(content)
.build());
}
start++;
i = start;
packetsInRow = 1;
} else {
packetsInRow++;
}
incoming = packet.isIncoming();
}
}
private void unpackGzip(List<Packet> packets) {
boolean gzipStarted = false;
int gzipStartPacket = 0;
int gzipEndPacket;
@@ -163,42 +246,6 @@ public class StreamService {
}
}
String ua = null;
for (Packet packet : packets) {
String content = new String(packet.getContent());
final Matcher matcher = userAgentPattern.matcher(content);
if (matcher.find()) {
ua = matcher.group(1);
break;
}
}
if (ua != null) {
stream.setUserAgentHash(calculateUserAgentHash(ua));
}
Stream savedStream = save(stream);
Set<Pattern> foundPatterns = new HashSet<>();
for (ru.serega6531.packmate.model.Packet packet : packets) {
packet.setStream(savedStream);
final Set<FoundPattern> matches = patternService.findMatches(packet.getContent(), packet.isIncoming());
packet.setMatches(matches);
foundPatterns.addAll(matches.stream()
.map(FoundPattern::getPatternId)
.map(patternService::find)
.collect(Collectors.toList()));
}
savedStream.setFoundPatterns(foundPatterns);
savedStream.setPackets(packetService.saveAll(packets));
savedStream = save(savedStream);
subscriptionService.broadcast(new SubscriptionMessage(SubscriptionMessageType.NEW_STREAM, savedStream));
return true;
}
private Packet decompressGzipPackets(List<Packet> packets) {
//noinspection OptionalGetWithoutIsPresent
final byte[] content = packets.stream()

View File

@@ -25,3 +25,4 @@ tcp-stream-timeout: 40 # секунд
timeout-stream-check-interval: 10 # секунд
ignore-empty-packets: true
unpack-gzipped-http: true
merge-adjacent-packets: true