/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.data.rx.sql.impl;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Statement;
import java.util.Collection;
import java.util.function.Function;
import org.noear.solon.data.rx.sql.RxSqlExecutor;
import org.noear.solon.data.rx.sql.bound.RxRowConverter;
import org.noear.solon.data.rx.sql.bound.RxStatementBinder;
import org.noear.solon.data.rx.sql.impl.DefaultRxBinder;
import org.noear.solon.data.rx.sql.impl.DefaultRxConverter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link RxSqlExecutor} implementation that runs a single SQL string, together with a default
 * argument array, on connections obtained from an R2DBC {@link ConnectionFactory}.
 *
 * <p>All operations are lazy: a connection is requested only when the returned publisher is
 * subscribed. Once that publisher completes, fails or is cancelled, the connection is handed to
 * {@link #closeConnection(Connection)} so it is not leaked.
 */
public class SimpleRxSqlExecutor
implements RxSqlExecutor {
    private final ConnectionFactory dataSource;
    private final String sql;
    private final Object[] argsDef;
    /** Binder used whenever the caller does not supply one; binds an {@code Object[]}. */
    private static final DefaultRxBinder binderDef = new DefaultRxBinder();

    /**
     * Creates an executor for one SQL statement.
     *
     * @param dataSource factory used to open a connection for each operation
     * @param sql        the SQL text passed to {@link Connection#createStatement(String)}
     * @param args       default arguments bound by the operations that take no explicit arguments
     */
    public SimpleRxSqlExecutor(ConnectionFactory dataSource, String sql, Object[] args) {
        this.dataSource = dataSource;
        this.sql = sql;
        this.argsDef = args;
    }

    /**
     * Queries the first column of the first row.
     *
     * @param tClass requested Java type of the column value
     * @return the value, or an empty {@code Mono} when the statement yields no row
     */
    @Override
    public <T> Mono<T> queryValue(Class<T> tClass) {
        return this.queryRow((Row row, RowMetadata rowM) -> SimpleRxSqlExecutor.firstColumn(row, tClass));
    }

    /**
     * Queries the first column of every row.
     *
     * @param tClass requested Java type of the column value
     * @return one element per row
     */
    @Override
    public <T> Flux<T> queryValueList(Class<T> tClass) {
        return this.queryRowList((Row row, RowMetadata rowM) -> SimpleRxSqlExecutor.firstColumn(row, tClass));
    }

    /**
     * Queries the first row and converts it with the converter that
     * {@link DefaultRxConverter} creates for {@code tClass}.
     */
    @Override
    public <T> Mono<T> queryRow(Class<T> tClass) {
        return this.queryRow(DefaultRxConverter.getInstance().create(tClass));
    }

    /**
     * Queries the first row and converts it with {@code converter}.
     *
     * @return the converted row, or an empty {@code Mono} when the statement yields no row
     */
    @Override
    public <T> Mono<T> queryRow(RxRowConverter<T> converter) {
        return this.withConnection(conn -> binderDef.setValues(conn.createStatement(this.sql), this.argsDef).execute())
                .flatMap(result -> result.map(converter::convert))
                // Only the first row is wanted; next() cancels upstream, which releases the connection.
                .next();
    }

    /**
     * Queries all rows and converts each with the converter that
     * {@link DefaultRxConverter} creates for {@code tClass}.
     */
    @Override
    public <T> Flux<T> queryRowList(Class<T> tClass) {
        return this.queryRowList(DefaultRxConverter.getInstance().create(tClass));
    }

    /**
     * Queries all rows and converts each with {@code converter}.
     */
    @Override
    public <T> Flux<T> queryRowList(RxRowConverter<T> converter) {
        return this.withConnection(conn -> binderDef.setValues(conn.createStatement(this.sql), this.argsDef).execute())
                .flatMap(result -> result.map(converter::convert));
    }

    /**
     * Executes the statement with the default arguments.
     *
     * @return the first updated-row count reported by the driver
     */
    @Override
    public Mono<Long> update() {
        return this.update(this.argsDef, binderDef);
    }

    /**
     * Executes the statement with {@code args} bound by {@code binder}.
     *
     * @param args   arguments handed to the binder
     * @param binder binds {@code args} onto the created statement
     * @return the first updated-row count reported by the driver
     */
    @Override
    public <S> Mono<Long> update(S args, RxStatementBinder<S> binder) {
        return this.withConnection(conn -> binder.setValues(conn.createStatement(this.sql), args).execute())
                .flatMap(result -> result.getRowsUpdated())
                .next();
    }

    /**
     * Executes the statement with the default arguments and returns the first generated value.
     */
    @Override
    public <T> Mono<T> updateReturnKey(Class<T> tClass) {
        return this.updateReturnKey(tClass, this.argsDef, binderDef);
    }

    /**
     * Executes the statement with {@code args} bound by {@code binder}, asking the driver to
     * return generated values, and returns the first column of the first generated row.
     *
     * @param tClass requested Java type of the generated value
     * @param args   arguments handed to the binder
     * @param binder binds {@code args} onto the created statement
     * @return the generated value, or an empty {@code Mono} when none is reported
     */
    @Override
    public <T, S> Mono<T> updateReturnKey(Class<T> tClass, S args, RxStatementBinder<S> binder) {
        return this.withConnection(conn -> binder.setValues(conn.createStatement(this.sql).returnGeneratedValues(), args).execute())
                .flatMap(result -> result.map((row, rowM) -> SimpleRxSqlExecutor.firstColumn(row, tClass)))
                .next();
    }

    /**
     * Executes the statement once per argument array, as a single batched statement, using the
     * default binder.
     */
    @Override
    public Flux<Long> updateBatch(Collection<Object[]> argsList) {
        return this.updateBatch(argsList, binderDef);
    }

    /**
     * Executes the statement once per element of {@code argsList}, as a single batched statement.
     *
     * <p>An empty {@code argsList} completes without emitting anything and without opening a
     * connection, instead of executing a statement that has no bindings.
     *
     * @param argsList one argument object per execution; read when the result is subscribed
     * @param binder   binds each element onto the statement
     * @return the updated-row counts reported by the driver
     */
    public <T> Flux<Long> updateBatch(Collection<T> argsList, RxStatementBinder<T> binder) {
        return Flux.defer(() -> {
            if (argsList.isEmpty()) {
                return Flux.<Long>empty();
            }
            return this.withConnection(conn -> {
                Statement stmt = conn.createStatement(this.sql);
                boolean first = true;
                for (T row : argsList) {
                    // add() closes the previous binding set, so it must be skipped before the first one.
                    if (!first) {
                        stmt.add();
                    }
                    binder.setValues(stmt, row);
                    first = false;
                }
                return stmt.execute();
            }).flatMap(result -> result.getRowsUpdated());
        });
    }

    /**
     * Supplies the connection used by an operation; by default a new one from the factory.
     */
    protected Publisher<? extends Connection> getConnection() {
        return this.dataSource.create();
    }

    /**
     * Releases a connection obtained from {@link #getConnection()} after an operation has
     * completed, failed or been cancelled. The default closes the connection.
     *
     * <p>NOTE(review): a subclass that overrides {@link #getConnection()} to hand out a shared
     * connection should override this method as well so that connection is not closed.
     *
     * @param conn the connection the operation used
     * @return a publisher that completes when the release is done
     */
    protected Publisher<Void> closeConnection(Connection conn) {
        return conn.close();
    }

    /**
     * Runs {@code action} against a connection and guarantees the connection is released
     * afterwards, whichever way the resulting sequence terminates.
     */
    private <R> Flux<R> withConnection(Function<? super Connection, ? extends Publisher<? extends R>> action) {
        return Flux.usingWhen(Mono.from(this.getConnection()), action, this::closeConnection);
    }

    /**
     * Reads column 0 of {@code row} as {@code tClass}, leaving the conversion to the driver.
     * Falls back to the driver's default type when no usable class is given (null or primitive).
     */
    @SuppressWarnings("unchecked")
    private static <T> T firstColumn(Row row, Class<T> tClass) {
        if (tClass == null || tClass.isPrimitive()) {
            // Unchecked by necessity: there is no class object the driver could convert to.
            return (T) row.get(0);
        }
        return row.get(0, tClass);
    }
}

