/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportTaskManager;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.ManifestFileReader;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

public class ExportScheduler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);
    private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60000;
    private static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10L);
    private static final int DEFAULT_MAX_CLOSE_COUNT = 36;
    private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 300000;
    private static final int DEFAULT_CHECK_STATUS_INTERVAL_MILLS = 30000;
    private static final String COMPLETED_STATUS = "Completed";
    private static final String FAILED_STATUS = "Failed";
    static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess";
    static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure";
    static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal";
    static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
    private final PluginMetrics pluginMetrics;
    private final EnhancedSourceCoordinator enhancedSourceCoordinator;
    private final DynamoDbClient dynamoDBClient;
    private final ExecutorService executor;
    private final ManifestFileReader manifestFileReader;
    private final ExportTaskManager exportTaskManager;
    private final Counter exportJobSuccessCounter;
    private final Counter exportJobFailureCounter;
    private final Counter exportS3ObjectsTotalCounter;
    private final Counter exportRecordsTotalCounter;

    public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbClient dynamoDBClient, ManifestFileReader manifestFileReader, PluginMetrics pluginMetrics, DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
        this.enhancedSourceCoordinator = enhancedSourceCoordinator;
        this.dynamoDBClient = dynamoDBClient;
        this.pluginMetrics = pluginMetrics;
        this.exportTaskManager = new ExportTaskManager(dynamoDBClient, dynamoDBSourceAggregateMetrics);
        this.manifestFileReader = manifestFileReader;
        this.executor = Executors.newCachedThreadPool();
        this.exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT);
        this.exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT);
        this.exportS3ObjectsTotalCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT);
        this.exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
    }

    @Override
    public void run() {
        LOG.debug("Start running Export Scheduler");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Optional sourcePartition = this.enhancedSourceCoordinator.acquireAvailablePartition("EXPORT");
                if (sourcePartition.isPresent()) {
                    ExportPartition exportPartition = (ExportPartition)((Object)sourcePartition.get());
                    LOG.debug("Acquired an export partition: " + exportPartition.getPartitionKey());
                    String exportArn = this.getOrCreateExportArn(exportPartition);
                    if (exportArn == null) {
                        this.closeExportPartitionWithError(exportPartition);
                    } else {
                        CompletableFuture<String> checkStatus = CompletableFuture.supplyAsync(() -> this.checkExportStatus(exportPartition), this.executor);
                        checkStatus.whenComplete((BiConsumer)this.completeExport(exportPartition));
                    }
                }
                try {
                    Thread.sleep(60000L);
                    continue;
                }
                catch (InterruptedException e) {
                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            catch (Exception e) {
                LOG.error("Received an exception during export from DynamoDB to S3, backing off and retrying", (Throwable)e);
                try {
                    Thread.sleep(60000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            break;
        }
        LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
        this.executor.shutdownNow();
    }

    private BiConsumer<String, Throwable> completeExport(ExportPartition exportPartition) {
        return (status, ex) -> {
            if (ex != null) {
                LOG.warn("Check export status for {} failed with error {}", (Object)exportPartition.getPartitionKey(), (Object)ex.getMessage());
                this.enhancedSourceCoordinator.giveUpPartition((EnhancedSourcePartition)exportPartition);
            } else {
                LOG.debug("Check export status completed successfully");
                if (!"COMPLETED".equals(status)) {
                    this.closeExportPartitionWithError(exportPartition);
                    return;
                }
                LOG.debug("Start reading the manifest files");
                ExportProgressState state = exportPartition.getProgressState().get();
                String bucketName = state.getBucket();
                String exportArn = state.getExportArn();
                String manifestKey = this.exportTaskManager.getExportManifest(exportArn);
                LOG.debug("Export manifest summary file is " + manifestKey);
                ExportSummary summaryInfo = this.manifestFileReader.parseSummaryFile(bucketName, manifestKey);
                Instant exportTime = Instant.parse(summaryInfo.getExportTime());
                Map<String, Integer> dataFileInfo = this.manifestFileReader.parseDataFile(summaryInfo.getS3Bucket(), summaryInfo.getManifestFilesS3Key());
                this.createDataFilePartitions(exportArn, exportTime, bucketName, dataFileInfo);
                this.completeExportPartition(exportPartition);
            }
        };
    }

    private void createDataFilePartitions(String exportArn, Instant exportTime, String bucketName, Map<String, Integer> dataFileInfo) {
        LOG.info("Total of {} data files generated for export {}", (Object)dataFileInfo.size(), (Object)exportArn);
        AtomicInteger totalRecords = new AtomicInteger();
        AtomicInteger totalFiles = new AtomicInteger();
        dataFileInfo.forEach((key, size) -> {
            DataFileProgressState progressState = new DataFileProgressState();
            progressState.setTotal((int)size);
            progressState.setLoaded(0);
            progressState.setStartTime(exportTime.toEpochMilli());
            totalFiles.addAndGet(1);
            totalRecords.addAndGet((int)size);
            DataFilePartition partition = new DataFilePartition(exportArn, bucketName, (String)key, Optional.of(progressState));
            this.enhancedSourceCoordinator.createPartition((EnhancedSourcePartition)partition);
        });
        this.exportS3ObjectsTotalCounter.increment((double)totalFiles.get());
        this.exportRecordsTotalCounter.increment((double)totalRecords.get());
        LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0, totalRecords.get(), 0L);
        this.enhancedSourceCoordinator.createPartition((EnhancedSourcePartition)new GlobalState(exportArn, Optional.of(loadStatus.toMap())));
    }

    private void closeExportPartitionWithError(ExportPartition exportPartition) {
        LOG.error("The export from DynamoDb to S3 failed, it will be retried");
        this.exportJobFailureCounter.increment();
        ExportProgressState exportProgressState = exportPartition.getProgressState().get();
        exportProgressState.setExportArn(null);
        exportProgressState.setStatus(FAILED_STATUS);
        this.enhancedSourceCoordinator.closePartition((EnhancedSourcePartition)exportPartition, DEFAULT_CLOSE_DURATION, 36);
    }

    private void completeExportPartition(ExportPartition exportPartition) {
        this.exportJobSuccessCounter.increment();
        ExportProgressState state = exportPartition.getProgressState().get();
        state.setStatus(COMPLETED_STATUS);
        this.enhancedSourceCoordinator.completePartition((EnhancedSourcePartition)exportPartition);
    }

    private String checkExportStatus(ExportPartition exportPartition) {
        long lastCheckpointTime = System.currentTimeMillis();
        String exportArn = exportPartition.getProgressState().get().getExportArn();
        LOG.debug("Start Checking the status of export " + exportArn);
        while (true) {
            String status;
            if (System.currentTimeMillis() - lastCheckpointTime > 300000L) {
                this.enhancedSourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)exportPartition, null);
                lastCheckpointTime = System.currentTimeMillis();
            }
            if (!"IN_PROGRESS".equals(status = this.exportTaskManager.checkExportStatus(exportArn))) {
                LOG.info("Export {} is completed with final status {}", (Object)exportArn, (Object)status);
                return status;
            }
            LOG.debug("Export {} is still running in progress, sleep and recheck later", (Object)exportArn);
            try {
                Thread.sleep(30000L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private String getOrCreateExportArn(ExportPartition exportPartition) {
        ExportProgressState state = exportPartition.getProgressState().get();
        if (state.getExportArn() != null) {
            LOG.info("Export Job has already submitted for table {} with export time {}", (Object)exportPartition.getTableArn(), (Object)exportPartition.getExportTime());
            return state.getExportArn();
        }
        LOG.info("Submitting a new export job for table {} with export time {}", (Object)exportPartition.getTableArn(), (Object)exportPartition.getExportTime());
        String exportArn = this.exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), state.getKmsKeyId(), exportPartition.getExportTime());
        if (exportArn != null) {
            LOG.info("Export arn is " + exportArn);
            state.setExportArn(exportArn);
            this.enhancedSourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)exportPartition, null);
        }
        return exportArn;
    }
}

