package cn.kstry.framework.core.engine.future;

import cn.kstry.framework.core.engine.FlowRegister;
import cn.kstry.framework.core.enums.AsyncTaskState;
import cn.kstry.framework.core.exception.ExceptionEnum;
import cn.kstry.framework.core.exception.KstryException;
import cn.kstry.framework.core.util.AssertUtil;
import cn.kstry.framework.core.util.ExceptionUtil;
import cn.kstry.framework.core.util.GlobalUtil;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:cn/kstry/framework/core/engine/future/FlowTaskSubscriber.class */
/**
 * Base Reactor subscriber for a single asynchronous flow task.
 *
 * <p>The Reactor hooks ({@code hookOnNext}, {@code hookOnComplete}, {@code hookOnError},
 * {@code hookFinally}) and the timeout entry point {@link #hookTimeout()} all follow the same
 * shape: run {@code threadSwitchHook}, delegate to the matching {@code do*Hook} extension point,
 * then run {@code threadSwitchClear}. Subclasses customise behaviour by overriding the
 * {@code do*Hook} methods; all of them are no-ops by default except {@link #doTimeoutHook()}.
 *
 * <p>{@code hookOnNext}, {@code hookOnComplete} and {@code hookTimeout} compete for a one-shot
 * flag, so only the first of the three to arrive runs its extension point. {@code hookOnError}
 * and {@code hookFinally} do not consult that flag.
 */
public abstract class FlowTaskSubscriber extends BaseSubscriber<Object> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlowTaskSubscriber.class);

    /** Flow register of the owning flow; supplies the admin future and the start event id. */
    protected final FlowRegister flowRegister;

    /** Task name, used only in the cancellation and timeout messages. */
    private final String taskName;

    /** Time limit reported in the timeout message as milliseconds; exposed via {@link #getTimeout()}. */
    private final Integer timeout;

    /** Strict-mode flag; only stored here and exposed via {@link #isStrictMode()}. */
    private final boolean strictMode;

    /**
     * Runs before every extension point.
     * NOTE(review): presumably installs thread-bound context on the executing thread - confirm with callers.
     */
    private final Runnable threadSwitchHook;

    /**
     * Runs after every extension point.
     * NOTE(review): presumably undoes whatever {@code threadSwitchHook} installed - confirm with callers.
     */
    private final Runnable threadSwitchClear;

    /** One-shot flag: flipped by the first of next / complete / timeout to arrive. */
    private final AtomicBoolean alreadyInvoke = new AtomicBoolean(false);

    /**
     * Creates the subscriber.
     *
     * @param threadSwitchHook  action run before each extension point
     * @param threadSwitchClear action run after each extension point
     * @param strictMode        value returned by {@link #isStrictMode()}
     * @param timeout           value returned by {@link #getTimeout()} and quoted in the timeout message
     * @param flowRegister      register used to reach the admin future and the start event id
     * @param taskName          name quoted in the cancellation and timeout messages
     */
    public FlowTaskSubscriber(Runnable threadSwitchHook, Runnable threadSwitchClear, boolean strictMode, Integer timeout, FlowRegister flowRegister, String taskName) {
        this.threadSwitchHook = threadSwitchHook;
        this.threadSwitchClear = threadSwitchClear;
        this.flowRegister = flowRegister;
        this.taskName = taskName;
        this.timeout = timeout;
        this.strictMode = strictMode;
    }

    /**
     * Requests exactly one element from the upstream once subscribed.
     *
     * @param subscription the upstream subscription (not used directly)
     */
    protected void hookOnSubscribe(@Nonnull Subscription subscription) {
        request(1L);
    }

    /**
     * Handles an emitted value.
     *
     * <p>First asserts that the flow has not been cancelled. If next / complete / timeout has
     * already been handled, returns immediately without touching the thread-switch actions.
     * Otherwise runs {@link #doNextHook(Object)}; any failure is forwarded to the admin future
     * instead of being rethrown. The thread-switch clear action and {@code dispose()} run exactly
     * once afterwards, whether the extension point succeeded or failed.
     *
     * @param obj the emitted value
     */
    protected void hookOnNext(@Nonnull Object obj) {
        assertNotCancelled();
        if (alreadyInvoke()) {
            return;
        }
        try {
            this.threadSwitchHook.run();
            doNextHook(obj);
        } catch (Throwable th) {
            reportError(th);
        } finally {
            this.threadSwitchClear.run();
            dispose();
        }
    }

    /**
     * Handles upstream completion.
     *
     * <p>Same contract as {@link #hookOnNext(Object)}, delegating to {@link #doCompleteHook()}.
     */
    protected void hookOnComplete() {
        assertNotCancelled();
        if (alreadyInvoke()) {
            return;
        }
        try {
            this.threadSwitchHook.run();
            doCompleteHook();
        } catch (Throwable th) {
            reportError(th);
        } finally {
            this.threadSwitchClear.run();
            dispose();
        }
    }

    /**
     * Handles an upstream (or timeout-injected) error by delegating to
     * {@link #doErrorHook(Throwable)}.
     *
     * <p>Does not consult the one-shot flag: the default {@link #doTimeoutHook()} reaches this
     * method through {@code onError} after the flag has already been set. A failure inside the
     * extension point is only logged, not forwarded to the admin future.
     *
     * @param th the error being signalled
     */
    protected void hookOnError(@Nonnull Throwable th) {
        try {
            this.threadSwitchHook.run();
            doErrorHook(th);
        } catch (Throwable th2) {
            LOGGER.warn(th2.getMessage(), th2);
        } finally {
            this.threadSwitchClear.run();
            dispose();
        }
    }

    /**
     * Runs {@link #doFinallyHook()} between the thread-switch actions.
     *
     * <p>A failure is forwarded to the admin future. Unlike the other hooks this one does not
     * call {@code dispose()}.
     *
     * @param signalType the terminating signal (not used)
     */
    protected void hookFinally(@Nonnull SignalType signalType) {
        try {
            this.threadSwitchHook.run();
            doFinallyHook();
        } catch (Throwable th) {
            reportError(th);
        } finally {
            this.threadSwitchClear.run();
        }
    }

    /**
     * Entry point invoked when the task's time limit is reached.
     *
     * <p>If next / complete / timeout was already handled, returns {@link AsyncTaskState#TIMEOUT}
     * without running the extension point. Otherwise returns the result of
     * {@link #doTimeoutHook()}; if that throws, the failure is logged and
     * {@link AsyncTaskState#TIMEOUT} is returned. The thread-switch clear action and
     * {@code dispose()} run on every path, including the already-handled one.
     *
     * <p>Decompiler note: this method was reported as package-private in the original class.
     *
     * @return the state produced by the timeout handling
     */
    public AsyncTaskState hookTimeout() {
        try {
            if (alreadyInvoke()) {
                return AsyncTaskState.TIMEOUT;
            }
            this.threadSwitchHook.run();
            return doTimeoutHook();
        } catch (Throwable th) {
            LOGGER.warn(th.getMessage(), th);
            return AsyncTaskState.TIMEOUT;
        } finally {
            this.threadSwitchClear.run();
            dispose();
        }
    }

    /** @return the time limit passed to the constructor */
    public Integer getTimeout() {
        return this.timeout;
    }

    /** @return the strict-mode flag passed to the constructor */
    public boolean isStrictMode() {
        return this.strictMode;
    }

    /**
     * Extension point for an emitted value; no-op by default.
     *
     * @param obj the emitted value
     */
    protected void doNextHook(Object obj) {
    }

    /**
     * Extension point for an error signal; no-op by default.
     *
     * @param th the signalled error
     */
    protected void doErrorHook(Throwable th) {
    }

    /** Extension point for upstream completion; no-op by default. */
    protected void doCompleteHook() {
    }

    /** Extension point run from {@code hookFinally}; no-op by default. */
    protected void doFinallyHook() {
    }

    /**
     * Default timeout handling: builds an {@code ASYNC_TASK_TIMEOUT} exception, pushes it through
     * {@code onError}, and always notifies the admin future of it afterwards, even if
     * {@code onError} throws.
     *
     * @return {@link AsyncTaskState#TIMEOUT}
     */
    protected AsyncTaskState doTimeoutHook() {
        KstryException timeoutException = ExceptionUtil.buildException(null, ExceptionEnum.ASYNC_TASK_TIMEOUT, GlobalUtil.format("Async task timeout! maximum time limit: {}ms, taskName: {}", this.timeout, this.taskName));
        try {
            onError(timeoutException);
            return AsyncTaskState.TIMEOUT;
        } finally {
            reportError(timeoutException);
        }
    }

    /** Asserts, with {@code ASYNC_TASK_INTERRUPTED}, that the admin future does not report this flow as cancelled. */
    private void assertNotCancelled() {
        AssertUtil.notTrue(Boolean.valueOf(this.flowRegister.getAdminFuture().isCancelled(this.flowRegister.getStartEventId())), ExceptionEnum.ASYNC_TASK_INTERRUPTED, "Task interrupted. Story task was interrupted! taskName: {}", this.taskName);
    }

    /** Forwards a failure to the admin future, keyed by this flow's start event id. */
    private void reportError(Throwable th) {
        this.flowRegister.getAdminFuture().errorNotice(th, this.flowRegister.getStartEventId());
    }

    /**
     * Claims the one-shot flag.
     *
     * @return {@code true} if the flag had already been claimed by an earlier call
     */
    private boolean alreadyInvoke() {
        return !this.alreadyInvoke.compareAndSet(false, true);
    }
}
