/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.RequestCancelledException;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.DecodingState;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleState;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleStateEvent;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.Subject;

/**
 * Generic codec that sits between the service specific wire format and the
 * {@link CouchbaseRequest}/{@link CouchbaseResponse} messages.
 *
 * <p>Every request that has been encoded is remembered in a FIFO queue. When a
 * response starts to arrive, the head of that queue becomes the "current
 * request" and stays current until a subclass signals the end of the response
 * through {@link #finishedDecoding()}.</p>
 *
 * @param <RESPONSE> the inbound message type handed to {@link #decodeResponse}.
 * @param <ENCODED> the outbound message type produced by {@link #encodeRequest}.
 * @param <REQUEST> the request type accepted by this handler.
 */
public abstract class AbstractGenericHandler<RESPONSE, ENCODED, REQUEST extends CouchbaseRequest>
    extends MessageToMessageCodec<RESPONSE, REQUEST> {

    /** Charset made available to subclasses (UTF-8). */
    protected static final Charset CHARSET = CharsetUtil.UTF_8;

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(AbstractGenericHandler.class);

    /** Sink used by {@link #publishResponse} for responses that are not completed directly. */
    private final EventSink<ResponseEvent> responseBuffer;

    /** The endpoint this handler belongs to. */
    private final AbstractEndpoint endpoint;

    /** Requests that have been encoded and are waiting for their response, in send order. */
    private final Queue<REQUEST> sentRequestQueue;

    /** If set, the endpoint is disconnected as soon as a response has been fully decoded. */
    private final boolean isTransient;

    /** The request whose response is currently being decoded, or {@code null}. */
    private REQUEST currentRequest;

    /** Where the handler is within the decoding of the current response. */
    private DecodingState currentDecodingState;

    /**
     * Creates a handler backed by a fresh {@link ArrayDeque} as sent-request queue.
     *
     * @param endpoint the endpoint this handler belongs to.
     * @param responseBuffer the sink responses may be published to.
     * @param isTransient whether the endpoint should be disconnected after each decoded response.
     */
    protected AbstractGenericHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, boolean isTransient) {
        this(endpoint, responseBuffer, new ArrayDeque<REQUEST>(), isTransient);
    }

    /**
     * Creates a handler using the given queue to track sent requests.
     *
     * @param endpoint the endpoint this handler belongs to.
     * @param responseBuffer the sink responses may be published to.
     * @param queue the queue used to remember requests that were sent.
     * @param isTransient whether the endpoint should be disconnected after each decoded response.
     */
    protected AbstractGenericHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<REQUEST> queue, boolean isTransient) {
        this.endpoint = endpoint;
        this.responseBuffer = responseBuffer;
        this.sentRequestQueue = queue;
        this.currentDecodingState = DecodingState.INITIAL;
        this.isTransient = isTransient;
    }

    /**
     * Encodes the given request into its wire representation.
     *
     * @param var1 the channel handler context.
     * @param var2 the request to encode.
     * @return the encoded message that is passed down the pipeline.
     * @throws Exception if encoding fails.
     */
    protected abstract ENCODED encodeRequest(ChannelHandlerContext var1, REQUEST var2) throws Exception;

    /**
     * Decodes an inbound message into a response.
     *
     * @param var1 the channel handler context.
     * @param var2 the inbound message.
     * @return the decoded response, or {@code null} if there is nothing to publish yet.
     * @throws Exception if decoding fails.
     */
    protected abstract CouchbaseResponse decodeResponse(ChannelHandlerContext var1, RESPONSE var2) throws Exception;

    /**
     * Encodes the request and, once encoding succeeded, remembers it so the
     * matching response can be correlated later.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, REQUEST msg, List<Object> out) throws Exception {
        ENCODED request = this.encodeRequest(ctx, msg);
        this.sentRequestQueue.offer(msg);
        out.add(request);
    }

    /**
     * Decodes an inbound message and publishes the resulting response (if any)
     * to the observable of the current request. Decoding failures are signalled
     * through the request observable instead of being propagated.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, RESPONSE msg, List<Object> out) throws Exception {
        if (this.currentDecodingState == DecodingState.INITIAL) {
            // poll() yields null when a response shows up without a request in flight.
            this.currentRequest = this.sentRequestQueue.poll();
            this.currentDecodingState = DecodingState.STARTED;
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(logIdent(ctx, this.endpoint) + "Started decoding of " + this.currentRequest);
            }
        }
        try {
            CouchbaseResponse response = this.decodeResponse(ctx, msg);
            if (response != null) {
                // Checked only now because a subclass may set the request while decoding.
                if (this.currentRequest != null) {
                    this.publishResponse(response, this.currentRequest.observable());
                } else {
                    LOGGER.warn(logIdent(ctx, this.endpoint)
                        + "Dropping decoded response because there is no matching request: " + response);
                }
            }
        } catch (CouchbaseException e) {
            this.failCurrentRequest(ctx, e);
        } catch (Exception e) {
            this.failCurrentRequest(ctx, new CouchbaseException(e));
        } finally {
            // Always reset once finished; otherwise a failure while notifying the
            // request would leave the handler stuck and the queue is never polled again.
            if (this.currentDecodingState == DecodingState.FINISHED) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace(logIdent(ctx, this.endpoint) + "Finished decoding of " + this.currentRequest);
                }
                this.currentRequest = null;
                this.currentDecodingState = DecodingState.INITIAL;
            }
        }
    }

    /**
     * Signals a decoding failure to the current request. If there is no current
     * request the failure is logged so the original cause is not lost.
     */
    private void failCurrentRequest(ChannelHandlerContext ctx, CouchbaseException cause) {
        if (this.currentRequest == null) {
            LOGGER.warn(logIdent(ctx, this.endpoint) + "Decoding failed and there is no request to notify.", cause);
            return;
        }
        this.currentRequest.observable().onError(cause);
    }

    /**
     * Publishes a response.
     *
     * <p>Responses with a status other than {@link ResponseStatus#RETRY} and a
     * non-null observable are emitted and completed on a worker of the
     * environment scheduler. Everything else is handed to the response buffer.</p>
     *
     * @param response the response to publish.
     * @param observable the subject of the originating request, may be {@code null}.
     */
    protected void publishResponse(final CouchbaseResponse response, final Subject<CouchbaseResponse, CouchbaseResponse> observable) {
        if (response.status() != ResponseStatus.RETRY && observable != null) {
            final Scheduler.Worker worker = this.env().scheduler().createWorker();
            worker.schedule(new Action0() {
                @Override
                public void call() {
                    try {
                        observable.onNext(response);
                        observable.onCompleted();
                    } catch (Exception ex) {
                        LOGGER.warn("Caught exception while onNext on observable", ex);
                        observable.onError(ex);
                    } finally {
                        // The worker is only needed for this single task.
                        worker.unsubscribe();
                    }
                }
            });
        } else {
            this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, response, observable);
        }
    }

    /**
     * Marks the current response as completely decoded. For transient handlers
     * the endpoint is disconnected right away.
     */
    protected void finishedDecoding() {
        this.currentDecodingState = DecodingState.FINISHED;
        if (this.isTransient) {
            this.endpoint.disconnect();
        }
    }

    /** Notifies the endpoint that the channel went inactive and forwards the event. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(logIdent(ctx, this.endpoint) + "Channel Inactive.");
        }
        this.endpoint.notifyChannelInactive();
        ctx.fireChannelInactive();
    }

    /** Logs the activation and forwards the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(logIdent(ctx, this.endpoint) + "Channel Active.");
        }
        ctx.fireChannelActive();
    }

    /** Flushes when the channel is no longer writable, then forwards the event. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (!ctx.channel().isWritable()) {
            ctx.flush();
        }
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * I/O errors cancel all outstanding requests and are not forwarded; every
     * other exception is logged and passed further along the pipeline.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(logIdent(ctx, this.endpoint) + "Connection reset by peer: " + cause.getMessage(), cause);
            } else {
                LOGGER.info(logIdent(ctx, this.endpoint) + "Connection reset by peer: " + cause.getMessage());
            }
            this.handleOutstandingOperations(ctx);
        } else {
            LOGGER.warn(logIdent(ctx, this.endpoint) + "Caught unknown exception: " + cause.getMessage(), cause);
            ctx.fireExceptionCaught(cause);
        }
    }

    /** Cancels everything still waiting for a response when the handler is removed. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.handleOutstandingOperations(ctx);
    }

    /**
     * Fails every request still in the sent queue with a
     * {@link RequestCancelledException}.
     *
     * <p>NOTE(review): only queued requests are cancelled here; a request that
     * is currently being decoded is not touched - confirm that this is intended.</p>
     */
    private void handleOutstandingOperations(ChannelHandlerContext ctx) {
        if (this.sentRequestQueue.isEmpty()) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(logIdent(ctx, this.endpoint) + "Not cancelling operations - sent queue is empty.");
            }
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(logIdent(ctx, this.endpoint) + "Cancelling " + this.sentRequestQueue.size() + " outstanding requests.");
        }
        while (!this.sentRequestQueue.isEmpty()) {
            REQUEST req = this.sentRequestQueue.poll();
            try {
                this.sideEffectRequestToCancel(req);
                req.observable().onError(new RequestCancelledException("Request cancelled in-flight."));
            } catch (Exception ex) {
                // Best effort: one failing cancellation must not prevent the others.
                LOGGER.info("Exception thrown while cancelling outstanding operation: " + req, ex);
            }
        }
    }

    /**
     * Hook invoked for each request right before it is cancelled. The default
     * implementation does nothing.
     *
     * @param request the request that is about to be cancelled.
     */
    protected void sideEffectRequestToCancel(REQUEST request) {
    }

    /**
     * Sends a keep alive request when the channel reports
     * {@link IdleState#ALL_IDLE} and {@link #createKeepAliveRequest()} provides
     * one. Other idle state events are consumed without further action; all
     * non-idle events are delegated to the superclass.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.ALL_IDLE) {
                CouchbaseRequest keepAlive = this.createKeepAliveRequest();
                if (keepAlive != null) {
                    keepAlive.observable().subscribe(new KeepAliveResponseAction(ctx));
                    this.onKeepAliveFired(ctx, keepAlive);
                    ctx.pipeline().writeAndFlush(keepAlive);
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Creates the request used as keep alive. The default implementation
     * returns {@code null}, in which case no keep alive is sent.
     *
     * @return the keep alive request or {@code null}.
     */
    protected CouchbaseRequest createKeepAliveRequest() {
        return null;
    }

    /**
     * Called right before a keep alive request is written. Logs on debug level.
     *
     * @param ctx the channel handler context.
     * @param keepAliveRequest the request about to be sent.
     */
    protected void onKeepAliveFired(ChannelHandlerContext ctx, CouchbaseRequest keepAliveRequest) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(logIdent(ctx, this.endpoint) + "KeepAlive fired");
        }
    }

    /**
     * Called when the response to a keep alive request arrived. Logs on trace level.
     *
     * @param ctx the channel handler context.
     * @param keepAliveResponse the response that was received.
     */
    protected void onKeepAliveResponse(ChannelHandlerContext ctx, CouchbaseResponse keepAliveResponse) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(logIdent(ctx, this.endpoint) + "keepAlive was answered, status " + keepAliveResponse.status());
        }
    }

    /**
     * @return the request whose response is currently being decoded, or {@code null}.
     */
    protected REQUEST currentRequest() {
        return this.currentRequest;
    }

    /**
     * Replaces the request that is treated as the current one.
     *
     * @param request the new current request.
     */
    protected void currentRequest(REQUEST request) {
        this.currentRequest = request;
    }

    /**
     * @return the environment of the endpoint this handler belongs to.
     */
    protected CoreEnvironment env() {
        return this.endpoint.environment();
    }

    /**
     * Builds the log prefix made of the remote address of the channel and the
     * simple class name of the endpoint.
     *
     * @param ctx the channel handler context.
     * @param endpoint the endpoint to identify.
     * @return the prefix, in the form {@code "[address][EndpointClass]: "}.
     */
    protected static String logIdent(ChannelHandlerContext ctx, Endpoint endpoint) {
        return "[" + ctx.channel().remoteAddress() + "][" + endpoint.getClass().getSimpleName() + "]: ";
    }

    /**
     * Subscriber action that forwards a keep alive response to
     * {@link AbstractGenericHandler#onKeepAliveResponse}.
     */
    private class KeepAliveResponseAction
        implements Action1<CouchbaseResponse> {
        private final ChannelHandlerContext ctx;

        public KeepAliveResponseAction(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void call(CouchbaseResponse couchbaseResponse) {
            AbstractGenericHandler.this.onKeepAliveResponse(this.ctx, couchbaseResponse);
        }
    }
}

