/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.query.QueryUpdate;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AxonServerSubscriptionQueryResult<I, U>
implements SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> {
    private final Logger logger = LoggerFactory.getLogger(AxonServerSubscriptionQueryResult.class);
    private final Mono<QueryResponseMessage<I>> initialResult;
    private final io.axoniq.axonserver.connector.query.SubscriptionQueryResult result;
    private final Flux<SubscriptionQueryUpdateMessage<U>> updates = Flux.create(fluxSink -> {
        fluxSink.onRequest(count -> {
            int i = 0;
            while ((long)i < count) {
                QueryUpdate next = (QueryUpdate)result.updates().nextIfAvailable();
                if (next != null) {
                    fluxSink.next((Object)next);
                } else if (result.updates().isClosed()) {
                    fluxSink.complete();
                }
                ++i;
            }
        });
        fluxSink.onDispose(() -> {
            this.logger.debug("Flux was disposed. Will close subscription query");
            result.updates().close();
        });
        result.updates().onAvailable(() -> {
            if (fluxSink.requestedFromDownstream() > 0L) {
                QueryUpdate next = (QueryUpdate)result.updates().nextIfAvailable();
                if (next != null) {
                    fluxSink.next((Object)next);
                }
            } else {
                this.logger.trace("Not sending update to Flux Sink. Not enough info requested");
            }
            if (result.updates().isClosed()) {
                fluxSink.complete();
            }
        });
    }).doOnError(e -> result.updates().close()).map(subscriptionSerializer::deserialize);

    public AxonServerSubscriptionQueryResult(io.axoniq.axonserver.connector.query.SubscriptionQueryResult result, SubscriptionMessageSerializer subscriptionSerializer) {
        this.initialResult = Mono.fromCompletionStage(() -> ((io.axoniq.axonserver.connector.query.SubscriptionQueryResult)result).initialResult()).map(subscriptionSerializer::deserialize);
        this.result = result;
    }

    public Mono<QueryResponseMessage<I>> initialResult() {
        return this.initialResult;
    }

    public Flux<SubscriptionQueryUpdateMessage<U>> updates() {
        return this.updates;
    }

    public boolean cancel() {
        this.result.updates().close();
        return true;
    }
}

