/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.AloStreamConfig;
import io.atleon.util.Throwing;
import java.util.concurrent.atomic.AtomicReference;
import org.jspecify.annotations.NullMarked;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@NullMarked
public abstract class AloStream<C extends AloStreamConfig> {
    private static final Disposable EMPTY = () -> {};
    private static final Disposable STARTING = () -> {};
    private final AtomicReference<Disposable> disposableReference = new AtomicReference<Disposable>(EMPTY);

    public final synchronized void start(C config) {
        if (!this.disposableReference.compareAndSet(EMPTY, STARTING) && !this.disposableReference.get().isDisposed()) {
            throw new UnsupportedOperationException("Cannot start AloStream that is already starting/started");
        }
        try {
            this.disposableReference.set(this.startDisposable(config));
        }
        catch (Throwable error) {
            this.disposableReference.set(EMPTY);
            throw Throwing.propagate((Throwable)error);
        }
    }

    public final synchronized void stop() {
        this.disposableReference.getAndSet(EMPTY).dispose();
    }

    public final State state() {
        Disposable disposable = this.disposableReference.get();
        if (disposable == STARTING) {
            return State.STARTING;
        }
        if (disposable == EMPTY || disposable.isDisposed()) {
            return State.STOPPED;
        }
        return State.STARTED;
    }

    protected abstract Disposable startDisposable(C var1);

    protected static Scheduler newBoundedElasticScheduler(String name, int threadCap) {
        return Schedulers.newBoundedElastic((int)threadCap, (int)Integer.MAX_VALUE, (String)name);
    }

    public static enum State {
        STOPPED,
        STARTING,
        STARTED;

    }
}

