Рефакторинг

This commit is contained in:
serega6531
2020-04-15 23:09:44 +03:00
parent 7dd4c6f468
commit 85681947cd
4 changed files with 96 additions and 52 deletions

View File

@@ -32,9 +32,9 @@ public class SubscriptionService {
public void addSubscriber(WebSocketSession session) { public void addSubscriber(WebSocketSession session) {
subscribers.add(session); subscribers.add(session);
log.info("User subscribed: {} {}", log.info("User subscribed: {} ({})",
session.getClass().getSimpleName(), Objects.requireNonNull(session.getRemoteAddress()).getHostName(),
Objects.requireNonNull(session.getRemoteAddress()).getHostName()); session.getClass().getSimpleName());
} }
public void removeSubscriber(WebSocketSession session) { public void removeSubscriber(WebSocketSession session) {

View File

@@ -8,6 +8,7 @@ import ru.serega6531.packmate.utils.Bytes;
import ru.serega6531.packmate.utils.PacketUtils; import ru.serega6531.packmate.utils.PacketUtils;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@@ -56,7 +57,7 @@ public class HttpChunksProcessor {
} }
private void checkCompleteChunk(List<Packet> packets, int start) { private void checkCompleteChunk(List<Packet> packets, int start) {
boolean end = packets.get(packets.size() - 1).getContentString().endsWith("\r\n0\r\n\r\n"); boolean end = Bytes.endsWith(packets.get(packets.size() - 1).getContent(), "\r\n0\r\n\r\n".getBytes());
if (end) { if (end) {
processChunk(packets, start); processChunk(packets, start);
@@ -76,66 +77,81 @@ public class HttpChunksProcessor {
ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(content, contentStart, content.length)); ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(content, contentStart, content.length));
while (true) { while (true) {
final String found = readChunkSize(buf); final int chunkSize = readChunkSize(buf);
if (found != null) {
final int chunkSize = Integer.parseInt(found, 16);
if (chunkSize == 0) { // конец потока чанков switch (chunkSize) {
Packet result = Packet.builder() case -1 -> {
.incoming(false) log.warn("Failed to merge chunks, next chunk size not found");
.timestamp(packets.get(0).getTimestamp())
.ungzipped(false)
.webSocketParsed(false)
.content(output.toByteArray())
.build();
this.packets.removeAll(packets);
this.packets.add(start, result);
resetChunk();
position = start + 1;
return;
}
if (chunkSize > buf.remaining()) {
log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}",
buf.position(), chunkSize, buf.capacity());
resetChunk(); resetChunk();
return; return;
} }
case 0 -> {
byte[] chunk = new byte[chunkSize]; buildWholePacket(packets, start, output);
buf.get(chunk);
output.write(chunk);
if (buf.remaining() < 2) {
log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n");
resetChunk();
return; return;
} }
default -> {
int c1 = buf.get(); if (!readChunk(buf, chunkSize, output)) return;
int c2 = buf.get(); if (!readTrailer(buf)) return;
if(c1 != '\r' || c2 != '\n') {
log.warn("Failed to merge chunks, chunk trailer is not equal to \\r\\n");
resetChunk();
return;
} }
} else {
log.warn("Failed to merge chunks, next chunk size not found");
resetChunk();
return;
} }
} }
} }
private boolean readChunk(ByteBuffer buf, int chunkSize, ByteArrayOutputStream output) throws IOException {
if (chunkSize > buf.remaining()) {
log.warn("Failed to merge chunks, chunk size too big: {} + {} > {}",
buf.position(), chunkSize, buf.capacity());
resetChunk();
return false;
}
byte[] chunk = new byte[chunkSize];
buf.get(chunk);
output.write(chunk);
return true;
}
private boolean readTrailer(ByteBuffer buf) {
if (buf.remaining() < 2) {
log.warn("Failed to merge chunks, chunk doesn't end with \\r\\n");
resetChunk();
return false;
}
int c1 = buf.get();
int c2 = buf.get();
if(c1 != '\r' || c2 != '\n') {
log.warn("Failed to merge chunks, chunk trailer is not equal to \\r\\n");
resetChunk();
return false;
}
return true;
}
private void buildWholePacket(List<Packet> packets, int start, ByteArrayOutputStream output) {
Packet result = Packet.builder()
.incoming(false)
.timestamp(packets.get(0).getTimestamp())
.ungzipped(false)
.webSocketParsed(false)
.content(output.toByteArray())
.build();
this.packets.removeAll(packets);
this.packets.add(start, result);
resetChunk();
position = start + 1;
}
private void resetChunk() { private void resetChunk() {
chunkStarted = false; chunkStarted = false;
chunkPackets.clear(); chunkPackets.clear();
} }
private String readChunkSize(ByteBuffer buf) { private int readChunkSize(ByteBuffer buf) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
while (buf.remaining() > 2) { while (buf.remaining() > 2) {
@@ -145,14 +161,16 @@ public class HttpChunksProcessor {
sb.append((char) b); sb.append((char) b);
} else if (b == '\r') { } else if (b == '\r') {
if(buf.get() == '\n') { if(buf.get() == '\n') {
return sb.toString(); return Integer.parseInt(sb.toString(), 16);
} else { } else {
return null; // после \r не идет \n return -1; // после \r не идет \n
} }
} else {
return -1;
} }
} }
return null; return -1;
} }
} }

View File

@@ -61,7 +61,7 @@ public class StreamOptimizer {
} }
/** /**
* https://ru.wikipedia.org/wiki/Chunked_transfer_encoding * <a href="https://ru.wikipedia.org/wiki/Chunked_transfer_encoding">Chunked transfer encoding</a>
*/ */
private void processChunkedEncoding() { private void processChunkedEncoding() {
HttpChunksProcessor processor = new HttpChunksProcessor(packets); HttpChunksProcessor processor = new HttpChunksProcessor(packets);

View File

@@ -5,10 +5,18 @@ import lombok.experimental.UtilityClass;
@UtilityClass @UtilityClass
public class Bytes { public class Bytes {
/**
* @param array где ищем
* @param target что ищем
*/
public int indexOf(byte[] array, byte[] target) { public int indexOf(byte[] array, byte[] target) {
return indexOf(array, target, 0, array.length); return indexOf(array, target, 0, array.length);
} }
/**
* @param array где ищем
* @param target что ищем
*/
public int indexOf(byte[] array, byte[] target, int start, int end) { public int indexOf(byte[] array, byte[] target, int start, int end) {
if (target.length == 0) { if (target.length == 0) {
return 0; return 0;
@@ -26,4 +34,22 @@ public class Bytes {
return -1; return -1;
} }
/**
* @param array где ищем
* @param target что ищем
*/
public boolean endsWith(byte[] array, byte[] target) {
if(array.length < target.length) {
return false;
}
for (int i = 0; i < target.length; i++) {
if(array[array.length - target.length + i] != target[i]) {
return false;
}
}
return true;
}
} }