/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.CountUpAndDownLatch;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskIFaceWrapper;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a batch of {@link WorkUnit}s as {@link Task}s, waits for them to finish and optionally
 * commits them.
 *
 * <p>Each work unit is turned into a {@link Task}, registered with the {@link TaskStateTracker} and
 * submitted to the {@link TaskExecutor}. When a task state store is supplied, the final state of
 * every task is written to it during {@link #commit()}, and tasks that already have a success
 * marker in the store are not run again.
 */
@Alpha
public class GobblinMultiTaskAttempt {
    /** Suffix of the state store entry marking a task that succeeded while a sibling task failed. */
    private static final String TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX = ".suc";
    /** Suffix of the state store entry holding a task's persisted state. */
    private static final String TASK_STATE_STORE_TABLE_SUFFIX = ".tst";

    // Instance-level (not static) because the logger name embeds the container id.
    private final Logger log;
    private final Iterator<WorkUnit> workUnits;
    private final String jobId;
    private final JobState jobState;
    private final TaskStateTracker taskStateTracker;
    private final TaskExecutor taskExecutor;
    private final Optional<String> containerIdOptional;
    private final Optional<StateStore<TaskState>> taskStateStoreOptional;
    private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
    private List<Task> tasks;
    private List<CommitStep> cleanupCommitSteps;

    /**
     * Creates an attempt over the given work units. Nothing is executed until {@link #run()} is called.
     *
     * @param workUnits work units to run; consumed once by {@link #run()}
     * @param jobId id of the owning job; set on every task's state and used as the state store name
     * @param jobState job-level state passed to every {@link WorkUnitState}
     * @param taskStateTracker tracker every created task is registered with
     * @param taskExecutor executor the tasks are submitted to
     * @param containerIdOptional container id, used in log messages and set as the task attempt id when present
     * @param taskStateStoreOptional store for task states and success markers, if any
     * @param jobBroker job-scoped broker from which one task-scoped broker builder is derived per task
     */
    public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits, String jobId, JobState jobState, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<String> containerIdOptional, Optional<StateStore<TaskState>> taskStateStoreOptional, SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
        this.workUnits = workUnits;
        this.jobId = jobId;
        this.jobState = jobState;
        this.taskStateTracker = taskStateTracker;
        this.taskExecutor = taskExecutor;
        this.containerIdOptional = containerIdOptional;
        this.taskStateStoreOptional = taskStateStoreOptional;
        this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" + containerIdOptional.or("noattempt"));
        this.jobBroker = jobBroker;
        this.tasks = new ArrayList<Task>();
    }

    /**
     * Submits one task per work unit and blocks until the shared latch reaches zero, logging
     * progress every 10 seconds. Returns immediately (with a warning) if there are no work units.
     *
     * @throws IOException declared for callers; not thrown directly by this method's own code
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void run() throws IOException, InterruptedException {
        if (!this.workUnits.hasNext()) {
            this.log.warn("No work units to run in container " + this.containerIdOptional.or(""));
            return;
        }
        CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
        this.tasks = this.runWorkUnits(countDownLatch);
        this.log.info("Waiting for submitted tasks of job {} to complete in container {}...", this.jobId, this.containerIdOptional.or(""));
        while (countDownLatch.getCount() > 0L) {
            this.log.info(String.format("%d out of %d tasks of job %s are running in container %s", countDownLatch.getCount(), countDownLatch.getRegisteredParties(), this.jobId, this.containerIdOptional.or("")));
            // The timed wait only paces the progress log; the loop condition decides when we are done.
            countDownLatch.await(10L, TimeUnit.SECONDS);
        }
        this.log.info("All assigned tasks of job {} have completed in container {}", this.jobId, this.containerIdOptional.or(""));
    }

    /**
     * Commits every task created by {@link #run()} on a dedicated thread pool, then persists the
     * task states and executes the registered cleanup commit steps.
     *
     * <p>Persistence and cleanup run in a {@code finally} block, so they happen whether the commit
     * phase completes, is interrupted, or fails with any other exception. If there are no tasks the
     * method returns early and neither persistence nor cleanup is performed.
     *
     * @throws IOException if persisting task states fails, if at least one task is in the FAILED
     *     state, or if a cleanup commit step throws it
     * @throws RuntimeException wrapping the {@link InterruptedException} if committing is interrupted
     */
    public void commit() throws IOException {
        if (this.tasks == null || this.tasks.isEmpty()) {
            this.log.warn("No tasks to be committed in container " + this.containerIdOptional.or(""));
            return;
        }
        Iterator<Callable<Void>> callableIterator = Iterators.transform(this.tasks.iterator(), new Function<Task, Callable<Void>>(){

            @Override
            public Callable<Void> apply(final Task task) {
                return new Callable<Void>(){

                    @Override
                    @Nullable
                    public Void call() throws Exception {
                        task.commit();
                        return null;
                    }
                };
            }
        });
        boolean interrupted = false;
        try {
            // NOTE(review): the trailing 10 is presumably the maximum number of failures logged - confirm against IteratorExecutor.
            IteratorExecutor.logFailures(
                new IteratorExecutor<>(callableIterator, this.getTaskCommitThreadPoolSize(), ExecutorsUtils.newDaemonThreadFactory(Optional.of(this.log), Optional.of("Task-committing-pool-%d"))).executeAndGetResults(),
                this.log, 10);
        }
        catch (InterruptedException ie) {
            interrupted = true;
            this.log.error("Committing of tasks interrupted. Aborting.");
            throw new RuntimeException(ie);
        }
        finally {
            try {
                this.persistTaskStateStore();
                this.executeCleanupCommitSteps();
            }
            finally {
                // Restore the interrupt status only after the state store I/O above, so that I/O is
                // not itself aborted by the pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Executes every registered cleanup commit step, in registration order. */
    private void executeCleanupCommitSteps() throws IOException {
        if (this.cleanupCommitSteps == null) {
            return;
        }
        for (CommitStep cleanupCommitStep : this.cleanupCommitSteps) {
            this.log.info("Executing additional commit step.");
            cleanupCommitStep.execute();
        }
    }

    /**
     * Shuts down all tasks in three passes: request shutdown on each, wait up to 1000 (units as
     * defined by {@code Task.awaitShutdown}) for each, then cancel each and log the outcome.
     *
     * @throws InterruptedException if interrupted while awaiting a task's shutdown
     */
    public void shutdownTasks() throws InterruptedException {
        this.log.info("Shutting down tasks");
        for (Task task : this.tasks) {
            task.shutdown();
        }
        for (Task task : this.tasks) {
            task.awaitShutdown(1000L);
        }
        for (Task task : this.tasks) {
            if (task.cancel()) {
                this.log.info("Task {} cancelled.", task.getTaskId());
                continue;
            }
            this.log.info("Task {} could not be cancelled.", task.getTaskId());
        }
    }

    /**
     * Writes the state of every task to the task state store, replacing any existing entry.
     *
     * <p>If any task is in the FAILED state, the recorded failure exceptions are logged, a success
     * marker is written for every SUCCESSFUL or COMMITTED task (these markers are what
     * {@link #taskSuccessfulInPriorAttempt(String)} looks for), and an {@link IOException} is thrown.
     *
     * @throws IOException on state store failure, or if at least one task failed
     */
    private void persistTaskStateStore() throws IOException {
        if (!this.taskStateStoreOptional.isPresent()) {
            this.log.info("Task state store does not exist.");
            return;
        }
        StateStore<TaskState> taskStateStore = this.taskStateStoreOptional.get();
        // Remove stale entries first so the puts below start from a clean slate.
        for (Task task : this.tasks) {
            String tableName = task.getTaskId() + TASK_STATE_STORE_TABLE_SUFFIX;
            if (taskStateStore.exists(this.jobId, tableName)) {
                taskStateStore.delete(this.jobId, tableName);
            }
        }
        boolean hasTaskFailure = false;
        for (Task task : this.tasks) {
            this.log.info("Writing task state for task " + task.getTaskId());
            taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_TABLE_SUFFIX, task.getTaskState());
            if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.FAILED) {
                hasTaskFailure = true;
            }
        }
        if (!hasTaskFailure) {
            return;
        }
        for (Task task : this.tasks) {
            TaskState taskState = task.getTaskState();
            if (taskState.contains("task.failure.exception")) {
                this.log.error(String.format("Task %s failed due to exception: %s", task.getTaskId(), taskState.getProp("task.failure.exception")));
            }
            WorkUnitState.WorkingState workingState = taskState.getWorkingState();
            if (workingState == WorkUnitState.WorkingState.SUCCESSFUL || workingState == WorkUnitState.WorkingState.COMMITTED) {
                taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX, taskState);
            }
        }
        throw new IOException(String.format("Not all tasks running in container %s completed successfully", this.containerIdOptional.or("")));
    }

    /**
     * @return {@code true} only if every task reports itself safe for speculative execution
     *     (trivially {@code true} when there are no tasks)
     */
    public boolean isSpeculativeExecutionSafe() {
        for (Task task : this.tasks) {
            if (!task.isSpeculativeExecutionSafe()) {
                this.log.info("One task is not safe for speculative execution.");
                return false;
            }
        }
        this.log.info("All tasks are safe for speculative execution.");
        return true;
    }

    /** Reads the commit pool size from the job property {@code taskexecutor.threadpool.size}, defaulting to 2. */
    private final int getTaskCommitThreadPoolSize() {
        return Integer.parseInt(this.jobState.getProp("taskexecutor.threadpool.size", Integer.toString(2)));
    }

    /**
     * Registers a step to be executed by {@link #commit()} after task states have been persisted.
     *
     * @param commitStep the step to append to the cleanup list
     */
    public void addCleanupCommitStep(CommitStep commitStep) {
        if (this.cleanupCommitSteps == null) {
            this.cleanupCommitSteps = Lists.newArrayList(commitStep);
        } else {
            this.cleanupCommitSteps.add(commitStep);
        }
    }

    /**
     * Checks the task state store for a success marker left for this task.
     *
     * @param taskId id of the task to look up
     * @return {@code true} if a success marker exists; {@code false} if there is no store, no
     *     marker, or the lookup failed
     */
    private boolean taskSuccessfulInPriorAttempt(String taskId) {
        if (!this.taskStateStoreOptional.isPresent()) {
            return false;
        }
        StateStore<TaskState> taskStateStore = this.taskStateStoreOptional.get();
        try {
            if (taskStateStore.exists(this.jobId, taskId + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX)) {
                this.log.info("Skipping task {} that successfully executed in a prior attempt.", taskId);
                return true;
            }
        }
        catch (IOException e) {
            // A failed lookup is treated as "not processed" so the task is run again; record why.
            this.log.warn("Could not check task state store for prior success of task {}; the task will be run.", taskId, e);
        }
        return false;
    }

    /**
     * Creates, registers and submits one task per remaining work unit, skipping work units whose
     * task has a success marker in the state store. A failure on one work unit is logged and does
     * not stop processing of the others. Finally submits a "TasksSubmitted" event carrying the
     * latch's registered-party count.
     *
     * @param countDownLatch latch counted up once per task before the task is created
     * @return the tasks for which creation, registration and submission all completed without an exception
     */
    private List<Task> runWorkUnits(CountUpAndDownLatch countDownLatch) {
        List<Task> tasks = Lists.newArrayList();
        while (this.workUnits.hasNext()) {
            WorkUnit workUnit = this.workUnits.next();
            String taskId = workUnit.getProp("task.id");
            if (this.taskSuccessfulInPriorAttempt(taskId)) {
                continue;
            }
            SubscopedBrokerBuilder<GobblinScopeTypes, ?> taskBrokerBuilder = this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId));
            WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, taskBrokerBuilder);
            workUnitState.setId(taskId);
            workUnitState.setProp("job.id", this.jobId);
            workUnitState.setProp("task.id", taskId);
            if (this.containerIdOptional.isPresent()) {
                workUnitState.setProp("task.AttemptId", this.containerIdOptional.get());
            }
            Task task = null;
            try {
                countDownLatch.countUp();
                task = this.createTaskRunnable(workUnitState, countDownLatch);
                this.taskStateTracker.registerNewTask(task);
                task.setTaskFuture(this.taskExecutor.submit(task));
                tasks.add(task);
            }
            catch (Exception e) {
                if (task == null) {
                    // Creation failed: no task exists to release the latch, so undo the countUp here.
                    countDownLatch.countDown();
                    this.log.error("Could not create task for workunit {}", workUnit, e);
                } else if (!task.hasTaskFuture()) {
                    // Created but never submitted: report completion to the tracker directly.
                    this.taskStateTracker.onTaskRunCompletion(task);
                    this.log.error("Could not submit task for workunit {}", workUnit, e);
                } else {
                    task.cancel();
                    this.log.error("Failure after task submitted for workunit {}", workUnit, e);
                }
            }
        }
        new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build().submit("TasksSubmitted", "tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
        return tasks;
    }

    /**
     * Builds the task for a work unit: a {@link TaskIFaceWrapper} around a factory-created task
     * when the work unit state designates a {@link TaskFactory}, otherwise a plain {@link Task}.
     *
     * @param workUnitState state the task context(s) are built from
     * @param countDownLatch latch handed to the created task
     * @return the newly created, not yet submitted task
     */
    private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
        Optional<TaskFactory> taskFactoryOpt = TaskUtils.getTaskFactory(workUnitState);
        if (taskFactoryOpt.isPresent()) {
            return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new TaskContext(workUnitState)), new TaskContext(workUnitState), countDownLatch, this.taskStateTracker);
        }
        return new Task(new TaskContext(workUnitState), this.taskStateTracker, this.taskExecutor, Optional.of(countDownLatch));
    }

    /**
     * Runs the attempt and then either commits immediately or verifies that deferring the commit is safe.
     *
     * @param multiTaskAttemptCommitPolicy {@link CommitPolicy#IMMEDIATE} to commit right after the
     *     run; any other policy only checks speculative-execution safety
     * @throws IOException propagated from {@link #run()} or {@link #commit()}
     * @throws InterruptedException if interrupted while waiting for tasks
     * @throws RuntimeException if the policy is not IMMEDIATE and some task is not safe for speculative execution
     */
    public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy) throws IOException, InterruptedException {
        this.run();
        if (multiTaskAttemptCommitPolicy.equals(CommitPolicy.IMMEDIATE)) {
            this.log.info("Will commit tasks directly.");
            this.commit();
        } else if (!this.isSpeculativeExecutionSafe()) {
            throw new RuntimeException("Specualtive execution is enabled. However, the task context is not safe for speculative execution.");
        }
    }

    /**
     * Convenience entry point that builds an attempt from a {@link JobContext}, with no container
     * id and no task state store, and runs it under the given commit policy.
     *
     * @return the attempt, after it has run
     * @throws IOException propagated from running or committing the attempt
     * @throws InterruptedException if interrupted while waiting for tasks
     */
    public static GobblinMultiTaskAttempt runWorkUnits(JobContext jobContext, Iterator<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, CommitPolicy multiTaskAttemptCommitPolicy) throws IOException, InterruptedException {
        GobblinMultiTaskAttempt multiTaskAttempt = new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(), jobContext.getJobState(), taskStateTracker, taskExecutor, Optional.<String>absent(), Optional.<StateStore<TaskState>>absent(), jobContext.getJobBroker());
        multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
        return multiTaskAttempt;
    }

    /**
     * Convenience entry point for running inside a container: builds an attempt with the given
     * container id and task state store and runs it under the given commit policy. When the job
     * property {@code workunit.enableTrackingLogs} is true, the work units are logged first.
     *
     * @return the attempt, after it has run
     * @throws IOException propagated from running or committing the attempt
     * @throws InterruptedException if interrupted while waiting for tasks
     */
    public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String containerId, JobState jobState, List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, StateStore<TaskState> taskStateStore, CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker) throws IOException, InterruptedException {
        if (jobState.getPropAsBoolean("workunit.enableTrackingLogs")) {
            Logger log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName());
            log.info("Work unit tracking log: {}", workUnits);
        }
        GobblinMultiTaskAttempt multiTaskAttempt = new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor, Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
        multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
        return multiTaskAttempt;
    }

    /** Determines what {@link #runAndOptionallyCommitTaskAttempt(CommitPolicy)} does after the run. */
    public static enum CommitPolicy {
        /** Commit the tasks right after they finish running. */
        IMMEDIATE,
        /** Do not commit here; only verify that all tasks are safe for speculative execution. */
        CUSTOMIZED;

    }
}

