/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.Binding;
import dev.miku.r2dbc.mysql.ConnectionContext;
import dev.miku.r2dbc.mysql.ExceptionFactory;
import dev.miku.r2dbc.mysql.InitHandler;
import dev.miku.r2dbc.mysql.MultiQueryHandler;
import dev.miku.r2dbc.mysql.PrepareHandler;
import dev.miku.r2dbc.mysql.TextQuery;
import dev.miku.r2dbc.mysql.TextQueryHandler;
import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.constant.SslMode;
import dev.miku.r2dbc.mysql.message.client.ClientMessage;
import dev.miku.r2dbc.mysql.message.client.SimpleQueryMessage;
import dev.miku.r2dbc.mysql.message.server.CompleteMessage;
import dev.miku.r2dbc.mysql.message.server.ErrorMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.util.InternalArrays;
import dev.miku.r2dbc.mysql.util.OperatorUtils;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

final class QueryFlow {
    private static final Predicate<ServerMessage> RESULT_DONE = message -> message instanceof CompleteMessage;
    private static final Predicate<ServerMessage> FETCH_DONE = message -> message instanceof ErrorMessage || message instanceof CompleteMessage && ((CompleteMessage)message).isDone();
    private static final Consumer<ReferenceCounted> RELEASE = ReferenceCounted::release;
    private static final Consumer<Object> OBJ_RELEASE = ReferenceCountUtil::release;

    static Mono<Client> login(Client client, SslMode sslMode, String database, String user, @Nullable CharSequence password, ConnectionContext context) {
        EmitterProcessor requests = EmitterProcessor.create((int)1, (boolean)false);
        InitHandler handler = new InitHandler((EmitterProcessor<ClientMessage>)requests, client, sslMode, database, user, password, context);
        return client.exchange((Flux<? extends ClientMessage>)requests, (Predicate<ServerMessage>)handler).handle((BiConsumer)handler).onErrorResume(e -> {
            requests.onComplete();
            return client.forceClose().then(Mono.error((Throwable)e));
        }).then(Mono.just((Object)client));
    }

    static Flux<Flux<ServerMessage>> execute(Client client, String sql, List<Binding> bindings, int fetchSize) {
        return Flux.defer(() -> {
            if (bindings.isEmpty()) {
                return Flux.empty();
            }
            EmitterProcessor requests = EmitterProcessor.create((int)1, (boolean)false);
            PrepareHandler handler = new PrepareHandler((EmitterProcessor<ClientMessage>)requests, sql, bindings.iterator(), fetchSize);
            return OperatorUtils.discardOnCancel(client.exchange((Flux<? extends ClientMessage>)requests, (Predicate<ServerMessage>)handler), handler::close).doOnDiscard(ReferenceCounted.class, RELEASE).handle((BiConsumer)handler).windowUntil(RESULT_DONE);
        });
    }

    static Flux<Flux<ServerMessage>> execute(Client client, TextQuery query, List<Binding> bindings) {
        return Flux.defer(() -> {
            if (bindings.isEmpty()) {
                return Flux.empty();
            }
            EmitterProcessor requests = EmitterProcessor.create((int)1, (boolean)false);
            TextQueryHandler handler = new TextQueryHandler((EmitterProcessor<ClientMessage>)requests, query, bindings.iterator());
            return OperatorUtils.discardOnCancel(client.exchange((Flux<? extends ClientMessage>)requests, (Predicate<ServerMessage>)handler), handler::close).doOnDiscard(ReferenceCounted.class, RELEASE).handle((BiConsumer)handler).windowUntil(RESULT_DONE);
        });
    }

    static Mono<Void> executeVoid(Client client, String sql) {
        return Mono.defer(() -> QueryFlow.execute0(client, sql).doOnNext(OBJ_RELEASE).then());
    }

    static Mono<Void> executeVoid(Client client, String ... statements) {
        return QueryFlow.multiQuery(client, InternalArrays.asIterator(statements)).doOnNext(OBJ_RELEASE).then();
    }

    static Flux<Flux<ServerMessage>> execute(Client client, String sql) {
        return Flux.defer(() -> QueryFlow.execute0(client, sql).windowUntil(RESULT_DONE));
    }

    static Flux<Flux<ServerMessage>> execute(Client client, List<String> statements) {
        return Flux.defer(() -> {
            switch (statements.size()) {
                case 0: {
                    return Flux.empty();
                }
                case 1: {
                    return QueryFlow.execute0(client, (String)statements.get(0)).windowUntil(RESULT_DONE);
                }
            }
            return QueryFlow.multiQuery(client, statements.iterator()).windowUntil(RESULT_DONE);
        });
    }

    private static Flux<ServerMessage> execute0(Client client, String sql) {
        return OperatorUtils.discardOnCancel(client.exchange(new SimpleQueryMessage(sql), FETCH_DONE)).doOnDiscard(ReferenceCounted.class, RELEASE).handle((message, sink) -> {
            if (message instanceof ErrorMessage) {
                sink.error((Throwable)ExceptionFactory.createException((ErrorMessage)message, sql));
            } else {
                sink.next(message);
            }
        });
    }

    private static Flux<ServerMessage> multiQuery(Client client, Iterator<String> statements) {
        EmitterProcessor requests = EmitterProcessor.create((int)1, (boolean)false);
        MultiQueryHandler handler = new MultiQueryHandler((EmitterProcessor<ClientMessage>)requests, statements);
        return OperatorUtils.discardOnCancel(client.exchange((Flux<? extends ClientMessage>)requests, (Predicate<ServerMessage>)handler), handler::close).doOnDiscard(ReferenceCounted.class, RELEASE).handle((BiConsumer)handler);
    }

    private QueryFlow() {
    }
}

