/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.mongo.leader;

import com.google.common.base.Preconditions;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.S3FolderPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background task that tries to own the leader partition and, once it does, performs the
 * one-time creation of the per-collection coordination partitions (global state, export,
 * S3 folder and stream).
 *
 * <p>The task loops until its thread is interrupted. On every cycle it attempts to acquire the
 * leader partition if it does not hold it yet, runs the initialization when the leader progress
 * state reports that it has not happened, saves the leader progress state, and then sleeps for
 * the configured lease interval. When the loop ends the leader partition, if held, is given up.
 *
 * <p>{@code leaderPartition} is only read and written by {@link #run()} and the helpers it
 * calls; no synchronization is applied to it.
 */
public class LeaderScheduler implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
    public static final String EXPORT_PREFIX = "EXPORT-";
    /** Duration, in minutes, passed to the coordinator each time the leader progress state is saved. */
    static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
    private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1L);
    /** Partition type requested from the coordinator when trying to become the leader. */
    private static final String LEADER_PARTITION_TYPE = "LEADER";

    private final MongoDBSourceConfig sourceConfig;
    private final EnhancedSourceCoordinator coordinator;
    private final String s3PathPrefix;
    private final Duration leaseInterval;
    /** The leader partition once acquired; {@code null} while this node is not the leader. */
    private LeaderPartition leaderPartition;

    /**
     * Creates a scheduler that sleeps for the default interval (one minute) between cycles.
     *
     * @param coordinator  coordinator used to acquire, create, save and give up partitions
     * @param sourceConfig source configuration supplying the collections and S3 settings
     * @param s3PathPrefix prefix prepended to each collection name to build its S3 prefix
     * @throws IllegalArgumentException if {@code s3PathPrefix} is {@code null}
     */
    public LeaderScheduler(EnhancedSourceCoordinator coordinator, MongoDBSourceConfig sourceConfig, String s3PathPrefix) {
        this(coordinator, sourceConfig, s3PathPrefix, DEFAULT_LEASE_INTERVAL);
    }

    /**
     * Creates a scheduler with an explicit sleep interval between cycles.
     *
     * @param coordinator   coordinator used to acquire, create, save and give up partitions
     * @param sourceConfig  source configuration supplying the collections and S3 settings
     * @param s3PathPrefix  prefix prepended to each collection name to build its S3 prefix
     * @param leaseInterval time to sleep at the end of every cycle
     * @throws IllegalArgumentException if {@code s3PathPrefix} is {@code null}
     */
    LeaderScheduler(EnhancedSourceCoordinator coordinator, MongoDBSourceConfig sourceConfig, String s3PathPrefix, Duration leaseInterval) {
        this.sourceConfig = sourceConfig;
        this.coordinator = coordinator;
        Preconditions.checkArgument(Objects.nonNull(s3PathPrefix), "S3 path prefix must not be null");
        this.s3PathPrefix = s3PathPrefix;
        this.leaseInterval = leaseInterval;
    }

    /**
     * Runs the leader loop until the current thread is interrupted, then gives up the leader
     * partition if it is held.
     *
     * <p>Exceptions thrown while acquiring the partition, initializing, or saving the progress
     * state are logged and the loop carries on with the next cycle. If the sleep between cycles
     * is interrupted, the thread's interrupt status is restored just before this method returns.
     */
    @Override
    public void run() {
        LOG.info("Starting Leader Scheduler for initialization and stream discovery");
        boolean interruptedWhileSleeping = false;
        while (!interruptedWhileSleeping && !Thread.currentThread().isInterrupted()) {
            try {
                acquireLeaderPartitionIfAbsent();
                initializeIfRequired();
            } catch (final Exception e) {
                LOG.error("Exception occurred in primary leader scheduling loop", e);
            } finally {
                extendLeaderLease();
                // A flag instead of `break`: leaving a finally block abruptly would silently
                // discard any Throwable that is still propagating.
                interruptedWhileSleeping = sleepWasInterrupted();
            }
        }

        LOG.warn("Quitting Leader Scheduler");
        try {
            if (leaderPartition != null) {
                coordinator.giveUpPartition(leaderPartition);
            }
        } finally {
            // Thread.sleep cleared the interrupt status when it threw; put it back so code further
            // up the stack can observe it. This is done after giveUpPartition so that the release
            // call itself does not run with a freshly set interrupt flag.
            if (interruptedWhileSleeping) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Asks the coordinator for the leader partition when this node does not hold it yet. */
    private void acquireLeaderPartitionIfAbsent() {
        if (leaderPartition != null) {
            return;
        }
        final Optional<?> sourcePartition = coordinator.acquireAvailablePartition(LEADER_PARTITION_TYPE);
        LOG.info("Leader partition {}", sourcePartition);
        if (sourcePartition.isPresent()) {
            LOG.info("Running as a LEADER node");
            leaderPartition = (LeaderPartition) sourcePartition.get();
        }
    }

    /** Runs {@link #init()} when this node is the leader and the progress state is not initialized. */
    private void initializeIfRequired() {
        if (leaderPartition == null) {
            return;
        }
        final LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
        if (!leaderProgressState.isInitialized()) {
            LOG.info("The service is not been initialized");
            init();
        }
    }

    /**
     * Saves the leader partition's progress state with a {@link #DEFAULT_EXTEND_LEASE_MINUTES}
     * minute duration. Does nothing when the leader partition is not held. A failure is logged
     * rather than propagated so that it cannot terminate the scheduling loop.
     */
    private void extendLeaderLease() {
        if (leaderPartition == null) {
            return;
        }
        try {
            coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
        } catch (final Exception e) {
            LOG.error("Failed to save progress state for the leader partition", e);
        }
    }

    /**
     * Sleeps for the configured lease interval.
     *
     * @return {@code true} if the sleep was interrupted, {@code false} if it completed normally
     */
    private boolean sleepWasInterrupted() {
        try {
            Thread.sleep(leaseInterval.toMillis());
            return false;
        } catch (final InterruptedException e) {
            LOG.info("InterruptedException occurred");
            return true;
        }
    }

    /**
     * Creates the coordination partitions for every configured collection and then marks the
     * leader progress state as initialized.
     */
    private void init() {
        LOG.info("Try to initialize DocumentDB Leader Partition");
        for (final CollectionConfig collectionConfig : sourceConfig.getCollections()) {
            initializeCollection(collectionConfig);
        }
        LOG.debug("Update initialization state");
        final LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
        leaderProgressState.setInitialized(true);
    }

    /**
     * Creates the partitions for one collection: a global state entry, then (if export is enabled)
     * the export partition and export global state, then the S3 folder partition, then (if
     * streaming is enabled) the stream partition.
     *
     * @param collectionConfig configuration of the collection to set up
     */
    private void initializeCollection(final CollectionConfig collectionConfig) {
        final String collection = collectionConfig.getCollection();
        coordinator.createPartition(new GlobalState(collection, null));

        // One timestamp is shared by the export and stream partitions of this collection.
        final Instant startTime = Instant.now();
        final boolean exportRequired = collectionConfig.isExport();
        final boolean streamRequired = collectionConfig.isStream();
        LOG.info("Ingestion mode export {} and stream {} for Collection {}", exportRequired, streamRequired, collection);

        if (exportRequired) {
            createExportPartition(collectionConfig, startTime);
            createExportGlobalState(collectionConfig);
        }

        final String s3Prefix = s3PathPrefix + collection;
        createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3Prefix, collectionConfig);

        if (streamRequired) {
            // The stream partition is told to wait for the export whenever an export was requested.
            createStreamPartition(collectionConfig, startTime, exportRequired);
        }
    }

    /**
     * Creates the S3 folder partition for a collection.
     *
     * @param s3Bucket         bucket passed to the partition
     * @param s3Region         region passed to the partition
     * @param s3PathPrefix     path prefix passed to the partition
     * @param collectionConfig collection whose name and partition count are used
     */
    private void createS3Partition(String s3Bucket, String s3Region, String s3PathPrefix, CollectionConfig collectionConfig) {
        LOG.info("Creating s3 folder global partition: {}", collectionConfig.getCollection());
        coordinator.createPartition(new S3FolderPartition(
                s3Bucket, s3PathPrefix, s3Region, collectionConfig.getCollection(), collectionConfig.getPartitionCount()));
    }

    /**
     * Creates the stream partition for a collection.
     *
     * @param collectionConfig collection the partition is created for
     * @param streamTime       instant stored, as epoch milliseconds, as the stream start time
     * @param waitForExport    value stored in the progress state's wait-for-export flag
     */
    private void createStreamPartition(CollectionConfig collectionConfig, Instant streamTime, boolean waitForExport) {
        LOG.info("Creating stream global partition: {}", collectionConfig.getCollection());
        final StreamProgressState streamProgressState = new StreamProgressState();
        streamProgressState.setWaitForExport(waitForExport);
        streamProgressState.setStartTime(streamTime.toEpochMilli());
        streamProgressState.setLastUpdateTimestamp(Instant.now().toEpochMilli());
        coordinator.createPartition(new StreamPartition(collectionConfig.getCollection(), streamProgressState));
    }

    /**
     * Creates the export partition for a collection.
     *
     * @param collectionConfig collection the partition is created for
     * @param exportTime       instant recorded in the progress state and passed to the partition
     */
    private void createExportPartition(CollectionConfig collectionConfig, Instant exportTime) {
        LOG.info("Creating export global partition for collection: {}", collectionConfig.getCollection());
        final ExportProgressState exportProgressState = new ExportProgressState();
        exportProgressState.setCollectionName(collectionConfig.getCollectionName());
        exportProgressState.setDatabaseName(collectionConfig.getDatabaseName());
        exportProgressState.setExportTime(exportTime.toString());
        final ExportPartition exportPartition = new ExportPartition(
                collectionConfig.getCollection(), collectionConfig.getExportBatchSize(), exportTime, exportProgressState);
        coordinator.createPartition(exportPartition);
    }

    /**
     * Creates the export global state entry for a collection, keyed by {@link #EXPORT_PREFIX}
     * followed by the collection name and seeded from a new {@link ExportLoadStatus}.
     *
     * @param collectionConfig collection the global state is created for
     */
    private void createExportGlobalState(CollectionConfig collectionConfig) {
        final ExportLoadStatus exportLoadStatus = new ExportLoadStatus(0L, 0L, 0L, Instant.now().toEpochMilli(), false);
        coordinator.createPartition(new GlobalState(EXPORT_PREFIX + collectionConfig.getCollection(), exportLoadStatus.toMap()));
    }
}

