/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.function.Predicate;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;

/**
 * A {@link MessageStream} decorator that only exposes the entries of a delegate stream which
 * satisfy a given {@link Predicate}. Entries rejected by the predicate are read from the delegate
 * and silently dropped.
 * <p>
 * To support {@link #peek()}, at most one accepted entry is held in an internal buffer until it is
 * handed out by {@link #next()}. This class applies no synchronization of its own around that
 * buffer.
 *
 * @param <M> the type of {@link Message} carried by the entries of this stream
 */
class FilteringMessageStream<M extends Message> implements MessageStream<M> {

    /** The stream whose entries are being filtered. */
    private final MessageStream<M> delegate;

    /** The test an entry must pass to be exposed by this stream. */
    private final Predicate<MessageStream.Entry<M>> filter;

    /** An accepted entry already pulled from the delegate by {@link #peek()}, or {@code null}. */
    private MessageStream.Entry<M> peeked = null;

    /**
     * Creates a stream exposing only those entries of {@code delegate} accepted by {@code filter}.
     *
     * @param delegate the stream to read entries from
     * @param filter   the predicate deciding whether an entry is exposed
     */
    FilteringMessageStream(@Nonnull MessageStream<M> delegate, @Nonnull Predicate<MessageStream.Entry<M>> filter) {
        this.delegate = delegate;
        this.filter = filter;
    }

    /**
     * Returns the buffered entry when one is present (clearing the buffer); otherwise pulls entries
     * from the delegate until one passes the filter or the delegate returns an empty result.
     *
     * @return the next accepted entry, or an empty {@code Optional} if the delegate returned none
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        if (peeked == null) {
            return nextMatching();
        }
        MessageStream.Entry<M> buffered = peeked;
        peeked = null;
        return Optional.of(buffered);
    }

    /**
     * Returns the entry the following {@link #next()} call will hand out, without handing it out.
     * When nothing is buffered yet, this reads (and drops rejected) entries from the delegate and
     * buffers the first accepted one.
     *
     * @return the upcoming accepted entry, or an empty {@code Optional} if the delegate returned none
     */
    @Override
    public Optional<MessageStream.Entry<M>> peek() {
        if (peeked != null) {
            return Optional.of(peeked);
        }
        Optional<MessageStream.Entry<M>> found = nextMatching();
        // Remember the accepted entry so the following next() returns it instead of reading on.
        found.ifPresent(entry -> peeked = entry);
        return found;
    }

    /**
     * Registers the callback directly on the delegate.
     *
     * @param callback the callback to pass on to the delegate
     */
    @Override
    public void onAvailable(@Nonnull Runnable callback) {
        delegate.onAvailable(callback);
    }

    /**
     * @return the error reported by the delegate, if any
     */
    @Override
    public Optional<Throwable> error() {
        return delegate.error();
    }

    /**
     * @return {@code true} only when the delegate reports completion and no entry is still buffered
     */
    @Override
    public boolean isCompleted() {
        if (!delegate.isCompleted()) {
            return false;
        }
        // A buffered entry still has to be handed out, so the stream is not done yet.
        return peeked == null;
    }

    /**
     * Reports whether an accepted entry can be returned right now. When nothing is buffered this
     * goes through {@link #peek()}, which may read and drop rejected entries from the delegate.
     *
     * @return {@code true} if an accepted entry is buffered or could be buffered by peeking
     */
    @Override
    public boolean hasNextAvailable() {
        if (peeked != null) {
            return true;
        }
        return peek().isPresent();
    }

    /**
     * Closes the delegate.
     */
    @Override
    public void close() {
        delegate.close();
    }

    /**
     * Reads from the delegate until it yields an entry accepted by the filter or an empty result.
     * Rejected entries are discarded.
     */
    private Optional<MessageStream.Entry<M>> nextMatching() {
        Optional<MessageStream.Entry<M>> candidate = delegate.next();
        while (candidate.isPresent()) {
            if (filter.test(candidate.get())) {
                return candidate;
            }
            candidate = delegate.next();
        }
        return candidate;
    }

    /**
     * Variant of {@link FilteringMessageStream} that wraps a {@link MessageStream.Single} and is
     * itself a {@link MessageStream.Single}.
     *
     * @param <M> the type of {@link Message} carried by the entry of this stream
     */
    static class Single<M extends Message> extends FilteringMessageStream<M> implements MessageStream.Single<M> {

        /**
         * Creates a single-entry stream exposing the entry of {@code delegate} only if it is
         * accepted by {@code filter}.
         *
         * @param delegate the single-entry stream to read from
         * @param filter   the predicate deciding whether the entry is exposed
         */
        Single(@Nonnull MessageStream.Single<M> delegate, @Nonnull Predicate<MessageStream.Entry<M>> filter) {
            super(delegate, filter);
        }
    }
}

