/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.client;

import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.client.ClientChannelFactory;
import io.reactivex.netty.client.ClientConnectionFactory;
import io.reactivex.netty.client.ClientMetricsEvent;
import io.reactivex.netty.client.ConnectionPool;
import io.reactivex.netty.client.MaxConnectionsBasedStrategy;
import io.reactivex.netty.client.PoolConfig;
import io.reactivex.netty.client.PoolExhaustedException;
import io.reactivex.netty.client.PoolLimitDeterminationStrategy;
import io.reactivex.netty.client.PooledConnection;
import io.reactivex.netty.client.PooledConnectionFactory;
import io.reactivex.netty.client.PooledConnectionReleasedEvent;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsListener;
import io.reactivex.netty.metrics.MetricEventsSubject;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.observers.Subscribers;

public class ConnectionPoolImpl<I, O>
implements ConnectionPool<I, O> {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolImpl.class);
    public static final PoolExhaustedException POOL_EXHAUSTED_EXCEPTION = new PoolExhaustedException("Rx Connection Pool exhausted.");
    private final ConcurrentLinkedQueue<PooledConnection<I, O>> idleConnections;
    private final ClientChannelFactory<I, O> channelFactory;
    private final ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory;
    private final PoolLimitDeterminationStrategy limitDeterminationStrategy;
    private final MetricEventsSubject<ClientMetricsEvent<?>> metricEventsSubject;
    private final RxClient.ServerInfo serverInfo;
    private final PoolConfig poolConfig;
    private final ScheduledExecutorService cleanupScheduler;
    private final AtomicBoolean isShutdown = new AtomicBoolean();
    private final ScheduledFuture<?> idleConnCleanupScheduleFuture;

    public ConnectionPoolImpl(RxClient.ServerInfo serverInfo, PoolConfig poolConfig, PoolLimitDeterminationStrategy strategy, ScheduledExecutorService cleanupScheduler, ClientChannelFactory<I, O> channelFactory, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
        this(serverInfo, poolConfig, strategy, cleanupScheduler, new PooledConnectionFactory(poolConfig, eventsSubject), channelFactory, eventsSubject);
    }

    public ConnectionPoolImpl(RxClient.ServerInfo serverInfo, PoolConfig poolConfig, PoolLimitDeterminationStrategy strategy, ScheduledExecutorService cleanupScheduler, ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory, ClientChannelFactory<I, O> channelFactory, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
        this.serverInfo = serverInfo;
        this.poolConfig = poolConfig;
        this.cleanupScheduler = cleanupScheduler;
        this.connectionFactory = connectionFactory;
        this.channelFactory = channelFactory;
        this.metricEventsSubject = eventsSubject;
        long scheduleDurationMillis = Math.max(30L, this.poolConfig.getMaxIdleTimeMillis());
        this.idleConnCleanupScheduleFuture = null != cleanupScheduler ? this.cleanupScheduler.scheduleWithFixedDelay(new IdleConnectionsCleanupTask(), scheduleDurationMillis, scheduleDurationMillis, TimeUnit.MILLISECONDS) : null;
        this.limitDeterminationStrategy = null == strategy ? new MaxConnectionsBasedStrategy() : strategy;
        this.metricEventsSubject.subscribe(this.limitDeterminationStrategy);
        this.idleConnections = new ConcurrentLinkedQueue();
    }

    @Override
    public Observable<ObservableConnection<I, O>> acquire() {
        if (this.isShutdown.get()) {
            return Observable.error((Throwable)new IllegalStateException("Connection pool is already shutdown."));
        }
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<ObservableConnection<I, O>>(){

            public void call(Subscriber<? super ObservableConnection<I, O>> subscriber) {
                long startTimeMillis = Clock.newStartTimeMillis();
                try {
                    ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_START);
                    PooledConnection idleConnection = ConnectionPoolImpl.this.getAnIdleConnection(true);
                    if (null != idleConnection) {
                        idleConnection.beforeReuse();
                        ConnectionPoolImpl.this.channelFactory.onNewConnection(idleConnection, subscriber);
                        long endTime = Clock.onEndMillis(startTimeMillis);
                        ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOLED_CONNECTION_REUSE, endTime);
                        ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_SUCCESS, endTime);
                    } else if (ConnectionPoolImpl.this.limitDeterminationStrategy.acquireCreationPermit(startTimeMillis, TimeUnit.MILLISECONDS)) {
                        Subscriber newConnectionSubscriber = ConnectionPoolImpl.this.newConnectionSubscriber(subscriber, startTimeMillis);
                        try {
                            ConnectionPoolImpl.this.channelFactory.connect(newConnectionSubscriber, ConnectionPoolImpl.this.serverInfo, ConnectionPoolImpl.this.connectionFactory);
                        }
                        catch (Throwable throwable) {
                            newConnectionSubscriber.onError(throwable);
                        }
                    } else {
                        ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, Clock.onEndMillis(startTimeMillis), POOL_EXHAUSTED_EXCEPTION);
                        subscriber.onError((Throwable)POOL_EXHAUSTED_EXCEPTION);
                    }
                }
                catch (Throwable throwable) {
                    ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, Clock.onEndMillis(startTimeMillis), throwable);
                    subscriber.onError(throwable);
                }
            }
        });
    }

    @Override
    public Observable<Void> release(PooledConnection<I, O> connection) {
        if (null == connection) {
            return Observable.error((Throwable)new IllegalArgumentException("Returned a null connection to the pool."));
        }
        long startTimeMillis = Clock.newStartTimeMillis();
        try {
            connection.getChannel().pipeline().fireUserEventTriggered((Object)new PooledConnectionReleasedEvent(connection));
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_START);
            if (this.isShutdown.get() || !connection.isUsable()) {
                this.discardConnection(connection);
                this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_SUCCESS, Clock.onEndMillis(startTimeMillis));
                return Observable.empty();
            }
            this.idleConnections.add(connection);
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_SUCCESS, Clock.onEndMillis(startTimeMillis));
            return Observable.empty();
        }
        catch (Throwable throwable) {
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_FAILED, Clock.onEndMillis(startTimeMillis));
            return Observable.error((Throwable)throwable);
        }
    }

    @Override
    public Observable<Void> discard(PooledConnection<I, O> connection) {
        if (null == connection) {
            return Observable.error((Throwable)new IllegalArgumentException("Returned a null connection to the pool."));
        }
        boolean removed = this.idleConnections.remove(connection);
        if (removed) {
            this.discardConnection(connection);
        }
        return Observable.empty();
    }

    @Override
    public void shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        if (null != this.idleConnCleanupScheduleFuture) {
            this.idleConnCleanupScheduleFuture.cancel(true);
        }
        PooledConnection<I, O> idleConnection = this.getAnIdleConnection(true);
        while (null != idleConnection) {
            this.discardConnection(idleConnection);
            idleConnection = this.getAnIdleConnection(true);
        }
        this.metricEventsSubject.onCompleted();
    }

    private PooledConnection<I, O> getAnIdleConnection(boolean claimConnectionIfFound) {
        PooledConnection<I, O> idleConnection;
        while ((idleConnection = this.idleConnections.poll()) != null) {
            if (!idleConnection.isUsable()) {
                this.discardConnection(idleConnection);
                continue;
            }
            if (claimConnectionIfFound && !idleConnection.claim()) continue;
            break;
        }
        return idleConnection;
    }

    private Observable<Void> discardConnection(PooledConnection<I, O> idleConnection) {
        this.metricEventsSubject.onEvent(ClientMetricsEvent.POOLED_CONNECTION_EVICTION);
        return idleConnection.closeUnderlyingChannel();
    }

    private Subscriber<? super ObservableConnection<I, O>> newConnectionSubscriber(final Subscriber<? super ObservableConnection<I, O>> subscriber, final long startTime) {
        return Subscribers.create((Action1)new Action1<ObservableConnection<I, O>>(){

            public void call(ObservableConnection<I, O> connection) {
                ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_SUCCESS, Clock.onEndMillis(startTime));
                PooledConnection pooledConnection = (PooledConnection)connection;
                pooledConnection.setConnectionPool(ConnectionPoolImpl.this);
                pooledConnection.updateMaxIdleTimeMillis(ConnectionPoolImpl.this.poolConfig.getMaxIdleTimeMillis());
                subscriber.onNext(connection);
                subscriber.onCompleted();
            }
        }, (Action1)new Action1<Throwable>(){

            public void call(Throwable throwable) {
                ConnectionPoolImpl.this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, Clock.onEndMillis(startTime), throwable);
                subscriber.onError(throwable);
            }
        });
    }

    @Override
    public Subscription subscribe(MetricEventsListener<? extends ClientMetricsEvent<?>> listener) {
        return this.metricEventsSubject.subscribe(listener);
    }

    private class IdleConnectionsCleanupTask
    implements Runnable {
        private IdleConnectionsCleanupTask() {
        }

        @Override
        public void run() {
            try {
                Iterator iterator = ConnectionPoolImpl.this.idleConnections.iterator();
                while (iterator.hasNext()) {
                    PooledConnection idleConnection = (PooledConnection)iterator.next();
                    if (idleConnection.isUsable() || !idleConnection.claim()) continue;
                    iterator.remove();
                    ConnectionPoolImpl.this.discardConnection(idleConnection);
                }
            }
            catch (Exception e) {
                logger.error("Exception in the idle connection cleanup task. This does NOT stop the next schedule of the task. ", (Throwable)e);
            }
        }
    }
}

