/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server;

import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.EventService;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
import org.apache.seatunnel.engine.server.execution.TaskDeployState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupUtils;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;

public class TaskExecutionService
implements DynamicMetricsProvider {
    private final String hzInstanceName;
    private final NodeEngineImpl nodeEngine;
    private final ClassLoaderService classLoaderService;
    private final ILogger logger;
    private volatile boolean isRunning = true;
    private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new LinkedBlockingDeque();
    private final ExecutorService executorService = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());
    private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(this.executorService, this.threadShareTaskQueue);
    private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<TaskGroupLocation, TaskGroupContext>();
    private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> finishedExecutionContexts = new ConcurrentHashMap<TaskGroupLocation, TaskGroupContext>();
    private final ConcurrentMap<TaskGroupLocation, Map<String, CompletableFuture<?>>> taskAsyncFunctionFuture = new ConcurrentHashMap();
    private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<TaskGroupLocation, CompletableFuture<Void>>();
    private final SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    private final ScheduledExecutorService scheduledExecutorService;
    private final ServerConnectorPackageClient serverConnectorPackageClient;
    private final EventService eventService;

    public TaskExecutionService(ClassLoaderService classLoaderService, NodeEngineImpl nodeEngine, EventService eventService) {
        this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
        this.nodeEngine = nodeEngine;
        this.classLoaderService = classLoaderService;
        this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
        MetricsRegistry registry = nodeEngine.getMetricsRegistry();
        MetricDescriptor descriptor = registry.newMetricDescriptor().withTag("service", this.getClass().getSimpleName());
        registry.registerStaticMetrics(descriptor, (Object)this);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.scheduledExecutorService.scheduleAtFixedRate(this::updateMetricsContextInImap, 0L, this.seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(), TimeUnit.SECONDS);
        this.serverConnectorPackageClient = new ServerConnectorPackageClient(nodeEngine, this.seaTunnelConfig);
        this.eventService = eventService;
    }

    public void start() {
        this.runBusWorkSupplier.runNewBusWork(false);
    }

    public void shutdown() {
        this.isRunning = false;
        this.executorService.shutdownNow();
        this.scheduledExecutorService.shutdown();
    }

    public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) {
        TaskGroupContext taskGroupContext = (TaskGroupContext)this.executionContexts.get(taskGroupLocation);
        if (taskGroupContext == null) {
            taskGroupContext = (TaskGroupContext)this.finishedExecutionContexts.get(taskGroupLocation);
        }
        if (taskGroupContext == null) {
            throw new TaskGroupContextNotFoundException(String.format("task group %s not found.", taskGroupLocation));
        }
        return taskGroupContext;
    }

    public TaskGroupContext getActiveExecutionContext(TaskGroupLocation taskGroupLocation) {
        TaskGroupContext taskGroupContext = (TaskGroupContext)this.executionContexts.get(taskGroupLocation);
        if (taskGroupContext == null) {
            throw new TaskGroupContextNotFoundException(String.format("task group %s not found.", taskGroupLocation));
        }
        return taskGroupContext;
    }

    private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
        Stream<TaskTracker> taskTrackerStream = tasks.stream().map(t -> {
            if (!taskGroupExecutionTracker.executionCompletedExceptionally()) {
                try {
                    TaskTracker taskTracker = new TaskTracker((Task)t, taskGroupExecutionTracker);
                    taskTracker.task.init();
                    return taskTracker;
                }
                catch (Exception e) {
                    taskGroupExecutionTracker.exception(e);
                    taskGroupExecutionTracker.taskDone((Task)t);
                }
            }
            return null;
        });
        if (!taskGroupExecutionTracker.executionCompletedExceptionally()) {
            taskTrackerStream.forEach(this.threadShareTaskQueue::add);
        }
    }

    private void submitBlockingTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
        MDCExecutorService mdcExecutorService = MDCTracer.tracing((ExecutorService)this.executorService);
        CountDownLatch startedLatch = new CountDownLatch(tasks.size());
        taskGroupExecutionTracker.blockingFutures = tasks.stream().map(t -> new BlockingWorker(new TaskTracker((Task)t, taskGroupExecutionTracker), startedLatch)).map(r -> new NamedTaskWrapper((Runnable)r, "BlockingWorker-" + taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())).map(arg_0 -> ((MDCExecutorService)mdcExecutorService).submit(arg_0)).collect(Collectors.toList());
        Util.uncheckRun(startedLatch::await);
    }

    public TaskDeployState deployTask(@NonNull Data taskImmutableInformation) {
        if (taskImmutableInformation == null) {
            throw new NullPointerException("taskImmutableInformation is marked non-null but is null");
        }
        TaskGroupImmutableInformation taskImmutableInfo = (TaskGroupImmutableInformation)this.nodeEngine.getSerializationService().toObject((Object)taskImmutableInformation);
        return this.deployTask(taskImmutableInfo);
    }

    public <T extends Task> T getTask(@NonNull TaskLocation taskLocation) {
        if (taskLocation == null) {
            throw new NullPointerException("taskLocation is marked non-null but is null");
        }
        TaskGroupContext executionContext = this.getActiveExecutionContext(taskLocation.getTaskGroupLocation());
        return executionContext.getTaskGroup().getTask(taskLocation.getTaskID());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImmutableInfo) {
        if (taskImmutableInfo == null) {
            throw new NullPointerException("taskImmutableInfo is marked non-null but is null");
        }
        this.logger.info(String.format("received deploying task executionId [%s]", taskImmutableInfo.getExecutionId()));
        TaskGroup taskGroup = null;
        try {
            List<Set<ConnectorJarIdentifier>> connectorJarIdentifiersList = taskImmutableInfo.getConnectorJarIdentifiers();
            List<Data> taskData = taskImmutableInfo.getTasksData();
            ConcurrentHashMap<Long, ClassLoader> classLoaders = new ConcurrentHashMap<Long, ClassLoader>();
            ArrayList<Task> tasks = new ArrayList<Task>();
            ConcurrentHashMap<Long, Collection<URL>> taskJars = new ConcurrentHashMap<Long, Collection<URL>>();
            for (int i = 0; i < taskData.size(); ++i) {
                Set<Object> jars = new HashSet();
                Set<ConnectorJarIdentifier> connectorJarIdentifiers = connectorJarIdentifiersList.get(i);
                if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
                    jars = this.serverConnectorPackageClient.getConnectorJarFromLocal(connectorJarIdentifiers);
                } else if (!CollectionUtils.isEmpty((Collection)taskImmutableInfo.getJars().get(i))) {
                    jars = taskImmutableInfo.getJars().get(i);
                }
                ClassLoader classLoader = this.classLoaderService.getClassLoader(taskImmutableInfo.getJobId(), (Collection)Lists.newArrayList(jars));
                Task task = jars.isEmpty() ? (Task)this.nodeEngine.getSerializationService().toObject((Object)taskData.get(i)) : (Task)CustomClassLoadedObject.deserializeWithCustomClassLoader((SerializationService)this.nodeEngine.getSerializationService(), (ClassLoader)classLoader, (Data)taskData.get(i));
                tasks.add(task);
                classLoaders.put(task.getTaskID(), classLoader);
                taskJars.put(task.getTaskID(), jars);
            }
            taskGroup = TaskGroupUtils.createTaskGroup(taskImmutableInfo.getTaskGroupType(), taskImmutableInfo.getTaskGroupLocation(), taskImmutableInfo.getTaskGroupName(), tasks);
            this.logger.info(String.format("deploying task %s, executionId [%s]", taskGroup.getTaskGroupLocation(), taskImmutableInfo.getExecutionId()));
            TaskExecutionService taskExecutionService = this;
            synchronized (taskExecutionService) {
                if (this.executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
                    throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
                }
                this.deployLocalTask(taskGroup, classLoaders, taskJars);
                return TaskDeployState.success();
            }
        }
        catch (Throwable t) {
            this.logger.severe(String.format("TaskGroupID : %s  deploy error with Exception: %s", taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null", ExceptionUtils.getMessage((Throwable)t)));
            return TaskDeployState.failed(t);
        }
    }

    public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(@NonNull TaskGroup taskGroup, @NonNull ConcurrentHashMap<Long, ClassLoader> classLoaders, ConcurrentHashMap<Long, Collection<URL>> jars) {
        if (taskGroup == null) {
            throw new NullPointerException("taskGroup is marked non-null but is null");
        }
        if (classLoaders == null) {
            throw new NullPointerException("classLoaders is marked non-null but is null");
        }
        CompletableFuture resultFuture = new CompletableFuture();
        try {
            taskGroup.init();
            this.logger.info(String.format("deploying TaskGroup %s init success", taskGroup.getTaskGroupLocation()));
            Collection<Task> tasks = taskGroup.getTasks();
            CompletableFuture cancellationFuture = new CompletableFuture();
            TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker((CompletableFuture<Void>)cancellationFuture, taskGroup, (CompletableFuture<TaskExecutionState>)resultFuture);
            ConcurrentHashMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<Long, TaskExecutionContext>();
            Map<Boolean, List<Task>> byCooperation = tasks.stream().peek(task -> {
                TaskExecutionContext taskExecutionContext = new TaskExecutionContext((Task)task, this.nodeEngine, this);
                task.setTaskExecutionContext(taskExecutionContext);
                taskExecutionContextMap.put(task.getTaskID(), taskExecutionContext);
            }).collect(Collectors.partitioningBy(t -> {
                ThreadShareMode mode = this.seaTunnelConfig.getEngineConfig().getTaskExecutionThreadShareMode();
                if (mode.equals((Object)ThreadShareMode.ALL)) {
                    return true;
                }
                if (mode.equals((Object)ThreadShareMode.OFF)) {
                    return false;
                }
                if (mode.equals((Object)ThreadShareMode.PART)) {
                    return t.isThreadsShare();
                }
                return true;
            }));
            this.executionContexts.put(taskGroup.getTaskGroupLocation(), new TaskGroupContext(taskGroup, classLoaders, jars));
            this.cancellationFutures.put(taskGroup.getTaskGroupLocation(), (CompletableFuture<Void>)cancellationFuture);
            this.submitThreadShareTask(executionTracker, byCooperation.get(true));
            this.submitBlockingTask(executionTracker, byCooperation.get(false));
            taskGroup.setTasksContext(taskExecutionContextMap);
            this.logger.info(String.format("deploying TaskGroup %s success", taskGroup.getTaskGroupLocation()));
        }
        catch (Throwable t2) {
            this.logger.severe(ExceptionUtils.getMessage((Throwable)t2));
            resultFuture.completeExceptionally(t2);
        }
        resultFuture.whenCompleteAsync(ExceptionUtil.withTryCatch((ILogger)this.logger, (r, s) -> {
            if (s != null) {
                this.logger.severe(String.format("Task %s complete with error %s", taskGroup.getTaskGroupLocation(), ExceptionUtils.getMessage((Throwable)s)));
            }
            if (r == null) {
                r = new TaskExecutionState(taskGroup.getTaskGroupLocation(), ExecutionState.FAILED, (Throwable)s);
            }
            this.logger.info(String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
            this.notifyTaskStatusToMaster(taskGroup.getTaskGroupLocation(), (TaskExecutionState)r);
        }), (Executor)MDCTracer.tracing((ExecutorService)this.executorService));
        return new PassiveCompletableFuture(resultFuture);
    }

    private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, TaskExecutionState taskExecutionState) {
        long sleepTime = 1000L;
        boolean notifyStateSuccess = false;
        while (this.isRunning && !notifyStateSuccess) {
            InvocationFuture invoke = this.nodeEngine.getOperationService().createInvocationBuilder("st:impl:seaTunnelServer", (Operation)new NotifyTaskStatusOperation(taskGroupLocation, taskExecutionState), this.nodeEngine.getMasterAddress()).invoke();
            try {
                invoke.get();
                notifyStateSuccess = true;
            }
            catch (InterruptedException e) {
                this.logger.severe("send notify task status failed", (Throwable)e);
            }
            catch (JobNotFoundException e) {
                this.logger.warning("send notify task status failed because can't find job", (Throwable)e);
                notifyStateSuccess = true;
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof JobNotFoundException) {
                    this.logger.warning("send notify task status failed because can't find job", (Throwable)e);
                    notifyStateSuccess = true;
                    continue;
                }
                this.logger.warning(ExceptionUtils.getMessage((Throwable)e));
                this.logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis", taskGroupLocation, sleepTime));
                try {
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException ex) {
                    this.logger.severe((Throwable)e);
                }
            }
        }
    }

    public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
        this.logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
        if (this.cancellationFutures.containsKey(taskGroupLocation)) {
            try {
                ((CompletableFuture)this.cancellationFutures.get(taskGroupLocation)).cancel(false);
            }
            catch (CancellationException cancellationException) {}
        } else {
            this.logger.warning(String.format("need cancel taskId : %s is not exist", taskGroupLocation));
        }
    }

    public void asyncExecuteFunction(TaskGroupLocation taskGroupLocation, Runnable task) {
        String id = UUID.randomUUID().toString();
        this.logger.fine("accept async execute function from " + taskGroupLocation + " with id " + id);
        if (!this.taskAsyncFunctionFuture.containsKey(taskGroupLocation)) {
            this.taskAsyncFunctionFuture.put(taskGroupLocation, new ConcurrentHashMap());
        }
        CompletableFuture future = CompletableFuture.runAsync((Runnable)task, (Executor)MDCTracer.tracing((ExecutorService)this.executorService));
        ((Map)this.taskAsyncFunctionFuture.get(taskGroupLocation)).put(id, future);
        future.whenComplete((r, e) -> {
            ((Map)this.taskAsyncFunctionFuture.get(taskGroupLocation)).remove(id);
            this.logger.fine("remove async execute function from " + taskGroupLocation + " with id " + id);
        });
    }

    public void notifyCleanTaskGroupContext(TaskGroupLocation taskGroupLocation) {
        this.finishedExecutionContexts.remove(taskGroupLocation);
    }

    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        try {
            MetricDescriptor copy1 = descriptor.copy().withTag("service", this.getClass().getSimpleName());
            HashMap<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<TaskGroupLocation, TaskGroupContext>();
            contextMap.putAll(this.finishedExecutionContexts);
            contextMap.putAll(this.executionContexts);
            contextMap.forEach((taskGroupLocation, taskGroupContext) -> {
                MetricDescriptor copy2 = copy1.copy().withTag("taskGroupLocation", taskGroupLocation.toString()).withTag("jobId", String.valueOf(taskGroupLocation.getJobId())).withTag("pipelineId", String.valueOf(taskGroupLocation.getPipelineId())).withTag("taskGroupId", String.valueOf(taskGroupLocation.getTaskGroupId()));
                taskGroupContext.getTaskGroup().getTasks().forEach(task -> {
                    Long taskID = task.getTaskID();
                    MetricDescriptor copy3 = copy2.copy().withTag("taskID", String.valueOf(taskID));
                    task.provideDynamicMetrics(copy3, context);
                });
            });
        }
        catch (Throwable t) {
            this.logger.warning("Dynamic metric collection failed", t);
            throw t;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateMetricsContextInImap() {
        block18: {
            if (!this.nodeEngine.getNode().getState().equals((Object)NodeState.ACTIVE)) {
                this.logger.warning(String.format("The Node is not ready yet, Node state %s,looking forward to the next scheduling", this.nodeEngine.getNode().getState()));
                return;
            }
            IMap metricsImap = this.nodeEngine.getHazelcastInstance().getMap("engine_runningJobMetrics");
            HashMap<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<TaskGroupLocation, TaskGroupContext>();
            contextMap.putAll(this.finishedExecutionContexts);
            contextMap.putAll(this.executionContexts);
            HashMap localMap = new HashMap();
            contextMap.forEach((taskGroupLocation, taskGroupContext) -> taskGroupContext.getTaskGroup().getTasks().forEach(task -> {
                SeaTunnelTask seaTunnelTask;
                if (task instanceof SeaTunnelTask && null != (seaTunnelTask = (SeaTunnelTask)task).getMetricsContext()) {
                    localMap.put(seaTunnelTask.getTaskLocation(), seaTunnelTask.getMetricsContext());
                }
            }));
            if (!localMap.isEmpty()) {
                boolean lockedIMap = false;
                try {
                    lockedIMap = metricsImap.tryLock((Object)Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5L, TimeUnit.SECONDS);
                    if (!lockedIMap) {
                        this.logger.warning("try lock failed in update metrics");
                        return;
                    }
                    HashMap centralMap = (HashMap)metricsImap.computeIfAbsent((Object)Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap());
                    centralMap.putAll(localMap);
                    metricsImap.put((Object)Constant.IMAP_RUNNING_JOB_METRICS_KEY, (Object)centralMap);
                }
                catch (Exception e) {
                    this.logger.warning("The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", (Throwable)e);
                }
                finally {
                    if (!lockedIMap) break block18;
                    boolean unLockedIMap = false;
                    while (!unLockedIMap) {
                        try {
                            metricsImap.unlock((Object)Constant.IMAP_RUNNING_JOB_METRICS_KEY);
                            unLockedIMap = true;
                        }
                        catch (OperationTimeoutException e) {
                            this.logger.warning("unlock imap failed in update metrics", (Throwable)e);
                        }
                    }
                }
            }
        }
        this.printTaskExecutionRuntimeInfo();
    }

    public void printTaskExecutionRuntimeInfo() {
        if (this.logger.isFineEnabled()) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)this.executorService;
            int activeCount = threadPoolExecutor.getActiveCount();
            int taskQueueSize = this.threadShareTaskQueue.size();
            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
            long taskCount = threadPoolExecutor.getTaskCount();
            this.logger.fine(StringFormatUtils.formatTable((Object[])new Object[]{"TaskExecutionServer Thread Pool Status", "activeCount", activeCount, "threadShareTaskQueueSize", taskQueueSize, "completedTaskCount", completedTaskCount, "taskCount", taskCount}));
        }
    }

    public void reportEvent(Event e) {
        this.eventService.reportEvent(e);
    }

    public ServerConnectorPackageClient getServerConnectorPackageClient() {
        return this.serverConnectorPackageClient;
    }

    public static class NamedTaskWrapper
    implements Runnable {
        private final Runnable task;
        private final String threadName;

        public NamedTaskWrapper(Runnable task, String threadName) {
            this.task = task;
            this.threadName = threadName;
        }

        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            String originalName = currentThread.getName();
            try {
                currentThread.setName(this.threadName);
                this.task.run();
            }
            finally {
                currentThread.setName(originalName);
            }
        }
    }

    public final class TaskGroupExecutionTracker {
        private final TaskGroup taskGroup;
        final CompletableFuture<TaskExecutionState> future;
        volatile List<Future<?>> blockingFutures = Collections.emptyList();
        private final AtomicInteger completionLatch;
        private final AtomicReference<Throwable> executionException = new AtomicReference();
        private final AtomicBoolean isCancel = new AtomicBoolean(false);
        private final Map<Long, Future<?>> currRunningTaskFuture = new ConcurrentHashMap();

        TaskGroupExecutionTracker(@NonNull CompletableFuture<Void> cancellationFuture, @NonNull TaskGroup taskGroup, CompletableFuture<TaskExecutionState> future) {
            if (cancellationFuture == null) {
                throw new NullPointerException("cancellationFuture is marked non-null but is null");
            }
            if (taskGroup == null) {
                throw new NullPointerException("taskGroup is marked non-null but is null");
            }
            if (future == null) {
                throw new NullPointerException("future is marked non-null but is null");
            }
            this.future = future;
            this.completionLatch = new AtomicInteger(taskGroup.getTasks().size());
            this.taskGroup = taskGroup;
            cancellationFuture.whenComplete(ExceptionUtil.withTryCatch((ILogger)TaskExecutionService.this.logger, (r, e) -> {
                this.isCancel.set(true);
                if (e == null) {
                    e = new IllegalStateException("cancellationFuture should be completed exceptionally");
                }
                this.exception((Throwable)e);
                this.cancelAllTask(taskGroup.getTaskGroupLocation());
            }));
        }

        void exception(Throwable t) {
            this.executionException.compareAndSet(null, t);
        }

        private void cancelAllTask(TaskGroupLocation taskGroupLocation) {
            try {
                this.blockingFutures.forEach(f -> f.cancel(true));
                this.currRunningTaskFuture.values().forEach(f -> f.cancel(true));
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
            this.cancelAsyncFunction(taskGroupLocation);
        }

        private void cancelAsyncFunction(TaskGroupLocation taskGroupLocation) {
            try {
                if (TaskExecutionService.this.taskAsyncFunctionFuture.containsKey(taskGroupLocation)) {
                    ((Map)TaskExecutionService.this.taskAsyncFunctionFuture.remove(taskGroupLocation)).values().stream().filter(f -> !f.isDone()).filter(f -> !f.isCancelled()).forEach(f -> f.cancel(true));
                }
            }
            catch (CancellationException ignore) {
                TaskExecutionService.this.logger.warning(ExceptionUtils.getMessage((Throwable)ignore));
            }
        }

        void taskDone(Task task) {
            TaskGroupLocation taskGroupLocation = this.taskGroup.getTaskGroupLocation();
            TaskExecutionService.this.logger.info(String.format("taskDone, taskId = %d, taskGroup = %s", task.getTaskID(), taskGroupLocation));
            Throwable ex = this.executionException.get();
            if (this.completionLatch.decrementAndGet() == 0) {
                this.recycleClassLoader(taskGroupLocation);
                TaskExecutionService.this.finishedExecutionContexts.put(taskGroupLocation, TaskExecutionService.this.executionContexts.remove(taskGroupLocation));
                TaskExecutionService.this.cancellationFutures.remove(taskGroupLocation);
                try {
                    this.cancelAsyncFunction(taskGroupLocation);
                }
                catch (Throwable t) {
                    TaskExecutionService.this.logger.severe("cancel async function failed", t);
                }
                try {
                    TaskExecutionService.this.updateMetricsContextInImap();
                }
                catch (Throwable t) {
                    TaskExecutionService.this.logger.severe("update metrics context in imap failed", t);
                }
                if (ex == null) {
                    TaskExecutionService.this.logger.info(String.format("taskGroup %s complete with FINISHED", taskGroupLocation));
                    this.future.complete((Object)new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED));
                    return;
                }
                if (this.isCancel.get()) {
                    TaskExecutionService.this.logger.info(String.format("taskGroup %s complete with CANCELED", taskGroupLocation));
                    this.future.complete((Object)new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED));
                    return;
                }
                TaskExecutionService.this.logger.info(String.format("taskGroup %s complete with FAILED", taskGroupLocation));
                this.future.complete((Object)new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
            }
            if (!this.isCancel.get() && ex != null) {
                TaskExecutionService.this.logger.info(String.format("task %s error with exception: [%s], cancel other task in taskGroup %s.", task.getTaskID(), ex, taskGroupLocation));
                this.cancelAllTask(taskGroupLocation);
            }
        }

        private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
            TaskGroupContext context = (TaskGroupContext)TaskExecutionService.this.executionContexts.get(taskGroupLocation);
            ((TaskGroupContext)TaskExecutionService.this.executionContexts.get(taskGroupLocation)).setClassLoaders(null);
            for (Collection<URL> jars : context.getJars().values()) {
                TaskExecutionService.this.classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(), jars);
            }
        }

        boolean executionCompletedExceptionally() {
            return this.executionException.get() != null;
        }
    }

    public final class RunBusWorkSupplier {
        ExecutorService executorService;
        LinkedBlockingDeque<TaskTracker> taskQueue;

        public RunBusWorkSupplier(ExecutorService executorService, LinkedBlockingDeque<TaskTracker> taskqueue) {
            this.executorService = executorService;
            this.taskQueue = taskqueue;
        }

        public boolean runNewBusWork(boolean checkTaskQueue) {
            if (!checkTaskQueue || !this.taskQueue.isEmpty()) {
                LinkedBlockingQueue futureBlockingQueue = new LinkedBlockingQueue();
                CooperativeTaskWorker cooperativeTaskWorker = new CooperativeTaskWorker(this.taskQueue, this, futureBlockingQueue);
                Future<?> submit = this.executorService.submit(cooperativeTaskWorker);
                futureBlockingQueue.add(submit);
                return true;
            }
            return false;
        }
    }

    public final class CooperativeTaskWorker
    implements Runnable {
        AtomicBoolean keep = new AtomicBoolean(true);
        public AtomicReference<TaskTracker> exclusiveTaskTracker = new AtomicReference();
        final TaskCallTimer timer;
        private Thread myThread;
        public LinkedBlockingDeque<TaskTracker> taskQueue;
        private Future<?> thisTaskFuture;
        private BlockingQueue<Future<?>> futureBlockingQueue;

        public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskQueue, RunBusWorkSupplier runBusWorkSupplier, BlockingQueue<Future<?>> futureBlockingQueue) {
            TaskExecutionService.this.logger.info(String.format("Created new BusWork : %s", this.hashCode()));
            this.taskQueue = taskQueue;
            this.timer = new TaskCallTimer(50L, this.keep, runBusWorkSupplier, this);
            this.futureBlockingQueue = futureBlockingQueue;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            this.thisTaskFuture = this.futureBlockingQueue.take();
            this.futureBlockingQueue = null;
            this.myThread = Thread.currentThread();
            while (this.keep.get() && TaskExecutionService.this.isRunning) {
                ProgressState call;
                TaskGroupExecutionTracker taskGroupExecutionTracker;
                TaskTracker taskTracker;
                block18: {
                    taskTracker = null != this.exclusiveTaskTracker.get() ? this.exclusiveTaskTracker.get() : this.taskQueue.takeFirst();
                    taskGroupExecutionTracker = taskTracker.taskGroupExecutionTracker;
                    if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
                        taskGroupExecutionTracker.taskDone(taskTracker.task);
                        if (null == this.exclusiveTaskTracker.get()) continue;
                        break;
                    }
                    taskGroupExecutionTracker.currRunningTaskFuture.put(taskTracker.task.getTaskID(), this.thisTaskFuture);
                    if (null == this.exclusiveTaskTracker.get()) {
                        this.timer.timerStart(taskTracker);
                    }
                    call = null;
                    try {
                        this.myThread.setContextClassLoader(((TaskGroupContext)TaskExecutionService.this.executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())).getClassLoaders().get(taskTracker.task.getTaskID()));
                        call = taskTracker.task.call();
                        TaskCallTimer taskCallTimer = this.timer;
                        synchronized (taskCallTimer) {
                            this.timer.timerStop();
                        }
                    }
                    catch (InterruptedException e) {
                        if (taskGroupExecutionTracker.executionException.get() == null && !taskGroupExecutionTracker.isCancel.get()) {
                            taskGroupExecutionTracker.exception(e);
                        }
                        taskGroupExecutionTracker.taskDone(taskTracker.task);
                        TaskExecutionService.this.logger.warning("Exception in " + taskTracker.task, (Throwable)e);
                        if (null == this.exclusiveTaskTracker.get()) break block18;
                        break;
                    }
                    catch (Throwable e) {
                        taskGroupExecutionTracker.exception(e);
                        taskGroupExecutionTracker.taskDone(taskTracker.task);
                        TaskExecutionService.this.logger.warning("Exception in " + taskTracker.task, e);
                        if (null == this.exclusiveTaskTracker.get()) break block18;
                        break;
                    }
                    finally {
                        this.timer.timerStop();
                        taskGroupExecutionTracker.currRunningTaskFuture.remove(taskTracker.task.getTaskID());
                    }
                }
                if (null == call) continue;
                if (call.isDone()) {
                    taskGroupExecutionTracker.taskDone(taskTracker.task);
                    if (null == this.exclusiveTaskTracker.get()) continue;
                    break;
                }
                if (null != this.exclusiveTaskTracker.get()) continue;
                this.taskQueue.offer(taskTracker);
            }
        }
    }

    private final class BlockingTaskThreadFactory
    implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger();

        private BlockingTaskThreadFactory() {
        }

        @Override
        public Thread newThread(@NonNull Runnable r) {
            if (r == null) {
                throw new NullPointerException("r is marked non-null but is null");
            }
            return new Thread(r, String.format("hz.%s.seaTunnel.task.thread-%d", TaskExecutionService.this.hzInstanceName, this.seq.getAndIncrement()));
        }
    }

    private final class BlockingWorker
    implements Runnable {
        private final TaskTracker tracker;
        private final CountDownLatch startedLatch;

        private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
            this.tracker = tracker;
            this.startedLatch = startedLatch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            TaskGroupExecutionTracker taskGroupExecutionTracker = this.tracker.taskGroupExecutionTracker;
            ClassLoader classLoader = ((TaskGroupContext)TaskExecutionService.this.executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())).getClassLoaders().get(this.tracker.task.getTaskID());
            ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(classLoader);
            Task t = this.tracker.task;
            ProgressState result = null;
            try {
                this.startedLatch.countDown();
                t.init();
                while (!(result = t.call()).isDone() && TaskExecutionService.this.isRunning && !taskGroupExecutionTracker.executionCompletedExceptionally()) {
                }
            }
            catch (InterruptedException e) {
                TaskExecutionService.this.logger.warning(String.format("Interrupted task %d - %s", t.getTaskID(), t));
                if (taskGroupExecutionTracker.executionException.get() == null && !taskGroupExecutionTracker.isCancel.get()) {
                    taskGroupExecutionTracker.exception(e);
                }
            }
            catch (Throwable e) {
                if (taskGroupExecutionTracker.isCancel.get()) {
                    TaskExecutionService.this.logger.warning(String.format("Interrupted task %d - %s", t.getTaskID(), t));
                } else {
                    TaskExecutionService.this.logger.warning("Exception in " + t, e);
                }
                taskGroupExecutionTracker.exception(e);
            }
            finally {
                taskGroupExecutionTracker.taskDone(t);
                if (result == null || !result.isDone()) {
                    try {
                        this.tracker.task.close();
                    }
                    catch (IOException e) {
                        TaskExecutionService.this.logger.severe("Close task error", (Throwable)e);
                    }
                }
            }
            Thread.currentThread().setContextClassLoader(oldClassLoader);
        }
    }
}

