/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.ConnectionResources;
import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.StatementCache;
import io.r2dbc.postgresql.client.Binding;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
import io.r2dbc.postgresql.client.QueryLogger;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BindComplete;
import io.r2dbc.postgresql.message.backend.CloseComplete;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.NoData;
import io.r2dbc.postgresql.message.backend.ParseComplete;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.Bind;
import io.r2dbc.postgresql.message.frontend.Close;
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
import io.r2dbc.postgresql.message.frontend.Describe;
import io.r2dbc.postgresql.message.frontend.Execute;
import io.r2dbc.postgresql.message.frontend.ExecutionType;
import io.r2dbc.postgresql.message.frontend.Flush;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Parse;
import io.r2dbc.postgresql.message.frontend.Sync;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.postgresql.util.PredicateUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

class ExtendedFlowDelegate {
    static final Predicate<BackendMessage> RESULT_FRAME_FILTER;

    ExtendedFlowDelegate() {
    }

    public static Flux<BackendMessage> runQuery(ConnectionResources resources, ExceptionFactory factory, String query, Binding binding, List<ByteBuf> values, int fetchSize) {
        boolean implicitTransactions;
        StatementCache cache = resources.getStatementCache();
        Client client = resources.getClient();
        String name = cache.getName(binding, query);
        String portal = resources.getPortalNameSupplier().get();
        boolean prepareRequired = cache.requiresPrepare(binding, query);
        ArrayList<FrontendMessage.DirectEncoder> messagesToSend = new ArrayList<FrontendMessage.DirectEncoder>(6);
        if (prepareRequired) {
            messagesToSend.add(new Parse(name, binding.getParameterTypes(), query));
        }
        Bind bind = new Bind(portal, binding.getParameterFormats(), values, ExtendedQueryMessageFlow.resultFormat(resources.getConfiguration().isForceBinary()), name);
        messagesToSend.add(bind);
        messagesToSend.add(new Describe(portal, ExecutionType.PORTAL));
        boolean compatibilityMode = resources.getConfiguration().isCompatibilityMode();
        boolean bl = implicitTransactions = resources.getClient().getTransactionStatus() == TransactionStatus.IDLE;
        Flux<BackendMessage> exchange = compatibilityMode ? (fetchSize == 0 || implicitTransactions ? ExtendedFlowDelegate.fetchAll(messagesToSend, client, portal) : ExtendedFlowDelegate.fetchCursoredWithSync(messagesToSend, client, portal, fetchSize)) : (fetchSize == 0 ? ExtendedFlowDelegate.fetchAll(messagesToSend, client, portal) : ExtendedFlowDelegate.fetchCursoredWithFlush(messagesToSend, client, portal, fetchSize));
        if (prepareRequired) {
            exchange = exchange.doOnNext(message -> {
                if (message == ParseComplete.INSTANCE) {
                    cache.put(binding, query, name);
                }
            });
        }
        return exchange.doOnSubscribe(it -> QueryLogger.logQuery(client.getContext(), query)).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release).filter(RESULT_FRAME_FILTER).handle(factory::handleErrorResponse);
    }

    private static Flux<BackendMessage> fetchAll(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal) {
        messagesToSend.add(new Execute(portal, 0));
        messagesToSend.add(new Close(portal, ExecutionType.PORTAL));
        messagesToSend.add(Sync.INSTANCE);
        return (Flux)client.exchange((Publisher<FrontendMessage>)Mono.just((Object)new CompositeFrontendMessage(messagesToSend))).as(Operators::discardOnCancel);
    }

    private static Flux<BackendMessage> fetchCursoredWithSync(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
        Sinks.Many requests = Sinks.many().unicast().onBackpressureBuffer((Queue)Queues.small().get());
        AtomicBoolean isCanceled = new AtomicBoolean(false);
        AtomicBoolean done = new AtomicBoolean(false);
        messagesToSend.add(new Execute(portal, fetchSize));
        messagesToSend.add(Sync.INSTANCE);
        return (Flux)client.exchange(it -> done.get() && it instanceof ReadyForQuery, (Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(messagesToSend)).concatWith((Publisher)requests.asFlux())).handle((message, sink) -> {
            if (message instanceof CommandComplete) {
                requests.emitNext((Object)new Close(portal, ExecutionType.PORTAL), Sinks.EmitFailureHandler.FAIL_FAST);
                requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                sink.next(message);
            } else if (message instanceof CloseComplete) {
                requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                done.set(true);
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                done.set(true);
                requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requests.emitNext((Object)new Close(portal, ExecutionType.PORTAL), Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    requests.emitNext((Object)new Execute(portal, fetchSize), Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                }
            } else if (message instanceof NoData) {
                if (isCanceled.get()) {
                    requests.emitNext((Object)new Close(portal, ExecutionType.PORTAL), Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    done.set(true);
                }
            } else {
                sink.next(message);
            }
        }).doFinally(ignore -> requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST)).as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }

    private static Flux<BackendMessage> fetchCursoredWithFlush(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
        Sinks.Many requests = Sinks.many().unicast().onBackpressureBuffer((Queue)Queues.small().get());
        AtomicBoolean isCanceled = new AtomicBoolean(false);
        messagesToSend.add(new Execute(portal, fetchSize));
        messagesToSend.add(Flush.INSTANCE);
        return (Flux)client.exchange((Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(messagesToSend)).concatWith((Publisher)requests.asFlux())).handle((message, sink) -> {
            if (message instanceof CommandComplete) {
                requests.emitNext((Object)new Close(portal, ExecutionType.PORTAL), Sinks.EmitFailureHandler.FAIL_FAST);
                requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requests.emitNext((Object)new Close(portal, ExecutionType.PORTAL), Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitNext((Object)Sync.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    requests.emitNext((Object)new Execute(portal, fetchSize), Sinks.EmitFailureHandler.FAIL_FAST);
                    requests.emitNext((Object)Flush.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
                }
            } else {
                sink.next(message);
            }
        }).doFinally(ignore -> requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST)).as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }

    static {
        Predicate[] predicateArray = new Predicate[2];
        predicateArray[0] = BindComplete.class::isInstance;
        predicateArray[1] = NoData.class::isInstance;
        RESULT_FRAME_FILTER = PredicateUtils.not(PredicateUtils.or(predicateArray));
    }
}

