/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.recon.tasks;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.ReconConstants;
import org.apache.hadoop.ozone.recon.metrics.ReconTaskControllerMetrics;
import org.apache.hadoop.ozone.recon.metrics.ReconTaskMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBuffer;
import org.apache.hadoop.ozone.recon.tasks.ReconEvent;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskReInitializationEvent;
import org.apache.hadoop.ozone.recon.tasks.types.NamedCallableTask;
import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReconTaskControllerImpl
implements ReconTaskController {
    private static final Logger LOG = LoggerFactory.getLogger(ReconTaskControllerImpl.class);
    // Task-status name under which the staged reprocess run as a whole is recorded.
    private static final String REPROCESS_STAGING = "REPROCESS_STAGING";
    // Provider of the Recon DB; also used to obtain and swap in the staged DB during re-initialization.
    private final ReconDBProvider reconDBProvider;
    // Managers that are re-pointed at the Recon DB after a staged-DB swap (see reInitializeTasks).
    private final ReconContainerMetadataManager reconContainerMetadataManager;
    private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
    private final ReconGlobalStatsManager reconGlobalStatsManager;
    private final ReconFileMetadataManager reconFileMetadataManager;
    // Registered tasks keyed by task name (populated by registerTask).
    private Map<String, ReconOmTask> reconOmTasks;
    // Pool on which task process/reprocess calls run; created in start().
    private ExecutorService executorService;
    // Size of executorService, read from "ozone.recon.task.thread.count".
    private final int threadCount;
    private final ReconTaskStatusUpdaterManager taskStatusUpdaterManager;
    // Buffer between consumeOMEvents/queueReInitializationEvent and the event-processing thread.
    private final OMUpdateEventBuffer eventBuffer;
    // Single-thread executor running processBufferedEventsAsync; created in start().
    private ExecutorService eventProcessingExecutor;
    // True once tasks failed after retry or a re-initialization attempt failed;
    // while set, consumeOMEvents ignores incoming batches.
    private final AtomicBoolean tasksFailed = new AtomicBoolean(false);
    // OM metadata manager used as the source for re-initialization checkpoints;
    // set through updateOMMetadataManager.
    private volatile ReconOMMetadataManager currentOMMetadataManager;
    private final OzoneConfiguration configuration;
    private final ReconTaskControllerMetrics controllerMetrics;
    private final ReconTaskMetrics taskMetrics;
    // Number of failed re-initialization attempts since the last reset (see handleEventFailure).
    private AtomicInteger eventProcessRetryCount = new AtomicInteger(0);
    // System.currentTimeMillis() value recorded at the most recent failure; 0 after a reset.
    private AtomicLong lastRetryTimestamp = new AtomicLong(0L);
    // Retry limit applied in getEventRetryResult.
    private static final int MAX_EVENT_PROCESS_RETRIES = 6;
    // Minimum spacing between retries applied in validateRetryCountAndDelay.
    private static final long RETRY_DELAY_MS = 2000L;

    /**
     * Wires the controller's collaborators, reads the thread-count and
     * event-buffer-capacity settings, creates the metrics sources and the
     * event buffer, and registers every task in {@code tasks}.
     *
     * @param configuration source of "ozone.recon.task.thread.count" (default 8)
     *                      and "ozone.recon.om.event.buffer.capacity" (default 20000)
     * @param tasks tasks to register at construction time
     * @param taskStatusUpdaterManager lookup for per-task status updaters
     * @param reconDBProvider Recon DB provider
     * @param reconContainerMetadataManager container metadata manager
     * @param reconNamespaceSummaryManager namespace summary manager
     * @param reconGlobalStatsManager global stats manager
     * @param reconFileMetadataManager file metadata manager
     */
    @Inject
    public ReconTaskControllerImpl(OzoneConfiguration configuration, Set<ReconOmTask> tasks, ReconTaskStatusUpdaterManager taskStatusUpdaterManager, ReconDBProvider reconDBProvider, ReconContainerMetadataManager reconContainerMetadataManager, ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconGlobalStatsManager reconGlobalStatsManager, ReconFileMetadataManager reconFileMetadataManager) {
        // Plain collaborator wiring first.
        this.configuration = configuration;
        this.taskStatusUpdaterManager = taskStatusUpdaterManager;
        this.reconDBProvider = reconDBProvider;
        this.reconContainerMetadataManager = reconContainerMetadataManager;
        this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
        this.reconGlobalStatsManager = reconGlobalStatsManager;
        this.reconFileMetadataManager = reconFileMetadataManager;

        // Settings, metrics and the event buffer.
        this.threadCount = configuration.getInt("ozone.recon.task.thread.count", 8);
        this.controllerMetrics = ReconTaskControllerMetrics.create();
        this.taskMetrics = ReconTaskMetrics.create();
        this.eventBuffer = new OMUpdateEventBuffer(
            configuration.getInt("ozone.recon.om.event.buffer.capacity", 20000),
            this.controllerMetrics);

        // The registry must exist before any task is registered.
        this.reconOmTasks = new HashMap<>();
        tasks.forEach(this::registerTask);
    }

    /**
     * Registers {@code task} under the name returned by its
     * {@code getTaskName()}, replacing any task already stored under that name.
     *
     * @param task the task to register
     */
    @Override
    public void registerTask(ReconOmTask task) {
        final String name = task.getTaskName();
        LOG.info("Registered task {} with controller.", name);
        this.reconOmTasks.put(name, task);
    }

    /**
     * Buffers a batch of OM update events for asynchronous processing.
     *
     * <p>Empty batches, and any batch arriving while the tasks-failed flag is
     * set, are ignored. If the buffer rejects the batch, the buffer is drained
     * (cleaning up checkpoints held by drained re-initialization events).
     *
     * @param events the batch to buffer
     * @param omMetadataManager not used by this implementation
     */
    @Override
    public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataManager omMetadataManager) {
        if (events.isEmpty() || this.hasTasksFailed()) {
            return;
        }
        if (this.eventBuffer.offer(events)) {
            LOG.debug("Buffered event batch with {} events. Buffer queue size: {}", events.getEvents().size(), this.eventBuffer.getQueueSize());
            return;
        }
        // Report the capacity that is actually configured rather than the
        // compiled-in default, so the message stays accurate when
        // "ozone.recon.om.event.buffer.capacity" is overridden.
        int configuredCapacity = this.configuration.getInt("ozone.recon.om.event.buffer.capacity", 20000);
        LOG.error("Event buffer is full (capacity: {}). Dropping buffered events and signaling full snapshot. Buffer size: {}, Dropped batches: {}", configuredCapacity, this.eventBuffer.getQueueSize(), this.eventBuffer.getDroppedBatches());
        this.drainEventBufferAndCleanExistingCheckpoints();
    }

    /**
     * Re-runs {@code reprocess} for every task against a staged Recon DB and,
     * if all tasks succeed, swaps the staged DB in and re-initializes the
     * dependent managers. This call blocks until every task has finished.
     *
     * @param omMetadataManager OM metadata manager handed to each task's
     *                          {@code getStagedTask} and {@code reprocess}
     * @param reconOmTaskMap tasks to re-initialize; {@code null} means all
     *                       registered tasks
     * @return {@code true} only if every task succeeded AND the staged DB was
     *         swapped in and the managers re-initialized without error
     */
    @Override
    public synchronized boolean reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map<String, ReconOmTask> reconOmTaskMap) {
        LOG.info("Starting Re-initialization of tasks. This is a blocking operation.");
        Map<String, ReconOmTask> localReconOmTaskMap = reconOmTaskMap != null ? reconOmTaskMap : this.reconOmTasks;
        ReconConstants.resetTableTruncatedFlags();

        ReconDBProvider stagedReconDBProvider;
        try {
            ReconTaskStatusUpdater reprocessTaskStatus = this.taskStatusUpdaterManager.getTaskStatusUpdater(REPROCESS_STAGING);
            reprocessTaskStatus.recordRunStart();
            stagedReconDBProvider = this.reconDBProvider.getStagedReconDBProvider();
        }
        catch (IOException e) {
            LOG.error("Failed to get staged Recon DB provider for reinitialization of tasks.", e);
            this.controllerMetrics.incrReprocessCheckpointFailures();
            this.recordAllTaskStatus(localReconOmTaskMap, -1, -1L);
            return false;
        }

        // One named callable per task; each runs reprocess on the task's staged variant.
        List<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new ArrayList<>();
        for (ReconOmTask task : localReconOmTaskMap.values()) {
            this.taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()).recordRunStart();
            tasks.add(new NamedCallableTask<ReconOmTask.TaskResult>(task.getTaskName(), () -> task.getStagedTask(omMetadataManager, stagedReconDBProvider.getDbStore()).reprocess(omMetadataManager)));
        }

        AtomicBoolean isRunSuccessful = new AtomicBoolean(true);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(tasks.size());
            for (NamedCallableTask<ReconOmTask.TaskResult> namedTask : tasks) {
                // Captured at submission time, so the duration includes any queueing delay.
                long reprocessStartTime = Time.monotonicNow();
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        return namedTask.call();
                    }
                    catch (Exception e) {
                        // The failure metric for a throwing task is counted here, once.
                        this.taskMetrics.incrTaskReprocessFailures(namedTask.getTaskName());
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        throw new TaskExecutionException(namedTask.getTaskName(), e);
                    }
                }, this.executorService).thenAccept(result -> {
                    long reprocessDuration = Time.monotonicNow() - reprocessStartTime;
                    this.taskMetrics.updateTaskReprocessDuration(namedTask.getTaskName(), reprocessDuration);
                    if (!result.isTaskSuccess()) {
                        String taskName = result.getTaskName();
                        LOG.error("Init failed for task {}.", taskName);
                        this.taskMetrics.incrTaskReprocessFailures(taskName);
                        isRunSuccessful.set(false);
                    }
                }).exceptionally(ex -> {
                    LOG.error("Task failed with exception: ", ex);
                    isRunSuccessful.set(false);
                    if (ex.getCause() instanceof TaskExecutionException) {
                        // A TaskExecutionException only originates from the supplier above,
                        // which already incremented the failure metric; do not count it again.
                        TaskExecutionException taskEx = (TaskExecutionException)ex.getCause();
                        LOG.error("The above error occurred while trying to execute task: {}", taskEx.getTaskName());
                    }
                    return null;
                }));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        catch (CompletionException ce) {
            LOG.error("Completing all tasks failed with exception ", ce);
            isRunSuccessful.set(false);
        }
        catch (CancellationException ce) {
            LOG.error("Some tasks were cancelled with exception", ce);
            isRunSuccessful.set(false);
        }

        if (isRunSuccessful.get()) {
            try {
                this.reconDBProvider.replaceStagedDb(stagedReconDBProvider);
                this.reconNamespaceSummaryManager.reinitialize(this.reconDBProvider);
                this.reconContainerMetadataManager.reinitialize(this.reconDBProvider);
                this.reconGlobalStatsManager.reinitialize(this.reconDBProvider);
                this.reconFileMetadataManager.reinitialize(this.reconDBProvider);
                this.recordAllTaskStatus(localReconOmTaskMap, 0, omMetadataManager.getLastSequenceNumberFromDB());
                this.controllerMetrics.incrReprocessSuccessCount();
                LOG.info("Re-initialization of tasks completed successfully.");
            }
            catch (Exception e) {
                LOG.error("Re-initialization of tasks failed.", e);
                // The run did not complete; callers must see a failure result.
                isRunSuccessful.set(false);
                this.controllerMetrics.incrReprocessStageDatabaseFailures();
                this.recordAllTaskStatus(localReconOmTaskMap, -1, -1L);
                // NOTE(review): the staged provider is not closed on this path -
                // confirm whether replaceStagedDb releases it on failure.
                try {
                    // Point the managers back at the current provider.
                    this.reconNamespaceSummaryManager.reinitialize(this.reconDBProvider);
                    this.reconContainerMetadataManager.reinitialize(this.reconDBProvider);
                    this.reconGlobalStatsManager.reinitialize(this.reconDBProvider);
                    this.reconFileMetadataManager.reinitialize(this.reconDBProvider);
                }
                catch (IOException ex) {
                    LOG.error("Re-initialization of task manager failed.", ex);
                }
            }
        } else {
            LOG.error("Re-initialization of tasks failed.");
            this.controllerMetrics.incrReprocessExecutionFailures();
            try {
                stagedReconDBProvider.close();
            }
            catch (Exception e) {
                LOG.error("Close of recon container staged db handler is failed", e);
            }
            this.recordAllTaskStatus(localReconOmTaskMap, -1, -1L);
        }
        return isRunSuccessful.get();
    }

    /**
     * Records the outcome of a run on the status updater of every task in
     * {@code localReconOmTaskMap} and then on the {@code REPROCESS_STAGING}
     * updater.
     *
     * @param localReconOmTaskMap tasks whose status is updated
     * @param status run status to store; the sequence number is stored only
     *               when this is {@code 0}
     * @param updateSeqNumber sequence number to store on success
     */
    private void recordAllTaskStatus(Map<String, ReconOmTask> localReconOmTaskMap, int status, long updateSeqNumber) {
        final boolean succeeded = status == 0;
        for (ReconOmTask task : localReconOmTaskMap.values()) {
            ReconTaskStatusUpdater updater = this.taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
            if (succeeded) {
                updater.setLastUpdatedSeqNumber(updateSeqNumber);
            }
            updater.setLastTaskRunStatus(status);
            updater.recordRunCompletion();
        }

        ReconTaskStatusUpdater stagingUpdater = this.taskStatusUpdaterManager.getTaskStatusUpdater(REPROCESS_STAGING);
        if (succeeded) {
            stagingUpdater.setLastUpdatedSeqNumber(updateSeqNumber);
        }
        stagingUpdater.setLastTaskRunStatus(status);
        stagingUpdater.recordRunCompletion();
    }

    /**
     * Returns the live registry of tasks keyed by task name (the internal
     * map itself, not a copy).
     *
     * @return the registered tasks
     */
    @Override
    public Map<String, ReconOmTask> getRegisteredTasks() {
        final Map<String, ReconOmTask> registry = this.reconOmTasks;
        return registry;
    }

    /**
     * Starts the controller: removes leftover checkpoint directories, creates
     * the task thread pool and the single event-processing thread, and submits
     * the buffered-event loop to the latter.
     */
    @Override
    public synchronized void start() {
        LOG.info("Starting Recon Task Controller.");
        this.cleanupPreExistingCheckpoints();

        ThreadFactoryBuilder taskThreadNaming = new ThreadFactoryBuilder().setNameFormat("ReconTaskThread-%d");
        this.executorService = Executors.newFixedThreadPool(this.threadCount, taskThreadNaming.build());

        ThreadFactoryBuilder eventThreadNaming = new ThreadFactoryBuilder().setNameFormat("ReconEventProcessor-%d");
        this.eventProcessingExecutor = Executors.newSingleThreadExecutor(eventThreadNaming.build());

        this.eventProcessingExecutor.submit(this::processBufferedEventsAsync);
        LOG.info("Started async event processing thread.");
    }

    /**
     * Stops the controller by calling {@code shutdownNow()} on the task pool
     * and then on the event-processing executor, skipping whichever has not
     * been created.
     */
    @Override
    public synchronized void stop() {
        LOG.info("Stopping Recon Task Controller.");
        ExecutorService[] pools = {this.executorService, this.eventProcessingExecutor};
        for (ExecutorService pool : pools) {
            if (pool == null) {
                continue;
            }
            pool.shutdownNow();
        }
    }

    /**
     * Runs every task in {@code tasks} on the task pool, waits for all of them
     * to finish, and records each task's outcome (metrics and status updater).
     *
     * <p>Tasks that return an unsuccessful result are appended to
     * {@code failedTasks} together with their sub-task seek positions.
     * Completion callbacks may run on different pool threads at the same time,
     * so additions to {@code failedTasks} are synchronized on that list; the
     * caller may read it without locking once this method has returned.
     *
     * @param tasks named callables to execute
     * @param events the batch being processed; its last sequence number is
     *               stored on the status updater of each successful task
     * @param failedTasks output list receiving one entry per unsuccessful result
     */
    private void processTasks(Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks, OMUpdateEventBatch events, List<ReconOmTask.TaskResult> failedTasks) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(tasks.size());
        for (NamedCallableTask<ReconOmTask.TaskResult> task : tasks) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                long taskStartTime = Time.monotonicNow();
                try {
                    ReconOmTask.TaskResult result = task.call();
                    long taskDuration = Time.monotonicNow() - taskStartTime;
                    this.taskMetrics.updateTaskDeltaProcessingDuration(task.getTaskName(), taskDuration);
                    return result;
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    throw new TaskExecutionException(task.getTaskName(), e);
                }
            }, this.executorService).thenAccept(result -> {
                String taskName = result.getTaskName();
                ReconTaskStatusUpdater taskStatusUpdater = this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
                if (!result.isTaskSuccess()) {
                    LOG.error("Task {} failed", taskName);
                    this.taskMetrics.incrTaskDeltaProcessingFailures(taskName);
                    ReconOmTask.TaskResult failure = new ReconOmTask.TaskResult.Builder().setTaskName(taskName).setSubTaskSeekPositions(result.getSubTaskSeekPositions()).build();
                    // Callbacks of different tasks can run concurrently; guard the shared list.
                    synchronized (failedTasks) {
                        failedTasks.add(failure);
                    }
                    taskStatusUpdater.setLastTaskRunStatus(-1);
                } else {
                    this.taskMetrics.incrTaskDeltaProcessingSuccess(taskName);
                    taskStatusUpdater.setLastTaskRunStatus(0);
                    taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
                }
                taskStatusUpdater.recordRunCompletion();
            }).exceptionally(ex -> {
                LOG.error("Task failed with exception: ", ex);
                if (ex.getCause() instanceof TaskExecutionException) {
                    TaskExecutionException taskEx = (TaskExecutionException)ex.getCause();
                    String taskName = taskEx.getTaskName();
                    LOG.error("The above error occurred while trying to execute task: {}", taskName);
                    this.taskMetrics.incrTaskDeltaProcessingFailures(taskName);
                    ReconTaskStatusUpdater taskStatusUpdater = this.taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
                    taskStatusUpdater.setLastTaskRunStatus(-1);
                    taskStatusUpdater.recordRunCompletion();
                    // NOTE(review): tasks that fail by throwing are not added to
                    // failedTasks, so the caller does not retry them - confirm intended.
                }
                return null;
            }));
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        catch (CompletionException ce) {
            LOG.error("Completing all tasks failed with exception ", ce);
        }
        catch (CancellationException ce) {
            LOG.error("Some tasks were cancelled with exception", ce);
        }
    }

    /**
     * Event loop of the event-processing thread: polls the buffer with a
     * 1000 (presumably milliseconds) timeout and dispatches each event until
     * the thread is interrupted.
     *
     * <p>Any other exception is logged and the loop continues, so a single bad
     * event does not stop processing.
     */
    private void processBufferedEventsAsync() {
        LOG.info("Started async buffered event processing thread");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ReconEvent event = this.eventBuffer.poll(1000L);
                if (event == null) {
                    continue;
                }
                LOG.debug("Processing buffered event of type {} with {} events", event.getEventType(), event.getEventCount());
                this.processReconEvent(event);
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Catching the exception clears the interrupt flag; restore it
                    // and leave the loop so that stop() can terminate this thread.
                    Thread.currentThread().interrupt();
                    break;
                }
                LOG.error("Error in async event processing thread", e);
            }
        }
        LOG.info("Async buffered event processing thread stopped");
    }

    /**
     * Dispatches a buffered event to the handler matching its event type;
     * unrecognized types are logged at WARN and otherwise ignored.
     *
     * @param event the event to process
     */
    @VisibleForTesting
    void processReconEvent(ReconEvent event) {
        switch (event.getEventType()) {
            case OM_UPDATE_BATCH:
                this.processOMUpdateBatch((OMUpdateEventBatch)event);
                return;
            case TASK_REINITIALIZATION:
                this.processReInitializationEvent((ReconTaskReInitializationEvent)event);
                return;
            default:
                LOG.warn("Unknown event type: {}", event.getEventType());
        }
    }

    /**
     * Feeds a non-empty batch of OM updates to every registered task. Tasks
     * that report failure are retried once, resuming from the seek positions
     * they reported; if any still fail, the tasks-failed flag is set.
     *
     * @param events the batch to process
     */
    private void processOMUpdateBatch(OMUpdateEventBatch events) {
        if (events.isEmpty()) {
            return;
        }

        // First pass: every registered task, starting with no seek positions.
        List<NamedCallableTask<ReconOmTask.TaskResult>> firstPass = new ArrayList<>();
        for (ReconOmTask task : this.reconOmTasks.values()) {
            this.taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()).recordRunStart();
            firstPass.add(new NamedCallableTask<ReconOmTask.TaskResult>(task.getTaskName(), () -> task.process(events, Collections.emptyMap())));
        }
        List<ReconOmTask.TaskResult> failedTasks = new ArrayList<>();
        this.processTasks(firstPass, events, failedTasks);
        if (failedTasks.isEmpty()) {
            return;
        }

        // Second pass: only the failures, resuming from their reported positions.
        LOG.warn("Some tasks failed while processing buffered events, retrying...");
        List<NamedCallableTask<ReconOmTask.TaskResult>> retryPass = new ArrayList<>();
        for (ReconOmTask.TaskResult failure : failedTasks) {
            ReconOmTask task = this.reconOmTasks.get(failure.getTaskName());
            retryPass.add(new NamedCallableTask<ReconOmTask.TaskResult>(task.getTaskName(), () -> task.process(events, failure.getSubTaskSeekPositions())));
        }
        List<ReconOmTask.TaskResult> stillFailing = new ArrayList<>();
        this.processTasks(retryPass, events, stillFailing);
        if (!stillFailing.isEmpty()) {
            LOG.warn("Some tasks still failed after retry while processing buffered events, signaling for task reinitialization");
            this.tasksFailed.set(true);
        }
    }

    /**
     * Reports whether the event buffer's dropped-batch counter is above zero.
     *
     * @return {@code true} if at least one dropped batch is currently recorded
     */
    @Override
    public boolean hasEventBufferOverflowed() {
        final long dropped = this.eventBuffer.getDroppedBatches();
        return dropped > 0L;
    }

    /**
     * Clears the overflow indication by resetting the event buffer's
     * dropped-batch counter.
     */
    public void resetEventBufferOverflowFlag() {
        final OMUpdateEventBuffer buffer = this.eventBuffer;
        buffer.resetDroppedBatches();
    }

    /**
     * Reports the current value of the tasks-failed flag.
     *
     * @return {@code true} while the flag is set
     */
    @Override
    public boolean hasTasksFailed() {
        final boolean failed = this.tasksFailed.get();
        return failed;
    }

    /**
     * Clears the tasks-failed flag; a no-op when it is already clear.
     */
    public void resetTasksFailureFlag() {
        this.tasksFailed.set(false);
    }

    /**
     * Creates an OM DB checkpoint and queues a task re-initialization event
     * that carries it.
     *
     * <p>Steps: apply the retry limit and retry delay; drain the event buffer
     * (cleaning up checkpoints held by drained re-initialization events);
     * create a checkpoint of the current OM metadata manager; offer the
     * re-initialization event to the buffer.
     *
     * @param reason why re-initialization is requested
     * @return {@code SUCCESS} when the event was queued;
     *         {@code RETRY_LATER} when the retry delay has not elapsed, the
     *         checkpoint could not be created, or the buffer rejected the
     *         event; {@code MAX_RETRIES_EXCEEDED} when the retry limit was
     *         reached. Never {@code null}.
     */
    @Override
    public synchronized ReconTaskController.ReInitializationResult queueReInitializationEvent(ReconTaskReInitializationEvent.ReInitializationReason reason) {
        LOG.info("Queueing task reinitialization event due to: {} (retry attempt count: {})", reason, this.eventProcessRetryCount.get());
        this.controllerMetrics.incrTotalReprocessSubmittedToQueue();

        ReconTaskController.ReInitializationResult retryVerdict = this.validateRetryCountAndDelay();
        if (retryVerdict != null) {
            return retryVerdict;
        }

        this.drainEventBufferAndCleanExistingCheckpoints();

        ReconOMMetadataManager checkpointedOMMetadataManager;
        try {
            LOG.info("Attempting checkpoint creation (retry attempt: {})", this.eventProcessRetryCount.get() + 1);
            checkpointedOMMetadataManager = this.createOMCheckpoint(this.currentOMMetadataManager);
            LOG.info("Checkpoint creation succeeded");
        }
        catch (IOException e) {
            // Pass the exception as the last argument so the stack trace is kept.
            LOG.error("Checkpoint creation failed: {}", e.getMessage(), e);
            this.handleEventFailure();
            return ReconTaskController.ReInitializationResult.RETRY_LATER;
        }

        ReconTaskReInitializationEvent reinitEvent = new ReconTaskReInitializationEvent(reason, checkpointedOMMetadataManager);
        if (this.eventBuffer.offer(reinitEvent)) {
            this.resetEventFlags();
            LOG.info("Successfully queued reinitialization event after {} retries", this.eventProcessRetryCount.get() + 1);
            return ReconTaskController.ReInitializationResult.SUCCESS;
        }

        // The event never reached the buffer, so nothing else will release the
        // checkpoint: clean it up here, count the failure, and report a
        // well-defined result instead of null.
        LOG.error("Failed to queue reinitialization event, cleaning up checkpoint and retrying later");
        this.cleanupCheckpoint(checkpointedOMMetadataManager);
        this.handleEventFailure();
        return ReconTaskController.ReInitializationResult.RETRY_LATER;
    }

    /**
     * Applies the retry spacing and retry limit before a re-initialization
     * attempt.
     *
     * @return {@code RETRY_LATER} if a previous attempt failed less than
     *         {@code RETRY_DELAY_MS} ago; otherwise whatever
     *         {@link #getEventRetryResult()} returns ({@code null} meaning the
     *         attempt may proceed)
     */
    private ReconTaskController.ReInitializationResult validateRetryCountAndDelay() {
        final long now = System.currentTimeMillis();
        final boolean isRetry = this.eventProcessRetryCount.get() > 0;
        if (isRetry) {
            final long elapsedMs = now - this.lastRetryTimestamp.get();
            if (elapsedMs < RETRY_DELAY_MS) {
                LOG.debug("Skipping retry, only {}ms since last retry attempt (need {}ms)", elapsedMs, RETRY_DELAY_MS);
                return ReconTaskController.ReInitializationResult.RETRY_LATER;
            }
            LOG.info("Attempting retry (retry attempt count: {}, delay: {}ms)", this.eventProcessRetryCount.get() + 1, elapsedMs);
        }
        return this.getEventRetryResult();
    }

    /**
     * Records a failed attempt: stamps the current wall-clock time, increments
     * the retry counter, and sets the tasks-failed flag.
     */
    private void handleEventFailure() {
        this.lastRetryTimestamp.set(System.currentTimeMillis());
        this.eventProcessRetryCount.incrementAndGet();
        this.tasksFailed.set(true);
        LOG.error("Event processing failed {} times.", (Object)this.eventProcessRetryCount);
    }

    /**
     * Checks the retry counter against {@code MAX_EVENT_PROCESS_RETRIES}.
     *
     * @return {@code MAX_RETRIES_EXCEEDED} (after resetting the retry
     *         counters) once the limit is reached; {@code null} otherwise
     */
    private ReconTaskController.ReInitializationResult getEventRetryResult() {
        final boolean withinLimit = this.eventProcessRetryCount.get() < MAX_EVENT_PROCESS_RETRIES;
        if (withinLimit) {
            return null;
        }
        LOG.warn("Maximum iteration retries ({}) exceeded, resetting counters and signaling full snapshot fallback", MAX_EVENT_PROCESS_RETRIES);
        this.resetRetryCounters();
        return ReconTaskController.ReInitializationResult.MAX_RETRIES_EXCEEDED;
    }

    /**
     * Empties the event buffer and, for every drained re-initialization event
     * that carries a checkpointed OM metadata manager, cleans that checkpoint
     * up so it is not left behind unprocessed.
     */
    public void drainEventBufferAndCleanExistingCheckpoints() {
        // Typed list: the drained elements are iterated as ReconEvent below.
        List<ReconEvent> drainedEvents = new ArrayList<>();
        int drainedCount = this.eventBuffer.drainTo(drainedEvents);
        if (drainedCount <= 0) {
            return;
        }
        LOG.info("Drained {} events from buffer before clearing. Checking for checkpoint cleanup.", drainedCount);
        for (ReconEvent event : drainedEvents) {
            if (!(event instanceof ReconTaskReInitializationEvent)) {
                continue;
            }
            ReconOMMetadataManager checkpointedManager = ((ReconTaskReInitializationEvent)event).getCheckpointedOMMetadataManager();
            if (checkpointedManager == null) {
                continue;
            }
            LOG.info("Cleaning up unprocessed checkpoint from drained ReconTaskReInitializationEvent");
            this.cleanupCheckpoint(checkpointedManager);
        }
    }

    /**
     * Replaces the OM metadata manager from which re-initialization
     * checkpoints are created.
     *
     * @param omMetadataManager the manager to use from now on
     */
    @Override
    public void updateOMMetadataManager(ReconOMMetadataManager omMetadataManager) {
        LOG.debug("Updating OM metadata manager");
        final ReconOMMetadataManager replacement = omMetadataManager;
        this.currentOMMetadataManager = replacement;
    }

    /**
     * Creates a checkpoint of the given OM metadata manager's store inside a
     * fresh temporary directory and wraps it in a checkpoint-backed metadata
     * manager.
     *
     * <p>If the checkpoint or the wrapping manager cannot be created, the
     * temporary directory is removed (best effort) before the failure is
     * rethrown, so failed attempts do not accumulate directories on disk.
     *
     * @param omMetaManager manager whose store is checkpointed
     * @return a metadata manager backed by the new checkpoint
     * @throws IOException if the temporary directory, the checkpoint, or the
     *                     checkpoint-backed manager cannot be created
     */
    public ReconOMMetadataManager createOMCheckpoint(ReconOMMetadataManager omMetaManager) throws IOException {
        String parentPath = this.cleanTempCheckPointPath(omMetaManager);
        try {
            DBCheckpoint checkpoint = omMetaManager.getStore().getCheckpoint(parentPath, true);
            return omMetaManager.createCheckpointReconMetadataManager(this.configuration, checkpoint);
        }
        catch (Exception e) {
            // Nothing else knows about this directory once we fail; remove it quietly.
            FileUtils.deleteQuietly(new File(parentPath));
            throw e;
        }
    }

    /**
     * Prepares an empty, uniquely named directory for a re-initialization
     * checkpoint next to the OM DB directory.
     *
     * @param omMetaManager manager whose store location determines the parent
     *                      directory
     * @return path of the freshly created directory
     * @throws IOException if the DB location or its parent is unknown, or the
     *                     directory cannot be cleared or created
     */
    private String cleanTempCheckPointPath(ReconOMMetadataManager omMetaManager) throws IOException {
        final File omDbDir = omMetaManager.getStore().getDbLocation();
        if (omDbDir == null) {
            throw new IOException("OM DB location is null");
        }
        final String parentDir = omDbDir.getParent();
        if (parentDir == null) {
            throw new IOException("Parent OM DB dir is null");
        }

        // The random suffix gives every attempt its own directory.
        final String dirName = "temp-recon-reinit-checkpoint_" + UUID.randomUUID();
        final File checkpointDir = Paths.get(parentDir, dirName).toFile();
        FileUtils.deleteDirectory(checkpointDir);
        FileUtils.forceMkdir(checkpointDir);
        return checkpointDir.toString();
    }

    /**
     * Handles a queued re-initialization event: clears the tasks-failed flag,
     * re-initializes all registered tasks against the event's checkpointed OM
     * metadata manager, and always cleans the checkpoint up afterwards.
     *
     * <p>A failed run, whether reported by {@code reInitializeTasks} returning
     * {@code false} or by an exception, is recorded through
     * {@link #handleEventFailure()} so the failure flag and retry counter
     * reflect it. A successful run resets the retry counters.
     *
     * @param event the re-initialization event to process
     */
    private void processReInitializationEvent(ReconTaskReInitializationEvent event) {
        LOG.info("Processing reinitialization event: reason={}, timestamp={}", event.getReason(), event.getTimestamp());
        this.resetTasksFailureFlag();
        ReconOMMetadataManager checkpointedOMMetadataManager = event.getCheckpointedOMMetadataManager();
        if (checkpointedOMMetadataManager == null) {
            // Nothing to reinitialize from and nothing to clean up.
            LOG.error("Checkpointed OM metadata manager is null, cannot perform reinitialization");
            return;
        }
        try {
            LOG.info("Starting async task reinitialization with checkpointed OM metadata manager due to: {}", event.getReason());
            boolean isRunSuccessful = this.reInitializeTasks(checkpointedOMMetadataManager, null);
            if (isRunSuccessful) {
                this.resetRetryCounters();
                LOG.info("Completed async task reinitialization");
            } else {
                this.handleEventFailure();
                LOG.error("Task reinitialization failed, tasksFailed flag set to true");
            }
            LOG.info("Completed processing reinitialization event: {}", event.getReason());
        }
        catch (Exception e) {
            LOG.error("Error processing reinitialization event", e);
            // The failure flag was cleared above; an aborted run must set it
            // again, otherwise the failed re-initialization goes unnoticed.
            this.handleEventFailure();
        }
        finally {
            this.cleanupCheckpoint(checkpointedOMMetadataManager);
        }
    }

    /**
     * Clears both failure indications: first the event buffer's dropped-batch
     * counter, then the tasks-failed flag.
     */
    public void resetEventFlags() {
        // Order kept: overflow indication first, task failure second.
        this.resetEventBufferOverflowFlag();
        this.resetTasksFailureFlag();
    }

    /**
     * Returns the queue size currently reported by the event buffer.
     *
     * @return the buffer's queue size
     */
    @Override
    public int getEventBufferSize() {
        final int queued = this.eventBuffer.getQueueSize();
        return queued;
    }

    /**
     * Returns the event buffer's dropped-batch counter.
     *
     * @return the number of dropped batches reported by the buffer
     */
    @VisibleForTesting
    public long getDroppedBatches() {
        final long dropped = this.eventBuffer.getDroppedBatches();
        return dropped;
    }

    /**
     * Resets the retry bookkeeping: the retry counter goes back to 0 and the
     * last-retry timestamp to 0.
     */
    @VisibleForTesting
    void resetRetryCounters() {
        final int noRetries = 0;
        final long noTimestamp = 0L;
        this.eventProcessRetryCount.set(noRetries);
        this.lastRetryTimestamp.set(noTimestamp);
    }

    /**
     * Returns the current value of the retry counter.
     *
     * @return the number of failures recorded since the last reset
     */
    @VisibleForTesting
    int getEventProcessRetryCount() {
        final int retries = this.eventProcessRetryCount.get();
        return retries;
    }

    /**
     * Exposes the tasks-failed flag object itself (not a copy), so tests can
     * read or change it directly.
     *
     * @return the live flag
     */
    @VisibleForTesting
    AtomicBoolean getTasksFailedFlag() {
        final AtomicBoolean flag = this.tasksFailed;
        return flag;
    }

    /**
     * Best-effort removal of checkpoint directories left behind next to the
     * OM DB directory, i.e. directories whose name starts with
     * "temp-recon-reinit-checkpoint". Every failure is logged and swallowed.
     */
    private void cleanupPreExistingCheckpoints() {
        try {
            final ReconOMMetadataManager omManager = this.currentOMMetadataManager;
            if (omManager == null) {
                LOG.debug("No current OM metadata manager, skipping pre-existing checkpoint cleanup");
                return;
            }

            final File dbLocation = omManager.getStore().getDbLocation();
            final String baseDirectory = dbLocation == null ? null : dbLocation.getParent();
            if (baseDirectory == null) {
                LOG.debug("Cannot determine checkpoint base directory, skipping pre-existing checkpoint cleanup");
                return;
            }

            final File baseDir = new File(baseDirectory);
            if (!(baseDir.exists() && baseDir.isDirectory())) {
                LOG.debug("Base directory {} does not exist, skipping pre-existing checkpoint cleanup", baseDirectory);
                return;
            }

            final File[] leftovers = baseDir.listFiles((dir, name) -> name.startsWith("temp-recon-reinit-checkpoint"));
            if (leftovers == null || leftovers.length == 0) {
                LOG.debug("No pre-existing checkpoint directories found");
                return;
            }

            LOG.info("Found {} pre-existing checkpoint directories to clean up", leftovers.length);
            for (File leftover : leftovers) {
                if (!leftover.exists() || !leftover.isDirectory()) {
                    continue;
                }
                try {
                    FileUtils.deleteDirectory(leftover);
                    LOG.info("Cleaned up pre-existing checkpoint directory: {}", leftover);
                }
                catch (IOException e) {
                    // One undeletable directory must not stop the remaining cleanup.
                    LOG.warn("Failed to clean up pre-existing checkpoint directory: {}", leftover, e);
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to cleanup pre-existing checkpoint directories", e);
        }
    }

    /**
     * Best-effort release of a checkpoint: resolves the checkpoint directory
     * (the parent of the manager's DB location), stops the manager, then
     * deletes the directory if it exists. Every failure is logged and
     * swallowed.
     *
     * @param checkpointedManager checkpoint-backed manager to release;
     *                            {@code null} is ignored
     */
    private void cleanupCheckpoint(ReconOMMetadataManager checkpointedManager) {
        if (checkpointedManager == null) {
            return;
        }
        try {
            // Resolve the directory before stopping the manager.
            File checkpointDir = null;
            try {
                final boolean locationKnown = checkpointedManager.getStore() != null
                    && checkpointedManager.getStore().getDbLocation() != null;
                if (locationKnown) {
                    checkpointDir = checkpointedManager.getStore().getDbLocation().getParentFile();
                }
            }
            catch (Exception e) {
                LOG.warn("Failed to get checkpoint location for cleanup", e);
            }

            checkpointedManager.stop();
            LOG.debug("Closed checkpointed OM metadata manager database connections");

            if (checkpointDir == null || !checkpointDir.exists()) {
                return;
            }
            try {
                FileUtils.deleteDirectory(checkpointDir);
                LOG.debug("Cleaned up checkpoint directory: {}", checkpointDir);
            }
            catch (IOException e) {
                LOG.warn("Failed to cleanup checkpoint directory: {}", checkpointDir, e);
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to cleanup checkpointed OM metadata manager", e);
        }
    }

    /**
     * Exposes the controller's event buffer instance for tests.
     *
     * @return the live event buffer
     */
    @VisibleForTesting
    public OMUpdateEventBuffer getEventBuffer() {
        final OMUpdateEventBuffer buffer = this.eventBuffer;
        return buffer;
    }
}

