/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.application;

import io.atleon.application.ConfiguredAloStream;
import io.atleon.core.AloStream;
import io.atleon.core.AloStreamConfig;
import io.atleon.core.StarterStopper;
import io.atleon.core.StarterStopperConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/**
 * Base {@link ConfiguredAloStream} that pairs an {@link AloStream} with the config used to start
 * it, and drives starting and stopping of that stream from a {@code Flux<Boolean>} of signals
 * ({@code true} = start, {@code false} = stop).
 *
 * <p>If the config is a {@link StarterStopperConfig}, the {@link StarterStopper} it builds supplies
 * the signals for {@link #start()} and {@link #applicationReady(boolean)}; otherwise a single
 * fixed signal is used. At most one signal subscription is kept active: every new start/stop
 * request disposes the previous subscription before subscribing to the new signals.
 *
 * <p>Subscription swaps and stream start/stop calls are both guarded by synchronizing on the
 * wrapped stream instance.
 *
 * @param <C> the type of config used to start the wrapped stream
 */
public abstract class AbstractConfiguredAloStream<C extends AloStreamConfig>
implements ConfiguredAloStream {
    private final AloStream<C> stream;
    private final C config;
    /** Source of dynamic start/stop signals; {@code null} when the config does not provide one. */
    private final StarterStopper starterStopper;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Subscription to the most recently requested start/stop signals; {@code null} until first use. */
    private volatile Disposable startStopDisposable;

    /**
     * Creates a wrapper around the given stream and config.
     *
     * @param stream the stream to start and stop
     * @param config the config passed to the stream on start; when it is a
     *               {@link StarterStopperConfig}, its {@link StarterStopper} is built here
     */
    public AbstractConfiguredAloStream(AloStream<C> stream, C config) {
        this.stream = stream;
        this.config = config;
        this.starterStopper = config instanceof StarterStopperConfig
            ? StarterStopperConfig.class.cast(config).buildStarterStopper()
            : null;
    }

    /**
     * Requests a start. With a {@link StarterStopper} present, the stream follows that
     * StarterStopper's signals instead of being started unconditionally.
     */
    @Override
    public void start() {
        this.doStartStop(this.startStopSignals(true));
    }

    /**
     * Requests a stop. Any active signal subscription (including one to a {@link StarterStopper})
     * is disposed and replaced by a single stop signal.
     */
    @Override
    public void stop() {
        this.doStartStop(Flux.just(false));
    }

    /** Returns the name reported by the config. */
    @Override
    public String name() {
        return this.config.name();
    }

    /** Returns the current state reported by the wrapped stream. */
    @Override
    public AloStream.State state() {
        return this.stream.state();
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "{stream=" + this.stream + ", config=" + this.config + '}';
    }

    /**
     * Applies the initial start/stop decision; presumably intended to be called by subclasses once
     * the hosting application is ready (inferred from the name; confirm against callers).
     *
     * @param autoStart the signal to apply when no {@link StarterStopper} is configured; ignored
     *                  when one is, since its signals are used instead
     */
    protected void applicationReady(boolean autoStart) {
        this.doStartStop(this.startStopSignals(autoStart));
    }

    /**
     * Chooses the signals to follow: the {@link StarterStopper}'s when one is configured, else a
     * single signal carrying {@code fallback}.
     */
    private Flux<Boolean> startStopSignals(boolean fallback) {
        return this.starterStopper == null ? Flux.just(fallback) : this.starterStopper.startStop();
    }

    /**
     * Replaces the active signal subscription with a subscription to {@code startStop}. Each
     * emitted value is applied to the stream; errors and completion are only logged.
     */
    private void doStartStop(Flux<Boolean> startStop) {
        synchronized (this.stream) {
            Disposable previous = this.startStopDisposable;
            if (previous != null && !previous.isDisposed()) {
                // Best effort: a failure to dispose the old subscription must not block the new one.
                this.safelyExecute(previous::dispose);
            }
            this.startStopDisposable = startStop.subscribe(this::doStartStop, this::logStartStopError, this::logStartStopCompletion);
        }
    }

    /**
     * Applies a single signal: {@code false} stops the stream; {@code true} starts it, but only
     * when it currently reports {@link AloStream.State#STOPPED}.
     */
    private void doStartStop(boolean start) {
        synchronized (this.stream) {
            if (!start) {
                this.safelyExecute(this.stream::stop);
            } else if (this.stream.state() == AloStream.State.STOPPED) {
                this.safelyExecute(() -> this.stream.start(this.config));
            }
        }
    }

    /** Runs the action, logging (rather than propagating) any {@link Exception} it throws. */
    private void safelyExecute(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            // The trailing exception has no placeholder, so SLF4J logs it as the throwable.
            this.logger.warn("Failed to safely execute where stream name={}", this.name(), e);
        }
    }

    private void logStartStopError(Throwable error) {
        this.logger.error("Dynamic start-stop has failed where stream name={} and state={}", this.name(), this.state(), error);
    }

    private void logStartStopCompletion() {
        this.logger.debug("Dynamic start-stop is completed where stream name={} and state={}", this.name(), this.state());
    }
}

