/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.ConnectionResources;
import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.ExtendedQueryPostgresqlStatement;
import io.r2dbc.postgresql.NotificationResponseWrapper;
import io.r2dbc.postgresql.PostgresqlBatch;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionMetadata;
import io.r2dbc.postgresql.SimpleQueryPostgresqlStatement;
import io.r2dbc.postgresql.StatementCache;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.Notification;
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
import io.r2dbc.postgresql.api.PostgresqlResult;
import io.r2dbc.postgresql.api.PostgresqlStatement;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionContext;
import io.r2dbc.postgresql.client.PortalNameSupplier;
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.codec.Codecs;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

final class PostgresqlConnection
implements io.r2dbc.postgresql.api.PostgresqlConnection {
    private final Logger logger = Loggers.getLogger(this.getClass());
    private final Client client;
    private final ConnectionResources resources;
    private final ConnectionContext connectionContext;
    private final Codecs codecs;
    private final Flux<Integer> validationQuery;
    private final AtomicReference<NotificationAdapter> notificationAdapter = new AtomicReference();
    private volatile IsolationLevel isolationLevel;
    private volatile IsolationLevel previousIsolationLevel;

    PostgresqlConnection(Client client, Codecs codecs, PortalNameSupplier portalNameSupplier, StatementCache statementCache, IsolationLevel isolationLevel, PostgresqlConnectionConfiguration configuration) {
        this.client = Assert.requireNonNull(client, "client must not be null");
        this.resources = new ConnectionResources(client, codecs, this, configuration, portalNameSupplier, statementCache);
        this.connectionContext = client.getContext();
        this.codecs = Assert.requireNonNull(codecs, "codecs must not be null");
        this.isolationLevel = Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
        this.validationQuery = new SimpleQueryPostgresqlStatement(this.resources, "SELECT 1").fetchSize(0).execute().flatMap(PostgresqlResult::getRowsUpdated);
    }

    Client getClient() {
        return this.client;
    }

    @Override
    public Mono<Void> beginTransaction() {
        return this.beginTransaction(EmptyTransactionDefinition.INSTANCE);
    }

    @Override
    public Mono<Void> beginTransaction(TransactionDefinition definition) {
        Assert.requireNonNull(definition, "definition must not be null");
        return this.useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.IDLE == transactionStatus) {
                IsolationLevel isolationLevel = (IsolationLevel)definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL);
                Boolean readOnly = (Boolean)definition.getAttribute(TransactionDefinition.READ_ONLY);
                Boolean deferrable = (Boolean)definition.getAttribute(PostgresTransactionDefinition.DEFERRABLE);
                String begin = "BEGIN";
                String transactionMode = "";
                if (isolationLevel != null) {
                    transactionMode = PostgresqlConnection.appendTransactionMode(transactionMode, "ISOLATION LEVEL", isolationLevel.asSql());
                }
                if (readOnly != null) {
                    transactionMode = PostgresqlConnection.appendTransactionMode(transactionMode, readOnly != false ? "READ ONLY" : "READ WRITE");
                }
                if (deferrable != null) {
                    transactionMode = PostgresqlConnection.appendTransactionMode(transactionMode, deferrable != false ? "" : "NOT", "DEFERRABLE");
                }
                return this.exchange(transactionMode.isEmpty() ? begin : begin + " " + transactionMode).doOnComplete(() -> {
                    this.previousIsolationLevel = this.isolationLevel;
                    if (isolationLevel != null) {
                        this.isolationLevel = isolationLevel;
                    }
                });
            }
            this.logger.debug(this.connectionContext.getMessage("Skipping begin transaction because status is {}"), new Object[]{transactionStatus});
            return Mono.empty();
        });
    }

    private static String appendTransactionMode(String transactionMode, String ... tokens) {
        StringBuilder builder = new StringBuilder(transactionMode);
        boolean first = true;
        if (builder.length() != 0) {
            builder.append(", ");
        }
        for (String token : tokens) {
            if (token.isEmpty()) continue;
            if (first) {
                first = false;
            } else {
                builder.append(" ");
            }
            builder.append(token);
        }
        return builder.toString();
    }

    @Override
    public Mono<Void> close() {
        return this.client.close().doOnSubscribe(subscription -> {
            NotificationAdapter notificationAdapter = this.notificationAdapter.get();
            if (notificationAdapter != null && this.notificationAdapter.compareAndSet(notificationAdapter, null)) {
                notificationAdapter.dispose();
            }
        }).then(Mono.empty());
    }

    @Override
    public Mono<Void> cancelRequest() {
        return this.client.cancelRequest();
    }

    @Override
    public Mono<Void> commitTransaction() {
        return this.useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.IDLE != transactionStatus) {
                return Flux.from(this.exchange("COMMIT")).doOnComplete(this::cleanupIsolationLevel).filter(CommandComplete.class::isInstance).cast(CommandComplete.class).handle((message, sink) -> {
                    if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
                        sink.error((Throwable)((Object)new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)"))));
                        return;
                    }
                    sink.next(message);
                });
            }
            this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), new Object[]{transactionStatus});
            return Mono.empty();
        });
    }

    @Override
    public PostgresqlBatch createBatch() {
        return new PostgresqlBatch(this.resources);
    }

    @Override
    public Mono<Void> createSavepoint(String name) {
        Assert.requireNonNull(name, "name must not be null");
        return this.beginTransaction().then(this.useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.OPEN == transactionStatus) {
                return this.exchange(String.format("SAVEPOINT %s", name));
            }
            this.logger.debug(this.connectionContext.getMessage("Skipping create savepoint because status is {}"), new Object[]{transactionStatus});
            return Mono.empty();
        }));
    }

    @Override
    public PostgresqlStatement createStatement(String sql) {
        Assert.requireNonNull(sql, "sql must not be null");
        if (SimpleQueryPostgresqlStatement.supports(sql)) {
            return new SimpleQueryPostgresqlStatement(this.resources, sql);
        }
        if (ExtendedQueryPostgresqlStatement.supports(sql)) {
            return new ExtendedQueryPostgresqlStatement(this.resources, sql);
        }
        throw new IllegalArgumentException(String.format("Statement '%s' cannot be created. This is often due to the presence of both multiple statements and parameters at the same time.", sql));
    }

    @Override
    public Flux<Notification> getNotifications() {
        NotificationAdapter notifications = this.notificationAdapter.get();
        if (notifications == null) {
            notifications = new NotificationAdapter();
            if (this.notificationAdapter.compareAndSet(null, notifications)) {
                notifications.register(this.client);
            } else {
                notifications = this.notificationAdapter.get();
            }
        }
        return notifications.getEvents();
    }

    @Override
    public PostgresqlConnectionMetadata getMetadata() {
        return new PostgresqlConnectionMetadata(this.client.getVersion());
    }

    @Override
    public IsolationLevel getTransactionIsolationLevel() {
        return this.isolationLevel;
    }

    @Override
    public boolean isAutoCommit() {
        return this.client.getTransactionStatus() == TransactionStatus.IDLE;
    }

    @Override
    public Mono<Void> releaseSavepoint(String name) {
        Assert.requireNonNull(name, "name must not be null");
        return this.useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.OPEN == transactionStatus) {
                return this.exchange(String.format("RELEASE SAVEPOINT %s", name));
            }
            this.logger.debug(this.connectionContext.getMessage("Skipping release savepoint because status is {}"), new Object[]{transactionStatus});
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> rollbackTransaction() {
        return this.useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.IDLE != transactionStatus) {
                return this.exchange("ROLLBACK").doOnComplete(this::cleanupIsolationLevel);
            }
            this.logger.debug(this.connectionContext.getMessage("Skipping rollback transaction because status is {}"), new Object[]{transactionStatus});
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> rollbackTransactionToSavepoint(String name) {
        Assert.requireNonNull(name, "name must not be null");
        return this.useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.IDLE != transactionStatus) {
                return this.exchange(String.format("ROLLBACK TO SAVEPOINT %s", name));
            }
            this.logger.debug(this.connectionContext.getMessage("Skipping rollback transaction to savepoint because status is {}"), new Object[]{transactionStatus});
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> setAutoCommit(boolean autoCommit) {
        return this.useTransactionStatus(transactionStatus -> {
            this.logger.debug(this.connectionContext.getMessage(String.format("Setting auto-commit mode to [%s]", autoCommit)));
            if (this.isAutoCommit()) {
                if (!autoCommit) {
                    this.logger.debug(this.connectionContext.getMessage("Beginning transaction"));
                    return this.beginTransaction();
                }
            } else if (autoCommit) {
                this.logger.debug(this.connectionContext.getMessage("Committing pending transactions"));
                return this.commitTransaction();
            }
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
        Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
        return this.withTransactionStatus(PostgresqlConnection.getTransactionIsolationLevelQuery(isolationLevel)).flatMapMany(this::exchange).then().doOnSuccess(ignore -> {
            this.isolationLevel = isolationLevel;
        });
    }

    @Override
    public String toString() {
        return "PostgresqlConnection{client=" + this.client + ", codecs=" + this.codecs + '}';
    }

    @Override
    public Mono<Boolean> validate(ValidationDepth depth) {
        if (depth == ValidationDepth.LOCAL) {
            return Mono.fromSupplier(this.client::isConnected);
        }
        return Mono.create(sink -> {
            if (!this.client.isConnected()) {
                sink.success((Object)false);
                return;
            }
            this.validationQuery.subscribe((CoreSubscriber)new CoreSubscriber<Integer>(){

                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }

                public void onNext(Integer integer) {
                }

                public void onError(Throwable t) {
                    PostgresqlConnection.this.logger.debug(PostgresqlConnection.this.connectionContext.getMessage("Validation failed"), t);
                    sink.success((Object)false);
                }

                public void onComplete() {
                    sink.success((Object)true);
                }
            });
        });
    }

    private static Function<TransactionStatus, String> getTransactionIsolationLevelQuery(IsolationLevel isolationLevel) {
        return transactionStatus -> {
            if (transactionStatus == TransactionStatus.OPEN) {
                return String.format("SET TRANSACTION ISOLATION LEVEL %s", isolationLevel.asSql());
            }
            return String.format("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL %s", isolationLevel.asSql());
        };
    }

    private Mono<Void> useTransactionStatus(Function<TransactionStatus, Publisher<?>> f) {
        return ((Flux)Flux.defer(() -> (Publisher)f.apply(this.client.getTransactionStatus())).as(Operators::discardOnCancel)).then();
    }

    private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
        return Mono.defer(() -> Mono.just(f.apply(this.client.getTransactionStatus())));
    }

    private <T> Flux<T> exchange(String sql) {
        ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
        return (Flux)SimpleQueryMessageFlow.exchange(this.client, sql).handle(exceptionFactory::handleErrorResponse).as(Operators::discardOnCancel);
    }

    private void cleanupIsolationLevel() {
        if (this.previousIsolationLevel != null) {
            this.isolationLevel = this.previousIsolationLevel;
        }
        this.previousIsolationLevel = null;
    }

    static enum EmptyTransactionDefinition implements TransactionDefinition
    {
        INSTANCE;


        public <T> T getAttribute(Option<T> option) {
            return null;
        }
    }

    static class NotificationAdapter {
        private final Sinks.Many<Notification> sink = Sinks.many().multicast().directBestEffort();
        @Nullable
        private volatile Disposable subscription = null;

        NotificationAdapter() {
        }

        void dispose() {
            Disposable subscription = this.subscription;
            if (subscription != null && !subscription.isDisposed()) {
                subscription.dispose();
            }
        }

        void register(Client client) {
            this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>(){

                public void onSubscribe(Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                public void onNext(NotificationResponse notificationResponse) {
                    sink.emitNext((Object)new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
                }

                public void onError(Throwable throwable) {
                    sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
                }

                public void onComplete() {
                    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                }
            });
        }

        Flux<Notification> getEvents() {
            return this.sink.asFlux();
        }
    }
}

