/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kafka.producer;

import java.util.Objects;
import java.util.Properties;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter;
import org.opensearch.dataprepper.plugins.kafka.common.PlaintextKafkaDataConfig;
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaProducerMetrics;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
import org.opensearch.dataprepper.plugins.kafka.util.RestUtils;
import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds {@link KafkaCustomProducer} instances from a {@link KafkaProducerConfig}.
 *
 * <p>Before the underlying {@link KafkaProducer} is constructed, the factory optionally creates the
 * target topic (when the topic configuration asks for it) and optionally registers a schema (when
 * the schema configuration asks for it).
 */
public class KafkaCustomProducerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomProducerFactory.class);

    /**
     * Threshold (1 MiB) above which the configured producer max request size is forwarded to topic
     * creation as the topic's max message bytes.
     * NOTE(review): this equals Kafka's documented default for {@code max.request.size}, which is
     * presumably why only larger values are treated as an override - confirm.
     */
    private static final int MAX_REQUEST_SIZE_OVERRIDE_THRESHOLD_BYTES = 0x100000;

    private final SerializationFactory serializationFactory;
    private final AwsCredentialsSupplier awsCredentialsSupplier;
    private final TopicServiceFactory topicServiceFactory;

    /**
     * @param serializationFactory   supplies the key and value serializers for the producer
     * @param awsCredentialsSupplier passed to the {@link AwsContext} created for each producer
     * @param topicServiceFactory    creates the {@link TopicService} used when a topic must be created
     */
    public KafkaCustomProducerFactory(SerializationFactory serializationFactory, AwsCredentialsSupplier awsCredentialsSupplier, TopicServiceFactory topicServiceFactory) {
        this.serializationFactory = serializationFactory;
        this.awsCredentialsSupplier = awsCredentialsSupplier;
        this.topicServiceFactory = topicServiceFactory;
    }

    /**
     * Creates a producer with {@link CompressionOption#NONE} as the manual compression setting.
     *
     * @see #createProducer(KafkaProducerConfig, ExpressionEvaluator, SinkContext, PluginMetrics, DLQSink, boolean, CompressionOption)
     */
    public KafkaCustomProducer createProducer(KafkaProducerConfig kafkaProducerConfig, ExpressionEvaluator expressionEvaluator, SinkContext sinkContext, PluginMetrics pluginMetrics, DLQSink dlqSink, boolean topicNameInMetrics) {
        return this.createProducer(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, topicNameInMetrics, CompressionOption.NONE);
    }

    /**
     * Creates a {@link KafkaCustomProducer} for the topic described by {@code kafkaProducerConfig}.
     *
     * @param kafkaProducerConfig     producer configuration; its topic must be non-null
     * @param expressionEvaluator     handed through to the created producer
     * @param sinkContext             may be {@code null}; when present its tags target key is handed
     *                                to the created producer
     * @param pluginMetrics           metrics registry used for topic and producer metrics
     * @param dlqSink                 handed through to the created producer
     * @param topicNameInMetrics      handed to {@link KafkaTopicProducerMetrics}
     * @param manualCompressionConfig handed through to the created producer
     * @return the newly created producer wrapper
     * @throws NullPointerException if no producer properties can be derived from the configuration
     */
    public KafkaCustomProducer createProducer(KafkaProducerConfig kafkaProducerConfig, ExpressionEvaluator expressionEvaluator, SinkContext sinkContext, PluginMetrics pluginMetrics, DLQSink dlqSink, boolean topicNameInMetrics, CompressionOption manualCompressionConfig) {
        AwsContext awsContext = new AwsContext(kafkaProducerConfig, this.awsCredentialsSupplier);
        KeyFactory keyFactory = new KeyFactory(awsContext);

        // Topic creation and schema registration happen before the producer is built.
        this.prepareTopicAndSchema(kafkaProducerConfig, this.resolveMaxRequestSizeOverride(kafkaProducerConfig));

        Properties properties = Objects.requireNonNull(
                SinkPropertyConfigurer.getProducerProperties(kafkaProducerConfig),
                "Kafka producer properties must not be null");
        KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG);

        TopicProducerConfig topic = kafkaProducerConfig.getTopic();
        KafkaDataConfigAdapter dataConfig = new KafkaDataConfigAdapter(keyFactory, topic);
        // The key serializer is obtained from the plaintext variant of the data config.
        Serializer<?> keySerializer = this.serializationFactory.getSerializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));
        Serializer<?> valueSerializer = this.serializationFactory.getSerializer(dataConfig);
        KafkaProducer producer = new KafkaProducer(properties, keySerializer, valueSerializer);

        KafkaTopicProducerMetrics topicMetrics = new KafkaTopicProducerMetrics(topic.getName(), pluginMetrics, topicNameInMetrics);
        KafkaProducerMetrics.registerProducer(pluginMetrics, producer);

        // The topic has already been dereferenced above, so no additional null check is needed here.
        SchemaService schemaService = new SchemaService.SchemaServiceBuilder()
                .getFetchSchemaService(topic.getName(), kafkaProducerConfig.getSchemaConfig())
                .build();
        String tagsTargetKey = sinkContext != null ? sinkContext.getTagsTargetKey() : null;
        return new KafkaCustomProducer(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, tagsTargetKey, topicMetrics, schemaService, manualCompressionConfig);
    }

    /**
     * Returns the configured producer max request size when it exceeds
     * {@link #MAX_REQUEST_SIZE_OVERRIDE_THRESHOLD_BYTES}, otherwise {@code null}.
     */
    private Integer resolveMaxRequestSizeOverride(KafkaProducerConfig kafkaProducerConfig) {
        KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties();
        if (producerProperties == null) {
            return null;
        }
        int producerMaxRequestSize = producerProperties.getMaxRequestSize();
        return producerMaxRequestSize > MAX_REQUEST_SIZE_OVERRIDE_THRESHOLD_BYTES ? producerMaxRequestSize : null;
    }

    /**
     * Creates the topic if requested, then registers the schema when the schema configuration
     * explicitly enables creation.
     */
    private void prepareTopicAndSchema(KafkaProducerConfig kafkaProducerConfig, Integer maxRequestSize) {
        this.checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig, maxRequestSize);
        SchemaConfig schemaConfig = kafkaProducerConfig.getSchemaConfig();
        // Boolean.TRUE.equals treats an unset (null) flag as "do not register" instead of failing on unboxing.
        if (schemaConfig != null && Boolean.TRUE.equals(schemaConfig.isCreate())) {
            RestUtils restUtils = new RestUtils(schemaConfig);
            String topic = kafkaProducerConfig.getTopic().getName();
            SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getRegisterationAndCompatibilityService(topic, kafkaProducerConfig.getSerdeFormat(), restUtils, schemaConfig).build();
            schemaService.registerSchema(topic);
        }
    }

    /**
     * Creates the configured topic when the topic configuration requests it.
     *
     * @param maxRequestSize optional max message size for the new topic; {@code null} passes no value
     */
    private void checkTopicCreationCriteriaAndCreateTopic(KafkaProducerConfig kafkaProducerConfig, Integer maxRequestSize) {
        TopicProducerConfig topic = kafkaProducerConfig.getTopic();
        if (!topic.isCreateTopic()) {
            return;
        }
        TopicService topicService = this.topicServiceFactory.createTopicService(kafkaProducerConfig);
        try {
            Long maxMessageBytes = maxRequestSize == null ? null : Long.valueOf(maxRequestSize.longValue());
            topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor(), maxMessageBytes);
        } finally {
            // Release the admin client even if topic creation fails.
            topicService.closeAdminClient();
        }
    }
}

