/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.reactive;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Statement;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.internal.async.ExplicitTransaction;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.reactive.AbstractRxStatementRunner;
import org.neo4j.driver.internal.reactive.InternalRxStatementResult;
import org.neo4j.driver.internal.reactive.InternalRxTransaction;
import org.neo4j.driver.internal.reactive.RxUtils;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxStatementResult;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.reactive.RxTransactionWork;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class InternalRxSession
extends AbstractRxStatementRunner
implements RxSession {
    private final NetworkSession session;

    public InternalRxSession(NetworkSession session) {
        this.session = session;
    }

    @Override
    public Publisher<RxTransaction> beginTransaction() {
        return this.beginTransaction(TransactionConfig.empty());
    }

    @Override
    public Publisher<RxTransaction> beginTransaction(TransactionConfig config) {
        return RxUtils.createMono(() -> {
            CompletableFuture txFuture = new CompletableFuture();
            this.session.beginTransactionAsync(config).whenComplete((tx, completionError) -> {
                if (tx != null) {
                    txFuture.complete(new InternalRxTransaction((ExplicitTransaction)tx));
                } else {
                    this.releaseConnectionBeforeReturning(txFuture, (Throwable)completionError);
                }
            });
            return txFuture;
        });
    }

    private Publisher<RxTransaction> beginTransaction(AccessMode mode, TransactionConfig config) {
        return RxUtils.createMono(() -> {
            CompletableFuture txFuture = new CompletableFuture();
            this.session.beginTransactionAsync(mode, config).whenComplete((tx, completionError) -> {
                if (tx != null) {
                    txFuture.complete(new InternalRxTransaction((ExplicitTransaction)tx));
                } else {
                    this.releaseConnectionBeforeReturning(txFuture, (Throwable)completionError);
                }
            });
            return txFuture;
        });
    }

    @Override
    public <T> Publisher<T> readTransaction(RxTransactionWork<Publisher<T>> work) {
        return this.readTransaction(work, TransactionConfig.empty());
    }

    @Override
    public <T> Publisher<T> readTransaction(RxTransactionWork<Publisher<T>> work, TransactionConfig config) {
        return this.runTransaction(AccessMode.READ, work, config);
    }

    @Override
    public <T> Publisher<T> writeTransaction(RxTransactionWork<Publisher<T>> work) {
        return this.writeTransaction(work, TransactionConfig.empty());
    }

    @Override
    public <T> Publisher<T> writeTransaction(RxTransactionWork<Publisher<T>> work, TransactionConfig config) {
        return this.runTransaction(AccessMode.WRITE, work, config);
    }

    private <T> Publisher<T> runTransaction(AccessMode mode, RxTransactionWork<Publisher<T>> work, TransactionConfig config) {
        Flux repeatableWork = Flux.usingWhen(this.beginTransaction(mode, config), work::execute, RxTransaction::commit, RxTransaction::rollback);
        return this.session.retryLogic().retryRx(repeatableWork);
    }

    @Override
    public RxStatementResult run(String statement, TransactionConfig config) {
        return this.run(new Statement(statement), config);
    }

    @Override
    public RxStatementResult run(String statement, Map<String, Object> parameters, TransactionConfig config) {
        return this.run(new Statement(statement, parameters), config);
    }

    @Override
    public RxStatementResult run(Statement statement) {
        return this.run(statement, TransactionConfig.empty());
    }

    @Override
    public RxStatementResult run(Statement statement, TransactionConfig config) {
        return new InternalRxStatementResult(() -> {
            CompletableFuture resultCursorFuture = new CompletableFuture();
            this.session.runRx(statement, config).whenComplete((cursor, completionError) -> {
                if (cursor != null) {
                    resultCursorFuture.complete(cursor);
                } else {
                    this.releaseConnectionBeforeReturning(resultCursorFuture, (Throwable)completionError);
                }
            });
            return resultCursorFuture;
        });
    }

    private <T> void releaseConnectionBeforeReturning(CompletableFuture<T> returnFuture, Throwable completionError) {
        Throwable error = Futures.completionExceptionCause(completionError);
        this.session.releaseConnectionAsync().whenComplete((ignored, closeError) -> returnFuture.completeExceptionally(Futures.combineErrors(error, closeError)));
    }

    @Override
    public String lastBookmark() {
        return this.session.lastBookmark();
    }

    public Publisher<Void> reset() {
        return RxUtils.createEmptyPublisher(this.session::resetAsync);
    }

    @Override
    public <T> Publisher<T> close() {
        return RxUtils.createEmptyPublisher(this.session::closeAsync);
    }
}

