/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.clogproxy.client.connection;

import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.ConnectionParams;
import com.oceanbase.clogproxy.client.connection.StreamContext;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.util.ClientUtil;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.packet.protocol.V1Proto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import net.jpountz.lz4.LZ4FastDecompressor;
import org.apache.flink.cdc.connectors.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.cdc.connectors.shaded.org.apache.commons.lang3.Conversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientHandlerV01 {
    private static final Logger logger = LoggerFactory.getLogger(ClientHandlerV01.class);
    private static final byte[] MAGIC_STRING = new byte[]{120, 105, 53, 51, 103, 93, 113};
    private static final String CLIENT_IP = NetworkUtil.getLocalIp();
    private final ClientConf config;
    private ConnectionParams params;
    private final BlockingQueue<StreamContext.TransferPacket> recordQueue;
    private final LZ4FastDecompressor fastDecompressor;
    private HandshakeState state = HandshakeState.PROTOCOL_VERSION;
    private String logProxyIp;

    public ClientHandlerV01(ClientConf config, ConnectionParams params, BlockingQueue<StreamContext.TransferPacket> recordQueue, LZ4FastDecompressor fastDecompressor) {
        this.config = config;
        this.params = params;
        this.recordQueue = recordQueue;
        this.fastDecompressor = fastDecompressor;
    }

    public void setParams(ConnectionParams params) {
        this.params = params;
    }

    public boolean channelRead(boolean poolflag, ByteBuf buffer, boolean inDataNotEnough) throws Exception {
        boolean dataNotEnough = inDataNotEnough;
        switch (this.state) {
            case PROTOCOL_VERSION: {
                if (buffer.readableBytes() >= 2) {
                    short code = buffer.readShort();
                    ProtocolVersion version = ProtocolVersion.codeOf(code);
                    if (version == null) {
                        this.resetState();
                        logger.error("unsupport protocol version: {}", (Object)code);
                        throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "unsupport protocol version: " + code);
                    }
                    this.state = HandshakeState.HEADER_CODE;
                    break;
                }
                dataNotEnough = true;
                break;
            }
            case HEADER_CODE: {
                if (buffer.readableBytes() >= 4) {
                    int code = buffer.readInt();
                    if (code != HeaderType.HANDSHAKE_RESPONSE_CLIENT.code() && code != HeaderType.ERROR_RESPONSE.code()) {
                        this.resetState();
                        logger.error("unexpected Header Type, expected: {}({}), income: {}", new Object[]{HeaderType.HANDSHAKE_RESPONSE_CLIENT.code(), HeaderType.HANDSHAKE_RESPONSE_CLIENT.name(), code});
                        throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "unexpected Header Type: " + code);
                    }
                    this.state = HandshakeState.RESPONSE_CODE;
                    break;
                }
                dataNotEnough = true;
                break;
            }
            case RESPONSE_CODE: {
                if (buffer.readableBytes() >= 4) {
                    int code = buffer.readInt();
                    if (code != 0) {
                        this.state = HandshakeState.MESSAGE;
                        break;
                    }
                    this.state = HandshakeState.LOGPROXY_IP;
                    break;
                }
                dataNotEnough = true;
                break;
            }
            case MESSAGE: {
                String message = ClientHandlerV01.decodeStringInt(buffer);
                if (message != null) {
                    this.resetState();
                    logger.error("LogProxy refused handshake request: {}", (Object)message);
                    throw new LogProxyClientException(ErrorCode.NO_AUTH, "LogProxy refused handshake request: " + message, true);
                }
                dataNotEnough = true;
                break;
            }
            case LOGPROXY_IP: {
                this.logProxyIp = ClientHandlerV01.decodeStringByte(buffer);
                if (this.logProxyIp != null) {
                    this.state = HandshakeState.LOGPROXY_VERSION;
                    break;
                }
                dataNotEnough = true;
                break;
            }
            case LOGPROXY_VERSION: {
                String logProxyVersion = ClientHandlerV01.decodeStringByte(buffer);
                if (logProxyVersion != null) {
                    logger.info("Connected to LogProxy: {}, {}", (Object)this.logProxyIp, (Object)logProxyVersion);
                    this.state = HandshakeState.STREAM;
                    break;
                }
                dataNotEnough = true;
                break;
            }
            case STREAM: {
                this.parseData(poolflag, buffer);
                dataNotEnough = true;
            }
        }
        return dataNotEnough;
    }

    private static String decodeStringInt(ByteBuf buffer) {
        if (buffer.readableBytes() < 4) {
            return null;
        }
        buffer.markReaderIndex();
        int length = buffer.readInt();
        if (buffer.readableBytes() < length) {
            buffer.resetReaderIndex();
            return null;
        }
        byte[] bytes = new byte[length];
        buffer.readBytes(bytes);
        String str = new String(bytes);
        if (str.isEmpty()) {
            throw new RuntimeException("decode string is null or empty");
        }
        return str;
    }

    private static String decodeStringByte(ByteBuf buffer) {
        if (buffer.readableBytes() < 1) {
            return null;
        }
        buffer.markReaderIndex();
        short length = buffer.readByte();
        if (buffer.readableBytes() < length) {
            buffer.resetReaderIndex();
            return null;
        }
        byte[] bytes = new byte[length];
        buffer.readBytes(bytes);
        String str = new String(bytes);
        if (str.isEmpty()) {
            throw new RuntimeException("decode string is null or empty");
        }
        return str;
    }

    private void parseData(boolean poolflag, ByteBuf buffer) throws LogProxyClientException {
        while (poolflag && buffer.readableBytes() >= 2) {
            boolean go;
            buffer.markReaderIndex();
            short code = buffer.readShort();
            ProtocolVersion version = ProtocolVersion.codeOf(code);
            if (version == null) {
                this.resetState();
                logger.error("unsupport protocol version: {}", (Object)code);
                throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "unsupport protocol version: " + code);
            }
            switch (version) {
                case V1: {
                    go = this.parseDataV1(buffer);
                    break;
                }
                default: {
                    go = this.parseDataV0(buffer);
                }
            }
            if (go) continue;
            break;
        }
    }

    private boolean parseDataV0(ByteBuf buffer) {
        if (buffer.readableBytes() < 8) {
            buffer.resetReaderIndex();
            return false;
        }
        int code = buffer.readInt();
        if (code != HeaderType.DATA_CLIENT.code()) {
            this.resetState();
            logger.error("unexpected Header Type, expected: {}({}), income: {}", new Object[]{HeaderType.DATA_CLIENT.code(), HeaderType.DATA_CLIENT.name(), code});
            throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "unexpected Header Type: " + code);
        }
        int dataLength = buffer.readInt();
        if (buffer.readableBytes() < dataLength) {
            buffer.resetReaderIndex();
            return false;
        }
        code = buffer.readByte();
        if (CompressType.codeOf(code) == null) {
            throw new LogProxyClientException(ErrorCode.E_COMPRESS_TYPE, "unexpected Compress Type: " + code);
        }
        int totalLength = buffer.readInt();
        int rawDataLength = buffer.readInt();
        byte[] rawData = new byte[rawDataLength];
        buffer.readBytes(rawData);
        if (code == CompressType.LZ4.code()) {
            byte[] bytes = new byte[totalLength];
            int decompress = this.fastDecompressor.decompress(rawData, 0, bytes, 0, totalLength);
            if (decompress != rawDataLength) {
                throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + decompress + "] is not expected [" + rawDataLength + "]");
            }
            this.parseRecord(bytes);
        } else {
            this.parseRecord(rawData);
        }
        return true;
    }

    private void parseRecord(byte[] bytes) throws LogProxyClientException {
        int offset = 0;
        while (offset < bytes.length) {
            int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4);
            dataLength = ByteBufUtil.swapInt(dataLength);
            LogMessage logMessage = new LogMessage(false);
            byte[] data = new byte[dataLength];
            System.arraycopy(bytes, offset + 8, data, 0, data.length);
            try {
                logMessage.parse(data);
            }
            catch (Exception e) {
                if (this.config.isIgnoreUnknownRecordType()) {
                    logger.debug("Unsupported record type: {}", (Object)logMessage);
                    offset += 8 + dataLength;
                    continue;
                }
                throw new LogProxyClientException(ErrorCode.E_PARSE, e);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Log message: {}", (Object)logMessage);
            }
            while (true) {
                try {
                    this.recordQueue.put(new StreamContext.TransferPacket(logMessage));
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
            offset += 8 + dataLength;
        }
    }

    private boolean parseDataV1(ByteBuf buffer) {
        if (buffer.readableBytes() < 4) {
            buffer.resetReaderIndex();
            return false;
        }
        int length = buffer.readInt();
        if (buffer.readableBytes() < length) {
            buffer.resetReaderIndex();
            return false;
        }
        byte[] buff = new byte[length];
        buffer.readBytes(buff, 0, length);
        try {
            V1Proto.PbPacket packet = V1Proto.PbPacket.parseFrom(buff);
            if (packet.getCompressType() != CompressType.NONE.code()) {
                throw new LogProxyClientException(ErrorCode.E_COMPRESS_TYPE, "Unsupport Compress Type: " + packet.getCompressType());
            }
            if (packet.getType() != HeaderType.STATUS.code()) {
                throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "Unsupport Header Type: " + packet.getType());
            }
            LogProxyProto.RuntimeStatus status = LogProxyProto.RuntimeStatus.parseFrom(packet.getPayload());
            if (status == null) {
                throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet, empty Runtime Status");
            }
            while (true) {
                try {
                    this.recordQueue.put(new StreamContext.TransferPacket(status));
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
        }
        catch (InvalidProtocolBufferException e) {
            throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet", e);
        }
        return true;
    }

    public void resetState() {
        this.state = HandshakeState.PROTOCOL_VERSION;
    }

    public ByteBuf generateConnectRequest() {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length);
        byteBuf.writeBytes(MAGIC_STRING);
        byteBuf.capacity(byteBuf.capacity() + 2 + 4 + 1);
        byteBuf.writeShort(ProtocolVersion.V0.code());
        byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
        byteBuf.writeByte(this.params.getLogType().code());
        int length = CLIENT_IP.length();
        byteBuf.capacity(byteBuf.capacity() + length + 4);
        byteBuf.writeInt(length);
        byteBuf.writeBytes(CLIENT_IP.getBytes());
        length = this.params.getClientId().length();
        byteBuf.capacity(byteBuf.capacity() + length + 4);
        byteBuf.writeInt(length);
        byteBuf.writeBytes(this.params.getClientId().getBytes());
        String version = ClientUtil.getClientVersion();
        length = version.length();
        byteBuf.capacity(byteBuf.capacity() + length + 4);
        byteBuf.writeInt(length);
        byteBuf.writeBytes(version.getBytes());
        length = this.params.getConfigurationString().length();
        byteBuf.capacity(byteBuf.capacity() + length + 4);
        byteBuf.writeInt(length);
        byteBuf.writeBytes(this.params.getConfigurationString().getBytes());
        return byteBuf;
    }

    static enum HandshakeState {
        PROTOCOL_VERSION,
        HEADER_CODE,
        RESPONSE_CODE,
        MESSAGE,
        LOGPROXY_IP,
        LOGPROXY_VERSION,
        STREAM;

    }
}

