/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.control;

import org.reactivestreams.Subscriber;
import reactor.fn.Consumer;
import reactor.rx.action.Action;

public class StreamStateCallbackAction<T>
extends Action<T, T> {
    private final Consumer<? super Subscriber<? super T>> subscribeConsumer;
    private final Consumer<Void> cancelConsumer;

    public StreamStateCallbackAction(Consumer<? super Subscriber<? super T>> subscribeConsumer, Consumer<Void> cancelConsumer) {
        this.subscribeConsumer = subscribeConsumer;
        this.cancelConsumer = cancelConsumer;
    }

    @Override
    protected void doNext(T ev) {
        this.broadcastNext(ev);
    }

    @Override
    protected void doShutdown() {
        if (this.cancelConsumer != null) {
            this.cancelConsumer.accept(null);
        }
        super.doShutdown();
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        if (this.subscribeConsumer != null) {
            this.subscribeConsumer.accept(subscriber);
        }
        super.subscribe(subscriber);
    }
}

