/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.endpoint.BootstrapAdapter;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.endpoint.SSLEngineFactory;
import com.couchbase.client.core.endpoint.kv.AuthenticationException;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.SignalConfigReload;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.deps.io.netty.channel.ChannelHandler;
import com.couchbase.client.deps.io.netty.channel.ChannelInitializer;
import com.couchbase.client.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.deps.io.netty.channel.ChannelPipeline;
import com.couchbase.client.deps.io.netty.channel.epoll.EpollEventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.epoll.EpollSocketChannel;
import com.couchbase.client.deps.io.netty.channel.oio.OioEventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.socket.nio.NioSocketChannel;
import com.couchbase.client.deps.io.netty.channel.socket.oio.OioSocketChannel;
import com.couchbase.client.deps.io.netty.handler.logging.LogLevel;
import com.couchbase.client.deps.io.netty.handler.logging.LoggingHandler;
import com.couchbase.client.deps.io.netty.handler.ssl.SslHandler;
import com.couchbase.client.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.subjects.AsyncSubject;
import rx.subjects.Subject;

/**
 * Base implementation of an {@link Endpoint}.
 *
 * <p>An endpoint owns at most one Netty {@link Channel}, tracks its lifecycle through the
 * {@link LifecycleState} state machine it inherits, and retries failed connect attempts with an
 * exponential backoff that is capped at {@link #MAX_RECONNECT_DELAY} milliseconds.
 *
 * <p>Subclasses contribute their own pipeline handlers through
 * {@link #customEndpointHandlers(ChannelPipeline)}.
 */
public abstract class AbstractEndpoint extends AbstractStateMachine<LifecycleState> implements Endpoint {

    /** Upper bound, in milliseconds, for the delay between two connect attempts. */
    public static final int MAX_RECONNECT_DELAY = 4096;

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Endpoint.class);

    /** Single logging handler instance added to every pipeline when trace logging is enabled. */
    private static final ChannelHandler LOGGING_HANDLER_INSTANCE = new LoggingHandler(LogLevel.TRACE);

    /** Shared exception handed to every request that is sent while the endpoint is not connected. */
    private static final NotConnectedException NOT_CONNECTED_EXCEPTION = new NotConnectedException();

    /** Shared listener attached to every channel write. */
    private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener();

    static {
        // The instance is created once here and reused for every failed send, so the stack trace
        // captured at creation time carries no useful information; drop it.
        NOT_CONNECTED_EXCEPTION.setStackTrace(new StackTraceElement[0]);
    }

    private final BootstrapAdapter bootstrap;
    private final String bucket;
    private final String password;

    /** Buffer that receives rejected requests and config-reload signals; {@code null} for the adapter constructor. */
    private final RingBuffer<ResponseEvent> responseBuffer;

    /** Environment this endpoint was created with; {@code null} for the adapter constructor. */
    private final CoreEnvironment env;

    /** Only assigned when the environment reports SSL as enabled. */
    private SSLEngineFactory sslEngineFactory;

    /** Currently used channel, or {@code null} when no channel is established. */
    private volatile Channel channel;

    /** {@code true} if something has been written to the channel since the last flush. */
    private volatile boolean hasWritten;

    /** Exponent for the next backoff delay; reset to 1 after every successful connect. */
    private volatile long reconnectAttempt = 1L;

    /**
     * Creates an endpoint on top of an already prepared bootstrap adapter.
     *
     * <p>No response buffer and no environment are attached (both are {@code null}).
     * NOTE(review): presumably intended for tests that supply a mocked adapter - confirm.
     *
     * @param bucket   name of the bucket, exposed through {@link #bucket()}.
     * @param password password of the bucket, exposed through {@link #password()}.
     * @param adapter  the adapter used to open connections.
     */
    protected AbstractEndpoint(String bucket, String password, BootstrapAdapter adapter) {
        super(LifecycleState.DISCONNECTED);
        this.bootstrap = adapter;
        this.bucket = bucket;
        this.password = password;
        this.responseBuffer = null;
        this.env = null;
    }

    /**
     * Creates an endpoint that connects to {@code hostname:port} using the IO pool of the given
     * environment.
     *
     * <p>The socket channel implementation is chosen to match the type of the IO pool (epoll, OIO,
     * or NIO as the fallback).
     *
     * @param hostname       remote host to connect to.
     * @param bucket         name of the bucket, exposed through {@link #bucket()}.
     * @param password       password of the bucket, exposed through {@link #password()}.
     * @param port           remote port to connect to.
     * @param environment    environment providing the IO pool and the SSL settings.
     * @param responseBuffer buffer that receives rejected requests and config-reload signals.
     */
    protected AbstractEndpoint(String hostname, String bucket, String password, int port,
                               final CoreEnvironment environment, RingBuffer<ResponseEvent> responseBuffer) {
        super(LifecycleState.DISCONNECTED);
        this.bucket = bucket;
        this.password = password;
        this.responseBuffer = responseBuffer;
        this.env = environment;
        if (environment.sslEnabled()) {
            this.sslEngineFactory = new SSLEngineFactory(environment);
        }

        // The channel type has to match the event loop group type.
        Class<? extends Channel> channelClass = NioSocketChannel.class;
        if (environment.ioPool() instanceof EpollEventLoopGroup) {
            channelClass = EpollSocketChannel.class;
        } else if (environment.ioPool() instanceof OioEventLoopGroup) {
            channelClass = OioSocketChannel.class;
        }

        Bootstrap nettyBootstrap = new Bootstrap()
            .remoteAddress(hostname, port)
            .group(environment.ioPool())
            .channel(channelClass)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.TCP_NODELAY, false)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel newChannel) throws Exception {
                    ChannelPipeline pipeline = newChannel.pipeline();
                    if (environment.sslEnabled()) {
                        pipeline.addLast(new SslHandler(AbstractEndpoint.this.sslEngineFactory.get()));
                    }
                    if (LOGGER.isTraceEnabled()) {
                        pipeline.addLast(LOGGING_HANDLER_INSTANCE);
                    }
                    AbstractEndpoint.this.customEndpointHandlers(pipeline);
                }
            });
        this.bootstrap = new BootstrapAdapter(nettyBootstrap);
    }

    /**
     * Hook for subclasses to append their own handlers to a freshly created pipeline.
     *
     * <p>Called once per new channel, after the optional SSL and trace-logging handlers have been
     * added.
     *
     * @param pipeline the pipeline of the new channel.
     */
    protected abstract void customEndpointHandlers(ChannelPipeline pipeline);

    /**
     * Starts connecting the endpoint if it is currently {@code DISCONNECTED}.
     *
     * @return the current state if the endpoint is not {@code DISCONNECTED}; otherwise an observable
     *         that is fed by {@link #doConnect(Subject)} once the first connect attempt finishes.
     */
    @Override
    public Observable<LifecycleState> connect() {
        if (state() != LifecycleState.DISCONNECTED) {
            return Observable.just(state());
        }
        AsyncSubject<LifecycleState> observable = AsyncSubject.create();
        transitionState(LifecycleState.CONNECTING);
        doConnect(observable);
        return observable;
    }

    /**
     * Performs one connect attempt and reports its outcome to {@code observable}.
     *
     * <p>Outcomes, evaluated when the connect future completes:
     * <ul>
     *   <li>a disconnect was requested in the meantime: stay {@code DISCONNECTED} and close the
     *       channel if the attempt did succeed;</li>
     *   <li>success: keep the channel, reset the backoff and become {@code CONNECTED};</li>
     *   <li>{@link AuthenticationException} or {@link ClosedChannelException}: become
     *       {@code DISCONNECTED} and signal the error;</li>
     *   <li>any other failure: stay {@code CONNECTING} and schedule another attempt after
     *       {@link #reconnectDelay()} milliseconds.</li>
     * </ul>
     *
     * @param observable the subject that receives the resulting state or error.
     */
    protected void doConnect(final Subject<LifecycleState, LifecycleState> observable) {
        this.bootstrap.connect().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                LifecycleState current = state();
                if (current == LifecycleState.DISCONNECTING || current == LifecycleState.DISCONNECTED) {
                    LOGGER.debug(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Endpoint connect completed, but got instructed to disconnect in the meantime.");
                    transitionState(LifecycleState.DISCONNECTED);
                    AbstractEndpoint.this.channel = null;
                    if (future.isSuccess()) {
                        // The connection was established but nobody will ever reference it;
                        // close it so the socket does not leak. The state is already
                        // DISCONNECTED, so the resulting inactive notification will not
                        // trigger a reconnect.
                        future.channel().close();
                    }
                } else if (future.isSuccess()) {
                    AbstractEndpoint.this.channel = future.channel();
                    // Start the next outage with the smallest backoff again.
                    AbstractEndpoint.this.reconnectAttempt = 1L;
                    LOGGER.debug(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Connected Endpoint.");
                    transitionState(LifecycleState.CONNECTED);
                } else if (future.cause() instanceof AuthenticationException) {
                    LOGGER.warn(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Authentication Failure.");
                    transitionState(LifecycleState.DISCONNECTED);
                    observable.onError(future.cause());
                } else if (future.cause() instanceof ClosedChannelException) {
                    LOGGER.warn(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Generic Failure.");
                    transitionState(LifecycleState.DISCONNECTED);
                    LOGGER.warn(future.cause().getMessage());
                    observable.onError(future.cause());
                } else {
                    long delay = reconnectDelay();
                    LOGGER.warn(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Could not connect to endpoint, retrying with delay " + delay + "ms: ", future.cause());
                    transitionState(LifecycleState.CONNECTING);
                    future.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect(observable);
                        }
                    }, delay, TimeUnit.MILLISECONDS);
                }
                // Runs for every branch. For the retry branch this completes the subject with
                // CONNECTING after the first failed attempt; later retries are handed the same
                // subject again.
                // NOTE(review): for the error branches this relies on the subject ignoring
                // events that arrive after onError - confirm against the Rx contract.
                observable.onNext(state());
                observable.onCompleted();
            }
        });
    }

    /**
     * Disconnects the endpoint.
     *
     * <p>If the endpoint is already {@code DISCONNECTED} or {@code DISCONNECTING}, nothing happens.
     * If it is {@code CONNECTING}, it is moved straight to {@code DISCONNECTED}; the pending connect
     * attempt notices this when it completes. Otherwise the channel is disconnected asynchronously.
     *
     * @return an observable emitting the state after the operation; for the asynchronous case it
     *         emits {@code DISCONNECTED} once the channel disconnect has finished, whether or not
     *         it succeeded.
     */
    @Override
    public Observable<LifecycleState> disconnect() {
        if (state() == LifecycleState.DISCONNECTED || state() == LifecycleState.DISCONNECTING) {
            return Observable.just(state());
        }
        if (state() == LifecycleState.CONNECTING) {
            transitionState(LifecycleState.DISCONNECTED);
            return Observable.just(state());
        }

        transitionState(LifecycleState.DISCONNECTING);
        final AsyncSubject<LifecycleState> observable = AsyncSubject.create();
        this.channel.disconnect().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    LOGGER.debug(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Disconnected Endpoint.");
                } else {
                    LOGGER.warn(logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this)
                        + "Received an error during disconnect.", future.cause());
                }
                transitionState(LifecycleState.DISCONNECTED);
                observable.onNext(state());
                observable.onCompleted();
                AbstractEndpoint.this.channel = null;
            }
        });
        return observable;
    }

    /**
     * Sends a request over the channel.
     *
     * <p>While {@code CONNECTED}:
     * <ul>
     *   <li>a {@link SignalFlush} flushes the channel if something was written since the last
     *       flush;</li>
     *   <li>any other request is written (without flushing) if the channel is writable;</li>
     *   <li>if the channel is not writable, the request and its observable are published to the
     *       response buffer instead.</li>
     * </ul>
     * In any other state a {@link SignalFlush} is ignored and every other request is failed with a
     * shared {@link NotConnectedException}.
     *
     * @param request the request to send.
     */
    @Override
    public void send(CouchbaseRequest request) {
        if (state() == LifecycleState.CONNECTED) {
            if (request instanceof SignalFlush) {
                if (this.hasWritten) {
                    this.channel.flush();
                    this.hasWritten = false;
                }
            } else if (this.channel.isWritable()) {
                this.channel.write(request).addListener(WRITE_LOG_LISTENER);
                this.hasWritten = true;
            } else {
                this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, request, request.observable());
            }
            return;
        }

        if (request instanceof SignalFlush) {
            return;
        }
        request.observable().onError(NOT_CONNECTED_EXCEPTION);
    }

    /**
     * Callback for the channel going inactive.
     *
     * <p>Always publishes a {@link SignalConfigReload} to the response buffer. If the endpoint was
     * {@code CONNECTED} or {@code CONNECTING}, it is moved to {@code DISCONNECTED} and a new
     * connect is started right away.
     */
    public void notifyChannelInactive() {
        LOGGER.debug(logIdent(this.channel, this) + "Got notified from Channel as inactive.");
        this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, SignalConfigReload.INSTANCE, null);
        if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
            transitionState(LifecycleState.DISCONNECTED);
            connect();
        }
    }

    /**
     * Computes the delay before the next connect attempt and advances the attempt counter.
     *
     * <p>The delay is {@code 2^attempt} milliseconds, capped at {@link #MAX_RECONNECT_DELAY}.
     *
     * @return the delay in milliseconds, always in the range {@code [1, MAX_RECONNECT_DELAY]}.
     */
    private long reconnectDelay() {
        long attempt = this.reconnectAttempt++;
        // Guard the shift: an int shift by 31 yields a negative value and larger distances wrap
        // around, which would collapse the backoff after many consecutive failures.
        if (attempt < 0L || attempt >= Integer.SIZE - 1) {
            return MAX_RECONNECT_DELAY;
        }
        return Math.min(1L << attempt, (long) MAX_RECONNECT_DELAY);
    }

    /**
     * @return the name of the bucket this endpoint was created for.
     */
    protected String bucket() {
        return this.bucket;
    }

    /**
     * @return the password of the bucket this endpoint was created for.
     */
    protected String password() {
        return this.password;
    }

    /**
     * @return the environment passed at construction, or {@code null} if none was given.
     */
    public CoreEnvironment environment() {
        return this.env;
    }

    /**
     * @return the response buffer passed at construction, or {@code null} if none was given.
     */
    public RingBuffer<ResponseEvent> responseBuffer() {
        return this.responseBuffer;
    }

    /**
     * Builds the common prefix for log messages: remote address and simple endpoint class name.
     *
     * @param chan     the channel whose remote address is shown; may be {@code null}, in which case
     *                 the address part reads {@code null}.
     * @param endpoint the endpoint whose simple class name is shown.
     * @return a prefix of the form {@code "[address][EndpointClass]: "}.
     */
    protected static String logIdent(Channel chan, Endpoint endpoint) {
        SocketAddress addr = chan != null ? chan.remoteAddress() : null;
        return "[" + addr + "][" + endpoint.getClass().getSimpleName() + "]: ";
    }

    /**
     * Listener attached to every channel write that logs failed writes at warn level.
     */
    static class WriteLogListener implements GenericFutureListener<Future<Void>> {
        WriteLogListener() {
        }

        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            if (!future.isSuccess()) {
                // Hand over the cause as a Throwable, like the connect and disconnect paths do,
                // so the reason for the failed write ends up in the log.
                LOGGER.warn("Error during IO write phase.", future.cause());
            }
        }
    }
}

