/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.mongo.documentdb;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.documentdb.MongoTasksRefresher;
import org.opensearch.dataprepper.plugins.mongo.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.mongo.s3partition.S3PartitionCreatorScheduler;
import org.opensearch.dataprepper.plugins.mongo.utils.DocumentDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DocumentDBService {
    private static final Logger LOG = LoggerFactory.getLogger(DocumentDBService.class);
    private static final String S3_PATH_DELIMITER = "/";
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final PluginMetrics pluginMetrics;
    private final MongoDBSourceConfig sourceConfig;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final PluginConfigObservable pluginConfigObservable;
    private final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics;
    private ExecutorService leaderExecutor;
    private MongoTasksRefresher mongoTasksRefresher;

    public DocumentDBService(EnhancedSourceCoordinator sourceCoordinator, MongoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager, PluginConfigObservable pluginConfigObservable) {
        this.sourceCoordinator = sourceCoordinator;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.sourceConfig = sourceConfig;
        this.pluginConfigObservable = pluginConfigObservable;
        this.documentDBAggregateMetrics = new DocumentDBSourceAggregateMetrics();
    }

    public void start(Buffer<Record<Event>> buffer) {
        ArrayList<Runnable> runnableList = new ArrayList<Runnable>();
        String s3PathPrefix = this.getS3PathPrefix();
        LeaderScheduler leaderScheduler = new LeaderScheduler(this.sourceCoordinator, this.sourceConfig, s3PathPrefix);
        runnableList.add(leaderScheduler);
        List<String> collections = this.sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList());
        if (!collections.isEmpty()) {
            S3PartitionCreatorScheduler s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(this.sourceCoordinator, collections);
            runnableList.add(s3PartitionCreatorScheduler);
        }
        this.leaderExecutor = Executors.newFixedThreadPool(runnableList.size(), (ThreadFactory)BackgroundThreadFactory.defaultExecutorThreadFactory((String)"documentdb-source"));
        runnableList.forEach(this.leaderExecutor::submit);
        this.mongoTasksRefresher = new MongoTasksRefresher(buffer, this.sourceCoordinator, this.pluginMetrics, this.acknowledgementSetManager, numThread -> Executors.newFixedThreadPool(numThread, (ThreadFactory)BackgroundThreadFactory.defaultExecutorThreadFactory((String)"documentdb-source")), s3PathPrefix, this.documentDBAggregateMetrics);
        this.mongoTasksRefresher.initialize(this.sourceConfig);
        this.pluginConfigObservable.addPluginConfigObserver(pluginConfig -> this.mongoTasksRefresher.update((MongoDBSourceConfig)pluginConfig));
    }

    private String getS3PathPrefix() {
        Object s3UserPathPrefix = this.sourceConfig.getS3Prefix() != null && !this.sourceConfig.getS3Prefix().isBlank() ? this.sourceConfig.getS3Prefix() + S3_PATH_DELIMITER : "";
        Instant now = Instant.now();
        String s3PathPrefix = this.sourceCoordinator.getPartitionPrefix() != null ? (String)s3UserPathPrefix + this.sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + now.toEpochMilli() + S3_PATH_DELIMITER : (String)s3UserPathPrefix + now.toEpochMilli() + S3_PATH_DELIMITER;
        return s3PathPrefix;
    }

    public void shutdown() {
        if (this.leaderExecutor != null) {
            LOG.info("shutdown DocumentDB Service scheduler and worker");
            this.leaderExecutor.shutdownNow();
        }
        if (this.mongoTasksRefresher != null) {
            LOG.info("shutdown DocumentDB Task refresher");
            this.mongoTasksRefresher.shutdown();
        }
    }
}

