/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.CountUpAndDownLatch;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskCreationException;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskIFaceWrapper;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.runtime.troubleshooter.InMemoryIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.TaskEventMetadataUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.retry.RetryerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
public class GobblinMultiTaskAttempt {
    private static final String TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX = ".suc";
    private final Logger log;
    private final Iterator<WorkUnit> workUnits;
    private final String jobId;
    private final String attemptId;
    private final JobState jobState;
    private final TaskStateTracker taskStateTracker;
    private final TaskExecutor taskExecutor;
    private final Optional<String> containerIdOptional;
    private final Optional<StateStore<TaskState>> taskStateStoreOptional;
    private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
    private final TaskEventMetadataGenerator taskEventMetadataGenerator;
    private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = gmta -> false;
    private List<Task> tasks;
    private volatile AtomicBoolean stopped = new AtomicBoolean(false);
    private final IssueRepository issueRepository;
    private List<CommitStep> cleanupCommitSteps;

    public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits, String jobId, JobState jobState, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<String> containerIdOptional, Optional<StateStore<TaskState>> taskStateStoreOptional, SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
        this(workUnits, jobId, jobState, taskStateTracker, taskExecutor, containerIdOptional, taskStateStoreOptional, jobBroker, new InMemoryIssueRepository());
    }

    public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits, String jobId, JobState jobState, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<String> containerIdOptional, Optional<StateStore<TaskState>> taskStateStoreOptional, SharedResourcesBroker<GobblinScopeTypes> jobBroker, IssueRepository issueRepository) {
        this.workUnits = workUnits;
        this.jobId = jobId;
        this.issueRepository = issueRepository;
        this.attemptId = this.getClass().getName() + "." + this.jobId;
        this.jobState = jobState;
        this.taskStateTracker = taskStateTracker;
        this.taskExecutor = taskExecutor;
        this.containerIdOptional = containerIdOptional;
        this.taskStateStoreOptional = taskStateStoreOptional;
        this.log = LoggerFactory.getLogger((String)(GobblinMultiTaskAttempt.class.getName() + "-" + (String)containerIdOptional.or((Object)"noattempt")));
        this.jobBroker = jobBroker;
        this.tasks = new ArrayList<Task>();
        this.taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator((State)jobState);
    }

    public void run() throws IOException, InterruptedException {
        if (!this.workUnits.hasNext()) {
            this.log.warn("No work units to run in container " + (String)this.containerIdOptional.or((Object)""));
            return;
        }
        CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
        Pair<List<Task>, Boolean> executionResult = this.runWorkUnits(countDownLatch);
        this.tasks = (List)executionResult.getFirst();
        if (this.tasks.isEmpty() && this.stopped.get()) {
            return;
        }
        if (!((Boolean)executionResult.getSecond()).booleanValue()) {
            throw new TaskCreationException("Failing in submitting at least one task before execution.");
        }
        this.log.info("Waiting for submitted tasks of job {} to complete in container {}...", (Object)this.jobId, this.containerIdOptional.or((Object)""));
        try {
            while (countDownLatch.getCount() > 0L) {
                if (this.interruptionPredicate.test(this)) {
                    this.log.info("Interrupting task execution due to satisfied predicate.");
                    this.interruptTaskExecution(countDownLatch);
                } else {
                    long totalTasks = countDownLatch.totalParties.get();
                    long runningTasks = countDownLatch.getCount();
                    this.log.info(String.format("%d out of %d tasks of job %s are running in container %s. %d tasks finished.", runningTasks, totalTasks, this.jobId, this.containerIdOptional.or((Object)""), totalTasks - runningTasks));
                    if (!countDownLatch.await(10L, TimeUnit.SECONDS)) continue;
                }
                break;
            }
        }
        catch (InterruptedException interrupt) {
            this.log.info("Job interrupted by InterruptedException.");
            this.interruptTaskExecution(countDownLatch);
        }
        this.log.info("All assigned tasks of job {} have completed in container {}", (Object)this.jobId, this.containerIdOptional.or((Object)""));
    }

    private void interruptTaskExecution(CountDownLatch countDownLatch) throws InterruptedException {
        this.log.info("Job interrupted. Attempting a graceful shutdown of the job.");
        this.tasks.forEach(Task::shutdown);
        if (!countDownLatch.await(5L, TimeUnit.SECONDS)) {
            this.log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
            try {
                this.taskExecutor.shutDown();
            }
            catch (Throwable t) {
                throw new RuntimeException("Failed to shutdown task executor.", t);
            }
        }
    }

    public void commit() throws IOException {
        if (this.tasks == null || this.tasks.isEmpty()) {
            this.log.warn("No tasks to be committed in container " + (String)this.containerIdOptional.or((Object)""));
            return;
        }
        Iterator callableIterator = Iterators.transform(this.tasks.iterator(), (Function)new Function<Task, Callable<Void>>(){

            public Callable<Void> apply(final Task task) {
                return new Callable<Void>(){

                    @Override
                    @Nullable
                    public Void call() throws Exception {
                        task.commit();
                        return null;
                    }
                };
            }
        });
        try {
            List executionResults = new IteratorExecutor(callableIterator, this.getTaskCommitThreadPoolSize(), ExecutorsUtils.newDaemonThreadFactory((Optional)Optional.of((Object)this.log), (Optional)Optional.of((Object)"Task-committing-pool-%d"))).executeAndGetResults();
            IteratorExecutor.logFailures((List)executionResults, (Logger)this.log, (int)10);
            this.reportTaskIssues();
        }
        catch (InterruptedException ie) {
            try {
                this.log.error("Committing of tasks interrupted. Aborting.");
                throw new RuntimeException(ie);
            }
            catch (Throwable throwable) {
                this.reportTaskIssues();
                this.persistTaskStateStore();
                if (this.cleanupCommitSteps != null) {
                    for (CommitStep cleanupCommitStep : this.cleanupCommitSteps) {
                        this.log.info("Executing additional commit step.");
                        cleanupCommitStep.execute();
                    }
                }
                throw throwable;
            }
        }
        this.persistTaskStateStore();
        if (this.cleanupCommitSteps != null) {
            for (CommitStep cleanupCommitStep : this.cleanupCommitSteps) {
                this.log.info("Executing additional commit step.");
                cleanupCommitStep.execute();
            }
        }
    }

    private void reportTaskIssues() {
        if (this.issueRepository == null) {
            this.log.info("Automatic troubleshooting is not configured for this task. Make sure to pass issue repository to turn it on.");
            return;
        }
        try {
            for (Task task : this.tasks) {
                task.getTaskState().setTaskIssues(this.issueRepository.getAll());
            }
        }
        catch (TroubleshooterException e) {
            this.log.warn("Failed to save task issues", (Throwable)e);
        }
    }

    public synchronized void shutdownTasks() throws InterruptedException {
        this.log.info("Shutting down tasks");
        for (Task task : this.tasks) {
            task.shutdown();
        }
        for (Task task : this.tasks) {
            task.awaitShutdown(1000L);
        }
        for (Task task : this.tasks) {
            if (task.cancel()) {
                this.log.info("Task {} cancelled.", (Object)task.getTaskId());
                continue;
            }
            this.log.info("Task {} could not be cancelled.", (Object)task.getTaskId());
        }
        this.stopped.set(true);
    }

    /*
     * WARNING - void declaration
     */
    private void persistTaskStateStore() throws IOException {
        if (!this.taskStateStoreOptional.isPresent()) {
            this.log.info("Task state store does not exist.");
            return;
        }
        StateStore taskStateStore = (StateStore)this.taskStateStoreOptional.get();
        for (Task task : this.tasks) {
            String taskId = task.getTaskId();
            if (!taskStateStore.exists(this.jobId, taskId + ".tst")) continue;
            taskStateStore.delete(this.jobId, taskId + ".tst");
        }
        boolean hasTaskFailure = false;
        for (Task task : this.tasks) {
            this.log.info("Writing task state for task " + task.getTaskId());
            taskStateStore.put(task.getJobId(), task.getTaskId() + ".tst", (State)task.getTaskState());
            if (task.getTaskState().getWorkingState() != WorkUnitState.WorkingState.FAILED) continue;
            hasTaskFailure = true;
        }
        if (hasTaskFailure) {
            void var3_7;
            String string = String.format("Tasks in container %s failed", this.containerIdOptional.or((Object)""));
            for (Task task : this.tasks) {
                if (task.getTaskState().contains("task.failure.exception")) {
                    String string2 = String.format("Task failed: %s (Gobblin task id %s, container id %s)", task.getTaskState().getProp("task.failure.exception"), task.getTaskId(), this.containerIdOptional.or((Object)""));
                }
                if (task.getTaskState().getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL && task.getTaskState().getWorkingState() != WorkUnitState.WorkingState.COMMITTED) continue;
                taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX, (State)task.getTaskState());
            }
            throw new IOException((String)var3_7);
        }
    }

    public boolean isSpeculativeExecutionSafe() {
        for (Task task : this.tasks) {
            if (task.isSpeculativeExecutionSafe()) continue;
            this.log.info("One task is not safe for speculative execution.");
            return false;
        }
        this.log.info("All tasks are safe for speculative execution.");
        return true;
    }

    private final int getTaskCommitThreadPoolSize() {
        return Integer.parseInt(this.jobState.getProp("taskexecutor.threadpool.size", Integer.toString(2)));
    }

    public void addCleanupCommitStep(CommitStep commitStep) {
        if (this.cleanupCommitSteps == null) {
            this.cleanupCommitSteps = Lists.newArrayList((Object[])new CommitStep[]{commitStep});
        } else {
            this.cleanupCommitSteps.add(commitStep);
        }
    }

    private boolean taskSuccessfulInPriorAttempt(String taskId) {
        if (this.taskStateStoreOptional.isPresent()) {
            StateStore taskStateStore = (StateStore)this.taskStateStoreOptional.get();
            try {
                if (taskStateStore.exists(this.jobId, taskId + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX)) {
                    this.log.info("Skipping task {} that successfully executed in a prior attempt.", (Object)taskId);
                    return true;
                }
            }
            catch (IOException e) {
                return false;
            }
        }
        return false;
    }

    private synchronized Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch countDownLatch) {
        ArrayList tasks = Lists.newArrayList();
        if (this.stopped.get()) {
            return new Pair((Object)tasks, (Object)false);
        }
        boolean areAllTasksSubmitted = true;
        while (this.workUnits.hasNext()) {
            WorkUnit workUnit = this.workUnits.next();
            String taskId = workUnit.getProp("task.id");
            if (this.taskSuccessfulInPriorAttempt(taskId)) continue;
            SubscopedBrokerBuilder taskBrokerBuilder = this.jobBroker.newSubscopedBuilder((ScopeInstance)new TaskScopeInstance(taskId));
            WorkUnitState workUnitState = new WorkUnitState(workUnit, (State)this.jobState, taskBrokerBuilder);
            workUnitState.setId(taskId);
            workUnitState.setProp("job.id", (Object)this.jobId);
            workUnitState.setProp("task.id", (Object)taskId);
            workUnitState.setProp("task.startTimeMillis", (Object)Long.toString(System.currentTimeMillis()));
            if (this.containerIdOptional.isPresent()) {
                workUnitState.setProp("task.AttemptId", this.containerIdOptional.get());
            }
            Task task = null;
            try {
                countDownLatch.countUp();
                task = this.createTaskWithRetry(workUnitState, countDownLatch);
                this.taskStateTracker.registerNewTask(task);
                task.setTaskFuture(this.taskExecutor.submit(task));
                tasks.add(task);
            }
            catch (Throwable e) {
                if (e instanceof OutOfMemoryError) {
                    this.log.error("Encountering memory error in task creation/execution stage, please investigate memory usage:", e);
                    this.printMemoryUsage();
                }
                if (task == null) {
                    if (e instanceof RetryException) {
                        areAllTasksSubmitted = false;
                    }
                    countDownLatch.countDown();
                    this.log.error("Could not create task for workunit {}", (Object)workUnit, (Object)e);
                    continue;
                }
                if (!task.hasTaskFuture()) {
                    this.taskStateTracker.onTaskRunCompletion(task);
                    areAllTasksSubmitted = false;
                    this.log.error("Could not submit task for workunit {}", (Object)workUnit, (Object)e);
                    continue;
                }
                task.cancel();
                this.log.error("Failure after task submitted for workunit {}", (Object)workUnit, (Object)e);
            }
        }
        EventSubmitter.Builder eventSubmitterBuilder = new EventSubmitter.Builder(JobMetrics.get(this.jobId, new JobMetrics.CreatorTag(this.attemptId)).getMetricContext(), "gobblin.runtime");
        eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata((State)this.jobState, "TasksSubmitted"));
        eventSubmitterBuilder.build().submit("TasksSubmitted", new String[]{"tasksCount", Integer.toString(tasks.size())});
        return new Pair((Object)tasks, (Object)areAllTasksSubmitted);
    }

    private void printMemoryUsage() {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heapMemory = memoryBean.getHeapMemoryUsage();
        MemoryUsage nonHeapMemory = memoryBean.getNonHeapMemoryUsage();
        String format = "%-15s%-15s%-15s%-15s";
        this.log.info("Heap Memory");
        this.log.info(String.format(format, "init", "used", "Committed", "max"));
        this.log.info(String.format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(), heapMemory.getMax()));
        this.log.info("Non-heap Memory");
        this.log.info(String.format(format, "init", "used", "Committed", "max"));
        this.log.info(String.format(format, nonHeapMemory.getInit(), nonHeapMemory.getUsed(), nonHeapMemory.getCommitted(), nonHeapMemory.getMax()));
    }

    private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
        Optional<TaskFactory> taskFactoryOpt = TaskUtils.getTaskFactory((State)workUnitState);
        TaskContext taskContext = new TaskContext(workUnitState);
        if (taskFactoryOpt.isPresent()) {
            return new TaskIFaceWrapper(((TaskFactory)taskFactoryOpt.get()).createTask(taskContext), taskContext, countDownLatch, this.taskStateTracker);
        }
        return new Task(taskContext, this.taskStateTracker, this.taskExecutor, (Optional<CountDownLatch>)Optional.of((Object)countDownLatch));
    }

    @VisibleForTesting
    Task createTaskWithRetry(final WorkUnitState workUnitState, final CountDownLatch countDownLatch) throws RetryException {
        Properties defaultRetryConfig = new Properties();
        defaultRetryConfig.setProperty("time_out_ms", TimeUnit.MINUTES.toMillis(1L) + "");
        defaultRetryConfig.setProperty("interval_ms", TimeUnit.SECONDS.toMillis(2L) + "");
        Config config = ConfigUtils.propertiesToConfig((Properties)this.jobState.getProperties()).withFallback((ConfigMergeable)ConfigUtils.propertiesToConfig((Properties)defaultRetryConfig));
        Retryer retryer = RetryerFactory.newInstance((Config)config);
        final AtomicInteger counter = new AtomicInteger(0);
        try {
            return (Task)retryer.call((Callable)new Callable<Task>(){

                @Override
                public Task call() throws Exception {
                    counter.incrementAndGet();
                    GobblinMultiTaskAttempt.this.log.info(String.format("Task creation attempt %s", counter.get()));
                    return GobblinMultiTaskAttempt.this.createTaskRunnable(workUnitState, countDownLatch);
                }
            });
        }
        catch (ExecutionException ee) {
            throw new RuntimeException("Failure in executing retryer due to, ", ee);
        }
    }

    public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy) throws IOException, InterruptedException {
        this.run();
        if (multiTaskAttemptCommitPolicy.equals((Object)CommitPolicy.IMMEDIATE)) {
            this.log.info("Will commit tasks directly.");
            this.commit();
        } else if (!this.isSpeculativeExecutionSafe()) {
            throw new RuntimeException("Speculative execution is enabled. However, the task context is not safe for speculative execution.");
        }
    }

    public void cleanMetrics() {
        this.tasks.forEach(task -> {
            TaskMetrics.remove(task);
            JobMetrics.attemptRemove(this.jobId, new JobMetrics.CreatorTag(task.getTaskId()));
        });
        JobMetrics.attemptRemove(this.jobId, new JobMetrics.CreatorTag(this.attemptId));
    }

    public static GobblinMultiTaskAttempt runWorkUnits(JobContext jobContext, Iterator<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, CommitPolicy multiTaskAttemptCommitPolicy) throws IOException, InterruptedException {
        GobblinMultiTaskAttempt multiTaskAttempt = new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(), jobContext.getJobState(), taskStateTracker, taskExecutor, (Optional<String>)Optional.absent(), (Optional<StateStore<TaskState>>)Optional.absent(), jobContext.getJobBroker(), jobContext.getIssueRepository());
        multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
        return multiTaskAttempt;
    }

    public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String containerId, JobState jobState, List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, StateStore<TaskState> taskStateStore, CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker, IssueRepository issueRepository, Predicate<GobblinMultiTaskAttempt> interruptionPredicate) throws IOException, InterruptedException {
        if (jobState.getPropAsBoolean("workunit.enableTrackingLogs")) {
            Logger log = LoggerFactory.getLogger((String)GobblinMultiTaskAttempt.class.getName());
            log.info("Work unit tracking log: {}", workUnits);
        }
        GobblinMultiTaskAttempt multiTaskAttempt = new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor, (Optional<String>)Optional.of((Object)containerId), (Optional<StateStore<TaskState>>)Optional.of(taskStateStore), jobBroker, issueRepository);
        multiTaskAttempt.setInterruptionPredicate(interruptionPredicate);
        multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
        return multiTaskAttempt;
    }

    public int getNumTasksCreated() {
        return this.tasks.size();
    }

    public void setInterruptionPredicate(Predicate<GobblinMultiTaskAttempt> interruptionPredicate) {
        this.interruptionPredicate = interruptionPredicate;
    }

    public AtomicBoolean getStopped() {
        return this.stopped;
    }

    public static enum CommitPolicy {
        IMMEDIATE,
        CUSTOMIZED;

    }
}

