/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.io.netty.kv;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.io.ErrorMapLoadedEvent;
import com.couchbase.client.core.cnc.events.io.ErrorMapLoadingFailedEvent;
import com.couchbase.client.core.cnc.events.io.ErrorMapUndecodableEvent;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBufUtil;
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.core.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.core.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.core.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.core.endpoint.EndpointContext;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.io.IoContext;
import com.couchbase.client.core.io.netty.kv.ChannelAttributes;
import com.couchbase.client.core.io.netty.kv.ConnectTimings;
import com.couchbase.client.core.io.netty.kv.ErrorMap;
import com.couchbase.client.core.io.netty.kv.MemcacheProtocol;
import com.couchbase.client.core.msg.kv.BaseKeyValueRequest;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Channel handler that requests the KV error map from the remote peer as soon as the channel
 * becomes active and holds back completion of the original connect promise until a response
 * arrives, the attempt times out, or the underlying connect fails.
 *
 * <p>On a successful response the decoded {@link ErrorMap} (if any) is stored on the channel under
 * {@link ChannelAttributes#ERROR_MAP_KEY}. A non-successful response is reported through the event
 * bus but still completes the connect promise successfully. After a {@link ByteBuf} response has
 * been handled, the handler removes itself from the pipeline.
 */
@Stability.Internal
public class ErrorMapLoadingHandler
extends ChannelDuplexHandler {
    /** Error map format version sent in the body of the request. */
    private static final short MAP_VERSION = 1;

    private final EndpointContext endpointContext;

    /** Upper bound for the error map exchange; taken from the configured connect timeout. */
    private final Duration timeout;

    /** Created in {@link #channelActive(ChannelHandlerContext)} once the socket addresses are known. */
    private IoContext ioContext;

    /** The caller's connect promise, completed by this handler instead of by the transport. */
    private ChannelPromise interceptedConnectPromise;

    /**
     * Creates a new handler.
     *
     * @param endpointContext the endpoint context used for configuration lookup and event publishing.
     */
    public ErrorMapLoadingHandler(EndpointContext endpointContext) {
        this.endpointContext = endpointContext;
        this.timeout = endpointContext.environment().timeoutConfig().connectTimeout();
    }

    /**
     * Intercepts the connect promise and forwards the connect with a fresh downstream promise.
     *
     * <p>Only a failure of the downstream connect is propagated to the intercepted promise here;
     * success is signalled later from {@link #channelRead(ChannelHandlerContext, Object)}.
     *
     * @param ctx the channel handler context.
     * @param remoteAddress the remote address to connect to.
     * @param localAddress the local address to bind to.
     * @param promise the original connect promise, held back by this handler.
     */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        this.interceptedConnectPromise = promise;
        ChannelPromise downstream = ctx.newPromise();
        downstream.addListener(f -> {
            if (!f.isSuccess() && !this.interceptedConnectPromise.isDone()) {
                ConnectTimings.record(ctx.channel(), this.getClass());
                this.interceptedConnectPromise.tryFailure(f.cause());
            }
        });
        ctx.connect(remoteAddress, localAddress, downstream);
    }

    /**
     * Schedules the timeout check, sends the error map request and propagates the active event.
     *
     * @param ctx the channel handler context.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ioContext = new IoContext(this.endpointContext, ctx.channel().localAddress(), ctx.channel().remoteAddress(), this.endpointContext.bucket());

        // Fail the intercepted promise if nothing has completed it by the time the timeout elapses.
        ctx.executor().schedule(() -> {
            if (!this.interceptedConnectPromise.isDone()) {
                ConnectTimings.stop(ctx.channel(), this.getClass(), true);
                this.interceptedConnectPromise.tryFailure(new TimeoutException("KV Error Map loading timed out after " + this.timeout.toMillis() + "ms"));
            }
        }, this.timeout.toNanos(), TimeUnit.NANOSECONDS);

        ConnectTimings.start(ctx.channel(), this.getClass());
        ctx.writeAndFlush(this.buildErrorMapRequest(ctx));

        // The active event is forwarded right away, without waiting for the error map response.
        // NOTE(review): presumably so later handlers can pipeline their own requests - confirm.
        ctx.fireChannelActive();
    }

    /**
     * Handles the response to the error map request and completes the intercepted connect promise.
     *
     * <p>The message is always released, including when handling it throws.
     *
     * @param ctx the channel handler context.
     * @param msg the inbound message; expected to be a {@link ByteBuf}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            Optional<Duration> latency = ConnectTimings.stop(ctx.channel(), this.getClass(), false);
            if (msg instanceof ByteBuf) {
                ByteBuf response = (ByteBuf) msg;
                if (MemcacheProtocol.successful(response)) {
                    Optional<ErrorMap> loadedMap = this.extractErrorMap(response);
                    loadedMap.ifPresent(errorMap -> ctx.channel().attr(ChannelAttributes.ERROR_MAP_KEY).set(errorMap));
                    this.endpointContext.environment().eventBus().publish(new ErrorMapLoadedEvent(this.ioContext, latency.orElse(Duration.ZERO), loadedMap));
                } else {
                    this.endpointContext.environment().eventBus().publish(new ErrorMapLoadingFailedEvent(this.ioContext, latency.orElse(Duration.ZERO), MemcacheProtocol.status(response)));
                }
                // A failed error map load is reported above but does not fail the connect.
                this.interceptedConnectPromise.trySuccess();
                ctx.pipeline().remove(this);
            } else {
                this.interceptedConnectPromise.tryFailure(new CouchbaseException("Unexpected response type on channel read, this is a bug - please report. " + msg));
            }
        } finally {
            // Release in finally so the buffer is not leaked if anything above throws.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Decodes the error map from the body of the given response.
     *
     * <p>If the body is missing or cannot be decoded, an {@link ErrorMapUndecodableEvent} is
     * published and an empty optional is returned.
     *
     * @param msg the full response message.
     * @return the decoded error map, or empty if it is absent or undecodable.
     */
    private Optional<ErrorMap> extractErrorMap(ByteBuf msg) {
        Optional<ByteBuf> body = MemcacheProtocol.body(msg);
        if (!body.isPresent()) {
            this.endpointContext.environment().eventBus().publish(new ErrorMapUndecodableEvent(this.ioContext, "No content in response", ""));
            return Optional.empty();
        }

        byte[] input = ByteBufUtil.getBytes(body.get());
        try {
            return Optional.of(ErrorMap.fromJson(input));
        } catch (IOException e) {
            this.endpointContext.environment().eventBus().publish(new ErrorMapUndecodableEvent(this.ioContext, e.getMessage(), new String(input, StandardCharsets.UTF_8)));
            return Optional.empty();
        }
    }

    /**
     * Builds the error map request, carrying {@link #MAP_VERSION} as a two byte body.
     *
     * @param ctx the channel handler context providing the buffer allocator.
     * @return the encoded request, ready to be written to the channel.
     */
    private ByteBuf buildErrorMapRequest(ChannelHandlerContext ctx) {
        ByteBuf body = ctx.alloc().buffer(Short.BYTES).writeShort(MAP_VERSION);
        try {
            return MemcacheProtocol.request(ctx.alloc(), MemcacheProtocol.Opcode.ERROR_MAP, MemcacheProtocol.noDatatype(), MemcacheProtocol.noPartition(), BaseKeyValueRequest.nextOpaque(), MemcacheProtocol.noCas(), MemcacheProtocol.noExtras(), MemcacheProtocol.noKey(), body);
        } finally {
            // Release the temporary body buffer even if building the request fails.
            body.release();
        }
    }
}

