/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.io.netty.kv;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.io.FeaturesNegotiatedEvent;
import com.couchbase.client.core.cnc.events.io.FeaturesNegotiationFailedEvent;
import com.couchbase.client.core.cnc.events.io.UnsolicitedFeaturesReturnedEvent;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.core.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.core.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.core.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.core.endpoint.EndpointContext;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.io.IoContext;
import com.couchbase.client.core.io.netty.kv.ChannelAttributes;
import com.couchbase.client.core.io.netty.kv.ConnectTimings;
import com.couchbase.client.core.io.netty.kv.MemcacheProtocol;
import com.couchbase.client.core.io.netty.kv.ServerFeature;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.kv.BaseKeyValueRequest;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Channel handler that negotiates server features right after the channel becomes active by
 * sending a memcache {@code HELLO} request and evaluating its response.
 *
 * <p>The original connect promise is intercepted and only completed once the response has been
 * processed, the configured connect timeout elapsed, or an error was observed. After a response
 * has been handled the handler removes itself from the pipeline.</p>
 */
@Stability.Internal
public class FeatureNegotiatingHandler
extends ChannelDuplexHandler {
    /** Maximum time to wait for the negotiation response, taken from the connect timeout. */
    private final Duration timeout;
    /** The features this client asks the server to enable. */
    private final Set<ServerFeature> features;
    /** Endpoint context used for configuration lookup and event publishing. */
    private final EndpointContext endpointContext;
    /** Created in {@link #channelActive(ChannelHandlerContext)} once both addresses are known. */
    private IoContext ioContext;
    /** The caller's connect promise, held back until negotiation finishes. */
    private ChannelPromise interceptedConnectPromise;

    /**
     * Creates a new handler.
     *
     * @param endpointContext the endpoint context; its environment supplies the connect timeout,
     *                        the user agent and the event bus.
     * @param features the features to request; if empty, the handler removes itself on connect.
     */
    public FeatureNegotiatingHandler(EndpointContext endpointContext, Set<ServerFeature> features) {
        this.endpointContext = endpointContext;
        this.timeout = endpointContext.environment().timeoutConfig().connectTimeout();
        this.features = features;
    }

    /**
     * Intercepts the connect call. With no features to negotiate the handler removes itself and
     * passes the original promise on; otherwise the promise is stored and a fresh one is sent
     * down the pipeline so that the original is only completed after negotiation.
     *
     * @param ctx the channel handler context.
     * @param remoteAddress the remote address to connect to.
     * @param localAddress the local address to bind to.
     * @param promise the promise of the original caller.
     */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (features.isEmpty()) {
            ConnectTimings.record(ctx.channel(), this.getClass());
            ctx.pipeline().remove(this);
            ctx.connect(remoteAddress, localAddress, promise);
            return;
        }

        interceptedConnectPromise = promise;
        ChannelPromise downstream = ctx.newPromise();
        // Propagate a failed downstream connect to the intercepted promise, unless it has
        // already been completed by some other path.
        downstream.addListener(f -> {
            if (!f.isSuccess() && !interceptedConnectPromise.isDone()) {
                ConnectTimings.record(ctx.channel(), this.getClass());
                interceptedConnectPromise.tryFailure(f.cause());
            }
        });
        ctx.connect(remoteAddress, localAddress, downstream);
    }

    /**
     * Schedules the negotiation timeout, starts the timing and writes the {@code HELLO} request
     * as soon as the channel is active.
     *
     * @param ctx the channel handler context.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ioContext = new IoContext(
            endpointContext,
            ctx.channel().localAddress(),
            ctx.channel().remoteAddress(),
            endpointContext.bucket()
        );

        // Fails the intercepted promise if it is still pending once the timeout elapsed.
        ctx.executor().schedule(() -> {
            if (!interceptedConnectPromise.isDone()) {
                ConnectTimings.stop(ctx.channel(), this.getClass(), true);
                interceptedConnectPromise.tryFailure(new TimeoutException("KV Feature Negotiation timed out after " + timeout.toMillis() + "ms"));
            }
        }, timeout.toNanos(), TimeUnit.NANOSECONDS);

        ConnectTimings.start(ctx.channel(), this.getClass());
        ctx.writeAndFlush(buildHelloRequest(ctx));
        ctx.fireChannelActive();
    }

    /**
     * Handles the negotiation response. An unsuccessful status is published as a
     * {@link FeaturesNegotiationFailedEvent} but does not fail the connect; the features found in
     * the body are stored on the channel, a {@link FeaturesNegotiatedEvent} is published, the
     * intercepted promise is completed and the handler removes itself.
     *
     * <p>The message is always released, including when processing throws.</p>
     *
     * @param ctx the channel handler context.
     * @param msg the incoming message, expected to be a {@link ByteBuf}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (!(msg instanceof ByteBuf)) {
                interceptedConnectPromise.tryFailure(new CouchbaseException("Unexpected response type on channel read, this is a bug - please report. " + msg));
                return;
            }

            ByteBuf response = (ByteBuf) msg;
            Optional<Duration> latency = ConnectTimings.stop(ctx.channel(), this.getClass(), false);
            if (!MemcacheProtocol.successful(response)) {
                endpointContext.environment().eventBus().publish(
                    new FeaturesNegotiationFailedEvent(ioContext, MemcacheProtocol.status(response))
                );
            }

            Set<ServerFeature> negotiated = extractFeaturesFromBody(response);
            ctx.channel().attr(ChannelAttributes.SERVER_FEATURE_KEY).set(negotiated);
            endpointContext.environment().eventBus().publish(
                new FeaturesNegotiatedEvent(ioContext, latency.orElse(Duration.ZERO), new ArrayList<>(negotiated))
            );
            // No-op if parsing already failed the promise.
            interceptedConnectPromise.trySuccess();
            ctx.pipeline().remove(this);
        } finally {
            // Release in all cases so an exception above cannot leak the buffer.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reads the two-byte feature entries from the response body and splits them into features
     * that were requested and ones that were not. Unrequested features are published as an
     * {@link UnsolicitedFeaturesReturnedEvent}.
     *
     * <p>A parsing error fails the intercepted promise. If the body ends with an incomplete
     * entry, parsing stops, since the remaining byte can never be consumed.</p>
     *
     * @param response the full response.
     * @return the requested features present in the body; empty if there is no body.
     */
    private Set<ServerFeature> extractFeaturesFromBody(ByteBuf response) {
        Optional<ByteBuf> body = MemcacheProtocol.body(response);
        EnumSet<ServerFeature> negotiated = EnumSet.noneOf(ServerFeature.class);
        ArrayList<ServerFeature> unsolicited = new ArrayList<>();
        if (!body.isPresent()) {
            return negotiated;
        }

        ByteBuf payload = body.get();
        while (payload.isReadable()) {
            if (payload.readableBytes() < Short.BYTES) {
                // A short read would throw without advancing the reader index, which would
                // keep this loop spinning forever. Report the problem and stop instead.
                interceptedConnectPromise.tryFailure(new CouchbaseException(
                    "Error while parsing negotiated server features.",
                    new IndexOutOfBoundsException("Truncated feature entry, " + payload.readableBytes() + " trailing byte(s) in body")
                ));
                break;
            }
            try {
                ServerFeature feature = ServerFeature.from(payload.readUnsignedShort());
                if (features.contains(feature)) {
                    negotiated.add(feature);
                } else {
                    unsolicited.add(feature);
                }
            }
            catch (Exception ex) {
                // The entry has been consumed at this point, so continuing with the next one is safe.
                interceptedConnectPromise.tryFailure(new CouchbaseException("Error while parsing negotiated server features.", ex));
            }
        }

        if (!unsolicited.isEmpty()) {
            endpointContext.environment().eventBus().publish(new UnsolicitedFeaturesReturnedEvent(ioContext, unsolicited));
        }
        return negotiated;
    }

    /**
     * Builds the {@code HELLO} request with the JSON key and one two-byte value per requested
     * feature as the body. The temporary key and body buffers are released even if building
     * the request fails.
     *
     * @param ctx the channel handler context, used for allocation.
     * @return the request buffer, to be released by whoever consumes it.
     */
    private ByteBuf buildHelloRequest(ChannelHandlerContext ctx) {
        ByteBuf key = buildHelloKey(ctx);
        try {
            ByteBuf body = ctx.alloc().buffer(features.size() * 2);
            try {
                for (ServerFeature feature : features) {
                    body.writeShort(feature.value());
                }
                return MemcacheProtocol.request(
                    ctx.alloc(),
                    MemcacheProtocol.Opcode.HELLO,
                    MemcacheProtocol.noDatatype(),
                    MemcacheProtocol.noPartition(),
                    BaseKeyValueRequest.nextOpaque(),
                    MemcacheProtocol.noCas(),
                    MemcacheProtocol.noExtras(),
                    key,
                    body
                );
            } finally {
                body.release();
            }
        } finally {
            key.release();
        }
    }

    /**
     * Builds the key of the {@code HELLO} request: an encoded map with the user agent under
     * {@code "a"} (capped at 200 characters) and the connection id under {@code "i"}. The
     * connection id is also stored on the channel under {@link ChannelAttributes#CHANNEL_ID_KEY}.
     *
     * @param ctx the channel handler context.
     * @return a newly allocated buffer holding the encoded key; the caller must release it.
     */
    private ByteBuf buildHelloKey(ChannelHandlerContext ctx) {
        TreeMap<String, String> result = new TreeMap<>();

        String agent = endpointContext.environment().userAgent().formattedShort();
        if (agent == null || agent.isEmpty()) {
            agent = "couchbase-java-core/0.0.0";
        } else if (agent.length() > 200) {
            agent = agent.substring(0, 200);
        }
        result.put("a", agent);

        String channelId = "0x" + ctx.channel().id().asShortText();
        long convertedChannelId;
        try {
            // "embedded" is a non-numeric short id that is special-cased to a fixed value.
            convertedChannelId = channelId.equals("0xembedded") ? 1L : Long.decode(channelId);
        }
        catch (NumberFormatException ex) {
            // Any other non-numeric id falls back to a random value.
            convertedChannelId = new Random().nextInt();
        }

        String fullId = paddedHex(endpointContext.id()) + "/" + paddedHex(convertedChannelId);
        result.put("i", fullId);
        ctx.channel().attr(ChannelAttributes.CHANNEL_ID_KEY).set(fullId);

        // Encode before allocating so a failing encoder cannot leak a buffer.
        byte[] encoded = Mapper.encodeAsBytes(result);
        return ctx.alloc().buffer(encoded.length).writeBytes(encoded);
    }

    /**
     * Formats the value as upper-case hex, zero-padded to 16 digits.
     *
     * @param input the value to format.
     * @return the padded hex string.
     */
    private static String paddedHex(long input) {
        return String.format("%016X", input);
    }

    /**
     * Fails the intercepted connect promise if it is still pending and forwards the exception
     * to the next handler.
     *
     * @param ctx the channel handler context.
     * @param cause the exception that was caught.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (!interceptedConnectPromise.isDone()) {
            interceptedConnectPromise.tryFailure(cause);
        }
        ctx.fireExceptionCaught(cause);
    }
}

