/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.ResultStream;
import jakarta.annotation.Nonnull;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.axonframework.common.AxonException;
import org.axonframework.common.annotations.Internal;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.axonframework.queryhandling.QueryResponseMessage;

@Internal
public abstract class AbstractQueryResponseMessageStream<T>
implements MessageStream<QueryResponseMessage> {
    private final ResultStream<T> stream;
    private final AtomicReference<Throwable> error = new AtomicReference();
    private final AtomicReference<Runnable> callback = new AtomicReference<Runnable>(() -> {});

    public AbstractQueryResponseMessageStream(@Nonnull ResultStream<T> stream) {
        this.stream = Objects.requireNonNull(stream, "The query result stream cannot be null.");
    }

    public Optional<MessageStream.Entry<QueryResponseMessage>> next() {
        return Optional.ofNullable(this.stream.nextIfAvailable()).flatMap(this::toEntry);
    }

    public Optional<MessageStream.Entry<QueryResponseMessage>> peek() {
        return Optional.ofNullable(this.stream.peek()).flatMap(this::toEntry);
    }

    public void onAvailable(@Nonnull Runnable callback) {
        this.callback.set(callback);
        this.stream.onAvailable(callback);
    }

    @Nonnull
    public Optional<Throwable> error() {
        return Optional.ofNullable(this.error.get()).or(() -> this.stream.getError());
    }

    public boolean isCompleted() {
        return this.error.get() != null || this.stream.isClosed();
    }

    public boolean hasNextAvailable() {
        return this.error.get() == null && this.stream.peek() != null;
    }

    public void close() {
        if (!this.stream.isClosed()) {
            this.stream.close();
        }
    }

    @Nonnull
    private Optional<MessageStream.Entry<QueryResponseMessage>> toEntry(@Nonnull T t) {
        if (this.isError(t)) {
            this.error.set((Throwable)this.createAxonException(t));
            this.close();
            this.callback.get().run();
            return Optional.empty();
        }
        return Optional.of(new SimpleEntry((Message)this.buildResponseMessage(t), Context.empty()));
    }

    @Nonnull
    protected abstract QueryResponseMessage buildResponseMessage(@Nonnull T var1);

    @Nonnull
    protected abstract AxonException createAxonException(@Nonnull T var1);

    protected abstract boolean isError(@Nonnull T var1);
}

