/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.mssql;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.mssql.QueryLogger;
import io.r2dbc.mssql.client.Client;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.token.DoneToken;
import io.r2dbc.mssql.message.token.SqlBatch;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.mssql.util.Operators;
import java.util.function.BiConsumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/**
 * Static helper that runs a plain SQL query string through a {@link Client}
 * and exposes the response as a stream of {@link Message}s.
 */
final class QueryMessageFlow {
    // Kept package-private (as in the original) so existing callers are unaffected;
    // the class only exposes static members.
    QueryMessageFlow() {
    }

    /**
     * Sends {@code query} to the server as a {@link SqlBatch} and returns the
     * messages received in response.
     *
     * <p>The request is built lazily on subscription, using the client's
     * transaction descriptor at that moment. The query is logged through
     * {@link QueryLogger} when the returned flux is subscribed.
     *
     * @param client the client used for the exchange; must not be {@code null}
     * @param query  the SQL text to execute; must not be {@code null}
     * @return the response messages; the stream completes right after the first
     *         message for which {@link DoneToken#isDone} returns {@code true}
     */
    static Flux<Message> exchange(Client client, String query) {
        Assert.requireNonNull(client, "Client must not be null");
        Assert.requireNonNull(query, "Query must not be null");

        // Deferred via a supplier so the transaction descriptor is read at
        // subscription time rather than when this method is called.
        // NOTE(review): the meaning of the leading `1` argument is not visible
        // from this file - confirm against SqlBatch.create.
        Publisher<? extends ClientMessage> request =
            Mono.fromSupplier(() -> SqlBatch.create(1, client.getTransactionDescriptor(), query));

        return client.exchange(request, DoneToken::isDone)
            .doOnSubscribe(ignore -> QueryLogger.logQuery(client.getContext(), query))
            // Typed handler (no raw BiConsumer cast) keeps the pipeline a Flux<Message>.
            .handle(DoneHandler.INSTANCE)
            // NOTE(review): presumably drains/discards remaining elements after
            // cancellation - confirm against Operators.discardOnCancel.
            .transform(Operators::discardOnCancel)
            // Release any reference-counted element that is discarded instead of delivered.
            .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
    }

    /**
     * {@code handle(...)} callback that forwards every message downstream and
     * completes the sink once a message satisfies {@link DoneToken#isDone}.
     * Implemented as a single-constant enum, so one shared instance is reused.
     */
    enum DoneHandler implements BiConsumer<Message, SynchronousSink<Message>> {
        INSTANCE;

        /**
         * Emits {@code message} and, if it is a done token, completes the sink.
         *
         * @param message the incoming message
         * @param sink    the sink receiving the message and the completion signal
         */
        @Override
        public void accept(Message message, SynchronousSink<Message> sink) {
            // The done message itself is emitted before completion is signalled.
            sink.next(message);
            if (DoneToken.isDone(message)) {
                sink.complete();
            }
        }
    }
}

