/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.fabric;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.neo4j.configuration.connectors.ConnectorPortRegister;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.internal.shaded.reactor.core.publisher.Flux;
import org.neo4j.driver.internal.shaded.reactor.core.publisher.Mono;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.fabric.DriverUtils;
import org.neo4j.test.extension.BoltDbmsExtension;
import org.neo4j.test.extension.Inject;
import org.reactivestreams.Publisher;

/**
 * Exercises result streaming over Bolt using both the blocking
 * ({@link Session}/{@link Transaction}) and the reactive
 * ({@link RxSession}/{@link RxTransaction}) driver APIs.
 *
 * <p>A single {@link Driver} is created before all tests from the injected
 * {@link ConnectorPortRegister} and closed after the last test.
 */
@BoltDbmsExtension
@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
class BoltLocalResultStreamTest {
    /** Query shared by all tests; yields the five rows {@code r0} .. {@code r4} in column {@code A}. */
    private static final String QUERY = "UNWIND range(0, 4) AS i RETURN 'r' + i as A";

    @Inject
    private static ConnectorPortRegister connectorPortRegister;
    private static Driver driver;

    /** Creates the driver shared by every test in this class. */
    @BeforeAll
    static void beforeAll() {
        driver = DriverUtils.createDriver(connectorPortRegister);
    }

    /** Closes the shared driver, if it was created. */
    @AfterAll
    static void tearDown() {
        // Guard against a failed beforeAll(): an NPE here would only obscure the real setup error.
        if (driver != null) {
            driver.close();
        }
    }

    /** Reads the complete result through the blocking API and checks every row, in order. */
    @Test
    void testBasicResultStream() {
        List<String> result = this.inTx(tx -> tx.run(QUERY)
                .stream()
                .map(r -> r.get("A").asString())
                .collect(Collectors.toList()));
        Assertions.assertThat(result).isEqualTo(List.of("r0", "r1", "r2", "r3", "r4"));
    }

    /** Reads the complete result through the reactive API with upstream demand limited via {@code limitRate(1)}. */
    @Test
    void testRxResultStream() {
        List<String> result = this.inRxTx(tx -> {
            RxResult statementResult = tx.run(QUERY);
            return Flux.from(statementResult.records())
                    .limitRate(1)
                    .collectList()
                    .block()
                    .stream()
                    .map(r -> r.get("A").asString())
                    .collect(Collectors.toList());
        });
        Assertions.assertThat(result).isEqualTo(List.of("r0", "r1", "r2", "r3", "r4"));
    }

    /** Consumes only the first two records of a five-row result ({@code limitRequest(2)}) and checks them. */
    @Test
    void testPartialStream() {
        List<String> result = this.inRxTx(tx -> {
            RxResult statementResult = tx.run(QUERY);
            return Flux.from(statementResult.records())
                    .limitRequest(2L)
                    .collectList()
                    .block()
                    .stream()
                    .map(r -> r.get("A").asString())
                    .collect(Collectors.toList());
        });
        Assertions.assertThat(result).isEqualTo(List.of("r0", "r1"));
    }

    /**
     * Runs {@code workload} via {@link Session#writeTransaction} on a fresh blocking session.
     *
     * @param workload function receiving the open transaction
     * @param <T> type of the value produced by the workload
     * @return whatever {@code workload} returned
     */
    private <T> T inTx(Function<Transaction, T> workload) {
        try (Session session = driver.session()) {
            return session.writeTransaction(workload::apply);
        }
    }

    /**
     * Runs {@code workload} inside a reactive transaction on a fresh reactive session.
     *
     * <p>The transaction is always rolled back, whether the workload succeeds or fails,
     * and the session is always closed afterwards.
     *
     * @param workload function receiving the open reactive transaction
     * @param <T> type of the value produced by the workload
     * @return whatever {@code workload} returned
     */
    private <T> T inRxTx(Function<RxTransaction, T> workload) {
        RxSession session = driver.rxSession();
        try {
            RxTransaction tx = Mono.from(session.beginTransaction()).block();
            T result;
            try {
                result = workload.apply(tx);
            } catch (Throwable failure) {
                // Keep the workload failure as the primary exception; a rollback
                // failure is attached as suppressed instead of replacing it.
                try {
                    Mono.from(tx.rollback()).block();
                } catch (Throwable rollbackFailure) {
                    failure.addSuppressed(rollbackFailure);
                }
                throw failure;
            }
            Mono.from(tx.rollback()).block();
            return result;
        } finally {
            Mono.from(session.close()).block();
        }
    }
}

