/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.io.netty.kv;

import com.couchbase.client.core.cnc.CbTracing;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.cnc.events.io.ChannelClosedProactivelyEvent;
import com.couchbase.client.core.cnc.events.io.CollectionOutdatedHandledEvent;
import com.couchbase.client.core.cnc.events.io.InvalidRequestDetectedEvent;
import com.couchbase.client.core.cnc.events.io.KeyValueErrorMapCodeHandledEvent;
import com.couchbase.client.core.cnc.events.io.NotMyVbucketReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnknownResponseReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnknownResponseStatusReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnknownServerPushRequestReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnsupportedResponseTypeReceivedEvent;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.MemcachedBucketConfig;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBufUtil;
import com.couchbase.client.core.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.core.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.core.deps.io.netty.util.collection.IntObjectHashMap;
import com.couchbase.client.core.deps.io.netty.util.collection.IntObjectMap;
import com.couchbase.client.core.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.core.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.core.endpoint.BaseEndpoint;
import com.couchbase.client.core.endpoint.EndpointContext;
import com.couchbase.client.core.env.CompressionConfig;
import com.couchbase.client.core.error.CollectionNotFoundException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.RangeScanPartitionFailedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.io.CollectionMap;
import com.couchbase.client.core.io.IoContext;
import com.couchbase.client.core.io.netty.HandlerUtils;
import com.couchbase.client.core.io.netty.TracingUtils;
import com.couchbase.client.core.io.netty.kv.ChannelAttributes;
import com.couchbase.client.core.io.netty.kv.ErrorMap;
import com.couchbase.client.core.io.netty.kv.KeyValueChannelContext;
import com.couchbase.client.core.io.netty.kv.MemcacheProtocol;
import com.couchbase.client.core.io.netty.kv.ServerFeature;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.KeyValueRequest;
import com.couchbase.client.core.msg.kv.RangeScanContinueRequest;
import com.couchbase.client.core.msg.kv.RangeScanContinueResponse;
import com.couchbase.client.core.msg.kv.UnlockRequest;
import com.couchbase.client.core.retry.RetryOrchestrator;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.UnsignedLEB128;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class KeyValueMessageHandler
extends ChannelDuplexHandler {
    private final EndpointContext endpointContext;
    private final IntObjectMap<KeyValueRequest<Response>> writtenRequests;
    private final IntObjectMap<RequestSpan> writtenRequestDispatchSpans;
    private final IntObjectMap<Long> writtenRequestDispatchTimings;
    private final CompressionConfig compressionConfig;
    private final EventBus eventBus;
    private final Optional<String> bucketName;
    private final BaseEndpoint endpoint;
    private IoContext ioContext;
    private KeyValueChannelContext channelContext;
    private ErrorMap errorMap;
    private final boolean isInternalTracer;

    public KeyValueMessageHandler(BaseEndpoint endpoint, EndpointContext endpointContext, Optional<String> bucketName) {
        this.endpoint = endpoint;
        this.endpointContext = endpointContext;
        this.writtenRequests = new IntObjectHashMap<KeyValueRequest<Response>>();
        this.writtenRequestDispatchTimings = new IntObjectHashMap<Long>();
        this.writtenRequestDispatchSpans = new IntObjectHashMap<RequestSpan>();
        this.compressionConfig = endpointContext.environment().compressionConfig();
        this.eventBus = endpointContext.environment().eventBus();
        this.bucketName = bucketName;
        this.isInternalTracer = CbTracing.isInternalTracer(endpointContext.environment().requestTracer());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        boolean preserveTtl;
        this.ioContext = new IoContext(this.endpointContext, ctx.channel().localAddress(), ctx.channel().remoteAddress(), this.endpointContext.bucket());
        this.errorMap = ctx.channel().attr(ChannelAttributes.ERROR_MAP_KEY).get();
        Set<ServerFeature> features = ctx.channel().attr(ChannelAttributes.SERVER_FEATURE_KEY).get();
        boolean compression = features != null && features.contains((Object)ServerFeature.SNAPPY);
        boolean collections = features != null && features.contains((Object)ServerFeature.COLLECTIONS);
        boolean mutationTokens = features != null && features.contains((Object)ServerFeature.MUTATION_SEQNO);
        boolean syncReplication = features != null && features.contains((Object)ServerFeature.SYNC_REPLICATION);
        boolean altRequest = features != null && features.contains((Object)ServerFeature.ALT_REQUEST);
        boolean vattrEnabled = features != null && features.contains((Object)ServerFeature.VATTR);
        boolean createAsDeleted = features != null && features.contains((Object)ServerFeature.CREATE_AS_DELETED);
        boolean bl = preserveTtl = features != null && features.contains((Object)ServerFeature.PRESERVE_TTL);
        if (syncReplication && !altRequest) {
            throw new IllegalStateException("If Synchronous Replication is enabled, the server also must negotiate Alternate Requests. This is a bug! - please report.");
        }
        this.channelContext = new KeyValueChannelContext(compression ? this.compressionConfig : null, collections, mutationTokens, this.bucketName, syncReplication, vattrEnabled, altRequest, this.ioContext.core().configurationProvider().collectionMap(), ctx.channel().id(), createAsDeleted, preserveTtl);
        ctx.fireChannelActive();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof KeyValueRequest) {
            KeyValueRequest request = (KeyValueRequest)msg;
            int opaque = request.opaque();
            this.writtenRequests.put(opaque, (KeyValueRequest<Response>)request);
            try {
                ctx.write(request.encode(ctx.alloc(), opaque, this.channelContext), promise);
                this.writtenRequestDispatchTimings.put(opaque, (Long)System.nanoTime());
                if (request.requestSpan() != null) {
                    RequestTracer tracer = this.endpointContext.environment().requestTracer();
                    RequestSpan dispatchSpan = tracer.requestSpan("dispatch_to_server", request.requestSpan());
                    if (!this.isInternalTracer) {
                        TracingUtils.setCommonDispatchSpanAttributes(dispatchSpan, ctx.channel().attr(ChannelAttributes.CHANNEL_ID_KEY).get(), this.ioContext.localHostname(), this.ioContext.localPort(), this.endpoint.remoteHostname(), this.endpoint.remotePort(), null);
                        TracingUtils.setNumericOperationId(dispatchSpan, request.opaque());
                        TracingUtils.setCommonKVSpanAttributes(dispatchSpan, request);
                    }
                    this.writtenRequestDispatchSpans.put(opaque, dispatchSpan);
                }
            }
            catch (Throwable err) {
                this.writtenRequests.remove(opaque);
                if (err instanceof CollectionNotFoundException && this.channelContext.collectionsEnabled()) {
                    ConfigurationProvider cp = this.ioContext.core().configurationProvider();
                    if (cp.collectionRefreshInProgress(request.collectionIdentifier())) {
                        RetryOrchestrator.maybeRetry(this.ioContext, request, RetryReason.COLLECTION_MAP_REFRESH_IN_PROGRESS);
                    } else if (cp.config().bucketConfig(request.bucket()) instanceof MemcachedBucketConfig) {
                        request.fail(FeatureNotAvailableException.collectionsForMemcached());
                    } else {
                        this.handleOutdatedCollection(request, RetryReason.COLLECTION_NOT_FOUND);
                    }
                    return;
                }
                request.fail(err);
            }
        } else {
            this.eventBus.publish(new InvalidRequestDetectedEvent(this.ioContext, ServiceType.KV, msg));
            ctx.channel().close().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)f -> this.eventBus.publish(new ChannelClosedProactivelyEvent(this.ioContext, ChannelClosedProactivelyEvent.Reason.INVALID_REQUEST_DETECTED))));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (msg instanceof ByteBuf) {
                this.decode(ctx, (ByteBuf)msg);
            } else {
                this.ioContext.environment().eventBus().publish(new UnsupportedResponseTypeReceivedEvent(this.ioContext, msg));
                HandlerUtils.closeChannelWithReason(this.ioContext, ctx, ChannelClosedProactivelyEvent.Reason.INVALID_RESPONSE_FORMAT_DETECTED);
            }
        }
        finally {
            if (this.endpoint != null) {
                this.endpoint.markRequestCompletion();
            }
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        for (KeyValueRequest request : this.writtenRequests.values()) {
            RetryOrchestrator.maybeRetry(this.ioContext, request, RetryReason.CHANNEL_CLOSED_WHILE_IN_FLIGHT);
        }
        ctx.fireChannelInactive();
    }

    private void decode(ChannelHandlerContext ctx, ByteBuf response) {
        ErrorMap.ErrorCode errorCode;
        boolean serverPush;
        boolean bl = serverPush = MemcacheProtocol.magic(response) == MemcacheProtocol.Magic.SERVER_PUSH_REQUEST.magic();
        if (serverPush) {
            this.handleServerPushRequest(ctx, response);
            return;
        }
        int opaque = MemcacheProtocol.opaque(response);
        KeyValueRequest<Response> request = this.writtenRequests.remove(opaque);
        if (request == null) {
            this.handleUnknownResponseReceived(ctx, response);
            return;
        }
        long originalStart = this.completeRequestTimings(request, response, opaque);
        short statusCode = MemcacheProtocol.status(response);
        ResponseStatus status = MemcacheProtocol.decodeStatus(statusCode);
        ErrorMap.ErrorCode errorCode2 = errorCode = status != ResponseStatus.SUCCESS ? this.decodeErrorCode(statusCode) : null;
        if (errorCode != null) {
            request.errorCode(errorCode);
        }
        boolean errorUnknown = false;
        if (status == ResponseStatus.UNKNOWN) {
            errorUnknown = true;
            if (errorCode != null) {
                this.ioContext.environment().eventBus().publish(new KeyValueErrorMapCodeHandledEvent(this.ioContext, errorCode));
                status = this.handleErrorCode(ctx, errorCode);
            }
            this.ioContext.environment().eventBus().publish(new UnknownResponseStatusReceivedEvent(this.ioContext, statusCode));
        }
        boolean isRangeScanContinue = request instanceof RangeScanContinueRequest;
        if (status == ResponseStatus.NOT_MY_VBUCKET && !isRangeScanContinue) {
            this.handleNotMyVbucket(request, response);
        } else if (status == ResponseStatus.UNKNOWN_COLLECTION) {
            this.handleOutdatedCollection(request, RetryReason.KV_COLLECTION_OUTDATED);
        } else if (errorUnknown && this.errorMapIndicatesRetry(errorCode)) {
            RetryOrchestrator.maybeRetry(this.ioContext, request, RetryReason.KV_ERROR_MAP_INDICATED);
        } else if (this.statusIndicatesInvalidChannel(status)) {
            HandlerUtils.closeChannelWithReason(this.ioContext, ctx, ChannelClosedProactivelyEvent.Reason.KV_RESPONSE_CONTAINED_CLOSE_INDICATION);
        } else {
            this.retryOrComplete(request, response, status, isRangeScanContinue, originalStart);
        }
    }

    private void handleServerPushRequest(ChannelHandlerContext ctx, ByteBuf request) {
        byte opcodeByte = MemcacheProtocol.opcode(request);
        MemcacheProtocol.ServerPushOpcode opcode = MemcacheProtocol.ServerPushOpcode.of(opcodeByte);
        if (opcode == MemcacheProtocol.ServerPushOpcode.CLUSTERMAP_CHANGE_NOTIFICATION) {
            this.handleClustermapChangeNotification(request);
            return;
        }
        this.ioContext.environment().eventBus().publish(new UnknownServerPushRequestReceivedEvent(this.ioContext, ByteBufUtil.getBytes(request)));
    }

    private void handleClustermapChangeNotification(ByteBuf request) {
    }

    private void retryOrComplete(KeyValueRequest<Response> request, ByteBuf response, ResponseStatus status, boolean isRangeScanContinue, long start) {
        RetryReason retryReason = this.statusCodeIndicatesRetry(status, request);
        if (retryReason == null) {
            if (isRangeScanContinue) {
                this.decodeAndCompleteRangeScanContinue(request, response, start);
            } else if (!request.completed()) {
                this.decodeAndComplete(request, response);
            } else {
                this.ioContext.environment().orphanReporter().report(request);
            }
        } else {
            RetryOrchestrator.maybeRetry(this.ioContext, request, retryReason);
        }
    }

    private long completeRequestTimings(KeyValueRequest<Response> request, ByteBuf response, int opaque) {
        long serverTime = MemcacheProtocol.parseServerDurationFromResponse(response);
        request.context().serverLatency(serverTime);
        long start = this.writtenRequestDispatchTimings.remove(opaque);
        request.context().dispatchLatency(System.nanoTime() - start);
        RequestSpan dispatchSpan = this.writtenRequestDispatchSpans.remove(opaque);
        if (dispatchSpan != null) {
            if (!this.isInternalTracer) {
                TracingUtils.setServerDurationAttribute(dispatchSpan, serverTime);
            }
            dispatchSpan.end();
        }
        return start;
    }

    private void decodeAndComplete(KeyValueRequest<Response> request, ByteBuf response) {
        try {
            Response decoded = request.decode(response, this.channelContext);
            request.succeed(decoded);
        }
        catch (Throwable t) {
            request.fail(new DecodingFailureException(t));
        }
    }

    private void decodeAndCompleteRangeScanContinue(KeyValueRequest<Response> request, ByteBuf response, long originalStart) {
        ResponseStatus status;
        boolean expectedStatus;
        RangeScanContinueResponse decoded = (RangeScanContinueResponse)request.decode(response, this.channelContext);
        if (!request.completed()) {
            request.succeed(decoded);
        }
        boolean bl = expectedStatus = (status = decoded.status()) == ResponseStatus.COMPLETE || status == ResponseStatus.CONTINUE || status == ResponseStatus.SUCCESS;
        if (!expectedStatus) {
            decoded.failFeed(new RangeScanPartitionFailedException("Stream continue failed with non-successful response status", status));
            return;
        }
        boolean hasLastItem = status == ResponseStatus.COMPLETE;
        boolean completeStream = hasLastItem || status == ResponseStatus.CONTINUE;
        decoded.feedItems(MemcacheProtocol.body(response).orElse(Unpooled.EMPTY_BUFFER), hasLastItem, completeStream);
        if (decoded.status() == ResponseStatus.SUCCESS) {
            this.writtenRequests.put(request.opaque(), request);
            this.writtenRequestDispatchTimings.put(request.opaque(), (Long)originalStart);
        }
    }

    private boolean statusIndicatesInvalidChannel(ResponseStatus status) {
        return status == ResponseStatus.INTERNAL_SERVER_ERROR || status == ResponseStatus.NO_BUCKET && this.bucketName.isPresent() || status == ResponseStatus.NOT_INITIALIZED;
    }

    private void handleUnknownResponseReceived(ChannelHandlerContext ctx, ByteBuf response) {
        byte[] packet = ByteBufUtil.getBytes(response);
        this.ioContext.environment().eventBus().publish(new UnknownResponseReceivedEvent(this.ioContext, packet));
        HandlerUtils.closeChannelWithReason(this.ioContext, ctx, ChannelClosedProactivelyEvent.Reason.KV_RESPONSE_CONTAINED_UNKNOWN_OPAQUE);
    }

    private ResponseStatus handleErrorCode(ChannelHandlerContext ctx, ErrorMap.ErrorCode errorCode) {
        if (errorCode.attributes().contains((Object)ErrorMap.ErrorAttribute.CONN_STATE_INVALIDATED)) {
            HandlerUtils.closeChannelWithReason(this.ioContext, ctx, ChannelClosedProactivelyEvent.Reason.KV_RESPONSE_CONTAINED_CLOSE_INDICATION);
            return ResponseStatus.UNKNOWN;
        }
        if (errorCode.attributes().contains((Object)ErrorMap.ErrorAttribute.TEMP)) {
            return ResponseStatus.TEMPORARY_FAILURE;
        }
        if (errorCode.attributes().contains((Object)ErrorMap.ErrorAttribute.AUTH)) {
            return ResponseStatus.NO_ACCESS;
        }
        if (errorCode.attributes().contains((Object)ErrorMap.ErrorAttribute.ITEM_LOCKED)) {
            return ResponseStatus.LOCKED;
        }
        return ResponseStatus.UNKNOWN;
    }

    private boolean errorMapIndicatesRetry(ErrorMap.ErrorCode errorCode) {
        return errorCode != null && (errorCode.attributes().contains((Object)ErrorMap.ErrorAttribute.RETRY_NOW) || errorCode.attributes().contains((Object)ErrorMap.ErrorAttribute.RETRY_LATER));
    }

    private RetryReason statusCodeIndicatesRetry(ResponseStatus status, Request<?> request) {
        switch (status) {
            case LOCKED: {
                return request instanceof UnlockRequest ? null : RetryReason.KV_LOCKED;
            }
            case TEMPORARY_FAILURE: {
                return RetryReason.KV_TEMPORARY_FAILURE;
            }
            case SYNC_WRITE_IN_PROGRESS: {
                return RetryReason.KV_SYNC_WRITE_IN_PROGRESS;
            }
            case SYNC_WRITE_RE_COMMIT_IN_PROGRESS: {
                return RetryReason.KV_SYNC_WRITE_RE_COMMIT_IN_PROGRESS;
            }
        }
        return null;
    }

    private ErrorMap.ErrorCode decodeErrorCode(short statusCode) {
        return this.errorMap != null ? this.errorMap.errors().get(statusCode) : null;
    }

    private void handleNotMyVbucket(KeyValueRequest<? extends Response> request, ByteBuf response) {
        request.indicateRejectedWithNotMyVbucket();
        this.eventBus.publish(new NotMyVbucketReceivedEvent(this.ioContext, request.partition()));
        String origin = request.context().lastDispatchedTo() != null ? request.context().lastDispatchedTo().host() : null;
        RetryOrchestrator.maybeRetry(this.ioContext, request, RetryReason.KV_NOT_MY_VBUCKET);
        String body = MemcacheProtocol.bodyAsString(response).trim();
        if (body.startsWith("{")) {
            this.ioContext.core().configurationProvider().proposeBucketConfig(new ProposedBucketConfigContext(request.bucket(), body, origin));
        }
    }

    private void handleOutdatedCollection(KeyValueRequest<? extends Response> request, RetryReason retryReason) {
        this.eventBus.publish(new CollectionOutdatedHandledEvent(request.collectionIdentifier(), retryReason, new OutdatedCollectionContext(this.ioContext, this.ioContext.core().configurationProvider().collectionMap())));
        this.ioContext.core().configurationProvider().refreshCollectionId(request.collectionIdentifier());
        RetryOrchestrator.maybeRetry(this.ioContext, request, retryReason);
    }

    static class OutdatedCollectionContext
    extends IoContext {
        private final CollectionMap collectionMap;

        public OutdatedCollectionContext(IoContext ioContext, CollectionMap collectionMap) {
            super(ioContext, ioContext.localSocket(), ioContext.remoteSocket(), ioContext.bucket());
            this.collectionMap = collectionMap;
        }

        @Override
        public void injectExportableParams(Map<String, Object> input) {
            super.injectExportableParams(input);
            input.put("open", this.collectionMap.inner().entrySet().stream().map(e -> {
                HashMap<String, Object> converted = new HashMap<String, Object>(((CollectionIdentifier)e.getKey()).toMap());
                converted.put("id", "0x" + Long.toHexString(UnsignedLEB128.decode((byte[])e.getValue())));
                return converted;
            }).collect(Collectors.toList()));
        }
    }
}

