/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.core;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.axonframework.messaging.core.FluxUtils;
import org.axonframework.messaging.core.Message;
import org.axonframework.messaging.core.MessageStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/**
 * A {@link MessageStream} implementation backed by a Reactor {@link Flux} of entries.
 * <p>
 * The source {@code Flux} is subscribed to lazily, the first time an operation needs to inspect its elements
 * ({@link #next()}, {@link #peek()}, {@link #hasNextAvailable()} or {@link #setCallback(Runnable)}). Elements are
 * requested one at a time: a new element is only requested once the previously delivered one has been consumed
 * through {@link #next()}.
 * <p>
 * The transforming operations ({@link #map(Function)}, {@link #reduce(Object, BiFunction)} and
 * {@link #onErrorContinue(Function)}) operate directly on the source {@code Flux} and do not use the buffer of this
 * instance.
 *
 * @param <M> The type of {@link Message} carried in the entries of this stream.
 */
class FluxMessageStream<M extends Message> implements MessageStream<M> {

    private final Flux<MessageStream.Entry<M>> source;
    // Buffer of entries delivered by the source that have not yet been consumed through next().
    private final BlockingQueue<MessageStream.Entry<M>> peeked = new LinkedBlockingQueue<>(5);
    // Guards against subscribing to the source more than once.
    private final AtomicBoolean sourceSubscribed = new AtomicBoolean();
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();
    // Starts out as a no-op so signals arriving before setCallback(...) can invoke it unconditionally.
    private final AtomicReference<Runnable> availabilityCallback = new AtomicReference<Runnable>(() -> {});
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Constructs a stream that reads its entries from the given {@code source}.
     *
     * @param source The {@link Flux} providing the entries of this stream.
     */
    FluxMessageStream(@Nonnull Flux<MessageStream.Entry<M>> source) {
        this.source = source;
    }

    /**
     * Returns a new stream whose entries are the result of applying the given {@code mapper} to each entry of the
     * source {@link Flux}.
     *
     * @param mapper The function converting each entry.
     * @param <RM>   The type of {@link Message} carried in the resulting entries.
     * @return A new stream backed by the mapped source.
     */
    @Override
    public <RM extends Message> MessageStream<RM> map(
            @Nonnull Function<MessageStream.Entry<M>, MessageStream.Entry<RM>> mapper) {
        // The mapped Flux emits Entry<RM>, so the resulting stream must be parameterised with RM (inferred here).
        return new FluxMessageStream<>(this.source.map(mapper));
    }

    /**
     * Reduces the entries of the source {@link Flux} into a single value, starting from the given {@code identity}.
     *
     * @param identity    The initial value of the reduction.
     * @param accumulator The function combining the intermediate result with the next entry.
     * @param <R>         The type of the reduction result.
     * @return A {@link CompletableFuture} obtained from the reduced source.
     */
    @Override
    public <R> CompletableFuture<R> reduce(@Nonnull R identity,
                                           @Nonnull BiFunction<R, MessageStream.Entry<M>, R> accumulator) {
        return this.source.reduce(identity, accumulator).toFuture();
    }

    /**
     * Removes and returns the next buffered entry, if there is one, subscribing to the source first if that has not
     * happened yet. When the removed entry was the last one in the buffer and this stream is not closed, one more
     * element is requested from the source.
     *
     * @return The next entry, or an empty {@link Optional} when no entry is currently buffered.
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        this.subscribeToSource();
        MessageStream.Entry<M> entry = this.peeked.poll();
        if (entry != null && this.peeked.isEmpty() && !this.closed.get()) {
            // An entry can only have been buffered by onNext, which the source signals after onSubscribe has
            // stored the subscription.
            this.subscription.get().request(1L);
        }
        return Optional.ofNullable(entry);
    }

    /**
     * Registers the {@code callback} to run whenever the source delivers an entry, an error or completion. The
     * callback is run immediately if an entry is already buffered or this stream is already completed.
     *
     * @param callback The callback to invoke when the state of this stream changes.
     */
    @Override
    public void setCallback(@Nonnull Runnable callback) {
        this.availabilityCallback.set(callback);
        if (this.hasNextAvailable() || this.isCompleted()) {
            callback.run();
        }
        this.subscribeToSource();
    }

    /**
     * Returns the error signalled by the source, but only once all buffered entries have been consumed.
     *
     * @return The error of the source, or an empty {@link Optional} when entries are still buffered or no error was
     * recorded.
     */
    @Override
    public Optional<Throwable> error() {
        if (this.peeked.isEmpty()) {
            return Optional.ofNullable(this.error.get());
        }
        return Optional.empty();
    }

    /**
     * Indicates whether the source has terminated (normally or with an error) and all buffered entries have been
     * consumed.
     *
     * @return {@code true} when the buffer is empty and the source has terminated, {@code false} otherwise.
     */
    @Override
    public boolean isCompleted() {
        return this.peeked.isEmpty() && this.completed.get();
    }

    /**
     * Indicates whether an entry is currently buffered, subscribing to the source first if that has not happened
     * yet.
     *
     * @return {@code true} when at least one entry is buffered, {@code false} otherwise.
     */
    @Override
    public boolean hasNextAvailable() {
        this.subscribeToSource();
        return !this.peeked.isEmpty();
    }

    /**
     * Marks this stream as closed and cancels the subscription to the source, if one has been established. If the
     * subscription is established later, it is cancelled as soon as it is received.
     */
    @Override
    public void close() {
        // Set the flag before reading the subscription; together with the check in onSubscribe this ensures a
        // concurrently established subscription is cancelled by at least one of the two sides.
        this.closed.set(true);
        Subscription s = this.subscription.get();
        if (s != null) {
            s.cancel();
        }
    }

    /**
     * Returns the next buffered entry without removing it, subscribing to the source first if that has not happened
     * yet.
     *
     * @return The next entry, or an empty {@link Optional} when no entry is currently buffered.
     */
    @Override
    public Optional<MessageStream.Entry<M>> peek() {
        this.subscribeToSource();
        return Optional.ofNullable(this.peeked.peek());
    }

    /**
     * Subscribes to the source {@link Flux}, at most once for the lifetime of this instance. The subscriber buffers
     * delivered entries, records the terminal state and runs the registered callback on every signal.
     */
    private void subscribeToSource() {
        if (!this.sourceSubscribed.getAndSet(true)) {
            this.source.subscribe(new Subscriber<MessageStream.Entry<M>>() {

                @Override
                public void onSubscribe(Subscription s) {
                    // Publish the subscription before checking the closed flag, mirroring the order in close().
                    FluxMessageStream.this.subscription.set(s);
                    if (FluxMessageStream.this.closed.get()) {
                        // The stream was closed before the subscription was available: nothing may be requested.
                        s.cancel();
                    } else {
                        s.request(1L);
                    }
                }

                @Override
                public void onNext(MessageStream.Entry<M> mEntry) {
                    FluxMessageStream.this.peeked.add(mEntry);
                    FluxMessageStream.this.availabilityCallback.get().run();
                }

                @Override
                public void onError(Throwable t) {
                    FluxMessageStream.this.error.set(t);
                    FluxMessageStream.this.completed.set(true);
                    FluxMessageStream.this.availabilityCallback.get().run();
                }

                @Override
                public void onComplete() {
                    FluxMessageStream.this.error.set(null);
                    FluxMessageStream.this.completed.set(true);
                    FluxMessageStream.this.availabilityCallback.get().run();
                }
            });
        }
    }

    /**
     * Returns a new stream that, when the source {@link Flux} signals an error, continues with the entries of the
     * stream produced by the given {@code onError} function.
     *
     * @param onError The function providing the stream to continue with, based on the signalled error.
     * @return A new stream backed by the source with error resumption applied.
     */
    @Override
    public MessageStream<M> onErrorContinue(@Nonnull Function<Throwable, MessageStream<M>> onError) {
        return new FluxMessageStream<>(
                this.source.onErrorResume(exception -> FluxUtils.of(onError.apply(exception)))
        );
    }
}

