/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.data.connection.async;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.propagation.ReactorPropagation;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.data.connection.ConnectionDefinition;
import io.micronaut.data.connection.ConnectionStatus;
import io.micronaut.data.connection.async.AsyncConnectionOperations;
import io.micronaut.data.connection.reactive.ReactorConnectionOperations;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.jspecify.annotations.NonNull;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

@Internal
public final class AsyncUsingReactiveConnectionOperations<C>
implements AsyncConnectionOperations<C> {
    private final ReactorConnectionOperations<C> reactorConnectionOperations;

    public AsyncUsingReactiveConnectionOperations(ReactorConnectionOperations<C> reactorConnectionOperations) {
        this.reactorConnectionOperations = reactorConnectionOperations;
    }

    @Override
    public Optional<ConnectionStatus<C>> findConnectionStatus() {
        return Optional.ofNullable(this.reactorConnectionOperations.getConnectionStatus((ContextView)ReactorPropagation.addPropagatedContext((Context)Context.empty(), (PropagatedContext)PropagatedContext.getOrEmpty())));
    }

    @Override
    public <T> CompletionStage<T> withConnection(ConnectionDefinition definition, Function<ConnectionStatus<C>, CompletionStage<T>> handler) {
        return AsyncUsingReactiveConnectionOperations.onCompleteCompleteFuture(Mono.fromDirect(this.reactorConnectionOperations.withConnection(definition, (ConnectionStatus<C> status) -> Mono.deferContextual(contextView -> Mono.fromCompletionStage(() -> {
            PropagatedContext propagatedContext = ReactorPropagation.findPropagatedContext((ContextView)contextView).orElseGet(PropagatedContext::getOrEmpty);
            return status.propagate(propagatedContext, () -> (CompletionStage)handler.apply((ConnectionStatus)status));
        })))));
    }

    private static <T> CompletableFuture<T> onCompleteCompleteFuture(Publisher<T> publisher) {
        final PropagatedContext propagatedContext = PropagatedContext.getOrEmpty();
        final CompletableFuture completableFuture = new CompletableFuture();
        publisher.subscribe((Subscriber)new CoreSubscriber<T>(){
            private T result;

            public @NonNull Context currentContext() {
                return ReactorPropagation.addPropagatedContext((Context)Context.empty(), (PropagatedContext)propagatedContext);
            }

            public void onSubscribe(Subscription s) {
                s.request(1L);
            }

            public void onNext(T t) {
                this.result = t;
            }

            public void onError(Throwable t) {
                propagatedContext.propagate(() -> {
                    completableFuture.completeExceptionally(t);
                    return null;
                });
            }

            public void onComplete() {
                propagatedContext.propagate(() -> {
                    completableFuture.complete(this.result);
                    return null;
                });
            }
        });
        return completableFuture;
    }
}

