/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.export;

import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader;
import org.opensearch.dataprepper.plugins.source.rds.export.S3ObjectReader;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

public class DataFileScheduler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileScheduler.class);
    private final AtomicInteger numOfWorkers = new AtomicInteger(0);
    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2000;
    private static final Duration DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT = Duration.ofMinutes(30L);
    static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
    static final String EXPORT_S3_OBJECTS_ERROR_COUNT = "exportS3ObjectsErrors";
    static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final ExecutorService executor;
    private final RdsSourceConfig sourceConfig;
    private final S3ObjectReader objectReader;
    private final InputCodec codec;
    private final ExportRecordConverter recordConverter;
    private final Buffer<Record<Event>> buffer;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final Counter exportFileSuccessCounter;
    private final Counter exportFileErrorCounter;
    private final AtomicInteger activeExportS3ObjectConsumersGauge;
    private volatile boolean shutdownRequested = false;

    public DataFileScheduler(EnhancedSourceCoordinator sourceCoordinator, RdsSourceConfig sourceConfig, String s3Prefix, S3Client s3Client, EventFactory eventFactory, Buffer<Record<Event>> buffer, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceConfig = sourceConfig;
        this.codec = new ParquetInputCodec(eventFactory);
        this.objectReader = new S3ObjectReader(s3Client);
        this.recordConverter = new ExportRecordConverter(s3Prefix, sourceConfig.getPartitionCount());
        this.executor = Executors.newFixedThreadPool(1);
        this.buffer = buffer;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
        this.exportFileErrorCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_ERROR_COUNT);
        this.activeExportS3ObjectConsumersGauge = (AtomicInteger)pluginMetrics.gauge(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, (Object)this.numOfWorkers, AtomicInteger::get);
    }

    @Override
    public void run() {
        LOG.debug("Starting Data File Scheduler to process S3 data files for export");
        while (!this.shutdownRequested && !Thread.currentThread().isInterrupted()) {
            try {
                Optional sourcePartition;
                if (this.numOfWorkers.get() < 1 && (sourcePartition = this.sourceCoordinator.acquireAvailablePartition("DATAFILE")).isPresent()) {
                    LOG.debug("Acquired data file partition");
                    DataFilePartition dataFilePartition = (DataFilePartition)((Object)sourcePartition.get());
                    LOG.debug("Start processing data file partition");
                    this.processDataFilePartition(dataFilePartition);
                }
                try {
                    Thread.sleep(2000L);
                    continue;
                }
                catch (InterruptedException e) {
                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            catch (Exception e) {
                LOG.error("Received an exception while processing an S3 data file, backing off and retrying", (Throwable)e);
                try {
                    Thread.sleep(2000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            break;
        }
        LOG.warn("Data file scheduler is interrupted, stopping all data file loaders...");
        this.executor.shutdown();
    }

    public void shutdown() {
        this.shutdownRequested = true;
    }

    private void processDataFilePartition(DataFilePartition dataFilePartition) {
        boolean isAcknowledgmentsEnabled = this.sourceConfig.isAcknowledgmentsEnabled();
        AcknowledgementSet acknowledgementSet = null;
        if (this.sourceConfig.isAcknowledgmentsEnabled()) {
            acknowledgementSet = this.acknowledgementSetManager.create(result -> {
                if (result.booleanValue()) {
                    this.completeDataLoader(dataFilePartition).accept(null, null);
                    LOG.info("Received acknowledgment of completion from sink for data file {}", (Object)dataFilePartition.getKey());
                } else {
                    this.exportFileErrorCounter.increment();
                    LOG.warn("Negative acknowledgment received for data file {}, retrying", (Object)dataFilePartition.getKey());
                    this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)dataFilePartition);
                }
            }, this.sourceConfig.getDataFileAcknowledgmentTimeout());
        }
        DataFileLoader loader = DataFileLoader.create(dataFilePartition, this.codec, this.buffer, this.objectReader, this.recordConverter, this.pluginMetrics, this.sourceCoordinator, acknowledgementSet, this.sourceConfig.getDataFileAcknowledgmentTimeout(), this.getDBTableMetadata());
        CompletableFuture<Void> runLoader = CompletableFuture.runAsync(loader, this.executor);
        if (isAcknowledgmentsEnabled) {
            runLoader.whenComplete((v, ex) -> {
                if (ex != null) {
                    this.exportFileErrorCounter.increment();
                    LOG.error("There was an exception while processing an S3 data file: {}", ex);
                    this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)dataFilePartition);
                }
                this.numOfWorkers.decrementAndGet();
            });
        } else {
            runLoader.whenComplete((BiConsumer)this.completeDataLoader(dataFilePartition));
        }
        this.numOfWorkers.incrementAndGet();
    }

    private void updateLoadStatus(String exportTaskId, Duration timeout) {
        Instant endTime = Instant.now().plus(timeout);
        while (Instant.now().isBefore(endTime)) {
            Optional globalStatePartition = this.sourceCoordinator.getPartition(exportTaskId);
            if (globalStatePartition.isEmpty()) {
                LOG.error("Failed to get data file load status for {}", (Object)exportTaskId);
                return;
            }
            GlobalState globalState = (GlobalState)((Object)globalStatePartition.get());
            LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get());
            loadStatus.setLoadedFiles(loadStatus.getLoadedFiles() + 1);
            LOG.info("Current data file load status: total {} loaded {}", (Object)loadStatus.getTotalFiles(), (Object)loadStatus.getLoadedFiles());
            globalState.setProgressState(loadStatus.toMap());
            try {
                this.sourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)globalState, null);
                if (!this.sourceConfig.isStreamEnabled() || loadStatus.getLoadedFiles() != loadStatus.getTotalFiles()) break;
                LOG.info("All exports are done, streaming can continue...");
                this.sourceCoordinator.createPartition((EnhancedSourcePartition)new GlobalState("stream-for-" + this.sourceConfig.getDbIdentifier(), null));
                break;
            }
            catch (Exception e) {
                LOG.error("Failed to update the global status, looks like the status was out of date, will retry..");
            }
        }
    }

    private BiConsumer<Void, Throwable> completeDataLoader(DataFilePartition dataFilePartition) {
        return (v, ex) -> {
            if (ex == null) {
                this.exportFileSuccessCounter.increment();
                this.updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT);
                this.sourceCoordinator.completePartition((EnhancedSourcePartition)dataFilePartition);
            } else {
                this.exportFileErrorCounter.increment();
                LOG.error("There was an exception while processing an S3 data file", ex);
                this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)dataFilePartition);
            }
            this.numOfWorkers.decrementAndGet();
        };
    }

    private DbTableMetadata getDBTableMetadata() {
        Optional globalStatePartition = this.sourceCoordinator.getPartition(this.sourceConfig.getDbIdentifier());
        GlobalState globalState = (GlobalState)((Object)globalStatePartition.get());
        return DbTableMetadata.fromMap(globalState.getProgressState().get());
    }
}

