/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.dag.physical;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.exception.TaskGroupDeployException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskDeployState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecutingOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhysicalVertex {
    private static final Logger log = LoggerFactory.getLogger(PhysicalVertex.class);
    private final TaskGroupLocation taskGroupLocation;
    private final String taskFullName;
    private final TaskGroupDefaultImpl taskGroup;
    private final FlakeIdGenerator flakeIdGenerator;
    private final List<Set<URL>> pluginJarsUrls;
    private final List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers;
    private final IMap<Object, Object> runningJobStateIMap;
    private CompletableFuture<TaskExecutionState> taskFuture;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private final NodeEngine nodeEngine;
    private JobMaster jobMaster;
    private volatile ExecutionState currExecutionState;
    public volatile boolean isRunning = false;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference();

    public PhysicalVertex(int subTaskGroupIndex, int parallelism, @NonNull TaskGroupDefaultImpl taskGroup, @NonNull FlakeIdGenerator flakeIdGenerator, int pipelineId, int totalPipelineNum, List<Set<URL>> pluginJarsUrls, List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers, @NonNull JobImmutableInformation jobImmutableInformation, long initializationTimestamp, @NonNull NodeEngine nodeEngine, @NonNull IMap runningJobStateIMap, @NonNull IMap runningJobStateTimestampsIMap) {
        if (taskGroup == null) {
            throw new NullPointerException("taskGroup is marked non-null but is null");
        }
        if (flakeIdGenerator == null) {
            throw new NullPointerException("flakeIdGenerator is marked non-null but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked non-null but is null");
        }
        if (nodeEngine == null) {
            throw new NullPointerException("nodeEngine is marked non-null but is null");
        }
        if (runningJobStateIMap == null) {
            throw new NullPointerException("runningJobStateIMap is marked non-null but is null");
        }
        if (runningJobStateTimestampsIMap == null) {
            throw new NullPointerException("runningJobStateTimestampsIMap is marked non-null but is null");
        }
        this.taskGroupLocation = taskGroup.getTaskGroupLocation();
        this.taskGroup = taskGroup;
        this.flakeIdGenerator = flakeIdGenerator;
        this.pluginJarsUrls = pluginJarsUrls;
        this.connectorJarIdentifiers = connectorJarIdentifiers;
        Long[] stateTimestamps = new Long[ExecutionState.values().length];
        if (runningJobStateTimestampsIMap.get((Object)taskGroup.getTaskGroupLocation()) == null) {
            stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
            runningJobStateTimestampsIMap.put((Object)taskGroup.getTaskGroupLocation(), (Object)stateTimestamps);
        }
        if (runningJobStateIMap.get((Object)this.taskGroupLocation) == null) {
            stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
            runningJobStateTimestampsIMap.put((Object)this.taskGroupLocation, (Object)stateTimestamps);
            runningJobStateIMap.put((Object)this.taskGroupLocation, (Object)ExecutionState.CREATED);
        }
        this.currExecutionState = (ExecutionState)runningJobStateIMap.get((Object)this.taskGroupLocation);
        this.nodeEngine = nodeEngine;
        this.taskFullName = String.format("Job (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]", jobImmutableInformation.getJobId(), pipelineId, totalPipelineNum, taskGroup.getTaskGroupName(), subTaskGroupIndex + 1, parallelism, this.taskGroupLocation);
        this.taskFuture = new CompletableFuture();
        this.runningJobStateIMap = runningJobStateIMap;
        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    }

    public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
        this.taskFuture = new CompletableFuture();
        this.currExecutionState = (ExecutionState)this.runningJobStateIMap.get((Object)this.taskGroupLocation);
        if (this.currExecutionState != null) {
            log.info(String.format("The task %s is in state %s when init state future", this.taskFullName, this.currExecutionState));
        }
        if (ExecutionState.RUNNING.equals(this.currExecutionState)) {
            if (!this.checkTaskGroupIsExecuting(this.taskGroupLocation)) {
                this.updateTaskState(ExecutionState.FAILING);
            }
        } else if (ExecutionState.DEPLOYING.equals(this.currExecutionState) && !this.checkTaskGroupIsExecuting(this.taskGroupLocation)) {
            this.updateTaskState(ExecutionState.FAILING);
        }
        return new PassiveCompletableFuture(this.taskFuture);
    }

    public void restoreExecutionState() {
        this.startPhysicalVertex();
        this.stateProcess();
    }

    private boolean checkTaskGroupIsExecuting(TaskGroupLocation taskGroupLocation) {
        IMap ownedSlotProfilesIMap = this.nodeEngine.getHazelcastInstance().getMap("engine_ownedSlotProfilesIMap");
        SlotProfile slotProfile = this.getOwnedSlotProfilesByTaskGroup(taskGroupLocation, (IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>>)ownedSlotProfilesIMap);
        if (null != slotProfile) {
            Address worker = slotProfile.getWorker();
            List members = this.nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress).collect(Collectors.toList());
            if (!members.contains(worker)) {
                log.warn("The node:{} running the taskGroup {} no longer exists, return false.", (Object)worker.toString(), (Object)taskGroupLocation);
                return false;
            }
            InvocationFuture invoke = this.nodeEngine.getOperationService().createInvocationBuilder("st:impl:seaTunnelServer", (Operation)new CheckTaskGroupIsExecutingOperation(taskGroupLocation), worker).invoke();
            try {
                return (Boolean)invoke.get();
            }
            catch (InterruptedException | ExecutionException e) {
                log.warn("Execution of CheckTaskGroupIsExecutingOperation {} failed, checkTaskGroupIsExecuting return false. ", (Object)taskGroupLocation, (Object)e);
            }
        }
        return false;
    }

    private SlotProfile getOwnedSlotProfilesByTaskGroup(TaskGroupLocation taskGroupLocation, IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap) {
        PipelineLocation pipelineLocation = taskGroupLocation.getPipelineLocation();
        try {
            return (SlotProfile)((Map)ownedSlotProfilesIMap.get((Object)pipelineLocation)).get(taskGroupLocation);
        }
        catch (NullPointerException nullPointerException) {
            return null;
        }
    }

    private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) throws Exception {
        if (slotProfile == null) {
            throw new NullPointerException("slotProfile is marked non-null but is null");
        }
        return this.deployInternal(taskGroupImmutableInformation -> {
            SeaTunnelServer server = (SeaTunnelServer)this.nodeEngine.getService("st:impl:seaTunnelServer");
            return server.getSlotService().getSlotContext(slotProfile).getTaskExecutionService().deployTask((TaskGroupImmutableInformation)taskGroupImmutableInformation);
        });
    }

    private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) {
        if (slotProfile == null) {
            throw new NullPointerException("slotProfile is marked non-null but is null");
        }
        return this.deployInternal(taskGroupImmutableInformation -> {
            try {
                return (TaskDeployState)NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, new DeployTaskOperation(slotProfile, this.nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)), slotProfile.getWorker()).get();
            }
            catch (Exception e) {
                if (this.getExecutionState().isEndState()) {
                    log.warn(ExceptionUtils.getMessage((Throwable)e));
                    log.warn(String.format("%s deploy error, but the state is already in end state %s, skip this error", this.getTaskFullName(), this.currExecutionState));
                    return TaskDeployState.success();
                }
                return TaskDeployState.failed(e);
            }
        });
    }

    public void makeTaskGroupDeploy() {
        this.updateTaskState(ExecutionState.DEPLOYING);
    }

    public TaskDeployState deploy(@NonNull SlotProfile slotProfile) {
        if (slotProfile == null) {
            throw new NullPointerException("slotProfile is marked non-null but is null");
        }
        try {
            if (slotProfile.getWorker().equals((Object)this.nodeEngine.getThisAddress())) {
                return this.deployOnLocal(slotProfile);
            }
            return this.deployOnRemote(slotProfile);
        }
        catch (Throwable th) {
            return TaskDeployState.failed(th);
        }
    }

    private TaskDeployState deployInternal(Function<TaskGroupImmutableInformation, TaskDeployState> taskGroupConsumer) {
        TaskGroupImmutableInformation taskGroupImmutableInformation = this.getTaskGroupImmutableInformation();
        TaskDeployState state = taskGroupConsumer.apply(taskGroupImmutableInformation);
        this.updateTaskState(ExecutionState.RUNNING);
        return state;
    }

    @VisibleForTesting
    public TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
        List<Data> tasksData = this.taskGroup.getTasks().stream().map(task -> this.nodeEngine.getSerializationService().toData(task)).collect(Collectors.toList());
        return new TaskGroupImmutableInformation(this.taskGroup.getTaskGroupLocation().getJobId(), this.flakeIdGenerator.newId(), this.taskGroup.getTaskGroupType(), this.taskGroup.getTaskGroupLocation(), this.taskGroup.getTaskGroupName(), tasksData, this.pluginJarsUrls, this.connectorJarIdentifiers);
    }

    @VisibleForTesting
    public TaskGroup getTaskGroup() {
        return this.taskGroup;
    }

    public synchronized void updateTaskState(@NonNull ExecutionState targetState) {
        block5: {
            if (targetState == null) {
                throw new NullPointerException("targetState is marked non-null but is null");
            }
            try {
                ExecutionState current = (ExecutionState)this.runningJobStateIMap.get((Object)this.taskGroupLocation);
                log.debug(String.format("Try to update the task %s state from %s to %s", this.taskFullName, current, targetState));
                if (current.equals(targetState)) {
                    log.info("{} current state equals target state: {}, skip", (Object)this.taskFullName, (Object)targetState);
                    return;
                }
                if (current.isEndState()) {
                    String message = "Task is trying to leave terminal state " + current;
                    log.error(message);
                    return;
                }
                RetryUtils.retryWithException(() -> {
                    this.updateStateTimestamps(targetState);
                    this.runningJobStateIMap.set((Object)this.taskGroupLocation, (Object)targetState);
                    return null;
                }, (RetryUtils.RetryMaterial)new RetryUtils.RetryMaterial(30, true, ExceptionUtil::isOperationNeedRetryException, 2000L));
                this.currExecutionState = targetState;
                log.info(String.format("%s turned from state %s to %s.", this.taskFullName, current, targetState));
                this.stateProcess();
            }
            catch (Exception e) {
                log.error(ExceptionUtils.getMessage((Throwable)e));
                if (targetState.equals(ExecutionState.FAILING)) break block5;
                this.makeTaskGroupFailing(e);
            }
        }
    }

    public synchronized void cancel() {
        if (!this.getExecutionState().isEndState()) {
            this.updateTaskState(ExecutionState.CANCELING);
        }
    }

    private void noticeTaskExecutionServiceCancel() {
        if (!this.checkTaskGroupIsExecuting(this.taskGroupLocation)) {
            this.updateTaskState(ExecutionState.CANCELED);
            return;
        }
        int i = 0;
        while (!this.taskFuture.isDone()) {
            Address executionAddress = this.getCurrentExecutionAddress();
            if (this.nodeEngine.getClusterService().getMember(executionAddress) == null) break;
            try {
                ++i;
                log.info(String.format("Send cancel %s operator to member %s", this.taskFullName, executionAddress));
                this.nodeEngine.getOperationService().createInvocationBuilder("st:impl:seaTunnelServer", (Operation)new CancelTaskOperation(this.taskGroupLocation), executionAddress).invoke().get();
                return;
            }
            catch (Exception e) {
                log.warn(String.format("%s cancel failed with Exception: %s, retry %s", this.getTaskFullName(), ExceptionUtils.getMessage((Throwable)e), i));
                try {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }

    private void updateStateTimestamps(@NonNull ExecutionState targetState) {
        if (targetState == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        Long[] stateTimestamps = (Long[])this.runningJobStateTimestampsIMap.get((Object)this.taskGroupLocation);
        stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
        this.runningJobStateTimestampsIMap.set((Object)this.taskGroupLocation, (Object)stateTimestamps);
    }

    public ExecutionState getExecutionState() {
        return this.currExecutionState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void resetExecutionState() {
        PhysicalVertex physicalVertex = this;
        synchronized (physicalVertex) {
            ExecutionState executionState = this.getExecutionState();
            if (!executionState.isEndState()) {
                String message = String.format("%s reset state failed, only end state can be reset, current is %s", this.getTaskFullName(), executionState);
                log.error(message);
                throw new IllegalStateException(message);
            }
            try {
                RetryUtils.retryWithException(() -> {
                    this.updateStateTimestamps(ExecutionState.CREATED);
                    this.runningJobStateIMap.set((Object)this.taskGroupLocation, (Object)ExecutionState.CREATED);
                    this.errorByPhysicalVertex = new AtomicReference();
                    return null;
                }, (RetryUtils.RetryMaterial)new RetryUtils.RetryMaterial(30, true, ExceptionUtil::isOperationNeedRetryException, 2000L));
            }
            catch (Exception e) {
                log.warn(ExceptionUtils.getMessage((Throwable)e));
                log.warn(String.format("Set %s state %s to Imap failed, skip.", this.getTaskFullName(), ExecutionState.CREATED));
            }
            this.currExecutionState = ExecutionState.CREATED;
            log.info(String.format("%s turn to state %s.", this.taskFullName, ExecutionState.CREATED));
        }
    }

    public void reset() {
        this.resetExecutionState();
    }

    public String getTaskFullName() {
        return this.taskFullName;
    }

    public void updateStateByExecutionService(TaskExecutionState taskExecutionState) {
        if (!taskExecutionState.getExecutionState().isEndState()) {
            throw new SeaTunnelEngineException(String.format("The state must be end state from ExecutionService, can not be %s", taskExecutionState.getExecutionState()));
        }
        this.errorByPhysicalVertex.compareAndSet(null, taskExecutionState.getThrowableMsg());
        this.updateTaskState(taskExecutionState.getExecutionState());
    }

    public Address getCurrentExecutionAddress() {
        SlotProfile ownedSlotProfiles = this.jobMaster.getOwnedSlotProfiles(this.taskGroupLocation);
        if (ownedSlotProfiles == null) {
            return null;
        }
        return ownedSlotProfiles.getWorker();
    }

    public TaskGroupLocation getTaskGroupLocation() {
        return this.taskGroupLocation;
    }

    public void setJobMaster(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
    }

    public void startPhysicalVertex() {
        this.isRunning = true;
        log.info(String.format("%s state process is start", this.taskFullName));
    }

    public void stopPhysicalVertex() {
        this.isRunning = false;
        log.info(String.format("%s state process is stopped", this.taskFullName));
    }

    public synchronized void stateProcess() {
        if (!this.isRunning) {
            log.warn(String.format("%s state process is not start", this.taskFullName));
            return;
        }
        switch (this.getExecutionState()) {
            case INITIALIZING: 
            case CREATED: 
            case RUNNING: {
                break;
            }
            case DEPLOYING: {
                TaskDeployState deployState = this.deploy(this.jobMaster.getOwnedSlotProfiles(this.taskGroupLocation));
                if (!deployState.isSuccess()) {
                    this.makeTaskGroupFailing((Throwable)new TaskGroupDeployException(deployState.getThrowableMsg()));
                    break;
                }
                this.updateTaskState(ExecutionState.RUNNING);
                break;
            }
            case FAILING: {
                this.updateTaskState(ExecutionState.FAILED);
                break;
            }
            case CANCELING: {
                this.noticeTaskExecutionServiceCancel();
                break;
            }
            case CANCELED: {
                this.stopPhysicalVertex();
                this.taskFuture.complete((Object)new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, this.errorByPhysicalVertex.get()));
                return;
            }
            case FAILED: {
                this.stopPhysicalVertex();
                log.error(String.format("%s end with state %s and Exception: %s", this.taskFullName, ExecutionState.FAILED, this.errorByPhysicalVertex.get()));
                this.taskFuture.complete((Object)new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, this.errorByPhysicalVertex.get()));
                return;
            }
            case FINISHED: {
                this.stopPhysicalVertex();
                this.taskFuture.complete((Object)new TaskExecutionState(this.taskGroupLocation, ExecutionState.FINISHED, this.errorByPhysicalVertex.get()));
                return;
            }
            default: {
                throw new IllegalArgumentException("Unknown TaskGroup State: " + this.getExecutionState());
            }
        }
    }

    public void makeTaskGroupFailing(Throwable err) {
        this.errorByPhysicalVertex.compareAndSet(null, ExceptionUtils.getMessage((Throwable)err));
        this.updateTaskState(ExecutionState.FAILING);
    }
}

