/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.driver;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.CodecException;
import java.io.IOException;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.ConnectionPool;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultQueue;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class Connection {
    private static final Logger logger = LoggerFactory.getLogger(Connection.class);
    private final Channel channel;
    private final URI uri;
    private final ConcurrentMap<UUID, ResultQueue> pending = new ConcurrentHashMap<UUID, ResultQueue>();
    private final Cluster cluster;
    private final Client client;
    private final ConnectionPool pool;
    public static final int MAX_IN_PROCESS = 4;
    public static final int MIN_IN_PROCESS = 1;
    public static final int MAX_WAIT_FOR_CONNECTION = 3000;
    public static final int MAX_WAIT_FOR_SESSION_CLOSE = 3000;
    public static final int MAX_CONTENT_LENGTH = 65536;
    public static final int RECONNECT_INITIAL_DELAY = 1000;
    public static final int RECONNECT_INTERVAL = 1000;
    public static final int RESULT_ITERATION_BATCH_SIZE = 64;
    public final AtomicInteger borrowed = new AtomicInteger(0);
    private final AtomicReference<Class<Channelizer>> channelizerClass = new AtomicReference<Object>(null);
    private volatile boolean isDead = false;
    private final int maxInProcess;
    private final String connectionLabel;
    private final Channelizer channelizer;
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference();
    private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);

    public Connection(URI uri, ConnectionPool pool, int maxInProcess) throws ConnectionException {
        this.uri = uri;
        this.cluster = pool.getCluster();
        this.client = pool.getClient();
        this.pool = pool;
        this.maxInProcess = maxInProcess;
        this.connectionLabel = String.format("Connection{host=%s}", pool.host);
        if (this.cluster.isClosing()) {
            throw new IllegalStateException("Cannot open a connection while the cluster after close() is called");
        }
        Bootstrap b = this.cluster.getFactory().createBootstrap();
        try {
            if (this.channelizerClass.get() == null) {
                this.channelizerClass.compareAndSet(null, Class.forName(this.cluster.connectionPoolSettings().channelizer));
            }
            this.channelizer = this.channelizerClass.get().newInstance();
            this.channelizer.init(this);
            ((Bootstrap)b.channel(NioSocketChannel.class)).handler((ChannelHandler)this.channelizer);
            this.channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
            this.channelizer.connected();
            logger.info("Created new connection for {}", (Object)uri);
        }
        catch (Exception ie) {
            logger.debug("Error opening connection on {}", (Object)uri);
            throw new ConnectionException(uri, "Could not open connection", ie);
        }
    }

    public int availableInProcess() {
        return Math.max(0, this.maxInProcess - this.pending.size());
    }

    public boolean isDead() {
        return this.isDead;
    }

    public boolean isClosed() {
        return this.closeFuture.get() != null;
    }

    URI getUri() {
        return this.uri;
    }

    Cluster getCluster() {
        return this.cluster;
    }

    Client getClient() {
        return this.client;
    }

    ConcurrentMap<UUID, ResultQueue> getPending() {
        return this.pending;
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (!this.closeFuture.compareAndSet(null, future)) {
            return this.closeFuture.get();
        }
        if (this.pending.isEmpty()) {
            if (null == this.channel) {
                future.complete(null);
            } else {
                this.shutdown(future);
            }
        }
        return future;
    }

    public void close() {
        try {
            this.closeAsync().get();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public ChannelPromise write(RequestMessage requestMessage, CompletableFuture<ResultSet> future) {
        Connection thisConnection = this;
        ChannelPromise promise = this.channel.newPromise().addListener(f -> {
            if (!f.isSuccess()) {
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()), f.cause());
                }
                thisConnection.isDead = true;
                thisConnection.returnToPool();
                future.completeExceptionally(f.cause());
            } else {
                LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<Result>();
                CompletableFuture<Void> readCompleted = new CompletableFuture<Void>();
                readCompleted.thenAcceptAsync(v -> {
                    thisConnection.returnToPool();
                    this.tryShutdown();
                }, (Executor)this.cluster.executor());
                readCompleted.exceptionally(t -> {
                    if (t instanceof IOException || t instanceof CodecException) {
                        if (this.pool != null) {
                            this.pool.replaceConnection(thisConnection);
                        }
                    } else {
                        thisConnection.returnToPool();
                    }
                    this.tryShutdown();
                    return null;
                });
                ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
                this.pending.put(requestMessage.getRequestId(), handler);
                future.complete(new ResultSet(handler, this.cluster.executor(), readCompleted, requestMessage, this.pool.host));
            }
        });
        this.channel.writeAndFlush((Object)requestMessage, promise);
        return promise;
    }

    public void returnToPool() {
        block3: {
            try {
                if (this.pool != null) {
                    this.pool.returnConnection(this);
                }
            }
            catch (ConnectionException ce) {
                if (!logger.isDebugEnabled()) break block3;
                logger.debug("Returned {} connection to {} but an error occurred - {}", new Object[]{this.getConnectionInfo(), this.pool, ce.getMessage()});
            }
        }
    }

    private void tryShutdown() {
        if (this.isClosed() && this.pending.isEmpty()) {
            this.shutdown(this.closeFuture.get());
        }
    }

    private void shutdown(CompletableFuture<Void> future) {
        if (this.shutdownInitiated.compareAndSet(false, true)) {
            if (this.client instanceof Client.SessionedClient) {
                RequestMessage closeMessage = this.client.buildMessage(RequestMessage.build("close")).create();
                CompletableFuture<ResultSet> closed = new CompletableFuture<ResultSet>();
                this.write(closeMessage, closed);
                try {
                    closed.get(this.cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException ex) {
                    String msg = String.format("Timeout while trying to close connection on %s - force closing - server will close session on shutdown or expiration.", ((Client.SessionedClient)this.client).getSessionId());
                    logger.warn(msg, (Throwable)ex);
                }
                catch (Exception ex) {
                    String msg = String.format("Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or expiration.", ((Client.SessionedClient)this.client).getSessionId());
                    logger.warn(msg, (Throwable)ex);
                }
            }
            this.channelizer.close(this.channel);
            ChannelPromise promise = this.channel.newPromise();
            promise.addListener(f -> {
                if (f.cause() != null) {
                    future.completeExceptionally(f.cause());
                } else {
                    future.complete(null);
                }
            });
            this.channel.close(promise);
        }
    }

    public String getConnectionInfo() {
        return String.format("Connection{host=%s, isDead=%s, borrowed=%s, pending=%s}", this.pool.host, this.isDead, this.borrowed, this.pending.size());
    }

    public String toString() {
        return this.connectionLabel;
    }
}

