/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.distributed.cache.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.VersionNegotiator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CacheClientHandshakeHandler
extends ChannelInboundHandlerAdapter {
    private static final int PROTOCOL_UNINITIALIZED = 0;
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private static final byte[] MAGIC_HEADER = new byte[]{78, 105, 70, 105};
    private final ChannelPromise promiseHandshakeComplete;
    private final AtomicInteger protocol;
    private final VersionNegotiator versionNegotiator;
    private final long timeoutMillis;

    public CacheClientHandshakeHandler(Channel channel, VersionNegotiator versionNegotiator, long timeoutMillis) {
        this.promiseHandshakeComplete = channel.newPromise();
        this.protocol = new AtomicInteger(0);
        this.versionNegotiator = versionNegotiator;
        this.timeoutMillis = timeoutMillis;
    }

    public void waitHandshakeComplete() {
        this.promiseHandshakeComplete.awaitUninterruptibly(this.timeoutMillis, TimeUnit.MILLISECONDS);
        if (!this.promiseHandshakeComplete.isSuccess()) {
            HandshakeException ex = new HandshakeException("Handshake timed out before completion.");
            this.promiseHandshakeComplete.setFailure((Throwable)ex);
        }
    }

    public VersionNegotiator getVersionNegotiator() {
        return this.versionNegotiator;
    }

    public void channelActive(ChannelHandlerContext ctx) throws IOException {
        ByteBuf byteBufMagic = Unpooled.wrappedBuffer((byte[])MAGIC_HEADER);
        ctx.write((Object)byteBufMagic);
        this.logger.debug("Magic header written");
        int currentVersion = this.versionNegotiator.getVersion();
        ByteBuf byteBufVersion = Unpooled.wrappedBuffer((byte[])new OutboundAdapter().write(currentVersion).toBytes());
        ctx.writeAndFlush((Object)byteBufVersion);
        this.logger.debug("Protocol version {} proposed", (Object)this.versionNegotiator.getVersion());
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (this.promiseHandshakeComplete.isSuccess()) {
            ctx.fireChannelRead(msg);
        } else {
            ByteBuf byteBuf = (ByteBuf)msg;
            try {
                this.processHandshake(ctx, byteBuf);
            }
            catch (IOException | HandshakeException e) {
                throw new IllegalStateException("Handshake Processing Failed", e);
            }
            finally {
                byteBuf.release();
            }
        }
    }

    private void processHandshake(ChannelHandlerContext ctx, ByteBuf byteBuf) throws HandshakeException, IOException {
        short statusCode = byteBuf.readUnsignedByte();
        if (statusCode == 20) {
            this.logger.debug("Protocol version {} accepted", (Object)this.versionNegotiator.getVersion());
            this.protocol.set(this.versionNegotiator.getVersion());
        } else if (statusCode == 21) {
            int newVersion = byteBuf.readInt();
            this.logger.debug("Received protocol version {} counter proposal", (Object)newVersion);
            Integer newPreference = this.versionNegotiator.getPreferredVersion(newVersion);
            Optional.ofNullable(newPreference).orElseThrow(() -> new HandshakeException(String.format("Received unsupported protocol version proposal [%d]", newVersion)));
            this.versionNegotiator.setVersion(newPreference.intValue());
            ctx.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])new OutboundAdapter().write(newPreference).toBytes()));
        } else {
            if (statusCode == 255) {
                short length = byteBuf.readShort();
                byte[] message = new byte[length];
                byteBuf.readBytes(message);
                throw new HandshakeException("Remote destination aborted connection with message: " + new String(message, StandardCharsets.UTF_8));
            }
            throw new HandshakeException("Unknown handshake signal: " + statusCode);
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx) {
        if (this.promiseHandshakeComplete.isSuccess()) {
            ctx.fireChannelReadComplete();
        } else if (this.protocol.get() > 0) {
            this.promiseHandshakeComplete.setSuccess();
        }
    }

    public boolean isSuccess() {
        return this.promiseHandshakeComplete.isSuccess();
    }

    public Throwable cause() {
        return this.promiseHandshakeComplete.cause();
    }
}

