/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.folsom.client;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Queues;
import com.spotify.folsom.AbstractRawMemcacheClient;
import com.spotify.folsom.MemcacheClosedException;
import com.spotify.folsom.MemcacheOverloadedException;
import com.spotify.folsom.MemcacheStatus;
import com.spotify.folsom.Metrics;
import com.spotify.folsom.RawMemcacheClient;
import com.spotify.folsom.client.BatchFlusher;
import com.spotify.folsom.client.MemcacheEncoder;
import com.spotify.folsom.client.Request;
import com.spotify.folsom.client.SetRequest;
import com.spotify.folsom.client.SimpleSizeEstimator;
import com.spotify.folsom.client.TcpTuningHandler;
import com.spotify.folsom.client.TimeoutChecker;
import com.spotify.folsom.client.Utils;
import com.spotify.folsom.client.ascii.AsciiMemcacheDecoder;
import com.spotify.folsom.client.binary.BinaryMemcacheDecoder;
import com.spotify.folsom.client.tls.SSLEngineFactory;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.futures.CompletableFutures;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultRawMemcacheClient
extends AbstractRawMemcacheClient {
    private static final AtomicInteger GLOBAL_CONNECTION_COUNT = new AtomicInteger();
    private final int DEFAULT_TIMEOUT_POLL_INTERVAL_MILLIS = 10;
    private final Logger log = LoggerFactory.getLogger(DefaultRawMemcacheClient.class);
    private final AtomicInteger pendingCounter = new AtomicInteger();
    private final int outstandingRequestLimit;
    private final Channel channel;
    private final BatchFlusher flusher;
    private final HostAndPort address;
    private final Executor executor;
    private final long connectionTimeoutMillis;
    private final Metrics metrics;
    private final int maxSetLength;
    private final AtomicReference<String> disconnectReason = new AtomicReference<Object>(null);
    private static final ThreadFactory THREAD_FACTORY = new DefaultThreadFactory(DefaultRawMemcacheClient.class, true);
    private static final Supplier<EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP = Suppliers.memoize(() -> Epoll.isAvailable() ? new EpollEventLoopGroup(0, THREAD_FACTORY) : new NioEventLoopGroup(0, THREAD_FACTORY));
    private final int pendingCounterLimit;
    private final Metrics.OutstandingRequestsGauge pendingRequestGauge = this::numPendingRequests;

    public static CompletionStage<RawMemcacheClient> connect(final HostAndPort address, int outstandingRequestLimit, int eventLoopThreadFlushMaxBatchSize, boolean binary, Executor executor, long connectionTimeoutMillis, Charset charset, Metrics metrics, int maxSetLength, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, final SSLEngineFactory sslEngineFactory) {
        ByteToMessageDecoder decoder = binary ? new BinaryMemcacheDecoder() : new AsciiMemcacheDecoder(charset);
        ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>((ChannelInboundHandler)decoder){
            final /* synthetic */ ChannelInboundHandler val$decoder;
            {
                this.val$decoder = channelInboundHandler;
            }

            protected void initChannel(Channel ch) {
                ChannelPipeline channelPipeline = ch.pipeline();
                channelPipeline.addLast(new ChannelHandler[]{new TcpTuningHandler()});
                if (sslEngineFactory != null) {
                    SSLEngine sslEngine = sslEngineFactory.createSSLEngine(address.getHostText(), address.getPort());
                    SslHandler sslHandler = new SslHandler(sslEngine);
                    sslHandler.setWrapDataSize(0);
                    channelPipeline.addLast(new ChannelHandler[]{sslHandler});
                }
                channelPipeline.addLast(new ChannelHandler[]{this.val$decoder, new MemcacheEncoder()});
            }
        };
        CompletableFuture<RawMemcacheClient> clientFuture = new CompletableFuture<RawMemcacheClient>();
        EventLoopGroup effectiveELG = eventLoopGroup != null ? eventLoopGroup : (EventLoopGroup)DEFAULT_EVENT_LOOP_GROUP.get();
        Class<? extends Channel> effectiveChannelClass = channelClass != null ? channelClass : DefaultRawMemcacheClient.defaultChannelClass(effectiveELG);
        Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(effectiveELG)).handler((ChannelHandler)initializer)).channel(effectiveChannelClass)).option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, (Object)SimpleSizeEstimator.INSTANCE);
        ChannelFuture connectFuture = bootstrap.connect((SocketAddress)new InetSocketAddress(address.getHostText(), address.getPort()));
        connectFuture.addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            Channel channel = future.channel();
            if (channel == null) {
                clientFuture.completeExceptionally(new IOException("Channel is closed"));
                return;
            }
            if (future.isSuccess()) {
                DefaultRawMemcacheClient client = new DefaultRawMemcacheClient(address, channel, outstandingRequestLimit, eventLoopThreadFlushMaxBatchSize, executor, connectionTimeoutMillis, metrics, maxSetLength);
                clientFuture.complete(client);
            } else {
                channel.close();
                clientFuture.completeExceptionally(future.cause());
            }
        }));
        return clientFuture;
    }

    private static Class<? extends Channel> defaultChannelClass(EventLoopGroup elg) {
        return elg instanceof EpollEventLoopGroup ? EpollSocketChannel.class : NioSocketChannel.class;
    }

    private DefaultRawMemcacheClient(HostAndPort address, Channel channel, int outstandingRequestLimit, int eventLoopThreadFlushMaxBatchSize, Executor executor, long connectionTimeoutMillis, Metrics metrics, int maxSetLength) {
        this.address = address;
        this.executor = executor;
        this.connectionTimeoutMillis = connectionTimeoutMillis;
        this.metrics = metrics;
        this.maxSetLength = maxSetLength;
        this.channel = Objects.requireNonNull(channel, "channel");
        this.flusher = new BatchFlusher(channel, eventLoopThreadFlushMaxBatchSize);
        this.outstandingRequestLimit = outstandingRequestLimit;
        this.pendingCounterLimit = Math.max(0x3FFFFFFF, outstandingRequestLimit);
        GLOBAL_CONNECTION_COUNT.incrementAndGet();
        metrics.registerOutstandingRequestsGauge(this.pendingRequestGauge);
        channel.pipeline().addLast("handler", (ChannelHandler)new ConnectionHandler());
    }

    @Override
    public <T> CompletionStage<T> send(Request<T> request) {
        SetRequest setRequest;
        byte[] value;
        if (request instanceof SetRequest && (value = (setRequest = (SetRequest)((Object)request)).getValue()).length > this.maxSetLength) {
            return this.onExecutor(CompletableFuture.completedFuture(MemcacheStatus.VALUE_TOO_LARGE));
        }
        if (!this.tryIncrementPending()) {
            String disconnectReason = this.disconnectReason.get();
            if (disconnectReason != null) {
                MemcacheClosedException exception = new MemcacheClosedException(String.format("%s, memcached=%s", disconnectReason, this.address.getHostText()));
                return this.onExecutor(CompletableFutures.exceptionallyCompletedFuture((Throwable)exception));
            }
            return this.onExecutor(CompletableFutures.exceptionallyCompletedFuture((Throwable)new MemcacheOverloadedException("too many outstanding requests")));
        }
        this.channel.write(request, (ChannelPromise)new RequestWritePromise(this.channel, request));
        this.flusher.flush();
        return this.onExecutor(request.asFuture());
    }

    private <T> CompletionStage<T> onExecutor(CompletionStage<T> future) {
        return Utils.onExecutor(future, this.executor);
    }

    private boolean tryIncrementPending() {
        int pending;
        do {
            if ((pending = this.pendingCounter.get()) < this.outstandingRequestLimit) continue;
            return false;
        } while (!this.pendingCounter.compareAndSet(pending, pending + 1));
        return true;
    }

    @Override
    public void shutdown() {
        this.setDisconnected("Shutdown");
    }

    @Override
    public boolean isConnected() {
        return this.disconnectReason.get() == null;
    }

    @Override
    public Throwable getConnectionFailure() {
        return null;
    }

    @Override
    public int numTotalConnections() {
        return 1;
    }

    @Override
    public int numActiveConnections() {
        return this.isConnected() ? 1 : 0;
    }

    @Override
    public Stream<AddressAndClient> streamNodes() {
        return Stream.of(new AddressAndClient(this.address, this));
    }

    private boolean isLostConnection(Throwable t) {
        if (t instanceof IOException) {
            String message = t.getMessage();
            if (t instanceof ConnectException) {
                return message.startsWith("Connection refused:");
            }
            if (message.equals("Broken pipe")) {
                return true;
            }
            return message.equals("Connection reset by peer");
        }
        return false;
    }

    public String toString() {
        return "DefaultRawMemcacheClient(" + this.address + ")";
    }

    private void setDisconnected(Throwable cause) {
        String message = cause.getMessage();
        if (message == null) {
            message = cause.getClass().getSimpleName();
        }
        this.setDisconnected(message);
    }

    private void setDisconnected(String message) {
        if (this.disconnectReason.compareAndSet(null, message)) {
            this.pendingCounter.set(this.pendingCounterLimit);
            this.channel.close();
            GLOBAL_CONNECTION_COUNT.decrementAndGet();
            this.metrics.unregisterOutstandingRequestsGauge(this.pendingRequestGauge);
            this.notifyConnectionChange();
        }
    }

    static int getGlobalConnectionCount() {
        return GLOBAL_CONNECTION_COUNT.get();
    }

    @Override
    public int numPendingRequests() {
        int pending = this.pendingCounter.get();
        if (this.disconnectReason.get() != null) {
            return 0;
        }
        return pending;
    }

    static /* synthetic */ long access$000(DefaultRawMemcacheClient x0) {
        return x0.connectionTimeoutMillis;
    }

    private class RequestWritePromise
    extends DefaultChannelPromise {
        private final Request<?> request;

        public RequestWritePromise(Channel channel, Request<?> request) {
            super(channel);
            this.request = request;
        }

        public ChannelPromise setFailure(Throwable cause) {
            super.setFailure(cause);
            this.fail(cause);
            return this;
        }

        public boolean tryFailure(Throwable cause) {
            if (super.tryFailure(cause)) {
                this.fail(cause);
                return true;
            }
            return false;
        }

        private void fail(Throwable cause) {
            DefaultRawMemcacheClient.this.setDisconnected(cause);
            this.request.fail(new MemcacheClosedException((String)DefaultRawMemcacheClient.this.disconnectReason.get()), DefaultRawMemcacheClient.this.address);
        }
    }

    private class ConnectionHandler
    extends ChannelDuplexHandler {
        private final Queue<Request<?>> outstanding = Queues.newArrayDeque();
        private final TimeoutChecker<Request<?>> connectionTimeoutChecker = TimeoutChecker.create(TimeUnit.MILLISECONDS, DefaultRawMemcacheClient.access$000(DefaultRawMemcacheClient.this));
        private final Future<?> timeoutCheckTask;

        ConnectionHandler() {
            long pollIntervalMillis = 10L;
            this.timeoutCheckTask = DefaultRawMemcacheClient.this.channel.eventLoop().scheduleWithFixedDelay(() -> {
                Request<?> head = this.outstanding.peek();
                if (head == null) {
                    return;
                }
                if (this.connectionTimeoutChecker.check(head)) {
                    DefaultRawMemcacheClient.this.log.error("Connection timeout: {} {}", (Object)DefaultRawMemcacheClient.this.channel, head);
                    DefaultRawMemcacheClient.this.setDisconnected("Timeout");
                }
            }, 10L, 10L, TimeUnit.MILLISECONDS);
        }

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Request request = (Request)msg;
            this.outstanding.add(request);
            super.write(ctx, msg, promise);
        }

        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            this.timeoutCheckTask.cancel(true);
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Request<?> request;
            DefaultRawMemcacheClient.this.setDisconnected("Disconnected");
            while ((request = this.outstanding.poll()) != null) {
                request.fail(new MemcacheClosedException((String)DefaultRawMemcacheClient.this.disconnectReason.get()), DefaultRawMemcacheClient.this.address);
            }
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request<?> request = this.outstanding.poll();
            if (request == null) {
                throw new Exception("Unexpected response: " + msg);
            }
            DefaultRawMemcacheClient.this.pendingCounter.decrementAndGet();
            try {
                request.handle(msg, DefaultRawMemcacheClient.this.address);
            }
            catch (Exception exception) {
                DefaultRawMemcacheClient.this.log.error("Corrupt protocol: " + exception.getMessage(), (Throwable)exception);
                DefaultRawMemcacheClient.this.setDisconnected(exception);
                request.fail(new MemcacheClosedException((String)DefaultRawMemcacheClient.this.disconnectReason.get()), DefaultRawMemcacheClient.this.address);
                ctx.channel().close();
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof DecoderException) {
                DefaultRawMemcacheClient.this.setDisconnected(cause.getCause());
            } else if (!DefaultRawMemcacheClient.this.isLostConnection(cause)) {
                DefaultRawMemcacheClient.this.log.error("Unexpected error, closing connection", cause);
                DefaultRawMemcacheClient.this.setDisconnected(cause);
            }
            ctx.close();
        }
    }
}

