/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.reactivestreams;

import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.neo4j.driver.Record;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.observation.DriverObservationProvider;
import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.observation.util.ObservationUtil;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class InternalReactiveResult
implements ReactiveResult {
    private final RxResultCursor cursor;
    private final DriverObservationProvider observationProvider;

    public InternalReactiveResult(RxResultCursor cursor, DriverObservationProvider observationProvider) {
        this.cursor = cursor;
        this.observationProvider = Objects.requireNonNull(observationProvider);
    }

    @Override
    public List<String> keys() {
        return this.cursor.keys();
    }

    @Override
    public Publisher<Record> records() {
        Observation recordsObservation = this.observationProvider.resultRecords(ReactiveResult.class);
        return ObservationUtil.observeStreams(recordsObservation, Flux.create(sink -> {
            if (this.cursor.isDone()) {
                sink.error((Throwable)ErrorUtil.newResultConsumedError());
            } else {
                this.cursor.installRecordConsumer(this.createRecordConsumer((FluxSink<Record>)sink), recordsObservation);
                sink.onCancel(() -> ((RxResultCursor)this.cursor).cancel());
                sink.onRequest(arg_0 -> ((RxResultCursor)this.cursor).request(arg_0));
            }
        }, (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.IGNORE));
    }

    @Override
    public Publisher<ResultSummary> consume() {
        Observation consumeObservation = this.observationProvider.resultConsume(ReactiveResult.class);
        return ObservationUtil.observeStreams(consumeObservation, Mono.create(sink -> this.cursor.summaryAsync(consumeObservation).whenComplete((summary, summaryCompletionError) -> {
            Throwable error = Futures.completionExceptionCause(summaryCompletionError);
            if (summary != null) {
                sink.success(summary);
            } else {
                sink.error(error);
            }
        })));
    }

    @Override
    public Publisher<Boolean> isOpen() {
        return Mono.just((Object)(!this.cursor.isDone() ? 1 : 0));
    }

    private BiConsumer<Record, Throwable> createRecordConsumer(FluxSink<Record> sink) {
        return (r, e) -> {
            if (r != null) {
                sink.next(r);
            } else if (e != null) {
                sink.error(e);
            } else {
                sink.complete();
            }
        };
    }
}

