/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.sink.s3;

import java.time.Duration;
import java.util.Collection;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.CompressionEngine;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.ClientFactory;
import org.opensearch.dataprepper.plugins.sink.s3.ExtensionProvider;
import org.opensearch.dataprepper.plugins.sink.s3.KeyGenerator;
import org.opensearch.dataprepper.plugins.sink.s3.S3BucketSelector;
import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext;
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig;
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkService;
import org.opensearch.dataprepper.plugins.sink.s3.StandardExtensionProvider;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CodecBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec;
import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.ConfigBucketOwnerProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@DataPrepperPlugin(name="s3", pluginType=Sink.class, pluginConfigurationType=S3SinkConfig.class)
public class S3Sink
extends AbstractSink<Record<Event>> {
    private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
    private static final Duration RETRY_FLUSH_BACKOFF = Duration.ofSeconds(5L);
    private final S3SinkConfig s3SinkConfig;
    private volatile boolean sinkInitialized;
    private final S3SinkService s3SinkService;
    private final BufferFactory bufferFactory;
    private final SinkContext sinkContext;

    @DataPrepperPluginConstructor
    public S3Sink(PluginSetting pluginSetting, S3SinkConfig s3SinkConfig, PluginFactory pluginFactory, SinkContext sinkContext, AwsCredentialsSupplier awsCredentialsSupplier, ExpressionEvaluator expressionEvaluator) {
        super(pluginSetting);
        String bucketName;
        this.s3SinkConfig = s3SinkConfig;
        this.sinkContext = sinkContext;
        PluginModel codecConfiguration = s3SinkConfig.getCodec();
        CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration);
        ConfigBucketOwnerProviderFactory configBucketOwnerProviderFactory = new ConfigBucketOwnerProviderFactory();
        BucketOwnerProvider bucketOwnerProvider = configBucketOwnerProviderFactory.createBucketOwnerProvider(s3SinkConfig);
        PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
        OutputCodec testCodec = (OutputCodec)pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings, new Object[0]);
        this.sinkInitialized = Boolean.FALSE;
        S3AsyncClient s3Client = ClientFactory.createS3AsyncClient(s3SinkConfig, awsCredentialsSupplier);
        BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory();
        if (testCodec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) {
            throw new InvalidPluginConfigurationException("The Parquet sink codec is an in_memory buffer only.");
        }
        if (testCodec instanceof BufferedCodec) {
            innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec)testCodec);
        }
        CompressionOption compressionOption = s3SinkConfig.getCompression();
        CompressionEngine compressionEngine = compressionOption.getCompressionEngine();
        this.bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, testCodec);
        ExtensionProvider extensionProvider = StandardExtensionProvider.create(testCodec, compressionOption);
        S3BucketSelector s3BucketSelector = null;
        if (s3SinkConfig.getBucketSelector() != null) {
            s3BucketSelector = this.loadS3BucketSelector(pluginFactory);
            s3BucketSelector.initialize(s3SinkConfig);
            bucketName = s3BucketSelector.getBucketName();
        } else {
            bucketName = s3SinkConfig.getBucketName();
        }
        KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, s3BucketSelector, extensionProvider, expressionEvaluator);
        if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null && !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).booleanValue()) {
            throw new InvalidPluginConfigurationException("path_prefix is not a valid format expression");
        }
        if (s3SinkConfig.getObjectKeyOptions().getNamePattern() != null && !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern()).booleanValue()) {
            throw new InvalidPluginConfigurationException("name_pattern is not a valid format expression");
        }
        if (bucketName != null && !expressionEvaluator.isValidFormatExpression(bucketName).booleanValue()) {
            throw new InvalidPluginConfigurationException("bucket name is not a valid format expression");
        }
        S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext((SinkContext)sinkContext), compressionOption);
        testCodec.validateAgainstCodecContext((OutputCodecContext)s3OutputCodecContext);
        S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig, s3BucketSelector);
        S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, this.bufferFactory, codecFactory, s3Client, bucketOwnerProvider);
        this.s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, RETRY_FLUSH_BACKOFF, this.pluginMetrics, s3GroupManager);
    }

    private S3BucketSelector loadS3BucketSelector(PluginFactory pluginFactory) {
        PluginModel modeConfiguration = this.s3SinkConfig.getBucketSelector();
        PluginSetting modePluginSetting = new PluginSetting(modeConfiguration.getPluginName(), modeConfiguration.getPluginSettings());
        return (S3BucketSelector)pluginFactory.loadPlugin(S3BucketSelector.class, modePluginSetting, new Object[0]);
    }

    public boolean isReady() {
        return this.sinkInitialized;
    }

    public void doInitialize() {
        try {
            this.doInitializeInternal();
        }
        catch (InvalidPluginConfigurationException e) {
            LOG.error("Invalid plugin configuration, Hence failed to initialize s3-sink plugin.");
            this.shutdown();
            throw e;
        }
        catch (Exception e) {
            LOG.error("Failed to initialize s3-sink plugin.");
            this.shutdown();
            throw e;
        }
    }

    private void doInitializeInternal() {
        this.sinkInitialized = Boolean.TRUE;
    }

    public void doOutput(Collection<Record<Event>> records) {
        this.s3SinkService.output(records);
    }
}

