/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.RequestCancelledException;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.DecodingState;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import rx.subjects.Subject;

public abstract class AbstractGenericHandler<RESPONSE, ENCODED, REQUEST extends CouchbaseRequest>
extends MessageToMessageCodec<RESPONSE, REQUEST> {
    protected static final Charset CHARSET = CharsetUtil.UTF_8;
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(AbstractGenericHandler.class);
    private final EventSink<ResponseEvent> responseBuffer;
    private final AbstractEndpoint endpoint;
    private final Queue<REQUEST> sentRequestQueue;
    private REQUEST currentRequest;
    private DecodingState currentDecodingState;

    protected AbstractGenericHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer) {
        this(endpoint, responseBuffer, new ArrayDeque());
    }

    protected AbstractGenericHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<REQUEST> queue) {
        this.endpoint = endpoint;
        this.responseBuffer = responseBuffer;
        this.sentRequestQueue = queue;
        this.currentDecodingState = DecodingState.INITIAL;
    }

    protected abstract ENCODED encodeRequest(ChannelHandlerContext var1, REQUEST var2) throws Exception;

    protected abstract CouchbaseResponse decodeResponse(ChannelHandlerContext var1, RESPONSE var2) throws Exception;

    @Override
    protected void encode(ChannelHandlerContext ctx, REQUEST msg, List<Object> out) throws Exception {
        ENCODED request = this.encodeRequest(ctx, msg);
        this.sentRequestQueue.offer(msg);
        out.add(request);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, RESPONSE msg, List<Object> out) throws Exception {
        CouchbaseResponse response;
        if (this.currentDecodingState == DecodingState.INITIAL) {
            this.currentRequest = (CouchbaseRequest)this.sentRequestQueue.poll();
            this.currentDecodingState = DecodingState.STARTED;
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Started decoding of " + this.currentRequest);
            }
        }
        if ((response = this.decodeResponse(ctx, msg)) != null) {
            this.publishResponse(response, this.currentRequest.observable());
        }
        if (this.currentDecodingState == DecodingState.FINISHED) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Finished decoding of " + this.currentRequest);
            }
            this.currentRequest = null;
            this.currentDecodingState = DecodingState.INITIAL;
        }
    }

    protected void publishResponse(CouchbaseResponse response, Subject<CouchbaseResponse, CouchbaseResponse> observable) {
        this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, response, observable);
    }

    protected void finishedDecoding() {
        this.currentDecodingState = DecodingState.FINISHED;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.debug(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Channel Inactive.");
        this.endpoint.notifyChannelInactive();
        ctx.fireChannelInactive();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.debug(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Channel Active.");
        ctx.fireChannelActive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (!ctx.channel().isWritable()) {
            ctx.flush();
        }
        ctx.fireChannelWritabilityChanged();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Connection reset by peer: " + cause.getMessage(), cause);
            } else {
                LOGGER.info(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Connection reset by peer: " + cause.getMessage());
            }
            this.handleOutstandingOperations(ctx);
        } else {
            LOGGER.warn(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Caught unknown exception: " + cause.getMessage(), cause);
            ctx.fireExceptionCaught(cause);
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.handleOutstandingOperations(ctx);
    }

    private void handleOutstandingOperations(ChannelHandlerContext ctx) {
        if (this.sentRequestQueue.isEmpty()) {
            LOGGER.trace(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Not cancelling operations - sent queue is empty.");
            return;
        }
        LOGGER.debug(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Cancelling " + this.sentRequestQueue.size() + " outstanding requests.");
        while (!this.sentRequestQueue.isEmpty()) {
            CouchbaseRequest req = (CouchbaseRequest)this.sentRequestQueue.poll();
            try {
                req.observable().onError((Throwable)new RequestCancelledException("Request cancelled in-flight."));
            }
            catch (Exception ex) {
                LOGGER.info("Exception thrown while cancelling outstanding operation: " + req, ex);
            }
        }
    }

    protected REQUEST currentRequest() {
        return this.currentRequest;
    }

    protected CoreEnvironment env() {
        return this.endpoint.environment();
    }

    protected static String logIdent(ChannelHandlerContext ctx, Endpoint endpoint) {
        return "[" + ctx.channel().remoteAddress() + "][" + endpoint.getClass().getSimpleName() + "]: ";
    }
}

