/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import reactor.core.publisher.Flux;

/**
 * A {@link MessageStream} that exposes the entries of a {@code first} stream followed by the entries of a
 * {@code second} stream.
 * <p>
 * The {@code second} stream is only consulted once the {@code first} stream has completed <em>without</em> an error.
 * When the {@code first} stream completes with an error, that error is terminal for the concatenated stream and the
 * {@code second} stream is never read from.
 *
 * @param <M> The type of {@link Message} carried in the entries of this stream.
 */
class ConcatenatingMessageStream<M extends Message<?>>
implements MessageStream<M> {
    private final MessageStream<M> first;
    private final MessageStream<M> second;

    /**
     * Creates a stream that reads from {@code first} and, after it completed successfully, from {@code second}.
     *
     * @param first  The stream to read from initially.
     * @param second The stream to read from once {@code first} has completed without an error.
     */
    ConcatenatingMessageStream(@Nonnull MessageStream<M> first, @Nonnull MessageStream<M> second) {
        this.first = first;
        this.second = second;
    }

    /**
     * Returns a {@link Flux} that emits the entries of the first stream's flux, concatenated with those of the
     * second stream's flux.
     *
     * @return The concatenation of both streams' fluxes.
     */
    @Override
    public Flux<MessageStream.Entry<M>> asFlux() {
        return this.first.asFlux().concatWith(this.second.asFlux());
    }

    /**
     * Returns the next entry, taken from the second stream if the first stream has completed without an error, and
     * from the first stream in every other case.
     *
     * @return The result of {@code next()} on the stream that is currently being read from.
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        if (firstCompletedSuccessfully()) {
            return this.second.next();
        }
        return this.first.next();
    }

    /**
     * Registers the given {@code callback} on the first stream through a wrapper. Whenever the first stream invokes
     * that wrapper, it either hands the {@code callback} over to the second stream (if the first stream has completed
     * without an error) or runs the {@code callback} directly.
     *
     * @param callback The callback to invoke when entries, completion or an error become available.
     */
    @Override
    public void onAvailable(@Nonnull Runnable callback) {
        this.first.onAvailable(() -> {
            if (firstCompletedSuccessfully()) {
                // From here on only the second stream can signal anything new.
                this.second.onAvailable(callback);
            } else {
                callback.run();
            }
        });
    }

    /**
     * Returns the error of the first stream if it has one. If the first stream has completed and carries no error,
     * the error of the second stream (if any) is returned instead.
     *
     * @return The error reported by this concatenated stream, or an empty {@code Optional} if there is none.
     */
    @Override
    public Optional<Throwable> error() {
        return this.first.isCompleted() ? this.first.error().or(this.second::error) : this.first.error();
    }

    /**
     * Indicates whether this concatenated stream has completed. That is the case when the first stream completed with
     * an error, or when the first stream completed without an error and the second stream has completed as well.
     *
     * @return {@code true} if no further entries will be provided by this stream, {@code false} otherwise.
     */
    @Override
    public boolean isCompleted() {
        if (!this.first.isCompleted()) {
            return false;
        }
        // A failed first stream is terminal: next(), hasNextAvailable() and onAvailable() never switch over to the
        // second stream in that case, so waiting for the second stream to complete would leave this stream reporting
        // an error without ever reporting completion.
        return this.first.error().isPresent() || this.second.isCompleted();
    }

    /**
     * Indicates whether an entry is available, asking the second stream if the first stream has completed without an
     * error, and the first stream in every other case.
     *
     * @return The result of {@code hasNextAvailable()} on the stream that is currently being read from.
     */
    @Override
    public boolean hasNextAvailable() {
        return firstCompletedSuccessfully() ? this.second.hasNextAvailable() : this.first.hasNextAvailable();
    }

    /**
     * Closes both underlying streams. The second stream is closed even when closing the first stream throws.
     */
    @Override
    public void close() {
        try {
            this.first.close();
        } finally {
            // Always release the second stream, otherwise a failure while closing the first would leak it.
            this.second.close();
        }
    }

    /**
     * Reduces the first stream starting from the given {@code identity} and then continues the reduction over the
     * second stream, using the outcome of the first reduction as the starting value.
     *
     * @param identity    The initial value of the reduction.
     * @param accumulator The function combining the running result with each entry.
     * @param <R>         The type of the reduction result.
     * @return A future completing with the result of reducing both streams in order.
     */
    @Override
    public <R> CompletableFuture<R> reduce(@Nonnull R identity, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> accumulator) {
        return this.first.reduce(identity, accumulator).thenCompose(intermediate -> this.second.reduce(intermediate, accumulator));
    }

    /**
     * Tells whether the first stream has completed without an error, which is the condition for reading from the
     * second stream.
     */
    private boolean firstCompletedSuccessfully() {
        return this.first.isCompleted() && this.first.error().isEmpty();
    }
}

