/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.dag.physical;

import com.hazelcast.map.IMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.dag.physical.UnknownPhysicalPlanException;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhysicalPlan {
    private static final Logger log = LoggerFactory.getLogger(PhysicalPlan.class);
    private final List<SubPlan> pipelineList;
    private final AtomicInteger finishedPipelineNum = new AtomicInteger(0);
    private final AtomicInteger canceledPipelineNum = new AtomicInteger(0);
    private final AtomicInteger failedPipelineNum = new AtomicInteger(0);
    private final JobImmutableInformation jobImmutableInformation;
    private final IMap<Object, Object> runningJobStateIMap;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<JobResult> jobEndFuture;
    private final AtomicReference<String> errorBySubPlan = new AtomicReference();
    private final String jobFullName;
    private final long jobId;
    private JobMaster jobMaster;
    private Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures = new HashMap<TaskGroupLocation, CompletableFuture<SlotProfile>>();
    private boolean makeJobEndWhenPipelineEnded = true;
    private volatile boolean isRunning = false;

    public PhysicalPlan(@NonNull List<SubPlan> pipelineList, @NonNull ExecutorService executorService, @NonNull JobImmutableInformation jobImmutableInformation, long initializationTimestamp, @NonNull IMap<Object, Object> runningJobStateIMap, @NonNull IMap runningJobStateTimestampsIMap) {
        if (pipelineList == null) {
            throw new NullPointerException("pipelineList is marked non-null but is null");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService is marked non-null but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked non-null but is null");
        }
        if (runningJobStateIMap == null) {
            throw new NullPointerException("runningJobStateIMap is marked non-null but is null");
        }
        if (runningJobStateTimestampsIMap == null) {
            throw new NullPointerException("runningJobStateTimestampsIMap is marked non-null but is null");
        }
        this.jobImmutableInformation = jobImmutableInformation;
        this.jobId = jobImmutableInformation.getJobId();
        Long[] stateTimestamps = new Long[JobStatus.values().length];
        if (runningJobStateTimestampsIMap.get((Object)this.jobId) == null) {
            stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
            runningJobStateTimestampsIMap.put((Object)this.jobId, (Object)stateTimestamps);
        }
        if (runningJobStateIMap.get((Object)this.jobId) == null) {
            stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
            runningJobStateTimestampsIMap.put((Object)this.jobId, (Object)stateTimestamps);
            runningJobStateIMap.put((Object)this.jobId, (Object)JobStatus.CREATED);
        }
        this.pipelineList = pipelineList;
        if (pipelineList.isEmpty()) {
            throw new UnknownPhysicalPlanException("The physical plan didn't have any can execute pipeline");
        }
        this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(), jobImmutableInformation.getJobId());
        this.runningJobStateIMap = runningJobStateIMap;
        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    }

    public void setJobMaster(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
        this.pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
    }

    public PassiveCompletableFuture<JobResult> initStateFuture() {
        this.jobEndFuture = new CompletableFuture();
        this.pipelineList.forEach(this::addPipelineEndCallback);
        return new PassiveCompletableFuture(this.jobEndFuture);
    }

    public void addPipelineEndCallback(SubPlan subPlan) {
        PassiveCompletableFuture<PipelineExecutionState> future = subPlan.initStateFuture();
        future.thenAcceptAsync(pipelineState -> {
            try {
                log.info("{} future complete with state {}", (Object)subPlan.getPipelineFullName(), (Object)pipelineState.getPipelineStatus());
                if (PipelineStatus.CANCELED.equals((Object)pipelineState.getPipelineStatus())) {
                    this.canceledPipelineNum.incrementAndGet();
                } else if (PipelineStatus.FAILED.equals((Object)pipelineState.getPipelineStatus())) {
                    this.failedPipelineNum.incrementAndGet();
                    this.errorBySubPlan.compareAndSet(null, pipelineState.getThrowableMsg());
                    if (this.makeJobEndWhenPipelineEnded) {
                        log.info(String.format("cancel job %s because makeJobEndWhenPipelineEnded is true", this.jobFullName));
                        this.updateJobState(JobStatus.FAILING);
                    }
                }
                if (this.finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
                    if (this.failedPipelineNum.get() > 0) {
                        JobStatus jobStatus = JobStatus.FAILED;
                        this.updateJobState(jobStatus);
                    } else if (this.canceledPipelineNum.get() > 0) {
                        JobStatus jobStatus = JobStatus.CANCELED;
                        this.updateJobState(jobStatus);
                    } else {
                        JobStatus jobStatus = this.getJobStatus() == JobStatus.DOING_SAVEPOINT ? JobStatus.SAVEPOINT_DONE : JobStatus.FINISHED;
                        this.updateJobState(jobStatus);
                    }
                }
            }
            catch (Throwable e) {
                log.error(ExceptionUtils.getMessage((Throwable)e));
            }
        }, (Executor)this.jobMaster.getExecutorService());
    }

    public void cancelJob() {
        if (this.getJobStatus().isEndState()) {
            log.warn(String.format("%s is in end state %s, can not be cancel", this.jobFullName, this.getJobStatus()));
            return;
        }
        if (this.runningJobStateIMap.get((Object)this.jobId) == JobStatus.PENDING) {
            this.updateJobState(JobStatus.CANCELED);
        } else {
            this.updateJobState(JobStatus.CANCELING);
        }
    }

    public void savepointJob() {
        if (this.getJobStatus().isEndState()) {
            log.warn(String.format("%s is in end state %s, can not do savepoint", this.jobFullName, this.getJobStatus()));
            return;
        }
        this.updateJobState(JobStatus.DOING_SAVEPOINT);
    }

    public List<SubPlan> getPipelineList() {
        return this.pipelineList;
    }

    private void updateStateTimestamps(@NonNull JobStatus targetState) {
        if (targetState == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        Long[] stateTimestamps = (Long[])this.runningJobStateTimestampsIMap.get((Object)this.jobId);
        stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
        this.runningJobStateTimestampsIMap.set((Object)this.jobId, (Object)stateTimestamps);
    }

    public synchronized void updateJobState(@NonNull JobStatus targetState) {
        block5: {
            if (targetState == null) {
                throw new NullPointerException("targetState is marked non-null but is null");
            }
            try {
                JobStatus current = (JobStatus)this.runningJobStateIMap.get((Object)this.jobId);
                log.debug(String.format("Try to update the %s state from %s to %s", this.jobFullName, current, targetState));
                if (current.equals((Object)targetState)) {
                    log.info("{} current state equals target state: {}, skip", (Object)this.jobFullName, (Object)targetState);
                    return;
                }
                if (current.isEndState()) {
                    String message = "Job is trying to leave terminal state " + current;
                    throw new SeaTunnelEngineException(message);
                }
                RetryUtils.retryWithException(() -> {
                    this.updateStateTimestamps(targetState);
                    this.runningJobStateIMap.set((Object)this.jobId, (Object)targetState);
                    return null;
                }, (RetryUtils.RetryMaterial)new RetryUtils.RetryMaterial(30, true, ExceptionUtil::isOperationNeedRetryException, 2000L));
                log.info(String.format("%s turned from state %s to %s.", this.jobFullName, current, targetState));
                this.stateProcess();
            }
            catch (Exception e) {
                log.error(ExceptionUtils.getMessage((Throwable)e));
                if (targetState.equals((Object)JobStatus.FAILING)) break block5;
                this.makeJobFailing(e);
            }
        }
    }

    public JobImmutableInformation getJobImmutableInformation() {
        return this.jobImmutableInformation;
    }

    public JobStatus getJobStatus() {
        return (JobStatus)this.runningJobStateIMap.get((Object)this.jobId);
    }

    public String getJobFullName() {
        return this.jobFullName;
    }

    public void makeJobFailing(Throwable e) {
        this.errorBySubPlan.compareAndSet(null, ExceptionUtils.getMessage((Throwable)e));
        this.updateJobState(JobStatus.FAILING);
    }

    public void startJob() {
        this.isRunning = true;
        log.info("{} state process is start", (Object)this.getJobFullName());
        this.stateProcess();
    }

    public void stopJobStateProcess() {
        this.isRunning = false;
        log.info("{} state process is stop", (Object)this.getJobFullName());
    }

    private synchronized void stateProcess() {
        if (!this.isRunning) {
            log.warn(String.format("%s state process is stopped", this.jobFullName));
            return;
        }
        switch (this.getJobStatus()) {
            case CREATED: {
                this.updateJobState(JobStatus.SCHEDULED);
                break;
            }
            case PENDING: 
            case SCHEDULED: {
                this.getPipelineList().forEach(subPlan -> {
                    if (PipelineStatus.CREATED.equals((Object)subPlan.getCurrPipelineStatus())) {
                        subPlan.startSubPlanStateProcess();
                    }
                });
                this.updateJobState(JobStatus.RUNNING);
                break;
            }
            case RUNNING: 
            case DOING_SAVEPOINT: {
                break;
            }
            case FAILING: 
            case CANCELING: {
                this.jobMaster.neverNeedRestore();
                this.getPipelineList().forEach(SubPlan::cancelPipeline);
                break;
            }
            case FAILED: 
            case CANCELED: 
            case SAVEPOINT_DONE: 
            case FINISHED: {
                this.stopJobStateProcess();
                this.jobEndFuture.complete((Object)new JobResult(this.getJobStatus(), this.errorBySubPlan.get()));
                return;
            }
            default: {
                throw new IllegalArgumentException("Unknown Job State: " + this.getJobStatus());
            }
        }
    }

    public void completeJobEndFuture(JobResult jobResult) {
        this.jobEndFuture.complete((Object)jobResult);
    }

    public Map<TaskGroupLocation, CompletableFuture<SlotProfile>> getPreApplyResourceFutures() {
        return this.preApplyResourceFutures;
    }

    public void setPreApplyResourceFutures(Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures) {
        this.preApplyResourceFutures = preApplyResourceFutures;
    }
}

