/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.query.QueryUpdate;
import org.axonframework.axonserver.connector.event.util.GrpcExceptionParser;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.queryhandling.DefaultQueryBusSpanFactory;
import org.axonframework.queryhandling.QueryBusSpanFactory;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class AxonServerSubscriptionQueryResult<I, U>
implements SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> {
    private final Logger logger = LoggerFactory.getLogger(AxonServerSubscriptionQueryResult.class);
    private final Mono<QueryResponseMessage<I>> initialResult;
    private final io.axoniq.axonserver.connector.query.SubscriptionQueryResult result;
    private final Flux<SubscriptionQueryUpdateMessage<U>> updates = Flux.create(fluxSink -> {
        fluxSink.onRequest(count -> {
            int i = 0;
            while ((long)i < count) {
                QueryUpdate next = (QueryUpdate)result.updates().nextIfAvailable();
                if (next == null) {
                    if (!result.updates().isClosed()) break;
                    this.completeFlux((FluxSink<SubscriptionQueryUpdateMessage<U>>)fluxSink, result.updates().getError().orElse(null));
                    break;
                }
                this.publishQueryUpdate(queryMessage, subscriptionSerializer, spanFactory, (FluxSink<SubscriptionQueryUpdateMessage<U>>)fluxSink, next);
                ++i;
            }
        });
        fluxSink.onDispose(() -> {
            this.logger.debug("Flux was disposed. Will close subscription query");
            result.updates().close();
        });
        result.updates().onAvailable(() -> {
            if (fluxSink.requestedFromDownstream() > 0L) {
                QueryUpdate next = (QueryUpdate)result.updates().nextIfAvailable();
                if (next != null) {
                    this.publishQueryUpdate(queryMessage, subscriptionSerializer, spanFactory, (FluxSink<SubscriptionQueryUpdateMessage<U>>)fluxSink, next);
                }
            } else {
                this.logger.trace("Not sending update to Flux Sink. Not enough info requested");
            }
            if (result.updates().isClosed()) {
                this.completeFlux((FluxSink<SubscriptionQueryUpdateMessage<U>>)fluxSink, result.updates().getError().orElse(null));
            }
        });
    }).doOnError(e -> result.updates().close());

    @Deprecated
    public AxonServerSubscriptionQueryResult(io.axoniq.axonserver.connector.query.SubscriptionQueryResult result, SubscriptionMessageSerializer subscriptionSerializer) {
        this(result, (SpanFactory)NoOpSpanFactory.INSTANCE, subscriptionSerializer);
    }

    @Deprecated
    public AxonServerSubscriptionQueryResult(io.axoniq.axonserver.connector.query.SubscriptionQueryResult result, SpanFactory spanFactory, SubscriptionMessageSerializer subscriptionSerializer) {
        this(null, result, subscriptionSerializer, (QueryBusSpanFactory)DefaultQueryBusSpanFactory.builder().spanFactory(spanFactory).build(), (Span)new NoOpSpanFactory.NoOpSpan());
    }

    public AxonServerSubscriptionQueryResult(SubscriptionQueryMessage<?, ?, ?> queryMessage, io.axoniq.axonserver.connector.query.SubscriptionQueryResult result, SubscriptionMessageSerializer subscriptionSerializer, QueryBusSpanFactory spanFactory, Span parentSpan) {
        this.initialResult = Mono.fromCompletionStage(() -> ((io.axoniq.axonserver.connector.query.SubscriptionQueryResult)result).initialResult()).onErrorMap(GrpcExceptionParser::parse).doOnError(arg_0 -> ((Span)parentSpan).recordException(arg_0)).doOnTerminate(() -> ((Span)parentSpan).end()).map(subscriptionSerializer::deserialize);
        this.result = result;
    }

    private void publishQueryUpdate(SubscriptionQueryMessage<?, ?, ?> queryMessage, SubscriptionMessageSerializer subscriptionSerializer, QueryBusSpanFactory spanFactory, FluxSink<SubscriptionQueryUpdateMessage<U>> fluxSink, QueryUpdate next) {
        SubscriptionQueryUpdateMessage message = subscriptionSerializer.deserialize(next);
        spanFactory.createSubscriptionQueryProcessUpdateSpan(message, queryMessage).run(() -> fluxSink.next((Object)message));
    }

    private void completeFlux(FluxSink<SubscriptionQueryUpdateMessage<U>> fluxSink, Throwable error) {
        if (error != null) {
            fluxSink.error(error);
        } else {
            fluxSink.complete();
        }
    }

    public Mono<QueryResponseMessage<I>> initialResult() {
        return this.initialResult;
    }

    public Flux<SubscriptionQueryUpdateMessage<U>> updates() {
        return this.updates;
    }

    public boolean cancel() {
        this.result.updates().close();
        return true;
    }
}

