/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.ConnectionFunction;
import io.r2dbc.postgresql.ConnectionStrategy;
import io.r2dbc.postgresql.DefaultPortalNameSupplier;
import io.r2dbc.postgresql.DisabledStatementCache;
import io.r2dbc.postgresql.PostgresqlConnection;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlStatement;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.MultiHostConfiguration;
import io.r2dbc.postgresql.codec.DefaultCodecs;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.IsolationLevel;
import java.net.SocketAddress;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link ConnectionStrategy} that tries a collection of {@link SocketAddress}es one after another and
 * yields the first {@link Client} whose server satisfies the requested {@link TargetServerType}.
 *
 * <p>The outcome of every attempt (connect failure, connected, primary, standby) is remembered per
 * address in {@link #statusMap} together with the time it was recorded. Later calls use that cache
 * to skip hosts that were recently observed not to match the requested target type; once an entry
 * is older than the configured host recheck time the host is tried again.
 */
public final class MultiHostConnectionStrategy implements ConnectionStrategy {

    /** Opens a {@link Client} for a single address. */
    private final ConnectionFunction connectionFunction;

    /** All addresses this strategy may connect to; never empty (checked in the constructor). */
    private final Collection<SocketAddress> addresses;

    private final PostgresqlConnectionConfiguration configuration;

    /** Multi-host section of {@link #configuration}: target server type, recheck time, load balancing. */
    private final MultiHostConfiguration multiHostConfiguration;

    private final ConnectionSettings settings;

    /** Most recent connection outcome per address. */
    private final Map<SocketAddress, HostConnectOutcome> statusMap;

    /**
     * Creates a strategy over the given addresses.
     *
     * @param connectionFunction function used to open a client for one address
     * @param addresses          candidate addresses, must not be empty
     * @param configuration      connection configuration; its multi-host configuration is read here
     * @param settings           settings handed to {@code connectionFunction} on every attempt
     * @throws IllegalArgumentException presumably raised by {@code Assert.isTrue} when {@code addresses}
     *                                  is empty (the exact exception type is defined by {@code Assert})
     */
    MultiHostConnectionStrategy(ConnectionFunction connectionFunction, Collection<SocketAddress> addresses, PostgresqlConnectionConfiguration configuration, ConnectionSettings settings) {
        Assert.isTrue(!addresses.isEmpty(), "Collection of SocketAddress must not be empty");
        this.connectionFunction = connectionFunction;
        this.addresses = addresses;
        this.configuration = configuration;
        this.multiHostConfiguration = this.configuration.getMultiHostConfiguration();
        this.settings = settings;
        this.statusMap = new ConcurrentHashMap<>(addresses.size());
    }

    /**
     * Connects using the target server type taken from the multi-host configuration.
     *
     * @return see {@link #connect(TargetServerType)}
     */
    @Override
    public Mono<Client> connect() {
        return this.connect(this.multiHostConfiguration.getTargetServerType());
    }

    /**
     * Describes the configured target type and hosts, followed by the cached host states if any
     * have been recorded.
     */
    @Override
    public String toString() {
        String knownStates = this.statusMap.isEmpty() ? "" : ". Known server states: " + this.statusMap;
        return String.format("a %s node using %s%s", this.multiHostConfiguration.getTargetServerType(), this.multiHostConfiguration.getHosts(), knownStates);
    }

    /**
     * Connects to the first candidate host that satisfies {@code targetServerType}.
     *
     * <p>For {@link TargetServerType#PREFER_SECONDARY} a second pass targeting
     * {@link TargetServerType#PRIMARY} is made when the first pass yields no client. If no client
     * is obtained at all the returned {@link Mono} errors with an {@link ExceptionAggregator}
     * built by {@link #aggregateFailure(TargetServerType, CopyOnWriteArrayList)}.
     *
     * @param targetServerType the kind of server to connect to
     * @return a {@link Mono} emitting the connected client, or an error if none could be obtained
     */
    public Mono<Client> connect(TargetServerType targetServerType) {
        // Errors are collected rather than propagated so that the remaining hosts still get a chance.
        CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();

        return this.attemptConnection(targetServerType, errors::add)
            .onErrorResume(e -> {
                errors.add(e);
                return Mono.empty();
            })
            .switchIfEmpty(Mono.defer(() -> {
                if (targetServerType == TargetServerType.PREFER_SECONDARY) {
                    return this.attemptConnection(TargetServerType.PRIMARY, errors::add);
                }
                return Mono.empty();
            }))
            .switchIfEmpty(Mono.error(() -> aggregateFailure(targetServerType, errors)));
    }

    /**
     * Builds the terminal error for a connect call that produced no client.
     *
     * <ul>
     * <li>no recorded error: a message naming the unmatched target type, no cause;</li>
     * <li>exactly one error: that error becomes the cause;</li>
     * <li>several errors: all of them are attached as suppressed exceptions.</li>
     * </ul>
     */
    private static ExceptionAggregator aggregateFailure(TargetServerType targetServerType, CopyOnWriteArrayList<Throwable> errors) {
        if (errors.isEmpty()) {
            return new ExceptionAggregator(String.format("No server matches target type '%s'", targetServerType), null);
        }
        if (errors.size() == 1) {
            return new ExceptionAggregator(null, errors.get(0));
        }
        ExceptionAggregator exception = new ExceptionAggregator(null, null);
        errors.forEach(exception::addSuppressed);
        return exception;
    }

    /**
     * Tries the candidates for {@code targetServerType} sequentially and emits the first client obtained.
     * A failing candidate is reported to {@code errorHandler}, recorded as
     * {@link HostStatus#CONNECT_FAIL} and then skipped.
     */
    private Mono<Client> attemptConnection(TargetServerType targetServerType, Consumer<Throwable> errorHandler) {
        return this.getCandidates(targetServerType)
            .concatMap(candidate -> this.attemptConnection(targetServerType, candidate)
                .onErrorResume(e -> {
                    errorHandler.accept(e);
                    this.statusMap.put(candidate, HostConnectOutcome.fail(candidate));
                    return Mono.empty();
                }))
            .next();
    }

    /**
     * Connects to a single candidate. For {@link TargetServerType#ANY} the client is returned as soon
     * as the connection is established; otherwise the server role is probed first.
     */
    private Mono<Client> attemptConnection(TargetServerType targetServerType, SocketAddress candidate) {
        return this.connectionFunction.connect(candidate, this.settings).flatMap(client -> {
            // Mark the host reachable without discarding an already known PRIMARY/STANDBY role.
            this.statusMap.compute(candidate, (address, previous) -> evaluateStatus(candidate, previous));
            if (targetServerType == TargetServerType.ANY) {
                return Mono.just(client);
            }
            return this.keepIfRoleMatches(targetServerType, candidate, client);
        });
    }

    /**
     * Probes whether {@code client} is connected to a primary or a standby, records the result and
     * emits the client only if the role satisfies {@code targetServerType}. A non-matching client
     * is closed and the returned {@link Mono} completes empty.
     */
    private Mono<Client> keepIfRoleMatches(TargetServerType targetServerType, SocketAddress candidate, Client client) {
        return isPrimaryServer(client, this.configuration)
            // The client is already open at this point: release it when the role probe fails,
            // then re-signal the original failure so the caller records the host as failed.
            .onErrorResume(failure -> client.close()
                .onErrorResume(closeFailure -> {
                    if (closeFailure != failure) {
                        failure.addSuppressed(closeFailure);
                    }
                    return Mono.empty();
                })
                .then(Mono.<Boolean>error(failure)))
            .flatMap(isPrimary -> {
                HostConnectOutcome outcome = isPrimary ? HostConnectOutcome.primary(candidate) : HostConnectOutcome.standby(candidate);
                this.statusMap.put(candidate, outcome);
                if (targetServerType.test(candidate, outcome.hostStatus)) {
                    return Mono.just(client);
                }
                return client.close().then(Mono.<Client>empty());
            });
    }

    /**
     * Status to store after a successful connect: a fresh {@link HostStatus#CONNECT_OK} when nothing
     * was known or the host had failed before, otherwise the previous status unchanged.
     */
    private static HostConnectOutcome evaluateStatus(SocketAddress candidate, @Nullable HostConnectOutcome oldStatus) {
        if (oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL) {
            return HostConnectOutcome.ok(candidate);
        }
        return oldStatus;
    }

    /**
     * Runs {@code SHOW TRANSACTION_READ_ONLY} on the client and maps the answer {@code off}
     * (case-insensitive) to {@code true}, anything else to {@code false}.
     */
    private static Mono<Boolean> isPrimaryServer(Client client, PostgresqlConnectionConfiguration configuration) {
        PostgresqlConnection connection = new PostgresqlConnection(client, new DefaultCodecs(client.getByteBufAllocator()), DefaultPortalNameSupplier.INSTANCE, DisabledStatementCache.INSTANCE, IsolationLevel.READ_UNCOMMITTED, configuration);
        return new PostgresqlStatement(connection.getResources(), "SHOW TRANSACTION_READ_ONLY")
            .fetchSize(0)
            .execute()
            .flatMap(result -> result.map(row -> (String)row.get(0, String.class)))
            .map(s -> s.equalsIgnoreCase("off"))
            .last();
    }

    /**
     * Computes, lazily on each subscription, the addresses worth trying for {@code targetServerType}.
     *
     * <p>An address is kept when nothing is cached for it, when it is only known as
     * {@link HostStatus#CONNECT_OK}, when its cached entry is older than the host recheck time, or
     * when its cached status satisfies the target type. If that leaves no address, all addresses
     * are returned. The order is shuffled when load balancing is enabled.
     */
    private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
        return Flux.defer(() -> {
            // An entry is stale once it was recorded more than the recheck time ago, i.e. before
            // (now - recheckTime). Adding the recheck time instead would place the threshold in the
            // future and make every cached entry look stale.
            Instant staleBefore = HostConnectOutcome.DEFAULT_CLOCK.instant().minus(this.multiHostConfiguration.getHostRecheckTime());

            ArrayList<SocketAddress> ordered = new ArrayList<>(this.addresses);
            if (this.multiHostConfiguration.isLoadBalanceHosts()) {
                Collections.shuffle(ordered);
            }

            ArrayList<SocketAddress> eligible = new ArrayList<>(ordered.size());
            for (SocketAddress address : ordered) {
                HostConnectOutcome known = this.statusMap.get(address);
                boolean worthTrying = known == null
                    || known.hostStatus == HostStatus.CONNECT_OK
                    || known.connectionAttemptedAt.isBefore(staleBefore)
                    || targetServerType.test(address, known.hostStatus);
                if (worthTrying) {
                    eligible.add(address);
                }
            }

            // Nothing qualifies according to the cache: fall back to trying every host.
            return Flux.fromIterable(eligible.isEmpty() ? ordered : eligible);
        });
    }

    /**
     * Kind of server a connection should be established to. Each constant decides, from a cached
     * {@link HostStatus}, whether a host is acceptable.
     */
    public enum TargetServerType implements HostSelector {

        /** Accepts every host that is not known to have failed. */
        ANY("any") {

            @Override
            public boolean test(SocketAddress address, HostStatus hostStatus) {
                return hostStatus != HostStatus.CONNECT_FAIL;
            }
        },

        /** Accepts hosts known to be primary. */
        PRIMARY("primary") {

            @Override
            public boolean test(SocketAddress address, HostStatus hostStatus) {
                return hostStatus == HostStatus.PRIMARY;
            }
        },

        /** Accepts hosts known to be standby. */
        SECONDARY("secondary") {

            @Override
            public boolean test(SocketAddress address, HostStatus hostStatus) {
                return hostStatus == HostStatus.STANDBY;
            }
        },

        /**
         * Accepts hosts known to be standby; the fallback to a primary is implemented by
         * {@link MultiHostConnectionStrategy#connect(TargetServerType)}, not by this test.
         */
        PREFER_SECONDARY("preferSecondary") {

            @Override
            public boolean test(SocketAddress address, HostStatus hostStatus) {
                return hostStatus == HostStatus.STANDBY;
            }
        };

        private final String value;

        TargetServerType(String value) {
            this.value = value;
        }

        /**
         * Resolves a constant from either its textual value (e.g. {@code preferSecondary}) or its
         * enum name (e.g. {@code PREFER_SECONDARY}), ignoring case.
         *
         * @param value the text to resolve
         * @return the matching constant
         * @throws IllegalArgumentException if no constant matches
         */
        public static TargetServerType fromValue(String value) {
            for (TargetServerType type : TargetServerType.values()) {
                if (type.value.equalsIgnoreCase(value) || type.name().equalsIgnoreCase(value)) {
                    return type;
                }
            }
            throw new IllegalArgumentException(String.format("Cannot resolve '%s' to a valid TargetServerType.", value));
        }

        /**
         * @return the textual value passed to the constant's constructor
         */
        public String getValue() {
            return this.value;
        }
    }

    /** Immutable record of one connection outcome for a host and the instant it was created. */
    private static class HostConnectOutcome {

        /** Clock used to timestamp outcomes and to compute the recheck threshold. */
        static final Clock DEFAULT_CLOCK = Clock.systemDefaultZone();

        public final SocketAddress address;

        public final HostStatus hostStatus;

        /** Instant at which this outcome object was created. */
        public final Instant connectionAttemptedAt;

        private HostConnectOutcome(SocketAddress address, HostStatus hostStatus, Clock clock) {
            this.address = address;
            this.hostStatus = hostStatus;
            this.connectionAttemptedAt = clock.instant();
        }

        public static HostConnectOutcome fail(SocketAddress host) {
            return new HostConnectOutcome(host, HostStatus.CONNECT_FAIL, DEFAULT_CLOCK);
        }

        public static HostConnectOutcome ok(SocketAddress host) {
            return new HostConnectOutcome(host, HostStatus.CONNECT_OK, DEFAULT_CLOCK);
        }

        public static HostConnectOutcome primary(SocketAddress host) {
            return new HostConnectOutcome(host, HostStatus.PRIMARY, DEFAULT_CLOCK);
        }

        public static HostConnectOutcome standby(SocketAddress host) {
            return new HostConnectOutcome(host, HostStatus.STANDBY, DEFAULT_CLOCK);
        }

        /** Returns only the status name; this is what appears in the strategy's {@code toString()}. */
        @Override
        public String toString() {
            return this.hostStatus.name();
        }
    }

    /** Last known state of a host. */
    public enum HostStatus {
        /** The most recent attempt on the host ended in an error. */
        CONNECT_FAIL,
        /** A connection was established but no role has been recorded. */
        CONNECT_OK,
        /** The host answered the role probe as a primary. */
        PRIMARY,
        /** The host answered the role probe as a standby. */
        STANDBY
    }

    /** Error signalled when no client could be obtained; carries the collected failures. */
    static class ExceptionAggregator extends RuntimeException {

        public ExceptionAggregator(@Nullable String message, @Nullable Throwable cause) {
            super(message, cause);
        }
    }

    /** Decides whether a host with the given cached status is acceptable. */
    public interface HostSelector {

        /**
         * @param address    the host address
         * @param hostStatus the cached status of that host
         * @return {@code true} if the host is acceptable
         */
        boolean test(SocketAddress address, HostStatus hostStatus);
    }
}

