/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.recon.tasks;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.ReconConstants;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.types.NamedCallableTask;
import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ReconTaskController} implementation that keeps a registry of
 * {@link ReconOmTask}s and runs them on a fixed-size thread pool.
 *
 * <p>OM update batches go through up to three stages per task: an initial
 * {@code process} call, one retry of {@code process} for tasks that reported
 * failure, and finally a full {@code reprocess} for tasks whose retry also
 * reported failure. A task whose reprocess keeps failing is dropped from the
 * registry once its failure counter exceeds {@link #TASK_FAILURE_THRESHOLD}.
 *
 * <p>{@link #consumeOMEvents}, {@link #reInitializeTasks}, {@link #start} and
 * {@link #stop} are {@code synchronized} on this instance;
 * {@link #registerTask} and {@link #getRegisteredTasks} are not.
 */
public class ReconTaskControllerImpl
implements ReconTaskController {
    private static final Logger LOG = LoggerFactory.getLogger(ReconTaskControllerImpl.class);
    /** Number of failed reprocess attempts tolerated before a task is dropped. */
    private static final int TASK_FAILURE_THRESHOLD = 2;
    /** Value written via {@code setLastTaskRunStatus} when a task run fails. */
    private static final int RUN_STATUS_FAILURE = -1;
    /** Value written via {@code setLastTaskRunStatus} when a task run succeeds. */
    private static final int RUN_STATUS_SUCCESS = 0;

    /** Registered tasks keyed by task name. */
    private final Map<String, ReconOmTask> reconOmTasks;
    /** Created by {@link #start()}; {@code null} until then. */
    private ExecutorService executorService;
    private final int threadCount;
    /** Per-task count of failed reprocess attempts; reset to 0 on any successful run. */
    private final Map<String, AtomicInteger> taskFailureCounter = new HashMap<String, AtomicInteger>();
    private final ReconTaskStatusUpdaterManager taskStatusUpdaterManager;

    /**
     * Creates the controller and registers every task in {@code tasks}.
     *
     * @param configuration source of the {@code ozone.recon.task.thread.count}
     *     setting (defaults to 8 when absent)
     * @param tasks tasks to register up front
     * @param taskStatusUpdaterManager provider of per-task status updaters
     */
    @Inject
    public ReconTaskControllerImpl(OzoneConfiguration configuration, Set<ReconOmTask> tasks, ReconTaskStatusUpdaterManager taskStatusUpdaterManager) {
        this.reconOmTasks = new HashMap<String, ReconOmTask>();
        this.threadCount = configuration.getInt("ozone.recon.task.thread.count", 8);
        this.taskStatusUpdaterManager = taskStatusUpdaterManager;
        for (ReconOmTask task : tasks) {
            this.registerTask(task);
        }
    }

    /**
     * Adds {@code task} to the registry under its task name, replacing any
     * task previously registered under that name, and zeroes its failure counter.
     *
     * @param task the task to register
     */
    @Override
    public void registerTask(ReconOmTask task) {
        String taskName = task.getTaskName();
        LOG.info("Registered task {} with controller.", taskName);
        this.reconOmTasks.put(taskName, task);
        this.taskFailureCounter.put(taskName, new AtomicInteger(0));
    }

    /**
     * Feeds a batch of OM update events to every registered task.
     *
     * <p>Does nothing when {@code events} is empty. Otherwise runs all tasks,
     * retries the ones that reported failure from their recorded seek
     * positions, and reprocesses from {@code omMetadataManager} the ones whose
     * retry reported failure as well.
     *
     * @param events the batch of OM updates to apply
     * @param omMetadataManager metadata manager handed to {@code reprocess}
     *     for tasks that could not apply the batch
     */
    @Override
    public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataManager omMetadataManager) {
        if (events.isEmpty()) {
            return;
        }

        // Stage 1: run every registered task against the batch.
        List<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new ArrayList<NamedCallableTask<ReconOmTask.TaskResult>>();
        for (ReconOmTask task : this.reconOmTasks.values()) {
            String taskName = task.getTaskName();
            this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName).recordRunStart();
            tasks.add(new NamedCallableTask<ReconOmTask.TaskResult>(taskName, () -> task.process(events, Collections.emptyMap())));
        }
        List<ReconOmTask.TaskResult> failedTasks = new ArrayList<ReconOmTask.TaskResult>();
        this.processTasks(tasks, events, failedTasks);

        // Stage 2: retry failed tasks, resuming from the seek positions they reported.
        List<ReconOmTask.TaskResult> retryFailedTasks = new ArrayList<ReconOmTask.TaskResult>();
        if (!failedTasks.isEmpty()) {
            tasks.clear();
            for (ReconOmTask.TaskResult taskResult : failedTasks) {
                ReconOmTask task = this.reconOmTasks.get(taskResult.getTaskName());
                tasks.add(new NamedCallableTask<ReconOmTask.TaskResult>(task.getTaskName(), () -> task.process(events, taskResult.getSubTaskSeekPositions())));
            }
            this.processTasks(tasks, events, retryFailedTasks);
        }

        // Stage 3: reprocess only the tasks whose retry also failed. Iterating
        // the stage-1 failure list here would needlessly reprocess tasks that
        // already recovered during the retry.
        ReconConstants.resetTableTruncatedFlags();
        if (!retryFailedTasks.isEmpty()) {
            tasks.clear();
            for (ReconOmTask.TaskResult taskResult : retryFailedTasks) {
                ReconOmTask task = this.reconOmTasks.get(taskResult.getTaskName());
                tasks.add(new NamedCallableTask<ReconOmTask.TaskResult>(task.getTaskName(), () -> task.reprocess(omMetadataManager)));
            }
            List<ReconOmTask.TaskResult> reprocessFailedTasks = new ArrayList<ReconOmTask.TaskResult>();
            this.processTasks(tasks, events, reprocessFailedTasks);
            this.ignoreFailedTasks(reprocessFailedTasks);
        }
    }

    /**
     * Bumps the failure counter of each task whose reprocess step failed and
     * removes the task from the registry once the counter exceeds
     * {@link #TASK_FAILURE_THRESHOLD}.
     *
     * @param failedTasks results of the tasks that failed reprocessing
     */
    private void ignoreFailedTasks(List<ReconOmTask.TaskResult> failedTasks) {
        for (ReconOmTask.TaskResult taskResult : failedTasks) {
            String taskName = taskResult.getTaskName();
            LOG.info("Reprocess step failed for task {}.", taskName);
            if (this.taskFailureCounter.get(taskName).incrementAndGet() > TASK_FAILURE_THRESHOLD) {
                LOG.info("Ignoring task since it failed retry and reprocess more than {} times.", TASK_FAILURE_THRESHOLD);
                this.reconOmTasks.remove(taskName);
            }
        }
    }

    /**
     * Runs {@code reprocess} on a set of tasks and records the outcome of each
     * through its status updater.
     *
     * @param omMetadataManager metadata manager passed to each task's
     *     {@code reprocess}; also supplies the sequence number recorded on success
     * @param reconOmTaskMap tasks to reinitialize, or {@code null} to use every
     *     registered task
     */
    @Override
    public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map<String, ReconOmTask> reconOmTaskMap) {
        Map<String, ReconOmTask> localReconOmTaskMap = reconOmTaskMap != null ? reconOmTaskMap : this.reconOmTasks;
        ReconConstants.resetTableTruncatedFlags();

        List<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new ArrayList<NamedCallableTask<ReconOmTask.TaskResult>>();
        for (ReconOmTask task : localReconOmTaskMap.values()) {
            String taskName = task.getTaskName();
            this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName).recordRunStart();
            tasks.add(new NamedCallableTask<ReconOmTask.TaskResult>(taskName, () -> task.reprocess(omMetadataManager)));
        }

        List<CompletableFuture<Void>> futures = tasks.stream()
            .map(task -> this.submit(task)
                .thenAccept(result -> {
                    String taskName = result.getTaskName();
                    ReconTaskStatusUpdater taskStatusUpdater = this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
                    if (result.isTaskSuccess()) {
                        taskStatusUpdater.setLastTaskRunStatus(RUN_STATUS_SUCCESS);
                        taskStatusUpdater.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
                    } else {
                        LOG.error("Init failed for task {}.", taskName);
                        taskStatusUpdater.setLastTaskRunStatus(RUN_STATUS_FAILURE);
                    }
                    taskStatusUpdater.recordRunCompletion();
                })
                .exceptionally(this::recordTaskException))
            .collect(Collectors.toList());
        this.awaitAll(futures);
    }

    /**
     * Returns the live task registry (not a copy), keyed by task name.
     */
    @Override
    public Map<String, ReconOmTask> getRegisteredTasks() {
        return this.reconOmTasks;
    }

    /**
     * Creates the fixed-size worker pool. Task execution methods submit work
     * to this pool, so it has to exist before they are invoked.
     */
    @Override
    public synchronized void start() {
        LOG.info("Starting Recon Task Controller.");
        this.executorService = Executors.newFixedThreadPool(this.threadCount, new ThreadFactoryBuilder().setNameFormat("ReconTaskThread-%d").build());
    }

    /**
     * Shuts the worker pool down immediately if it was ever started.
     */
    @Override
    public synchronized void stop() {
        LOG.info("Stopping Recon Task Controller.");
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    /**
     * Runs {@code tasks} in parallel on the worker pool, waits for all of them
     * and appends a result entry to {@code failedTasks} for every task that
     * reported an unsuccessful result.
     *
     * <p>A task that throws is marked failed through its status updater but is
     * not added to {@code failedTasks}.
     *
     * @param tasks named callables to execute
     * @param events batch whose last sequence number is recorded for successful tasks
     * @param failedTasks output list receiving the failed results; only touched
     *     from the calling thread
     */
    private void processTasks(Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks, OMUpdateEventBatch events, List<ReconOmTask.TaskResult> failedTasks) {
        // Completion callbacks may run concurrently on several pool threads, so
        // failures are gathered in a synchronized list and handed over to the
        // caller's (possibly non-thread-safe) list only after the join.
        List<ReconOmTask.TaskResult> collectedFailures = Collections.synchronizedList(new ArrayList<ReconOmTask.TaskResult>());

        List<CompletableFuture<Void>> futures = tasks.stream()
            .map(task -> this.submit(task)
                .thenAccept(result -> {
                    String taskName = result.getTaskName();
                    ReconTaskStatusUpdater taskStatusUpdater = this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
                    if (result.isTaskSuccess()) {
                        this.taskFailureCounter.get(taskName).set(0);
                        taskStatusUpdater.setLastTaskRunStatus(RUN_STATUS_SUCCESS);
                        taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
                    } else {
                        LOG.error("Task {} failed", taskName);
                        collectedFailures.add(new ReconOmTask.TaskResult.Builder().setTaskName(taskName).setSubTaskSeekPositions(result.getSubTaskSeekPositions()).build());
                        taskStatusUpdater.setLastTaskRunStatus(RUN_STATUS_FAILURE);
                    }
                    taskStatusUpdater.recordRunCompletion();
                })
                .exceptionally(this::recordTaskException))
            .collect(Collectors.toList());

        this.awaitAll(futures);
        failedTasks.addAll(collectedFailures);
    }

    /**
     * Schedules {@code task} on the worker pool. Any exception it throws is
     * rethrown as a {@link TaskExecutionException} carrying the task name; the
     * worker thread's interrupt flag is restored on {@link InterruptedException}.
     */
    private CompletableFuture<ReconOmTask.TaskResult> submit(NamedCallableTask<ReconOmTask.TaskResult> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new TaskExecutionException(task.getTaskName(), e);
            }
        }, this.executorService);
    }

    /**
     * Exception handler for a task pipeline: logs {@code ex} and, when its
     * cause identifies the task, marks that task's run as failed and completed.
     *
     * @return always {@code null}, so the pipeline completes normally
     */
    private Void recordTaskException(Throwable ex) {
        LOG.error("Task failed with exception: ", ex);
        if (ex.getCause() instanceof TaskExecutionException) {
            TaskExecutionException taskEx = (TaskExecutionException)ex.getCause();
            String taskName = taskEx.getTaskName();
            LOG.error("The above error occurred while trying to execute task: {}", taskName);
            ReconTaskStatusUpdater taskStatusUpdater = this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
            taskStatusUpdater.setLastTaskRunStatus(RUN_STATUS_FAILURE);
            taskStatusUpdater.recordRunCompletion();
        }
        return null;
    }

    /**
     * Blocks until every future in {@code futures} has completed, logging
     * (rather than propagating) completion and cancellation exceptions.
     */
    private void awaitAll(List<CompletableFuture<Void>> futures) {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        catch (CompletionException ce) {
            LOG.error("Completing all tasks failed with exception ", ce);
        }
        catch (CancellationException ce) {
            LOG.error("Some tasks were cancelled with exception", ce);
        }
    }
}

