/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds;

import java.util.Objects;
import java.util.function.Function;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination;
import org.opensearch.dataprepper.plugins.source.rds.ClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.RdsService;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.coordination.PartitionFactory;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Data Prepper source plugin registered under the name {@code rds}.
 *
 * <p>The class itself holds only wiring: it keeps the collaborators handed to its
 * constructor, receives an {@link EnhancedSourceCoordinator} through
 * {@link #setEnhancedSourceCoordinator(EnhancedSourceCoordinator)}, and on
 * {@link #start(Buffer)} builds an {@link RdsService} and delegates to it.
 */
@DataPrepperPlugin(name="rds", pluginType=Source.class, pluginConfigurationType=RdsSourceConfig.class)
public class RdsSource
implements Source<Record<Event>>,
UsesEnhancedSourceCoordination {
    private static final Logger LOG = LoggerFactory.getLogger(RdsSource.class);
    private final ClientFactory clientFactory;
    private final PluginMetrics pluginMetrics;
    private final RdsSourceConfig sourceConfig;
    private final EventFactory eventFactory;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final PluginConfigObservable pluginConfigObservable;
    private final PipelineDescription pipelineDescription;
    // Assigned by setEnhancedSourceCoordinator(); start() refuses to run while this is null.
    private EnhancedSourceCoordinator sourceCoordinator;
    // Created in start(); remains null if start() was never reached.
    private RdsService rdsService;

    /**
     * Stores the supplied collaborators and builds the {@link ClientFactory} from the
     * credentials supplier and the source configuration.
     *
     * @param pluginMetrics             metrics handle passed on to the {@link RdsService}
     * @param sourceConfig              configuration of this source
     * @param eventFactory              event factory passed on to the {@link RdsService}
     * @param awsCredentialsSupplier    credentials supplier used only to construct the {@link ClientFactory}
     * @param acknowledgementSetManager acknowledgement manager passed on to the {@link RdsService}
     * @param pluginConfigObservable    configuration observable passed on to the {@link RdsService}
     * @param pipelineDescription       pipeline description passed on to the {@link RdsService}
     */
    @DataPrepperPluginConstructor
    public RdsSource(PluginMetrics pluginMetrics, RdsSourceConfig sourceConfig, EventFactory eventFactory, AwsCredentialsSupplier awsCredentialsSupplier, AcknowledgementSetManager acknowledgementSetManager, PluginConfigObservable pluginConfigObservable, PipelineDescription pipelineDescription) {
        this.pluginMetrics = pluginMetrics;
        this.sourceConfig = sourceConfig;
        this.eventFactory = eventFactory;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.pluginConfigObservable = pluginConfigObservable;
        this.pipelineDescription = pipelineDescription;
        this.clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig);
    }

    /**
     * Starts the source: registers a {@link LeaderPartition} with the coordinator,
     * creates the {@link RdsService} and starts it against {@code buffer}.
     *
     * @param buffer buffer handed to {@link RdsService#start}
     * @throws NullPointerException if no source coordinator has been set yet
     */
    public void start(Buffer<Record<Event>> buffer) {
        LOG.info("Starting RDS source");
        // Fail with an explanatory message instead of a bare NPE when the lifecycle order is wrong.
        Objects.requireNonNull(this.sourceCoordinator,
                "sourceCoordinator must be set via setEnhancedSourceCoordinator() before start()");
        this.sourceCoordinator.createPartition((EnhancedSourcePartition)new LeaderPartition());
        this.rdsService = new RdsService(this.sourceCoordinator, this.sourceConfig, this.eventFactory, this.clientFactory, this.pluginMetrics, this.acknowledgementSetManager, this.pluginConfigObservable, this.pipelineDescription);
        LOG.info("Start RDS service");
        this.rdsService.start(buffer);
    }

    /**
     * Shuts down the {@link RdsService} if one was created by {@link #start(Buffer)};
     * otherwise only logs.
     */
    public void stop() {
        LOG.info("Stop RDS service");
        if (this.rdsService != null) {
            this.rdsService.shutdown();
        }
    }

    /**
     * Stores the coordinator and calls {@code initialize()} on it.
     *
     * @param sourceCoordinator coordinator to use; must not be {@code null}
     * @throws NullPointerException if {@code sourceCoordinator} is {@code null}; in that case
     *                              any previously stored coordinator is left untouched
     */
    public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) {
        // Validate before assigning so a null argument cannot overwrite a valid coordinator.
        this.sourceCoordinator = Objects.requireNonNull(sourceCoordinator, "sourceCoordinator must not be null");
        this.sourceCoordinator.initialize();
    }

    /**
     * Returns a new {@link PartitionFactory} on every call.
     *
     * @return function mapping a {@link SourcePartitionStoreItem} to an {@link EnhancedSourcePartition}
     */
    public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
        return new PartitionFactory();
    }

    /**
     * Reports the acknowledgements setting from the source configuration.
     *
     * @return the value of {@code sourceConfig.isAcknowledgmentsEnabled()}
     */
    public boolean areAcknowledgementsEnabled() {
        return this.sourceConfig.isAcknowledgmentsEnabled();
    }
}

