ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Netty protobuf 패킷 압축
    카테고리 없음 2025. 3. 28. 16:50

    Netty + Protobuf 환경에서 패킷 압축 처리 방법

    현재 SimpleProtobufEncoder를 사용해서 Protobuf 메시지를 인코딩하고 있는데,
    여기서 압축 여부를 패킷에 포함시키고, 수신 측에서 이를 해제하는 방식을 자세히 설명할게.


    1. 압축이 필요한 이유

    • 대량의 데이터를 효율적으로 전송하기 위해 패킷 크기를 줄인다.
    • 서버와 클라이언트 간의 네트워크 트래픽을 최적화한다.
    • 특히 MMORPG 같은 게임 서버에서는 중요한 최적화 요소임.

    2. 압축을 적용하는 방법

    패킷을 보낼 때 압축을 적용했다는 메타데이터를 추가하는 것이 중요해.
    일반적으로 패킷 헤더에 압축 여부(1 byte)를 추가하고,
    압축된 경우 Zlib, LZ4, Snappy 등으로 압축 후 전송한다.

    패킷 구조 예시

    압축 여부 (1 Byte) 데이터 (압축 or 비압축)

    0 (압축 안함) 평문 Protobuf 데이터
    1 (압축됨) 압축된 Protobuf 데이터

    3. 패킷 인코딩 (송신 측)

    먼저 SimpleProtobufEncoder를 수정해서, 압축된 데이터를 포함하도록 만들자.

    SimpleProtobufEncoder (압축 적용)

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.MessageLiteOrBuilder;
    import java.util.List;
    import java.util.zip.Deflater;
    
    @ChannelHandler.Sharable
    public class SimpleProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
            ByteBuf buffer = ctx.alloc().buffer();
            
            // Protobuf 데이터 직렬화
            byte[] data;
            if (msg instanceof MessageLite) {
                data = ((MessageLite) msg).toByteArray();
            } else {
                data = ((MessageLite.Builder) msg).build().toByteArray();
            }
    
            boolean shouldCompress = data.length > 512;  // 512바이트 이상이면 압축
            if (shouldCompress) {
                byte[] compressedData = compressData(data);
                buffer.writeByte(1);  // 압축 여부 (1: 압축됨)
                buffer.writeBytes(compressedData);
            } else {
                buffer.writeByte(0);  // 압축 안함
                buffer.writeBytes(data);
            }
    
            out.add(buffer);
        }
    
        // Zlib을 이용한 데이터 압축
        private byte[] compressData(byte[] data) throws Exception {
            Deflater deflater = new Deflater();
            deflater.setInput(data);
            deflater.finish();
    
            byte[] buffer = new byte[4096];
            int compressedSize = deflater.deflate(buffer);
            deflater.end();
    
            byte[] compressedData = new byte[compressedSize];
            System.arraycopy(buffer, 0, compressedData, 0, compressedSize);
            return compressedData;
        }
    }
    

    설명

    1. Protobuf 데이터를 직렬화 (toByteArray())
    2. 512바이트 이상이면 압축 적용
    3. 패킷 첫 번째 바이트(1 Byte)에 압축 여부 저장
    4. 압축 여부에 따라 원본 또는 압축된 데이터를 전송

    4. 패킷 디코딩 (수신 측)

    이제 수신 측에서는 패킷을 받아서, 압축이 되어 있으면 해제(Decompress) 후 Protobuf 메시지를 복원해야 해.

    SimpleProtobufDecoder (압축 해제)

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import com.google.protobuf.MessageLite;
    import java.util.List;
    import java.util.zip.Inflater;
    
    public class SimpleProtobufDecoder<T extends MessageLite> extends MessageToMessageDecoder<ByteBuf> {
    
        private final T defaultInstance;
    
        public SimpleProtobufDecoder(T defaultInstance) {
            this.defaultInstance = defaultInstance;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 압축 여부 확인
            byte compressionFlag = in.readByte();
    
            byte[] data = new byte[in.readableBytes()];
            in.readBytes(data);
    
            if (compressionFlag == 1) { // 압축된 데이터
                data = decompressData(data);
            }
    
            // Protobuf 메시지 디코딩
            MessageLite message = defaultInstance.getParserForType().parseFrom(data);
            out.add(message);
        }
    
        // Zlib을 이용한 압축 해제
        private byte[] decompressData(byte[] compressedData) throws Exception {
            Inflater inflater = new Inflater();
            inflater.setInput(compressedData);
    
            byte[] buffer = new byte[4096];
            int decompressedSize = inflater.inflate(buffer);
            inflater.end();
    
            byte[] decompressedData = new byte[decompressedSize];
            System.arraycopy(buffer, 0, decompressedData, 0, decompressedSize);
            return decompressedData;
        }
    }
    

    설명

    1. 첫 바이트를 읽어 압축 여부 확인
    2. 압축되었으면 해제 후 사용
    3. Protobuf 메시지를 복원하여 객체로 변환

    5. Netty 파이프라인에 적용

    이제 Netty의 채널 파이프라인에 위의 인코더/디코더를 추가하면 돼.

    ChannelPipeline pipeline = ch.pipeline();
    
    // Protobuf 압축 인코더 추가 (송신)
    pipeline.addLast("protobufEncoder", new SimpleProtobufEncoder());
    
    // Protobuf 압축 디코더 추가 (수신)
    pipeline.addLast("protobufDecoder", new SimpleProtobufDecoder(MyProtoMessage.getDefaultInstance()));
    

    6. 최종 정리

    • 송신 측 (Encoder)
      1. Protobuf 직렬화
      2. 패킷 크기가 크면 압축
      3. 패킷 헤더에 압축 여부 (1 byte) 추가
      4. Netty 파이프라인을 통해 전송
    • 수신 측 (Decoder)
      1. 첫 바이트를 읽어 압축 여부 확인
      2. 압축되었으면 해제
      3. Protobuf 디코딩 후 객체 변환
      4. Netty 파이프라인을 통해 다음 핸들러로 전달

    7. 추가 개선 가능 요소

    • 압축 알고리즘 최적화:
      • Zlib(기본 제공) 외에도 LZ4, Snappy 등의 빠른 압축 알고리즘 사용 가능
      • Snappy 사용 시 Snappy.compress() 및 Snappy.uncompress() 적용 가능
    • 멀티 스레드 환경 최적화
      • Netty에서는 Deflater/Inflater가 스레드 세이프하지 않으므로,
        각 채널마다 별도로 생성하거나 FastLZ 같은 라이브러리 활용 가능.

    이렇게 하면 Netty에서 Protobuf 기반으로 일부 패킷을 압축해서 보내고,
    수신 측에서 이를 해제하여 처리할 수 있어! 🚀

Designed by Tistory.