/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.messaging;

import io.helidon.messaging.Channel;
import io.helidon.messaging.ContextSubscriber;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.SubmissionPublisher;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public final class Emitter<PAYLOAD>
implements Publisher<Message<PAYLOAD>>,
org.eclipse.microprofile.reactive.messaging.Emitter<PAYLOAD> {
    static final String EMITTER_CONTEXT_PREFIX = "emitter-message";
    private SubmissionPublisher<Message<PAYLOAD>> submissionPublisher;
    private final Set<Channel<PAYLOAD>> channels = new HashSet<Channel<PAYLOAD>>();

    private Emitter() {
    }

    void init(Executor executor, int maxBufferCapacity) {
        this.submissionPublisher = new SubmissionPublisher(executor, maxBufferCapacity);
    }

    public CompletionStage<Void> send(PAYLOAD msg) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.submissionPublisher.submit(Message.of(msg, () -> {
            future.complete(null);
            return CompletableFuture.completedStage(null);
        }));
        return future;
    }

    public <M extends Message<? extends PAYLOAD>> void send(M msg) {
        this.submissionPublisher.submit(msg);
    }

    @Deprecated(forRemoval=true, since="3.0.0")
    public int emit(Message<PAYLOAD> msg) {
        return this.submissionPublisher.submit(msg);
    }

    public void complete() {
        this.submissionPublisher.close();
    }

    public void error(Exception e) {
        this.submissionPublisher.closeExceptionally(e);
    }

    public boolean isCancelled() {
        return this.submissionPublisher.isClosed();
    }

    public boolean hasRequests() {
        return this.submissionPublisher.estimateMinimumDemand() > 0L;
    }

    public void subscribe(Subscriber<? super Message<PAYLOAD>> s) {
        this.submissionPublisher.subscribe(FlowAdapters.toFlowSubscriber(ContextSubscriber.create(EMITTER_CONTEXT_PREFIX, s)));
    }

    Set<Channel<PAYLOAD>> channels() {
        return this.channels;
    }

    public static <PAYLOAD> Emitter<PAYLOAD> create(Channel<PAYLOAD> channel) {
        Builder<PAYLOAD> builder = Emitter.builder().channel(channel);
        return builder.build();
    }

    public static <PAYLOAD> Emitter<PAYLOAD> create(Channel<PAYLOAD> channel, Channel<PAYLOAD> ... channels) {
        Builder<PAYLOAD> builder = Emitter.builder().channel(channel);
        for (Channel<PAYLOAD> ch : channels) {
            builder.channel(ch);
        }
        return builder.build();
    }

    public static <PAYLOAD> Builder<PAYLOAD> builder() {
        return new Builder();
    }

    public static final class Builder<PAYLOAD>
    implements io.helidon.common.Builder<Builder<PAYLOAD>, Emitter<PAYLOAD>> {
        private final Emitter<PAYLOAD> emitter = new Emitter();

        public Builder<PAYLOAD> channel(Channel<PAYLOAD> channel) {
            this.emitter.channels.add(channel);
            return this;
        }

        public Emitter<PAYLOAD> build() {
            return this.emitter;
        }

        public Emitter<PAYLOAD> get() {
            return this.emitter;
        }
    }
}

