/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.messaging.DelegatingMessageStream;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;

public class CloseCallbackMessageStream<M extends Message>
extends DelegatingMessageStream<M, M> {
    private final Runnable closeHandler;
    private final AtomicBoolean invoked = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public CloseCallbackMessageStream(@Nonnull MessageStream<M> delegate, @Nonnull Runnable closeHandler) {
        super(delegate);
        this.closeHandler = Objects.requireNonNull(closeHandler, "Close handler may not be null.");
    }

    static <M extends Message> MessageStream.Single<M> single(@Nonnull MessageStream.Single<M> delegate, Runnable closeHandler) {
        return new CloseCallbackMessageStream<M>(delegate, closeHandler).first();
    }

    static <M extends Message> MessageStream.Empty<M> empty(@Nonnull MessageStream.Empty<M> delegate, Runnable closeHandler) {
        return new CloseCallbackMessageStream<M>(delegate, closeHandler).ignoreEntries();
    }

    @Override
    public Optional<MessageStream.Entry<M>> next() {
        Optional<MessageStream.Entry<M>> next = this.delegate().next();
        this.invokeCloseHandlerIfClosed();
        return next;
    }

    private void invokeCloseHandlerIfClosed() {
        if ((this.closed.get() || this.isCompleted()) && !this.invoked.getAndSet(true)) {
            this.closeHandler.run();
        }
    }

    @Override
    public Optional<MessageStream.Entry<M>> peek() {
        this.invokeCloseHandlerIfClosed();
        return this.delegate().peek();
    }

    @Override
    public void onAvailable(@Nonnull Runnable callback) {
        super.onAvailable(() -> {
            callback.run();
            this.invokeCloseHandlerIfClosed();
        });
    }

    @Override
    public void close() {
        this.closed.set(true);
        super.close();
        this.invokeCloseHandlerIfClosed();
    }

    @Override
    public boolean hasNextAvailable() {
        this.invokeCloseHandlerIfClosed();
        return super.hasNextAvailable();
    }
}

