/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kafka.consumer;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter;
import org.opensearch.dataprepper.plugins.kafka.common.PlaintextKafkaDataConfig;
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates the {@link KafkaCustomConsumer} workers for a topic.
 *
 * <p>The factory assembles the Kafka consumer properties (authentication, per-topic tuning and,
 * when configured, Confluent schema-registry settings), picks the key/value deserializers and
 * wraps one {@link KafkaConsumer} per worker in a {@link KafkaCustomConsumer}.
 *
 * <p>Instances are stateful: the resolved message format is kept in {@link #schemaType}, which is
 * written while the consumer properties are built and read afterwards when consumers are created.
 */
public class KafkaCustomConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumerFactory.class);

    private final SerializationFactory serializationFactory;
    private final AwsCredentialsSupplier awsCredentialsSupplier;

    /**
     * Name of the message format used for the value payload. Starts as PLAINTEXT and is
     * overwritten by {@link #setPropertiesForPlaintextAndJsonWithoutSchemaRegistry} or
     * {@link #setPropertiesForSchemaType}; it is left untouched for an AWS Glue registry.
     */
    private String schemaType = MessageFormat.PLAINTEXT.toString();

    /**
     * @param serializationFactory   source of the key and value deserializers
     * @param awsCredentialsSupplier credentials supplier handed to the {@link AwsContext}
     */
    public KafkaCustomConsumerFactory(SerializationFactory serializationFactory, AwsCredentialsSupplier awsCredentialsSupplier) {
        this.serializationFactory = serializationFactory;
        this.awsCredentialsSupplier = awsCredentialsSupplier;
    }

    /**
     * Creates {@code topic.getWorkers()} consumers for the given topic.
     *
     * @param kafkaConsumerConfig       plugin-level consumer configuration
     * @param topic                     topic-level consumer configuration
     * @param buffer                    buffer passed through to each consumer
     * @param pluginMetrics             metrics registry used for the topic metrics
     * @param acknowledgementSetManager passed through to each consumer
     * @param byteDecoder               passed through to each consumer
     * @param shutdownInProgress        shared shutdown flag passed through to each consumer
     * @param topicNameInMetrics        passed to the topic metrics constructor
     * @param circuitBreaker            used to build the pause-consumption predicate
     * @param compressionConfig         passed through to each consumer
     * @return the created consumers, one per worker
     * @throws RuntimeException wrapping any exception raised while the consumers are created
     */
    public List<KafkaCustomConsumer> createConsumersForTopic(KafkaConsumerConfig kafkaConsumerConfig, TopicConsumerConfig topic, Buffer<Record<Event>> buffer, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager, ByteDecoder byteDecoder, AtomicBoolean shutdownInProgress, boolean topicNameInMetrics, CircuitBreaker circuitBreaker, CompressionOption compressionConfig) {
        Properties authProperties = new Properties();
        KafkaSecurityConfigurer.setAuthProperties(authProperties, kafkaConsumerConfig, LOG);
        KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, topicNameInMetrics);

        // Building the properties also resolves this.schemaType, so it has to run before the
        // message format is looked up below.
        Properties consumerProperties = getConsumerProperties(kafkaConsumerConfig, topic, authProperties);
        MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);

        List<KafkaCustomConsumer> consumers = new ArrayList<>();
        AwsContext awsContext = new AwsContext(kafkaConsumerConfig, awsCredentialsSupplier);
        KeyFactory keyFactory = new KeyFactory(awsContext);
        PauseConsumePredicate pauseConsumePredicate = PauseConsumePredicate.circuitBreakingPredicate(circuitBreaker);
        try {
            int numWorkers = topic.getWorkers();
            for (int worker = 0; worker < numWorkers; worker++) {
                // Each worker gets its own deserializer instances and its own KafkaConsumer.
                KafkaDataConfigAdapter dataConfig = new KafkaDataConfigAdapter(keyFactory, topic);
                Deserializer<?> keyDeserializer = serializationFactory.getDeserializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));

                // Only when the format is still PLAINTEXT is the Glue deserializer tried first;
                // a null result falls back to the serialization factory.
                Deserializer<?> valueDeserializer = null;
                if (schema == MessageFormat.PLAINTEXT) {
                    valueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(kafkaConsumerConfig, awsContext);
                }
                if (valueDeserializer == null) {
                    valueDeserializer = serializationFactory.getDeserializer(dataConfig);
                }

                @SuppressWarnings({"rawtypes", "unchecked"})
                KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProperties, keyDeserializer, valueDeserializer);
                consumers.add(new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, kafkaConsumerConfig, topic, schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, pauseConsumePredicate, compressionConfig));
            }
        } catch (Exception e) {
            // NOTE(review): TimeoutException resolves to java.util.concurrent.TimeoutException
            // (per this file's imports). Kafka client timeouts are reported as
            // org.apache.kafka.common.errors.TimeoutException, which this check would not
            // match - confirm which type was intended.
            if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
                LOG.error("The Kafka broker is not available.");
            } else {
                LOG.error("Failed to setup the Kafka Source Plugin.", e);
            }
            throw new RuntimeException(e);
        }
        return consumers;
    }

    /**
     * Builds the full consumer property set: a copy of the auth properties plus the DNS lookup
     * mode, the per-topic settings and the schema-registry settings.
     *
     * <p>Side effect: may update {@link #schemaType} via {@link #setSchemaRegistryProperties}.
     */
    private Properties getConsumerProperties(KafkaConsumerConfig sourceConfig, TopicConsumerConfig topicConfig, Properties authProperties) {
        // Work on a copy so the caller's auth properties are not modified.
        Properties properties = (Properties) authProperties.clone();
        if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
            ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
            switch (dnsLookupType) {
                case USE_ALL_DNS_IPS:
                case CANONICAL_BOOTSTRAP:
                case DEFAULT:
                    properties.put("client.dns.lookup", dnsLookupType.toString());
                    break;
                default:
                    break;
            }
        }
        setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId());
        setSchemaRegistryProperties(sourceConfig, properties, topicConfig);
        LOG.debug(DataPrepperMarkers.SENSITIVE, "Starting consumer with the properties : {}", properties);
        return properties;
    }

    /**
     * Copies the per-topic consumer settings into {@code properties}.
     *
     * <p>Duration-based settings are written as integer milliseconds and the byte-size settings
     * are narrowed to {@code int}. The partition assignment strategy is always set to
     * {@link CooperativeStickyAssignor}.
     *
     * @param properties  target property set, modified in place
     * @param topicConfig topic configuration supplying the values
     * @param groupId     value for {@code group.id}
     */
    public static void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig, String groupId) {
        properties.put("group.id", groupId);
        if (Objects.nonNull(topicConfig.getClientId())) {
            properties.put("client.id", topicConfig.getClientId());
        }
        properties.put("max.partition.fetch.bytes", (int) topicConfig.getMaxPartitionFetchBytes());
        properties.put("retry.backoff.ms", (int) topicConfig.getRetryBackoff().toMillis());
        properties.put("reconnect.backoff.ms", (int) topicConfig.getReconnectBackoff().toMillis());
        properties.put("enable.auto.commit", topicConfig.getAutoCommit());
        properties.put("auto.commit.interval.ms", (int) topicConfig.getCommitInterval().toMillis());
        properties.put("auto.offset.reset", topicConfig.getAutoOffsetReset());
        properties.put("max.poll.records", topicConfig.getConsumerMaxPollRecords());
        properties.put("max.poll.interval.ms", (int) topicConfig.getMaxPollInterval().toMillis());
        properties.put("session.timeout.ms", (int) topicConfig.getSessionTimeOut().toMillis());
        properties.put("heartbeat.interval.ms", (int) topicConfig.getHeartBeatInterval().toMillis());
        properties.put("fetch.max.bytes", (int) topicConfig.getFetchMaxBytes());
        properties.put("fetch.max.wait.ms", topicConfig.getFetchMaxWait());
        properties.put("fetch.min.bytes", (int) topicConfig.getFetchMinBytes());
        properties.put("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
    }

    /**
     * Applies schema-related settings depending on the configured schema registry:
     * none configured - take the format from the topic; Confluent - configure the registry;
     * any other type (including AWS Glue) - nothing is set here.
     */
    private void setSchemaRegistryProperties(KafkaConsumerConfig kafkaConsumerConfig, Properties properties, TopicConfig topicConfig) {
        SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig();
        if (Objects.isNull(schemaConfig)) {
            setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(properties, topicConfig);
            return;
        }
        if (schemaConfig.getType() == SchemaRegistryType.CONFLUENT) {
            setupConfluentSchemaRegistry(schemaConfig, kafkaConsumerConfig, properties, topicConfig);
        }
    }

    /**
     * Without a schema registry the message format is the topic's configured serde format.
     * Only {@link #schemaType} is updated; {@code properties} is not modified.
     */
    private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, TopicConfig topicConfig) {
        MessageFormat dataFormat = topicConfig.getSerdeFormat();
        this.schemaType = dataFormat.toString();
    }

    /**
     * Adds the credentials and security settings used to reach the schema registry.
     *
     * <p>Basic-auth user info is added when the credentials source is {@code USER_INFO} and a
     * plaintext SASL config is present; the SASL mechanism and security protocol are added when
     * an OAuth config is present. Nothing is added when no auth or SASL config exists.
     */
    private void setPropertiesForSchemaRegistryConnectivity(KafkaConsumerConfig kafkaConsumerConfig, Properties properties) {
        AuthConfig authConfig = kafkaConsumerConfig.getAuthConfig();
        // Both branches below need the SASL config, so its absence means there is nothing to do.
        // Checking first also avoids dereferencing a missing auth/SASL config.
        if (authConfig == null || authConfig.getSaslAuthConfig() == null) {
            return;
        }

        SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig();
        if ("USER_INFO".equalsIgnoreCase(schemaConfig.getBasicAuthCredentialsSource())
                && authConfig.getSaslAuthConfig().getPlainTextAuthConfig() != null) {
            String schemaBasicAuthUserInfo = schemaConfig.getSchemaRegistryApiKey()
                    .concat(":")
                    .concat(schemaConfig.getSchemaRegistryApiSecret());
            properties.put("schema.registry.basic.auth.user.info", schemaBasicAuthUserInfo);
            properties.put("basic.auth.credentials.source", "USER_INFO");
        }

        OAuthConfig oAuthConfig = authConfig.getSaslAuthConfig().getOAuthConfig();
        if (oAuthConfig != null) {
            properties.put("sasl.mechanism", oAuthConfig.getOauthSaslMechanism());
            properties.put("security.protocol", oAuthConfig.getOauthSecurityProtocol());
        }
    }

    /**
     * Looks up the schema type of the {@code <topic>-value} subject in the schema registry,
     * stores it in {@link #schemaType} and sets the matching {@code value.deserializer}.
     *
     * @throws RuntimeException if the schema metadata cannot be fetched from the registry
     */
    private void setPropertiesForSchemaType(KafkaConsumerConfig kafkaConsumerConfig, Properties properties, TopicConfig topic) {
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("schema.registry.url", getSchemaRegistryUrl(kafkaConsumerConfig));
        properties.put("auto.register.schemas", false);

        // The client receives the live property set (not a copy) as its configuration map.
        @SuppressWarnings("unchecked")
        Map<String, ?> registryClientConfig = (Map<String, ?>) (Map<?, ?>) properties;
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty("schema.registry.url"), 100, registryClientConfig);

        SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig();
        try {
            String subject = topic.getName() + "-value";
            // Use the pinned schema version when one is configured, otherwise the latest.
            if (schemaConfig.getVersion() != null) {
                this.schemaType = schemaRegistryClient.getSchemaMetadata(subject, schemaConfig.getVersion().intValue()).getSchemaType();
            } else {
                this.schemaType = schemaRegistryClient.getLatestSchemaMetadata(subject).getSchemaType();
            }
        } catch (RestClientException | IOException e) {
            LOG.error("Failed to connect to the schema registry...");
            throw new RuntimeException(e);
        }

        if (this.schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
            properties.put("value.deserializer", KafkaJsonDeserializer.class);
        } else if (this.schemaType.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
            properties.put("value.deserializer", KafkaAvroDeserializer.class);
        } else {
            properties.put("value.deserializer", StringDeserializer.class);
        }
    }

    /** Returns the registry URL from the schema configuration. */
    private String getSchemaRegistryUrl(KafkaConsumerConfig kafkaConsumerConfig) {
        return kafkaConsumerConfig.getSchemaConfig().getRegistryURL();
    }

    /**
     * Configures connectivity and value deserialization for a Confluent schema registry.
     *
     * @throws RuntimeException if no registry URL is configured
     */
    private void setupConfluentSchemaRegistry(SchemaConfig schemaConfig, KafkaConsumerConfig kafkaConsumerConfig, Properties properties, TopicConfig topicConfig) {
        if (!StringUtils.isNotEmpty(schemaConfig.getRegistryURL())) {
            throw new RuntimeException("RegistryURL must be specified for confluent schema registry");
        }
        setPropertiesForSchemaRegistryConnectivity(kafkaConsumerConfig, properties);
        setPropertiesForSchemaType(kafkaConsumerConfig, properties, topicConfig);
    }
}

