/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.cnc;

import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.EventSubscription;
import com.couchbase.client.core.deps.org.jctools.queues.QueueFactory;
import com.couchbase.client.core.deps.org.jctools.queues.spec.ConcurrentQueueSpec;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

public class DefaultEventBus
implements EventBus {
    private static final int DEFAULT_QUEUE_CAPACITY = 16384;
    private static final Duration DEFAULT_IDLE_SLEEP_DURATION = Duration.ofMillis(100L);
    private final CopyOnWriteArraySet<Consumer<Event>> subscribers;
    private final Queue<Event> eventQueue;
    private final AtomicBoolean running;
    private final PrintStream errorLogging;
    private final String threadName;
    private final Duration idleSleepDuration;
    private final Scheduler scheduler;
    private volatile Thread runningThread;

    public static Builder builder(Scheduler scheduler) {
        return new Builder(scheduler);
    }

    public static DefaultEventBus create(Scheduler scheduler) {
        return DefaultEventBus.builder(scheduler).build();
    }

    private DefaultEventBus(Builder builder) {
        this.scheduler = builder.scheduler;
        this.subscribers = new CopyOnWriteArraySet();
        this.running = new AtomicBoolean(false);
        this.eventQueue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedMpsc(builder.queueCapacity));
        this.errorLogging = builder.errorLogging.orElse(null);
        this.threadName = builder.threadName;
        this.idleSleepDuration = builder.idleSleepDuration;
    }

    @Override
    public EventSubscription subscribe(Consumer<Event> consumer) {
        this.subscribers.add(consumer);
        return new EventSubscription(this, consumer);
    }

    @Override
    public void unsubscribe(EventSubscription subscription) {
        this.subscribers.remove(subscription.consumer());
    }

    @Override
    public EventBus.PublishResult publish(Event event) {
        if (!this.isRunning()) {
            return EventBus.PublishResult.SHUTDOWN;
        }
        if (this.eventQueue.offer(event)) {
            return EventBus.PublishResult.SUCCESS;
        }
        if (this.errorLogging != null) {
            this.errorLogging.println("Could not publish Event because the queue is full. " + event);
        }
        return EventBus.PublishResult.OVERLOADED;
    }

    @Override
    public Mono<Void> start() {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(false, true)) {
                this.runningThread = new Thread(() -> {
                    long idleSleepTime = this.idleSleepDuration.toMillis();
                    while (this.isRunning() || !this.eventQueue.isEmpty()) {
                        Event event = this.eventQueue.poll();
                        while (event != null) {
                            for (Consumer<Event> subscriber : this.subscribers) {
                                try {
                                    subscriber.accept(event);
                                }
                                catch (Throwable t) {
                                    if (this.errorLogging == null) continue;
                                    this.errorLogging.println("Exception caught in EventBus Consumer: " + t);
                                    t.printStackTrace();
                                }
                            }
                            event = this.eventQueue.poll();
                        }
                        try {
                            if (!this.isRunning()) continue;
                            Thread.sleep(idleSleepTime);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                });
                this.runningThread.setDaemon(true);
                this.runningThread.setName(this.threadName);
                this.runningThread.start();
            }
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> stop(Duration timeout) {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(true, false)) {
                this.runningThread.interrupt();
            }
            return Mono.empty();
        }).then(Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)this.scheduler).takeUntil(i -> !this.runningThread.isAlive()).then()).timeout(timeout, this.scheduler);
    }

    boolean isRunning() {
        return this.running.get();
    }

    boolean hasSubscribers() {
        return !this.subscribers.isEmpty();
    }

    public static class Builder {
        final Scheduler scheduler;
        int queueCapacity;
        Optional<PrintStream> errorLogging;
        String threadName;
        Duration idleSleepDuration;

        Builder(Scheduler scheduler) {
            this.scheduler = scheduler;
            this.queueCapacity = 16384;
            this.errorLogging = Optional.of(System.err);
            this.threadName = "cb-events";
            this.idleSleepDuration = DEFAULT_IDLE_SLEEP_DURATION;
        }

        public Builder queueCapacity(int queueCapacity) {
            this.queueCapacity = queueCapacity;
            return this;
        }

        public Builder errorLogging(Optional<PrintStream> errorLogging) {
            this.errorLogging = errorLogging;
            return this;
        }

        public Builder threadName(String threadName) {
            this.threadName = threadName;
            return this;
        }

        public Builder idleSleepDuration(Duration idleSleepDuration) {
            this.idleSleepDuration = idleSleepDuration;
            return this;
        }

        public DefaultEventBus build() {
            return new DefaultEventBus(this);
        }
    }
}

